diff --git a/test/integration/deployment/deployment_test.go b/test/integration/deployment/deployment_test.go
index 3a0ca8f989f..96b022a9308 100644
--- a/test/integration/deployment/deployment_test.go
+++ b/test/integration/deployment/deployment_test.go
@@ -19,9 +19,13 @@ package deployment
 import (
     "context"
     "fmt"
+    "math"
+    "reflect"
     "strings"
     "testing"
 
+    "github.com/google/go-cmp/cmp"
+
     apps "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -1403,3 +1407,425 @@ func TestTerminatingReplicasDeploymentStatus(t *testing.T) {
         t.Fatal(err)
     }
 }
+
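+// TestRecreateDeploymentForPodReplacement verifies that a Recreate deployment does not create a
+// new replica set until all pods of the old replica set have fully terminated, and that
+// .status.terminatingReplicas is only populated when the DeploymentReplicaSetTerminatingReplicas
+// feature gate is enabled.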
+func TestRecreateDeploymentForPodReplacement(t *testing.T) {
+    tests := []struct {
+        name                                          string
+        enableDeploymentReplicaSetTerminatingReplicas bool
+
+        expectedReplicasAfterOldRSScaleDown                    int32
+        expectedTerminatingReplicasAfterOldRSScaleDown         *int32
+        expectedReplicasAfterNewRS                             int32
+        expectedTerminatingReplicasAfterNewRS                  *int32
+        expectedReplicasAfterInFlightPodTermination            int32
+        expectedTerminatingReplicasAfterInFlightPodTermination *int32
+        expectedReplicasAfterInFlightScaleUp                   int32
+        expectedTerminatingReplicasAfterInFlightScaleUp        *int32
+        expectedReplicasAfterInFlightScaleDown                 int32
+        expectedTerminatingReplicasAfterInFlightScaleDown      *int32
+        expectedReplicasForDeploymentComplete                  int32
+        expectedTerminatingReplicasForDeploymentComplete       *int32
+    }{
+        {
+            name: "recreate should wait for terminating pods to complete in a new rollout with DeploymentReplicaSetTerminatingReplicas=false",
+            enableDeploymentReplicaSetTerminatingReplicas: false,
+
+            expectedReplicasAfterOldRSScaleDown:                    0,
+            expectedTerminatingReplicasAfterOldRSScaleDown:         nil, // terminating replica tracking is disabled, so all expectedTerminating* fields stay nil
+            expectedReplicasAfterNewRS:                             6,
+            expectedTerminatingReplicasAfterNewRS:                  nil,
+            expectedReplicasAfterInFlightPodTermination:            6, // 1 pod terminated
+            expectedTerminatingReplicasAfterInFlightPodTermination: nil,
+            expectedReplicasAfterInFlightScaleUp:                   7, // +1 scale up
+            expectedTerminatingReplicasAfterInFlightScaleUp:        nil,
+            expectedReplicasAfterInFlightScaleDown:                 5, // -2 scale down
+            expectedTerminatingReplicasAfterInFlightScaleDown:      nil,
+            expectedReplicasForDeploymentComplete:                  5,
+            expectedTerminatingReplicasForDeploymentComplete:       nil,
+        },
+        {
+            name: "recreate should wait for terminating pods to complete in a new rollout with DeploymentReplicaSetTerminatingReplicas=true",
+            enableDeploymentReplicaSetTerminatingReplicas: true,
+
+            expectedReplicasAfterOldRSScaleDown:                    0,
+            expectedTerminatingReplicasAfterOldRSScaleDown:         ptr.To[int32](6),
+            expectedReplicasAfterNewRS:                             6,
+            expectedTerminatingReplicasAfterNewRS:                  ptr.To[int32](0),
+            expectedReplicasAfterInFlightPodTermination:            6, // 1 pod terminated
+            expectedTerminatingReplicasAfterInFlightPodTermination: ptr.To[int32](1),
+            expectedReplicasAfterInFlightScaleUp:                   7, // +1 scale up
+            expectedTerminatingReplicasAfterInFlightScaleUp:        ptr.To[int32](1),
+            expectedReplicasAfterInFlightScaleDown:                 5, // -2 scale down
+            expectedTerminatingReplicasAfterInFlightScaleDown:      ptr.To[int32](3),
+            expectedReplicasForDeploymentComplete:                  5,
+            expectedTerminatingReplicasForDeploymentComplete:       ptr.To[int32](3),
+        },
+    }
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentReplicaSetTerminatingReplicas, test.enableDeploymentReplicaSetTerminatingReplicas)
+
+            _, ctx := ktesting.NewTestContext(t)
+            ctx, cancel := context.WithCancel(ctx)
+            defer cancel()
+
+            closeFn, rm, dc, informers, c := dcSetup(ctx, t)
+            defer closeFn()
+
+            name := "test-recreate-deployment-pod-replacement"
+            ns := framework.CreateNamespaceOrDie(c, name, t)
+            defer framework.DeleteNamespaceOrDie(c, ns, t)
+
+            // Start informer and controllers
+            stopControllers := runControllersAndInformers(t, rm, dc, informers)
+            defer stopControllers()
+
+            deploymentName := "deployment"
+            replicas := int32(6)
+            tester := &deploymentTester{t: t, c: c, deployment: newDeployment(deploymentName, ns.Name, replicas)}
+            tester.deployment.Spec.Strategy.Type = apps.RecreateDeploymentStrategyType
+            tester.deployment.Spec.Strategy.RollingUpdate = nil
+            tester.deployment.Spec.Template.Spec.NodeName = "fake-node"
+            tester.deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(300))
+
+            var err error
+            tester.deployment, err = c.AppsV1().Deployments(ns.Name).Create(context.TODO(), tester.deployment, metav1.CreateOptions{})
+            if err != nil {
+                t.Fatalf("failed to create deployment %q: %v", deploymentName, err)
+            }
+
+            // Ensure the deployment completes while marking its pods as ready simultaneously
+            if err := tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
+                t.Fatal(err)
+            }
+            // Record the current replica set before starting a new rollout
+            firstRS, err := tester.expectNewReplicaSet()
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            // Trigger a new rollout
+            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
+                update.Spec.Template.Spec.Containers[0].Env = append(update.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "deploy2", Value: "true"})
+            })
+            if err != nil {
+                t.Fatalf("failed updating deployment %q: %v", deploymentName, err)
+            }
+
+            // Wait for the old replica set of the 1st rollout to be scaled down to 0 replicas
+            firstRS, err = c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), firstRS.Name, metav1.GetOptions{})
+            if err != nil {
+                t.Fatalf("failed to get replicaset %q: %v", firstRS.Name, err)
+            }
+            firstRS.Spec.Replicas = ptr.To[int32](0)
+            if err = tester.waitRSStable(firstRS); err != nil {
+                t.Fatal(err)
+            }
+
+            // Verify all replicas fields of DeploymentStatus have desired counts after the scale down phase
+            expectedReplicas := test.expectedReplicasAfterOldRSScaleDown
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterOldRSScaleDown); err != nil {
+                t.Fatal(err)
+            }
+
+            // Verify that the new rollout won't create a new replica set until the old pods terminate
+            if err := tester.expectNoNewReplicaSet(); err != nil {
+                t.Fatal(err)
+            }
+            // Remove terminating pods and skip graceful termination of the old RS
+            if err := tester.removeRSPods(ctx, firstRS, math.MaxInt, true, 0); err != nil {
+                t.Fatal(err)
+            }
+
+            // Verify all replicas fields of DeploymentStatus have desired counts after new RS creation
+            expectedReplicas = test.expectedReplicasAfterNewRS
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterNewRS); err != nil {
+                t.Fatal(err)
+            }
+            // Verify that the new rollout created a new replica set
+            secondRS, err := tester.expectNewReplicaSet()
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            // Start terminating 1 pod
+            err = tester.removeRSPods(ctx, secondRS, 1, false, 300)
+            if err != nil {
+                t.Fatal(err)
+            }
+            // Verify all replicas fields of DeploymentStatus have desired counts after the surprise pod termination
+            expectedReplicas = test.expectedReplicasAfterInFlightPodTermination
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterInFlightPodTermination); err != nil {
+                t.Fatal(err)
+            }
+
+            // Scale up during the deployment rollout
+            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
+                update.Spec.Replicas = ptr.To[int32](7)
+            })
+            if err != nil {
+                t.Fatalf("failed to update deployment %q: %v", deploymentName, err)
+            }
+
+            // Verify all replicas fields of DeploymentStatus have desired counts after the in-flight scale up
+            expectedReplicas = test.expectedReplicasAfterInFlightScaleUp
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterInFlightScaleUp); err != nil {
+                t.Fatal(err)
+            }
+
+            // Scale down during the deployment rollout
+            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
+                update.Spec.Replicas = ptr.To[int32](5)
+            })
+            if err != nil {
+                t.Fatalf("failed to update/scale deployment %q: %v", deploymentName, err)
+            }
+
+            // Verify all replicas fields of DeploymentStatus have desired counts after the in-flight scale down
+            expectedReplicas = test.expectedReplicasAfterInFlightScaleDown
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterInFlightScaleDown); err != nil {
+                t.Fatal(err)
+            }
+
+            // Verify all replicas fields of DeploymentStatus have desired counts before the deployment is completed
+            expectedReplicas = test.expectedReplicasForDeploymentComplete
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasForDeploymentComplete); err != nil {
+                t.Fatal(err)
+            }
+
+            // Ensure the new deployment completes while marking its pods as ready simultaneously
+            if err = tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
+                t.Fatal(err)
+            }
+            // Verify all replicas fields of DeploymentStatus have desired counts after the deployment is completed
+            expectedReplicas = test.expectedReplicasForDeploymentComplete
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, expectedReplicas, expectedReplicas, 0, test.expectedTerminatingReplicasForDeploymentComplete); err != nil {
+                t.Fatal(err)
+            }
+
+            // Remove terminating pods (if there are any) and skip graceful termination of the old RS
+            if err := tester.removeRSPods(ctx, firstRS, math.MaxInt, true, 0); err != nil {
+                t.Fatal(err)
+            }
+            // Remove terminating pods and skip graceful termination of the new RS
+            if err := tester.removeRSPods(ctx, secondRS, math.MaxInt, true, 0); err != nil {
+                t.Fatal(err)
+            }
+
+            // Verify all replicas fields of DeploymentStatus have desired counts after the deployment is completed and old pods have terminated
+            expectedReplicas = test.expectedReplicasForDeploymentComplete
+            var expectedFinalTerminatingReplicas *int32
+            if test.enableDeploymentReplicaSetTerminatingReplicas {
+                expectedFinalTerminatingReplicas = ptr.To[int32](0)
+            }
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, expectedReplicas, expectedReplicas, 0, expectedFinalTerminatingReplicas); err != nil {
+                t.Fatal(err)
+            }
+        })
+    }
+}
+
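+// TestRollingUpdateAndProportionalScalingForDeploymentPodReplacement verifies that a rolling
+// update does not wait for terminating pods when creating and scaling replica sets, and that
+// replicas are distributed proportionally between the old and new replica sets during an
+// in-flight scale up.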
+func TestRollingUpdateAndProportionalScalingForDeploymentPodReplacement(t *testing.T) {
+    tests := []struct {
+        name                                          string
+        enableDeploymentReplicaSetTerminatingReplicas bool
+        terminatingReplicasFirstRS                    int32
+        terminatingReplicasSecondRS                   int32
+
+        expectedFirstRSReplicasDuringNewRollout          int32
+        expectedSecondRSReplicasDuringNewRollout         int32
+        expectedTerminatingReplicasDuringNewRollout      *int32
+        expectedFirstRSReplicasAfterInFlightScaleUp      int32
+        expectedSecondRSReplicasAfterInFlightScaleUp     int32
+        expectedTerminatingReplicasDuringInFlightScaleUp *int32
+        expectedFirstRSAnnotationsAfterInFlightScaleUp   map[string]string
+        expectedSecondRSAnnotationsAfterInFlightScaleUp  map[string]string
+    }{
+        // starts with 100 replicas + 20 maxSurge
+        {
+            name: "rolling update should not wait for terminating pods with DeploymentReplicaSetTerminatingReplicas=false",
+            enableDeploymentReplicaSetTerminatingReplicas: false,
+
+            expectedFirstRSReplicasDuringNewRollout:          100,
+            expectedSecondRSReplicasDuringNewRollout:         20,
+            expectedTerminatingReplicasDuringNewRollout:      nil,
+            expectedFirstRSReplicasAfterInFlightScaleUp:      117,
+            expectedSecondRSReplicasAfterInFlightScaleUp:     23,
+            expectedTerminatingReplicasDuringInFlightScaleUp: nil,
+            expectedFirstRSAnnotationsAfterInFlightScaleUp: map[string]string{
+                deploymentutil.DesiredReplicasAnnotation: "120",
+                deploymentutil.MaxReplicasAnnotation:     "140",
+                deploymentutil.RevisionAnnotation:        "1",
+            },
+            expectedSecondRSAnnotationsAfterInFlightScaleUp: map[string]string{
+                deploymentutil.DesiredReplicasAnnotation: "120",
+                deploymentutil.MaxReplicasAnnotation:     "140",
+                deploymentutil.RevisionAnnotation:        "2",
+            },
+        },
+        {
+            name: "rolling update and scaling should not wait for terminating pods with DeploymentReplicaSetTerminatingReplicas=true",
+            enableDeploymentReplicaSetTerminatingReplicas: true,
+            terminatingReplicasFirstRS:                    15,
+            terminatingReplicasSecondRS:                   1,
+
+            expectedFirstRSReplicasDuringNewRollout:          100,
+            expectedSecondRSReplicasDuringNewRollout:         20,
+            expectedTerminatingReplicasDuringNewRollout:      ptr.To[int32](15),
+            expectedFirstRSReplicasAfterInFlightScaleUp:      117,
+            expectedSecondRSReplicasAfterInFlightScaleUp:     23,
+            expectedTerminatingReplicasDuringInFlightScaleUp: ptr.To[int32](16),
+            expectedFirstRSAnnotationsAfterInFlightScaleUp: map[string]string{
+                deploymentutil.DesiredReplicasAnnotation: "120",
+                deploymentutil.MaxReplicasAnnotation:     "140",
+                deploymentutil.RevisionAnnotation:        "1",
+            },
+            expectedSecondRSAnnotationsAfterInFlightScaleUp: map[string]string{
+                deploymentutil.DesiredReplicasAnnotation: "120",
+                deploymentutil.MaxReplicasAnnotation:     "140",
+                deploymentutil.RevisionAnnotation:        "2",
+            },
+        },
+    }
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentReplicaSetTerminatingReplicas, test.enableDeploymentReplicaSetTerminatingReplicas)
+
+            _, ctx := ktesting.NewTestContext(t)
+            ctx, cancel := context.WithCancel(ctx)
+            defer cancel()
+
+            closeFn, rm, dc, informers, c := dcSetup(ctx, t)
+            defer closeFn()
+
+            name := "test-proportional-scaling"
+            ns := framework.CreateNamespaceOrDie(c, name, t)
+            defer framework.DeleteNamespaceOrDie(c, ns, t)
+
+            // Start informer and controllers
+            stopControllers := runControllersAndInformers(t, rm, dc, informers)
+            defer stopControllers()
+
+            deploymentName := "deployment"
+            replicas := int32(100)
+            maxSurge := int32(20)
+            tester := &deploymentTester{t: t, c: c, deployment: newDeployment(deploymentName, ns.Name, replicas)}
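+            // With maxSurge=20 and maxUnavailable=0, the rollout may run up to 120 pods
+            // (100 desired + 20 surge) and must keep all 100 desired pods available.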
+            tester.deployment.Spec.Strategy.RollingUpdate.MaxSurge = ptr.To(intstr.FromInt32(maxSurge))
+            tester.deployment.Spec.Strategy.RollingUpdate.MaxUnavailable = ptr.To(intstr.FromInt32(0))
+            tester.deployment.Spec.Template.Spec.NodeName = "fake-node"
+            tester.deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(300))
+
+            var err error
+            tester.deployment, err = c.AppsV1().Deployments(ns.Name).Create(context.TODO(), tester.deployment, metav1.CreateOptions{})
+            if err != nil {
+                t.Fatalf("failed to create deployment %q: %v", deploymentName, err)
+            }
+
+            // Ensure the deployment completes while marking its pods as ready simultaneously
+            if err := tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
+                t.Fatal(err)
+            }
+            // Record the current replica set before starting a new rollout
+            firstRS, err := tester.expectNewReplicaSet()
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            // Start terminating some replicas gracefully
+            err = tester.removeRSPods(ctx, firstRS, int(test.terminatingReplicasFirstRS), false, 300)
+            if err != nil {
+                t.Fatal(err)
+            }
+            // Ensure the deployment completes while marking its pods as ready simultaneously
+            if err := tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
+                t.Fatal(err)
+            }
+
+            // Trigger a new rollout
+            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
+                update.Spec.Template.Spec.Containers[0].Env = append(update.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "deploy2", Value: "true"})
+            })
+            if err != nil {
+                t.Fatalf("failed updating deployment %q: %v", deploymentName, err)
+            }
+            expectedReplicasDuringNewRollout := test.expectedFirstRSReplicasDuringNewRollout + test.expectedSecondRSReplicasDuringNewRollout
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicasDuringNewRollout, test.expectedSecondRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, test.expectedSecondRSReplicasDuringNewRollout, test.expectedTerminatingReplicasDuringNewRollout); err != nil {
+                t.Fatal(err)
+            }
+
+            // Verify that the new rollout created a new replica set
+            secondRS, err := tester.expectNewReplicaSet()
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            // Start terminating additional replicas gracefully
+            err = tester.removeRSPods(ctx, secondRS, int(test.terminatingReplicasSecondRS), false, 300)
+            if err != nil {
+                t.Fatal(err)
+            }
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicasDuringNewRollout, test.expectedSecondRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, test.expectedSecondRSReplicasDuringNewRollout, test.expectedTerminatingReplicasDuringInFlightScaleUp); err != nil {
+                t.Fatal(err)
+            }
+
+            // Scale up during the deployment rollout
+            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
+                update.Spec.Replicas = ptr.To[int32](120)
+            })
+            if err != nil {
+                t.Fatalf("failed to update/scale deployment %q: %v", deploymentName, err)
+            }
+            expectedReplicasDuringInFlightScaleUp := test.expectedFirstRSReplicasAfterInFlightScaleUp + test.expectedSecondRSReplicasAfterInFlightScaleUp
+            expectedSurgeReplicas := expectedReplicasDuringInFlightScaleUp - test.expectedFirstRSReplicasDuringNewRollout
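+            // Proportional scaling distributes the new maximum of 140 pods (120 desired + 20 surge)
+            // across both replica sets: 117 for the first RS and 23 for the second.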
+            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicasDuringInFlightScaleUp, test.expectedSecondRSReplicasAfterInFlightScaleUp, test.expectedFirstRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, expectedSurgeReplicas, test.expectedTerminatingReplicasDuringInFlightScaleUp); err != nil {
+                t.Fatal(err)
+            }
+
+            // Check pod count and annotations for all replica sets
+            firstRS, err = c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), firstRS.Name, metav1.GetOptions{})
+            if err != nil {
+                t.Fatalf("failed to get replicaset %q: %v", firstRS.Name, err)
+            }
+            if *(firstRS.Spec.Replicas) != test.expectedFirstRSReplicasAfterInFlightScaleUp {
+                t.Fatalf("unexpected first RS .spec.replicas: expect %d, got %d", test.expectedFirstRSReplicasAfterInFlightScaleUp, *(firstRS.Spec.Replicas))
+            }
+            if !reflect.DeepEqual(test.expectedFirstRSAnnotationsAfterInFlightScaleUp, firstRS.Annotations) {
+                t.Fatalf("unexpected %q replica set annotations: %s", firstRS.Name, cmp.Diff(test.expectedFirstRSAnnotationsAfterInFlightScaleUp, firstRS.Annotations))
+            }
+
+            secondRS, err = c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), secondRS.Name, metav1.GetOptions{})
+            if err != nil {
+                t.Fatalf("failed to get replicaset %q: %v", secondRS.Name, err)
+            }
+            if *(secondRS.Spec.Replicas) != test.expectedSecondRSReplicasAfterInFlightScaleUp {
+                t.Fatalf("unexpected second RS .spec.replicas: expect %d, got %d", test.expectedSecondRSReplicasAfterInFlightScaleUp, *(secondRS.Spec.Replicas))
+            }
+            if !reflect.DeepEqual(test.expectedSecondRSAnnotationsAfterInFlightScaleUp, secondRS.Annotations) {
+                t.Fatalf("unexpected %q replica set annotations: %s", secondRS.Name, cmp.Diff(test.expectedSecondRSAnnotationsAfterInFlightScaleUp, secondRS.Annotations))
+            }
+
+            // Ensure the deployment completes while marking its pods as ready and removing terminated pods simultaneously
+            if err := tester.waitForDeploymentCompleteAndMarkPodsReadyAndRemoveTerminated(ctx); err != nil {
+                t.Fatal(err)
+            }
+
+            // All replica sets' annotations should be up-to-date in the end
+            rss := []*apps.ReplicaSet{firstRS, secondRS}
+            for idx, curRS := range rss {
+                curRS, err = c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), curRS.Name, metav1.GetOptions{})
+                if err != nil {
+                    t.Fatalf("failed to get replicaset when checking desired replicas annotation: %v", err)
+                }
+                expectedFinalAnnotations := map[string]string{
+                    deploymentutil.DesiredReplicasAnnotation: "120",
+                    deploymentutil.MaxReplicasAnnotation:     "140",
+                    deploymentutil.RevisionAnnotation:        fmt.Sprintf("%d", idx+1),
+                }
+                if !reflect.DeepEqual(expectedFinalAnnotations, curRS.Annotations) {
+                    t.Fatalf("unexpected %q replica set annotations: %s", curRS.Name, cmp.Diff(expectedFinalAnnotations, curRS.Annotations))
+                }
+            }
+        })
+    }
+}
diff --git a/test/integration/deployment/util.go b/test/integration/deployment/util.go
index 670d7f814ef..a035689772b 100644
--- a/test/integration/deployment/util.go
+++ b/test/integration/deployment/util.go
@@ -19,6 +19,7 @@ package deployment
 import (
     "context"
     "fmt"
+    "math"
     "sync"
     "testing"
     "time"
@@ -220,6 +221,37 @@ func (d *deploymentTester) markUpdatedPodsReady(wg *sync.WaitGroup) {
     }
 }
 
+// removeTerminatedPods repeatedly force-removes terminating Deployment pods
+// until the deployment is complete.
+func (d *deploymentTester) removeTerminatedPods(ctx context.Context, wg *sync.WaitGroup) {
+    defer wg.Done()
+
+    err := wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, func(ctx context.Context) (bool, error) {
+        // We're done when the deployment is complete
+        if completed, err := d.deploymentComplete(); err != nil {
+            return false, err
+        } else if completed {
+            return true, nil
+        }
+        // Otherwise, force-remove terminating pods of all the deployment's replica sets
+        rsList, err := deploymentutil.ListReplicaSets(d.deployment, deploymentutil.RsListFromClient(d.c.AppsV1()))
+        if err != nil {
+            d.t.Log(err)
+            return false, nil
+        }
+        for _, rs := range rsList {
+            if err := d.removeRSPods(ctx, rs, math.MaxInt, true, 0); err != nil {
+                d.t.Log(err)
+                return false, nil
+            }
+        }
+        return false, nil
+    })
+    if err != nil {
+        d.t.Errorf("failed to remove terminated Deployment pods: %v", err)
+    }
+}
+
 func (d *deploymentTester) deploymentComplete() (bool, error) {
     latest, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{})
     if err != nil {
@@ -283,6 +315,30 @@ func (d *deploymentTester) waitForDeploymentCompleteAndMarkPodsReady() error {
     return nil
 }
 
+// waitForDeploymentCompleteAndMarkPodsReadyAndRemoveTerminated waits for the Deployment to complete
+// while marking updated Deployment pods as ready and removing terminated pods at the same time.
+func (d *deploymentTester) waitForDeploymentCompleteAndMarkPodsReadyAndRemoveTerminated(ctx context.Context) error {
+    var wg sync.WaitGroup
+
+    // Manually mark updated Deployment pods as ready in a separate goroutine
+    wg.Add(1)
+    go d.markUpdatedPodsReady(&wg)
+
+    wg.Add(1)
+    go d.removeTerminatedPods(ctx, &wg)
+
+    // Wait for the Deployment status to complete using a soft check, while Deployment pods are becoming ready
+    err := d.waitForDeploymentComplete()
+    if err != nil {
+        return fmt.Errorf("failed to wait for Deployment status %s: %w", d.deployment.Name, err)
+    }
+
+    // Wait for the goroutines to finish
+    wg.Wait()
+
+    return nil
+}
+
 func (d *deploymentTester) updateDeployment(applyUpdate testutil.UpdateDeploymentFunc) (*apps.Deployment, error) {
     return testutil.UpdateDeploymentWithRetries(d.c, d.deployment.Namespace, d.deployment.Name, applyUpdate, d.t.Logf, pollInterval, pollTimeout)
 }
@@ -432,6 +488,20 @@ func (d *deploymentTester) markUpdatedPodsReadyWithoutComplete() error {
     return nil
 }
 
+// waitForDeploymentStatusReplicasFields waits until all replicas fields of DeploymentStatus reach the desired counts.
+func (d *deploymentTester) waitForDeploymentStatusReplicasFields(ctx context.Context, replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32, terminatingReplicas *int32) error {
+    var lastErr error
+    err := wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, func(_ context.Context) (bool, error) {
+        // Do not pass ctx to checkDeploymentStatusReplicasFields (Deployments().Get) so we always obtain the latest error and not the one from deadline exceeded
+        lastErr = d.checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas, terminatingReplicas)
+        return lastErr == nil, nil
+    })
+    if err != nil {
+        return fmt.Errorf("%w: %w", lastErr, err)
+    }
+    return nil
+}
+
 // Verify all replicas fields of DeploymentStatus have desired count.
 // Immediately return an error when found a non-matching replicas field.
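+// removeRSPods iterates over at most count pods of the given replica set and deletes them with
+// the given grace period. When targetOnlyTerminating is true, only pods that are already
+// terminating are deleted; combined with gracePeriodSeconds=0 this force-completes their termination.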
 func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32, terminatingReplicas *int32) error {
@@ -456,6 +526,33 @@ func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updated
     }
     if !ptr.Equal(deployment.Status.TerminatingReplicas, terminatingReplicas) {
         return fmt.Errorf("unexpected .terminatingReplicas: expect %v, got %v", ptr.Deref(terminatingReplicas, -1), ptr.Deref(deployment.Status.TerminatingReplicas, -1))
     }
     return nil
 }
+
+func (d *deploymentTester) removeRSPods(ctx context.Context, replicaset *apps.ReplicaSet, count int, targetOnlyTerminating bool, gracePeriodSeconds int64) error {
+    selector, err := metav1.LabelSelectorAsSelector(replicaset.Spec.Selector)
+    if err != nil {
+        return fmt.Errorf("could not parse a selector for a replica set %q: %w", replicaset.Name, err)
+    }
+    pods, err := d.c.CoreV1().Pods(replicaset.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
+    if err != nil {
+        return fmt.Errorf("failed to get pods for a replica set %q: %w", replicaset.Name, err)
+    }
+
+    for i, pod := range pods.Items {
+        if i >= count {
+            break
+        }
+        if targetOnlyTerminating && pod.DeletionTimestamp == nil {
+            continue
+        }
+        err := d.c.CoreV1().Pods(replicaset.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To(gracePeriodSeconds)})
+        if err != nil {
+            return fmt.Errorf("failed to delete pod %q for a replica set %q: %w", pod.Name, replicaset.Name, err)
+        }
+    }
+
+    return nil
+}