diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index ef3d171e213..469b9c00d49 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -137,133 +137,6 @@ func (s storeToNodeConditionLister) List() (nodes []*api.Node, err error) { return } -// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments. -type StoreToDeploymentLister struct { - Indexer -} - -// Exists checks if the given deployment exists in the store. -func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) { - _, exists, err := s.Indexer.Get(deployment) - if err != nil { - return false, err - } - return exists, nil -} - -// StoreToDeploymentLister lists all deployments in the store. -// TODO: converge on the interface in pkg/client -func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) { - for _, c := range s.Indexer.List() { - deployments = append(deployments, *(c.(*extensions.Deployment))) - } - return deployments, nil -} - -// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found. -func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []extensions.Deployment, err error) { - if len(rs.Labels) == 0 { - err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name) - return - } - - // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label - dList, err := s.Deployments(rs.Namespace).List(labels.Everything()) - if err != nil { - return - } - for _, d := range dList { - selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("invalid label selector: %v", err) - } - // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) { - continue - } - deployments = append(deployments, d) - } - if len(deployments) == 0 { - err = fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels) - } - return -} - -type storeToDeploymentNamespacer struct { - indexer Indexer - namespace string -} - -// storeToDeploymentNamespacer lists deployments under its namespace in the store. -func (s storeToDeploymentNamespacer) List(selector labels.Selector) (deployments []extensions.Deployment, err error) { - if s.namespace == api.NamespaceAll { - for _, m := range s.indexer.List() { - d := *(m.(*extensions.Deployment)) - if selector.Matches(labels.Set(d.Labels)) { - deployments = append(deployments, d) - } - } - return - } - - key := &extensions.Deployment{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} - items, err := s.indexer.Index(NamespaceIndex, key) - if err != nil { - // Ignore error; do slow search without index. - glog.Warningf("can not retrieve list of objects using index : %v", err) - for _, m := range s.indexer.List() { - d := *(m.(*extensions.Deployment)) - if s.namespace == d.Namespace && selector.Matches(labels.Set(d.Labels)) { - deployments = append(deployments, d) - } - } - return deployments, nil - } - for _, m := range items { - d := *(m.(*extensions.Deployment)) - if selector.Matches(labels.Set(d.Labels)) { - deployments = append(deployments, d) - } - } - return -} - -func (s *StoreToDeploymentLister) Deployments(namespace string) storeToDeploymentNamespacer { - return storeToDeploymentNamespacer{s.Indexer, namespace} -} - -// GetDeploymentsForPods returns a list of deployments managing a pod. Returns an error only if no matching deployments are found. -func (s *StoreToDeploymentLister) GetDeploymentsForPod(pod *api.Pod) (deployments []extensions.Deployment, err error) { - if len(pod.Labels) == 0 { - err = fmt.Errorf("no deployments found for Pod %v because it has no labels", pod.Name) - return - } - - if len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { - return - } - - dList, err := s.Deployments(pod.Namespace).List(labels.Everything()) - if err != nil { - return - } - for _, d := range dList { - selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("invalid label selector: %v", err) - } - // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - continue - } - deployments = append(deployments, d) - } - if len(deployments) == 0 { - err = fmt.Errorf("could not find deployments set for Pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - return -} - // StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets. type StoreToReplicaSetLister struct { Store diff --git a/pkg/client/cache/listers_extensions.go b/pkg/client/cache/listers_extensions.go new file mode 100644 index 00000000000..e8060b63144 --- /dev/null +++ b/pkg/client/cache/listers_extensions.go @@ -0,0 +1,141 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/labels" +) + +// TODO: generate these classes and methods for all resources of interest using +// a script. Can use "go generate" once 1.4 is supported by all users. + +// Lister makes an Index have the List method. The Stores must contain only the expected type +// Example: +// s := cache.NewStore() +// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"} +// r := cache.NewReflector(lw, &extensions.Deployment{}, s).Run() +// l := StoreToDeploymentLister{s} +// l.List() + +// StoreToDeploymentLister helps list pods + +// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments. +type StoreToDeploymentLister struct { + Indexer Indexer +} + +func (s *StoreToDeploymentLister) List(selector labels.Selector) (ret []*extensions.Deployment, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*extensions.Deployment)) + }) + return ret, err +} + +func (s *StoreToDeploymentLister) Deployments(namespace string) storeDeploymentsNamespacer { + return storeDeploymentsNamespacer{Indexer: s.Indexer, namespace: namespace} +} + +type storeDeploymentsNamespacer struct { + Indexer Indexer + namespace string +} + +func (s storeDeploymentsNamespacer) List(selector labels.Selector) (ret []*extensions.Deployment, err error) { + err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*extensions.Deployment)) + }) + return ret, err +} + +func (s storeDeploymentsNamespacer) Get(name string) (*extensions.Deployment, error) { + obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(extensions.Resource("deployment"), name) + } + return obj.(*extensions.Deployment), nil +} + +// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found. +func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []*extensions.Deployment, err error) { + if len(rs.Labels) == 0 { + err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name) + return + } + + // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label + dList, err := s.Deployments(rs.Namespace).List(labels.Everything()) + if err != nil { + return + } + for _, d := range dList { + selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) { + continue + } + deployments = append(deployments, d) + } + if len(deployments) == 0 { + err = fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels) + } + return +} + +// GetDeploymentsForDeployments returns a list of deployments managing a pod. Returns an error only if no matching deployments are found. +// TODO eliminate shallow copies +func (s *StoreToDeploymentLister) GetDeploymentsForPod(pod *api.Pod) (deployments []*extensions.Deployment, err error) { + if len(pod.Labels) == 0 { + err = fmt.Errorf("no deployments found for Pod %v because it has no labels", pod.Name) + return + } + + if len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { + return + } + + dList, err := s.Deployments(pod.Namespace).List(labels.Everything()) + if err != nil { + return + } + for _, d := range dList { + selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + deployments = append(deployments, d) + } + if len(deployments) == 0 { + err = fmt.Errorf("could not find deployments set for Pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 716d0522136..118d4a9e256 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -67,27 +67,27 @@ type DeploymentController struct { syncHandler func(dKey string) error // A store of deployments, populated by the dController - dStore cache.StoreToDeploymentLister + dLister cache.StoreToDeploymentLister // Watches changes to all deployments dController *cache.Controller // A store of ReplicaSets, populated by the rsController - rsStore cache.StoreToReplicaSetLister + rsLister cache.StoreToReplicaSetLister // Watches changes to all ReplicaSets rsController *cache.Controller // A store of pods, populated by the podController - podStore cache.StoreToPodLister + podLister cache.StoreToPodLister // Watches changes to all pods podController *cache.Controller - // dStoreSynced returns true if the Deployment store has been synced at least once. + // dListerSynced returns true if the Deployment store has been synced at least once. // Added as a member to the struct to allow injection for testing. - dStoreSynced func() bool - // rsStoreSynced returns true if the ReplicaSet store has been synced at least once. + dListerSynced func() bool + // rsListerSynced returns true if the ReplicaSet store has been synced at least once. // Added as a member to the struct to allow injection for testing. - rsStoreSynced func() bool - // podStoreSynced returns true if the pod store has been synced at least once. + rsListerSynced func() bool + // podListerSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. - podStoreSynced func() bool + podListerSynced func() bool // Deployments that need to be synced queue workqueue.RateLimitingInterface @@ -109,7 +109,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"), } - dc.dStore.Indexer, dc.dController = cache.NewIndexerInformer( + dc.dLister.Indexer, dc.dController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.client.Extensions().Deployments(api.NamespaceAll).List(options) @@ -129,7 +129,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) - dc.rsStore.Store, dc.rsController = cache.NewInformer( + dc.rsLister.Store, dc.rsController = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options) @@ -147,7 +147,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller }, ) - dc.podStore.Indexer, dc.podController = cache.NewIndexerInformer( + dc.podLister.Indexer, dc.podController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.client.Core().Pods(api.NamespaceAll).List(options) @@ -167,9 +167,9 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller ) dc.syncHandler = dc.syncDeployment - dc.dStoreSynced = dc.dController.HasSynced - dc.rsStoreSynced = dc.rsController.HasSynced - dc.podStoreSynced = dc.podController.HasSynced + dc.dListerSynced = dc.dController.HasSynced + dc.rsListerSynced = dc.rsController.HasSynced + dc.podListerSynced = dc.podController.HasSynced return dc } @@ -183,7 +183,7 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { // Wait for the rc and dc stores to sync before starting any work in this controller. ready := make(chan struct{}) - go dc.waitForSyncedStores(ready, stopCh) + go dc.waitForSyncedListers(ready, stopCh) select { case <-ready: case <-stopCh: @@ -199,10 +199,10 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { dc.queue.ShutDown() } -func (dc *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) { +func (dc *DeploymentController) waitForSyncedListers(ready chan<- struct{}, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - for !dc.dStoreSynced() || !dc.rsStoreSynced() || !dc.podStoreSynced() { + for !dc.dListerSynced() || !dc.rsListerSynced() || !dc.podListerSynced() { select { case <-time.After(StoreSyncedPollPeriod): case <-stopCh: @@ -255,7 +255,7 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) { // getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet. func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment { - deployments, err := dc.dStore.GetDeploymentsForReplicaSet(rs) + deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs) if err != nil || len(deployments) == 0 { glog.V(4).Infof("Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing.", err, rs.Name) return nil @@ -268,7 +268,7 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic sort.Sort(util.BySelectorLastUpdateTime(deployments)) glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name) } - return &deployments[0] + return deployments[0] } // updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet @@ -328,7 +328,7 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) { // getDeploymentForPod returns the deployment that manages the given Pod. // If there are multiple deployments for a given Pod, only return the oldest one. func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment { - deployments, err := dc.dStore.GetDeploymentsForPod(pod) + deployments, err := dc.dLister.GetDeploymentsForPod(pod) if err != nil || len(deployments) == 0 { glog.V(4).Infof("Error: %v. No deployment found for Pod %v, deployment controller will avoid syncing.", err, pod.Name) return nil @@ -338,7 +338,7 @@ func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.De sort.Sort(util.BySelectorLastUpdateTime(deployments)) glog.Errorf("user error! more than one deployment is selecting pod %s/%s with labels: %#v, returning %s/%s", pod.Namespace, pod.Name, pod.Labels, deployments[0].Namespace, deployments[0].Name) } - return &deployments[0] + return deployments[0] } // When a pod is created, ensure its controller syncs @@ -478,7 +478,7 @@ func (dc *DeploymentController) syncDeployment(key string) error { glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := dc.dStore.Indexer.GetByKey(key) + obj, exists, err := dc.dLister.Indexer.GetByKey(key) if err != nil { glog.Errorf("Unable to retrieve deployment %v from store: %v", key, err) return err @@ -548,24 +548,27 @@ func (dc *DeploymentController) handleOverlap(d *extensions.Deployment) error { if err != nil { return fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) } - deployments, err := dc.dStore.Deployments(d.Namespace).List(labels.Everything()) + deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything()) if err != nil { return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err) } overlapping := false - for i := range deployments { - other := &deployments[i] + for _, other := range deployments { if !selector.Empty() && selector.Matches(labels.Set(other.Spec.Template.Labels)) && d.UID != other.UID { + deploymentCopy, err := util.DeploymentDeepCopy(other) + if err != nil { + return err + } overlapping = true // We don't care if the overlapping annotation update failed or not (we don't make decision on it) d, _ = dc.markDeploymentOverlap(d, other.Name) - other, _ = dc.markDeploymentOverlap(other, d.Name) + deploymentCopy, _ = dc.markDeploymentOverlap(deploymentCopy, d.Name) // Skip syncing this one if older overlapping one is found // TODO: figure out a better way to determine which deployment to skip, // either with controller reference, or with validation. // Using oldest active replica set to determine which deployment to skip wouldn't make much difference, // since new replica set hasn't been created after selector update - if util.SelectorUpdatedBefore(other, d) { + if util.SelectorUpdatedBefore(deploymentCopy, d) { return fmt.Errorf("found deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, other.Namespace, other.Name) } } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 84c15795f95..43453080be5 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -157,9 +157,9 @@ type fixture struct { client *fake.Clientset // Objects to put in the store. - dStore []*exp.Deployment - rsStore []*exp.ReplicaSet - podStore []*api.Pod + dLister []*exp.Deployment + rsLister []*exp.ReplicaSet + podLister []*api.Pod // Actions expected to happen on the client. Objects from here are also // preloaded into NewSimpleFake. @@ -200,16 +200,16 @@ func (f *fixture) run(deploymentName string) { f.client = fake.NewSimpleClientset(f.objects...) c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc) c.eventRecorder = &record.FakeRecorder{} - c.rsStoreSynced = alwaysReady - c.podStoreSynced = alwaysReady - for _, d := range f.dStore { - c.dStore.Indexer.Add(d) + c.rsListerSynced = alwaysReady + c.podListerSynced = alwaysReady + for _, d := range f.dLister { + c.dLister.Indexer.Add(d) } - for _, rs := range f.rsStore { - c.rsStore.Store.Add(rs) + for _, rs := range f.rsLister { + c.rsLister.Store.Add(rs) } - for _, pod := range f.podStore { - c.podStore.Indexer.Add(pod) + for _, pod := range f.podLister { + c.podLister.Indexer.Add(pod) } err := c.syncDeployment(deploymentName) @@ -240,7 +240,7 @@ func TestSyncDeploymentCreatesReplicaSet(t *testing.T) { f := newFixture(t) d := newDeployment(1, nil) - f.dStore = append(f.dStore, d) + f.dLister = append(f.dLister, d) f.objects = append(f.objects, d) rs := newReplicaSet(d, "deploymentrs-4186632231", 1) @@ -258,7 +258,7 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) { d := newDeployment(1, nil) now := unversioned.Now() d.DeletionTimestamp = &now - f.dStore = append(f.dStore, d) + f.dLister = append(f.dLister, d) f.run(getKey(d, t)) } @@ -269,13 +269,13 @@ func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) controller.eventRecorder = &record.FakeRecorder{} - controller.rsStoreSynced = alwaysReady - controller.podStoreSynced = alwaysReady + controller.rsListerSynced = alwaysReady + controller.podListerSynced = alwaysReady d := newDeployment(1, nil) empty := unversioned.LabelSelector{} d.Spec.Selector = &empty - controller.dStore.Indexer.Add(d) + controller.dLister.Indexer.Add(d) // We expect the deployment controller to not take action here since it's configuration // is invalid, even though no replicasets exist that match it's selector. controller.syncDeployment(fmt.Sprintf("%s/%s", d.ObjectMeta.Namespace, d.ObjectMeta.Name)) diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go index a0b238d0327..e3ec1d7c6b9 100644 --- a/pkg/controller/deployment/recreate.go +++ b/pkg/controller/deployment/recreate.go @@ -108,7 +108,7 @@ func (dc *DeploymentController) waitForInactiveReplicaSets(oldRSs []*extensions. statusReplicas := rs.Status.Replicas if err := wait.ExponentialBackoff(unversionedclient.DefaultRetry, func() (bool, error) { - replicaSet, err := dc.rsStore.ReplicaSets(rs.Namespace).Get(rs.Name) + replicaSet, err := dc.rsLister.ReplicaSets(rs.Namespace).Get(rs.Name) if err != nil { return false, err } diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 1479d02f701..006aad05be1 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -106,7 +106,7 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) { rsList, err := deploymentutil.ListReplicaSets(deployment, func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { - return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) + return dc.rsLister.ReplicaSets(namespace).List(options.LabelSelector) }) if err != nil { return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err) @@ -174,7 +174,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err) } options := api.ListOptions{LabelSelector: selector} - pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + pods, err := dc.podLister.Pods(namespace).List(options.LabelSelector) if err != nil { return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) } @@ -228,7 +228,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) { return deploymentutil.ListPods(deployment, func(namespace string, options api.ListOptions) (*api.PodList, error) { - pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + pods, err := dc.podLister.Pods(namespace).List(options.LabelSelector) result := api.PodList{Items: make([]api.Pod, 0, len(pods))} for i := range pods { result.Items = append(result.Items, *pods[i]) diff --git a/pkg/controller/deployment/sync_test.go b/pkg/controller/deployment/sync_test.go index 97a7ccb4825..64da36e122b 100644 --- a/pkg/controller/deployment/sync_test.go +++ b/pkg/controller/deployment/sync_test.go @@ -326,10 +326,10 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) { controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) controller.eventRecorder = &record.FakeRecorder{} - controller.rsStoreSynced = alwaysReady - controller.podStoreSynced = alwaysReady + controller.rsListerSynced = alwaysReady + controller.podListerSynced = alwaysReady for _, rs := range test.oldRSs { - controller.rsStore.Add(rs) + controller.rsLister.Add(rs) } d := newDeployment(1, &tests[i].revisionHistoryLimit) diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index ed62f44158e..b87c2352f88 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -826,12 +826,12 @@ func LastSelectorUpdate(d *extensions.Deployment) unversioned.Time { // BySelectorLastUpdateTime sorts a list of deployments by the last update time of their selector, // first using their creation timestamp and then their names as a tie breaker. -type BySelectorLastUpdateTime []extensions.Deployment +type BySelectorLastUpdateTime []*extensions.Deployment func (o BySelectorLastUpdateTime) Len() int { return len(o) } func (o BySelectorLastUpdateTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] } func (o BySelectorLastUpdateTime) Less(i, j int) bool { - ti, tj := LastSelectorUpdate(&o[i]), LastSelectorUpdate(&o[j]) + ti, tj := LastSelectorUpdate(o[i]), LastSelectorUpdate(o[j]) if ti.Equal(tj) { if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { return o[i].Name < o[j].Name diff --git a/pkg/controller/informers/extensions.go b/pkg/controller/informers/extensions.go index a0539d28fd5..20a9d673cd9 100644 --- a/pkg/controller/informers/extensions.go +++ b/pkg/controller/informers/extensions.go @@ -68,3 +68,45 @@ func (f *daemonSetInformer) Lister() *cache.StoreToDaemonSetLister { informer := f.Informer() return &cache.StoreToDaemonSetLister{Store: informer.GetIndexer()} } + +// DeploymentInformer is atype of SharedIndexInformer which watches and lists all deployments. +type DeploymentInformer interface { + Informer() cache.SharedIndexInformer + Lister() *cache.StoreToDeploymentLister +} + +type deploymentInformer struct { + *sharedInformerFactory +} + +func (f *deploymentInformer) Informer() cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerType := reflect.TypeOf(&extensions.Deployment{}) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Extensions().Deployments(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Extensions().Deployments(api.NamespaceAll).Watch(options) + }, + }, + &extensions.Deployment{}, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +func (f *deploymentInformer) Lister() *cache.StoreToDeploymentLister { + informer := f.Informer() + return &cache.StoreToDeploymentLister{Store: informer.GetIndexer()} +} diff --git a/pkg/controller/informers/factory.go b/pkg/controller/informers/factory.go index 6350c16fb3b..dc0fb4a59bc 100644 --- a/pkg/controller/informers/factory.go +++ b/pkg/controller/informers/factory.go @@ -38,6 +38,7 @@ type SharedInformerFactory interface { PersistentVolumes() PVInformer DaemonSets() DaemonSetInformer + Deployments() DeploymentInformer } type sharedInformerFactory struct { @@ -102,3 +103,7 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer { func (f *sharedInformerFactory) DaemonSets() DaemonSetInformer { return &daemonSetInformer{sharedInformerFactory: f} } + +func (f *sharedInformerFactory) Deployments() DeploymentInformer { + return &deploymentInformer{sharedInformerFactory: f} +}