Merge pull request #24470 from deads2k/shared-cache-02

Automatic merge from submit-queue

update controllers watching all pods to share an informer

This plumbs the shared pod informer through the various controllers to avoid duplicated watches.
This commit is contained in:
k8s-merge-robot 2016-04-23 17:18:47 -07:00
commit ea15d792a1
9 changed files with 132 additions and 100 deletions

View File

@ -264,7 +264,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
KubeClient: resourceQuotaControllerClient, KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration), ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry, Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient), ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(s), ReplenishmentResyncPeriod: ResyncPeriod(s),
GroupKindsToReplenish: groupKindsToReplenish, GroupKindsToReplenish: groupKindsToReplenish,
} }
@ -324,14 +324,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "daemonsets") { if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller") glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet). go daemon.NewDaemonSetsController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop) Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
if containsResource(resources, "jobs") { if containsResource(resources, "jobs") {
glog.Infof("Starting job controller") glog.Infof("Starting job controller")
go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), ResyncPeriod(s)). go job.NewJobController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))).
Run(s.ConcurrentJobSyncs, wait.NeverStop) Run(s.ConcurrentJobSyncs, wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }

View File

@ -193,7 +193,7 @@ func (s *CMServer) Run(_ []string) error {
Registry: resourceQuotaRegistry, Registry: resourceQuotaRegistry,
GroupKindsToReplenish: groupKindsToReplenish, GroupKindsToReplenish: groupKindsToReplenish,
ReplenishmentResyncPeriod: s.resyncPeriod, ReplenishmentResyncPeriod: s.resyncPeriod,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient), ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(resourceQuotaControllerClient),
} }
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop) go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
@ -248,13 +248,13 @@ func (s *CMServer) Run(_ []string) error {
if containsResource(resources, "daemonsets") { if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller") glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet). go daemon.NewDaemonSetsControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet).
Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop) Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
} }
if containsResource(resources, "jobs") { if containsResource(resources, "jobs") {
glog.Infof("Starting job controller") glog.Infof("Starting job controller")
go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod). go job.NewJobControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod).
Run(s.ConcurrentJobSyncs, wait.NeverStop) Run(s.ConcurrentJobSyncs, wait.NeverStop)
} }

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -69,6 +70,13 @@ type DaemonSetsController struct {
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
podControl controller.PodControlInterface podControl controller.PodControlInterface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewDaemonSetsController(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedInformer
// An dsc is temporarily suspended after creating/deleting these many replicas. // An dsc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them. // It resumes normal action after observing the watch events for them.
burstReplicas int burstReplicas int
@ -86,7 +94,7 @@ type DaemonSetsController struct {
// Watches changes to all daemon sets. // Watches changes to all daemon sets.
dsController *framework.Controller dsController *framework.Controller
// Watches changes to all pods // Watches changes to all pods
podController *framework.Controller podController framework.ControllerInterface
// Watches changes to all nodes. // Watches changes to all nodes.
nodeController *framework.Controller nodeController *framework.Controller
// podStoreSynced returns true if the pod store has been synced at least once. // podStoreSynced returns true if the pod store has been synced at least once.
@ -99,7 +107,7 @@ type DaemonSetsController struct {
queue *workqueue.Type queue *workqueue.Type
} }
func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset. // TODO: remove the wrapper when every clients have moved to use the clientset.
@ -163,25 +171,18 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
}, },
}, },
) )
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed. // more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
dsc.podStore.Store, dsc.podController = framework.NewInformer( podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
&cache.ListWatch{ AddFunc: dsc.addPod,
ListFunc: func(options api.ListOptions) (runtime.Object, error) { UpdateFunc: dsc.updatePod,
return dsc.kubeClient.Core().Pods(api.NamespaceAll).List(options) DeleteFunc: dsc.deletePod,
}, })
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { dsc.podStore.Store = podInformer.GetStore()
return dsc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options) dsc.podController = podInformer.GetController()
}, dsc.podStoreSynced = podInformer.HasSynced
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
},
)
// Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change, // Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change,
dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer( dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
@ -200,11 +201,18 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
}, },
) )
dsc.syncHandler = dsc.syncDaemonSet dsc.syncHandler = dsc.syncDaemonSet
dsc.podStoreSynced = dsc.podController.HasSynced
dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize) dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return dsc return dsc
} }
func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
dsc.internalPodInformer = podInformer
return dsc
}
// Run begins watching and syncing daemon sets. // Run begins watching and syncing daemon sets.
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
@ -215,6 +223,11 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(dsc.worker, time.Second, stopCh) go wait.Until(dsc.worker, time.Second, stopCh)
} }
if dsc.internalPodInformer != nil {
go dsc.internalPodInformer.Run(stopCh)
}
<-stopCh <-stopCh
glog.Infof("Shutting down Daemon Set Controller") glog.Infof("Shutting down Daemon Set Controller")
dsc.queue.ShutDown() dsc.queue.ShutDown()

View File

@ -133,7 +133,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
func newTestController() (*DaemonSetsController, *controller.FakePodControl) { func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc, 0) manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
podControl := &controller.FakePodControl{} podControl := &controller.FakePodControl{}
manager.podControl = podControl manager.podControl = podControl

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -44,6 +45,13 @@ type JobController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
podControl controller.PodControlInterface podControl controller.PodControlInterface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewJobController(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedInformer
// To allow injection of updateJobStatus for testing. // To allow injection of updateJobStatus for testing.
updateHandler func(job *extensions.Job) error updateHandler func(job *extensions.Job) error
syncHandler func(jobKey string) error syncHandler func(jobKey string) error
@ -61,8 +69,6 @@ type JobController struct {
// A store of pods, populated by the podController // A store of pods, populated by the podController
podStore cache.StoreToPodLister podStore cache.StoreToPodLister
// Watches changes to all pods
podController *framework.Controller
// Jobs that need to be updated // Jobs that need to be updated
queue *workqueue.Type queue *workqueue.Type
@ -70,7 +76,7 @@ type JobController struct {
recorder record.EventRecorder recorder record.EventRecorder
} }
func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { func NewJobController(podInformer framework.SharedInformer, kubeClient clientset.Interface) *JobController {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset. // TODO: remove the wrapper when every clients have moved to use the clientset.
@ -110,27 +116,24 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.Re
}, },
) )
jm.podStore.Store, jm.podController = framework.NewInformer( podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
&cache.ListWatch{ AddFunc: jm.addPod,
ListFunc: func(options api.ListOptions) (runtime.Object, error) { UpdateFunc: jm.updatePod,
return jm.kubeClient.Core().Pods(api.NamespaceAll).List(options) DeleteFunc: jm.deletePod,
}, })
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { jm.podStore.Store = podInformer.GetStore()
return jm.kubeClient.Core().Pods(api.NamespaceAll).Watch(options) jm.podStoreSynced = podInformer.HasSynced
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: jm.addPod,
UpdateFunc: jm.updatePod,
DeleteFunc: jm.deletePod,
},
)
jm.updateHandler = jm.updateJobStatus jm.updateHandler = jm.updateJobStatus
jm.syncHandler = jm.syncJob jm.syncHandler = jm.syncJob
jm.podStoreSynced = jm.podController.HasSynced return jm
}
func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
jm := NewJobController(podInformer, kubeClient)
jm.internalPodInformer = podInformer
return jm return jm
} }
@ -138,10 +141,14 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.Re
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
go jm.jobController.Run(stopCh) go jm.jobController.Run(stopCh)
go jm.podController.Run(stopCh)
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh) go wait.Until(jm.worker, time.Second, stopCh)
} }
if jm.internalPodInformer != nil {
go jm.internalPodInformer.Run(stopCh)
}
<-stopCh <-stopCh
glog.Infof("Shutting down Job Manager") glog.Infof("Shutting down Job Manager")
jm.queue.ShutDown() jm.queue.ShutDown()

View File

@ -18,8 +18,8 @@ package job
import ( import (
"fmt" "fmt"
"reflect"
"testing" "testing"
"time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/rand" "k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@ -208,7 +207,7 @@ func TestControllerSyncJob(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
// job manager setup // job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError} fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -303,7 +302,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
// job manager setup // job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -373,7 +372,7 @@ func getCondition(job *extensions.Job, condition extensions.JobConditionType) bo
func TestSyncPastDeadlineJobFinished(t *testing.T) { func TestSyncPastDeadlineJobFinished(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -407,7 +406,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
func TestSyncJobComplete(t *testing.T) { func TestSyncJobComplete(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -432,7 +431,7 @@ func TestSyncJobComplete(t *testing.T) {
func TestSyncJobDeleted(t *testing.T) { func TestSyncJobDeleted(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -452,7 +451,7 @@ func TestSyncJobDeleted(t *testing.T) {
func TestSyncJobUpdateRequeue(t *testing.T) { func TestSyncJobUpdateRequeue(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -473,7 +472,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
func TestJobPodLookup(t *testing.T) { func TestJobPodLookup(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
testCases := []struct { testCases := []struct {
job *extensions.Job job *extensions.Job
@ -563,7 +562,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
// and checking expectations. // and checking expectations.
func TestSyncJobExpectations(t *testing.T) { func TestSyncJobExpectations(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{} fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
@ -599,8 +598,8 @@ type FakeWatcher struct {
func TestWatchJobs(t *testing.T) { func TestWatchJobs(t *testing.T) {
clientset := fake.NewSimpleClientset() clientset := fake.NewSimpleClientset()
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
var testJob extensions.Job var testJob extensions.Job
@ -614,9 +613,12 @@ func TestWatchJobs(t *testing.T) {
if !exists || err != nil { if !exists || err != nil {
t.Errorf("Expected to find job under key %v", key) t.Errorf("Expected to find job under key %v", key)
} }
job := *obj.(*extensions.Job) job, ok := obj.(*extensions.Job)
if !api.Semantic.DeepDerivative(job, testJob) { if !ok {
t.Errorf("Expected %#v, but got %#v", testJob, job) t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
}
if !api.Semantic.DeepDerivative(*job, testJob) {
t.Errorf("Expected %#v, but got %#v", testJob, *job)
} }
close(received) close(received)
return nil return nil
@ -625,8 +627,7 @@ func TestWatchJobs(t *testing.T) {
// and make sure it hits the sync method. // and make sure it hits the sync method.
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
go manager.jobController.Run(stopCh) go manager.Run(1, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
// We're sending new job to see if it reaches syncHandler. // We're sending new job to see if it reaches syncHandler.
testJob.Name = "foo" testJob.Name = "foo"
@ -661,27 +662,35 @@ func TestIsJobFinished(t *testing.T) {
} }
func TestWatchPods(t *testing.T) { func TestWatchPods(t *testing.T) {
clientset := fake.NewSimpleClientset() testJob := newJob(2, 2)
clientset := fake.NewSimpleClientset(testJob)
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobController(clientset, controller.NoResyncPeriodFunc) manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
// Put one job and one pod into the store // Put one job and one pod into the store
testJob := newJob(2, 2)
manager.jobStore.Store.Add(testJob) manager.jobStore.Store.Add(testJob)
received := make(chan struct{}) received := make(chan struct{})
// The pod update sent through the fakeWatcher should figure out the managing job and // The pod update sent through the fakeWatcher should figure out the managing job and
// send it into the syncHandler. // send it into the syncHandler.
manager.syncHandler = func(key string) error { manager.syncHandler = func(key string) error {
obj, exists, err := manager.jobStore.Store.GetByKey(key) obj, exists, err := manager.jobStore.Store.GetByKey(key)
if !exists || err != nil { if !exists || err != nil {
t.Errorf("Expected to find job under key %v", key) t.Errorf("Expected to find job under key %v", key)
close(received)
return nil
}
job, ok := obj.(*extensions.Job)
if !ok {
t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
close(received)
return nil
} }
job := obj.(*extensions.Job)
if !api.Semantic.DeepDerivative(job, testJob) { if !api.Semantic.DeepDerivative(job, testJob) {
t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job) t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
close(received)
return nil
} }
close(received) close(received)
return nil return nil
@ -690,8 +699,7 @@ func TestWatchPods(t *testing.T) {
// and make sure it hits the sync method for the right job. // and make sure it hits the sync method for the right job.
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
go manager.podController.Run(stopCh) go manager.Run(1, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(1, api.PodRunning, testJob) pods := newPodList(1, api.PodRunning, testJob)
testPod := pods[0] testPod := pods[0]

View File

@ -28,6 +28,7 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/quota/evaluator/core" "k8s.io/kubernetes/pkg/quota/evaluator/core"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -86,43 +87,46 @@ func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func
// ReplenishmentControllerFactory knows how to build replenishment controllers // ReplenishmentControllerFactory knows how to build replenishment controllers
type ReplenishmentControllerFactory interface { type ReplenishmentControllerFactory interface {
// NewController returns a controller configured with the specified options // NewController returns a controller configured with the specified options.
NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error) // This method is NOT thread-safe.
NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error)
} }
// replenishmentControllerFactory implements ReplenishmentControllerFactory // replenishmentControllerFactory implements ReplenishmentControllerFactory
type replenishmentControllerFactory struct { type replenishmentControllerFactory struct {
kubeClient clientset.Interface kubeClient clientset.Interface
podInformer framework.SharedInformer
} }
// NewReplenishmentControllerFactory returns a factory that knows how to build controllers // NewReplenishmentControllerFactory returns a factory that knows how to build controllers
// to replenish resources when updated or deleted // to replenish resources when updated or deleted
func NewReplenishmentControllerFactory(kubeClient clientset.Interface) ReplenishmentControllerFactory { func NewReplenishmentControllerFactory(podInformer framework.SharedInformer, kubeClient clientset.Interface) ReplenishmentControllerFactory {
return &replenishmentControllerFactory{ return &replenishmentControllerFactory{
kubeClient: kubeClient, kubeClient: kubeClient,
podInformer: podInformer,
} }
} }
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error) { func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory {
var result *framework.Controller return NewReplenishmentControllerFactory(nil, kubeClient)
}
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) {
var result framework.ControllerInterface
switch options.GroupKind { switch options.GroupKind {
case api.Kind("Pod"): case api.Kind("Pod"):
_, result = framework.NewInformer( if r.podInformer != nil {
&cache.ListWatch{ r.podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
options.ResyncPeriod(),
framework.ResourceEventHandlerFuncs{
UpdateFunc: PodReplenishmentUpdateFunc(options), UpdateFunc: PodReplenishmentUpdateFunc(options),
DeleteFunc: ObjectReplenishmentDeleteFunc(options), DeleteFunc: ObjectReplenishmentDeleteFunc(options),
}, })
) result = r.podInformer.GetController()
break
}
r.podInformer = informers.CreateSharedPodInformer(r.kubeClient, options.ResyncPeriod())
result = r.podInformer
case api.Kind("Service"): case api.Kind("Service"):
_, result = framework.NewInformer( _, result = framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{

View File

@ -69,7 +69,7 @@ type ResourceQuotaController struct {
// knows how to calculate usage // knows how to calculate usage
registry quota.Registry registry quota.Registry
// controllers monitoring to notify for replenishment // controllers monitoring to notify for replenishment
replenishmentControllers []*framework.Controller replenishmentControllers []framework.ControllerInterface
} }
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController { func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
@ -79,7 +79,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
queue: workqueue.New(), queue: workqueue.New(),
resyncPeriod: options.ResyncPeriod, resyncPeriod: options.ResyncPeriod,
registry: options.Registry, registry: options.Registry,
replenishmentControllers: []*framework.Controller{}, replenishmentControllers: []framework.ControllerInterface{},
} }
// set the synchronization handler // set the synchronization handler

View File

@ -113,7 +113,7 @@ func TestSyncResourceQuota(t *testing.T) {
api.Kind("ReplicationController"), api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"), api.Kind("PersistentVolumeClaim"),
}, },
ControllerFactory: NewReplenishmentControllerFactory(kubeClient), ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
} }
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
@ -199,7 +199,7 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
api.Kind("ReplicationController"), api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"), api.Kind("PersistentVolumeClaim"),
}, },
ControllerFactory: NewReplenishmentControllerFactory(kubeClient), ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
} }
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
@ -276,7 +276,7 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
api.Kind("ReplicationController"), api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"), api.Kind("PersistentVolumeClaim"),
}, },
ControllerFactory: NewReplenishmentControllerFactory(kubeClient), ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
} }
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)