Merge pull request #32654 from soltysh/sj_clientset

Automatic merge from submit-queue

Switch ScheduledJob controller to use clientset

**What this PR does / why we need it**:
This is part of #25442. I've applied here the same fix I've applied in the manual client in #29187, see the 1st commit for that (@caesarxuchao we've talked about it in #29856).

@deads2k as promised 
@janetkuo ptal
This commit is contained in:
Kubernetes Submit Queue 2016-09-16 05:03:57 -07:00 committed by GitHub
commit e8fbcb1669
18 changed files with 77 additions and 110 deletions

View File

@ -404,13 +404,9 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Infof("Starting %s apis", groupVersion) glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "scheduledjobs") { if containsResource(resources, "scheduledjobs") {
glog.Infof("Starting scheduledjob controller") glog.Infof("Starting scheduledjob controller")
// TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset // // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
kubeconfig.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} kubeconfig.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
kubeClient, err := client.New(kubeconfig) go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))).
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
go scheduledjob.NewScheduledJobController(kubeClient).
Run(wait.NeverStop) Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

View File

@ -204,12 +204,10 @@ func setConfigDefaults(config *$.Config|raw$) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = $.DefaultKubernetesUserAgent|raw$() config.UserAgent = $.DefaultKubernetesUserAgent|raw$()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = $.codecs|raw$ config.NegotiatedSerializer = $.codecs|raw$
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -74,12 +74,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -89,12 +89,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -79,12 +79,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -74,12 +74,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -74,12 +74,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -74,12 +74,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -84,12 +84,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -74,12 +74,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -79,12 +79,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -74,12 +74,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -149,12 +149,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -109,12 +109,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -89,12 +89,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -74,12 +74,10 @@ func setConfigDefaults(config *restclient.Config) error {
if config.UserAgent == "" { if config.UserAgent == "" {
config.UserAgent = restclient.DefaultKubernetesUserAgent() config.UserAgent = restclient.DefaultKubernetesUserAgent()
} }
// TODO: Unconditionally set the config.Version, until we fix the config. if config.GroupVersion == nil || config.GroupVersion.Group != g.GroupVersion.Group {
//if config.Version == "" { copyGroupVersion := g.GroupVersion
copyGroupVersion := g.GroupVersion config.GroupVersion = &copyGroupVersion
config.GroupVersion = &copyGroupVersion }
//}
config.NegotiatedSerializer = api.Codecs config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 { if config.QPS == 0 {

View File

@ -38,8 +38,9 @@ import (
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
@ -51,21 +52,21 @@ import (
// Utilities for dealing with Jobs and ScheduledJobs and time. // Utilities for dealing with Jobs and ScheduledJobs and time.
type ScheduledJobController struct { type ScheduledJobController struct {
kubeClient *client.Client kubeClient clientset.Interface
jobControl jobControlInterface jobControl jobControlInterface
sjControl sjControlInterface sjControl sjControlInterface
podControl podControlInterface podControl podControlInterface
recorder record.EventRecorder recorder record.EventRecorder
} }
func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobController { func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset. // TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
if kubeClient != nil && kubeClient.GetRateLimiter() != nil { if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.GetRateLimiter()) metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
} }
jm := &ScheduledJobController{ jm := &ScheduledJobController{
@ -73,13 +74,13 @@ func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobControlle
jobControl: realJobControl{KubeClient: kubeClient}, jobControl: realJobControl{KubeClient: kubeClient},
sjControl: &realSJControl{KubeClient: kubeClient}, sjControl: &realSJControl{KubeClient: kubeClient},
podControl: &realPodControl{KubeClient: kubeClient}, podControl: &realPodControl{KubeClient: kubeClient},
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}), recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduledjob-controller"}),
} }
return jm return jm
} }
func NewScheduledJobControllerFromClient(kubeClient *client.Client) *ScheduledJobController { func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController {
jm := NewScheduledJobController(kubeClient) jm := NewScheduledJobController(kubeClient)
return jm return jm
} }

View File

@ -21,8 +21,8 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
) )
@ -34,7 +34,7 @@ type sjControlInterface interface {
// realSJControl is the default implementation of sjControlInterface. // realSJControl is the default implementation of sjControlInterface.
type realSJControl struct { type realSJControl struct {
KubeClient *client.Client KubeClient clientset.Interface
} }
var _ sjControlInterface = &realSJControl{} var _ sjControlInterface = &realSJControl{}
@ -73,7 +73,7 @@ type jobControlInterface interface {
// realJobControl is the default implementation of jobControlInterface. // realJobControl is the default implementation of jobControlInterface.
type realJobControl struct { type realJobControl struct {
KubeClient *client.Client KubeClient clientset.Interface
Recorder record.EventRecorder Recorder record.EventRecorder
} }
@ -182,18 +182,18 @@ type podControlInterface interface {
// realPodControl is the default implementation of podControlInterface. // realPodControl is the default implementation of podControlInterface.
type realPodControl struct { type realPodControl struct {
KubeClient *client.Client KubeClient clientset.Interface
Recorder record.EventRecorder Recorder record.EventRecorder
} }
var _ podControlInterface = &realPodControl{} var _ podControlInterface = &realPodControl{}
func (r realPodControl) ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) { func (r realPodControl) ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) {
return r.KubeClient.Pods(namespace).List(opts) return r.KubeClient.Core().Pods(namespace).List(opts)
} }
func (r realPodControl) DeletePod(namespace string, name string) error { func (r realPodControl) DeletePod(namespace string, name string) error {
return r.KubeClient.Pods(namespace).Delete(name, nil) return r.KubeClient.Core().Pods(namespace).Delete(name, nil)
} }
type fakePodControl struct { type fakePodControl struct {