From e7c2ecf799f3601b5eb810827a5f321b684e87b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20K=C5=99epinsk=C3=BD?= Date: Mon, 24 Nov 2025 14:39:50 +0100 Subject: [PATCH 1/4] wire now (time) to the availability checks in the StatefulSet controller - this helps to make the controller reconciliation consistent --- pkg/controller/statefulset/stateful_set.go | 8 +- .../statefulset/stateful_set_control.go | 73 +++++++----------- .../statefulset/stateful_set_control_test.go | 76 +++++++++---------- .../statefulset/stateful_set_utils.go | 9 ++- 4 files changed, 77 insertions(+), 89 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 257396f2809..c57b7f16c48 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/controller/statefulset/metrics" + "k8s.io/utils/clock" "k8s.io/klog/v2" ) @@ -81,6 +82,7 @@ type StatefulSetController struct { queue workqueue.TypedRateLimitingInterface[string] // eventBroadcaster is the core of event processing pipeline. eventBroadcaster record.EventBroadcaster + clock clock.PassiveClock } // NewStatefulSetController creates a new statefulset controller. @@ -118,6 +120,7 @@ func NewStatefulSetController( podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder}, eventBroadcaster: eventBroadcaster, + clock: clock.RealClock{}, } podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -463,7 +466,7 @@ func (ssc *StatefulSetController) worker(ctx context.Context) { // sync syncs the given statefulset. func (ssc *StatefulSetController) sync(ctx context.Context, key string) error { - startTime := time.Now() + startTime := ssc.clock.Now() logger := klog.FromContext(ctx) defer func() { logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime)) @@ -508,7 +511,8 @@ func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods)) var status *apps.StatefulSetStatus var err error - status, err = ssc.control.UpdateStatefulSet(ctx, set, pods) + now := ssc.clock.Now() + status, err = ssc.control.UpdateStatefulSet(ctx, set, pods, now) if err != nil { return err } diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 888683457c3..dd339cc7814 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -20,6 +20,7 @@ import ( "context" "sort" "sync" + "time" "k8s.io/klog/v2" "k8s.io/utils/lru" @@ -48,7 +49,7 @@ type StatefulSetControlInterface interface { // If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy. // Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to // exit exceptionally at any point provided they wish the update to be re-run at a later point in time. - UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) + UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) // ListRevisions returns a array of the ControllerRevisions that represent the revisions of set.
If the returned // error is nil, the returns slice of ControllerRevisions is valid. ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) @@ -83,7 +84,7 @@ type defaultStatefulSetControl struct { // strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and // in no particular order. Clients using the burst strategy should be careful to ensure they // understand the consistency implications of having unpredictable numbers of pods available. -func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) { +func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) { set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors. // list all revisions and sort them @@ -93,7 +94,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set } history.SortControllerRevisions(revisions) - currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions) + currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions, now) if err != nil { errs := []error{err} if agg, ok := err.(utilerrors.Aggregate); ok { @@ -107,7 +108,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set } func (ssc *defaultStatefulSetControl) performUpdate( - ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) { + ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision, now time.Time) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) { var currentStatus *apps.StatefulSetStatus logger := klog.FromContext(ctx) // get the current, and update revisions @@ -117,7 +118,7 @@ func (ssc *defaultStatefulSetControl) performUpdate( } // perform the main update function and get the status - currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods) + currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods, now) if err != nil && currentStatus == nil { return currentRevision, updateRevision, nil, err } @@ -367,7 +368,7 @@ type replicaStatus struct { updatedReplicas int32 } -func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus { +func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, now time.Time) replicaStatus { status := replicaStatus{} for _, pod := range pods { if isCreated(pod) { @@ -378,7 +379,7 @@ func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision if isRunningAndReady(pod) { status.readyReplicas++ // count the number of running and available replicas - if isRunningAndAvailable(pod, minReadySeconds) { + if isRunningAndAvailable(pod, minReadySeconds, now) { status.availableReplicas++ } @@ -398,14 +399,14 @@ func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision return status } -func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, 
podLists ...[]*v1.Pod) { +func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, now time.Time, podLists ...[]*v1.Pod) { status.Replicas = 0 status.ReadyReplicas = 0 status.AvailableReplicas = 0 status.CurrentReplicas = 0 status.UpdatedReplicas = 0 for _, list := range podLists { - replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision) + replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision, now) status.Replicas += replicaStatus.replicas status.ReadyReplicas += replicaStatus.readyReplicas status.AvailableReplicas += replicaStatus.availableReplicas @@ -414,13 +415,7 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current } } -func (ssc *defaultStatefulSetControl) processReplica( - ctx context.Context, - set *apps.StatefulSet, - updateSet *apps.StatefulSet, - monotonic bool, - replicas []*v1.Pod, - i int) (bool, error) { +func (ssc *defaultStatefulSetControl) processReplica(ctx context.Context, set *apps.StatefulSet, updateSet *apps.StatefulSet, monotonic bool, replicas []*v1.Pod, i int, now time.Time) (bool, error) { logger := klog.FromContext(ctx) // Note that pods with phase Succeeded will also trigger this event. This is @@ -484,7 +479,7 @@ func (ssc *defaultStatefulSetControl) processReplica( // If we have a Pod that has been created but is not available we can not make progress. // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its // ordinal, are Available. - if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic { + if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds, now) && monotonic { logger.V(4).Info("StatefulSet is waiting for Pod to be Available", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) return true, nil @@ -510,7 +505,7 @@ func (ssc *defaultStatefulSetControl) processReplica( return false, nil } -func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) { +func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int, now time.Time) (bool, error) { logger := klog.FromContext(ctx) if isTerminating(condemned[i]) { // if we are in monotonic mode, block and wait for terminating pods to expire @@ -528,7 +523,7 @@ func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set return true, nil } // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block. - if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod { + if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds, now) && monotonic && condemned[i] != firstUnhealthyPod { logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) return true, nil @@ -563,13 +558,7 @@ func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bo // all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other // Pods must be at Status.UpdateRevision. 
If the returned error is nil, the returned StatefulSetStatus is valid and the // update must be recorded. If the error is not nil, the method should be retried until successful. -func (ssc *defaultStatefulSetControl) updateStatefulSet( - ctx context.Context, - set *apps.StatefulSet, - currentRevision *apps.ControllerRevision, - updateRevision *apps.ControllerRevision, - collisionCount int32, - pods []*v1.Pod) (*apps.StatefulSetStatus, error) { +func (ssc *defaultStatefulSetControl) updateStatefulSet(ctx context.Context, set *apps.StatefulSet, currentRevision *apps.ControllerRevision, updateRevision *apps.ControllerRevision, collisionCount int32, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) { logger := klog.FromContext(ctx) // get the current and update revisions of the set. currentSet, err := ApplyRevision(set, currentRevision) @@ -589,7 +578,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( status.CollisionCount = new(int32) *status.CollisionCount = collisionCount - updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods) + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, pods) replicaCount := int(*set.Spec.Replicas) // slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set) @@ -630,7 +619,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // find the first unhealthy Pod for i := range replicas { - if isUnavailable(replicas[i], set.Spec.MinReadySeconds) { + if isUnavailable(replicas[i], set.Spec.MinReadySeconds, now) { unavailable++ if firstUnavailablePod == nil { firstUnavailablePod = replicas[i] @@ -640,7 +629,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use) for i := len(condemned) - 1; i >= 0; i-- { - if isUnavailable(condemned[i], set.Spec.MinReadySeconds) { + if isUnavailable(condemned[i], set.Spec.MinReadySeconds, now) { unavailable++ if firstUnavailablePod == nil { firstUnavailablePod = condemned[i] @@ -662,10 +651,10 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode. processReplicaFn := func(i int) (bool, error) { - return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i) + return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i, now) } if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil { - updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned) return &status, err } @@ -681,7 +670,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( return false, nil } if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil { - updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned) return &status, err } @@ -692,14 +681,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over // updates. 
processCondemnedFn := func(i int) (bool, error) { - return ssc.processCondemned(ctx, set, firstUnavailablePod, monotonic, condemned, i) + return ssc.processCondemned(ctx, set, firstUnavailablePod, monotonic, condemned, i, now) } if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil { - updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned) return &status, err } - updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned) // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted. if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType { @@ -713,6 +702,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( replicas, updateRevision, status, + now, ) } @@ -738,7 +728,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } // wait for unavailable Pods on update - if isUnavailable(replicas[target], set.Spec.MinReadySeconds) { + if isUnavailable(replicas[target], set.Spec.MinReadySeconds, now) { logger.V(4).Info("StatefulSet is waiting for Pod to update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target])) return &status, nil @@ -748,14 +738,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( return &status, nil } -func updateStatefulSetAfterInvariantEstablished( - ctx context.Context, - ssc *defaultStatefulSetControl, - set *apps.StatefulSet, - replicas []*v1.Pod, - updateRevision *apps.ControllerRevision, - status apps.StatefulSetStatus, -) (*apps.StatefulSetStatus, error) { +func updateStatefulSetAfterInvariantEstablished(ctx context.Context, ssc *defaultStatefulSetControl, set *apps.StatefulSet, replicas []*v1.Pod, updateRevision *apps.ControllerRevision, status apps.StatefulSetStatus, now time.Time) (*apps.StatefulSetStatus, error) { logger := klog.FromContext(ctx) replicaCount := int(*set.Spec.Replicas) @@ -783,7 +766,7 @@ func updateStatefulSetAfterInvariantEstablished( unavailablePods := 0 for target := len(replicas) - 1; target >= 0; target-- { - if isUnavailable(replicas[target], set.Spec.MinReadySeconds) { + if isUnavailable(replicas[target], set.Spec.MinReadySeconds, now) { unavailablePods++ } } diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 7fd7d529879..734572e86d4 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -359,7 +359,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if err != nil { t.Error(err) } - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -369,7 +369,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if pods, err = om.setPodRunning(set, i); err != nil { t.Error(err) } - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Errorf("Failed to 
update StatefulSet : %s", err) } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -384,7 +384,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if err != nil { t.Error(err) } - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -408,7 +408,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, if err != nil { t.Error(err) } - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, om); err != nil { @@ -442,7 +442,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, // Expect pod deletion failure om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) expectedNumOfDeleteRequests++ - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); !isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError, found %s", err) } if err := invariants(set, om); err != nil { @@ -458,7 +458,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, // Expect pod deletion expectedNumOfDeleteRequests++ - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, om); err != nil { @@ -472,7 +472,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, } // Expect no additional delete calls and expect pod creation - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, om); err != nil { @@ -584,7 +584,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF om.podsIndexer.Update(pods[0]) // now it should fail - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); !isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError, found %s", err) } } @@ -657,7 +657,7 @@ func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants // delete fails om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) - _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err == nil { t.Error("Expected err in update StatefulSet when deleting a pod") } @@ -678,7 +678,7 @@ func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants // delete works om.SetDeleteStatefulPodError(nil, 0) - status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods) + status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("Unexpected err in update StatefulSet: %v", 
err) } @@ -767,7 +767,7 @@ func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants i if err != nil { t.Error(err) } - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, om); err != nil { @@ -782,7 +782,7 @@ func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants i } pods[0].Status.Phase = v1.PodPending om.podsIndexer.Update(pods[0]) - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Errorf("Error updating StatefulSet %s", err) } // invariants check if there any missing PVCs for the Pods @@ -994,7 +994,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) { spc.setPodRunning(set, 4) originalPods, _ := spc.setPodReadyCondition(set, 4, true) sort.Sort(ascendingOrdinal(originalPods)) - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil { t.Fatal(err) } pods, err := spc.podsLister.Pods(set.Namespace).List(selector) @@ -1008,7 +1008,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) { } // create new pod 3 - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Fatal(err) } pods, err = spc.podsLister.Pods(set.Namespace).List(selector) @@ -1037,7 +1037,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) { spc.setPodReadyCondition(set, 4, true) // create new pod 4 (only one pod gets created at a time due to OrderedReady) - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Fatal(err) } pods, err := spc.podsLister.Pods(set.Namespace).List(selector) @@ -1106,7 +1106,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) { sort.Sort(ascendingOrdinal(originalPods)) // since maxUnavailable is 2, update pods 4 and 5, this will delete the pod 4 and 5, - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil { t.Fatal(err) } pods, err := spc.podsLister.Pods(set.Namespace).List(selector) @@ -1122,7 +1122,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) { } // create new pods - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { t.Fatal(err) } pods, err = spc.podsLister.Pods(set.Namespace).List(selector) @@ -1138,7 +1138,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) { spc.setPodReadyCondition(set, 5, true) originalPods, _ = spc.setPodReadyCondition(set, 3, true) sort.Sort(ascendingOrdinal(originalPods)) - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil { t.Fatal(err) } pods, err = spc.podsLister.Pods(set.Namespace).List(selector) @@ -1234,7 +1234,7 @@ func 
TestStatefulSetControlRollingUpdateWithMaxUnavailableInOrderedModeVerifyInv // try to update the statefulset // this function is only called in main code when feature gate is enabled - if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status); err != nil { + if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status, time.Now()); err != nil { t.Fatal(err) } pods, err := spc.podsLister.Pods(set.Namespace).List(selector) @@ -1969,7 +1969,7 @@ func TestStatefulSetControlLimitsHistory(t *testing.T) { if err != nil { t.Fatalf("%s: %s", test.name, err) } - _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("%s: %s", test.name, err) } @@ -2336,7 +2336,7 @@ func TestStatefulSetAvailability(t *testing.T) { if err != nil { t.Fatalf("%s: %s", test.name, err) } - status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods) + status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("%s: %s", test.name, err) } @@ -2407,7 +2407,7 @@ func TestStatefulSetStatusUpdate(t *testing.T) { if err != nil { t.Error(err) } - _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if ssu.updateStatusTracker.requests != 1 { t.Errorf("Did not update status") } @@ -3077,7 +3077,7 @@ func parallelScaleUpStatefulSetControl(set *apps.StatefulSet, } // run the controller once and check invariants - _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { return err } @@ -3113,14 +3113,14 @@ func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetC return err } sort.Sort(ascendingOrdinal(pods)) - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { return err } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) if err != nil { return err } - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { return err } } @@ -3179,7 +3179,7 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, } } // run the controller once and check invariants - _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { return err } @@ -3207,7 +3207,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn } sort.Sort(ascendingOrdinal(pods)) if idx := len(pods) - 1; idx >= 0 { - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { return err } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -3217,7 +3217,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn if pods, err = om.addTerminatingPod(set, getOrdinal(pods[idx])); err != nil { return err } - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { return err } set, err = 
om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -3234,7 +3234,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn om.podsIndexer.Delete(pods[len(pods)-1]) } } - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { return err } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -3327,7 +3327,7 @@ func updateStatefulSetControl(set *apps.StatefulSet, if err != nil { return err } - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { return err } @@ -3375,7 +3375,7 @@ func updateStatefulSetControl(set *apps.StatefulSet, } } - if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil { return err } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -3453,7 +3453,7 @@ func TestStatefulSetRollingUpdateRespectsMinReadySeconds(t *testing.T) { } // Perform update - should be blocked because pods haven't been ready for 30 seconds - status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods) + status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("failed to update StatefulSet: %s", err) } @@ -3483,7 +3483,7 @@ func TestStatefulSetRollingUpdateRespectsMinReadySeconds(t *testing.T) { } // Perform update again - now should proceed - status, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) + status, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("failed to update StatefulSet: %s", err) } @@ -3537,7 +3537,7 @@ func TestStatefulSetScaleDownRespectsMinReadySeconds(t *testing.T) { } // Scale down should be blocked because pods haven't been available for 30 seconds - status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods) + status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("failed to update StatefulSet: %s", err) } @@ -3600,7 +3600,7 @@ func TestStatefulSetOnDeleteStrategyIgnoresMinReadySeconds(t *testing.T) { } // OnDelete strategy should complete regardless of MinReadySeconds - status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods) + status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("failed to update StatefulSet: %s", err) } @@ -3645,7 +3645,7 @@ func TestStatefulSetZeroMinReadySeconds(t *testing.T) { } // With zero MinReadySeconds, pods should be immediately available - status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods) + status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("failed to update StatefulSet: %s", err) } @@ -3695,7 +3695,7 @@ func TestStatefulSetPartitionRollingUpdateWithMinReadySeconds(t *testing.T) { } // Rolling update should be blocked because pods haven't been available for 30 seconds - status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods) + status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()) if err != nil { t.Fatalf("failed to update StatefulSet: %s", err) } @@ -3798,7 +3798,7 @@ func TestStatefulSetMetrics(t *testing.T) { ReadyReplicas: readyReplicas, } updateRevision := &apps.ControllerRevision{} - _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), 
ssc.(*defaultStatefulSetControl), set, pods, updateRevision, status) + _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, pods, updateRevision, status, time.Now()) if err != nil { t.Fatal(err) } diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 27465419bc0..b63aa62dfda 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -21,6 +21,7 @@ import ( "fmt" "regexp" "strconv" + "time" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -457,8 +458,8 @@ func isRunningAndReady(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) } -func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool { - return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now()) +func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32, now time.Time) bool { + return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) } // isCreated returns true if pod has been created and is maintained by the API server @@ -487,8 +488,8 @@ func isTerminating(pod *v1.Pod) bool { } // isUnavailable returns true if pod is not available or if it is terminating -func isUnavailable(pod *v1.Pod, minReadySeconds int32) bool { - return !isRunningAndAvailable(pod, minReadySeconds) || isTerminating(pod) +func isUnavailable(pod *v1.Pod, minReadySeconds int32, now time.Time) bool { + return !podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) || isTerminating(pod) } // allowsBurst is true if the alpha burst annotation is set. From f8578e8d8b72cd1049da8b2785fc5a9b825aa0f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20K=C5=99epinsk=C3=BD?= Date: Mon, 24 Nov 2025 14:40:31 +0100 Subject: [PATCH 2/4] schedule pod availability checks at the correct time in StatefulSets --- pkg/controller/statefulset/stateful_set.go | 26 ++++++++++++++++------ 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index c57b7f16c48..922df499c1c 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/controller/statefulset/metrics" "k8s.io/utils/clock" + "k8s.io/utils/ptr" "k8s.io/klog/v2" ) @@ -269,9 +270,10 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interfa // having its status updated with the newly available replica. if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 { logger.V(2).Info("StatefulSet will be enqueued after minReadySeconds for availability check", "statefulSet", klog.KObj(set), "minReadySeconds", set.Spec.MinReadySeconds) - // Add a second to avoid milliseconds skew in AddAfter. - // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. - ssc.enqueueSSAfter(logger, set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second) + // If there are multiple pods with varying readiness times, we cannot correctly track them + // with the current queue. Further resyncs are attempted at the end of the syncStatefulSet + // function.
+ ssc.enqueueSSAfter(logger, set, time.Duration(set.Spec.MinReadySeconds)*time.Second) } return } @@ -510,17 +512,27 @@ func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps logger := klog.FromContext(ctx) logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods)) var status *apps.StatefulSetStatus + var nextSyncDuration *time.Duration var err error + // Use the same time for calculating status and nextSyncDuration. now := ssc.clock.Now() status, err = ssc.control.UpdateStatefulSet(ctx, set, pods, now) if err != nil { return err } logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set)) - // One more sync to handle the clock skew. This is also helping in requeuing right after status update - if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas { - ssc.enqueueSSAfter(logger, set, time.Duration(set.Spec.MinReadySeconds)*time.Second) + // Plan the next availability check as a last line of defense against queue preemption (we have one queue key for checking availability of all the pods) + // or early sync (see https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info). + if set.Spec.MinReadySeconds > 0 && status != nil && status.ReadyReplicas != status.AvailableReplicas { + // Safeguard fallback to the .spec.minReadySeconds to ensure that we always end up with .status.availableReplicas updated. + nextSyncDuration = ptr.To(time.Duration(set.Spec.MinReadySeconds) * time.Second) + // Use the same point in time (now) for calculating status and nextSyncDuration to get matching availability for the pods. + if nextCheck := controller.FindMinNextPodAvailabilityCheck(pods, set.Spec.MinReadySeconds, now, ssc.clock); nextCheck != nil { + nextSyncDuration = nextCheck + } + } + if nextSyncDuration != nil { + ssc.enqueueSSAfter(logger, set, *nextSyncDuration) } - return nil } From 04da1f09e5d1029277a09cce670c1314eaf3fb70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20K=C5=99epinsk=C3=BD?= Date: Tue, 25 Nov 2025 00:08:20 +0100 Subject: [PATCH 3/4] replace "k8s.io/klog/v2/ktesting" with "k8s.io/kubernetes/test/utils/ktesting" for advanced features (e.g. 
Eventually) --- pkg/controller/statefulset/stateful_set_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 78a8cc95f75..b2d46c537ad 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -37,9 +37,9 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/history" + "k8s.io/kubernetes/test/utils/ktesting" ) var parentKind = apps.SchemeGroupVersion.WithKind("StatefulSet") From 802ed9eaa9f0af357b95f8ddb3109f1acc8a3195 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20K=C5=99epinsk=C3=BD?= Date: Tue, 25 Nov 2025 15:07:34 +0100 Subject: [PATCH 4/4] add StatefulSetAvailabilityCheck test --- .../statefulset/stateful_set_test.go | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index b2d46c537ad..cad55da0445 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -23,6 +23,9 @@ import ( "fmt" "sort" "testing" + "time" + + "github.com/onsi/gomega" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -907,6 +910,66 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) { } } +func TestStatefulSetAvailabilityCheck(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + + set := setMinReadySeconds(newStatefulSet(4), int32(5)) // 5 seconds + set = setupPodManagementPolicy(apps.ParallelPodManagement, set) + ssc, _, om, _ := newFakeStatefulSetController(ctx, set) + if err := om.setsIndexer.Add(set); err != nil { + t.Fatalf("could not add set to the cache: %v", err) + } + now := time.Now() + + pods := []*v1.Pod{} + pods = append(pods, newStatefulSetPod(set, 0)) + pods = append(pods, newStatefulSetPod(set, 1)) + pods[1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now}}} + pods = append(pods, newStatefulSetPod(set, 2)) + pods[2].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now.Add(-2 * time.Second)}}} + pods = append(pods, newStatefulSetPod(set, 3)) + pods[3].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now.Add(-4300 * time.Millisecond)}}} + + for i, pod := range pods { + if err := om.podsIndexer.Add(pod); err != nil { + t.Fatalf("could not add pod to the cache %d: %v", i, err) + } + var err error + if pods, err = om.setPodRunning(set, i); err != nil { + t.Fatalf("%d: %v", i, err) + } + } + err := ssc.syncStatefulSet(ctx, set, pods) + if err != nil { + t.Fatal(err) + } + + if set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name); err != nil { + t.Fatalf("Could not get StatefulSet: %v", err) + } + + // one pod is not ready + if set.Status.ReadyReplicas != 3 { + t.Errorf("Expected updated StatefulSet to contain ready replicas %v, got %v instead", + 3, set.Status.ReadyReplicas) + } + if set.Status.AvailableReplicas != 0 { + t.Errorf("Expected updated StatefulSet to contain available replicas %v, got %v instead", + 0, set.Status.AvailableReplicas) + } + + if got, want := ssc.queue.Len(), 0; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) + } + + // StatefulSet should be
re-queued after 700ms to recompute .status.availableReplicas (200ms extra for the test). + ktesting.Eventually(ctx, func(tCtx ktesting.TContext) int { + return ssc.queue.Len() + }).WithTimeout(900*time.Millisecond). + WithPolling(10*time.Millisecond). + Should(gomega.Equal(1), "StatefulSet should be re-queued to recompute .status.availableReplicas") +} + func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) { client := fake.NewSimpleClientset(initialObjects...) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
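
Note on the scheduling helper used in patch 2: controller.FindMinNextPodAvailabilityCheck is referenced by the new syncStatefulSet code but is not part of this diff. The sketch below is an assumption about its semantics rather than the actual implementation: among the pods that are ready but not yet available, return the shortest duration from now until one of them crosses the minReadySeconds threshold, or nil when no such pod exists. The real helper also receives the controller's clock as a fourth argument, which this sketch omits.

package controller // sketch only; the real helper lives elsewhere in pkg/controller

import (
	"time"

	v1 "k8s.io/api/core/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// findMinNextPodAvailabilityCheck (sketch, assumed behavior) returns the
// smallest positive duration, measured from now, after which a currently
// ready pod will have been ready for minReadySeconds and should be
// re-checked for availability. It returns nil when no pod has a pending
// availability transition.
func findMinNextPodAvailabilityCheck(pods []*v1.Pod, minReadySeconds int32, now time.Time) *time.Duration {
	var minNext *time.Duration
	for _, pod := range pods {
		c := podutil.GetPodReadyCondition(pod.Status)
		if c == nil || c.Status != v1.ConditionTrue {
			continue // not ready: minReadySeconds has not started counting yet
		}
		// Time remaining until this pod has been ready for minReadySeconds.
		remaining := c.LastTransitionTime.Add(time.Duration(minReadySeconds) * time.Second).Sub(now)
		if remaining <= 0 {
			continue // already available: no future check needed for this pod
		}
		if minNext == nil || remaining < *minNext {
			minNext = &remaining
		}
	}
	return minNext
}

Applied to the pods in TestStatefulSetAvailabilityCheck (minReadySeconds of 5s; pods that last became ready 0s, 2s, and 4.3s before now), this logic yields 700ms, which matches the re-queue window the test asserts.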