From c10f43a2e54fc555c93526b0e1f20c60f5d9aa9d Mon Sep 17 00:00:00 2001 From: mqliang Date: Mon, 2 May 2016 12:35:18 +0800 Subject: [PATCH] implement AddIndexers for SharedIndexInformer --- cmd/integration/integration.go | 3 +- .../app/controllermanager.go | 3 +- pkg/client/cache/index.go | 2 + pkg/client/cache/listers.go | 17 +++- pkg/client/cache/store.go | 5 ++ pkg/client/cache/thread_safe_store.go | 5 ++ pkg/controller/daemon/controller.go | 2 +- .../endpoint/endpoints_controller.go | 2 +- pkg/controller/framework/informers/factory.go | 20 +++-- pkg/controller/framework/shared_informer.go | 83 +++++++++++-------- pkg/controller/gc/gc_controller.go | 6 +- pkg/controller/job/controller.go | 2 +- pkg/controller/node/nodecontroller.go | 6 +- pkg/controller/petset/pet_set.go | 4 +- pkg/controller/petset/pet_set_test.go | 2 +- .../replication/replication_controller.go | 2 +- .../replication_controller_test.go | 4 +- 17 files changed, 107 insertions(+), 61 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 9afafd1e69e..5c503a7d672 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -38,7 +38,6 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" @@ -195,7 +194,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string eventBroadcaster.StartRecordingToSink(cl.Events("")) scheduler.New(schedulerConfig).Run() - podInformer := informers.CreateSharedIndexPodInformer(clientset, controller.NoResyncPeriodFunc(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + podInformer := informers.CreateSharedPodIndexInformer(clientset, controller.NoResyncPeriodFunc()) // ensure the service endpoints are sync'd several times within the window that the integration tests wait go endpointcontroller.NewEndpointController(podInformer, clientset). diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 5e526bf363e..84e44fd12f8 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -37,7 +37,6 @@ import ( "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/record" @@ -195,7 +194,7 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { - podInformer := informers.CreateSharedIndexPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) informers := map[reflect.Type]framework.SharedIndexInformer{} informers[reflect.TypeOf(&api.Pod{})] = podInformer diff --git a/pkg/client/cache/index.go b/pkg/client/cache/index.go index 0dcb43e128c..19c5e0650d7 100644 --- a/pkg/client/cache/index.go +++ b/pkg/client/cache/index.go @@ -32,6 +32,8 @@ type Indexer interface { ListIndexFuncValues(indexName string) []string // ByIndex lists object that match on the named indexing function with the exact key ByIndex(indexName, indexKey string) ([]interface{}, error) + // GetIndexer return the indexers + GetIndexers() Indexers } // IndexFunc knows how to provide an indexed value for an object. diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index d017dde3a6f..1054c515786 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -91,8 +91,16 @@ func (s storePodsNamespacer) List(selector labels.Selector) (pods api.PodList, e key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} items, err := s.indexer.Index(NamespaceIndex, key) if err != nil { - return api.PodList{}, err + glog.Warningf("can not retrieve list of objects using index : %v", err) + for _, m := range s.indexer.List() { + pod := m.(*api.Pod) + if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) { + list.Items = append(list.Items, *pod) + } + } + return list, err } + for _, m := range items { pod := m.(*api.Pod) if selector.Matches(labels.Set(pod.Labels)) { @@ -200,6 +208,13 @@ func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (c key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} items, err := s.indexer.Index(NamespaceIndex, key) if err != nil { + glog.Warningf("can not retrieve list of objects using index : %v", err) + for _, m := range s.indexer.List() { + rc := *(m.(*api.ReplicationController)) + if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) { + controllers = append(controllers, rc) + } + } return } for _, m := range items { diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 17a360f8e42..b7f6f54cebd 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -160,6 +160,11 @@ func (c *cache) ListKeys() []string { return c.cacheStorage.ListKeys() } +// GetIndexers returns the indexers of cache +func (c *cache) GetIndexers() Indexers { + return c.cacheStorage.GetIndexers() +} + // Index returns a list of items that match on the index function // Index is thread-safe so long as you treat all items as immutable func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) { diff --git a/pkg/client/cache/thread_safe_store.go b/pkg/client/cache/thread_safe_store.go index ae4b802c815..20d985cc4e5 100644 --- a/pkg/client/cache/thread_safe_store.go +++ b/pkg/client/cache/thread_safe_store.go @@ -45,6 +45,7 @@ type ThreadSafeStore interface { Index(indexName string, obj interface{}) ([]interface{}, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) + GetIndexers() Indexers } // threadSafeMap implements ThreadSafeStore @@ -187,6 +188,10 @@ func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string { return names } +func (c *threadSafeMap) GetIndexers() Indexers { + return c.indexers +} + // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj // updateIndices must be called from a function that already has a lock on the cache func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error { diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index 3d49441b7b0..0066fd086ac 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -210,7 +210,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie } func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { - podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize) dsc.internalPodInformer = podInformer diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 4ff5f1e210b..5bb9119f983 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -104,7 +104,7 @@ func NewEndpointController(podInformer framework.SharedIndexInformer, client *cl // NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer. func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { - podInformer := informers.CreateSharedIndexPodInformer(client, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + podInformer := informers.CreateSharedPodIndexInformer(client, resyncPeriod()) e := NewEndpointController(podInformer, client) e.internalPodInformer = podInformer diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go index 3f63d3bf6b3..3c3472cc52a 100644 --- a/pkg/controller/framework/informers/factory.go +++ b/pkg/controller/framework/informers/factory.go @@ -27,9 +27,9 @@ import ( "k8s.io/kubernetes/pkg/watch" ) -// CreateSharedPodInformer returns a SharedInformer that lists and watches all pods -func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedInformer { - sharedInformer := framework.NewSharedInformer( +// CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods +func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { + sharedInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return client.Core().Pods(api.NamespaceAll).List(options) @@ -38,13 +38,16 @@ func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Durat return client.Core().Pods(api.NamespaceAll).Watch(options) }, }, - &api.Pod{}, resyncPeriod) + &api.Pod{}, + resyncPeriod, + cache.Indexers{}, + ) return sharedInformer } -// CreateSharedIndexPodInformer returns a SharedIndexInformer that lists and watches all pods -func CreateSharedIndexPodInformer(client clientset.Interface, resyncPeriod time.Duration, indexers cache.Indexers) framework.SharedIndexInformer { +// CreateSharedPodIndexInformer returns a SharedIndexInformer that lists and watches all pods +func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -54,7 +57,10 @@ func CreateSharedIndexPodInformer(client clientset.Interface, resyncPeriod time. return client.Core().Pods(api.NamespaceAll).Watch(options) }, }, - &api.Pod{}, resyncPeriod, indexers) + &api.Pod{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) return sharedIndexInformer } diff --git a/pkg/controller/framework/shared_informer.go b/pkg/controller/framework/shared_informer.go index 51a76e05672..9d9d99bcf4d 100644 --- a/pkg/controller/framework/shared_informer.go +++ b/pkg/controller/framework/shared_informer.go @@ -48,6 +48,7 @@ type SharedInformer interface { type SharedIndexInformer interface { SharedInformer + // AddIndexers add indexers to the informer before it starts. AddIndexers(indexers cache.Indexers) error GetIndexer() cache.Indexer } @@ -57,48 +58,26 @@ type SharedIndexInformer interface { // be shared amongst all consumers. func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { sharedInformer := &sharedIndexInformer{ - processor: &sharedProcessor{}, - indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}), + processor: &sharedProcessor{}, + indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}), + listerWatcher: lw, + objectType: objType, + fullResyncPeriod: resyncPeriod, } - - fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedInformer.indexer) - - cfg := &Config{ - Queue: fifo, - ListerWatcher: lw, - ObjectType: objType, - FullResyncPeriod: resyncPeriod, - RetryOnError: false, - - Process: sharedInformer.HandleDeltas, - } - sharedInformer.controller = New(cfg) - return sharedInformer } -/// NewSharedIndexInformer creates a new instance for the listwatcher. +// NewSharedIndexInformer creates a new instance for the listwatcher. // TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can // be shared amongst all consumers. func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers cache.Indexers) SharedIndexInformer { sharedIndexInformer := &sharedIndexInformer{ - processor: &sharedProcessor{}, - indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), + processor: &sharedProcessor{}, + indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), + listerWatcher: lw, + objectType: objType, + fullResyncPeriod: resyncPeriod, } - - fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedIndexInformer.indexer) - - cfg := &Config{ - Queue: fifo, - ListerWatcher: lw, - ObjectType: objType, - FullResyncPeriod: resyncPeriod, - RetryOnError: false, - - Process: sharedIndexInformer.HandleDeltas, - } - sharedIndexInformer.controller = New(cfg) - return sharedIndexInformer } @@ -108,6 +87,11 @@ type sharedIndexInformer struct { processor *sharedProcessor + // This block is tracked to handle late initialization of the controller + listerWatcher cache.ListerWatcher + objectType runtime.Object + fullResyncPeriod time.Duration + started bool startedLock sync.Mutex } @@ -144,6 +128,19 @@ type deleteNotification struct { func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.indexer) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: s.listerWatcher, + ObjectType: s.objectType, + FullResyncPeriod: s.fullResyncPeriod, + RetryOnError: false, + + Process: s.HandleDeltas, + } + s.controller = New(cfg) + func() { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -172,9 +169,25 @@ func (s *sharedIndexInformer) GetIndexer() cache.Indexer { return s.indexer } -// TODO(mqliang): implement this func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error { - panic("has not implemeted yet") + s.startedLock.Lock() + defer s.startedLock.Unlock() + + if s.started { + return fmt.Errorf("informer has already started") + } + + oldIndexers := s.indexer.GetIndexers() + + for name, indexFunc := range oldIndexers { + if _, exist := indexers[name]; exist { + return fmt.Errorf("there is an index named %s already exist", name) + } + indexers[name] = indexFunc + } + + s.indexer = cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) + return nil } func (s *sharedIndexInformer) GetController() ControllerInterface { diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index 9893d6dbfc6..485c326c177 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -77,8 +77,10 @@ func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFun &api.Pod{}, resyncPeriod(), framework.ResourceEventHandlerFuncs{}, - // We don't need to build a index for podStore here - cache.Indexers{}, + // We don't need to build a index for podStore here actually, but build one for consistency. + // It will ensure that if people start making use of the podStore in more specific ways, + // they'll get the benefits they expect. It will also reserve the name for future refactorings. + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) return gcc } diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 108da025efc..964a4ec14d6 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -135,7 +135,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie } func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { - podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) jm := NewJobController(podInformer, kubeClient) jm.internalPodInformer = podInformer diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index fa23541234d..af87c36883e 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -193,8 +193,10 @@ func NewNodeController( AddFunc: nc.maybeDeleteTerminatingPod, UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) }, }, - // We don't need to build a index for podStore here - cache.Indexers{}, + // We don't need to build a index for podStore here actually, but build one for consistency. + // It will ensure that if people start making use of the podStore in more specific ways, + // they'll get the benefits they expect. It will also reserve the name for future refactorings. + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) nc.nodeStore.Store, nc.nodeController = framework.NewInformer( &cache.ListWatch{ diff --git a/pkg/controller/petset/pet_set.go b/pkg/controller/petset/pet_set.go index 809327397c3..a34eb6d5c2e 100644 --- a/pkg/controller/petset/pet_set.go +++ b/pkg/controller/petset/pet_set.go @@ -81,7 +81,7 @@ type PetSetController struct { } // NewPetSetController creates a new petset controller. -func NewPetSetController(podInformer framework.SharedInformer, kubeClient *client.Client, resyncPeriod time.Duration) *PetSetController { +func NewPetSetController(podInformer framework.SharedIndexInformer, kubeClient *client.Client, resyncPeriod time.Duration) *PetSetController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -105,7 +105,7 @@ func NewPetSetController(podInformer framework.SharedInformer, kubeClient *clien // lookup petset accounting for deletion tombstones DeleteFunc: psc.deletePod, }) - psc.podStore.Store = podInformer.GetStore() + psc.podStore.Indexer = podInformer.GetIndexer() psc.podController = podInformer.GetController() psc.psStore.Store, psc.psController = framework.NewInformer( diff --git a/pkg/controller/petset/pet_set_test.go b/pkg/controller/petset/pet_set_test.go index e396f264cc4..8498fce8701 100644 --- a/pkg/controller/petset/pet_set_test.go +++ b/pkg/controller/petset/pet_set_test.go @@ -35,7 +35,7 @@ func newFakePetSetController() (*PetSetController, *fakePetClient) { blockingPetStore: newUnHealthyPetTracker(fpc), podStoreSynced: func() bool { return true }, psStore: cache.StoreToPetSetLister{Store: cache.NewStore(controller.KeyFunc)}, - podStore: cache.StoreToPodLister{Store: cache.NewStore(controller.KeyFunc)}, + podStore: cache.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})}, newSyncer: func(blockingPet *pcb) *petSyncer { return &petSyncer{fpc, blockingPet} }, diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index cd8ac0270dd..8fc0f16fda1 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -200,7 +200,7 @@ func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { - podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) rm.internalPodInformer = podInformer diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index abfdf1591c4..376e5a4b9c9 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -231,7 +231,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { activePods := 5 rc := newReplicationController(activePods) manager.rcStore.Indexer.Add(rc) - rc.Status = api.ReplicationControllerStatus{Replicas: activePods} + rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods)} newPodList(manager.podStore.Indexer, activePods, api.PodRunning, rc, "pod") fakePodControl := controller.FakePodControl{} @@ -1015,7 +1015,6 @@ func TestDeletionTimestamp(t *testing.T) { } } -/* func BenchmarkGetPodControllerMultiNS(b *testing.B) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) @@ -1101,4 +1100,3 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) { } } } -*/