diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index e70ef841237..fb2c39db8a6 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -169,7 +169,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe } frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) - frsc.replicaSetStore.Store, frsc.replicaSetController = cache.NewInformer( + frsc.replicaSetStore.Indexer, frsc.replicaSetController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { versionedOptions := fedutil.VersionizeV1ListOptions(options) @@ -185,6 +185,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe fedutil.NewTriggerOnMetaAndSpecChanges( func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) }, ), + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, @@ -291,7 +292,7 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, durati glog.Errorf("Couldn't get key for object %v: %v", obj, err) return } - _, exists, err := frsc.replicaSetStore.Store.GetByKey(key) + _, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key) if err != nil { glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) return @@ -419,7 +420,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio startTime := time.Now() defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime)) - obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key) + obj, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key) if err != nil { return statusError, err } @@ -536,7 +537,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() { if !frsc.isSynced() { frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) } - rss := frsc.replicaSetStore.Store.List() + rss := frsc.replicaSetStore.Indexer.List() for _, rs := range rss { key, _ := controller.KeyFunc(rs) frsc.deliverReplicaSetByKey(key, 0, false) diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index ef3d171e213..e6e49b9b6b5 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -21,7 +21,6 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/apps" @@ -137,221 +136,6 @@ func (s storeToNodeConditionLister) List() (nodes []*api.Node, err error) { return } -// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments. -type StoreToDeploymentLister struct { - Indexer -} - -// Exists checks if the given deployment exists in the store. -func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) { - _, exists, err := s.Indexer.Get(deployment) - if err != nil { - return false, err - } - return exists, nil -} - -// StoreToDeploymentLister lists all deployments in the store. 
-// TODO: converge on the interface in pkg/client -func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) { - for _, c := range s.Indexer.List() { - deployments = append(deployments, *(c.(*extensions.Deployment))) - } - return deployments, nil -} - -// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found. -func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []extensions.Deployment, err error) { - if len(rs.Labels) == 0 { - err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name) - return - } - - // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label - dList, err := s.Deployments(rs.Namespace).List(labels.Everything()) - if err != nil { - return - } - for _, d := range dList { - selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("invalid label selector: %v", err) - } - // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) { - continue - } - deployments = append(deployments, d) - } - if len(deployments) == 0 { - err = fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels) - } - return -} - -type storeToDeploymentNamespacer struct { - indexer Indexer - namespace string -} - -// storeToDeploymentNamespacer lists deployments under its namespace in the store. -func (s storeToDeploymentNamespacer) List(selector labels.Selector) (deployments []extensions.Deployment, err error) { - if s.namespace == api.NamespaceAll { - for _, m := range s.indexer.List() { - d := *(m.(*extensions.Deployment)) - if selector.Matches(labels.Set(d.Labels)) { - deployments = append(deployments, d) - } - } - return - } - - key := &extensions.Deployment{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} - items, err := s.indexer.Index(NamespaceIndex, key) - if err != nil { - // Ignore error; do slow search without index. - glog.Warningf("can not retrieve list of objects using index : %v", err) - for _, m := range s.indexer.List() { - d := *(m.(*extensions.Deployment)) - if s.namespace == d.Namespace && selector.Matches(labels.Set(d.Labels)) { - deployments = append(deployments, d) - } - } - return deployments, nil - } - for _, m := range items { - d := *(m.(*extensions.Deployment)) - if selector.Matches(labels.Set(d.Labels)) { - deployments = append(deployments, d) - } - } - return -} - -func (s *StoreToDeploymentLister) Deployments(namespace string) storeToDeploymentNamespacer { - return storeToDeploymentNamespacer{s.Indexer, namespace} -} - -// GetDeploymentsForPods returns a list of deployments managing a pod. Returns an error only if no matching deployments are found. 
-func (s *StoreToDeploymentLister) GetDeploymentsForPod(pod *api.Pod) (deployments []extensions.Deployment, err error) { - if len(pod.Labels) == 0 { - err = fmt.Errorf("no deployments found for Pod %v because it has no labels", pod.Name) - return - } - - if len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { - return - } - - dList, err := s.Deployments(pod.Namespace).List(labels.Everything()) - if err != nil { - return - } - for _, d := range dList { - selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("invalid label selector: %v", err) - } - // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - continue - } - deployments = append(deployments, d) - } - if len(deployments) == 0 { - err = fmt.Errorf("could not find deployments set for Pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - return -} - -// StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets. -type StoreToReplicaSetLister struct { - Store -} - -// Exists checks if the given ReplicaSet exists in the store. -func (s *StoreToReplicaSetLister) Exists(rs *extensions.ReplicaSet) (bool, error) { - _, exists, err := s.Store.Get(rs) - if err != nil { - return false, err - } - return exists, nil -} - -// List lists all ReplicaSets in the store. -// TODO: converge on the interface in pkg/client -func (s *StoreToReplicaSetLister) List() (rss []extensions.ReplicaSet, err error) { - for _, rs := range s.Store.List() { - rss = append(rss, *(rs.(*extensions.ReplicaSet))) - } - return rss, nil -} - -type storeReplicaSetsNamespacer struct { - store Store - namespace string -} - -func (s storeReplicaSetsNamespacer) List(selector labels.Selector) (rss []extensions.ReplicaSet, err error) { - for _, c := range s.store.List() { - rs := *(c.(*extensions.ReplicaSet)) - if s.namespace == api.NamespaceAll || s.namespace == rs.Namespace { - if selector.Matches(labels.Set(rs.Labels)) { - rss = append(rss, rs) - } - } - } - return -} - -func (s storeReplicaSetsNamespacer) Get(name string) (*extensions.ReplicaSet, error) { - obj, exists, err := s.store.GetByKey(s.namespace + "/" + name) - if err != nil { - return nil, err - } - if !exists { - return nil, errors.NewNotFound(extensions.Resource("replicaset"), name) - } - return obj.(*extensions.ReplicaSet), nil -} - -func (s *StoreToReplicaSetLister) ReplicaSets(namespace string) storeReplicaSetsNamespacer { - return storeReplicaSetsNamespacer{s.Store, namespace} -} - -// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found. -func (s *StoreToReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) { - var selector labels.Selector - var rs extensions.ReplicaSet - - if len(pod.Labels) == 0 { - err = fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name) - return - } - - for _, m := range s.Store.List() { - rs = *m.(*extensions.ReplicaSet) - if rs.Namespace != pod.Namespace { - continue - } - selector, err = unversioned.LabelSelectorAsSelector(rs.Spec.Selector) - if err != nil { - err = fmt.Errorf("invalid selector: %v", err) - return - } - - // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything. 
- if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - continue - } - rss = append(rss, rs) - } - if len(rss) == 0 { - err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - return -} - // StoreToDaemonSetLister gives a store List and Exists methods. The store must contain only DaemonSets. type StoreToDaemonSetLister struct { Store diff --git a/pkg/client/cache/listers_extensions.go b/pkg/client/cache/listers_extensions.go new file mode 100644 index 00000000000..539657765d1 --- /dev/null +++ b/pkg/client/cache/listers_extensions.go @@ -0,0 +1,210 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/labels" +) + +// TODO: generate these classes and methods for all resources of interest using +// a script. Can use "go generate" once 1.4 is supported by all users. + +// Lister makes an Index have the List method. The Stores must contain only the expected type +// Example: +// s := cache.NewStore() +// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"} +// r := cache.NewReflector(lw, &extensions.Deployment{}, s).Run() +// l := StoreToDeploymentLister{s} +// l.List() + +// StoreToDeploymentLister helps list deployments +type StoreToDeploymentLister struct { + Indexer Indexer +} + +func (s *StoreToDeploymentLister) List(selector labels.Selector) (ret []*extensions.Deployment, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*extensions.Deployment)) + }) + return ret, err +} + +func (s *StoreToDeploymentLister) Deployments(namespace string) storeDeploymentsNamespacer { + return storeDeploymentsNamespacer{Indexer: s.Indexer, namespace: namespace} +} + +type storeDeploymentsNamespacer struct { + Indexer Indexer + namespace string +} + +func (s storeDeploymentsNamespacer) List(selector labels.Selector) (ret []*extensions.Deployment, err error) { + err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*extensions.Deployment)) + }) + return ret, err +} + +func (s storeDeploymentsNamespacer) Get(name string) (*extensions.Deployment, error) { + obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(extensions.Resource("deployment"), name) + } + return obj.(*extensions.Deployment), nil +} + +// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found. 
+func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []*extensions.Deployment, err error) { + if len(rs.Labels) == 0 { + err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name) + return + } + + // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label + dList, err := s.Deployments(rs.Namespace).List(labels.Everything()) + if err != nil { + return + } + for _, d := range dList { + selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) { + continue + } + deployments = append(deployments, d) + } + if len(deployments) == 0 { + err = fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels) + } + return +} + +// GetDeploymentsForDeployments returns a list of deployments managing a pod. Returns an error only if no matching deployments are found. +// TODO eliminate shallow copies +func (s *StoreToDeploymentLister) GetDeploymentsForPod(pod *api.Pod) (deployments []*extensions.Deployment, err error) { + if len(pod.Labels) == 0 { + err = fmt.Errorf("no deployments found for Pod %v because it has no labels", pod.Name) + return + } + + if len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { + return + } + + dList, err := s.Deployments(pod.Namespace).List(labels.Everything()) + if err != nil { + return + } + for _, d := range dList { + selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. 
+ if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + deployments = append(deployments, d) + } + if len(deployments) == 0 { + err = fmt.Errorf("could not find deployments set for Pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + +// StoreToReplicaSetLister helps list replicasets +type StoreToReplicaSetLister struct { + Indexer Indexer +} + +func (s *StoreToReplicaSetLister) List(selector labels.Selector) (ret []*extensions.ReplicaSet, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*extensions.ReplicaSet)) + }) + return ret, err +} + +func (s *StoreToReplicaSetLister) ReplicaSets(namespace string) storeReplicaSetsNamespacer { + return storeReplicaSetsNamespacer{Indexer: s.Indexer, namespace: namespace} +} + +type storeReplicaSetsNamespacer struct { + Indexer Indexer + namespace string +} + +func (s storeReplicaSetsNamespacer) List(selector labels.Selector) (ret []*extensions.ReplicaSet, err error) { + err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*extensions.ReplicaSet)) + }) + return ret, err +} + +func (s storeReplicaSetsNamespacer) Get(name string) (*extensions.ReplicaSet, error) { + obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(extensions.Resource("replicaset"), name) + } + return obj.(*extensions.ReplicaSet), nil +} + +// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found. +func (s *StoreToReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []*extensions.ReplicaSet, err error) { + if len(pod.Labels) == 0 { + err = fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name) + return + } + + list, err := s.ReplicaSets(pod.Namespace).List(labels.Everything()) + if err != nil { + return + } + for _, rs := range list { + if rs.Namespace != pod.Namespace { + continue + } + selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid selector: %v", err) + } + + // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything. 
+ if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + rss = append(rss, rs) + } + if len(rss) == 0 { + err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index 36223eabd96..597362eb800 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -280,11 +280,11 @@ func TestStoreToReplicationControllerLister(t *testing.T) { } func TestStoreToReplicaSetLister(t *testing.T) { - store := NewStore(MetaNamespaceKeyFunc) + store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc}) lister := StoreToReplicaSetLister{store} testCases := []struct { inRSs []*extensions.ReplicaSet - list func() ([]extensions.ReplicaSet, error) + list func() ([]*extensions.ReplicaSet, error) outRSNames sets.String expectErr bool }{ @@ -293,8 +293,8 @@ func TestStoreToReplicaSetLister(t *testing.T) { inRSs: []*extensions.ReplicaSet{ {ObjectMeta: api.ObjectMeta{Name: "basic"}}, }, - list: func() ([]extensions.ReplicaSet, error) { - return lister.List() + list: func() ([]*extensions.ReplicaSet, error) { + return lister.List(labels.Everything()) }, outRSNames: sets.NewString("basic"), }, @@ -308,7 +308,7 @@ func TestStoreToReplicaSetLister(t *testing.T) { }, }, }, - list: func() ([]extensions.ReplicaSet, error) { + list: func() ([]*extensions.ReplicaSet, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"}, } @@ -324,7 +324,7 @@ func TestStoreToReplicaSetLister(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, }, }, - list: func() ([]extensions.ReplicaSet, error) { + list: func() ([]*extensions.ReplicaSet, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -353,7 +353,7 @@ func TestStoreToReplicaSetLister(t *testing.T) { }, }, }, - list: func() ([]extensions.ReplicaSet, error) { + list: func() ([]*extensions.ReplicaSet, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 716d0522136..2a7e7a65c13 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -67,27 +67,27 @@ type DeploymentController struct { syncHandler func(dKey string) error // A store of deployments, populated by the dController - dStore cache.StoreToDeploymentLister + dLister cache.StoreToDeploymentLister // Watches changes to all deployments dController *cache.Controller // A store of ReplicaSets, populated by the rsController - rsStore cache.StoreToReplicaSetLister + rsLister cache.StoreToReplicaSetLister // Watches changes to all ReplicaSets rsController *cache.Controller // A store of pods, populated by the podController - podStore cache.StoreToPodLister + podLister cache.StoreToPodLister // Watches changes to all pods podController *cache.Controller - // dStoreSynced returns true if the Deployment store has been synced at least once. + // dListerSynced returns true if the Deployment store has been synced at least once. // Added as a member to the struct to allow injection for testing. - dStoreSynced func() bool - // rsStoreSynced returns true if the ReplicaSet store has been synced at least once. + dListerSynced func() bool + // rsListerSynced returns true if the ReplicaSet store has been synced at least once. 
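For reference, a minimal sketch (not part of the diff) of how the relocated listers in listers_extensions.go are consumed once they are backed by a namespace-indexed Indexer; the object names ("default", "nginx") are placeholders.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/labels"
)

func main() {
	// Build an Indexer keyed and indexed the same way the informers in this
	// change are, so the lister can serve namespace-scoped queries.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	lister := cache.StoreToDeploymentLister{Indexer: indexer}

	// List now returns []*extensions.Deployment: callers iterate over pointers
	// into the shared cache instead of copied values.
	deployments, err := lister.List(labels.Everything())
	if err != nil {
		fmt.Println("listing deployments:", err)
		return
	}
	for _, d := range deployments {
		fmt.Println(d.Namespace, d.Name)
	}

	// The namespacer gains a Get backed by GetByKey("namespace/name"); on an
	// empty indexer, as here, it returns a NotFound error.
	if _, err := lister.Deployments("default").Get("nginx"); err != nil {
		fmt.Println("expected on an empty indexer:", err)
	}
}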
// Added as a member to the struct to allow injection for testing. - rsStoreSynced func() bool - // podStoreSynced returns true if the pod store has been synced at least once. + rsListerSynced func() bool + // podListerSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. - podStoreSynced func() bool + podListerSynced func() bool // Deployments that need to be synced queue workqueue.RateLimitingInterface @@ -109,7 +109,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"), } - dc.dStore.Indexer, dc.dController = cache.NewIndexerInformer( + dc.dLister.Indexer, dc.dController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.client.Extensions().Deployments(api.NamespaceAll).List(options) @@ -129,7 +129,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) - dc.rsStore.Store, dc.rsController = cache.NewInformer( + dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options) @@ -145,9 +145,10 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) - dc.podStore.Indexer, dc.podController = cache.NewIndexerInformer( + dc.podLister.Indexer, dc.podController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.client.Core().Pods(api.NamespaceAll).List(options) @@ -167,9 +168,9 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller ) dc.syncHandler = dc.syncDeployment - dc.dStoreSynced = dc.dController.HasSynced - dc.rsStoreSynced = dc.rsController.HasSynced - dc.podStoreSynced = dc.podController.HasSynced + dc.dListerSynced = dc.dController.HasSynced + dc.rsListerSynced = dc.rsController.HasSynced + dc.podListerSynced = dc.podController.HasSynced return dc } @@ -183,7 +184,7 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { // Wait for the rc and dc stores to sync before starting any work in this controller. ready := make(chan struct{}) - go dc.waitForSyncedStores(ready, stopCh) + go dc.waitForSyncedListers(ready, stopCh) select { case <-ready: case <-stopCh: @@ -199,10 +200,10 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { dc.queue.ShutDown() } -func (dc *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) { +func (dc *DeploymentController) waitForSyncedListers(ready chan<- struct{}, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - for !dc.dStoreSynced() || !dc.rsStoreSynced() || !dc.podStoreSynced() { + for !dc.dListerSynced() || !dc.rsListerSynced() || !dc.podListerSynced() { select { case <-time.After(StoreSyncedPollPeriod): case <-stopCh: @@ -255,7 +256,7 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) { // getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet. 
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment { - deployments, err := dc.dStore.GetDeploymentsForReplicaSet(rs) + deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs) if err != nil || len(deployments) == 0 { glog.V(4).Infof("Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing.", err, rs.Name) return nil @@ -268,7 +269,7 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic sort.Sort(util.BySelectorLastUpdateTime(deployments)) glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name) } - return &deployments[0] + return deployments[0] } // updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet @@ -328,7 +329,7 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) { // getDeploymentForPod returns the deployment that manages the given Pod. // If there are multiple deployments for a given Pod, only return the oldest one. func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment { - deployments, err := dc.dStore.GetDeploymentsForPod(pod) + deployments, err := dc.dLister.GetDeploymentsForPod(pod) if err != nil || len(deployments) == 0 { glog.V(4).Infof("Error: %v. No deployment found for Pod %v, deployment controller will avoid syncing.", err, pod.Name) return nil @@ -338,7 +339,7 @@ func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.De sort.Sort(util.BySelectorLastUpdateTime(deployments)) glog.Errorf("user error! more than one deployment is selecting pod %s/%s with labels: %#v, returning %s/%s", pod.Namespace, pod.Name, pod.Labels, deployments[0].Namespace, deployments[0].Name) } - return &deployments[0] + return deployments[0] } // When a pod is created, ensure its controller syncs @@ -478,7 +479,7 @@ func (dc *DeploymentController) syncDeployment(key string) error { glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := dc.dStore.Indexer.GetByKey(key) + obj, exists, err := dc.dLister.Indexer.GetByKey(key) if err != nil { glog.Errorf("Unable to retrieve deployment %v from store: %v", key, err) return err @@ -548,24 +549,27 @@ func (dc *DeploymentController) handleOverlap(d *extensions.Deployment) error { if err != nil { return fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) } - deployments, err := dc.dStore.Deployments(d.Namespace).List(labels.Everything()) + deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything()) if err != nil { return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err) } overlapping := false - for i := range deployments { - other := &deployments[i] + for _, other := range deployments { if !selector.Empty() && selector.Matches(labels.Set(other.Spec.Template.Labels)) && d.UID != other.UID { + deploymentCopy, err := util.DeploymentDeepCopy(other) + if err != nil { + return err + } overlapping = true // We don't care if the overlapping annotation update failed or not (we don't make decision on it) d, _ = dc.markDeploymentOverlap(d, other.Name) - other, _ = dc.markDeploymentOverlap(other, d.Name) + deploymentCopy, _ = dc.markDeploymentOverlap(deploymentCopy, d.Name) // Skip syncing this one if older overlapping one is found // TODO: figure out a better 
way to determine which deployment to skip, // either with controller reference, or with validation. // Using oldest active replica set to determine which deployment to skip wouldn't make much difference, // since new replica set hasn't been created after selector update - if util.SelectorUpdatedBefore(other, d) { + if util.SelectorUpdatedBefore(deploymentCopy, d) { return fmt.Errorf("found deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, other.Namespace, other.Name) } } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 84c15795f95..c5392149523 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -157,9 +157,9 @@ type fixture struct { client *fake.Clientset // Objects to put in the store. - dStore []*exp.Deployment - rsStore []*exp.ReplicaSet - podStore []*api.Pod + dLister []*exp.Deployment + rsLister []*exp.ReplicaSet + podLister []*api.Pod // Actions expected to happen on the client. Objects from here are also // preloaded into NewSimpleFake. @@ -200,16 +200,16 @@ func (f *fixture) run(deploymentName string) { f.client = fake.NewSimpleClientset(f.objects...) c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc) c.eventRecorder = &record.FakeRecorder{} - c.rsStoreSynced = alwaysReady - c.podStoreSynced = alwaysReady - for _, d := range f.dStore { - c.dStore.Indexer.Add(d) + c.rsListerSynced = alwaysReady + c.podListerSynced = alwaysReady + for _, d := range f.dLister { + c.dLister.Indexer.Add(d) } - for _, rs := range f.rsStore { - c.rsStore.Store.Add(rs) + for _, rs := range f.rsLister { + c.rsLister.Indexer.Add(rs) } - for _, pod := range f.podStore { - c.podStore.Indexer.Add(pod) + for _, pod := range f.podLister { + c.podLister.Indexer.Add(pod) } err := c.syncDeployment(deploymentName) @@ -240,7 +240,7 @@ func TestSyncDeploymentCreatesReplicaSet(t *testing.T) { f := newFixture(t) d := newDeployment(1, nil) - f.dStore = append(f.dStore, d) + f.dLister = append(f.dLister, d) f.objects = append(f.objects, d) rs := newReplicaSet(d, "deploymentrs-4186632231", 1) @@ -258,7 +258,7 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) { d := newDeployment(1, nil) now := unversioned.Now() d.DeletionTimestamp = &now - f.dStore = append(f.dStore, d) + f.dLister = append(f.dLister, d) f.run(getKey(d, t)) } @@ -269,13 +269,13 @@ func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) controller.eventRecorder = &record.FakeRecorder{} - controller.rsStoreSynced = alwaysReady - controller.podStoreSynced = alwaysReady + controller.rsListerSynced = alwaysReady + controller.podListerSynced = alwaysReady d := newDeployment(1, nil) empty := unversioned.LabelSelector{} d.Spec.Selector = &empty - controller.dStore.Indexer.Add(d) + controller.dLister.Indexer.Add(d) // We expect the deployment controller to not take action here since it's configuration // is invalid, even though no replicasets exist that match it's selector. 
controller.syncDeployment(fmt.Sprintf("%s/%s", d.ObjectMeta.Namespace, d.ObjectMeta.Name)) diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go index a0b238d0327..e3ec1d7c6b9 100644 --- a/pkg/controller/deployment/recreate.go +++ b/pkg/controller/deployment/recreate.go @@ -108,7 +108,7 @@ func (dc *DeploymentController) waitForInactiveReplicaSets(oldRSs []*extensions. statusReplicas := rs.Status.Replicas if err := wait.ExponentialBackoff(unversionedclient.DefaultRetry, func() (bool, error) { - replicaSet, err := dc.rsStore.ReplicaSets(rs.Namespace).Get(rs.Name) + replicaSet, err := dc.rsLister.ReplicaSets(rs.Namespace).Get(rs.Name) if err != nil { return false, err } diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 1479d02f701..f739462f52b 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -103,15 +103,15 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext } // rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced. -func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) { +func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]*extensions.ReplicaSet, *api.PodList, error) { rsList, err := deploymentutil.ListReplicaSets(deployment, - func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { - return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) + func(namespace string, options api.ListOptions) ([]*extensions.ReplicaSet, error) { + return dc.rsLister.ReplicaSets(namespace).List(options.LabelSelector) }) if err != nil { return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err) } - syncedRSList := []extensions.ReplicaSet{} + syncedRSList := []*extensions.ReplicaSet{} for _, rs := range rsList { // Add pod-template-hash information if it's not in the RS. // Otherwise, new RS produced by Deployment will overlap with pre-existing ones @@ -120,7 +120,7 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension if err != nil { return nil, nil, err } - syncedRSList = append(syncedRSList, *syncedRS) + syncedRSList = append(syncedRSList, syncedRS) } syncedPodList, err := dc.listPods(deployment) if err != nil { @@ -133,8 +133,13 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension // 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created // 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas // 3. 
Add hash label to the rs's label and selector -func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) { - updatedRS = &rs +func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) { + objCopy, err := api.Scheme.Copy(rs) + if err != nil { + return nil, err + } + updatedRS = objCopy.(*extensions.ReplicaSet) + // If the rs already has the new hash label in its selector, it's done syncing if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { return @@ -158,7 +163,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) if !rsUpdated { // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. // Return here and retry in the next sync loop. - return &rs, nil + return rs, nil } // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). if updatedRS.Generation > updatedRS.Status.ObservedGeneration { @@ -174,7 +179,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err) } options := api.ListOptions{LabelSelector: selector} - pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + pods, err := dc.podLister.Pods(namespace).List(options.LabelSelector) if err != nil { return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) } @@ -228,7 +233,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) { return deploymentutil.ListPods(deployment, func(namespace string, options api.ListOptions) (*api.PodList, error) { - pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + pods, err := dc.podLister.Pods(namespace).List(options.LabelSelector) result := api.PodList{Items: make([]api.Pod, 0, len(pods))} for i := range pods { result.Items = append(result.Items, *pods[i]) @@ -242,7 +247,7 @@ func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*ap // 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes. // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas. // Note that the pod-template-hash will be added to adopted RSes and pods. 
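The surrounding hunks (handleOverlap above, addHashKeyToRSAndPods here, getNewReplicaSet below) all introduce the same copy-before-mutate pattern: because listers now hand back pointers into the shared informer cache, an object is deep-copied before it is modified. A minimal sketch of that pattern, not part of the diff; the package and helper name are chosen for illustration only.

package deploymentutil // illustrative placement; any controller package works

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/extensions"
)

// cloneReplicaSet deep-copies a ReplicaSet obtained from a lister so the
// caller can mutate the copy without corrupting the shared cache. It mirrors
// the api.Scheme.Copy calls added in addHashKeyToRSAndPods and getNewReplicaSet.
func cloneReplicaSet(rs *extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
	objCopy, err := api.Scheme.Copy(rs)
	if err != nil {
		return nil, err
	}
	return objCopy.(*extensions.ReplicaSet), nil
}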
-func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) { +func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) { // Calculate revision number for this new replica set newRevision := strconv.FormatInt(maxOldRevision+1, 10) @@ -250,11 +255,17 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme if err != nil { return nil, err } else if existingNewRS != nil { - // Set existing new replica set's annotation - if deploymentutil.SetNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) { - return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS) + objCopy, err := api.Scheme.Copy(existingNewRS) + if err != nil { + return nil, err } - return existingNewRS, nil + rsCopy := objCopy.(*extensions.ReplicaSet) + + // Set existing new replica set's annotation + if deploymentutil.SetNewReplicaSetAnnotations(deployment, rsCopy, newRevision, true) { + return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(rsCopy) + } + return rsCopy, nil } if !createIfNotExisted { diff --git a/pkg/controller/deployment/sync_test.go b/pkg/controller/deployment/sync_test.go index 97a7ccb4825..1b05bc804b0 100644 --- a/pkg/controller/deployment/sync_test.go +++ b/pkg/controller/deployment/sync_test.go @@ -326,10 +326,10 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) { controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) controller.eventRecorder = &record.FakeRecorder{} - controller.rsStoreSynced = alwaysReady - controller.podStoreSynced = alwaysReady + controller.rsListerSynced = alwaysReady + controller.podListerSynced = alwaysReady for _, rs := range test.oldRSs { - controller.rsStore.Add(rs) + controller.rsLister.Indexer.Add(rs) } d := newDeployment(1, &tests[i].revisionHistoryLimit) diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index ed62f44158e..9a081501b0e 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -368,11 +368,18 @@ func GetNewReplicaSet(deployment *extensions.Deployment, c clientset.Interface) } // listReplicaSets lists all RSes the given deployment targets with the given client interface. 
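The listReplicaSets rewrite just below converts the client's value slice into the pointer slice the helpers now expect; the important detail is indexing into the backing array rather than taking the address of the range variable. A standalone sketch (not part of the diff; the function name is illustrative):

package deploymentutil // illustrative placement

import "k8s.io/kubernetes/pkg/apis/extensions"

// toReplicaSetPointers converts a ReplicaSetList into []*extensions.ReplicaSet.
// Appending &item from the range variable instead of &list.Items[i] would
// leave every element pointing at the same reused loop variable.
func toReplicaSetPointers(list *extensions.ReplicaSetList) []*extensions.ReplicaSet {
	ret := make([]*extensions.ReplicaSet, 0, len(list.Items))
	for i := range list.Items {
		ret = append(ret, &list.Items[i])
	}
	return ret
}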
-func listReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]extensions.ReplicaSet, error) { +func listReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, error) { return ListReplicaSets(deployment, - func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { + func(namespace string, options api.ListOptions) ([]*extensions.ReplicaSet, error) { rsList, err := c.Extensions().ReplicaSets(namespace).List(options) - return rsList.Items, err + if err != nil { + return nil, err + } + ret := []*extensions.ReplicaSet{} + for i := range rsList.Items { + ret = append(ret, &rsList.Items[i]) + } + return ret, err }) } @@ -385,11 +392,11 @@ func listPods(deployment *extensions.Deployment, c clientset.Interface) (*api.Po } // TODO: switch this to full namespacers -type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error) +type rsListFunc func(string, api.ListOptions) ([]*extensions.ReplicaSet, error) type podListFunc func(string, api.ListOptions) (*api.PodList, error) // ListReplicaSets returns a slice of RSes the given deployment targets. -func ListReplicaSets(deployment *extensions.Deployment, getRSList rsListFunc) ([]extensions.ReplicaSet, error) { +func ListReplicaSets(deployment *extensions.Deployment, getRSList rsListFunc) ([]*extensions.ReplicaSet, error) { // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830; // or use controllerRef, see https://github.com/kubernetes/kubernetes/issues/2210 @@ -442,7 +449,7 @@ func equalIgnoreHash(template1, template2 api.PodTemplateSpec) (bool, error) { } // FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template). -func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet) (*extensions.ReplicaSet, error) { +func FindNewReplicaSet(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet) (*extensions.ReplicaSet, error) { newRSTemplate := GetNewReplicaSetTemplate(deployment) for i := range rsList { equal, err := equalIgnoreHash(rsList[i].Spec.Template, newRSTemplate) @@ -451,7 +458,7 @@ func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.Re } if equal { // This is the new ReplicaSet. - return &rsList[i], nil + return rsList[i], nil } } // new ReplicaSet does not exist. @@ -460,11 +467,11 @@ func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.Re // FindOldReplicaSets returns the old replica sets targeted by the given Deployment, with the given PodList and slice of RSes. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets. -func FindOldReplicaSets(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, podList *api.PodList) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { +func FindOldReplicaSets(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet, podList *api.PodList) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { // Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList. 
// All pods and replica sets are labeled with pod-template-hash to prevent overlapping - oldRSs := map[string]extensions.ReplicaSet{} - allOldRSs := map[string]extensions.ReplicaSet{} + oldRSs := map[string]*extensions.ReplicaSet{} + allOldRSs := map[string]*extensions.ReplicaSet{} newRSTemplate := GetNewReplicaSetTemplate(deployment) for _, pod := range podList.Items { podLabelsSelector := labels.Set(pod.ObjectMeta.Labels) @@ -490,12 +497,12 @@ func FindOldReplicaSets(deployment *extensions.Deployment, rsList []extensions.R requiredRSs := []*extensions.ReplicaSet{} for key := range oldRSs { value := oldRSs[key] - requiredRSs = append(requiredRSs, &value) + requiredRSs = append(requiredRSs, value) } allRSs := []*extensions.ReplicaSet{} for key := range allOldRSs { value := allOldRSs[key] - allRSs = append(allRSs, &value) + allRSs = append(allRSs, value) } return requiredRSs, allRSs, nil } @@ -826,12 +833,12 @@ func LastSelectorUpdate(d *extensions.Deployment) unversioned.Time { // BySelectorLastUpdateTime sorts a list of deployments by the last update time of their selector, // first using their creation timestamp and then their names as a tie breaker. -type BySelectorLastUpdateTime []extensions.Deployment +type BySelectorLastUpdateTime []*extensions.Deployment func (o BySelectorLastUpdateTime) Len() int { return len(o) } func (o BySelectorLastUpdateTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] } func (o BySelectorLastUpdateTime) Less(i, j int) bool { - ti, tj := LastSelectorUpdate(&o[i]), LastSelectorUpdate(&o[j]) + ti, tj := LastSelectorUpdate(o[i]), LastSelectorUpdate(o[j]) if ti.Equal(tj) { if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { return o[i].Name < o[j].Name diff --git a/pkg/controller/deployment/util/deployment_util_test.go b/pkg/controller/deployment/util/deployment_util_test.go index c95670ed5ee..4eab29807ab 100644 --- a/pkg/controller/deployment/util/deployment_util_test.go +++ b/pkg/controller/deployment/util/deployment_util_test.go @@ -512,19 +512,19 @@ func TestFindNewReplicaSet(t *testing.T) { tests := []struct { test string deployment extensions.Deployment - rsList []extensions.ReplicaSet + rsList []*extensions.ReplicaSet expected *extensions.ReplicaSet }{ { test: "Get new ReplicaSet with the same spec but different pod-template-hash value", deployment: deployment, - rsList: []extensions.ReplicaSet{newRS, oldRS}, + rsList: []*extensions.ReplicaSet{&newRS, &oldRS}, expected: &newRS, }, { test: "Get nil new ReplicaSet", deployment: deployment, - rsList: []extensions.ReplicaSet{oldRS}, + rsList: []*extensions.ReplicaSet{&oldRS}, expected: nil, }, } @@ -550,14 +550,14 @@ func TestFindOldReplicaSets(t *testing.T) { tests := []struct { test string deployment extensions.Deployment - rsList []extensions.ReplicaSet + rsList []*extensions.ReplicaSet podList *api.PodList expected []*extensions.ReplicaSet }{ { test: "Get old ReplicaSets", deployment: deployment, - rsList: []extensions.ReplicaSet{newRS, oldRS}, + rsList: []*extensions.ReplicaSet{&newRS, &oldRS}, podList: &api.PodList{ Items: []api.Pod{ newPod, @@ -569,7 +569,7 @@ func TestFindOldReplicaSets(t *testing.T) { { test: "Get old ReplicaSets with no new ReplicaSet", deployment: deployment, - rsList: []extensions.ReplicaSet{oldRS}, + rsList: []*extensions.ReplicaSet{&oldRS}, podList: &api.PodList{ Items: []api.Pod{ oldPod, @@ -580,7 +580,7 @@ func TestFindOldReplicaSets(t *testing.T) { { test: "Get empty old ReplicaSets", deployment: deployment, - rsList: []extensions.ReplicaSet{newRS}, + rsList: 
[]*extensions.ReplicaSet{&newRS}, podList: &api.PodList{ Items: []api.Pod{ newPod, diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 2c11f82a92d..2b3736f031b 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -142,7 +142,7 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient i dc.rcLister.Indexer = dc.rcIndexer - dc.rsStore, dc.rsController = cache.NewInformer( + dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options) @@ -154,9 +154,9 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient i &extensions.ReplicaSet{}, 30*time.Second, cache.ResourceEventHandlerFuncs{}, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) - - dc.rsLister.Store = dc.rsStore + dc.rsStore = dc.rsLister.Indexer dc.dIndexer, dc.dController = cache.NewIndexerInformer( &cache.ListWatch{ @@ -202,7 +202,7 @@ func (dc *DisruptionController) getPodReplicaSets(pod *api.Pod) ([]controllerAnd for _, rs := range rss { // GetDeploymentsForReplicaSet returns an error only if no matching // deployments are found. - _, err := dc.dLister.GetDeploymentsForReplicaSet(&rs) + _, err := dc.dLister.GetDeploymentsForReplicaSet(rs) if err == nil { // A deployment was found, so this finder will not count this RS. continue } @@ -227,7 +227,7 @@ func (dc *DisruptionController) getPodDeployments(pod *api.Pod) ([]controllerAnd } controllerScale := map[types.UID]int32{} for _, rs := range rss { - ds, err := dc.dLister.GetDeploymentsForReplicaSet(&rs) + ds, err := dc.dLister.GetDeploymentsForReplicaSet(rs) // GetDeploymentsForReplicaSet returns an error only if no matching // deployments are found. In that case we skip this ReplicaSet. 
if err != nil { diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index ad4b8e0469e..0ef90c2ba44 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -80,7 +80,7 @@ func newFakeDisruptionController() (*DisruptionController, *pdbStates) { pdbLister: cache.StoreToPodDisruptionBudgetLister{Store: cache.NewStore(controller.KeyFunc)}, podLister: cache.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})}, rcLister: cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, - rsLister: cache.StoreToReplicaSetLister{Store: cache.NewStore(controller.KeyFunc)}, + rsLister: cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, dLister: cache.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})}, getUpdater: func() updater { return ps.Set }, broadcaster: record.NewBroadcaster(), @@ -310,7 +310,7 @@ func TestReplicaSet(t *testing.T) { add(t, dc.pdbLister.Store, pdb) rs, _ := newReplicaSet(t, 10) - add(t, dc.rsLister.Store, rs) + add(t, dc.rsLister.Indexer, rs) pod, _ := newPod(t, "pod") add(t, dc.podLister.Indexer, pod) @@ -459,7 +459,7 @@ func TestTwoControllers(t *testing.T) { rs, _ := newReplicaSet(t, collectionSize) rs.Spec.Selector = newSel(dLabels) rs.Labels = dLabels - add(t, dc.rsLister.Store, rs) + add(t, dc.rsLister.Indexer, rs) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, true, minimumOne+1, minimumOne, collectionSize) diff --git a/pkg/controller/informers/extensions.go b/pkg/controller/informers/extensions.go index a0539d28fd5..ba9a7a1fc98 100644 --- a/pkg/controller/informers/extensions.go +++ b/pkg/controller/informers/extensions.go @@ -68,3 +68,87 @@ func (f *daemonSetInformer) Lister() *cache.StoreToDaemonSetLister { informer := f.Informer() return &cache.StoreToDaemonSetLister{Store: informer.GetIndexer()} } + +// DeploymentInformer is a type of SharedIndexInformer which watches and lists all deployments. +type DeploymentInformer interface { + Informer() cache.SharedIndexInformer + Lister() *cache.StoreToDeploymentLister +} + +type deploymentInformer struct { + *sharedInformerFactory +} + +func (f *deploymentInformer) Informer() cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerType := reflect.TypeOf(&extensions.Deployment{}) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Extensions().Deployments(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Extensions().Deployments(api.NamespaceAll).Watch(options) + }, + }, + &extensions.Deployment{}, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +func (f *deploymentInformer) Lister() *cache.StoreToDeploymentLister { + informer := f.Informer() + return &cache.StoreToDeploymentLister{Indexer: informer.GetIndexer()} +} + +// ReplicaSetInformer is a type of SharedIndexInformer which watches and lists all replicasets. 
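For context, a sketch of how the new Deployments()/ReplicaSets() factory methods added in pkg/controller/informers are expected to be consumed. This is illustrative only: the constructor name NewSharedInformerFactory and the internal clientset type are assumed from the surrounding factory.go and are not shown in this hunk.

package example // illustrative wiring, not part of the change

import (
	"time"

	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller/informers"
)

func startExtensionsInformers(client internalclientset.Interface, stopCh <-chan struct{}) {
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)

	// Each type gets one shared informer; Lister() wraps its indexer in the
	// pointer-returning listers from listers_extensions.go.
	dLister := factory.Deployments().Lister()
	rsLister := factory.ReplicaSets().Lister()

	go factory.Deployments().Informer().Run(stopCh)
	go factory.ReplicaSets().Informer().Run(stopCh)

	_, _ = dLister, rsLister
}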
+type ReplicaSetInformer interface { + Informer() cache.SharedIndexInformer + Lister() *cache.StoreToReplicaSetLister +} + +type replicaSetInformer struct { + *sharedInformerFactory +} + +func (f *replicaSetInformer) Informer() cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerType := reflect.TypeOf(&extensions.ReplicaSet{}) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Extensions().ReplicaSets(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Extensions().ReplicaSets(api.NamespaceAll).Watch(options) + }, + }, + &extensions.ReplicaSet{}, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +func (f *replicaSetInformer) Lister() *cache.StoreToReplicaSetLister { + informer := f.Informer() + return &cache.StoreToReplicaSetLister{Indexer: informer.GetIndexer()} +} diff --git a/pkg/controller/informers/factory.go b/pkg/controller/informers/factory.go index 6350c16fb3b..6218df59167 100644 --- a/pkg/controller/informers/factory.go +++ b/pkg/controller/informers/factory.go @@ -38,6 +38,8 @@ type SharedInformerFactory interface { PersistentVolumes() PVInformer DaemonSets() DaemonSetInformer + Deployments() DeploymentInformer + ReplicaSets() ReplicaSetInformer } type sharedInformerFactory struct { @@ -102,3 +104,11 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer { func (f *sharedInformerFactory) DaemonSets() DaemonSetInformer { return &daemonSetInformer{sharedInformerFactory: f} } + +func (f *sharedInformerFactory) Deployments() DeploymentInformer { + return &deploymentInformer{sharedInformerFactory: f} +} + +func (f *sharedInformerFactory) ReplicaSets() ReplicaSetInformer { + return &replicaSetInformer{sharedInformerFactory: f} +} diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 825e05384e6..8ed0491f9a0 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -142,7 +142,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac garbageCollectorEnabled: garbageCollectorEnabled, } - rsc.rsStore.Store, rsc.rsController = cache.NewInformer( + rsc.rsStore.Indexer, rsc.rsController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options) @@ -162,6 +162,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac // way of achieving this is by performing a `stop` operation on the replica set. 
DeleteFunc: rsc.enqueueReplicaSet, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -251,9 +252,9 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl } // update lookup cache - rsc.lookupCache.Update(pod, &rss[0]) + rsc.lookupCache.Update(pod, rss[0]) - return &rss[0] + return rss[0] } // callback when RS is updated @@ -296,9 +297,9 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { // isCacheValid check if the cache is valid func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool { - _, exists, err := rsc.rsStore.Get(cachedRS) + _, err := rsc.rsStore.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name) // rs has been deleted or updated, cache is invalid - if err != nil || !exists || !isReplicaSetMatch(pod, cachedRS) { + if err != nil || !isReplicaSetMatch(pod, cachedRS) { return false } return true @@ -564,7 +565,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { return nil } - obj, exists, err := rsc.rsStore.Store.GetByKey(key) + obj, exists, err := rsc.rsStore.Indexer.GetByKey(key) if !exists { glog.Infof("ReplicaSet has been deleted %v", key) rsc.expectations.DeleteExpectations(key) diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 0ff69064131..8844a4168e5 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -165,7 +165,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { // 2 running pods, a controller with 2 replicas, sync is a no-op labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rsSpec) + manager.rsStore.Indexer.Add(rsSpec) newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") manager.podControl = &fakePodControl @@ -183,7 +183,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) { // 2 running pods and a controller with 1 replica, one pod delete expected labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(1, labelMap) - manager.rsStore.Store.Add(rsSpec) + manager.rsStore.Indexer.Add(rsSpec) newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") manager.syncReplicaSet(getKey(rsSpec, t)) @@ -207,7 +207,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { // the controller matching the selectors of the deleted pod into the work queue. 
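The test updates in this file all follow one mechanical pattern: the store behind rsStore is now a namespace-indexed Indexer, and fixtures go in through .Indexer.Add rather than .Store.Add. A self-contained sketch of that pattern, separate from the diff; the test name and object are illustrative.

package replicaset // illustrative, alongside the tests above

import (
	"testing"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/labels"
)

func TestIndexerBackedLister(t *testing.T) {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	lister := cache.StoreToReplicaSetLister{Indexer: indexer}

	if err := indexer.Add(&extensions.ReplicaSet{
		ObjectMeta: api.ObjectMeta{Name: "frontend", Namespace: "default"},
	}); err != nil {
		t.Fatal(err)
	}

	rss, err := lister.ReplicaSets("default").List(labels.Everything())
	if err != nil || len(rss) != 1 {
		t.Fatalf("expected one ReplicaSet in namespace default, got %v (err: %v)", rss, err)
	}
}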
labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(1, labelMap) - manager.rsStore.Store.Add(rsSpec) + manager.rsStore.Indexer.Add(rsSpec) pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec, "pod") manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) @@ -232,7 +232,7 @@ func TestSyncReplicaSetCreates(t *testing.T) { // A controller with 2 replicas and no pods in the store, 2 creates expected labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -256,7 +256,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { activePods := 5 labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(activePods, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)} newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod") @@ -301,7 +301,7 @@ func TestControllerUpdateReplicas(t *testing.T) { extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"} rs := newReplicaSet(5, labelMap) rs.Spec.Template.Labels = extraLabelMap - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0} rs.Generation = 1 newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") @@ -344,7 +344,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rsSpec) + manager.rsStore.Indexer.Add(rsSpec) newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod") // Creates a replica and sets expectations @@ -439,7 +439,7 @@ func TestPodControllerLookup(t *testing.T) { } for _, c := range testCases { for _, r := range c.inRSs { - manager.rsStore.Add(r) + manager.rsStore.Indexer.Add(r) } if rs := manager.getPodReplicaSet(c.pod); rs != nil { if c.outRSName != rs.Name { @@ -471,7 +471,7 @@ func TestWatchControllers(t *testing.T) { // and closes the received channel to indicate that the test can finish. manager.syncHandler = func(key string) error { - obj, exists, err := manager.rsStore.Store.GetByKey(key) + obj, exists, err := manager.rsStore.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find replica set under key %v", key) } @@ -509,13 +509,13 @@ func TestWatchPods(t *testing.T) { // Put one ReplicaSet and one pod into the controller's stores labelMap := map[string]string{"foo": "bar"} testRSSpec := newReplicaSet(1, labelMap) - manager.rsStore.Store.Add(testRSSpec) + manager.rsStore.Indexer.Add(testRSSpec) received := make(chan string) // The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and // send it into the syncHandler. 
manager.syncHandler = func(key string) error { - obj, exists, err := manager.rsStore.Store.GetByKey(key) + obj, exists, err := manager.rsStore.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find replica set under key %v", key) } @@ -553,7 +553,7 @@ func TestUpdatePods(t *testing.T) { received := make(chan string) manager.syncHandler = func(key string) error { - obj, exists, err := manager.rsStore.Store.GetByKey(key) + obj, exists, err := manager.rsStore.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find replica set under key %v", key) } @@ -568,12 +568,12 @@ func TestUpdatePods(t *testing.T) { // Put 2 ReplicaSets and one pod into the controller's stores labelMap1 := map[string]string{"foo": "bar"} testRSSpec1 := newReplicaSet(1, labelMap1) - manager.rsStore.Store.Add(testRSSpec1) + manager.rsStore.Indexer.Add(testRSSpec1) testRSSpec2 := *testRSSpec1 labelMap2 := map[string]string{"bar": "foo"} testRSSpec2.Spec.Selector = &unversioned.LabelSelector{MatchLabels: labelMap2} testRSSpec2.Name = "barfoo" - manager.rsStore.Store.Add(&testRSSpec2) + manager.rsStore.Indexer.Add(&testRSSpec2) // case 1: We put in the podStore a pod with labels matching testRSSpec1, // then update its labels to match testRSSpec2. We expect to receive a sync @@ -632,7 +632,7 @@ func TestControllerUpdateRequeue(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(1, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: 2} newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rs, "pod") @@ -714,7 +714,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(numReplicas, labelMap) - manager.rsStore.Store.Add(rsSpec) + manager.rsStore.Indexer.Add(rsSpec) expectedPods := int32(0) pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec, "pod") @@ -728,7 +728,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) for _, replicas := range []int32{int32(numReplicas), 0} { rsSpec.Spec.Replicas = replicas - manager.rsStore.Store.Add(rsSpec) + manager.rsStore.Indexer.Add(rsSpec) for i := 0; i < numReplicas; i += burstReplicas { manager.syncReplicaSet(getKey(rsSpec, t)) @@ -866,7 +866,7 @@ func TestRSSyncExpectations(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rsSpec) + manager.rsStore.Indexer.Add(rsSpec) pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod") manager.podStore.Indexer.Add(&pods.Items[0]) postExpectationsPod := pods.Items[1] @@ -889,7 +889,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { manager.podStoreSynced = alwaysReady rs := newReplicaSet(1, map[string]string{"foo": "bar"}) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -911,7 +911,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { if !exists || err != nil { t.Errorf("No expectations found for ReplicaSet") } - manager.rsStore.Delete(rs) + manager.rsStore.Indexer.Delete(rs) manager.syncReplicaSet(getKey(rs, t)) if _, exists, err = manager.expectations.GetExpectations(rsKey); exists { @@ -936,7 +936,7 @@ func TestRSManagerNotReady(t *testing.T) { // want to end up creating replicas in this case until the pod reflector // has synced, so the 
ReplicaSet controller should just requeue the ReplicaSet. rsSpec := newReplicaSet(1, map[string]string{"foo": "bar"}) - manager.rsStore.Store.Add(rsSpec) + manager.rsStore.Indexer.Add(rsSpec) rsKey := getKey(rsSpec, t) manager.syncReplicaSet(rsKey) @@ -980,7 +980,7 @@ func TestOverlappingRSs(t *testing.T) { } shuffledControllers := shuffle(controllers) for j := range shuffledControllers { - manager.rsStore.Store.Add(shuffledControllers[j]) + manager.rsStore.Indexer.Add(shuffledControllers[j]) } // Add a pod and make sure only the oldest ReplicaSet is synced pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod") @@ -1001,7 +1001,7 @@ func TestDeletionTimestamp(t *testing.T) { manager.podStoreSynced = alwaysReady rs := newReplicaSet(1, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) rsKey, err := controller.KeyFunc(rs) if err != nil { t.Errorf("Couldn't get key for object %#v: %v", rs, err) @@ -1101,7 +1101,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) var trueVar = true otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar} // add to podStore a matching Pod controlled by another controller. Expect no patch. @@ -1120,7 +1120,7 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) // add to podStore one more matching pod that doesn't have a controller // ref, but has an owner ref pointing to other object. Expect a patch to // take control of it. @@ -1141,7 +1141,7 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) // add to podStore a matching pod that has an ownerRef pointing to the rs, // but ownerRef.Controller is false. Expect a patch to take control it. rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name} @@ -1161,7 +1161,7 @@ func TestPatchPodFails(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) // add to podStore two matching pods. Expect two patches to take control // them. manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil)) @@ -1181,7 +1181,7 @@ func TestPatchExtraPodsThenDelete(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) // add to podStore three matching pods. Expect three patches to take control // them, and later delete one of them. 
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil)) @@ -1199,7 +1199,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) // put one pod in the podStore pod := newPod("pod", rs, api.PodRunning, nil) pod.ResourceVersion = "1" @@ -1246,7 +1246,7 @@ func TestUpdateSelectorControllerRef(t *testing.T) { // put the updatedRS into the store. This is consistent with the behavior of // the Informer: Informer updates the store before call the handler // (updateRS() in this case). - manager.rsStore.Store.Add(&updatedRS) + manager.rsStore.Indexer.Add(&updatedRS) manager.updateRS(rs, &updatedRS) // verifies that the rs is added to the queue rsKey := getKey(rs, t) @@ -1274,7 +1274,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { rs := newReplicaSet(2, labelMap) now := unversioned.Now() rs.DeletionTimestamp = &now - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) pod1 := newPod("pod1", rs, api.PodRunning, nil) manager.podStore.Indexer.Add(pod1) @@ -1304,7 +1304,7 @@ func TestReadyReplicas(t *testing.T) { rs := newReplicaSet(2, labelMap) rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1} rs.Generation = 1 - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) newPodList(manager.podStore.Indexer, 2, api.PodPending, labelMap, rs, "pod") newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") @@ -1346,7 +1346,7 @@ func TestAvailableReplicas(t *testing.T) { rs.Generation = 1 // minReadySeconds set to 15s rs.Spec.MinReadySeconds = 15 - manager.rsStore.Store.Add(rs) + manager.rsStore.Indexer.Add(rs) // First pod becomes ready 20s ago moment := unversioned.Time{Time: time.Now().Add(-2e10)} diff --git a/pkg/controller/replicaset/replica_set_utils.go b/pkg/controller/replicaset/replica_set_utils.go index edaf8a8b326..f321a18fb85 100644 --- a/pkg/controller/replicaset/replica_set_utils.go +++ b/pkg/controller/replicaset/replica_set_utils.go @@ -74,7 +74,7 @@ func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.Repli } // overlappingReplicaSets sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker. 
-type overlappingReplicaSets []extensions.ReplicaSet +type overlappingReplicaSets []*extensions.ReplicaSet func (o overlappingReplicaSets) Len() int { return len(o) } func (o overlappingReplicaSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] } diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go index 516f24de22b..7570f2bfad6 100644 --- a/pkg/util/replicaset/replicaset.go +++ b/pkg/util/replicaset/replicaset.go @@ -85,7 +85,7 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs } // GetPodTemplateSpecHash returns the pod template hash of a ReplicaSet's pod template space -func GetPodTemplateSpecHash(rs extensions.ReplicaSet) string { +func GetPodTemplateSpecHash(rs *extensions.ReplicaSet) string { meta := rs.Spec.Template.ObjectMeta meta.Labels = labelsutil.CloneAndRemoveLabel(meta.Labels, extensions.DefaultDeploymentUniqueLabelKey) return fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(api.PodTemplateSpec{ diff --git a/plugin/pkg/scheduler/algorithm/listers.go b/plugin/pkg/scheduler/algorithm/listers.go index 066fe305997..690b0a3f793 100644 --- a/plugin/pkg/scheduler/algorithm/listers.go +++ b/plugin/pkg/scheduler/algorithm/listers.go @@ -150,35 +150,23 @@ func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []*ap // ReplicaSetLister interface represents anything that can produce a list of ReplicaSet; the list is consumed by a scheduler. type ReplicaSetLister interface { - // Lists all the replicasets - List() ([]extensions.ReplicaSet, error) // Gets the replicasets for the given pod - GetPodReplicaSets(*api.Pod) ([]extensions.ReplicaSet, error) + GetPodReplicaSets(*api.Pod) ([]*extensions.ReplicaSet, error) } // EmptyReplicaSetLister implements ReplicaSetLister on []extensions.ReplicaSet returning empty data type EmptyReplicaSetLister struct{} -// List returns nil -func (f EmptyReplicaSetLister) List() ([]extensions.ReplicaSet, error) { - return nil, nil -} - // GetPodReplicaSets returns nil -func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) { +func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []*extensions.ReplicaSet, err error) { return nil, nil } // FakeReplicaSetLister implements ControllerLister on []extensions.ReplicaSet for test purposes. -type FakeReplicaSetLister []extensions.ReplicaSet - -// List returns []extensions.ReplicaSet, the list of all ReplicaSets. 
-func (f FakeReplicaSetLister) List() ([]extensions.ReplicaSet, error) { - return f, nil -} +type FakeReplicaSetLister []*extensions.ReplicaSet // GetPodReplicaSets gets the ReplicaSets that have the selector that match the labels on the given pod -func (f FakeReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) { +func (f FakeReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []*extensions.ReplicaSet, err error) { var selector labels.Selector for _, rs := range f { diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index a25fd105777..908398a5750 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -58,7 +58,7 @@ func TestSelectorSpreadPriority(t *testing.T) { pods []*api.Pod nodes []string rcs []*api.ReplicationController - rss []extensions.ReplicaSet + rss []*extensions.ReplicaSet services []*api.Service expectedList schedulerapi.HostPriorityList test string @@ -197,7 +197,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, - rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, + rss: []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, test: "service with partial pod label matches with service and replica set", @@ -225,7 +225,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}}, - rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, + rss: []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, test: "disjoined service and replica set should be treated equally", @@ -251,7 +251,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, + rss: []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. 
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}}, test: "Replica set with partial pod label matches", @@ -276,7 +276,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}}, + rss: []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}}, // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, test: "Another replication set with partial pod label matches", @@ -344,7 +344,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { pods []*api.Pod nodes []string rcs []*api.ReplicationController - rss []extensions.ReplicaSet + rss []*extensions.ReplicaSet services []*api.Service expectedList schedulerapi.HostPriorityList test string diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 6dd62f52e60..6d29781f6e0 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -110,7 +110,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, - ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, + ReplicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, schedulerCache: schedulerCache, StopEverything: stopEverything, SchedulerName: schedulerName, @@ -425,7 +425,7 @@ func (f *ConfigFactory) Run() { // Watch and cache all ReplicaSet objects. Scheduler needs to find all pods // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // Cache this locally. 
- cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Store, 0).RunUntil(f.StopEverything) + cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Indexer, 0).RunUntil(f.StopEverything) } func (f *ConfigFactory) getNextPod() *api.Pod { diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index 258de553b02..b9ffd87b631 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -496,7 +496,7 @@ func TestZeroRequest(t *testing.T) { Function: algorithmpriorities.NewSelectorSpreadPriority( algorithm.FakeServiceLister([]*api.Service{}), algorithm.FakeControllerLister([]*api.ReplicationController{}), - algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})), + algorithm.FakeReplicaSetLister([]*extensions.ReplicaSet{})), Weight: 1, }, }
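
Reviewer note (not part of the patch): taken together, the hunks above switch every ReplicaSet cache from a plain Store to a namespace-indexed Indexer (cache.NamespaceIndex / cache.MetaNamespaceIndexFunc), add Deployments() and ReplicaSets() to the shared informer factory, and change the listers to return []*extensions.ReplicaSet so callers hold pointers into the cache instead of value copies. The sketch below is illustrative only, showing roughly how a consumer would use the new surface: it assumes the factory has been constructed and started elsewhere, and the helper name dumpReplicaSetsForPod and the ReplicaSet name "frontend" are made up for the example.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/controller/informers"
)

// dumpReplicaSetsForPod is a hypothetical helper sketching the post-patch API.
func dumpReplicaSetsForPod(factory informers.SharedInformerFactory, pod *api.Pod) error {
	// ReplicaSets() and Lister() are added by this patch; the lister wraps the
	// shared informer's namespace-indexed store.
	rsLister := factory.ReplicaSets().Lister()

	// Namespace-scoped Get replaces the old Store.Get(obj) + exists pattern
	// (compare the isCacheValid change in pkg/controller/replicaset/replica_set.go).
	// "frontend" is an arbitrary example name.
	if _, err := rsLister.ReplicaSets(pod.Namespace).Get("frontend"); err != nil {
		fmt.Printf("replica set %q not found in cache for namespace %q: %v\n", "frontend", pod.Namespace, err)
	}

	// GetPodReplicaSets now returns []*extensions.ReplicaSet, so no per-item copy.
	matching, err := rsLister.GetPodReplicaSets(pod)
	if err != nil {
		return err
	}
	for _, rs := range matching {
		fmt.Printf("pod %s/%s matches ReplicaSet %s\n", pod.Namespace, pod.Name, rs.Name)
	}
	return nil
}

The namespace index is what lets the lister answer per-namespace Get/List calls without scanning the whole store, and returning pointers keeps the scheduler and controller paths from copying full ReplicaSet objects out of the shared caches on every lookup.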