diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 6389d79117a..87cf3a9ba9a 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -132,6 +132,13 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p if !isCreated(replicas[i]) { return ssc.podControl.CreateStatefulPod(set, replicas[i]) } + // If we find a Pod that is currently terminating, we must wait until graceful deletion + // completes before we continue to make progress. + if isTerminating(replicas[i]) { + glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to Terminate", + set.Name, replicas[i].Name) + return nil + } // If we have a Pod that has been created but is not running and ready we can not make progress. // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its // ordinal, are Running and Ready. diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 87bcd78ba74..4ec84d541c4 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -404,6 +404,72 @@ func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) { } } +func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) { + set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + 
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + if set.Status.Replicas != 3 { + t.Fatal("Failed to scale StatefulSet to 3 replicas") + } + // scale the set and add a terminating pod + *set.Spec.Replicas = 4 + pods, err := spc.addTerminatingPod(set, 2) + if err != nil { + t.Fatal(err) + } + if err := ssc.UpdateStatefulSet(set, pods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.List(labels.Everything()) + if err != nil { + t.Fatalf("Error listing pods: %v", err) + } + if len(pods) != 3 { + t.Fatalf("Expected 3 pods, got %d", len(pods)) + } + sort.Sort(ascendingOrdinal(pods)) + spc.DeleteStatefulPod(set, pods[2]) + pods, err = spc.podsLister.List(labels.Everything()) + if err != nil { + t.Fatalf("Error listing pods: %v", err) + } + if len(pods) != 2 { + t.Fatalf("Expected 2 pods, got %d", len(pods)) + } + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + if set.Status.Replicas != 4 { + t.Fatal("Failed to scale StatefulSet to 4 replicas") + } +} + func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) { set := newStatefulSet(3) client := fake.NewSimpleClientset(set) @@ -691,7 +757,7 @@ func (spc *fakeStatefulPodControl) setPodInitStatus(set *apps.StatefulSet, ordin return spc.podsLister.Pods(set.Namespace).List(selector) } -func (spc *fakeStatefulPodControl) addTerminatedPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { +func (spc *fakeStatefulPodControl) 
addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { pod := newStatefulSetPod(set, ordinal) pod.Status.Phase = v1.PodRunning deleted := metav1.NewTime(time.Now()) @@ -907,7 +973,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn if err != nil { return err } - if pods, err = spc.addTerminatedPod(set, ordinal); err != nil { + if pods, err = spc.addTerminatingPod(set, ordinal); err != nil { return err } if err = ssc.UpdateStatefulSet(set, pods); err != nil { diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 71b24b9171c..1f73b051268 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -91,11 +91,11 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) { if set.Status.Replicas != 3 { t.Error("Falied to scale statefulset to 3 replicas") } - pods, err := spc.addTerminatedPod(set, 3) + pods, err := spc.addTerminatingPod(set, 3) if err != nil { t.Error(err) } - pods, err = spc.addTerminatedPod(set, 4) + pods, err = spc.addTerminatingPod(set, 4) if err != nil { t.Error(err) } @@ -669,7 +669,7 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr spc.setsIndexer.Add(set) ssc.enqueueStatefulSet(set) fakeWorker(ssc) - pods, err = spc.addTerminatedPod(set, ord) + pods, err = spc.addTerminatingPod(set, ord) pod = getPodAtOrdinal(pods, ord) ssc.updatePod(&prev, pod) fakeWorker(ssc) @@ -679,7 +679,7 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr for set.Status.Replicas > *set.Spec.Replicas { pods, err = spc.podsLister.Pods(set.Namespace).List(selector) ord := len(pods) - pods, err = spc.addTerminatedPod(set, ord) + pods, err = spc.addTerminatingPod(set, ord) pod = getPodAtOrdinal(pods, ord) ssc.updatePod(&prev, pod) fakeWorker(ssc) diff --git a/pkg/controller/statefulset/stateful_set_utils.go 
b/pkg/controller/statefulset/stateful_set_utils.go index 1f0c3cba43c..12bc47827a6 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -217,14 +217,14 @@ func isFailed(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodFailed } -// isTerminated returns true if pod's deletion Timestamp has been set -func isTerminated(pod *v1.Pod) bool { +// isTerminating returns true if pod's DeletionTimestamp has been set +func isTerminating(pod *v1.Pod) bool { return pod.DeletionTimestamp != nil } // isHealthy returns true if pod is running and ready and has not been terminated func isHealthy(pod *v1.Pod) bool { - return isRunningAndReady(pod) && !isTerminated(pod) + return isRunningAndReady(pod) && !isTerminating(pod) } // newControllerRef returns an ControllerRef pointing to a given StatefulSet.