schedule pod availability checks at the correct time in ReplicaSets

Filip Křepinský
2025-06-05 14:48:30 +02:00
parent 7383971ab3
commit 2cb48f77f0
6 changed files with 398 additions and 17 deletions

View File

@@ -53,6 +53,7 @@ import (
hashutil "k8s.io/kubernetes/pkg/util/hash"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"k8s.io/klog/v2"
)
@@ -1030,6 +1031,60 @@ func CountTerminatingPods(pods []*v1.Pod) int32 {
return int32(numberOfTerminatingPods)
}
// nextPodAvailabilityCheck implements logic similar to podutil.IsPodAvailable and returns how long to wait until the pod can become available, or nil if no future check is needed
func nextPodAvailabilityCheck(pod *v1.Pod, minReadySeconds int32, now time.Time) *time.Duration {
if !podutil.IsPodReady(pod) || minReadySeconds <= 0 {
return nil
}
c := podutil.GetPodReadyCondition(pod.Status)
if c.LastTransitionTime.IsZero() {
return nil
}
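// The pod becomes available minReadySeconds after the Ready condition's lastTransitionTime;
// return the time remaining until that moment, if any.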
minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
nextCheck := c.LastTransitionTime.Add(minReadySecondsDuration).Sub(now)
if nextCheck > 0 {
return ptr.To(nextCheck)
}
return nil
}
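// Illustration (not part of this commit): with minReadySeconds=30 and a Ready condition that
// last transitioned 20 seconds before now,
//
//	c.LastTransitionTime.Add(30*time.Second).Sub(now) == 10*time.Second
//
// so the helper returns ptr.To(10 * time.Second); once the pod has been ready for the full
// 30 seconds, it returns nil.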
// findMinNextPodAvailabilitySimpleCheck finds the shortest duration after which the next availability check should occur.
// It also returns the first pod affected by the future availability recalculation (there may be more such pods if they
// became ready at the same time); this helps implement FindMinNextPodAvailabilityCheck.
func findMinNextPodAvailabilitySimpleCheck(pods []*v1.Pod, minReadySeconds int32, now time.Time) (*time.Duration, *v1.Pod) {
var minAvailabilityCheck *time.Duration
var checkPod *v1.Pod
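// Track the earliest pending availability check and the pod that determines it.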
for _, p := range pods {
nextCheck := nextPodAvailabilityCheck(p, minReadySeconds, now)
if nextCheck != nil && (minAvailabilityCheck == nil || *nextCheck < *minAvailabilityCheck) {
minAvailabilityCheck = nextCheck
checkPod = p
}
}
return minAvailabilityCheck, checkPod
}
// FindMinNextPodAvailabilityCheck finds the duration after which the next availability check should occur.
// Availability should be assessed at the same point in time as the owner's status evaluation/update (e.g. of
// .status.availableReplicas), which is passed in as lastOwnerStatusEvaluation. This ensures that we do not skip any
// pods that have become available since the owner's status evaluation (for those pods,
// findMinNextPodAvailabilitySimpleCheck evaluated at a later time would return nil).
// clock is then used to calculate a precise time for the next availability check.
func FindMinNextPodAvailabilityCheck(pods []*v1.Pod, minReadySeconds int32, lastOwnerStatusEvaluation time.Time, clock clock.PassiveClock) *time.Duration {
nextCheckAccordingToOwnerStatusEvaluation, checkPod := findMinNextPodAvailabilitySimpleCheck(pods, minReadySeconds, lastOwnerStatusEvaluation)
if nextCheckAccordingToOwnerStatusEvaluation == nil || checkPod == nil {
return nil
}
// There must be a nextCheck. We try to calculate a more precise value for the next availability check.
// Recompute for the earliest pod so that its check time cannot be preempted by a later pod's.
if updatedNextCheck := nextPodAvailabilityCheck(checkPod, minReadySeconds, clock.Now()); updatedNextCheck != nil {
// Some time may have elapsed since lastOwnerStatusEvaluation; use the updatedNextCheck computed with the current clock.
return updatedNextCheck
}
// Fall back to 0 (immediate check) in case the last nextPodAvailabilityCheck call (with a refreshed Now) returns nil, as we might be past the check.
return ptr.To(time.Duration(0))
}
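// Hypothetical caller sketch (illustration only; queue and key are placeholders, not part of
// this commit):
//
//	if d := FindMinNextPodAvailabilityCheck(pods, minReadySeconds, statusEvaluationTime, clock); d != nil {
//		queue.AddAfter(key, *d)
//	}
//
// This mirrors how syncReplicaSet re-plans the availability check further below.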
func IsPodActive(p *v1.Pod) bool {
return v1.PodSucceeded != p.Status.Phase &&
v1.PodFailed != p.Status.Phase &&

View File

@@ -881,6 +881,244 @@ func TestSortingActivePodsWithRanks(t *testing.T) {
}
}
func TestNextPodAvailabilityCheck(t *testing.T) {
newPodWithReadyCond := func(now metav1.Time, ready bool, beforeSec int) *v1.Pod {
conditionStatus := v1.ConditionFalse
if ready {
conditionStatus = v1.ConditionTrue
}
return &v1.Pod{
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
LastTransitionTime: metav1.NewTime(now.Add(-1 * time.Duration(beforeSec) * time.Second)),
Status: conditionStatus,
},
},
},
}
}
now := metav1.Now()
tests := []struct {
name string
pod *v1.Pod
minReadySeconds int32
expected *time.Duration
}{
{
name: "not ready",
pod: newPodWithReadyCond(now, false, 0),
minReadySeconds: 0,
expected: nil,
},
{
name: "no minReadySeconds defined",
pod: newPodWithReadyCond(now, true, 0),
minReadySeconds: 0,
expected: nil,
},
{
name: "lastTransitionTime is zero",
pod: func() *v1.Pod {
pod := newPodWithReadyCond(now, true, 0)
pod.Status.Conditions[0].LastTransitionTime = metav1.Time{}
return pod
}(),
minReadySeconds: 1,
expected: nil,
},
{
name: "just became ready - available in 1s",
pod: newPodWithReadyCond(now, true, 0),
minReadySeconds: 1,
expected: ptr.To(time.Second),
},
{
name: "ready for 20s - available in 10s",
pod: newPodWithReadyCond(now, true, 20),
minReadySeconds: 30,
expected: ptr.To(10 * time.Second),
},
{
name: "available",
pod: newPodWithReadyCond(now, true, 51),
minReadySeconds: 50,
expected: nil,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nextAvailable := nextPodAvailabilityCheck(test.pod, test.minReadySeconds, now.Time)
if !ptr.Equal(nextAvailable, test.expected) {
t.Errorf("expected next pod availability check: %v, got: %v", test.expected, nextAvailable)
}
})
}
}
func TestFindMinNextPodAvailabilitySimpleCheck(t *testing.T) {
now := metav1.Now()
pod := func(name string, ready bool, beforeSec int) *v1.Pod {
p := testutil.NewPod(name, "node0")
if ready {
p.Status.Conditions[0].LastTransitionTime = metav1.NewTime(now.Add(-1 * time.Duration(beforeSec) * time.Second))
} else {
p.Status.Conditions[0].Status = v1.ConditionFalse
}
return p
}
tests := []struct {
name string
pods []*v1.Pod
minReadySeconds int32
expected *time.Duration
expectedPod *string
}{
{
name: "no pods",
pods: nil,
minReadySeconds: 0,
expected: nil,
expectedPod: nil,
},
{
name: "unready pods",
pods: []*v1.Pod{
pod("pod1", false, 0),
pod("pod2", false, 0),
},
minReadySeconds: 0,
expected: nil,
expectedPod: nil,
},
{
name: "ready pods with no minReadySeconds",
pods: []*v1.Pod{
pod("pod1", true, 0),
pod("pod2", true, 0),
},
minReadySeconds: 0,
expected: nil,
expectedPod: nil,
},
{
name: "unready and ready pods should find min next availability check",
pods: []*v1.Pod{
pod("pod1", false, 0),
pod("pod2", true, 2),
pod("pod3", true, 0),
pod("pod4", true, 4),
pod("pod5", false, 0),
},
minReadySeconds: 10,
expected: ptr.To(6 * time.Second),
expectedPod: ptr.To("pod4"),
},
{
name: "unready and available pods do not require min next availability check", // only after pods become ready we can schedule one
pods: []*v1.Pod{
pod("pod1", false, 0),
pod("pod2", true, 15),
pod("pod3", true, 11),
pod("pod4", true, 10),
pod("pod5", false, 0),
},
minReadySeconds: 10,
expected: nil,
expectedPod: nil,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nextAvailable, checkPod := findMinNextPodAvailabilitySimpleCheck(test.pods, test.minReadySeconds, now.Time)
var checkPodName *string
if checkPod != nil {
checkPodName = ptr.To(checkPod.Name)
}
if !ptr.Equal(nextAvailable, test.expected) {
t.Errorf("expected next min pod availability check: %v, got: %v", test.expected, nextAvailable)
}
if !ptr.Equal(checkPodName, test.expectedPod) {
t.Errorf("expected next min pod availability check for pod: %v, got: %v", test.expectedPod, checkPodName)
}
// using the same now for status evaluation and the clock should return the same result as findMinNextPodAvailabilitySimpleCheck
nextAvailable = FindMinNextPodAvailabilityCheck(test.pods, test.minReadySeconds, now.Time, testingclock.NewFakeClock(now.Time))
if !ptr.Equal(nextAvailable, test.expected) {
t.Errorf("expected next min pod availability check when status evaluation and clock is now: %v, got: %v", test.expected, nextAvailable)
}
})
}
}
func TestFindMinNextPodAvailability(t *testing.T) {
now := metav1.Now()
pod := func(name string, ready bool, beforeSec int) *v1.Pod {
p := testutil.NewPod(name, "node0")
if ready {
p.Status.Conditions[0].LastTransitionTime = metav1.NewTime(now.Add(-1 * time.Duration(beforeSec) * time.Second))
} else {
p.Status.Conditions[0].Status = v1.ConditionFalse
}
return p
}
tests := []struct {
name string
pods []*v1.Pod
minReadySeconds int32
statusEvaluationDelaySeconds int
expected *time.Duration
}{
{
name: "unready and ready pods should find min next availability check considering status evaluation/update delay",
pods: []*v1.Pod{
pod("pod1", false, 0),
pod("pod2", true, 2),
pod("pod3", true, 0),
pod("pod4", true, 4),
pod("pod5", false, 0),
},
minReadySeconds: 10,
statusEvaluationDelaySeconds: 2, // total of 4+2 seconds since pod4 became ready
expected: ptr.To(4 * time.Second),
},
{
name: "unready and ready pods should find min next availability check even if the status evaluation delay is longer than minReadySeconds",
pods: []*v1.Pod{
pod("pod1", false, 0),
pod("pod2", true, 2),
pod("pod3", true, 0),
pod("pod4", true, 4),
pod("pod5", false, 0),
},
minReadySeconds: 10,
statusEvaluationDelaySeconds: 7, // total of 4+7 seconds since pod4 became ready
expected: ptr.To(0 * time.Second),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
oldNow := now.Time
newNow := testingclock.NewFakePassiveClock(now.Add(time.Duration(test.statusEvaluationDelaySeconds) * time.Second))
nextAvailable := FindMinNextPodAvailabilityCheck(test.pods, test.minReadySeconds, oldNow, newNow)
if !ptr.Equal(nextAvailable, test.expected) {
t.Errorf("expected next min pod availability check: %v, got: %v", test.expected, nextAvailable)
}
})
}
}
func TestActiveReplicaSetsFiltering(t *testing.T) {
rsUuid := uuid.NewUUID()

View File

@@ -497,13 +497,14 @@ func (rsc *ReplicaSetController) updatePod(logger klog.Logger, old, cur interfac
// having its status updated with the newly available replica. For now, we can fake the
// update by resyncing the controller MinReadySeconds after it is requeued because
// a Pod transitioned to Ready.
// If there are multiple pods with varying readiness times, we cannot track them all correctly
// with the current queue. Further resyncs are attempted at the end of the syncReplicaSet
// function.
// Note that this still suffers from #29229, we are just moving the problem one level
// "closer" to kubelet (from the deployment to the replica set controller).
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
logger.V(2).Info("ReplicaSet will be enqueued after a while for availability check", "duration", rs.Spec.MinReadySeconds, "kind", rsc.Kind, "pod", klog.KObj(oldPod))
// Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
rsc.enqueueRSAfter(rs, time.Duration(rs.Spec.MinReadySeconds)*time.Second)
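// The extra one-second buffer for AddAfter millisecond skew is no longer needed here:
// syncReplicaSet re-plans the next availability check precisely as a last line of defense.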
}
return
}
@@ -747,12 +748,14 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
}
var manageReplicasErr error
var nextSyncInSeconds *int
var nextSyncDuration *time.Duration
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(ctx, activePods, rs)
}
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr, rsc.controllerFeatures, rsc.clock)
// Use the same time for calculating status and nextSyncDuration.
now := rsc.clock.Now()
newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr, rsc.controllerFeatures, now)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus, rsc.controllerFeatures)
@@ -764,14 +767,19 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
if manageReplicasErr != nil {
return manageReplicasErr
}
// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
// Plan the next availability check as a last line of defense against queue preemption (there is a single queue key
// for checking the availability of all the pods) or an early sync
// (see https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info).
if updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
nextSyncInSeconds = ptr.To(int(updatedRS.Spec.MinReadySeconds))
updatedRS.Status.ReadyReplicas != updatedRS.Status.AvailableReplicas {
// Fall back to .spec.minReadySeconds as a safeguard to ensure that .status.availableReplicas is eventually updated.
nextSyncDuration = ptr.To(time.Duration(updatedRS.Spec.MinReadySeconds) * time.Second)
// Use the same point in time (now) for calculating status and nextSyncDuration to get matching availability for the pods.
if nextCheck := controller.FindMinNextPodAvailabilityCheck(activePods, updatedRS.Spec.MinReadySeconds, now, rsc.clock); nextCheck != nil {
nextSyncDuration = nextCheck
}
}
if nextSyncInSeconds != nil {
rsc.queue.AddAfter(key, time.Duration(*nextSyncInSeconds)*time.Second)
if nextSyncDuration != nil {
rsc.queue.AddAfter(key, *nextSyncDuration)
}
return nil
}
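// Worked example (illustration only, mirroring the tests): with minReadySeconds=10 and the
// earliest not-yet-available pod ready for 4s at status-evaluation time, the 10s fallback is
// replaced by FindMinNextPodAvailabilityCheck's 6s; if 2s have already elapsed since the
// status evaluation, the refreshed clock shrinks that to 4s, and a delay past the deadline
// degrades to an immediate requeue (0).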

View File

@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"github.com/onsi/gomega"
"math/rand"
"net/http/httptest"
"net/url"
@@ -1618,6 +1619,87 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) {
}
}
func TestReplicaSetAvailabilityCheck(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(4, labelMap)
rs.Spec.MinReadySeconds = 5
client := fake.NewClientset(rs)
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, BurstReplicas)
if err := informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs); err != nil {
t.Fatal(err)
}
now := time.Now()
pod1 := newPod("foobar-1", rs, v1.PodPending, nil, true)
pod2 := newPod("foobar-2", rs, v1.PodRunning, &metav1.Time{Time: now}, true)
pod3 := newPod("foobar-3", rs, v1.PodRunning, &metav1.Time{Time: now.Add(-2 * time.Second)}, true)
pod4 := newPod("foobar-4", rs, v1.PodRunning, &metav1.Time{Time: now.Add(-4300 * time.Millisecond)}, true)
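// pod1 is pending (not ready); pod2, pod3 and pod4 became ready 0s, 2s and 4.3s ago, so with
// minReadySeconds=5 the earliest pod to become available is pod4, 700ms from now.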
if err := informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1); err != nil {
t.Fatal(err)
}
if err := informers.Core().V1().Pods().Informer().GetIndexer().Add(pod2); err != nil {
t.Fatal(err)
}
if err := informers.Core().V1().Pods().Informer().GetIndexer().Add(pod3); err != nil {
t.Fatal(err)
}
if err := informers.Core().V1().Pods().Informer().GetIndexer().Add(pod4); err != nil {
t.Fatal(err)
}
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
err := manager.syncReplicaSet(ctx, GetKey(rs, t))
if err != nil {
t.Fatal(err)
}
var updatedRs *apps.ReplicaSet
for _, a := range client.Actions() {
if a.GetResource().Resource != "replicasets" {
t.Errorf("Unexpected action %+v", a)
continue
}
switch action := a.(type) {
case core.UpdateAction:
var ok bool
if updatedRs, ok = action.GetObject().(*apps.ReplicaSet); !ok {
t.Errorf("Expected a ReplicaSet as the argument to update, got %T", updatedRs)
}
default:
t.Errorf("Unexpected action %+v", a)
}
}
// one pod is not ready
if updatedRs.Status.ReadyReplicas != 3 {
t.Errorf("Expected updated ReplicaSet to contain ready replicas %v, got %v instead",
3, updatedRs.Status.ReadyReplicas)
}
if updatedRs.Status.AvailableReplicas != 0 {
t.Errorf("Expected updated ReplicaSet to contain available replicas %v, got %v instead",
0, updatedRs.Status.AvailableReplicas)
}
if got, want := manager.queue.Len(), 0; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
// RS should be re-queued after 700ms to recompute .status.availableReplicas (200ms extra for the test).
ktesting.Eventually(ctx, func(tCtx ktesting.TContext) int {
return manager.queue.Len()
}).WithTimeout(900*time.Millisecond).
WithPolling(10*time.Millisecond).
Should(gomega.Equal(1), "RS should be re-queued to recompute .status.availableReplicas")
}
var (
imagePullBackOff apps.ReplicaSetConditionType = "ImagePullBackOff"

View File

@@ -22,6 +22,7 @@ import (
"context"
"fmt"
"reflect"
"time"
"k8s.io/klog/v2"
@@ -33,7 +34,6 @@ import (
appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
)
@@ -93,7 +93,7 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
return nil, updateErr
}
func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error, controllerFeatures ReplicaSetControllerFeatures, clock clock.PassiveClock) apps.ReplicaSetStatus {
func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error, controllerFeatures ReplicaSetControllerFeatures, now time.Time) apps.ReplicaSetStatus {
newStatus := rs.Status
// Count the number of pods that have labels matching the labels of the pod
// template of the replica set, the matching pods may have more
@@ -104,7 +104,6 @@ func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods
readyReplicasCount := 0
availableReplicasCount := 0
templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
now := clock.Now()
for _, pod := range activePods {
if templateLabel.Matches(labels.Set(pod.Labels)) {
fullyLabeledReplicasCount++

View File

@@ -303,7 +303,7 @@ func TestCalculateStatus(t *testing.T) {
// test ReplicaSet controller default behavior unless specified otherwise in the test case
controllerFeatures := ptr.Deref(test.controllerFeatures, DefaultReplicaSetControllerFeatures())
replicaSetStatus := calculateStatus(test.replicaset, test.activePods, test.terminatingPods, nil, controllerFeatures, clock)
replicaSetStatus := calculateStatus(test.replicaset, test.activePods, test.terminatingPods, nil, controllerFeatures, clock.Now())
if !reflect.DeepEqual(replicaSetStatus, test.expectedReplicaSetStatus) {
t.Errorf("unexpected replicaset status: expected %v, got %v", test.expectedReplicaSetStatus, replicaSetStatus)
}
@@ -400,8 +400,7 @@ func TestCalculateStatusConditions(t *testing.T) {
for _, test := range rsStatusConditionTests {
t.Run(test.name, func(t *testing.T) {
clock := clocktesting.NewFakePassiveClock(time.Now())
replicaSetStatus := calculateStatus(test.replicaset, test.activePods, nil, test.manageReplicasErr, DefaultReplicaSetControllerFeatures(), clock)
replicaSetStatus := calculateStatus(test.replicaset, test.activePods, nil, test.manageReplicasErr, DefaultReplicaSetControllerFeatures(), time.Now())
// all test cases have at most 1 status condition
if len(replicaSetStatus.Conditions) > 0 {
test.expectedReplicaSetConditions[0].LastTransitionTime = replicaSetStatus.Conditions[0].LastTransitionTime