diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 767fe7fbf91..0d9c95601e2 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -44,6 +44,7 @@ import (
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/controller"
 	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	nodecontroller "k8s.io/kubernetes/pkg/controller/node"
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
 	cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
@@ -194,14 +195,18 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
 	eventBroadcaster.StartRecordingToSink(cl.Events(""))
 	scheduler.New(schedulerConfig).Run()
 
+	podInformer := informers.CreateSharedPodInformer(clientset, controller.NoResyncPeriodFunc())
+
 	// ensure the service endpoints are sync'd several times within the window that the integration tests wait
-	go endpointcontroller.NewEndpointController(clientset, controller.NoResyncPeriodFunc).
+	go endpointcontroller.NewEndpointController(podInformer, clientset).
 		Run(3, wait.NeverStop)
 
 	// TODO: Write an integration test for the replication controllers watch.
-	go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
+	go replicationcontroller.NewReplicationManager(podInformer, clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
 		Run(3, wait.NeverStop)
 
+	go podInformer.Run(wait.NeverStop)
+
 	nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
 		40*time.Second, 60*time.Second, 5*time.Second, nil, false)
 	nodeController.Run(5 * time.Second)
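Aside (illustrative, not part of the patch): the wiring above is the heart of this change — build one shared pod informer, register every interested controller against it, and only then start it. A minimal sketch of that pattern, reusing the constructors introduced in this diff (the clientset and resync settings are assumed):

    // One pod informer for the whole process (sketch; clientset is assumed).
    podInformer := informers.CreateSharedPodInformer(clientset, controller.NoResyncPeriodFunc())

    // Both controllers register handlers on the same informer instead of
    // running their own pod reflectors.
    go endpointcontroller.NewEndpointController(podInformer, clientset).
        Run(3, wait.NeverStop)
    go replicationcontroller.NewReplicationManager(podInformer, clientset, controller.NoResyncPeriodFunc,
        replicationcontroller.BurstReplicas, 4096).
        Run(3, wait.NeverStop)

    // Start the informer last: AddEventHandler returns an error once it is running.
    go podInformer.Run(wait.NeverStop)
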
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index c8641e2f374..eac87640ca6 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -30,6 +30,7 @@ import (
 	"net/http"
 	"net/http/pprof"
 	"os"
+	"reflect"
 	"strconv"
 	"time"
 
@@ -48,6 +49,8 @@ import (
 	"k8s.io/kubernetes/pkg/controller/daemon"
 	"k8s.io/kubernetes/pkg/controller/deployment"
 	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
+	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/controller/gc"
 	"k8s.io/kubernetes/pkg/controller/job"
 	namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
@@ -189,11 +192,16 @@ func Run(s *options.CMServer) error {
 }
 
 func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
-	go endpointcontroller.NewEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller")), ResyncPeriod(s)).
+	podInformer := informers.CreateSharedPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
+	informers := map[reflect.Type]framework.SharedInformer{}
+	informers[reflect.TypeOf(&api.Pod{})] = podInformer
+
+	go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
 		Run(s.ConcurrentEndpointSyncs, wait.NeverStop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	go replicationcontroller.NewReplicationManager(
+		podInformer,
 		clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")),
 		ResyncPeriod(s),
 		replicationcontroller.BurstReplicas,
@@ -410,6 +418,11 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	).Run()
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
+	// run the shared informers
+	for _, informer := range informers {
+		go informer.Run(wait.NeverStop)
+	}
+
 	select {}
 }
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index af9aa27711a..05c09f90c9d 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -136,7 +136,7 @@ func (s *CMServer) Run(_ []string) error {
 	endpoints := s.createEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller")))
 	go endpoints.Run(s.ConcurrentEndpointSyncs, wait.NeverStop)
 
-	go replicationcontroller.NewReplicationManager(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, s.LookupCacheSizeForRC).
+	go replicationcontroller.NewReplicationManagerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, s.LookupCacheSizeForRC).
 		Run(s.ConcurrentRCSyncs, wait.NeverStop)
 
 	if s.TerminatedPodGCThreshold > 0 {
@@ -346,7 +346,7 @@ func (s *CMServer) createEndpointController(client *clientset.Clientset) kmendpo
 		return kmendpoint.NewEndpointController(client)
 	}
 	glog.V(2).Infof("Creating podIP:containerPort endpoint controller")
-	stockEndpointController := endpointcontroller.NewEndpointController(client, s.resyncPeriod)
+	stockEndpointController := endpointcontroller.NewEndpointControllerFromClient(client, s.resyncPeriod)
 	return stockEndpointController
 }
diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go
index e19c0ce91ae..26fdad8669c 100644
--- a/pkg/controller/endpoint/endpoints_controller.go
+++ b/pkg/controller/endpoint/endpoints_controller.go
@@ -33,6 +33,7 @@ import (
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -58,7 +59,7 @@ var (
 )
 
 // NewEndpointController returns a new *EndpointController.
-func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
+func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController {
 	e := &EndpointController{
 		client: client,
 		queue:  workqueue.New(),
@@ -85,24 +86,23 @@ func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.
 		},
 	)
 
-	e.podStore.Store, e.podController = framework.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return e.client.Core().Pods(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return e.client.Core().Pods(api.NamespaceAll).Watch(options)
-			},
-		},
-		&api.Pod{},
-		resyncPeriod(),
-		framework.ResourceEventHandlerFuncs{
-			AddFunc:    e.addPod,
-			UpdateFunc: e.updatePod,
-			DeleteFunc: e.deletePod,
-		},
-	)
-	e.podStoreSynced = e.podController.HasSynced
+	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+		AddFunc:    e.addPod,
+		UpdateFunc: e.updatePod,
+		DeleteFunc: e.deletePod,
+	})
+	e.podStore.Store = podInformer.GetStore()
+	e.podController = podInformer.GetController()
+	e.podStoreSynced = podInformer.HasSynced
+
+	return e
+}
+
+// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
+func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
+	podInformer := informers.CreateSharedPodInformer(client, resyncPeriod())
+	e := NewEndpointController(podInformer, client)
+	e.internalPodInformer = podInformer
 
 	return e
 }
@@ -114,6 +114,13 @@ type EndpointController struct {
 	serviceStore cache.StoreToServiceLister
 	podStore     cache.StoreToPodLister
 
+	// internalPodInformer is used to hold a personal informer. If we're using
+	// a normal shared informer, then the informer will be started for us. If
+	// we have a personal informer, we must start it ourselves. If you start
+	// the controller using NewEndpointController (passing a SharedInformer),
+	// this will be nil.
+	internalPodInformer framework.SharedInformer
+
 	// Services that need to be updated. A channel is inappropriate here,
 	// because it allows services with lots of pods to be serviced much
 	// more often than services with few pods; it also would cause a
@@ -124,7 +131,7 @@ type EndpointController struct {
 	// Since we join two objects, we'll watch both of them with
 	// controllers.
 	serviceController *framework.Controller
-	podController     *framework.Controller
+	podController     framework.ControllerInterface
 
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced func() bool
@@ -144,6 +151,11 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
 		time.Sleep(5 * time.Minute) // give time for our cache to fill
 		e.checkLeftoverEndpoints()
 	}()
+
+	if e.internalPodInformer != nil {
+		go e.internalPodInformer.Run(stopCh)
+	}
+
 	<-stopCh
 	e.queue.ShutDown()
 }
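Aside (illustrative, not part of the patch): any controller adopting this split follows the same shape as the endpoints controller above — take a SharedInformer in the primary constructor, and offer a *FromClient convenience constructor that builds and owns a private informer which Run then starts. A hedged sketch with hypothetical names (myController, newMyController):

    // Hypothetical controller showing the two-constructor pattern used above.
    type myController struct {
        podStore            cache.StoreToPodLister
        podStoreSynced      func() bool
        internalPodInformer framework.SharedInformer // nil when the informer is shared
    }

    func newMyController(podInformer framework.SharedInformer) *myController {
        c := &myController{}
        podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { /* enqueue */ },
            UpdateFunc: func(old, cur interface{}) { /* enqueue */ },
            DeleteFunc: func(obj interface{}) { /* enqueue */ },
        })
        c.podStore.Store = podInformer.GetStore()
        c.podStoreSynced = podInformer.HasSynced
        return c
    }

    func newMyControllerFromClient(client clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *myController {
        podInformer := informers.CreateSharedPodInformer(client, resyncPeriod())
        c := newMyController(podInformer)
        c.internalPodInformer = podInformer // Run must start this one itself
        return c
    }
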
diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go
index 6f5d43fd815..06998739b38 100644
--- a/pkg/controller/endpoint/endpoints_controller_test.go
+++ b/pkg/controller/endpoint/endpoints_controller_test.go
@@ -108,7 +108,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	endpoints.serviceStore.Store.Add(&api.Service{
 		ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@@ -142,7 +142,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	endpoints.checkLeftoverEndpoints()
 
@@ -172,7 +172,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 1, 1, 0)
@@ -216,7 +216,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 1, 1, 0)
 	endpoints.serviceStore.Store.Add(&api.Service{
@@ -256,7 +256,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 1, 1, 0)
 	endpoints.serviceStore.Store.Add(&api.Service{
@@ -295,7 +295,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 0, 1, 1)
 	endpoints.serviceStore.Store.Add(&api.Service{
@@ -334,7 +334,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 1, 1, 1)
 	endpoints.serviceStore.Store.Add(&api.Service{
@@ -377,7 +377,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 1, 1, 0)
 	endpoints.serviceStore.Store.Add(&api.Service{
@@ -419,7 +419,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
 	endpoints.serviceStore.Store.Add(&api.Service{
@@ -440,7 +440,7 @@ func TestSyncEndpointsItems(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 3, 2, 0)
 	addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
@@ -484,7 +484,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 3, 2, 0)
 	serviceLabels := map[string]string{"foo": "bar"}
@@ -546,7 +546,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
+	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
 	endpoints.podStoreSynced = alwaysReady
 	addPods(endpoints.podStore.Store, ns, 1, 1, 0)
 	serviceLabels := map[string]string{"baz": "blah"}
diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go
index ed819525266..cfffabe8cd4 100644
--- a/pkg/controller/framework/controller.go
+++ b/pkg/controller/framework/controller.go
@@ -68,6 +68,12 @@ type Controller struct {
 	reflectorMutex sync.RWMutex
 }
 
+// TODO make the "Controller" private, and convert all references to use ControllerInterface instead
+type ControllerInterface interface {
+	Run(stopCh <-chan struct{})
+	HasSynced() bool
+}
+
 // New makes a new Controller from the given Config.
 func New(c *Config) *Controller {
 	ctlr := &Controller{
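Aside (illustrative, not part of the patch): ControllerInterface is what lets the podController fields above hold either a dedicated *Controller or the synthetic controller a SharedInformer hands back from GetController(). If desired, that contract could be pinned down with compile-time assertions inside the framework package, e.g.:

    var _ ControllerInterface = &Controller{}      // the existing dedicated controller
    var _ ControllerInterface = &dummyController{} // the stand-in returned by SharedInformer.GetController() (added below)
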
diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go
new file mode 100644
index 00000000000..1b4cce4adb8
--- /dev/null
+++ b/pkg/controller/framework/informers/factory.go
@@ -0,0 +1,44 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package informers
+
+import (
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/watch"
+)
+
+// CreateSharedPodInformer returns a SharedInformer that lists and watches all pods
+func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedInformer {
+	sharedInformer := framework.NewSharedInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return client.Core().Pods(api.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return client.Core().Pods(api.NamespaceAll).Watch(options)
+			},
+		},
+		&api.Pod{}, resyncPeriod)
+
+	return sharedInformer
+}
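Aside (illustrative, not part of the patch): a typical consumer builds one pod informer from a clientset, attaches handlers, and runs it until a stop channel closes. A hedged sketch (the client construction, the 30-second resync, and the glog logging are assumptions):

    podInformer := informers.CreateSharedPodInformer(client, 30*time.Second)

    podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*api.Pod)
            glog.V(4).Infof("observed pod %s/%s", pod.Namespace, pod.Name)
        },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)
    go podInformer.Run(stopCh)
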
diff --git a/pkg/controller/framework/shared_informer.go b/pkg/controller/framework/shared_informer.go
new file mode 100644
index 00000000000..cfa5d47ce23
--- /dev/null
+++ b/pkg/controller/framework/shared_informer.go
@@ -0,0 +1,297 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"k8s.io/kubernetes/pkg/client/cache"
+	"k8s.io/kubernetes/pkg/runtime"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+)
+
+// If you use this, there is one behavior change compared to a standard Informer.
+// When you receive a notification, the cache will be AT LEAST as fresh as the
+// notification, but it MAY be more fresh. You should NOT depend on the contents
+// of the cache exactly matching the notification you've received in handler
+// functions. If there was a create, followed by a delete, the cache may NOT
+// have your item. This has advantages over the broadcaster since it allows us
+// to share a common cache across many controllers. Extending the broadcaster
+// would have required us to keep duplicate caches for each watch.
+type SharedInformer interface {
+	// Events to a single handler are delivered sequentially, but there is no coordination between different handlers.
+	// You may NOT add a handler *after* the SharedInformer is running. That will result in an error being returned.
+	// TODO we should try to remove this restriction eventually.
+	AddEventHandler(handler ResourceEventHandler) error
+	GetStore() cache.Store
+	// GetController gives back a synthetic interface that "votes" to start the informer
+	GetController() ControllerInterface
+	Run(stopCh <-chan struct{})
+	HasSynced() bool
+}
+
+type SharedIndexInformer interface {
+	SharedInformer
+
+	AddIndexer(indexer cache.Indexer) error
+	GetIndexer() cache.Indexer
+}
+
+// NewSharedInformer creates a new instance for the listwatcher.
+// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
+// be shared amongst all consumers.
+func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
+	sharedInformer := &sharedInformer{
+		processor: &sharedProcessor{},
+		store:     cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc),
+	}
+
+	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedInformer.store)
+
+	cfg := &Config{
+		Queue:            fifo,
+		ListerWatcher:    lw,
+		ObjectType:       objType,
+		FullResyncPeriod: resyncPeriod,
+		RetryOnError:     false,
+
+		Process: sharedInformer.HandleDeltas,
+	}
+	sharedInformer.controller = New(cfg)
+
+	return sharedInformer
+}
+
+type sharedInformer struct {
+	store      cache.Store
+	controller *Controller
+
+	processor *sharedProcessor
+
+	started     bool
+	startedLock sync.Mutex
+}
+
+// dummyController hides the fact that a SharedInformer is different from a dedicated one
+// where a caller can `Run`. The run method is disconnected in this case, because higher
+// level logic will decide when to start the SharedInformer and related controller.
+// Because returning information back is always asynchronous, the legacy callers shouldn't
+// notice any change in behavior.
+type dummyController struct {
+	informer *sharedInformer
+}
+
+func (v *dummyController) Run(stopCh <-chan struct{}) {
+}
+
+func (v *dummyController) HasSynced() bool {
+	return v.informer.HasSynced()
+}
+
+type updateNotification struct {
+	oldObj interface{}
+	newObj interface{}
+}
+
+type addNotification struct {
+	newObj interface{}
+}
+
+type deleteNotification struct {
+	oldObj interface{}
+}
+
+func (s *sharedInformer) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	func() {
+		s.startedLock.Lock()
+		defer s.startedLock.Unlock()
+		s.started = true
+	}()
+
+	s.processor.run(stopCh)
+	s.controller.Run(stopCh)
+}
+
+func (s *sharedInformer) isStarted() bool {
+	s.startedLock.Lock()
+	defer s.startedLock.Unlock()
+	return s.started
+}
+
+func (s *sharedInformer) HasSynced() bool {
+	return s.controller.HasSynced()
+}
+
+func (s *sharedInformer) GetStore() cache.Store {
+	return s.store
+}
+
+func (s *sharedInformer) GetController() ControllerInterface {
+	return &dummyController{informer: s}
+}
+
+func (s *sharedInformer) AddEventHandler(handler ResourceEventHandler) error {
+	s.startedLock.Lock()
+	defer s.startedLock.Unlock()
+
+	if s.started {
+		return fmt.Errorf("informer has already started")
+	}
+
+	listener := newProcessListener(handler)
+	s.processor.listeners = append(s.processor.listeners, listener)
+	return nil
+}
+
+func (s *sharedInformer) HandleDeltas(obj interface{}) error {
+	// from oldest to newest
+	for _, d := range obj.(cache.Deltas) {
+		switch d.Type {
+		case cache.Sync, cache.Added, cache.Updated:
+			if old, exists, err := s.store.Get(d.Object); err == nil && exists {
+				if err := s.store.Update(d.Object); err != nil {
+					return err
+				}
+				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object})
+			} else {
+				if err := s.store.Add(d.Object); err != nil {
+					return err
+				}
+				s.processor.distribute(addNotification{newObj: d.Object})
+			}
+		case cache.Deleted:
+			if err := s.store.Delete(d.Object); err != nil {
+				return err
+			}
+			s.processor.distribute(deleteNotification{oldObj: d.Object})
+		}
+	}
+	return nil
+}
+
+type sharedProcessor struct {
+	listeners []*processorListener
+}
+
+func (p *sharedProcessor) distribute(obj interface{}) {
+	for _, listener := range p.listeners {
+		listener.add(obj)
+	}
+}
+
+func (p *sharedProcessor) run(stopCh <-chan struct{}) {
+	for _, listener := range p.listeners {
+		go listener.run(stopCh)
+		go listener.pop(stopCh)
+	}
+}
+
+type processorListener struct {
+	// lock/cond protects access to 'pendingNotifications'.
+	lock sync.RWMutex
+	cond sync.Cond
+
+	// pendingNotifications is an unbounded slice that holds all notifications not yet distributed.
+	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
+	// added until we OOM.
+	// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
+	// we should try to do something better.
+	pendingNotifications []interface{}
+
+	nextCh chan interface{}
+
+	handler ResourceEventHandler
+}
+
+func newProcessListener(handler ResourceEventHandler) *processorListener {
+	ret := &processorListener{
+		pendingNotifications: []interface{}{},
+		nextCh:               make(chan interface{}),
+		handler:              handler,
+	}
+
+	ret.cond.L = &ret.lock
+	return ret
+}
+
+func (p *processorListener) add(notification interface{}) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.pendingNotifications = append(p.pendingNotifications, notification)
+	p.cond.Broadcast()
+}
+
+func (p *processorListener) pop(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	for {
+		for len(p.pendingNotifications) == 0 {
+			// check if we're shutdown
+			select {
+			case <-stopCh:
+				return
+			default:
+			}
+
+			p.cond.Wait()
+		}
+		notification := p.pendingNotifications[0]
+		p.pendingNotifications = p.pendingNotifications[1:]
+
+		select {
+		case <-stopCh:
+			return
+		case p.nextCh <- notification:
+		}
+	}
+}
+
+func (p *processorListener) run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	for {
+		var next interface{}
+		select {
+		case <-stopCh:
+			func() {
+				p.lock.Lock()
+				defer p.lock.Unlock()
+				p.cond.Broadcast()
+			}()
+			return
+		case next = <-p.nextCh:
+		}
+
+		switch notification := next.(type) {
+		case updateNotification:
+			p.handler.OnUpdate(notification.oldObj, notification.newObj)
+		case addNotification:
+			p.handler.OnAdd(notification.newObj)
+		case deleteNotification:
+			p.handler.OnDelete(notification.oldObj)
+		default:
+			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+		}
+	}
+}
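Aside (illustrative, not part of the patch): HandleDeltas updates the shared store before distributing notifications, so by the time a handler runs the cache can already reflect later events — this is the "at least as fresh" caveat in the interface comment above. A hedged sketch of a handler that re-checks the store by key instead of trusting the notification object (podInformer as built by the factory is assumed):

    podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
        UpdateFunc: func(old, cur interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(cur)
            if err != nil {
                return
            }
            // The object may already have been removed from the shared store by a
            // newer delta; if so, skip and let the delete notification handle it.
            if _, exists, err := podInformer.GetStore().GetByKey(key); err != nil || !exists {
                return
            }
            // ... enqueue key on the controller's work queue ...
        },
    })
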
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index 92c4145a087..42b72a9ef22 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -66,6 +67,13 @@ type ReplicationManager struct {
 	kubeClient clientset.Interface
 	podControl controller.PodControlInterface
 
+	// internalPodInformer is used to hold a personal informer. If we're using
+	// a normal shared informer, then the informer will be started for us. If
+	// we have a personal informer, we must start it ourselves. If you start
+	// the controller using NewReplicationManager (passing a SharedInformer),
+	// this will be nil.
+	internalPodInformer framework.SharedInformer
+
 	// An rc is temporarily suspended after creating/deleting these many replicas.
 	// It resumes normal action after observing the watch events for them.
 	burstReplicas int
@@ -82,7 +90,7 @@ type ReplicationManager struct {
 	// A store of pods, populated by the podController
 	podStore cache.StoreToPodLister
 	// Watches changes to all pods
-	podController *framework.Controller
+	podController framework.ControllerInterface
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced func() bool
@@ -93,8 +101,7 @@ type ReplicationManager struct {
 	queue *workqueue.Type
 }
 
-// NewReplicationManager creates a new ReplicationManager.
-func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
+func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
@@ -167,31 +174,31 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
 		},
 	)
 
-	rm.podStore.Store, rm.podController = framework.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return rm.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return rm.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
-			},
-		},
-		&api.Pod{},
-		resyncPeriod(),
-		framework.ResourceEventHandlerFuncs{
-			AddFunc: rm.addPod,
-			// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
-			// the most frequent pod update is status, and the associated rc will only list from local storage, so
-			// it should be ok.
-			UpdateFunc: rm.updatePod,
-			DeleteFunc: rm.deletePod,
-		},
-	)
+	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+		AddFunc: rm.addPod,
+		// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
+		// the most frequent pod update is status, and the associated rc will only list from local storage, so
+		// it should be ok.
+		UpdateFunc: rm.updatePod,
+		DeleteFunc: rm.deletePod,
+	})
+	rm.podStore.Store = podInformer.GetStore()
+	rm.podController = podInformer.GetController()
 
 	rm.syncHandler = rm.syncReplicationController
 	rm.podStoreSynced = rm.podController.HasSynced
 	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return rm
+
+}
+
+// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
+func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
+	podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
+	rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
+	rm.internalPodInformer = podInformer
+
+	return rm
 }
 
 // SetEventRecorder replaces the event recorder used by the replication manager
@@ -211,6 +218,11 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
 	for i := 0; i < workers; i++ {
 		go wait.Until(rm.worker, time.Second, stopCh)
 	}
+
+	if rm.internalPodInformer != nil {
+		go rm.internalPodInformer.Run(stopCh)
+	}
+
 	<-stopCh
 	glog.Infof("Shutting down RC Manager")
 	rm.queue.ShutDown()
@@ -478,7 +490,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
 			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
 				// Decrement the expected number of deletes because the informer won't observe this deletion
 				podKey := controller.PodKey(filteredPods[ix])
-				glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rc.Namespace, rc.Name)
+				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
 				rm.expectations.DeletionObserved(rcKey, podKey)
 				utilruntime.HandleError(err)
 			}
diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go
index 79040294407..8ac2299fded 100644
--- a/pkg/controller/replication/replication_controller_test.go
+++ b/pkg/controller/replication/replication_controller_test.go
@@ -138,7 +138,7 @@ type serverResponse struct {
 
 func TestSyncReplicationControllerDoesNothing(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 
 	// 2 running pods, a controller with 2 replicas, sync is a no-op
@@ -154,7 +154,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
 func TestSyncReplicationControllerDeletes(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 	manager.podControl = &fakePodControl
 
@@ -170,7 +170,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
 func TestDeleteFinalStateUnknown(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 	manager.podControl = &fakePodControl
@@ -202,7 +202,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
 
 func TestSyncReplicationControllerCreates(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 
 	// A controller with 2 replicas and no pods in the store, 2 creates expected
@@ -225,7 +225,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 
 	// Steady state for the replication controller, no Status.Replicas updates expected
@@ -267,7 +267,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
 	// TODO: Uncomment when fix #19254
 	// defer testServer.Close()
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 
 	// Insufficient number of pods in the system, and Status.Replicas is wrong;
@@ -313,7 +313,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
 	// defer testServer.Close()
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 	manager.podControl = &fakePodControl
 
@@ -359,7 +359,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
 }
 
 func TestPodControllerLookup(t *testing.T) {
-	manager := NewReplicationManager(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 	testCases := []struct {
 		inRCs     []*api.ReplicationController
@@ -422,7 +422,7 @@ func TestWatchControllers(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	c := &fake.Clientset{}
 	c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 
 	var testControllerSpec api.ReplicationController
@@ -465,7 +465,7 @@ func TestWatchPods(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	c := &fake.Clientset{}
 	c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 
 	// Put one rc and one pod into the controller's stores
@@ -492,6 +492,7 @@ func TestWatchPods(t *testing.T) {
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 	go manager.podController.Run(stopCh)
+	go manager.internalPodInformer.Run(stopCh)
 	go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
 
 	pods := newPodList(nil, 1, api.PodRunning, testControllerSpec, "pod")
@@ -507,7 +508,7 @@ func TestWatchPods(t *testing.T) {
 }
 
 func TestUpdatePods(t *testing.T) {
-	manager := NewReplicationManager(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 
 	received := make(chan string)
@@ -567,7 +568,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
 	// defer testServer.Close()
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 
 	rc := newReplicationController(1)
@@ -649,7 +650,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
 
 func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, burstReplicas, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, burstReplicas, 0)
 	manager.podStoreSynced = alwaysReady
 	manager.podControl = &fakePodControl
 
@@ -799,7 +800,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool {
 
 func TestRCSyncExpectations(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
 	manager.podStoreSynced = alwaysReady
 	manager.podControl = &fakePodControl
 
@@ -823,7 +824,7 @@ func TestRCSyncExpectations(t *testing.T) {
 
 func TestDeleteControllerAndExpectations(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
 	manager.podStoreSynced = alwaysReady
 
 	rc := newReplicationController(1)
@@ -866,7 +867,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
 
 func TestRCManagerNotReady(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = func() bool { return false }
 
@@ -904,7 +905,7 @@ func TestOverlappingRCs(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 
 	for i := 0; i < 5; i++ {
-		manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
+		manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
 		manager.podStoreSynced = alwaysReady
 
 		// Create 10 rcs, shuffled them randomly and insert them into the rc manager's store
@@ -933,7 +934,7 @@ func TestOverlappingRCs(t *testing.T) {
 
 func TestDeletionTimestamp(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
+	manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
 	manager.podStoreSynced = alwaysReady
 
 	controllerSpec := newReplicationController(1)
@@ -1020,7 +1021,7 @@ func TestDeletionTimestamp(t *testing.T) {
 
 func BenchmarkGetPodControllerMultiNS(b *testing.B) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 
 	const nsNum = 1000
 
@@ -1066,7 +1067,7 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) {
 
 func BenchmarkGetPodControllerSingleNS(b *testing.B) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 
 	const rcNum = 1000
 	const replicaNum = 3
diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go
index 67d988d8d5a..07612882089 100644
--- a/test/integration/framework/master_utils.go
+++ b/test/integration/framework/master_utils.go
@@ -108,7 +108,7 @@ func NewMasterComponents(c *Config) *MasterComponents {
 	restClient := client.NewOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: c.QPS, Burst: c.Burst})
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: c.QPS, Burst: c.Burst})
 	rcStopCh := make(chan struct{})
-	controllerManager := replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, c.Burst, 4096)
+	controllerManager := replicationcontroller.NewReplicationManagerFromClient(clientset, controller.NoResyncPeriodFunc, c.Burst, 4096)
 
 	// TODO: Support events once we can cleanly shutdown an event recorder.
 	controllerManager.SetEventRecorder(&record.FakeRecorder{})