diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 7fb618fa788..9088bfd692e 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -973,6 +973,21 @@ func compareMaxContainerRestarts(pi *v1.Pod, pj *v1.Pod) *bool { return nil } +// FilterClaimedPods returns pods that are controlled by the controller and match the selector. +func FilterClaimedPods(controller metav1.Object, selector labels.Selector, pods []*v1.Pod) []*v1.Pod { + var result []*v1.Pod + for _, pod := range pods { + if !metav1.IsControlledBy(pod, controller) { + // It's an orphan or owned by someone else. + continue + } + if selector.Matches(labels.Set(pod.Labels)) { + result = append(result, pod) + } + } + return result +} + // FilterActivePods returns pods that have not terminated. func FilterActivePods(logger klog.Logger, pods []*v1.Pod) []*v1.Pod { var result []*v1.Pod diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index d14daced9f1..e24add9a555 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/test/utils/ktesting" testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -433,6 +434,128 @@ func TestCountTerminatingPods(t *testing.T) { assert.Len(t, terminatingList, int(2)) } +func TestClaimedPodFiltering(t *testing.T) { + rsUUID := uuid.NewUUID() + + type podData struct { + podName string + ownerReferences []metav1.OwnerReference + labels map[string]string + } + + type test struct { + name string + pods []podData + wantPodNames []string + } + + tests := []test{ + { + name: "Filters claimed pods", + pods: []podData{ + // single owner reference + {podName: "claimed-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: rsUUID, Controller: ptr.To(true)}, + }}, + {podName: "wrong-selector-1", labels: map[string]string{"foo": "baz"}, ownerReferences: []metav1.OwnerReference{ + {UID: rsUUID, Controller: ptr.To(true)}, + }}, + {podName: "non-controller-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: rsUUID, Controller: nil}, + }}, + {podName: "other-controller-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID(), Controller: ptr.To(true)}, + }}, + {podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID(), Controller: ptr.To(true)}, + }}, + {podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{}}, + // additional controller owner reference set to controller=false + {podName: "claimed-2", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID(), Controller: ptr.To(false)}, + {UID: rsUUID, Controller: ptr.To(true)}, + }}, + {podName: "wrong-selector-2", labels: map[string]string{"foo": "baz"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID(), Controller: ptr.To(false)}, + {UID: rsUUID, Controller: ptr.To(true)}, + }}, + {podName: "non-controller-2", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID(), Controller: ptr.To(false)}, + {UID: rsUUID, Controller: ptr.To(false)}, + }}, + {podName: "other-controller-2", labels: 
map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID(), Controller: ptr.To(false)}, + {UID: uuid.NewUUID(), Controller: ptr.To(true)}, + }}, + {podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID(), Controller: ptr.To(false)}, + {UID: uuid.NewUUID(), Controller: ptr.To(true)}, + }}, + {podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID(), Controller: ptr.To(false)}, + }}, + // additional controller owner reference set to controller=nil + {podName: "claimed-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID()}, + {UID: rsUUID, Controller: ptr.To(true)}, + }}, + {podName: "wrong-selector-3", labels: nil, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID()}, + {UID: rsUUID, Controller: ptr.To(true)}, + }}, + {podName: "non-controller-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID()}, + {UID: rsUUID, Controller: nil}, + }}, + {podName: "other-controller-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID()}, + {UID: uuid.NewUUID(), Controller: ptr.To(true)}, + }}, + {podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID()}, + }}, + {podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{ + {UID: uuid.NewUUID()}, + }}, + }, + wantPodNames: []string{"claimed-1", "claimed-2", "claimed-3"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. 
+			rs := newReplicaSet("test-claim", 3, rsUUID)
+			var pods []*v1.Pod
+			for _, p := range test.pods {
+				pods = append(pods, &v1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:            p.podName,
+						Namespace:       rs.Namespace,
+						Labels:          p.labels,
+						OwnerReferences: p.ownerReferences,
+					},
+					Status: v1.PodStatus{Phase: v1.PodRunning},
+				})
+			}
+
+			selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
+			if err != nil {
+				t.Fatalf("Couldn't get selector for object %#v: %v", rs, err)
+			}
+			got := FilterClaimedPods(rs, selector, pods)
+			gotNames := sets.NewString()
+			for _, pod := range got {
+				gotNames.Insert(pod.Name)
+			}
+
+			if diff := cmp.Diff(test.wantPodNames, gotNames.List()); diff != "" {
+				t.Errorf("Claimed pod names (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
 func TestActivePodFiltering(t *testing.T) {
 	logger, _ := ktesting.NewTestContext(t)
 	type podData struct {
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index a730fa19dae..b1e1beed452 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -45,6 +45,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	appsinformers "k8s.io/client-go/informers/apps/v1"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
@@ -60,6 +61,7 @@ import (
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/replicaset/metrics"
+	"k8s.io/kubernetes/pkg/features"
 )
 
 const (
@@ -564,10 +566,10 @@ func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
 }
 
 // manageReplicas checks and updates replicas for the given ReplicaSet.
-// Does NOT modify <filteredPods>.
+// Does NOT modify <activePods>.
 // It will requeue the replica set in case of an error while creating/deleting pods.
-func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
-	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
+func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, activePods []*v1.Pod, rs *apps.ReplicaSet) error {
+	diff := len(activePods) - int(*(rs.Spec.Replicas))
 	rsKey, err := controller.KeyFunc(rs)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
@@ -627,7 +629,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
 		utilruntime.HandleError(err)
 
 		// Choose which Pods to delete, preferring those in earlier phases of startup.
-		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
+		podsToDelete := getPodsToDelete(activePods, relatedPods, diff)
 
 		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
 		// deleted, so we know to record their expectations exactly once either
@@ -707,22 +709,27 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
 	if err != nil {
 		return err
 	}
-	// Ignore inactive pods.
-	filteredPods := controller.FilterActivePods(logger, allPods)
 
-	// NOTE: filteredPods are pointing to objects from cache - if you need to
+	// NOTE: activePods and terminatingPods are pointing to objects from cache - if you need to
 	// modify them, you need to copy it first.
- filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods) + allActivePods := controller.FilterActivePods(logger, allPods) + activePods, err := rsc.claimPods(ctx, rs, selector, allActivePods) if err != nil { return err } + var terminatingPods []*v1.Pod + if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) { + allTerminatingPods := controller.FilterTerminatingPods(allPods) + terminatingPods = controller.FilterClaimedPods(rs, selector, allTerminatingPods) + } + var manageReplicasErr error if rsNeedsSync && rs.DeletionTimestamp == nil { - manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs) + manageReplicasErr = rsc.manageReplicas(ctx, activePods, rs) } rs = rs.DeepCopy() - newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) + newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr) // Always updates status as pods come up or die. updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) diff --git a/pkg/controller/replicaset/replica_set_utils.go b/pkg/controller/replicaset/replica_set_utils.go index 756ef2217bc..b4e826cb3d9 100644 --- a/pkg/controller/replicaset/replica_set_utils.go +++ b/pkg/controller/replicaset/replica_set_utils.go @@ -29,8 +29,11 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + utilfeature "k8s.io/apiserver/pkg/util/feature" appsclient "k8s.io/client-go/kubernetes/typed/apps/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/features" + "k8s.io/utils/ptr" ) // updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry. @@ -42,6 +45,7 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas && rs.Status.ReadyReplicas == newStatus.ReadyReplicas && rs.Status.AvailableReplicas == newStatus.AvailableReplicas && + ptr.Equal(rs.Status.TerminatingReplicas, newStatus.TerminatingReplicas) && rs.Generation == rs.Status.ObservedGeneration && reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) { return rs, nil @@ -56,11 +60,16 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface var getErr, updateErr error var updatedRS *apps.ReplicaSet for i, rs := 0, rs; ; i++ { + terminatingReplicasUpdateInfo := "" + if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) { + terminatingReplicasUpdateInfo = fmt.Sprintf("terminatingReplicas %s->%s, ", derefInt32ToStr(rs.Status.TerminatingReplicas), derefInt32ToStr(newStatus.TerminatingReplicas)) + } logger.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) + fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) + fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) + fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) + fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) + + terminatingReplicasUpdateInfo + fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration)) rs.Status = newStatus @@ -83,18 +92,18 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface return nil, updateErr } -func calculateStatus(rs 
*apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus { +func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus { newStatus := rs.Status // Count the number of pods that have labels matching the labels of the pod // template of the replica set, the matching pods may have more // labels than are in the template. Because the label of podTemplateSpec is // a superset of the selector of the replica set, so the possible - // matching pods must be part of the filteredPods. + // matching pods must be part of the activePods. fullyLabeledReplicasCount := 0 readyReplicasCount := 0 availableReplicasCount := 0 templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated() - for _, pod := range filteredPods { + for _, pod := range activePods { if templateLabel.Matches(labels.Set(pod.Labels)) { fullyLabeledReplicasCount++ } @@ -106,10 +115,15 @@ func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicas } } + var terminatingReplicasCount *int32 + if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) { + terminatingReplicasCount = ptr.To(int32(len(terminatingPods))) + } + failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure) if manageReplicasErr != nil && failureCond == nil { var reason string - if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 { + if diff := len(activePods) - int(*(rs.Spec.Replicas)); diff < 0 { reason = "FailedCreate" } else if diff > 0 { reason = "FailedDelete" @@ -120,10 +134,11 @@ func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicas RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure) } - newStatus.Replicas = int32(len(filteredPods)) + newStatus.Replicas = int32(len(activePods)) newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount) newStatus.ReadyReplicas = int32(readyReplicasCount) newStatus.AvailableReplicas = int32(availableReplicasCount) + newStatus.TerminatingReplicas = terminatingReplicasCount return newStatus } @@ -175,3 +190,10 @@ func filterOutCondition(conditions []apps.ReplicaSetCondition, condType apps.Rep } return newConditions } + +func derefInt32ToStr(ptr *int32) string { + if ptr == nil { + return "nil" + } + return fmt.Sprintf("%d", *ptr) +} diff --git a/pkg/controller/replicaset/replica_set_utils_test.go b/pkg/controller/replicaset/replica_set_utils_test.go index 6f65315e77f..c3dc41d790e 100644 --- a/pkg/controller/replicaset/replica_set_utils_test.go +++ b/pkg/controller/replicaset/replica_set_utils_test.go @@ -25,6 +25,11 @@ import ( apps "k8s.io/api/apps/v1" "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/utils/ptr" ) func TestCalculateStatus(t *testing.T) { @@ -36,113 +41,199 @@ func TestCalculateStatus(t *testing.T) { longMinReadySecondsRS := newReplicaSet(1, fullLabelMap) longMinReadySecondsRS.Spec.MinReadySeconds = 3600 + asTerminating := func(pod *v1.Pod) *v1.Pod { + pod.DeletionTimestamp = ptr.To(meta.Now()) + return pod + } + rsStatusTests := []struct { - name string - replicaset *apps.ReplicaSet - filteredPods []*v1.Pod - expectedReplicaSetStatus apps.ReplicaSetStatus + name string + enableDeploymentPodReplacementPolicy bool + replicaset *apps.ReplicaSet + activePods []*v1.Pod + terminatingPods []*v1.Pod + 
expectedReplicaSetStatus apps.ReplicaSetStatus }{ { "1 fully labelled pod", + false, fullyLabelledRS, []*v1.Pod{ newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true), }, + nil, apps.ReplicaSetStatus{ Replicas: 1, FullyLabeledReplicas: 1, ReadyReplicas: 1, AvailableReplicas: 1, + TerminatingReplicas: nil, }, }, { "1 not fully labelled pod", + false, notFullyLabelledRS, []*v1.Pod{ newPod("pod1", notFullyLabelledRS, v1.PodRunning, nil, true), }, + nil, apps.ReplicaSetStatus{ Replicas: 1, FullyLabeledReplicas: 0, ReadyReplicas: 1, AvailableReplicas: 1, + TerminatingReplicas: nil, }, }, { "2 fully labelled pods", + false, fullyLabelledRS, []*v1.Pod{ newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true), newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true), }, + nil, apps.ReplicaSetStatus{ Replicas: 2, FullyLabeledReplicas: 2, ReadyReplicas: 2, AvailableReplicas: 2, + TerminatingReplicas: nil, + }, + }, + { + "2 fully labelled pods with DeploymentPodReplacementPolicy", + true, + fullyLabelledRS, + []*v1.Pod{ + newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true), + newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true), + }, + nil, + apps.ReplicaSetStatus{ + Replicas: 2, + FullyLabeledReplicas: 2, + ReadyReplicas: 2, + AvailableReplicas: 2, + TerminatingReplicas: ptr.To[int32](0), }, }, { "2 not fully labelled pods", + false, notFullyLabelledRS, []*v1.Pod{ newPod("pod1", notFullyLabelledRS, v1.PodRunning, nil, true), newPod("pod2", notFullyLabelledRS, v1.PodRunning, nil, true), }, + nil, apps.ReplicaSetStatus{ Replicas: 2, FullyLabeledReplicas: 0, ReadyReplicas: 2, AvailableReplicas: 2, + TerminatingReplicas: nil, }, }, { "1 fully labelled pod, 1 not fully labelled pod", + false, notFullyLabelledRS, []*v1.Pod{ newPod("pod1", notFullyLabelledRS, v1.PodRunning, nil, true), newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true), }, + nil, apps.ReplicaSetStatus{ Replicas: 2, FullyLabeledReplicas: 1, ReadyReplicas: 2, AvailableReplicas: 2, + TerminatingReplicas: nil, }, }, { "1 non-ready pod", + false, fullyLabelledRS, []*v1.Pod{ newPod("pod1", fullyLabelledRS, v1.PodPending, nil, true), }, + nil, apps.ReplicaSetStatus{ Replicas: 1, FullyLabeledReplicas: 1, ReadyReplicas: 0, AvailableReplicas: 0, + TerminatingReplicas: nil, }, }, { "1 ready but non-available pod", + false, longMinReadySecondsRS, []*v1.Pod{ newPod("pod1", longMinReadySecondsRS, v1.PodRunning, nil, true), }, + nil, apps.ReplicaSetStatus{ Replicas: 1, FullyLabeledReplicas: 1, ReadyReplicas: 1, AvailableReplicas: 0, + TerminatingReplicas: nil, + }, + }, + { + "1 fully labelled pod and 1 terminating without DeploymentPodReplacementPolicy", + false, + fullyLabelledRS, + []*v1.Pod{ + newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true), + }, + []*v1.Pod{ + asTerminating(newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true)), + }, + apps.ReplicaSetStatus{ + Replicas: 1, + FullyLabeledReplicas: 1, + ReadyReplicas: 1, + AvailableReplicas: 1, + TerminatingReplicas: nil, + }, + }, + { + "1 fully labelled pods and 2 terminating with DeploymentPodReplacementPolicy", + true, + fullyLabelledRS, + []*v1.Pod{ + newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true), + }, + []*v1.Pod{ + asTerminating(newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true)), + asTerminating(newPod("pod3", fullyLabelledRS, v1.PodRunning, nil, true)), + }, + apps.ReplicaSetStatus{ + Replicas: 1, + FullyLabeledReplicas: 1, + ReadyReplicas: 1, + AvailableReplicas: 1, + TerminatingReplicas: ptr.To[int32](2), }, }, } for _, test := range 
rsStatusTests {
-		replicaSetStatus := calculateStatus(test.replicaset, test.filteredPods, nil)
-		if !reflect.DeepEqual(replicaSetStatus, test.expectedReplicaSetStatus) {
-			t.Errorf("%s: unexpected replicaset status: expected %v, got %v", test.name, test.expectedReplicaSetStatus, replicaSetStatus)
-		}
+		t.Run(test.name, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, test.enableDeploymentPodReplacementPolicy)
+
+			replicaSetStatus := calculateStatus(test.replicaset, test.activePods, test.terminatingPods, nil)
+			if !reflect.DeepEqual(replicaSetStatus, test.expectedReplicaSetStatus) {
+				t.Errorf("unexpected replicaset status: expected %v, got %v", test.expectedReplicaSetStatus, replicaSetStatus)
+			}
+		})
 	}
 }
 
@@ -160,7 +251,7 @@ func TestCalculateStatusConditions(t *testing.T) {
 	rsStatusConditionTests := []struct {
 		name                         string
 		replicaset                   *apps.ReplicaSet
-		filteredPods                 []*v1.Pod
+		activePods                   []*v1.Pod
 		manageReplicasErr            error
 		expectedReplicaSetConditions []apps.ReplicaSetCondition
 	}{
@@ -234,13 +325,15 @@ func TestCalculateStatusConditions(t *testing.T) {
 	}
 
 	for _, test := range rsStatusConditionTests {
-		replicaSetStatus := calculateStatus(test.replicaset, test.filteredPods, test.manageReplicasErr)
-		// all test cases have at most 1 status condition
-		if len(replicaSetStatus.Conditions) > 0 {
-			test.expectedReplicaSetConditions[0].LastTransitionTime = replicaSetStatus.Conditions[0].LastTransitionTime
-		}
-		if !reflect.DeepEqual(replicaSetStatus.Conditions, test.expectedReplicaSetConditions) {
-			t.Errorf("%s: unexpected replicaset status: expected %v, got %v", test.name, test.expectedReplicaSetConditions, replicaSetStatus.Conditions)
-		}
+		t.Run(test.name, func(t *testing.T) {
+			replicaSetStatus := calculateStatus(test.replicaset, test.activePods, nil, test.manageReplicasErr)
+			// all test cases have at most 1 status condition
+			if len(replicaSetStatus.Conditions) > 0 {
+				test.expectedReplicaSetConditions[0].LastTransitionTime = replicaSetStatus.Conditions[0].LastTransitionTime
+			}
+			if !reflect.DeepEqual(replicaSetStatus.Conditions, test.expectedReplicaSetConditions) {
+				t.Errorf("unexpected replicaset status: expected %v, got %v", test.expectedReplicaSetConditions, replicaSetStatus.Conditions)
+			}
+		})
 	}
 }
 
diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go
index 3886cfca949..96a6b4cc96e 100644
--- a/test/integration/replicaset/replicaset_test.go
+++ b/test/integration/replicaset/replicaset_test.go
@@ -208,6 +208,20 @@ func waitRSStable(t *testing.T, clientSet clientset.Interface, rs *apps.ReplicaS
 	}
 }
 
+// Verify .Status.TerminatingReplicas is equal to terminatingPods.
+func waitForTerminatingPods(clientSet clientset.Interface, ctx context.Context, rs *apps.ReplicaSet, terminatingPods *int32) error {
+	if err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(c context.Context) (bool, error) {
+		newRS, err := clientSet.AppsV1().ReplicaSets(rs.Namespace).Get(c, rs.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		return ptr.Equal(newRS.Status.TerminatingReplicas, terminatingPods), nil
+	}); err != nil {
+		return fmt.Errorf("failed to verify .Status.TerminatingReplicas is equal to %d for replicaset %q: %w", ptr.Deref(terminatingPods, -1), rs.Name, err)
+	}
+	return nil
+}
+
 // Update .Spec.Replicas to replicas and verify .Status.Replicas is changed accordingly
 func scaleRS(t *testing.T, c clientset.Interface, rs *apps.ReplicaSet, replicas int32) {
 	rsClient := c.AppsV1().ReplicaSets(rs.Namespace)
@@ -1056,3 +1070,72 @@ func TestReplicaSetsAppsV1DefaultGCPolicy(t *testing.T) {
 
 	_ = rsClient.Delete(tCtx, rs.Name, metav1.DeleteOptions{})
 }
+
+func TestTerminatingReplicas(t *testing.T) {
+	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, false)
+
+	tCtx, closeFn, rm, informers, c := rmSetup(t)
+	defer closeFn()
+	ns := framework.CreateNamespaceOrDie(c, "test-terminating-replicas", t)
+	defer framework.DeleteNamespaceOrDie(c, ns, t)
+	stopControllers := runControllerAndInformers(t, rm, informers, 0)
+	defer stopControllers()
+
+	rs := newRS("rs", ns.Name, 5)
+	rs.Spec.Template.Spec.NodeName = "fake-node"
+	rs.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(300))
+	rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, []*v1.Pod{})
+	rs = rss[0]
+	waitRSStable(t, c, rs)
+	if err := waitForTerminatingPods(c, tCtx, rs, nil); err != nil {
+		t.Fatal(err)
+	}
+
+	podClient := c.CoreV1().Pods(ns.Name)
+	pods := getPods(t, podClient, labelMap())
+	if len(pods.Items) != 5 {
+		t.Fatalf("len(pods) = %d, want 5", len(pods.Items))
+	}
+	setPodsReadyCondition(t, c, pods, v1.ConditionTrue, time.Now())
+
+	// should not update terminating pods when feature gate is disabled
+	if err := podClient.Delete(tCtx, pods.Items[0].Name, metav1.DeleteOptions{}); err != nil {
+		t.Fatal(err)
+	}
+	waitRSStable(t, c, rs)
+	if err := waitForTerminatingPods(c, tCtx, rs, nil); err != nil {
+		t.Fatal(err)
+	}
+
+	// should update terminating pods when feature gate is enabled
+	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, true)
+	if err := podClient.Delete(tCtx, pods.Items[1].Name, metav1.DeleteOptions{}); err != nil {
+		t.Fatal(err)
+	}
+	waitRSStable(t, c, rs)
+	if err := waitForTerminatingPods(c, tCtx, rs, ptr.To[int32](2)); err != nil {
+		t.Fatal(err)
+	}
+	// should update status when the pod is removed
+	if err := podClient.Delete(tCtx, pods.Items[0].Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To(int64(0))}); err != nil {
+		t.Fatal(err)
+	}
+	if err := waitForTerminatingPods(c, tCtx, rs, ptr.To[int32](1)); err != nil {
+		t.Fatal(err)
+	}
+
+	// should revert terminating pods to nil when feature gate is disabled
+	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, false)
+	if err := podClient.Delete(tCtx, pods.Items[2].Name, metav1.DeleteOptions{}); err != nil {
+		t.Fatal(err)
+	}
+	waitRSStable(t, c, rs)
+	if err := waitForTerminatingPods(c, tCtx, rs, nil); err != nil {
+		t.Fatal(err)
+	}
+	pods = getPods(t, podClient, 
labelMap()) + // 5 non-terminating, 2 terminating + if len(pods.Items) != 7 { + t.Fatalf("len(pods) = %d, want 7", len(pods.Items)) + } +}
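
The following is a minimal, self-contained sketch (not part of the patch) of the claim-and-count flow this diff wires into syncReplicaSet and calculateStatus: pods are claimed by controller UID plus label selector, and the claimed pods carrying a deletion timestamp are what would be published as status.terminatingReplicas while the DeploymentPodReplacementPolicy gate is on. The local filterClaimedPods mirrors the new FilterClaimedPods helper; the owner, pod names, and labels are invented for illustration, and the "active" count here simply excludes terminating pods, whereas the controller also drops succeeded/failed pods via FilterActivePods.

// sketch.go — illustration only, not part of the patch.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
)

// filterClaimedPods mirrors the FilterClaimedPods helper added above: keep only pods
// whose controller owner reference points at owner and whose labels match selector.
func filterClaimedPods(owner metav1.Object, selector labels.Selector, pods []*v1.Pod) []*v1.Pod {
	var claimed []*v1.Pod
	for _, pod := range pods {
		if !metav1.IsControlledBy(pod, owner) {
			continue // orphan, or controlled by a different object
		}
		if selector.Matches(labels.Set(pod.Labels)) {
			claimed = append(claimed, pod)
		}
	}
	return claimed
}

func main() {
	// Hypothetical owner and selector; a real ReplicaSet would supply these.
	owner := &metav1.ObjectMeta{Name: "example-rs", UID: types.UID("example-rs-uid")}
	selector := labels.SelectorFromSet(labels.Set{"app": "web"})

	newPod := func(name string, terminating bool) *v1.Pod {
		p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: map[string]string{"app": "web"},
			// Other OwnerReference fields (APIVersion, Kind, Name) are omitted for brevity;
			// metav1.IsControlledBy only compares the UID of the controller reference.
			OwnerReferences: []metav1.OwnerReference{{UID: owner.UID, Controller: ptr.To(true)}},
		}}
		if terminating {
			now := metav1.Now()
			p.DeletionTimestamp = &now // a set deletionTimestamp is what FilterTerminatingPods keys on
		}
		return p
	}
	pods := []*v1.Pod{newPod("web-a", false), newPod("web-b", false), newPod("web-c", true)}

	claimed := filterClaimedPods(owner, selector, pods)
	terminating := 0
	for _, p := range claimed {
		if p.DeletionTimestamp != nil {
			terminating++
		}
	}
	// With the gate enabled, calculateStatus publishes the count as a *int32 in
	// status.terminatingReplicas; with it disabled, the field stays nil.
	terminatingReplicas := ptr.To(int32(terminating))
	fmt.Printf("claimed=%d non-terminating=%d terminatingReplicas=%d\n",
		len(claimed), len(claimed)-terminating, *terminatingReplicas)
	// Output: claimed=3 non-terminating=2 terminatingReplicas=1
}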