From 90557ec56c83a3e2e307b77a4f7b7f97dbb5cff8 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Tue, 16 Aug 2016 18:47:15 -0700 Subject: [PATCH] Handle overlapping deployments gracefully 1. When overlapping deployments are discovered, annotate them 2. Expose those overlapping annotations as warnings in kubectl describe 3. Only respect the earliest updated one (skip syncing all other overlapping deployments) 4. Use indexer instead of store for deployment lister --- pkg/client/cache/listers.go | 94 ++++++++++++++-- .../deployment/deployment_controller.go | 104 ++++++++++++++---- .../deployment/deployment_controller_test.go | 4 +- .../deployment/util/deployment_util.go | 45 ++++++++ pkg/controller/disruption/disruption.go | 7 +- pkg/controller/disruption/disruption_test.go | 4 +- pkg/kubectl/describe.go | 4 + pkg/registry/deployment/strategy.go | 12 ++ 8 files changed, 233 insertions(+), 41 deletions(-) diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 6041744477f..c768c29760e 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -285,12 +285,12 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co // StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments. type StoreToDeploymentLister struct { - Store + Indexer } // Exists checks if the given deployment exists in the store. func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) { - _, exists, err := s.Store.Get(deployment) + _, exists, err := s.Indexer.Get(deployment) if err != nil { return false, err } @@ -300,7 +300,7 @@ func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (boo // StoreToDeploymentLister lists all deployments in the store. 
// TODO: converge on the interface in pkg/client func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) { - for _, c := range s.Store.List() { + for _, c := range s.Indexer.List() { deployments = append(deployments, *(c.(*extensions.Deployment))) } return deployments, nil @@ -308,20 +308,17 @@ func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, e // GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found. func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []extensions.Deployment, err error) { - var d extensions.Deployment - if len(rs.Labels) == 0 { err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name) return } // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label - for _, m := range s.Store.List() { - d = *m.(*extensions.Deployment) - if d.Namespace != rs.Namespace { - continue - } - + dList, err := s.Deployments(rs.Namespace).List(labels.Everything()) + if err != nil { + return + } + for _, d := range dList { selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) if err != nil { return nil, fmt.Errorf("invalid label selector: %v", err) @@ -338,6 +335,81 @@ func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.Rep return } +type storeToDeploymentNamespacer struct { + indexer Indexer + namespace string +} + +// storeToDeploymentNamespacer lists deployments under its namespace in the store. 
+func (s storeToDeploymentNamespacer) List(selector labels.Selector) (deployments []extensions.Deployment, err error) { + if s.namespace == api.NamespaceAll { + for _, m := range s.indexer.List() { + d := *(m.(*extensions.Deployment)) + if selector.Matches(labels.Set(d.Labels)) { + deployments = append(deployments, d) + } + } + return + } + + key := &extensions.Deployment{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} + items, err := s.indexer.Index(NamespaceIndex, key) + if err != nil { + // Ignore the index error; fall back to a slow scan of the whole store, filtering by namespace. + glog.Warningf("can not retrieve list of objects using index : %v", err) + for _, m := range s.indexer.List() { + d := *(m.(*extensions.Deployment)) + if s.namespace == d.Namespace && selector.Matches(labels.Set(d.Labels)) { + deployments = append(deployments, d) + } + } + return deployments, nil + } + for _, m := range items { + d := *(m.(*extensions.Deployment)) + if selector.Matches(labels.Set(d.Labels)) { + deployments = append(deployments, d) + } + } + return +} + +func (s *StoreToDeploymentLister) Deployments(namespace string) storeToDeploymentNamespacer { + return storeToDeploymentNamespacer{s.Indexer, namespace} +} + +// GetDeploymentsForPod returns a list of deployments managing a pod. Returns an error only if no matching deployments are found. 
+func (s *StoreToDeploymentLister) GetDeploymentsForPod(pod *api.Pod) (deployments []extensions.Deployment, err error) { + if len(pod.Labels) == 0 { + err = fmt.Errorf("no deployments found for Pod %v because it has no labels", pod.Name) + return + } + + if len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { + return + } + + dList, err := s.Deployments(pod.Namespace).List(labels.Everything()) + if err != nil { + return + } + for _, d := range dList { + selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + deployments = append(deployments, d) + } + if len(deployments) == 0 { + err = fmt.Errorf("could not find deployments set for Pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + // StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets. 
type StoreToReplicaSetLister struct { Store diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index d4148e81bd9..1a1f2d27bb1 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -23,6 +23,7 @@ package deployment import ( "fmt" "reflect" + "sort" "time" "github.com/golang/glog" @@ -36,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -108,7 +110,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } - dc.dStore.Store, dc.dController = framework.NewInformer( + dc.dStore.Indexer, dc.dController = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.client.Extensions().Deployments(api.NamespaceAll).List(options) @@ -125,6 +127,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller // This will enter the sync loop and no-op, because the deployment has been deleted from the store. DeleteFunc: dc.deleteDeploymentNotification, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) dc.rsStore.Store, dc.rsController = framework.NewInformer( @@ -252,7 +255,6 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) { } // getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet. -// TODO: Surface that we are ignoring multiple deployments for a given ReplicaSet. 
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment { deployments, err := dc.dStore.GetDeploymentsForReplicaSet(rs) if err != nil || len(deployments) == 0 { @@ -262,8 +264,11 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic // Because all ReplicaSet's belonging to a deployment should have a unique label key, // there should never be more than one deployment returned by the above method. // If that happens we should probably dynamically repair the situation by ultimately - // trying to clean up one of the controllers, for now we just return one of the two, - // likely randomly. + // trying to clean up one of the controllers, for now we just return the older one + if len(deployments) > 1 { + sort.Sort(util.BySelectorLastUpdateTime(deployments)) + glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name) + } return &deployments[0] } @@ -321,22 +326,20 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) { } } -// getDeploymentForPod returns the deployment managing the ReplicaSet that manages the given Pod. -// TODO: Surface that we are ignoring multiple deployments for a given Pod. +// getDeploymentForPod returns the deployment that manages the given Pod. +// If there are multiple deployments for a given Pod, only return the oldest one. func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment { - rss, err := dc.rsStore.GetPodReplicaSets(pod) - if err != nil { - glog.V(4).Infof("Error: %v. No ReplicaSets found for pod %v, deployment controller will avoid syncing.", err, pod.Name) + deployments, err := dc.dStore.GetDeploymentsForPod(pod) + if err != nil || len(deployments) == 0 { + glog.V(4).Infof("Error: %v. 
No deployment found for Pod %v, deployment controller will avoid syncing.", err, pod.Name) return nil } - for _, rs := range rss { - deployments, err := dc.dStore.GetDeploymentsForReplicaSet(&rs) - if err == nil && len(deployments) > 0 { - return &deployments[0] - } + + if len(deployments) > 1 { + sort.Sort(util.BySelectorLastUpdateTime(deployments)) + glog.Errorf("user error! more than one deployment is selecting pod %s/%s with labels: %#v, returning %s/%s", pod.Namespace, pod.Name, pod.Labels, deployments[0].Namespace, deployments[0].Name) } - glog.V(4).Infof("No deployments found for pod %v, deployment controller will avoid syncing.", pod.Name) - return nil + return &deployments[0] } // When a pod is created, ensure its controller syncs @@ -407,15 +410,28 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym return } - // TODO: Handle overlapping deployments better. Either disallow them at admission time or - // deterministically avoid syncing deployments that fight over ReplicaSet's. Currently, we - // only ensure that the same deployment is synced for a given ReplicaSet. When we - // periodically relist all deployments there will still be some ReplicaSet instability. One - // way to handle this is by querying the store for all deployments that this deployment - // overlaps, as well as all deployments that overlap this deployments, and sorting them. 
dc.queue.Add(key) } +func (dc *DeploymentController) markDeploymentOverlap(deployment *extensions.Deployment, withDeployment string) (*extensions.Deployment, error) { + if deployment.Annotations[util.OverlapAnnotation] == withDeployment { + return deployment, nil + } + if deployment.Annotations == nil { + deployment.Annotations = make(map[string]string) + } + deployment.Annotations[util.OverlapAnnotation] = withDeployment + return dc.client.Extensions().Deployments(deployment.Namespace).Update(deployment) +} + +func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.Deployment) (*extensions.Deployment, error) { + if len(deployment.Annotations[util.OverlapAnnotation]) == 0 { + return deployment, nil + } + delete(deployment.Annotations, util.OverlapAnnotation) + return dc.client.Extensions().Deployments(deployment.Namespace).Update(deployment) +} + // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (dc *DeploymentController) worker() { @@ -463,7 +479,7 @@ func (dc *DeploymentController) syncDeployment(key string) error { glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := dc.dStore.Store.GetByKey(key) + obj, exists, err := dc.dStore.Indexer.GetByKey(key) if err != nil { glog.Infof("Unable to retrieve deployment %v from store: %v", key, err) return err @@ -491,6 +507,11 @@ func (dc *DeploymentController) syncDeployment(key string) error { return dc.syncStatusOnly(d) } + // Handle overlapping deployments by deterministically avoiding syncing deployments that fight over ReplicaSets. 
+ if err = dc.handleOverlap(d); err != nil { + return err + } + if d.Spec.Paused { return dc.sync(d) } @@ -518,3 +539,45 @@ func (dc *DeploymentController) syncDeployment(key string) error { } return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) } + +// handleOverlap relists all deployments in the same namespace to look for overlaps, and avoids syncing +// the newer overlapping ones (only the oldest one is synced). New/old is determined by when the +// deployment's selector was last updated. +func (dc *DeploymentController) handleOverlap(d *extensions.Deployment) error { + selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector) + if err != nil { + return fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) + } + deployments, err := dc.dStore.Deployments(d.Namespace).List(labels.Everything()) + if err != nil { + return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err) + } + overlapping := false + for i := range deployments { + other := &deployments[i] + if !selector.Empty() && selector.Matches(labels.Set(other.Spec.Template.Labels)) && d.UID != other.UID { + overlapping = true + // Annotating is best effort: a failed update does not influence the sync decision below. + // The object returned alongside an error is meaningless, so keep the current pointer in that case. + if updated, markErr := dc.markDeploymentOverlap(d, other.Name); markErr == nil { + d = updated + } + if updated, markErr := dc.markDeploymentOverlap(other, d.Name); markErr == nil { + other = updated + } + // Skip syncing this one if older overlapping one is found + // TODO: figure out a better way to determine which deployment to skip, + // either with controller reference, or with validation. 
+ // Using oldest active replica set to determine which deployment to skip wouldn't make much difference, + // since new replica set hasn't been created after selector update + if util.SelectorUpdatedBefore(other, d) { + return fmt.Errorf("found deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, other.Namespace, other.Name) + } + } + } + if !overlapping { + // We don't care if the overlapping annotation update failed or not (we don't make decision on it) + d, _ = dc.clearDeploymentOverlap(d) + } + return nil +} diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index ae570fc8d01..84c15795f95 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -203,7 +203,7 @@ func (f *fixture) run(deploymentName string) { c.rsStoreSynced = alwaysReady c.podStoreSynced = alwaysReady for _, d := range f.dStore { - c.dStore.Store.Add(d) + c.dStore.Indexer.Add(d) } for _, rs := range f.rsStore { c.rsStore.Store.Add(rs) @@ -275,7 +275,7 @@ func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing d := newDeployment(1, nil) empty := unversioned.LabelSelector{} d.Spec.Selector = &empty - controller.dStore.Store.Add(d) + controller.dStore.Indexer.Add(d) // We expect the deployment controller to not take action here since it's configuration // is invalid, even though no replicasets exist that match it's selector. 
controller.syncDeployment(fmt.Sprintf("%s/%s", d.ObjectMeta.Namespace, d.ObjectMeta.Name)) diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index d7e43508f33..64ed4454ca7 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -58,6 +58,12 @@ const ( RollbackTemplateUnchanged = "DeploymentRollbackTemplateUnchanged" // RollbackDone is the done rollback event reason RollbackDone = "DeploymentRollback" + // OverlapAnnotation marks deployments with overlapping selector with other deployments + // TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210 + OverlapAnnotation = "deployment.kubernetes.io/error-selector-overlapping-with" + // SelectorUpdateAnnotation marks the last time deployment selector update + // TODO: Delete this annotation when we gracefully handle overlapping selectors. 
See https://github.com/kubernetes/kubernetes/issues/2210 + SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at" ) // MaxRevision finds the highest revision in the replica sets @@ -791,3 +797,42 @@ func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployme } return copied, nil } + +// SelectorUpdatedBefore returns true if the former deployment's selector +// was last updated strictly before the latter's, false otherwise +func SelectorUpdatedBefore(d1, d2 *extensions.Deployment) bool { + t1, t2 := LastSelectorUpdate(d1), LastSelectorUpdate(d2) + return t1.Before(t2) +} + +// LastSelectorUpdate returns the time the given deployment's selector was last updated, read from the RFC3339 value of SelectorUpdateAnnotation +func LastSelectorUpdate(d *extensions.Deployment) unversioned.Time { + t := d.Annotations[SelectorUpdateAnnotation] + if len(t) > 0 { + parsedTime, err := time.Parse(time.RFC3339, t) + // If the annotation is not a valid RFC3339 time, use creation timestamp instead + if err != nil { + return d.CreationTimestamp + } + return unversioned.Time{Time: parsedTime} + } + // If it's never updated, use creation timestamp instead + return d.CreationTimestamp +} + +// BySelectorLastUpdateTime sorts a list of deployments by the last update time of their selector, +// first using their creation timestamp and then their names as a tie breaker. 
+type BySelectorLastUpdateTime []extensions.Deployment + +func (o BySelectorLastUpdateTime) Len() int { return len(o) } +func (o BySelectorLastUpdateTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o BySelectorLastUpdateTime) Less(i, j int) bool { + ti, tj := LastSelectorUpdate(&o[i]), LastSelectorUpdate(&o[j]) + if ti.Equal(tj) { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) + } + return ti.Before(tj) +} diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 27be031cc92..bdd4f3b426f 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -61,7 +61,7 @@ type DisruptionController struct { rsController *framework.Controller rsLister cache.StoreToReplicaSetLister - dStore cache.Store + dIndexer cache.Indexer dController *framework.Controller dLister cache.StoreToDeploymentLister @@ -155,7 +155,7 @@ func NewDisruptionController(podInformer framework.SharedIndexInformer, kubeClie dc.rsLister.Store = dc.rsStore - dc.dStore, dc.dController = framework.NewInformer( + dc.dIndexer, dc.dController = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.kubeClient.Extensions().Deployments(api.NamespaceAll).List(options) @@ -167,9 +167,10 @@ func NewDisruptionController(podInformer framework.SharedIndexInformer, kubeClie &extensions.Deployment{}, 30*time.Second, framework.ResourceEventHandlerFuncs{}, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) - dc.dLister.Store = dc.dStore + dc.dLister.Indexer = dc.dIndexer return dc } diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index b17189ab297..7446753b25e 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -81,7 +81,7 
@@ func newFakeDisruptionController() (*DisruptionController, *pdbStates) { podLister: cache.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})}, rcLister: cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, rsLister: cache.StoreToReplicaSetLister{Store: cache.NewStore(controller.KeyFunc)}, - dLister: cache.StoreToDeploymentLister{Store: cache.NewStore(controller.KeyFunc)}, + dLister: cache.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})}, getUpdater: func() updater { return ps.Set }, broadcaster: record.NewBroadcaster(), } @@ -442,7 +442,7 @@ func TestTwoControllers(t *testing.T) { d, _ := newDeployment(t, 11) d.Spec.Selector = newSel(dLabels) - add(t, dc.dLister.Store, d) + add(t, dc.dLister.Indexer, d) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, true, 4, 4, 11) diff --git a/pkg/kubectl/describe.go b/pkg/kubectl/describe.go index 26cc34596ae..6aba8e92c88 100644 --- a/pkg/kubectl/describe.go +++ b/pkg/kubectl/describe.go @@ -2166,6 +2166,10 @@ func (dd *DeploymentDescriber) Describe(namespace, name string, describerSetting } fmt.Fprintf(out, "NewReplicaSet:\t%s\n", printReplicaSetsByLabels(newRSs)) } + overlapWith := d.Annotations[deploymentutil.OverlapAnnotation] + if len(overlapWith) > 0 { + fmt.Fprintf(out, "!!!WARNING!!! This deployment has overlapping label selector with deployment %q and won't behave as expected. 
Please fix it before continue.\n", overlapWith) + } if describerSettings.ShowEvents { events, err := dd.Core().Events(namespace).Search(d) if err == nil && events != nil { diff --git a/pkg/registry/deployment/strategy.go b/pkg/registry/deployment/strategy.go index e460685b373..e262f64509b 100644 --- a/pkg/registry/deployment/strategy.go +++ b/pkg/registry/deployment/strategy.go @@ -19,10 +19,13 @@ package deployment import ( "fmt" "reflect" + "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions/validation" + "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/generic" @@ -80,6 +83,15 @@ func (deploymentStrategy) PrepareForUpdate(ctx api.Context, obj, old runtime.Obj !reflect.DeepEqual(newDeployment.Annotations, oldDeployment.Annotations) { newDeployment.Generation = oldDeployment.Generation + 1 } + + // Records timestamp on selector updates in annotation + if !reflect.DeepEqual(newDeployment.Spec.Selector, oldDeployment.Spec.Selector) { + if newDeployment.Annotations == nil { + newDeployment.Annotations = make(map[string]string) + } + now := unversioned.Now() + newDeployment.Annotations[util.SelectorUpdateAnnotation] = now.Format(time.RFC3339) + } } // ValidateUpdate is the default update validation for an end user.