diff --git a/pkg/controller/deployment/BUILD b/pkg/controller/deployment/BUILD index c5607e18ead..f0cb68a73fd 100644 --- a/pkg/controller/deployment/BUILD +++ b/pkg/controller/deployment/BUILD @@ -37,7 +37,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime/schema", + "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index ac637347257..3e31684b274 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -31,8 +31,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" - utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -58,9 +57,8 @@ const ( maxRetries = 5 ) -func getDeploymentKind() schema.GroupVersionKind { - return extensions.SchemeGroupVersion.WithKind("Deployment") -} +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = extensions.SchemeGroupVersion.WithKind("Deployment") // DeploymentController is responsible for synchronizing Deployment objects stored // in the system with actual running replica sets and pods. @@ -174,28 +172,6 @@ func (dc *DeploymentController) updateDeployment(old, cur interface{}) { curD := cur.(*extensions.Deployment) glog.V(4).Infof("Updating deployment %s", oldD.Name) dc.enqueueDeployment(curD) - // If the selector of the current deployment just changed, we need to requeue any old - // overlapping deployments. If the new selector steps on another deployment, the current - // deployment will get denied during the resync loop. - if !reflect.DeepEqual(curD.Spec.Selector, oldD.Spec.Selector) { - deployments, err := dc.dLister.Deployments(curD.Namespace).List(labels.Everything()) - if err != nil { - utilruntime.HandleError(fmt.Errorf("error listing deployments in namespace %s: %v", curD.Namespace, err)) - return - } - // Trigger cleanup of any old overlapping deployments; we don't care about any error - // returned here. - for i := range deployments { - otherD := deployments[i] - - oldOverlaps, oldErr := util.OverlapsWith(oldD, otherD) - curOverlaps, curErr := util.OverlapsWith(curD, otherD) - // Enqueue otherD so it gets cleaned up - if oldErr == nil && curErr == nil && oldOverlaps && !curOverlaps { - dc.enqueueDeployment(otherD) - } - } - } } func (dc *DeploymentController) deleteDeployment(obj interface{}) { @@ -214,22 +190,6 @@ func (dc *DeploymentController) deleteDeployment(obj interface{}) { } glog.V(4).Infof("Deleting deployment %s", d.Name) dc.enqueueDeployment(d) - deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything()) - if err != nil { - utilruntime.HandleError(fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err)) - return - } - // Trigger cleanup of any old overlapping deployments; we don't care about any error - // returned here. - for i := range deployments { - otherD := deployments[i] - - overlaps, err := util.OverlapsWith(d, otherD) - // Enqueue otherD so it gets cleaned up - if err == nil && overlaps { - dc.enqueueDeployment(otherD) - } - } } // addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created. @@ -336,8 +296,20 @@ func (dc *DeploymentController) deletePod(obj interface{}) { } glog.V(4).Infof("Pod %s deleted.", pod.Name) if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == extensions.RecreateDeploymentStrategyType { - podList, err := dc.listPods(d) - if err == nil && len(podList.Items) == 0 { + // Sync if this Deployment now has no more Pods. + rsList, err := dc.getReplicaSetsForDeployment(d) + if err != nil { + return + } + podMap, err := dc.getPodMapForReplicaSets(d.Namespace, rsList) + if err != nil { + return + } + numPods := 0 + for _, podList := range podMap { + numPods += len(podList.Items) + } + if numPods == 0 { dc.enqueueDeployment(d) } } @@ -447,23 +419,52 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) { dc.queue.Forget(key) } -// claimReplicaSets uses NewReplicaSetControllerRefManager to classify ReplicaSets -// and adopts them if their labels match the Deployment but are missing the reference. -// It also removes the controllerRef for ReplicaSets, whose labels no longer matches -// the deployment. -func (dc *DeploymentController) claimReplicaSets(deployment *extensions.Deployment) error { - rsList, err := dc.rsLister.ReplicaSets(deployment.Namespace).List(labels.Everything()) +// getReplicaSetsForDeployment uses ControllerRefManager to reconcile +// ControllerRef by adopting and orphaning. +// It returns the list of ReplicaSets that this Deployment should manage. +func (dc *DeploymentController) getReplicaSetsForDeployment(d *extensions.Deployment) ([]*extensions.ReplicaSet, error) { + // List all ReplicaSets to find those we own but that no longer match our + // selector. They will be orphaned by ClaimReplicaSets(). + rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything()) if err != nil { - return err + return nil, err } + deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) + } + cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind) + return cm.ClaimReplicaSets(rsList) +} - deploymentSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) +// getPodMapForReplicaSets scans the list of all Pods and returns a map from +// RS UID to Pods controlled by that RS, based on the Pod's ControllerRef. +func (dc *DeploymentController) getPodMapForReplicaSets(namespace string, rsList []*extensions.ReplicaSet) (map[types.UID]*v1.PodList, error) { + // List all Pods. + pods, err := dc.podLister.Pods(namespace).List(labels.Everything()) if err != nil { - return fmt.Errorf("deployment %s/%s has invalid label selector: %v", deployment.Namespace, deployment.Name, err) + return nil, err } - cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, deployment, deploymentSelector, getDeploymentKind()) - _, err = cm.ClaimReplicaSets(rsList) - return err + // Group Pods by their controller (if it's in rsList). + podMap := make(map[types.UID]*v1.PodList, len(rsList)) + for _, rs := range rsList { + podMap[rs.UID] = &v1.PodList{} + } + for _, pod := range pods { + // Ignore inactive Pods since that's what ReplicaSet does. + if !controller.IsPodActive(pod) { + continue + } + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + continue + } + // Only append if we care about this UID. + if podList, ok := podMap[controllerRef.UID]; ok { + podList.Items = append(podList.Items, *pod) + } + } + return podMap, nil } // syncDeployment will sync the deployment with the given key. @@ -506,25 +507,21 @@ func (dc *DeploymentController) syncDeployment(key string) error { return nil } - deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything()) + // List ReplicaSets owned by this Deployment, while reconciling ControllerRef + // through adoption/orphaning. + rsList, err := dc.getReplicaSetsForDeployment(d) if err != nil { - return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err) + return err } - - // Handle overlapping deployments by deterministically avoid syncing deployments that fight over ReplicaSets. - overlaps, err := dc.handleOverlap(d, deployments) + // List all Pods owned by this Deployment, grouped by their ReplicaSet. + // This is expensive, so do it once and pass it along to subroutines. + podMap, err := dc.getPodMapForReplicaSets(d.Namespace, rsList) if err != nil { - if overlaps { - // Emit an event and return a nil error for overlapping deployments so we won't resync them again. - dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectorOverlap", err.Error()) - return nil - } - // For any other failure, we should retry the deployment. return err } if d.DeletionTimestamp != nil { - return dc.syncStatusOnly(d) + return dc.syncStatusOnly(d, rsList, podMap) } // Why run the cleanup policy only when there is no rollback request? @@ -536,7 +533,7 @@ func (dc *DeploymentController) syncDeployment(key string) error { // (and chances are higher that they will work again as opposed to others that didn't) for candidates to // automatically roll back to (#23211) and the cleanup policy should help. if d.Spec.RollbackTo == nil { - _, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false) + _, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false) if err != nil { return err } @@ -546,11 +543,6 @@ func (dc *DeploymentController) syncDeployment(key string) error { dc.cleanupDeployment(oldRSs, d) } - err = dc.claimReplicaSets(d) - if err != nil { - return err - } - // Update deployment conditions with an Unknown condition when pausing/resuming // a deployment. In this way, we can be sure that we won't timeout when a user // resumes a Deployment with a set progressDeadlineSeconds. @@ -558,7 +550,7 @@ func (dc *DeploymentController) syncDeployment(key string) error { return err } - _, err = dc.hasFailed(d) + _, err = dc.hasFailed(d, rsList, podMap) if err != nil { return err } @@ -567,152 +559,29 @@ func (dc *DeploymentController) syncDeployment(key string) error { // See https://github.com/kubernetes/kubernetes/issues/23211. if d.Spec.Paused { - return dc.sync(d) + return dc.sync(d, rsList, podMap) } // rollback is not re-entrant in case the underlying replica sets are updated with a new // revision so we should ensure that we won't proceed to update replica sets until we // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues. if d.Spec.RollbackTo != nil { - return dc.rollback(d) + return dc.rollback(d, rsList, podMap) } - scalingEvent, err := dc.isScalingEvent(d) + scalingEvent, err := dc.isScalingEvent(d, rsList, podMap) if err != nil { return err } if scalingEvent { - return dc.sync(d) + return dc.sync(d, rsList, podMap) } switch d.Spec.Strategy.Type { case extensions.RecreateDeploymentStrategyType: - return dc.rolloutRecreate(d) + return dc.rolloutRecreate(d, rsList, podMap) case extensions.RollingUpdateDeploymentStrategyType: - return dc.rolloutRolling(d) + return dc.rolloutRolling(d, rsList, podMap) } return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) } - -// handleOverlap will avoid syncing the newer overlapping ones (only sync the oldest one). New/old is -// determined by when the deployment's selector is last updated. -func (dc *DeploymentController) handleOverlap(d *extensions.Deployment, deployments []*extensions.Deployment) (bool, error) { - overlapping := false - var errs []error - - for i := range deployments { - otherD := deployments[i] - - if d.Name == otherD.Name { - continue - } - - // Error is already checked during validation - foundOverlaps, _ := util.OverlapsWith(d, otherD) - - // If the otherD deployment overlaps with the current we need to identify which one - // holds the set longer and mark the other as overlapping. Requeue the overlapping - // deployments if this one has been marked deleted, we only update its status as long - // as it is not actually deleted. - if foundOverlaps && d.DeletionTimestamp == nil { - overlapping = true - // Look at the overlapping annotation in both deployments. If one of them has it and points - // to the other one then we don't need to compare their timestamps. - otherOverlapsWith := otherD.Annotations[util.OverlapAnnotation] - currentOverlapsWith := d.Annotations[util.OverlapAnnotation] - // The other deployment is already marked as overlapping with the current one. - if otherOverlapsWith == d.Name { - var err error - if d, err = dc.clearDeploymentOverlap(d, otherD.Name); err != nil { - errs = append(errs, err) - } - continue - } - - otherCopy, err := util.DeploymentDeepCopy(otherD) - if err != nil { - return false, err - } - - // Skip syncing this one if older overlapping one is found. - if currentOverlapsWith == otherCopy.Name || util.SelectorUpdatedBefore(otherCopy, d) { - if _, err = dc.markDeploymentOverlap(d, otherCopy.Name); err != nil { - return false, err - } - if _, err = dc.clearDeploymentOverlap(otherCopy, d.Name); err != nil { - return false, err - } - return true, fmt.Errorf("deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, otherCopy.Namespace, otherCopy.Name) - } - - // TODO: We need to support annotations in deployments that overlap with multiple other - // deployments. - if _, err = dc.markDeploymentOverlap(otherCopy, d.Name); err != nil { - errs = append(errs, err) - } - // This is going to get some deployments into update hotlooping if we remove the overlapping - // annotation unconditionally. - // - // Scenario: - // --> Deployment foo with label selector A=A is created. - // --> Deployment bar with label selector A=A,B=B is created. Marked as overlapping since it - // overlaps with foo. - // --> Deployment baz with label selector B=B is created. Marked as overlapping, since it - // overlaps with bar, bar overlapping annotation is cleaned up. Next sync loop marks bar - // as overlapping and it gets in an update hotloop. - if d, err = dc.clearDeploymentOverlap(d, otherCopy.Name); err != nil { - errs = append(errs, err) - } - continue - } - - // If the otherD deployment does not overlap with the current deployment *anymore* - // we need to cleanup otherD from the overlapping annotation so it can be synced by - // the deployment controller. - dName, hasOverlappingAnnotation := otherD.Annotations[util.OverlapAnnotation] - if hasOverlappingAnnotation && dName == d.Name { - otherCopy, err := util.DeploymentDeepCopy(otherD) - if err != nil { - return false, err - } - if _, err = dc.clearDeploymentOverlap(otherCopy, d.Name); err != nil { - errs = append(errs, err) - } - } - } - - if !overlapping { - var err error - if d, err = dc.clearDeploymentOverlap(d, ""); err != nil { - errs = append(errs, err) - } - } - - return false, utilerrors.NewAggregate(errs) -} - -func (dc *DeploymentController) markDeploymentOverlap(deployment *extensions.Deployment, withDeployment string) (*extensions.Deployment, error) { - if deployment.Annotations[util.OverlapAnnotation] == withDeployment && deployment.Status.ObservedGeneration >= deployment.Generation { - return deployment, nil - } - if deployment.Annotations == nil { - deployment.Annotations = make(map[string]string) - } - // Update observedGeneration for overlapping deployments so that their deletion won't be blocked. - deployment.Status.ObservedGeneration = deployment.Generation - deployment.Annotations[util.OverlapAnnotation] = withDeployment - return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment) -} - -func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.Deployment, otherName string) (*extensions.Deployment, error) { - overlapsWith := deployment.Annotations[util.OverlapAnnotation] - if len(overlapsWith) == 0 { - return deployment, nil - } - // This is not the deployment found in the annotation - do not remove the annotation. - if len(otherName) > 0 && otherName != overlapsWith { - return deployment, nil - } - delete(deployment.Annotations, util.OverlapAnnotation) - return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment) -} diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 7e63cf02915..dff3d54b8af 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -18,7 +18,6 @@ package deployment import ( "testing" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -108,13 +107,13 @@ func newDeployment(name string, replicas int, revisionHistoryLimit *int32, maxSu } func newReplicaSet(d *extensions.Deployment, name string, replicas int) *extensions.ReplicaSet { - control := true return &extensions.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ Name: name, + UID: uuid.NewUUID(), Namespace: metav1.NamespaceDefault, Labels: d.Spec.Selector.MatchLabels, - OwnerReferences: []metav1.OwnerReference{{APIVersion: getDeploymentKind().GroupVersion().Version, Kind: getDeploymentKind().Kind, Name: d.Name, UID: d.UID, Controller: &control}}, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(d)}, }, Spec: extensions.ReplicaSetSpec{ Selector: d.Spec.Selector, @@ -311,195 +310,6 @@ func TestReentrantRollback(t *testing.T) { f.run(getKey(d, t)) } -// TestOverlappingDeployment ensures that an overlapping deployment will not be synced by -// the controller. -func TestOverlappingDeployment(t *testing.T) { - f := newFixture(t) - now := metav1.Now() - later := metav1.Time{Time: now.Add(time.Minute)} - - foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) - foo.CreationTimestamp = now - bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"}) - bar.CreationTimestamp = later - - f.dLister = append(f.dLister, foo, bar) - f.objects = append(f.objects, foo, bar) - - f.expectUpdateDeploymentStatusAction(bar) - f.run(getKey(bar, t)) - - for _, a := range filterInformerActions(f.client.Actions()) { - action, ok := a.(core.UpdateAction) - if !ok { - continue - } - d, ok := action.GetObject().(*extensions.Deployment) - if !ok { - continue - } - if d.Name == "bar" && d.Annotations[util.OverlapAnnotation] != "foo" { - t.Errorf("annotations weren't updated for the overlapping deployment: %v", d.Annotations) - } - } -} - -// TestSyncOverlappedDeployment ensures that from two overlapping deployments, the older -// one will be synced and the newer will be marked as overlapping. Note that in reality it's -// not always the older deployment that is the one that works vs the rest but the one which -// has the selector unchanged for longer time. -func TestSyncOverlappedDeployment(t *testing.T) { - f := newFixture(t) - now := metav1.Now() - later := metav1.Time{Time: now.Add(time.Minute)} - - foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) - foo.CreationTimestamp = now - bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"}) - bar.CreationTimestamp = later - - f.dLister = append(f.dLister, foo, bar) - f.objects = append(f.objects, foo, bar) - - f.expectUpdateDeploymentStatusAction(bar) - f.expectCreateRSAction(newReplicaSet(foo, "foo-rs", 1)) - f.expectUpdateDeploymentStatusAction(foo) - f.expectUpdateDeploymentStatusAction(foo) - f.run(getKey(foo, t)) - - for _, a := range filterInformerActions(f.client.Actions()) { - action, ok := a.(core.UpdateAction) - if !ok { - continue - } - d, ok := action.GetObject().(*extensions.Deployment) - if !ok { - continue - } - if d.Name == "bar" && d.Annotations[util.OverlapAnnotation] != "foo" { - t.Errorf("annotations weren't updated for the overlapping deployment: %v", d.Annotations) - } - } -} - -// TestSelectorUpdate ensures that from two overlapping deployments, the one that is working won't -// be marked as overlapping if its selector is updated but still overlaps with the other one. -func TestSelectorUpdate(t *testing.T) { - f := newFixture(t) - now := metav1.Now() - later := metav1.Time{Time: now.Add(time.Minute)} - selectorUpdated := metav1.Time{Time: later.Add(time.Minute)} - - foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) - foo.CreationTimestamp = now - foo.Annotations = map[string]string{util.SelectorUpdateAnnotation: selectorUpdated.Format(time.RFC3339)} - bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"}) - bar.CreationTimestamp = later - bar.Annotations = map[string]string{util.OverlapAnnotation: "foo"} - - f.dLister = append(f.dLister, foo, bar) - f.objects = append(f.objects, foo, bar) - - f.expectCreateRSAction(newReplicaSet(foo, "foo-rs", 1)) - f.expectUpdateDeploymentStatusAction(foo) - f.expectUpdateDeploymentStatusAction(foo) - f.run(getKey(foo, t)) - - for _, a := range filterInformerActions(f.client.Actions()) { - action, ok := a.(core.UpdateAction) - if !ok { - continue - } - d, ok := action.GetObject().(*extensions.Deployment) - if !ok { - continue - } - - if d.Name == "foo" && len(d.Annotations[util.OverlapAnnotation]) > 0 { - t.Errorf("deployment %q should not have the overlapping annotation", d.Name) - } - if d.Name == "bar" && len(d.Annotations[util.OverlapAnnotation]) == 0 { - t.Errorf("deployment %q should have the overlapping annotation", d.Name) - } - } -} - -// TestDeletedDeploymentShouldCleanupOverlaps ensures that the deletion of a deployment -// will cleanup any deployments that overlap with it. -func TestDeletedDeploymentShouldCleanupOverlaps(t *testing.T) { - f := newFixture(t) - now := metav1.Now() - earlier := metav1.Time{Time: now.Add(-time.Minute)} - later := metav1.Time{Time: now.Add(time.Minute)} - - foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) - foo.CreationTimestamp = earlier - foo.DeletionTimestamp = &now - bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar"}) - bar.CreationTimestamp = later - bar.Annotations = map[string]string{util.OverlapAnnotation: "foo"} - - f.dLister = append(f.dLister, foo, bar) - f.objects = append(f.objects, foo, bar) - - f.expectUpdateDeploymentStatusAction(bar) - f.expectUpdateDeploymentStatusAction(foo) - f.run(getKey(foo, t)) - - for _, a := range filterInformerActions(f.client.Actions()) { - action, ok := a.(core.UpdateAction) - if !ok { - continue - } - d := action.GetObject().(*extensions.Deployment) - if d.Name != "bar" { - continue - } - - if len(d.Annotations[util.OverlapAnnotation]) > 0 { - t.Errorf("annotations weren't cleaned up for the overlapping deployment: %v", d.Annotations) - } - } -} - -// TestDeletedDeploymentShouldNotCleanupOtherOverlaps ensures that the deletion of -// a deployment will not cleanup deployments that overlap with another deployment. -func TestDeletedDeploymentShouldNotCleanupOtherOverlaps(t *testing.T) { - f := newFixture(t) - now := metav1.Now() - earlier := metav1.Time{Time: now.Add(-time.Minute)} - later := metav1.Time{Time: now.Add(time.Minute)} - - foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) - foo.CreationTimestamp = earlier - foo.DeletionTimestamp = &now - bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"bla": "bla"}) - bar.CreationTimestamp = later - // Notice this deployment is overlapping with another deployment - bar.Annotations = map[string]string{util.OverlapAnnotation: "baz"} - - f.dLister = append(f.dLister, foo, bar) - f.objects = append(f.objects, foo, bar) - - f.expectUpdateDeploymentStatusAction(foo) - f.run(getKey(foo, t)) - - for _, a := range filterInformerActions(f.client.Actions()) { - action, ok := a.(core.UpdateAction) - if !ok { - continue - } - d := action.GetObject().(*extensions.Deployment) - if d.Name != "bar" { - continue - } - - if len(d.Annotations[util.OverlapAnnotation]) == 0 { - t.Errorf("overlapping annotation should not be cleaned up for bar: %v", d.Annotations) - } - } -} - // TestPodDeletionEnqueuesRecreateDeployment ensures that the deletion of a pod // will requeue a Recreate deployment iff there is no other pod returned from the // client. @@ -562,6 +372,179 @@ func TestPodDeletionDoesntEnqueueRecreateDeployment(t *testing.T) { } } +func TestGetReplicaSetsForDeployment(t *testing.T) { + f := newFixture(t) + + // Two Deployments with same labels. + d1 := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) + d2 := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar"}) + + // Two ReplicaSets that match labels for both Deployments, + // but have ControllerRefs to make ownership explicit. + rs1 := newReplicaSet(d1, "rs1", 1) + rs2 := newReplicaSet(d2, "rs2", 1) + + f.dLister = append(f.dLister, d1, d2) + f.rsLister = append(f.rsLister, rs1, rs2) + f.objects = append(f.objects, d1, d2, rs1, rs2) + + // Start the fixture. + c, informers := f.newController() + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + + rsList, err := c.getReplicaSetsForDeployment(d1) + if err != nil { + t.Fatalf("getReplicaSetsForDeployment() error: %v", err) + } + rsNames := []string{} + for _, rs := range rsList { + rsNames = append(rsNames, rs.Name) + } + if len(rsNames) != 1 || rsNames[0] != rs1.Name { + t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs1.Name) + } + + rsList, err = c.getReplicaSetsForDeployment(d2) + if err != nil { + t.Fatalf("getReplicaSetsForDeployment() error: %v", err) + } + rsNames = []string{} + for _, rs := range rsList { + rsNames = append(rsNames, rs.Name) + } + if len(rsNames) != 1 || rsNames[0] != rs2.Name { + t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs2.Name) + } +} + +func TestGetReplicaSetsForDeploymentAdopt(t *testing.T) { + f := newFixture(t) + + d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) + + // RS with matching labels, but orphaned. Should be adopted and returned. + rs := newReplicaSet(d, "rs", 1) + rs.OwnerReferences = nil + + f.dLister = append(f.dLister, d) + f.rsLister = append(f.rsLister, rs) + f.objects = append(f.objects, d, rs) + + // Start the fixture. + c, informers := f.newController() + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + + rsList, err := c.getReplicaSetsForDeployment(d) + if err != nil { + t.Fatalf("getReplicaSetsForDeployment() error: %v", err) + } + rsNames := []string{} + for _, rs := range rsList { + rsNames = append(rsNames, rs.Name) + } + if len(rsNames) != 1 || rsNames[0] != rs.Name { + t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs.Name) + } +} + +func TestGetReplicaSetsForDeploymentRelease(t *testing.T) { + f := newFixture(t) + + d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) + + // RS with matching ControllerRef, but wrong labels. Should be released. + rs := newReplicaSet(d, "rs", 1) + rs.Labels = map[string]string{"foo": "notbar"} + + f.dLister = append(f.dLister, d) + f.rsLister = append(f.rsLister, rs) + f.objects = append(f.objects, d, rs) + + // Start the fixture. + c, informers := f.newController() + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + + rsList, err := c.getReplicaSetsForDeployment(d) + if err != nil { + t.Fatalf("getReplicaSetsForDeployment() error: %v", err) + } + rsNames := []string{} + for _, rs := range rsList { + rsNames = append(rsNames, rs.Name) + } + if len(rsNames) != 0 { + t.Errorf("getReplicaSetsForDeployment() = %v, want []", rsNames) + } +} + +func TestGetPodMapForReplicaSets(t *testing.T) { + f := newFixture(t) + + d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) + + // Two ReplicaSets that match labels for both Deployments, + // but have ControllerRefs to make ownership explicit. + rs1 := newReplicaSet(d, "rs1", 1) + rs2 := newReplicaSet(d, "rs2", 1) + + // Add a Pod for each ReplicaSet. + pod1 := generatePodFromRS(rs1) + pod2 := generatePodFromRS(rs2) + // Add a Pod that has matching labels, but no ControllerRef. + pod3 := generatePodFromRS(rs1) + pod3.Name = "pod3" + pod3.OwnerReferences = nil + // Add a Pod that has matching labels and ControllerRef, but is inactive. + pod4 := generatePodFromRS(rs1) + pod4.Name = "pod4" + pod4.Status.Phase = v1.PodFailed + + f.dLister = append(f.dLister, d) + f.rsLister = append(f.rsLister, rs1, rs2) + f.podLister = append(f.podLister, pod1, pod2, pod3, pod4) + f.objects = append(f.objects, d, rs1, rs2, pod1, pod2, pod3, pod4) + + // Start the fixture. + c, informers := f.newController() + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + + podMap, err := c.getPodMapForReplicaSets(d.Namespace, f.rsLister) + if err != nil { + t.Fatalf("getPodMapForReplicaSets() error: %v", err) + } + podCount := 0 + for _, podList := range podMap { + podCount += len(podList.Items) + } + if got, want := podCount, 2; got != want { + t.Errorf("podCount = %v, want %v", got, want) + } + + if got, want := len(podMap), 2; got != want { + t.Errorf("len(podMap) = %v, want %v", got, want) + } + if got, want := len(podMap[rs1.UID].Items), 1; got != want { + t.Errorf("len(podMap[rs1]) = %v, want %v", got, want) + } + if got, want := podMap[rs1.UID].Items[0].Name, "rs1-pod"; got != want { + t.Errorf("podMap[rs1] = [%v], want [%v]", got, want) + } + if got, want := len(podMap[rs2.UID].Items), 1; got != want { + t.Errorf("len(podMap[rs2]) = %v, want %v", got, want) + } + if got, want := podMap[rs2.UID].Items[0].Name, "rs2-pod"; got != want { + t.Errorf("podMap[rs2] = [%v], want [%v]", got, want) + } +} + // generatePodFromRS creates a pod, with the input ReplicaSet's selector and its template func generatePodFromRS(rs *extensions.ReplicaSet) *v1.Pod { trueVar := true diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index 7363da98e16..9c02f2f669f 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/controller/deployment/util" @@ -33,12 +34,12 @@ import ( // and when new pods scale up or old pods scale down. Progress is not estimated for paused // deployments or when users don't really care about it ie. progressDeadlineSeconds is not // specified. -func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error) { +func (dc *DeploymentController) hasFailed(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) (bool, error) { if d.Spec.ProgressDeadlineSeconds == nil || d.Spec.RollbackTo != nil || d.Spec.Paused { return false, nil } - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false) + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false) if err != nil { return false, err } diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go index 3c3be85481e..9499cc37dd6 100644 --- a/pkg/controller/deployment/recreate.go +++ b/pkg/controller/deployment/recreate.go @@ -17,14 +17,16 @@ limitations under the License. package deployment import ( + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/api/v1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/controller" ) // rolloutRecreate implements the logic for recreating a replica set. -func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error { +func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error { // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false) if err != nil { return err } @@ -32,31 +34,29 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen activeOldRSs := controller.FilterActiveReplicaSets(oldRSs) // scale down old replica sets - scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, deployment) + scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d) if err != nil { return err } if scaledDown { // Update DeploymentStatus - return dc.syncRolloutStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, d) } - newStatus := calculateStatus(allRSs, newRS, deployment) + newStatus := calculateStatus(allRSs, newRS, d) // Do not process a deployment when it has old pods running. if newStatus.UpdatedReplicas == 0 { - podList, err := dc.listPods(deployment) - if err != nil { - return err - } - if len(podList.Items) > 0 { - return dc.syncRolloutStatus(allRSs, newRS, deployment) + for _, podList := range podMap { + if len(podList.Items) > 0 { + return dc.syncRolloutStatus(allRSs, newRS, d) + } } } // If we need to create a new RS, create it now // TODO: Create a new RS without re-listing all RSs. if newRS == nil { - newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(deployment, true) + newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true) if err != nil { return err } @@ -64,17 +64,17 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen } // scale up new replica set - scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment) + scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d) if err != nil { return err } if scaledUp { // Update DeploymentStatus - return dc.syncRolloutStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, d) } // Sync deployment status - return dc.syncRolloutStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, d) } // scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate" diff --git a/pkg/controller/deployment/rollback.go b/pkg/controller/deployment/rollback.go index 68cec72f569..703cf682b19 100644 --- a/pkg/controller/deployment/rollback.go +++ b/pkg/controller/deployment/rollback.go @@ -21,14 +21,15 @@ import ( "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" ) // rollback the deployment to the specified revision. In any case cleanup the rollback spec. -func (dc *DeploymentController) rollback(d *extensions.Deployment) error { - newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, true) +func (dc *DeploymentController) rollback(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error { + newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true) if err != nil { return err } diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go index c783cb048b9..6c979b6786a 100644 --- a/pkg/controller/deployment/rolling.go +++ b/pkg/controller/deployment/rolling.go @@ -21,42 +21,44 @@ import ( "sort" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/integer" + "k8s.io/kubernetes/pkg/api/v1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/controller" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" ) // rolloutRolling implements the logic for rolling a new replica set. -func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) +func (dc *DeploymentController) rolloutRolling(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true) if err != nil { return err } allRSs := append(oldRSs, newRS) // Scale up, if we can. - scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment) + scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d) if err != nil { return err } if scaledUp { // Update DeploymentStatus - return dc.syncRolloutStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, d) } // Scale down, if we can. - scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment) + scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d) if err != nil { return err } if scaledDown { // Update DeploymentStatus - return dc.syncRolloutStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, d) } // Sync deployment status - return dc.syncRolloutStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, d) } func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index cf855939b1a..4571e4a4345 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -25,7 +25,7 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" @@ -36,31 +36,31 @@ import ( ) // syncStatusOnly only updates Deployments Status and doesn't take any mutating actions. -func (dc *DeploymentController) syncStatusOnly(deployment *extensions.Deployment) error { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) +func (dc *DeploymentController) syncStatusOnly(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false) if err != nil { return err } allRSs := append(oldRSs, newRS) - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncDeploymentStatus(allRSs, newRS, d) } // sync is responsible for reconciling deployments on scaling events or when they // are paused. -func (dc *DeploymentController) sync(deployment *extensions.Deployment) error { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) +func (dc *DeploymentController) sync(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false) if err != nil { return err } - if err := dc.scale(deployment, newRS, oldRSs); err != nil { + if err := dc.scale(d, newRS, oldRSs); err != nil { // If we get an error while trying to scale, the deployment will be requeued // so we can abort this resync return err } allRSs := append(oldRSs, newRS) - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncDeploymentStatus(allRSs, newRS, d) } // checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition. @@ -98,25 +98,31 @@ func (dc *DeploymentController) checkPausedConditions(d *extensions.Deployment) } // getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated. +// +// rsList should come from getReplicaSetsForDeployment(d). +// podMap should come from getPodMapForReplicaSets(rsList). +// These are passed around to avoid repeating expensive API calls. +// // 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV). // 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1), // only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop. // 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop. +// // Note that currently the deployment controller is using caches to avoid querying the server for reads. // This may lead to stale reads of replica sets, thus incorrect deployment status. -func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { +func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { // List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods - rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment) + rsList, podList, err := dc.rsAndPodsWithHashKeySynced(d, rsList, podMap) if err != nil { return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err) } - _, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList) + _, allOldRSs, err := deploymentutil.FindOldReplicaSets(d, rsList, podList) if err != nil { return nil, nil, err } // Get new replica set with the updated revision number - newRS, err := dc.getNewReplicaSet(deployment, rsList, allOldRSs, createIfNotExisted) + newRS, err := dc.getNewReplicaSet(d, rsList, allOldRSs, createIfNotExisted) if err != nil { return nil, nil, err } @@ -124,33 +130,28 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext return newRS, allOldRSs, nil } -// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced. -func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]*extensions.ReplicaSet, *v1.PodList, error) { - rsList, err := deploymentutil.ListReplicaSets(deployment, - func(namespace string, options metav1.ListOptions) ([]*extensions.ReplicaSet, error) { - parsed, err := labels.Parse(options.LabelSelector) - if err != nil { - return nil, err - } - return dc.rsLister.ReplicaSets(namespace).List(parsed) - }) - if err != nil { - return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err) - } +// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment +// targets, with pod-template-hash information synced. +// +// rsList should come from getReplicaSetsForDeployment(d). +// podMap should come from getPodMapForReplicaSets(rsList). +// These are passed around to avoid repeating expensive API calls. +func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) ([]*extensions.ReplicaSet, *v1.PodList, error) { syncedRSList := []*extensions.ReplicaSet{} for _, rs := range rsList { // Add pod-template-hash information if it's not in the RS. // Otherwise, new RS produced by Deployment will overlap with pre-existing ones // that aren't constrained by the pod-template-hash. - syncedRS, err := dc.addHashKeyToRSAndPods(rs) + syncedRS, err := dc.addHashKeyToRSAndPods(rs, podMap[rs.UID]) if err != nil { return nil, nil, err } syncedRSList = append(syncedRSList, syncedRS) } - syncedPodList, err := dc.listPods(deployment) - if err != nil { - return nil, nil, err + // Put all Pods from podMap into one list. + syncedPodList := &v1.PodList{} + for _, podList := range podMap { + syncedPodList.Items = append(syncedPodList.Items, podList.Items...) } return syncedRSList, syncedPodList, nil } @@ -159,7 +160,7 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension // 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created // 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas // 3. Add hash label to the rs's label and selector -func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) (*extensions.ReplicaSet, error) { +func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet, podList *v1.PodList) (*extensions.ReplicaSet, error) { // If the rs already has the new hash label in its selector, it's done syncing if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { return rs, nil @@ -189,24 +190,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) } // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. - selector, err := metav1.LabelSelectorAsSelector(updatedRS.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err) - } - options := metav1.ListOptions{LabelSelector: selector.String()} - parsed, err := labels.Parse(options.LabelSelector) - if err != nil { - return nil, err - } - pods, err := dc.podLister.Pods(updatedRS.Namespace).List(parsed) - if err != nil { - return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", rs.Namespace, options, err) - } - podList := v1.PodList{Items: make([]v1.Pod, 0, len(pods))} - for i := range pods { - podList.Items = append(podList.Items, *pods[i]) - } - if err := deploymentutil.LabelPodsWithHash(&podList, dc.client, dc.podLister, rs.Namespace, rs.Name, hash); err != nil { + if err := deploymentutil.LabelPodsWithHash(podList, dc.client, dc.podLister, rs.Namespace, rs.Name, hash); err != nil { return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err) } @@ -242,22 +226,6 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) return updatedRS, nil } -func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*v1.PodList, error) { - return deploymentutil.ListPods(deployment, - func(namespace string, options metav1.ListOptions) (*v1.PodList, error) { - parsed, err := labels.Parse(options.LabelSelector) - if err != nil { - return nil, err - } - pods, err := dc.podLister.Pods(namespace).List(parsed) - result := v1.PodList{Items: make([]v1.Pod, 0, len(pods))} - for i := range pods { - result.Items = append(result.Items, *pods[i]) - } - return &result, err - }) -} - // Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet. // 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's). // 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes. @@ -329,25 +297,17 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme newRS := extensions.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ // Make the name deterministic, to ensure idempotence - Name: deployment.Name + "-" + podTemplateSpecHash, - Namespace: namespace, + Name: deployment.Name + "-" + podTemplateSpecHash, + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(deployment)}, }, Spec: extensions.ReplicaSetSpec{ - Replicas: func(i int32) *int32 { return &i }(0), + Replicas: new(int32), MinReadySeconds: deployment.Spec.MinReadySeconds, Selector: newRSSelector, Template: newRSTemplate, }, } - var trueVar = true - controllerRef := &metav1.OwnerReference{ - APIVersion: getDeploymentKind().GroupVersion().String(), - Kind: getDeploymentKind().Kind, - Name: deployment.Name, - UID: deployment.UID, - Controller: &trueVar, - } - newRS.OwnerReferences = append(newRS.OwnerReferences, *controllerRef) allRSs := append(oldRSs, &newRS) newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS) if err != nil { @@ -632,8 +592,12 @@ func calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaS // isScalingEvent checks whether the provided deployment has been updated with a scaling event // by looking at the desired-replicas annotation in the active replica sets of the deployment. -func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) (bool, error) { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false) +// +// rsList should come from getReplicaSetsForDeployment(d). +// podMap should come from getPodMapForReplicaSets(rsList). +// These are passed around to avoid repeating expensive API calls. +func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) (bool, error) { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false) if err != nil { return false, err } @@ -649,3 +613,15 @@ func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) (bool, } return false, nil } + +// newControllerRef returns a ControllerRef pointing to the deployment. +func newControllerRef(d *extensions.Deployment) *metav1.OwnerReference { + isController := true + return &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: d.Name, + UID: d.UID, + Controller: &isController, + } +}