Merge pull request #136097 from atiratree/automated-cherry-pick-of-#135428-upstream-release-1.35

Automated cherry pick of #135428: schedule pod availability checks at the correct time in StatefulSets
Authored by Kubernetes Prow Robot on 2026-01-10 09:39:40 +05:30; committed by GitHub
5 changed files with 160 additions and 97 deletions
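In short: the controller now carries an injectable clock.PassiveClock, threads a single now timestamp from each sync through UpdateStatefulSet down to the per-pod availability checks, and requeues the set at the earliest moment a ready-but-not-yet-available pod crosses its minReadySeconds threshold, instead of unconditionally waiting minReadySeconds plus a one-second skew buffer.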

View File

@@ -44,6 +44,8 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/controller/statefulset/metrics"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"k8s.io/klog/v2"
)
@@ -81,6 +83,7 @@ type StatefulSetController struct {
queue workqueue.TypedRateLimitingInterface[string]
// eventBroadcaster is the core of the event processing pipeline.
eventBroadcaster record.EventBroadcaster
clock clock.PassiveClock
}
// NewStatefulSetController creates a new statefulset controller.
@@ -118,6 +121,7 @@ func NewStatefulSetController(
podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
eventBroadcaster: eventBroadcaster,
clock: clock.RealClock{},
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
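The new clock field makes the controller's notion of time injectable rather than hard-wired to the wall clock. A minimal sketch of how a test could exploit that, using the fake clock from k8s.io/utils/clock/testing (the test added in this PR still drives real time with time.Now() and short waits):

```go
package main

import (
	"fmt"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	// Pin the clock to a fixed instant so availability math is deterministic.
	fakeClock := clocktesting.NewFakeClock(time.Date(2026, 1, 10, 9, 0, 0, 0, time.UTC))

	// A controller constructed with clock: fakeClock would evaluate pod
	// availability against fakeClock.Now() instead of the wall clock.
	fmt.Println("now:", fakeClock.Now())

	// Jump past a 5s minReadySeconds soak without sleeping; the next sync
	// would then count a pod that became ready at the start as available.
	fakeClock.Step(5 * time.Second)
	fmt.Println("after step:", fakeClock.Now())
}
```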
@@ -266,9 +270,10 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interfa
// having its status updated with the newly available replica.
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
logger.V(2).Info("StatefulSet will be enqueued after minReadySeconds for availability check", "statefulSet", klog.KObj(set), "minReadySeconds", set.Spec.MinReadySeconds)
// Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
ssc.enqueueSSAfter(logger, set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
// If there are multiple pods with varying readiness times, we cannot track them all
// correctly with the current queue. Further resyncs are attempted at the end of the
// syncStatefulSet function.
ssc.enqueueSSAfter(logger, set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
}
return
}
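The extra second is dropped here because skew handling has moved into syncStatefulSet below: every sync now recomputes the earliest pending availability transition and requeues itself, so an enqueue that fires marginally early is corrected by the follow-up resync rather than by padding the delay.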
@@ -463,7 +468,7 @@ func (ssc *StatefulSetController) worker(ctx context.Context) {
// sync syncs the given statefulset.
func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
startTime := time.Now()
startTime := ssc.clock.Now()
logger := klog.FromContext(ctx)
defer func() {
logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime))
@@ -507,16 +512,27 @@ func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps
logger := klog.FromContext(ctx)
logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods))
var status *apps.StatefulSetStatus
var nextSyncDuration *time.Duration
var err error
status, err = ssc.control.UpdateStatefulSet(ctx, set, pods)
// Use the same time for calculating status and nextSyncDuration.
now := ssc.clock.Now()
status, err = ssc.control.UpdateStatefulSet(ctx, set, pods, now)
if err != nil {
return err
}
logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set))
// One more sync to handle the clock skew. This is also helping in requeuing right after status update
if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
ssc.enqueueSSAfter(logger, set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
// Plan the next availability check as a last line of defense against queue preemption (a single queue key covers the availability checks for all of the set's pods)
// or an early sync (see https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info).
if set.Spec.MinReadySeconds > 0 && status != nil && status.ReadyReplicas != status.AvailableReplicas {
// Fall back to .spec.minReadySeconds as a safeguard, so that .status.availableReplicas is always updated eventually.
nextSyncDuration = ptr.To(time.Duration(set.Spec.MinReadySeconds) * time.Second)
// Use the same point in time (now) for calculating status and nextSyncDuration to get matching availability for the pods.
if nextCheck := controller.FindMinNextPodAvailabilityCheck(pods, set.Spec.MinReadySeconds, now, ssc.clock); nextCheck != nil {
nextSyncDuration = nextCheck
}
}
if nextSyncDuration != nil {
ssc.enqueueSSAfter(logger, set, *nextSyncDuration)
}
return nil
}
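controller.FindMinNextPodAvailabilityCheck is defined outside the hunks shown here (the real helper also receives the controller's clock, which the sketch omits). A hypothetical restatement of the semantics this call site implies — return the shortest positive wait until some ready pod becomes available, or nil when no check is pending:

```go
package sketch

import (
	"time"

	v1 "k8s.io/api/core/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// findMinNextPodAvailabilityCheck is an illustrative re-statement, not the
// upstream implementation from k8s.io/kubernetes/pkg/controller.
func findMinNextPodAvailabilityCheck(pods []*v1.Pod, minReadySeconds int32, now time.Time) *time.Duration {
	var min *time.Duration
	for _, pod := range pods {
		c := podutil.GetPodReadyCondition(pod.Status)
		if c == nil || c.Status != v1.ConditionTrue {
			continue // only ready pods have a pending availability transition
		}
		// A ready pod becomes available once it has been ready for minReadySeconds.
		availableAt := c.LastTransitionTime.Add(time.Duration(minReadySeconds) * time.Second)
		if wait := availableAt.Sub(now); wait > 0 && (min == nil || wait < *min) {
			w := wait
			min = &w
		}
	}
	return min
}
```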

View File

@@ -20,6 +20,7 @@ import (
"context"
"sort"
"sync"
"time"
"k8s.io/klog/v2"
"k8s.io/utils/lru"
@@ -48,7 +49,7 @@ type StatefulSetControlInterface interface {
// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error)
// ListRevisions returns an array of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returned slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
@@ -83,7 +84,7 @@ type defaultStatefulSetControl struct {
// strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) {
set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors.
// list all revisions and sort them
@@ -93,7 +94,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set
}
history.SortControllerRevisions(revisions)
currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions, now)
if err != nil {
errs := []error{err}
if agg, ok := err.(utilerrors.Aggregate); ok {
@@ -107,7 +108,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set
}
func (ssc *defaultStatefulSetControl) performUpdate(
ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision, now time.Time) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
var currentStatus *apps.StatefulSetStatus
logger := klog.FromContext(ctx)
// get the current, and update revisions
@@ -117,7 +118,7 @@ func (ssc *defaultStatefulSetControl) performUpdate(
}
// perform the main update function and get the status
currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods)
currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods, now)
if err != nil && currentStatus == nil {
return currentRevision, updateRevision, nil, err
}
@@ -367,7 +368,7 @@ type replicaStatus struct {
updatedReplicas int32
}
func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, now time.Time) replicaStatus {
status := replicaStatus{}
for _, pod := range pods {
if isCreated(pod) {
@@ -378,7 +379,7 @@ func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision
if isRunningAndReady(pod) {
status.readyReplicas++
// count the number of running and available replicas
if isRunningAndAvailable(pod, minReadySeconds) {
if isRunningAndAvailable(pod, minReadySeconds, now) {
status.availableReplicas++
}
@@ -398,14 +399,14 @@ func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision
return status
}
func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, now time.Time, podLists ...[]*v1.Pod) {
status.Replicas = 0
status.ReadyReplicas = 0
status.AvailableReplicas = 0
status.CurrentReplicas = 0
status.UpdatedReplicas = 0
for _, list := range podLists {
replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision, now)
status.Replicas += replicaStatus.replicas
status.ReadyReplicas += replicaStatus.readyReplicas
status.AvailableReplicas += replicaStatus.availableReplicas
@@ -414,13 +415,7 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current
}
}
func (ssc *defaultStatefulSetControl) processReplica(
ctx context.Context,
set *apps.StatefulSet,
updateSet *apps.StatefulSet,
monotonic bool,
replicas []*v1.Pod,
i int) (bool, error) {
func (ssc *defaultStatefulSetControl) processReplica(ctx context.Context, set *apps.StatefulSet, updateSet *apps.StatefulSet, monotonic bool, replicas []*v1.Pod, i int, now time.Time) (bool, error) {
logger := klog.FromContext(ctx)
// Note that pods with phase Succeeded will also trigger this event. This is
@@ -484,7 +479,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
// If we have a Pod that has been created but is not available, we cannot make progress.
// We must ensure that, for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Available.
if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds, now) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, nil
@@ -510,7 +505,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
return false, nil
}
func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int, now time.Time) (bool, error) {
logger := klog.FromContext(ctx)
if isTerminating(condemned[i]) {
// if we are in monotonic mode, block and wait for terminating pods to expire
@@ -528,7 +523,7 @@ func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set
return true, nil
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds, now) && monotonic && condemned[i] != firstUnhealthyPod {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return true, nil
@@ -563,13 +558,7 @@ func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bo
// all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
// Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
// update must be recorded. If the error is not nil, the method should be retried until successful.
func (ssc *defaultStatefulSetControl) updateStatefulSet(
ctx context.Context,
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
collisionCount int32,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
func (ssc *defaultStatefulSetControl) updateStatefulSet(ctx context.Context, set *apps.StatefulSet, currentRevision *apps.ControllerRevision, updateRevision *apps.ControllerRevision, collisionCount int32, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
// get the current and update revisions of the set.
currentSet, err := ApplyRevision(set, currentRevision)
@@ -589,7 +578,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
status.CollisionCount = new(int32)
*status.CollisionCount = collisionCount
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, pods)
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
@@ -630,7 +619,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// find the first unhealthy Pod
for i := range replicas {
if isUnavailable(replicas[i], set.Spec.MinReadySeconds) {
if isUnavailable(replicas[i], set.Spec.MinReadySeconds, now) {
unavailable++
if firstUnavailablePod == nil {
firstUnavailablePod = replicas[i]
@@ -640,7 +629,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
for i := len(condemned) - 1; i >= 0; i-- {
if isUnavailable(condemned[i], set.Spec.MinReadySeconds) {
if isUnavailable(condemned[i], set.Spec.MinReadySeconds, now) {
unavailable++
if firstUnavailablePod == nil {
firstUnavailablePod = condemned[i]
@@ -662,10 +651,10 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
processReplicaFn := func(i int) (bool, error) {
return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i)
return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i, now)
}
if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
return &status, err
}
@@ -681,7 +670,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
return false, nil
}
if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
return &status, err
}
@@ -692,14 +681,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
// updates.
processCondemnedFn := func(i int) (bool, error) {
return ssc.processCondemned(ctx, set, firstUnavailablePod, monotonic, condemned, i)
return ssc.processCondemned(ctx, set, firstUnavailablePod, monotonic, condemned, i, now)
}
if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
return &status, err
}
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
@@ -713,6 +702,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
replicas,
updateRevision,
status,
now,
)
}
@@ -738,7 +728,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
// wait for unavailable Pods on update
if isUnavailable(replicas[target], set.Spec.MinReadySeconds) {
if isUnavailable(replicas[target], set.Spec.MinReadySeconds, now) {
logger.V(4).Info("StatefulSet is waiting for Pod to update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
return &status, nil
@@ -748,14 +738,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
return &status, nil
}
func updateStatefulSetAfterInvariantEstablished(
ctx context.Context,
ssc *defaultStatefulSetControl,
set *apps.StatefulSet,
replicas []*v1.Pod,
updateRevision *apps.ControllerRevision,
status apps.StatefulSetStatus,
) (*apps.StatefulSetStatus, error) {
func updateStatefulSetAfterInvariantEstablished(ctx context.Context, ssc *defaultStatefulSetControl, set *apps.StatefulSet, replicas []*v1.Pod, updateRevision *apps.ControllerRevision, status apps.StatefulSetStatus, now time.Time) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
replicaCount := int(*set.Spec.Replicas)
@@ -783,7 +766,7 @@ func updateStatefulSetAfterInvariantEstablished(
unavailablePods := 0
for target := len(replicas) - 1; target >= 0; target-- {
if isUnavailable(replicas[target], set.Spec.MinReadySeconds) {
if isUnavailable(replicas[target], set.Spec.MinReadySeconds, now) {
unavailablePods++
}
}

View File

@@ -359,7 +359,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
if err != nil {
t.Error(err)
}
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -369,7 +369,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
if pods, err = om.setPodRunning(set, i); err != nil {
t.Error(err)
}
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -384,7 +384,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -408,7 +408,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
@@ -442,7 +442,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
// Expect pod deletion failure
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
expectedNumOfDeleteRequests++
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
if err := invariants(set, om); err != nil {
@@ -458,7 +458,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
// Expect pod deletion
expectedNumOfDeleteRequests++
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
@@ -472,7 +472,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
}
// Expect no additional delete calls and expect pod creation
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
@@ -584,7 +584,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
om.podsIndexer.Update(pods[0])
// now it should fail
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
}
@@ -657,7 +657,7 @@ func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants
// delete fails
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err == nil {
t.Error("Expected err in update StatefulSet when deleting a pod")
}
@@ -678,7 +678,7 @@ func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants
// delete works
om.SetDeleteStatefulPodError(nil, 0)
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("Unexpected err in update StatefulSet: %v", err)
}
@@ -767,7 +767,7 @@ func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants i
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
@@ -782,7 +782,7 @@ func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants i
}
pods[0].Status.Phase = v1.PodPending
om.podsIndexer.Update(pods[0])
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
// invariants check if there any missing PVCs for the Pods
@@ -994,7 +994,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
spc.setPodRunning(set, 4)
originalPods, _ := spc.setPodReadyCondition(set, 4, true)
sort.Sort(ascendingOrdinal(originalPods))
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1008,7 +1008,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
}
// create new pod 3
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1037,7 +1037,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
spc.setPodReadyCondition(set, 4, true)
// create new pod 4 (only one pod gets created at a time due to OrderedReady)
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1106,7 +1106,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
sort.Sort(ascendingOrdinal(originalPods))
// since maxUnavailable is 2, update pods 4 and 5; this will delete pods 4 and 5,
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1122,7 +1122,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
}
// create new pods
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1138,7 +1138,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
spc.setPodReadyCondition(set, 5, true)
originalPods, _ = spc.setPodReadyCondition(set, 3, true)
sort.Sort(ascendingOrdinal(originalPods))
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1234,7 +1234,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailableInOrderedModeVerifyInv
// try to update the statefulset
// this function is only called in main code when the feature gate is enabled
if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status); err != nil {
if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status, time.Now()); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1969,7 +1969,7 @@ func TestStatefulSetControlLimitsHistory(t *testing.T) {
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
@@ -2336,7 +2336,7 @@ func TestStatefulSetAvailability(t *testing.T) {
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
@@ -2407,7 +2407,7 @@ func TestStatefulSetStatusUpdate(t *testing.T) {
if err != nil {
t.Error(err)
}
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if ssu.updateStatusTracker.requests != 1 {
t.Errorf("Did not update status")
}
@@ -3077,7 +3077,7 @@ func parallelScaleUpStatefulSetControl(set *apps.StatefulSet,
}
// run the controller once and check invariants
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
return err
}
@@ -3113,14 +3113,14 @@ func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetC
return err
}
sort.Sort(ascendingOrdinal(pods))
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
}
@@ -3179,7 +3179,7 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet,
}
}
// run the controller once and check invariants
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
return err
}
@@ -3207,7 +3207,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
}
sort.Sort(ascendingOrdinal(pods))
if idx := len(pods) - 1; idx >= 0 {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -3217,7 +3217,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if pods, err = om.addTerminatingPod(set, getOrdinal(pods[idx])); err != nil {
return err
}
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -3234,7 +3234,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
om.podsIndexer.Delete(pods[len(pods)-1])
}
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -3327,7 +3327,7 @@ func updateStatefulSetControl(set *apps.StatefulSet,
if err != nil {
return err
}
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
@@ -3375,7 +3375,7 @@ func updateStatefulSetControl(set *apps.StatefulSet,
}
}
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -3453,7 +3453,7 @@ func TestStatefulSetRollingUpdateRespectsMinReadySeconds(t *testing.T) {
}
// Perform update - should be blocked because pods haven't been ready for 30 seconds
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
@@ -3483,7 +3483,7 @@ func TestStatefulSetRollingUpdateRespectsMinReadySeconds(t *testing.T) {
}
// Perform update again - now should proceed
status, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
status, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
@@ -3537,7 +3537,7 @@ func TestStatefulSetScaleDownRespectsMinReadySeconds(t *testing.T) {
}
// Scale down should be blocked because pods haven't been available for 30 seconds
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
@@ -3600,7 +3600,7 @@ func TestStatefulSetOnDeleteStrategyIgnoresMinReadySeconds(t *testing.T) {
}
// OnDelete strategy should complete regardless of MinReadySeconds
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
@@ -3645,7 +3645,7 @@ func TestStatefulSetZeroMinReadySeconds(t *testing.T) {
}
// With zero MinReadySeconds, pods should be immediately available
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
@@ -3695,7 +3695,7 @@ func TestStatefulSetPartitionRollingUpdateWithMinReadySeconds(t *testing.T) {
}
// Rolling update should be blocked because pods haven't been available for 30 seconds
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
@@ -3798,7 +3798,7 @@ func TestStatefulSetMetrics(t *testing.T) {
ReadyReplicas: readyReplicas,
}
updateRevision := &apps.ControllerRevision{}
_, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, pods, updateRevision, status)
_, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, pods, updateRevision, status, time.Now())
if err != nil {
t.Fatal(err)
}

View File

@@ -23,6 +23,9 @@ import (
"fmt"
"sort"
"testing"
"time"
"github.com/onsi/gomega"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@@ -37,9 +40,9 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/test/utils/ktesting"
)
var parentKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
@@ -907,6 +910,66 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) {
}
}
func TestStatefulSetAvailabilityCheck(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
set := setMinReadySeconds(newStatefulSet(4), int32(5)) // 5 seconds
set = setupPodManagementPolicy(apps.ParallelPodManagement, set)
ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
if err := om.setsIndexer.Add(set); err != nil {
t.Fatalf("could not add set to the cache: %v", err)
}
now := time.Now()
pods := []*v1.Pod{}
pods = append(pods, newStatefulSetPod(set, 0))
pods = append(pods, newStatefulSetPod(set, 1))
pods[1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now}}}
pods = append(pods, newStatefulSetPod(set, 2))
pods[2].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now.Add(-2 * time.Second)}}}
pods = append(pods, newStatefulSetPod(set, 3))
pods[3].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now.Add(-4300 * time.Millisecond)}}}
for i, pod := range pods {
if err := om.podsIndexer.Add(pod); err != nil {
t.Fatalf("could not add pod to the cache %d: %v", i, err)
}
var err error
if pods, err = om.setPodRunning(set, i); err != nil {
t.Fatalf("%d: %v", i, err)
}
}
err := ssc.syncStatefulSet(ctx, set, pods)
if err != nil {
t.Fatal(err)
}
if set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name); err != nil {
t.Fatalf("Could not get StatefulSet: %v", err)
}
// one pod is not ready
if set.Status.ReadyReplicas != 3 {
t.Errorf("Expected updated StatefulSet to contain ready replicas %v, got %v instead",
3, set.Status.ReadyReplicas)
}
if set.Status.AvailableReplicas != 0 {
t.Errorf("Expected updated StatefulSet to contain available replicas %v, got %v instead",
0, set.Status.AvailableReplicas)
}
if got, want := ssc.queue.Len(), 0; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
// The StatefulSet should be re-queued after ~700ms to recompute .status.availableReplicas (the timeout below allows 200ms of slack).
ktesting.Eventually(ctx, func(tCtx ktesting.TContext) int {
return ssc.queue.Len()
}).WithTimeout(900*time.Millisecond).
WithPolling(10*time.Millisecond).
Should(gomega.Equal(1), "StatefulSet should be re-queued to recompute .status.availableReplicas")
}
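The 700ms expectation falls out of the fixtures: with minReadySeconds=5s, pod 1 (ready at now) becomes available in 5s, pod 2 (ready 2s ago) in 3s, and pod 3 (ready 4.3s ago) in 0.7s, while pod 0 never becomes ready and contributes nothing; the minimum pending transition is therefore 700ms, and the 900ms Eventually timeout leaves 200ms of slack.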
func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) {
client := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"regexp"
"strconv"
"time"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@@ -457,8 +458,8 @@ func isRunningAndReady(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
}
func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool {
return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now())
func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32, now time.Time) bool {
return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now})
}
// isCreated returns true if pod has been created and is maintained by the API server
@@ -487,8 +488,8 @@ func isTerminating(pod *v1.Pod) bool {
}
// isUnavailable returns true if pod is not available or if it is terminating
func isUnavailable(pod *v1.Pod, minReadySeconds int32) bool {
return !isRunningAndAvailable(pod, minReadySeconds) || isTerminating(pod)
func isUnavailable(pod *v1.Pod, minReadySeconds int32, now time.Time) bool {
return !podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) || isTerminating(pod)
}
// allowsBurst is true if the alpha burst annotation is set.
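For reference, the podutil.IsPodAvailable rule these helpers delegate to treats a pod as available only after its ready condition has held for minReadySeconds. A condensed, illustrative restatement (the canonical version lives in k8s.io/kubernetes/pkg/api/v1/pod):

```go
package sketch

import (
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// isPodAvailable restates the availability rule for illustration; it is not
// the upstream implementation.
func isPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool {
	c := podutil.GetPodReadyCondition(pod.Status)
	if c == nil || c.Status != v1.ConditionTrue {
		return false // a pod that is not ready is never available
	}
	if minReadySeconds == 0 {
		return true // no soak period required
	}
	// Available once the pod has been continuously ready for the soak period.
	soak := time.Duration(minReadySeconds) * time.Second
	return !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(soak).Before(now.Time)
}
```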