diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 907f6bebbe8..068d11c7e79 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -39,8 +40,10 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/history" + "k8s.io/kubernetes/pkg/features" "k8s.io/klog/v2" ) @@ -85,7 +88,6 @@ func NewStatefulSetController( eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"}) - ssc := &StatefulSetController{ kubeClient: kubeClient, control: NewDefaultStatefulSetControl( @@ -221,6 +223,15 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { } klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) ssc.enqueueStatefulSet(set) + // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in + // the Pod status which in turn will trigger a requeue of the owning replica set thus + // having its status updated with the newly available replica. + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 { + klog.V(2).Infof("StatefulSet %s will be enqueued after %ds for availability check", set.Name, set.Spec.MinReadySeconds) + // Add a second to avoid milliseconds skew in AddAfter. + // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. + ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second) + } return } @@ -380,6 +391,16 @@ func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) { ssc.queue.Add(key) } +// enqueueStatefulSet enqueues the given statefulset in the work queue after given time +func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration time.Duration) { + key, err := controller.KeyFunc(ss) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ss, err)) + return + } + ssc.queue.AddAfter(key, duration) +} + // processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never // invoked concurrently with the same key. func (ssc *StatefulSetController) processNextWorkItem() bool { @@ -446,10 +467,18 @@ func (ssc *StatefulSetController) sync(key string) error { // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod). func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error { klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods)) + var status *apps.StatefulSetStatus + var err error // TODO: investigate where we mutate the set during the update as it is not obvious. - if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil { + status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods) + if err != nil { return err } klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name) + // One more sync to handle the clock skew. This is also helping in requeuing right after status update + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas { + ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second) + } + return nil } diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 8e71c53e455..d43b8e621c1 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -23,9 +23,11 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/history" + "k8s.io/kubernetes/pkg/features" ) // StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented @@ -36,7 +38,7 @@ type StatefulSetControlInterface interface { // If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy. // Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to // exit exceptionally at any point provided they wish the update to be re-run at a later point in time. - UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error + UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) // ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned // error is nil, the returns slice of ControllerRevisions is valid. ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) @@ -71,60 +73,57 @@ type defaultStatefulSetControl struct { // strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and // in no particular order. Clients using the burst strategy should be careful to ensure they // understand the consistency implications of having unpredictable numbers of pods available. -func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error { - +func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) { // list all revisions and sort them revisions, err := ssc.ListRevisions(set) if err != nil { - return err + return nil, err } history.SortControllerRevisions(revisions) - currentRevision, updateRevision, err := ssc.performUpdate(set, pods, revisions) + currentRevision, updateRevision, status, err := ssc.performUpdate(set, pods, revisions) if err != nil { - return utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)}) + return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)}) } // maintain the set's revision history limit - return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision) + return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision) } func (ssc *defaultStatefulSetControl) performUpdate( - set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) { - + set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) { + var currentStatus *apps.StatefulSetStatus // get the current, and update revisions currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) if err != nil { - return currentRevision, updateRevision, err + return currentRevision, updateRevision, currentStatus, err } // perform the main update function and get the status - status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods) + currentStatus, err = ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods) if err != nil { - return currentRevision, updateRevision, err + return currentRevision, updateRevision, currentStatus, err } - // update the set's status - err = ssc.updateStatefulSetStatus(set, status) + err = ssc.updateStatefulSetStatus(set, currentStatus) if err != nil { - return currentRevision, updateRevision, err + return currentRevision, updateRevision, currentStatus, err } - klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d", set.Namespace, set.Name, - status.Replicas, - status.ReadyReplicas, - status.CurrentReplicas, - status.UpdatedReplicas) + currentStatus.Replicas, + currentStatus.ReadyReplicas, + currentStatus.CurrentReplicas, + currentStatus.UpdatedReplicas) klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s", set.Namespace, set.Name, - status.CurrentRevision, - status.UpdateRevision) + currentStatus.CurrentRevision, + currentStatus.UpdateRevision) - return currentRevision, updateRevision, nil + return currentRevision, updateRevision, currentStatus, nil } func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) { @@ -307,6 +306,15 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // count the number of running and ready replicas if isRunningAndReady(pods[i]) { status.ReadyReplicas++ + // count the number of running and available replicas + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) { + if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) { + status.AvailableReplicas++ + } + } else { + // If the featuregate is not enabled, all the ready replicas should be considered as available replicas + status.AvailableReplicas = status.ReadyReplicas + } } // count the number of current and update replicas @@ -447,6 +455,19 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( replicas[i].Name) return &status, nil } + // If we have a Pod that has been created but is not available we can not make progress. + // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its + // ordinal, are Available. + // TODO: Since available is superset of Ready, once we have this featuregate enabled by default, we can remove the + // isRunningAndReady block as only Available pods should be brought down. + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic { + klog.V(4).Infof( + "StatefulSet %s/%s is waiting for Pod %s to be Available", + set.Namespace, + set.Name, + replicas[i].Name) + return &status, nil + } // Enforce the StatefulSet invariants if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) { continue @@ -458,7 +479,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } } - // At this point, all of the current Replicas are Running and Ready, we can consider termination. + // At this point, all of the current Replicas are Running, Ready and Available, we can consider termination. // We will wait for all predecessors to be Running and Ready prior to attempting a deletion. // We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas). // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over @@ -486,6 +507,17 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( firstUnhealthyPod.Name) return &status, nil } + // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block. + // TODO: Since available is superset of Ready, once we have this featuregate enabled by default, we can remove the + // isRunningAndReady block as only Available pods should be brought down. + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod { + klog.V(4).Infof( + "StatefulSet %s/%s is waiting for Pod %s to be Available prior to scale down", + set.Namespace, + set.Name, + firstUnhealthyPod.Name) + return &status, nil + } klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down", set.Namespace, set.Name, @@ -549,7 +581,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( func (ssc *defaultStatefulSetControl) updateStatefulSetStatus( set *apps.StatefulSet, status *apps.StatefulSetStatus) error { - // complete any in progress rolling update if necessary completeRollingUpdate(set, status) diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index adb6a82b1a6..6911e799fb2 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -34,6 +34,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" @@ -43,9 +44,11 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + featuregatetesting "k8s.io/component-base/featuregate/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/history" + "k8s.io/kubernetes/pkg/features" ) type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error @@ -73,6 +76,11 @@ func burst(set *apps.StatefulSet) *apps.StatefulSet { return set } +func setMinReadySeconds(set *apps.StatefulSet, minReadySeconds int32) *apps.StatefulSet { + set.Spec.MinReadySeconds = minReadySeconds + return set +} + func TestStatefulSetControl(t *testing.T) { simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) } largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) } @@ -221,7 +229,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if err != nil { t.Error(err) } - if err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -231,7 +239,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if pods, err = spc.setPodRunning(set, i); err != nil { t.Error(err) } - if err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -246,7 +254,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if err != nil { t.Error(err) } - if err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -270,7 +278,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian if err != nil { t.Error(err) } - if err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, spc); err != nil { @@ -282,7 +290,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian } pods[0].Status.Phase = v1.PodFailed spc.podsIndexer.Update(pods[0]) - if err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, spc); err != nil { @@ -363,7 +371,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF spc.podsIndexer.Update(pods[0]) // now it should fail - if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } } @@ -409,7 +417,7 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in if err != nil { t.Error(err) } - if err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, spc); err != nil { @@ -422,13 +430,13 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in pods[0].Status.Phase = v1.PodFailed spc.podsIndexer.Update(pods[0]) spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) - if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSet failed to %s", err) } if err := invariants(set, spc); err != nil { t.Error(err) } - if err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, spc); err != nil { @@ -1274,7 +1282,7 @@ func TestStatefulSetControlLimitsHistory(t *testing.T) { if err != nil { t.Fatalf("%s: %s", test.name, err) } - err = ssc.UpdateStatefulSet(set, pods) + _, err = ssc.UpdateStatefulSet(set, pods) if err != nil { t.Fatalf("%s: %s", test.name, err) } @@ -1565,6 +1573,72 @@ func TestStatefulSetControlRollback(t *testing.T) { } } +func TestStatefulSetAvailability(t *testing.T) { + tests := []struct { + name string + inputSTS *apps.StatefulSet + expectedActiveReplicas int32 + readyDuration time.Duration + minReadySecondsFeaturegateEnabled bool + }{ + { + name: "replicas not running for required time, still will be available," + + " when minReadySeconds is disabled", + inputSTS: setMinReadySeconds(newStatefulSet(1), int32(3600)), + readyDuration: 0 * time.Minute, + expectedActiveReplicas: int32(1), + minReadySecondsFeaturegateEnabled: false, + }, + { + name: "replicas running for required time, when minReadySeconds is enabled", + inputSTS: setMinReadySeconds(newStatefulSet(1), int32(3600)), + readyDuration: -120 * time.Minute, + expectedActiveReplicas: int32(1), + minReadySecondsFeaturegateEnabled: true, + }, + { + name: "replicas not running for required time, when minReadySeconds is enabled", + inputSTS: setMinReadySeconds(newStatefulSet(1), int32(3600)), + readyDuration: -30 * time.Minute, + expectedActiveReplicas: int32(0), + minReadySecondsFeaturegateEnabled: true, + }, + } + for _, test := range tests { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, test.minReadySecondsFeaturegateEnabled)() + set := test.inputSTS + client := fake.NewSimpleClientset(set) + spc, _, ssc, stop := setupController(client) + defer close(stop) + if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil { + t.Fatalf("%s: %s", test.name, err) + } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatalf("%s: %s", test.name, err) + } + _, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatalf("%s: %s", test.name, err) + } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("%s: %s", test.name, err) + } + pods, err := spc.setPodAvailable(set, 0, time.Now().Add(test.readyDuration)) + if err != nil { + t.Fatalf("%s: %s", test.name, err) + } + status, err := ssc.UpdateStatefulSet(set, pods) + if err != nil { + t.Fatalf("%s: %s", test.name, err) + } + if status.AvailableReplicas != test.expectedActiveReplicas { + t.Fatalf("expected %d active replicas got %d", test.expectedActiveReplicas, status.AvailableReplicas) + } + } +} + type requestTracker struct { requests int err error @@ -1688,6 +1762,39 @@ func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal in return spc.podsLister.Pods(set.Namespace).List(selector) } +func (spc *fakeStatefulPodControl) setPodAvailable(set *apps.StatefulSet, ordinal int, lastTransitionTime time.Time) ([]*v1.Pod, error) { + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + return nil, err + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + return nil, err + } + if 0 > ordinal || ordinal >= len(pods) { + return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) + } + sort.Sort(ascendingOrdinal(pods)) + pod := pods[ordinal].DeepCopy() + condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: lastTransitionTime}} + _, existingCondition := podutil.GetPodCondition(&pod.Status, condition.Type) + if existingCondition != nil { + existingCondition.Status = v1.ConditionTrue + existingCondition.LastTransitionTime = metav1.Time{Time: lastTransitionTime} + } else { + existingCondition = &v1.PodCondition{ + Type: v1.PodReady, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: lastTransitionTime}, + } + pod.Status.Conditions = append(pod.Status.Conditions, *existingCondition) + } + podutil.UpdatePodCondition(&pod.Status, &condition) + fakeResourceVersion(pod) + spc.podsIndexer.Update(pod) + return spc.podsLister.Pods(set.Namespace).List(selector) +} + func (spc *fakeStatefulPodControl) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { pod := newStatefulSetPod(set, ordinal) pod.Status.Phase = v1.PodRunning @@ -1981,9 +2088,9 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, continue } } - // run the controller once and check invariants - if err = ssc.UpdateStatefulSet(set, pods); err != nil { + _, err = ssc.UpdateStatefulSet(set, pods) + if err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -1993,6 +2100,7 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, if err := invariants(set, spc); err != nil { return err } + //fmt.Printf("Ravig pod conditions %v %v", set.Status.ReadyReplicas, *set.Spec.Replicas) } return invariants(set, spc) } @@ -2009,7 +2117,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn } sort.Sort(ascendingOrdinal(pods)) if ordinal := len(pods) - 1; ordinal >= 0 { - if err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -2019,7 +2127,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn if pods, err = spc.addTerminatingPod(set, ordinal); err != nil { return err } - if err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -2036,7 +2144,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn spc.podsIndexer.Delete(pods[len(pods)-1]) } } - if err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -2099,7 +2207,7 @@ func updateStatefulSetControl(set *apps.StatefulSet, if err != nil { return err } - if err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { return err } @@ -2147,7 +2255,7 @@ func updateStatefulSetControl(set *apps.StatefulSet, } } - if err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) diff --git a/pkg/controller/statefulset/stateful_set_status_updater_test.go b/pkg/controller/statefulset/stateful_set_status_updater_test.go index 74d67812da2..4eb9f63a831 100644 --- a/pkg/controller/statefulset/stateful_set_status_updater_test.go +++ b/pkg/controller/statefulset/stateful_set_status_updater_test.go @@ -20,15 +20,16 @@ import ( "errors" "testing" + apps "k8s.io/api/apps/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" - - core "k8s.io/client-go/testing" - "k8s.io/client-go/tools/cache" - - apps "k8s.io/api/apps/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes/fake" appslisters "k8s.io/client-go/listers/apps/v1" + core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" ) func TestStatefulSetUpdaterUpdatesSetStatus(t *testing.T) { @@ -124,3 +125,21 @@ func TestStatefulSetStatusUpdaterUpdateReplicasConflictFailure(t *testing.T) { t.Error("UpdateStatefulSetStatus failed to return an error on get failure") } } + +func TestStatefulSetStatusUpdaterGetAvailableReplicas(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, true)() + set := newStatefulSet(3) + status := apps.StatefulSetStatus{ObservedGeneration: 1, Replicas: 2, AvailableReplicas: 3} + fakeClient := &fake.Clientset{} + updater := NewRealStatefulSetStatusUpdater(fakeClient, nil) + fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + return true, update.GetObject(), nil + }) + if err := updater.UpdateStatefulSetStatus(set, &status); err != nil { + t.Errorf("Error returned on successful status update: %s", err) + } + if set.Status.AvailableReplicas != 3 { + t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.AvailableReplicas) + } +} diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 12600fd6323..39f061de6cb 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -204,6 +204,10 @@ func isRunningAndReady(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) } +func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool { + return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now()) +} + // isCreated returns true if pod has been created and is maintained by the API server func isCreated(pod *v1.Pod) bool { return pod.Status.Phase != "" @@ -366,6 +370,7 @@ func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) b status.ReadyReplicas != set.Status.ReadyReplicas || status.UpdatedReplicas != set.Status.UpdatedReplicas || status.CurrentRevision != set.Status.CurrentRevision || + status.AvailableReplicas != set.Status.AvailableReplicas || status.UpdateRevision != set.Status.UpdateRevision } diff --git a/test/integration/statefulset/statefulset_test.go b/test/integration/statefulset/statefulset_test.go index ce72e73048b..678e07eae91 100644 --- a/test/integration/statefulset/statefulset_test.go +++ b/test/integration/statefulset/statefulset_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "testing" + "time" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -27,11 +28,21 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic" + clientset "k8s.io/client-go/kubernetes" + featuregatetesting "k8s.io/component-base/featuregate/testing" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" ) +const ( + interval = 100 * time.Millisecond + timeout = 60 * time.Second +) + // TestVolumeTemplateNoopUpdate ensures embedded StatefulSet objects with embedded PersistentVolumes can be updated func TestVolumeTemplateNoopUpdate(t *testing.T) { // Start the server with default storage setup @@ -226,3 +237,115 @@ func TestDeletingAndFailedPods(t *testing.T) { t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err) } } + +func TestStatefulSetAvailable(t *testing.T) { + tests := []struct { + name string + totalReplicas int32 + readyReplicas int32 + activeReplicas int32 + enabled bool + }{ + { + name: "When feature gate is enabled, only certain replicas would become active", + totalReplicas: 4, + readyReplicas: 3, + activeReplicas: 2, + enabled: true, + }, + { + name: "When feature gate is disabled, all the ready replicas would become active", + totalReplicas: 4, + readyReplicas: 3, + activeReplicas: 3, + enabled: false, + }, + } + for _, test := range tests { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, test.enabled)() + s, closeFn, rm, informers, c := scSetup(t) + defer closeFn() + ns := framework.CreateTestingNamespace("test-available-pods", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + stopCh := runControllerAndInformers(rm, informers) + defer close(stopCh) + + labelMap := labelMap() + sts := newSTS("sts", ns.Name, 4) + sts.Spec.MinReadySeconds = int32(3600) + stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{}) + sts = stss[0] + waitSTSStable(t, c, sts) + + // Verify STS creates 4 pods + podClient := c.CoreV1().Pods(ns.Name) + pods := getPods(t, podClient, labelMap) + if len(pods.Items) != 4 { + t.Fatalf("len(pods) = %d, want 4", len(pods.Items)) + } + + // Separate 3 pods into their own list + firstPodList := &v1.PodList{Items: pods.Items[:1]} + secondPodList := &v1.PodList{Items: pods.Items[1:2]} + thirdPodList := &v1.PodList{Items: pods.Items[2:]} + // First pod: Running, but not Ready + // by setting the Ready condition to false with LastTransitionTime to be now + setPodsReadyCondition(t, c, firstPodList, v1.ConditionFalse, time.Now()) + // Second pod: Running and Ready, but not Available + // by setting LastTransitionTime to now + setPodsReadyCondition(t, c, secondPodList, v1.ConditionTrue, time.Now()) + // Third pod: Running, Ready, and Available + // by setting LastTransitionTime to more than 3600 seconds ago + setPodsReadyCondition(t, c, thirdPodList, v1.ConditionTrue, time.Now().Add(-120*time.Minute)) + + stsClient := c.AppsV1().StatefulSets(ns.Name) + if err := wait.PollImmediate(interval, timeout, func() (bool, error) { + newSts, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + // Verify 4 pods exist, 3 pods are Ready, and 2 pods are Available + return newSts.Status.Replicas == test.totalReplicas && newSts.Status.ReadyReplicas == test.readyReplicas && newSts.Status.AvailableReplicas == test.activeReplicas, nil + }); err != nil { + t.Fatalf("Failed to verify number of Replicas, ReadyReplicas and AvailableReplicas of rs %s to be as expected: %v", sts.Name, err) + } + } +} + +func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1.PodList, conditionStatus v1.ConditionStatus, lastTransitionTime time.Time) { + replicas := int32(len(pods.Items)) + var readyPods int32 + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + readyPods = 0 + for i := range pods.Items { + pod := &pods.Items[i] + if podutil.IsPodReady(pod) { + readyPods++ + continue + } + pod.Status.Phase = v1.PodRunning + _, condition := podutil.GetPodCondition(&pod.Status, v1.PodReady) + if condition != nil { + condition.Status = conditionStatus + condition.LastTransitionTime = metav1.Time{Time: lastTransitionTime} + } else { + condition = &v1.PodCondition{ + Type: v1.PodReady, + Status: conditionStatus, + LastTransitionTime: metav1.Time{Time: lastTransitionTime}, + } + pod.Status.Conditions = append(pod.Status.Conditions, *condition) + } + _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}) + if err != nil { + // When status fails to be updated, we continue to next pod + continue + } + readyPods++ + } + return readyPods >= replicas, nil + }) + if err != nil { + t.Fatalf("failed to mark all StatefulSet pods to ready: %v", err) + } +}