From e263b878eef5c8cf2ef5f0b17ab359abfd203ab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20K=C5=99epinsk=C3=BD?= Date: Thu, 29 Feb 2024 21:56:48 +0100 Subject: [PATCH] Deployment controller should count terminating pods in the status --- pkg/controller/deployment/sync.go | 5 + .../deployment/util/deployment_util.go | 20 ++++ .../deployment/util/deployment_util_test.go | 95 ++++++++++++---- .../integration/deployment/deployment_test.go | 104 +++++++++++++++++- test/integration/deployment/util.go | 10 +- 5 files changed, 206 insertions(+), 28 deletions(-) diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index b03c3be8f11..b924e4cf589 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -27,9 +27,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" + "k8s.io/kubernetes/pkg/features" labelsutil "k8s.io/kubernetes/pkg/util/labels" ) @@ -504,6 +506,9 @@ func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployme UnavailableReplicas: unavailableReplicas, CollisionCount: deployment.Status.CollisionCount, } + if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) { + status.TerminatingReplicas = deploymentutil.GetTerminatingReplicaCountForReplicaSets(allRSs) + } // Copy conditions one by one so we won't mutate the original object. conditions := deployment.Status.Conditions diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index 341d0a9e4dc..473812a120d 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -714,6 +714,26 @@ func GetAvailableReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int3 return totalAvailableReplicas } +// GetTerminatingReplicaCountForReplicaSets returns the number of terminating pods for all replica sets +// or returns an error if any replica sets have been synced by the controller but do not report their terminating count. +func GetTerminatingReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) *int32 { + terminatingReplicas := int32(0) + for _, rs := range replicaSets { + switch { + case rs == nil: + // No-op + case rs.Status.ObservedGeneration == 0 && rs.Status.TerminatingReplicas == nil: + // Replicasets that have never been synced by the controller don't contribute to TerminatingReplicas + case rs.Status.TerminatingReplicas == nil: + // If any replicaset synced by the controller hasn't reported TerminatingReplicas, we cannot calculate a sum + return nil + default: + terminatingReplicas += *rs.Status.TerminatingReplicas + } + } + return &terminatingReplicas +} + // IsRollingUpdate returns true if the strategy type is a rolling update. func IsRollingUpdate(deployment *apps.Deployment) bool { return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType diff --git a/pkg/controller/deployment/util/deployment_util_test.go b/pkg/controller/deployment/util/deployment_util_test.go index 17554644eb9..55349d7a9a1 100644 --- a/pkg/controller/deployment/util/deployment_util_test.go +++ b/pkg/controller/deployment/util/deployment_util_test.go @@ -57,7 +57,7 @@ func generateRS(deployment apps.Deployment) apps.ReplicaSet { return apps.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ UID: randomUID(), - Name: names.SimpleNameGenerator.GenerateName("replicaset"), + Name: names.SimpleNameGenerator.GenerateName(deployment.Name), Labels: template.Labels, OwnerReferences: []metav1.OwnerReference{*newDControllerRef(&deployment)}, }, @@ -344,42 +344,93 @@ func TestFindOldReplicaSets(t *testing.T) { } func TestGetReplicaCountForReplicaSets(t *testing.T) { - rs1 := generateRS(generateDeployment("foo")) + rs1 := generateRS(generateDeployment("foo-rs")) + rs1.Status.ObservedGeneration = 1 *(rs1.Spec.Replicas) = 1 rs1.Status.Replicas = 2 - rs2 := generateRS(generateDeployment("bar")) + rs1.Status.TerminatingReplicas = ptr.To[int32](3) + + rs2 := generateRS(generateDeployment("bar-rs")) + rs1.Status.ObservedGeneration = 1 *(rs2.Spec.Replicas) = 2 rs2.Status.Replicas = 3 + rs2.Status.TerminatingReplicas = ptr.To[int32](1) + + rs3 := generateRS(generateDeployment("unsynced-rs")) + *(rs3.Spec.Replicas) = 3 + rs3.Status.Replicas = 0 + rs3.Status.TerminatingReplicas = nil + + rs4 := generateRS(generateDeployment("dropped-rs")) + rs4.Status.ObservedGeneration = 1 + *(rs4.Spec.Replicas) = 1 + rs4.Status.Replicas = 1 + rs4.Status.TerminatingReplicas = nil tests := []struct { - Name string - sets []*apps.ReplicaSet - expectedCount int32 - expectedActual int32 + name string + sets []*apps.ReplicaSet + expectedCount int32 + expectedActual int32 + expectedTerminating *int32 }{ { - "1:2 Replicas", - []*apps.ReplicaSet{&rs1}, - 1, - 2, + name: "scaling down rs1", + sets: []*apps.ReplicaSet{&rs1}, + expectedCount: 1, + expectedActual: 2, + expectedTerminating: ptr.To[int32](3), }, { - "3:5 Replicas", - []*apps.ReplicaSet{&rs1, &rs2}, - 3, - 5, + name: "scaling down rs1 and rs2", + sets: []*apps.ReplicaSet{&rs1, &rs2}, + expectedCount: 3, + expectedActual: 5, + expectedTerminating: ptr.To[int32](4), + }, + { + name: "scaling up rs3", + sets: []*apps.ReplicaSet{&rs3}, + expectedCount: 3, + expectedActual: 0, + expectedTerminating: ptr.To[int32](0), + }, + { + name: "scaling down rs1 and rs2 and scaling up rs3", + sets: []*apps.ReplicaSet{&rs1, &rs2, &rs3}, + expectedCount: 6, + expectedActual: 5, + expectedTerminating: ptr.To[int32](4), + }, + { + name: "invalid/unknown terminating status for rs4", + sets: []*apps.ReplicaSet{&rs4}, + expectedCount: 1, + expectedActual: 1, + expectedTerminating: nil, + }, + { + name: "invalid/unknown terminating status for rs4 with rs1, rs2 and rs3", + sets: []*apps.ReplicaSet{&rs1, &rs2, &rs3, &rs4}, + expectedCount: 7, + expectedActual: 6, + expectedTerminating: nil, }, } for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { - rs := GetReplicaCountForReplicaSets(test.sets) - if rs != test.expectedCount { - t.Errorf("In test case %s, expectedCount %+v, got %+v", test.Name, test.expectedCount, rs) + t.Run(test.name, func(t *testing.T) { + replicasCount := GetReplicaCountForReplicaSets(test.sets) + if replicasCount != test.expectedCount { + t.Errorf("expectedCount %d, got %d", test.expectedCount, replicasCount) } - rs = GetActualReplicaCountForReplicaSets(test.sets) - if rs != test.expectedActual { - t.Errorf("In test case %s, expectedActual %+v, got %+v", test.Name, test.expectedActual, rs) + actualReplicasCount := GetActualReplicaCountForReplicaSets(test.sets) + if actualReplicasCount != test.expectedActual { + t.Errorf("expectedActual %d, got %d", test.expectedActual, actualReplicasCount) + } + terminatingReplicasCount := GetTerminatingReplicaCountForReplicaSets(test.sets) + if !ptr.Equal(terminatingReplicasCount, test.expectedTerminating) { + t.Errorf("expectedTerminating %d, got %d", ptr.Deref(test.expectedTerminating, -1), ptr.Deref(terminatingReplicasCount, -1)) } }) } diff --git a/test/integration/deployment/deployment_test.go b/test/integration/deployment/deployment_test.go index 6d46b3243d0..11748383eff 100644 --- a/test/integration/deployment/deployment_test.go +++ b/test/integration/deployment/deployment_test.go @@ -28,9 +28,12 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/util/retry" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2/ktesting" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" testutil "k8s.io/kubernetes/test/utils" "k8s.io/utils/ptr" @@ -965,7 +968,7 @@ func TestDeploymentAvailableCondition(t *testing.T) { } // Verify all replicas fields of DeploymentStatus have desired counts - if err = tester.checkDeploymentStatusReplicasFields(10, 10, 0, 0, 10); err != nil { + if err = tester.checkDeploymentStatusReplicasFields(10, 10, 0, 0, 10, nil); err != nil { t.Fatal(err) } @@ -985,7 +988,7 @@ func TestDeploymentAvailableCondition(t *testing.T) { } // Verify all replicas fields of DeploymentStatus have desired counts - if err = tester.checkDeploymentStatusReplicasFields(10, 10, 10, 0, 10); err != nil { + if err = tester.checkDeploymentStatusReplicasFields(10, 10, 10, 0, 10, nil); err != nil { t.Fatal(err) } @@ -1008,7 +1011,7 @@ func TestDeploymentAvailableCondition(t *testing.T) { } // Verify all replicas fields of DeploymentStatus have desired counts - if err = tester.checkDeploymentStatusReplicasFields(10, 10, 10, 10, 0); err != nil { + if err = tester.checkDeploymentStatusReplicasFields(10, 10, 10, 10, 0, nil); err != nil { t.Fatal(err) } } @@ -1303,3 +1306,98 @@ func TestReplicaSetOrphaningAndAdoptionWhenLabelsChange(t *testing.T) { t.Fatalf("failed waiting for replicaset adoption by deployment %q to complete: %v", deploymentName, err) } } + +func TestTerminatingReplicasDeploymentStatus(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, false) + + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) + defer closeFn() + + name := "test-terminating-replica-status" + ns := framework.CreateNamespaceOrDie(c, name, t) + defer framework.DeleteNamespaceOrDie(c, ns, t) + + deploymentName := "deployment" + replicas := int32(6) + tester := &deploymentTester{t: t, c: c, deployment: newDeployment(deploymentName, ns.Name, replicas)} + tester.deployment.Spec.Strategy.Type = apps.RecreateDeploymentStrategyType + tester.deployment.Spec.Strategy.RollingUpdate = nil + tester.deployment.Spec.Template.Spec.NodeName = "fake-node" + tester.deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(300)) + + var err error + tester.deployment, err = c.AppsV1().Deployments(ns.Name).Create(context.TODO(), tester.deployment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create deployment %q: %v", deploymentName, err) + } + + // Start informer and controllers + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() + + // Ensure the deployment completes while marking its pods as ready simultaneously + if err := tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil { + t.Fatal(err) + } + // Should not update terminating replicas when feature gate is disabled + // Verify all replicas fields of DeploymentStatus have desired counts + if err = tester.checkDeploymentStatusReplicasFields(6, 6, 6, 6, 0, nil); err != nil { + t.Fatal(err) + } + + // Scale down the deployment + tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) { + update.Spec.Replicas = ptr.To(int32(4)) + }) + if err != nil { + t.Fatalf("failed updating deployment %q: %v", deploymentName, err) + } + // Wait for number of ready replicas to equal number of replicas. + if err = tester.waitForReadyReplicas(); err != nil { + t.Fatal(err) + } + // Verify all replicas fields of DeploymentStatus have desired counts + if err = tester.checkDeploymentStatusReplicasFields(4, 4, 4, 4, 0, nil); err != nil { + t.Fatal(err) + } + + // should update terminating replicas when feature gate is enabled + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, true) + // Scale down the deployment + tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) { + update.Spec.Replicas = ptr.To(int32(3)) + }) + if err != nil { + t.Fatalf("failed updating deployment %q: %v", deploymentName, err) + } + // Wait for number of ready replicas to equal number of replicas. + if err = tester.waitForReadyReplicas(); err != nil { + t.Fatal(err) + } + // Verify all replicas fields of DeploymentStatus have desired counts + if err = tester.checkDeploymentStatusReplicasFields(3, 3, 3, 3, 0, ptr.To[int32](3)); err != nil { + t.Fatal(err) + } + + // should not update terminating replicas when feature gate is disabled + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, false) + // Scale down the deployment + tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) { + update.Spec.Replicas = ptr.To(int32(2)) + }) + if err != nil { + t.Fatalf("failed updating deployment %q: %v", deploymentName, err) + } + // Wait for number of ready replicas to equal number of replicas. + if err = tester.waitForReadyReplicas(); err != nil { + t.Fatal(err) + } + // Verify all replicas fields of DeploymentStatus have desired counts + if err = tester.checkDeploymentStatusReplicasFields(2, 2, 2, 2, 0, nil); err != nil { + t.Fatal(err) + } +} diff --git a/test/integration/deployment/util.go b/test/integration/deployment/util.go index 92a6f05b0f8..670d7f814ef 100644 --- a/test/integration/deployment/util.go +++ b/test/integration/deployment/util.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/controller/replicaset" "k8s.io/kubernetes/test/integration/framework" testutil "k8s.io/kubernetes/test/utils" + "k8s.io/utils/ptr" ) const ( @@ -433,7 +434,7 @@ func (d *deploymentTester) markUpdatedPodsReadyWithoutComplete() error { // Verify all replicas fields of DeploymentStatus have desired count. // Immediately return an error when found a non-matching replicas field. -func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32) error { +func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32, terminatingReplicas *int32) error { deployment, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get deployment %q: %v", d.deployment.Name, err) @@ -448,10 +449,13 @@ func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updated return fmt.Errorf("unexpected .readyReplicas: expect %d, got %d", readyReplicas, deployment.Status.ReadyReplicas) } if deployment.Status.AvailableReplicas != availableReplicas { - return fmt.Errorf("unexpected .replicas: expect %d, got %d", availableReplicas, deployment.Status.AvailableReplicas) + return fmt.Errorf("unexpected .availableReplicas: expect %d, got %d", availableReplicas, deployment.Status.AvailableReplicas) } if deployment.Status.UnavailableReplicas != unavailableReplicas { - return fmt.Errorf("unexpected .replicas: expect %d, got %d", unavailableReplicas, deployment.Status.UnavailableReplicas) + return fmt.Errorf("unexpected .unavailableReplicas: expect %d, got %d", unavailableReplicas, deployment.Status.UnavailableReplicas) + } + if !ptr.Equal(deployment.Status.TerminatingReplicas, terminatingReplicas) { + return fmt.Errorf("unexpected .terminatingReplicas: expect %v, got %v", ptr.Deref(terminatingReplicas, -1), ptr.Deref(deployment.Status.TerminatingReplicas, -1)) } return nil }