diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go
index 2f14a353585..cb9d4ea4e14 100644
--- a/pkg/controller/controller_ref_manager.go
+++ b/pkg/controller/controller_ref_manager.go
@@ -65,7 +65,7 @@ func (m *PodControllerRefManager) Classify(pods []*v1.Pod) (
                 pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp)
             continue
         }
-        controllerRef := getControllerOf(pod.ObjectMeta)
+        controllerRef := GetControllerOf(pod.ObjectMeta)
         if controllerRef != nil {
             if controllerRef.UID == m.controllerObject.UID {
                 // already controlled
@@ -90,9 +90,9 @@ func (m *PodControllerRefManager) Classify(pods []*v1.Pod) (
     return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch
 }
 
-// getControllerOf returns the controllerRef if controllee has a controller,
+// GetControllerOf returns the controllerRef if controllee has a controller,
 // otherwise returns nil.
-func getControllerOf(controllee v1.ObjectMeta) *metav1.OwnerReference {
+func GetControllerOf(controllee v1.ObjectMeta) *metav1.OwnerReference {
     for _, owner := range controllee.OwnerReferences {
         // controlled by other controller
         if owner.Controller != nil && *owner.Controller == true {
diff --git a/pkg/controller/deployment/BUILD b/pkg/controller/deployment/BUILD
index 59a0e16cc81..de4044c2a7e 100644
--- a/pkg/controller/deployment/BUILD
+++ b/pkg/controller/deployment/BUILD
@@ -29,7 +29,6 @@ go_library(
         "//pkg/client/clientset_generated/clientset:go_default_library",
         "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
         "//pkg/client/record:go_default_library",
-        "//pkg/client/retry:go_default_library",
         "//pkg/controller:go_default_library",
         "//pkg/controller/deployment/util:go_default_library",
         "//pkg/controller/informers:go_default_library",
@@ -49,12 +48,14 @@ go_test(
     name = "go_default_test",
     srcs = [
         "deployment_controller_test.go",
+        "recreate_test.go",
        "rolling_test.go",
        "sync_test.go",
    ],
    library = "go_default_library",
    tags = ["automanaged"],
    deps = [
+        "//pkg/api:go_default_library",
        "//pkg/api/v1:go_default_library",
        "//pkg/apimachinery/registered:go_default_library",
        "//pkg/apis/extensions/v1beta1:go_default_library",
diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go
index b03dbfc725c..ae5fb644731 100644
--- a/pkg/controller/deployment/deployment_controller.go
+++ b/pkg/controller/deployment/deployment_controller.go
@@ -117,6 +117,9 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
         UpdateFunc: dc.updateReplicaSet,
         DeleteFunc: dc.deleteReplicaSet,
     })
+    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+        DeleteFunc: dc.deletePod,
+    })
 
     dc.syncHandler = dc.syncDeployment
     dc.dLister = dInformer.Lister()
@@ -167,12 +170,12 @@ func (dc *DeploymentController) deleteDeployment(obj interface{}) {
     if !ok {
         tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
         if !ok {
-            glog.Errorf("Couldn't get object from tombstone %#v", obj)
+            utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
             return
         }
         d, ok = tombstone.Obj.(*extensions.Deployment)
         if !ok {
-            glog.Errorf("Tombstone contained object that is not a Deployment %#v", obj)
+            utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
             return
         }
     }
@@ -202,7 +205,8 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
     // trying to clean up one of the controllers, for now we just return the older one
     if len(deployments) > 1 {
         sort.Sort(util.BySelectorLastUpdateTime(deployments))
-        glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
+        glog.V(4).Infof("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s",
+            rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
     }
     return deployments[0]
 }
@@ -246,12 +250,12 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
     if !ok {
         tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
         if !ok {
-            glog.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)
+            utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
             return
         }
         rs, ok = tombstone.Obj.(*extensions.ReplicaSet)
         if !ok {
-            glog.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)
+            utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
             return
         }
     }
@@ -261,20 +265,48 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
     }
 }
 
+// deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
+func (dc *DeploymentController) deletePod(obj interface{}) {
+    pod, ok := obj.(*v1.Pod)
+
+    // When a delete is dropped, the relist will notice a pod in the store not
+    // in the list, leading to the insertion of a tombstone object which contains
+    // the deleted key/value. Note that this value might be stale. If the Pod
+    // changed labels the new deployment will not be woken up till the periodic resync.
+    if !ok {
+        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+        if !ok {
+            utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates pod", obj, FullDeploymentResyncPeriod))
+            return
+        }
+        pod, ok = tombstone.Obj.(*v1.Pod)
+        if !ok {
+            utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v, could take up to %v before a deployment recreates/updates pods", obj, FullDeploymentResyncPeriod))
+            return
+        }
+    }
+    if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == extensions.RecreateDeploymentStrategyType {
+        podList, err := dc.listPods(d)
+        if err == nil && len(podList.Items) == 0 {
+            dc.enqueueDeployment(d)
+        }
+    }
+}
+
 func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deployment) {
     key, err := controller.KeyFunc(deployment)
     if err != nil {
-        glog.Errorf("Couldn't get key for object %#v: %v", deployment, err)
+        utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
         return
     }
 
     dc.queue.Add(key)
 }
 
-// enqueueAfter will enqueue a deployment after the provided amount of time in a secondary queue.
+// checkProgressAfter will enqueue a deployment after the provided amount of time in a secondary queue.
 // Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
 // back to the main queue iff it has failed progressing.
-func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
+func (dc *DeploymentController) checkProgressAfter(deployment *extensions.Deployment, after time.Duration) {
     key, err := controller.KeyFunc(deployment)
     if err != nil {
         utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
@@ -284,6 +316,42 @@ func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment,
     dc.progressQueue.AddAfter(key, after)
 }
 
+// getDeploymentForPod returns the deployment managing the given Pod.
+func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *extensions.Deployment {
+    // Find the owning replica set
+    var rs *extensions.ReplicaSet
+    var err error
+    // Look at the owner reference
+    controllerRef := controller.GetControllerOf(pod.ObjectMeta)
+    if controllerRef != nil {
+        // Not a pod owned by a replica set.
+        if controllerRef.Kind != extensions.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
+            return nil
+        }
+        rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
+        if err != nil {
+            glog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
+            return nil
+        }
+    } else {
+        // Fallback to listing replica sets.
+        rss, err := dc.rsLister.GetPodReplicaSets(pod)
+        if err != nil {
+            glog.V(4).Infof("Cannot list replica sets for pod %q: %v", pod.Name, err)
+            return nil
+        }
+        // TODO: Handle multiple replica sets gracefully
+        // For now we return the oldest replica set.
+        if len(rss) > 1 {
+            utilruntime.HandleError(fmt.Errorf("more than one ReplicaSet is selecting pod %q with labels: %+v", pod.Name, pod.Labels))
+            sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
+        }
+        rs = rss[0]
+    }
+
+    return dc.getDeploymentForReplicaSet(rs)
+}
+
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (dc *DeploymentController) worker() {
@@ -332,7 +400,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
 
     obj, exists, err := dc.dLister.Indexer.GetByKey(key)
     if err != nil {
-        glog.Errorf("Unable to retrieve deployment %v from store: %v", key, err)
+        utilruntime.HandleError(fmt.Errorf("Unable to retrieve deployment %v from store: %v", key, err))
         return err
     }
     if !exists {
diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go
index 88ff4c44e3f..6ef70b78c04 100644
--- a/pkg/controller/deployment/deployment_controller_test.go
+++ b/pkg/controller/deployment/deployment_controller_test.go
@@ -231,7 +231,9 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) {
     now := metav1.Now()
     d.DeletionTimestamp = &now
     f.dLister = append(f.dLister, d)
+    f.objects = append(f.objects, d)
 
+    f.expectUpdateDeploymentStatusAction(d)
     f.run(getKey(d, t))
 }
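Both the new deletePod handler and the reworked Recreate path below rely on dc.listPods, which this patch does not touch. For context, here is a rough sketch of what such a helper might look like if it asked the API server directly for the deployment's pods; the field and helper names (dc.client, metav1.LabelSelectorAsSelector) are assumptions, and the real helper presumably goes through the shared pod lister instead:

// Sketch only, not part of this patch: select the deployment's pods by its label selector.
func (dc *DeploymentController) listPods(d *extensions.Deployment) (*v1.PodList, error) {
    selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
    if err != nil {
        return nil, err
    }
    // Callers only check len(podList.Items), so returning the raw list is enough.
    options := v1.ListOptions{LabelSelector: selector.String()}
    return dc.client.Core().Pods(d.Namespace).List(options)
}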
diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go
index 28d87912c8f..02a784e5c40 100644
--- a/pkg/controller/deployment/progress.go
+++ b/pkg/controller/deployment/progress.go
@@ -60,7 +60,7 @@ func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error
     // See https://github.com/kubernetes/kubernetes/issues/18568
     allRSs := append(oldRSs, newRS)
-    newStatus := dc.calculateStatus(allRSs, newRS, d)
+    newStatus := calculateStatus(allRSs, newRS, d)
 
     // If the deployment is complete or it is progressing, there is no need to check if it
     // has timed out.
@@ -77,7 +77,7 @@ func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error
 // for example a resync of the deployment after it was scaled up. In those cases,
 // we shouldn't try to estimate any progress.
 func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
-    newStatus := dc.calculateStatus(allRSs, newRS, d)
+    newStatus := calculateStatus(allRSs, newRS, d)
 
     // If there is no progressDeadlineSeconds set, remove any Progressing condition.
     if d.Spec.ProgressDeadlineSeconds == nil {
@@ -88,23 +88,25 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
     // a new rollout and this is a resync where we don't need to estimate any progress.
     // In such a case, we should simply not estimate any progress for this deployment.
     currentCond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing)
-    isResyncEvent := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
+    isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
     // Check for progress only if there is a progress deadline set and the latest rollout
-    // hasn't completed yet. We also need to ensure the new replica set exists, otherwise
-    // we cannot estimate any progress.
-    if d.Spec.ProgressDeadlineSeconds != nil && !isResyncEvent && newRS != nil {
+    // hasn't completed yet.
+    if d.Spec.ProgressDeadlineSeconds != nil && !isCompleteDeployment {
         switch {
         case util.DeploymentComplete(d, &newStatus):
             // Update the deployment conditions with a message for the new replica set that
             // was successfully deployed. If the condition already exists, we ignore this update.
-            msg := fmt.Sprintf("Replica set %q has successfully progressed.", newRS.Name)
+            msg := fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
             condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
             util.SetDeploymentCondition(&newStatus, *condition)
         case util.DeploymentProgressing(d, &newStatus):
             // If there is any progress made, continue by not checking if the deployment failed. This
             // behavior emulates the rolling updater progressDeadline check.
-            msg := fmt.Sprintf("Replica set %q is progressing.", newRS.Name)
+            msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
+            if newRS != nil {
+                msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
+            }
             condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
             // Update the current Progressing condition or add a new one if it doesn't exist.
             // If a Progressing condition with status=true already exists, we should update
@@ -124,7 +126,10 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
         case util.DeploymentTimedOut(d, &newStatus):
             // Update the deployment with a timeout condition. If the condition already exists,
             // we ignore this update.
-            msg := fmt.Sprintf("Replica set %q has timed out progressing.", newRS.Name)
+            msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
+            if newRS != nil {
+                msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
+            }
             condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
             util.SetDeploymentCondition(&newStatus, *condition)
         }
@@ -154,7 +159,7 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
         after := time.Now().Add(time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second).Sub(currentCond.LastUpdateTime.Time)
         glog.V(4).Infof("Queueing up deployment %q for a progress check after %ds", d.Name, int(after.Seconds()))
-        dc.enqueueAfter(d, after)
+        dc.checkProgressAfter(d, after)
     }
     return nil
 }
diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go
index 6b81063632d..54e4290a9ff 100644
--- a/pkg/controller/deployment/recreate.go
+++ b/pkg/controller/deployment/recreate.go
@@ -17,12 +17,8 @@ limitations under the License.
 package deployment
 
 import (
-    "fmt"
-
     extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
-    "k8s.io/kubernetes/pkg/client/retry"
     "k8s.io/kubernetes/pkg/controller"
-    "k8s.io/kubernetes/pkg/util/wait"
 )
 
 // rolloutRecreate implements the logic for recreating a replica set.
@@ -45,9 +41,16 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
         return dc.syncRolloutStatus(allRSs, newRS, deployment)
     }
 
-    // Wait for all old replica set to scale down to zero.
-    if err := dc.waitForInactiveReplicaSets(activeOldRSs); err != nil {
-        return err
+    newStatus := calculateStatus(allRSs, newRS, deployment)
+    // Do not process a deployment when it has old pods running.
+    if newStatus.UpdatedReplicas == 0 {
+        podList, err := dc.listPods(deployment)
+        if err != nil {
+            return err
+        }
+        if len(podList.Items) > 0 {
+            return dc.syncRolloutStatus(allRSs, newRS, deployment)
+        }
     }
 
     // If we need to create a new RS, create it now
@@ -97,40 +100,6 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext
     return scaled, nil
 }
 
-// waitForInactiveReplicaSets will wait until all passed replica sets are inactive and have been noticed
-// by the replica set controller.
-func (dc *DeploymentController) waitForInactiveReplicaSets(oldRSs []*extensions.ReplicaSet) error {
-    for i := range oldRSs {
-        rs := oldRSs[i]
-        desiredGeneration := rs.Generation
-        observedGeneration := rs.Status.ObservedGeneration
-        specReplicas := *(rs.Spec.Replicas)
-        statusReplicas := rs.Status.Replicas
-
-        if err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
-            replicaSet, err := dc.rsLister.ReplicaSets(rs.Namespace).Get(rs.Name)
-            if err != nil {
-                return false, err
-            }
-
-            specReplicas = *(replicaSet.Spec.Replicas)
-            statusReplicas = replicaSet.Status.Replicas
-            observedGeneration = replicaSet.Status.ObservedGeneration
-
-            // TODO: We also need to wait for terminating replicas to actually terminate.
-            // See https://github.com/kubernetes/kubernetes/issues/32567
-            return observedGeneration >= desiredGeneration && *(replicaSet.Spec.Replicas) == 0 && replicaSet.Status.Replicas == 0, nil
-        }); err != nil {
-            if err == wait.ErrWaitTimeout {
-                err = fmt.Errorf("replica set %q never became inactive: synced=%t, spec.replicas=%d, status.replicas=%d",
-                    rs.Name, observedGeneration >= desiredGeneration, specReplicas, statusReplicas)
-            }
-            return err
-        }
-    }
-    return nil
-}
-
 // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate"
 func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
     scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
diff --git a/pkg/controller/deployment/recreate_test.go b/pkg/controller/deployment/recreate_test.go
new file mode 100644
index 00000000000..60f19548655
--- /dev/null
+++ b/pkg/controller/deployment/recreate_test.go
@@ -0,0 +1,82 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deployment
+
+import (
+    "fmt"
+    "testing"
+
+    "k8s.io/kubernetes/pkg/api"
+    extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
+    "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
+    "k8s.io/kubernetes/pkg/controller"
+    "k8s.io/kubernetes/pkg/controller/informers"
+    "k8s.io/kubernetes/pkg/runtime"
+)
+
+func TestScaleDownOldReplicaSets(t *testing.T) {
+    tests := []struct {
+        oldRSSizes []int
+        d          *extensions.Deployment
+    }{
+        {
+            oldRSSizes: []int{3},
+            d:          newDeployment("foo", 3, nil, nil, nil, map[string]string{"foo": "bar"}),
+        },
+    }
+
+    for i := range tests {
+        t.Logf("running scenario %d", i)
+        test := tests[i]
+
+        var oldRSs []*extensions.ReplicaSet
+        var expected []runtime.Object
+
+        for n, size := range test.oldRSSizes {
+            rs := newReplicaSet(test.d, fmt.Sprintf("%s-%d", test.d.Name, n), size)
+            oldRSs = append(oldRSs, rs)
+
+            objCopy, err := api.Scheme.Copy(rs)
+            if err != nil {
+                t.Errorf("unexpected error while deep-copying: %v", err)
+                continue
+            }
+            rsCopy := objCopy.(*extensions.ReplicaSet)
+
+            zero := int32(0)
+            rsCopy.Spec.Replicas = &zero
+            expected = append(expected, rsCopy)
+
+            if *(oldRSs[n].Spec.Replicas) == *(expected[n].(*extensions.ReplicaSet).Spec.Replicas) {
+                t.Errorf("broken test - original and expected RS have the same size")
+            }
+        }
+
+        kc := fake.NewSimpleClientset(expected...)
+        informers := informers.NewSharedInformerFactory(kc, nil, controller.NoResyncPeriodFunc())
+        c := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), kc)
+
+        c.scaleDownOldReplicaSetsForRecreate(oldRSs, test.d)
+        for j := range oldRSs {
+            rs := oldRSs[j]
+
+            if *rs.Spec.Replicas != 0 {
+                t.Errorf("rs %q has non-zero replicas", rs.Name)
+            }
+        }
+    }
+}
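The new test only inspects the in-memory ReplicaSet objects after calling scaleDownOldReplicaSetsForRecreate. Because the fake clientset records every request it receives, a possible additional assertion inside the scenario loop (a sketch, not part of this patch) would confirm that the controller actually issued the scale-down updates:

// Sketch only: count update requests recorded by the fake clientset.
updates := 0
for _, action := range kc.Actions() {
    if action.GetVerb() == "update" {
        updates++
    }
}
if updates < len(oldRSs) {
    t.Errorf("expected at least %d update actions for scaling down old replica sets, got %d", len(oldRSs), updates)
}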
diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go
index 94741171001..169e7d23abc 100644
--- a/pkg/controller/deployment/sync.go
+++ b/pkg/controller/deployment/sync.go
@@ -558,7 +558,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSe
 
 // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
 func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
-    newStatus := dc.calculateStatus(allRSs, newRS, d)
+    newStatus := calculateStatus(allRSs, newRS, d)
 
     if reflect.DeepEqual(d.Status, newStatus) {
         return nil
@@ -570,7 +570,8 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.Replic
     return err
 }
 
-func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) extensions.DeploymentStatus {
+// calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.
+func calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) extensions.DeploymentStatus {
     availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
     totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
     unavailableReplicas := totalReplicas - availableReplicas
@@ -580,23 +581,30 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet,
         unavailableReplicas = 0
     }
 
-    if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
-        minAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
-        deploymentutil.SetDeploymentCondition(&deployment.Status, *minAvailability)
-    } else {
-        noMinAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
-        deploymentutil.SetDeploymentCondition(&deployment.Status, *noMinAvailability)
-    }
-
-    return extensions.DeploymentStatus{
+    status := extensions.DeploymentStatus{
         // TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
         ObservedGeneration:  deployment.Generation,
         Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
         UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}),
         AvailableReplicas:   availableReplicas,
         UnavailableReplicas: unavailableReplicas,
-        Conditions:          deployment.Status.Conditions,
     }
+
+    // Copy conditions one by one so we won't mutate the original object.
+    conditions := deployment.Status.Conditions
+    for i := range conditions {
+        status.Conditions = append(status.Conditions, conditions[i])
+    }
+
+    if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
+        minAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
+        deploymentutil.SetDeploymentCondition(&status, *minAvailability)
+    } else {
+        noMinAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
+        deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
+    }
+
+    return status
 }
 
 // isScalingEvent checks whether the provided deployment has been updated with a scaling event
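The reordered calculateStatus above no longer sets conditions on deployment.Status and copies the Conditions slice instead of aliasing it. The standalone example below (illustrative only, not from the patch) shows the aliasing hazard the copy avoids: appending through a slice that shares its backing array with a cached object can leak the write back into the cache.

package main

import "fmt"

type status struct{ conditions []string }

func main() {
    // A "cached" object with spare capacity in its conditions slice, similar to an
    // object handed out by a shared informer cache.
    cached := &status{conditions: append(make([]string, 0, 4), "Progressing")}

    // Aliasing: the derived status reuses the cached backing array, so the append
    // below writes into the cached object's storage.
    aliased := status{conditions: cached.conditions}
    aliased.conditions = append(aliased.conditions, "Available")
    fmt.Println(cached.conditions[:2]) // [Progressing Available] -- leaked into the cache

    // Copying (what calculateStatus now does): the derived status owns its array.
    copied := status{conditions: append([]string(nil), cached.conditions...)}
    copied.conditions = append(copied.conditions, "Available")
    fmt.Println(cached.conditions) // [Progressing] -- the cached object is untouched
}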
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index e9c87fbff61..ab2160502e5 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -207,7 +207,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *v1.Pod) *extensions.Repli
         // overlap, sort by creation timestamp, subsort by name, then pick
         // the first.
         utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels))
-        sort.Sort(overlappingReplicaSets(rss))
+        sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
     }
 
     // update lookup cache
diff --git a/pkg/controller/replicaset/replica_set_utils.go b/pkg/controller/replicaset/replica_set_utils.go
index 6fe67f66ac1..b9dbbbdd654 100644
--- a/pkg/controller/replicaset/replica_set_utils.go
+++ b/pkg/controller/replicaset/replica_set_utils.go
@@ -83,19 +83,6 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs exte
     }
 }
 
-// overlappingReplicaSets sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker.
-type overlappingReplicaSets []*extensions.ReplicaSet
-
-func (o overlappingReplicaSets) Len() int      { return len(o) }
-func (o overlappingReplicaSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
-
-func (o overlappingReplicaSets) Less(i, j int) bool {
-    if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
-        return o[i].Name < o[j].Name
-    }
-    return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
-}
-
 func calculateStatus(rs extensions.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) extensions.ReplicaSetStatus {
     newStatus := rs.Status
     // Count the number of pods that have labels matching the labels of the pod
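Both controllers now sort with controller.ReplicaSetsByCreationTimestamp instead of the package-local overlappingReplicaSets removed above. That shared type lives in pkg/controller and is not shown in this diff; it presumably has the same shape as the removed sorter:

// ReplicaSetsByCreationTimestamp sorts ReplicaSets by creation timestamp, using names as a tie breaker.
type ReplicaSetsByCreationTimestamp []*extensions.ReplicaSet

func (o ReplicaSetsByCreationTimestamp) Len() int      { return len(o) }
func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
    if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
        return o[i].Name < o[j].Name
    }
    return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}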
+ By(fmt.Sprintf("Ensuring deployment %q gets the next revision from the one the adopted replica set %q has", deploy.Name, rs.Name)) + err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "3546343826724305833", redisImage) Expect(err).NotTo(HaveOccurred()) + By(fmt.Sprintf("Ensuring status for deployment %q is the expected", deploy.Name)) err = framework.WaitForDeploymentStatus(c, deploy) Expect(err).NotTo(HaveOccurred()) // There should be 1 old RS (nginx-controller, which is adopted) + By(fmt.Sprintf("Ensuring deployment %q has one old replica set (the one it adopted)", deploy.Name)) deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred()) _, allOldRSs, err := deploymentutil.GetOldReplicaSets(deployment, c) @@ -348,114 +354,34 @@ func testRollingUpdateDeployment(f *framework.Framework) { Expect(len(allOldRSs[0].Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0)) } -func testRollingUpdateDeploymentEvents(f *framework.Framework) { - ns := f.Namespace.Name - c := f.ClientSet - // Create nginx pods. - deploymentPodLabels := map[string]string{"name": "sample-pod-2"} - rsPodLabels := map[string]string{ - "name": "sample-pod-2", - "pod": nginxImageName, - } - rsName := "test-rolling-scale-controller" - replicas := int32(1) - - rsRevision := "3546343826724305832" - annotations := make(map[string]string) - annotations[deploymentutil.RevisionAnnotation] = rsRevision - rs := newRS(rsName, replicas, rsPodLabels, nginxImageName, nginxImage) - rs.Annotations = annotations - - _, err := c.Extensions().ReplicaSets(ns).Create(rs) - Expect(err).NotTo(HaveOccurred()) - // Verify that the required pods have come up. - err = framework.VerifyPods(c, ns, "sample-pod-2", false, 1) - if err != nil { - framework.Logf("error in waiting for pods to come up: %s", err) - Expect(err).NotTo(HaveOccurred()) - } - - // Create a deployment to delete nginx pods and instead bring up redis pods. - deploymentName := "test-rolling-scale-deployment" - framework.Logf("Creating deployment %s", deploymentName) - deploy, err := c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, replicas, deploymentPodLabels, redisImageName, redisImage, extensions.RollingUpdateDeploymentStrategyType, nil)) - Expect(err).NotTo(HaveOccurred()) - - // Wait for it to be updated to revision 3546343826724305833 - err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "3546343826724305833", redisImage) - Expect(err).NotTo(HaveOccurred()) - - err = framework.WaitForDeploymentStatus(c, deploy) - Expect(err).NotTo(HaveOccurred()) - // Verify that the pods were scaled up and down as expected. We use events to verify that. - deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) - framework.WaitForEvents(c, ns, deployment, 2) - events, err := c.Core().Events(ns).Search(deployment) - if err != nil { - framework.Logf("error in listing events: %s", err) - Expect(err).NotTo(HaveOccurred()) - } - // There should be 2 events, one to scale up the new ReplicaSet and then to scale down - // the old ReplicaSet. 
-    Expect(len(events.Items)).Should(Equal(2))
-    newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
-    Expect(err).NotTo(HaveOccurred())
-    Expect(newRS).NotTo(Equal(nil))
-    Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 1", newRS.Name)))
-    Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName)))
-}
-
 func testRecreateDeployment(f *framework.Framework) {
     ns := f.Namespace.Name
     c := f.ClientSet
-    // Create nginx pods.
-    deploymentPodLabels := map[string]string{"name": "sample-pod-3"}
-    rsPodLabels := map[string]string{
-        "name": "sample-pod-3",
-        "pod":  nginxImageName,
-    }
 
-    rsName := "test-recreate-controller"
-    replicas := int32(3)
-    _, err := c.Extensions().ReplicaSets(ns).Create(newRS(rsName, replicas, rsPodLabels, nginxImageName, nginxImage))
-    Expect(err).NotTo(HaveOccurred())
-    // Verify that the required pods have come up.
-    err = framework.VerifyPods(c, ns, "sample-pod-3", false, 3)
-    if err != nil {
-        framework.Logf("error in waiting for pods to come up: %s", err)
-        Expect(err).NotTo(HaveOccurred())
-    }
-
-    // Create a deployment to delete nginx pods and instead bring up redis pods.
+    // Create a deployment that brings up redis pods.
     deploymentName := "test-recreate-deployment"
-    framework.Logf("Creating deployment %s", deploymentName)
-    deploy, err := c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, replicas, deploymentPodLabels, redisImageName, redisImage, extensions.RecreateDeploymentStrategyType, nil))
+    By(fmt.Sprintf("Creating deployment %q", deploymentName))
+    deployment, err := c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, int32(3), map[string]string{"name": "sample-pod-3"}, redisImageName, redisImage, extensions.RecreateDeploymentStrategyType, nil))
     Expect(err).NotTo(HaveOccurred())
 
     // Wait for it to be updated to revision 1
+    By(fmt.Sprintf("Waiting deployment %q to be updated to revision 1", deploymentName))
     err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "1", redisImage)
     Expect(err).NotTo(HaveOccurred())
 
-    err = framework.WaitForDeploymentStatus(c, deploy)
+    By(fmt.Sprintf("Waiting deployment %q to complete", deploymentName))
+    Expect(framework.WaitForDeploymentStatusValid(c, deployment)).NotTo(HaveOccurred())
+
+    // Update deployment to delete redis pods and bring up nginx pods.
+    By(fmt.Sprintf("Triggering a new rollout for deployment %q", deploymentName))
+    deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deploymentName, func(update *extensions.Deployment) {
+        update.Spec.Template.Spec.Containers[0].Name = nginxImageName
+        update.Spec.Template.Spec.Containers[0].Image = nginxImage
+    })
     Expect(err).NotTo(HaveOccurred())
-    // Verify that the pods were scaled up and down as expected. We use events to verify that.
-    deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
-    Expect(err).NotTo(HaveOccurred())
-    framework.WaitForEvents(c, ns, deployment, 2)
-    events, err := c.Core().Events(ns).Search(deployment)
-    if err != nil {
-        framework.Logf("error in listing events: %s", err)
-        Expect(err).NotTo(HaveOccurred())
-    }
-    // There should be 2 events, one to scale up the new ReplicaSet and then to scale down the old ReplicaSet.
-    Expect(len(events.Items)).Should(Equal(2))
-    newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
-    Expect(err).NotTo(HaveOccurred())
-    Expect(newRS).NotTo(Equal(nil))
-    Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName)))
-    Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 3", newRS.Name)))
+
+    By(fmt.Sprintf("Watching deployment %q to verify that new pods will not run with old pods", deploymentName))
+    Expect(framework.WatchRecreateDeployment(c, deployment)).NotTo(HaveOccurred())
 }
 
 // testDeploymentCleanUpPolicy tests that deployment supports cleanup policy
@@ -494,6 +420,7 @@ func testDeploymentCleanUpPolicy(f *framework.Framework) {
     }
     stopCh := make(chan struct{})
     w, err := c.Core().Pods(ns).Watch(options)
+    Expect(err).NotTo(HaveOccurred())
     go func() {
         // There should be only one pod being created, which is the pod with the redis image.
         // The old RS shouldn't create new pod when deployment controller adding pod template hash label to its selector.
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index 64dd28b75ce..cd7a130f1c6 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -3290,6 +3290,40 @@ func WaitForDeploymentRollbackCleared(c clientset.Interface, ns, deploymentName
     return nil
 }
 
+// WatchRecreateDeployment watches Recreate deployments and ensures no new pods will run at the same time with
+// old pods.
+func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) error {
+    if d.Spec.Strategy.Type != extensions.RecreateDeploymentStrategyType {
+        return fmt.Errorf("deployment %q does not use a Recreate strategy: %s", d.Name, d.Spec.Strategy.Type)
+    }
+
+    w, err := c.Extensions().Deployments(d.Namespace).Watch(v1.SingleObject(v1.ObjectMeta{Name: d.Name, ResourceVersion: d.ResourceVersion}))
+    if err != nil {
+        return err
+    }
+
+    status := d.Status
+
+    condition := func(event watch.Event) (bool, error) {
+        d := event.Object.(*extensions.Deployment)
+        status = d.Status
+
+        if d.Status.UpdatedReplicas > 0 && d.Status.Replicas != d.Status.UpdatedReplicas {
+            return false, fmt.Errorf("deployment %q is running new pods alongside old pods: %#v", d.Name, status)
+        }
+
+        return *(d.Spec.Replicas) == d.Status.Replicas &&
+            *(d.Spec.Replicas) == d.Status.UpdatedReplicas &&
+            d.Generation <= d.Status.ObservedGeneration, nil
+    }
+
+    _, err = watch.Until(2*time.Minute, w, condition)
+    if err == wait.ErrWaitTimeout {
+        err = fmt.Errorf("deployment %q never completed: %#v", d.Name, status)
+    }
+    return err
+}
+
 // WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
 // Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
 func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string) error {
diff --git a/test/test_owners.csv b/test/test_owners.csv
index 2345aa2afb4..df920368d1a 100644
--- a/test/test_owners.csv
+++ b/test/test_owners.csv
@@ -66,7 +66,6 @@ Density should allow running maximum capacity pods on nodes,smarterclayton,1
 Density should allow starting * pods per node using *,derekwaynecarr,0
 Deployment RecreateDeployment should delete old pods and create new ones,pwittrock,0
 Deployment RollingUpdateDeployment should delete old pods and create new ones,pwittrock,0
-Deployment RollingUpdateDeployment should scale up and down in the right order,pwittrock,0
 Deployment deployment reaping should cascade to its replica sets and pods,wojtek-t,1
 Deployment deployment should create new pods,pwittrock,0
 Deployment deployment should delete old replica sets,pwittrock,0
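For reference, the reworked testRecreateDeployment calls framework.UpdateDeploymentWithRetries, an existing helper that this patch does not modify. Judging from the call site, it presumably follows the usual get-mutate-update-retry pattern, roughly like this sketch (the polling interval, timeout, and exact types are assumptions):

// Sketch only; the real helper lives in the e2e framework and may differ.
type updateDeploymentFunc func(d *extensions.Deployment)

func UpdateDeploymentWithRetries(c clientset.Interface, ns, name string, applyUpdate updateDeploymentFunc) (*extensions.Deployment, error) {
    var deployment *extensions.Deployment
    pollErr := wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
        var err error
        if deployment, err = c.Extensions().Deployments(ns).Get(name, metav1.GetOptions{}); err != nil {
            return false, err
        }
        // Apply the caller's mutation and try to persist it; retry on conflicts.
        applyUpdate(deployment)
        if deployment, err = c.Extensions().Deployments(ns).Update(deployment); err == nil {
            return true, nil
        }
        return false, nil
    })
    return deployment, pollErr
}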