Deployment controller should count terminating pods in the status

This commit is contained in:
Filip Křepinský 2024-02-29 21:56:48 +01:00
parent dc1914c61c
commit e263b878ee
5 changed files with 206 additions and 28 deletions

View File

@ -27,9 +27,11 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/features"
labelsutil "k8s.io/kubernetes/pkg/util/labels" labelsutil "k8s.io/kubernetes/pkg/util/labels"
) )
@ -504,6 +506,9 @@ func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployme
UnavailableReplicas: unavailableReplicas, UnavailableReplicas: unavailableReplicas,
CollisionCount: deployment.Status.CollisionCount, CollisionCount: deployment.Status.CollisionCount,
} }
if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) {
status.TerminatingReplicas = deploymentutil.GetTerminatingReplicaCountForReplicaSets(allRSs)
}
// Copy conditions one by one so we won't mutate the original object. // Copy conditions one by one so we won't mutate the original object.
conditions := deployment.Status.Conditions conditions := deployment.Status.Conditions

View File

@ -714,6 +714,26 @@ func GetAvailableReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int3
return totalAvailableReplicas return totalAvailableReplicas
} }
// GetTerminatingReplicaCountForReplicaSets returns the number of terminating pods for all replica sets
// or returns an error if any replica sets have been synced by the controller but do not report their terminating count.
func GetTerminatingReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) *int32 {
terminatingReplicas := int32(0)
for _, rs := range replicaSets {
switch {
case rs == nil:
// No-op
case rs.Status.ObservedGeneration == 0 && rs.Status.TerminatingReplicas == nil:
// Replicasets that have never been synced by the controller don't contribute to TerminatingReplicas
case rs.Status.TerminatingReplicas == nil:
// If any replicaset synced by the controller hasn't reported TerminatingReplicas, we cannot calculate a sum
return nil
default:
terminatingReplicas += *rs.Status.TerminatingReplicas
}
}
return &terminatingReplicas
}
// IsRollingUpdate returns true if the strategy type is a rolling update. // IsRollingUpdate returns true if the strategy type is a rolling update.
func IsRollingUpdate(deployment *apps.Deployment) bool { func IsRollingUpdate(deployment *apps.Deployment) bool {
return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType

View File

@ -57,7 +57,7 @@ func generateRS(deployment apps.Deployment) apps.ReplicaSet {
return apps.ReplicaSet{ return apps.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
UID: randomUID(), UID: randomUID(),
Name: names.SimpleNameGenerator.GenerateName("replicaset"), Name: names.SimpleNameGenerator.GenerateName(deployment.Name),
Labels: template.Labels, Labels: template.Labels,
OwnerReferences: []metav1.OwnerReference{*newDControllerRef(&deployment)}, OwnerReferences: []metav1.OwnerReference{*newDControllerRef(&deployment)},
}, },
@ -344,42 +344,93 @@ func TestFindOldReplicaSets(t *testing.T) {
} }
func TestGetReplicaCountForReplicaSets(t *testing.T) { func TestGetReplicaCountForReplicaSets(t *testing.T) {
rs1 := generateRS(generateDeployment("foo")) rs1 := generateRS(generateDeployment("foo-rs"))
rs1.Status.ObservedGeneration = 1
*(rs1.Spec.Replicas) = 1 *(rs1.Spec.Replicas) = 1
rs1.Status.Replicas = 2 rs1.Status.Replicas = 2
rs2 := generateRS(generateDeployment("bar")) rs1.Status.TerminatingReplicas = ptr.To[int32](3)
rs2 := generateRS(generateDeployment("bar-rs"))
rs1.Status.ObservedGeneration = 1
*(rs2.Spec.Replicas) = 2 *(rs2.Spec.Replicas) = 2
rs2.Status.Replicas = 3 rs2.Status.Replicas = 3
rs2.Status.TerminatingReplicas = ptr.To[int32](1)
rs3 := generateRS(generateDeployment("unsynced-rs"))
*(rs3.Spec.Replicas) = 3
rs3.Status.Replicas = 0
rs3.Status.TerminatingReplicas = nil
rs4 := generateRS(generateDeployment("dropped-rs"))
rs4.Status.ObservedGeneration = 1
*(rs4.Spec.Replicas) = 1
rs4.Status.Replicas = 1
rs4.Status.TerminatingReplicas = nil
tests := []struct { tests := []struct {
Name string name string
sets []*apps.ReplicaSet sets []*apps.ReplicaSet
expectedCount int32 expectedCount int32
expectedActual int32 expectedActual int32
expectedTerminating *int32
}{ }{
{ {
"1:2 Replicas", name: "scaling down rs1",
[]*apps.ReplicaSet{&rs1}, sets: []*apps.ReplicaSet{&rs1},
1, expectedCount: 1,
2, expectedActual: 2,
expectedTerminating: ptr.To[int32](3),
}, },
{ {
"3:5 Replicas", name: "scaling down rs1 and rs2",
[]*apps.ReplicaSet{&rs1, &rs2}, sets: []*apps.ReplicaSet{&rs1, &rs2},
3, expectedCount: 3,
5, expectedActual: 5,
expectedTerminating: ptr.To[int32](4),
},
{
name: "scaling up rs3",
sets: []*apps.ReplicaSet{&rs3},
expectedCount: 3,
expectedActual: 0,
expectedTerminating: ptr.To[int32](0),
},
{
name: "scaling down rs1 and rs2 and scaling up rs3",
sets: []*apps.ReplicaSet{&rs1, &rs2, &rs3},
expectedCount: 6,
expectedActual: 5,
expectedTerminating: ptr.To[int32](4),
},
{
name: "invalid/unknown terminating status for rs4",
sets: []*apps.ReplicaSet{&rs4},
expectedCount: 1,
expectedActual: 1,
expectedTerminating: nil,
},
{
name: "invalid/unknown terminating status for rs4 with rs1, rs2 and rs3",
sets: []*apps.ReplicaSet{&rs1, &rs2, &rs3, &rs4},
expectedCount: 7,
expectedActual: 6,
expectedTerminating: nil,
}, },
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.Name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
rs := GetReplicaCountForReplicaSets(test.sets) replicasCount := GetReplicaCountForReplicaSets(test.sets)
if rs != test.expectedCount { if replicasCount != test.expectedCount {
t.Errorf("In test case %s, expectedCount %+v, got %+v", test.Name, test.expectedCount, rs) t.Errorf("expectedCount %d, got %d", test.expectedCount, replicasCount)
} }
rs = GetActualReplicaCountForReplicaSets(test.sets) actualReplicasCount := GetActualReplicaCountForReplicaSets(test.sets)
if rs != test.expectedActual { if actualReplicasCount != test.expectedActual {
t.Errorf("In test case %s, expectedActual %+v, got %+v", test.Name, test.expectedActual, rs) t.Errorf("expectedActual %d, got %d", test.expectedActual, actualReplicasCount)
}
terminatingReplicasCount := GetTerminatingReplicaCountForReplicaSets(test.sets)
if !ptr.Equal(terminatingReplicasCount, test.expectedTerminating) {
t.Errorf("expectedTerminating %d, got %d", ptr.Deref(test.expectedTerminating, -1), ptr.Deref(terminatingReplicasCount, -1))
} }
}) })
} }

View File

@ -28,9 +28,12 @@ import (
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/retry" "k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
testutil "k8s.io/kubernetes/test/utils" testutil "k8s.io/kubernetes/test/utils"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
@ -965,7 +968,7 @@ func TestDeploymentAvailableCondition(t *testing.T) {
} }
// Verify all replicas fields of DeploymentStatus have desired counts // Verify all replicas fields of DeploymentStatus have desired counts
if err = tester.checkDeploymentStatusReplicasFields(10, 10, 0, 0, 10); err != nil { if err = tester.checkDeploymentStatusReplicasFields(10, 10, 0, 0, 10, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -985,7 +988,7 @@ func TestDeploymentAvailableCondition(t *testing.T) {
} }
// Verify all replicas fields of DeploymentStatus have desired counts // Verify all replicas fields of DeploymentStatus have desired counts
if err = tester.checkDeploymentStatusReplicasFields(10, 10, 10, 0, 10); err != nil { if err = tester.checkDeploymentStatusReplicasFields(10, 10, 10, 0, 10, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1008,7 +1011,7 @@ func TestDeploymentAvailableCondition(t *testing.T) {
} }
// Verify all replicas fields of DeploymentStatus have desired counts // Verify all replicas fields of DeploymentStatus have desired counts
if err = tester.checkDeploymentStatusReplicasFields(10, 10, 10, 10, 0); err != nil { if err = tester.checkDeploymentStatusReplicasFields(10, 10, 10, 10, 0, nil); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -1303,3 +1306,98 @@ func TestReplicaSetOrphaningAndAdoptionWhenLabelsChange(t *testing.T) {
t.Fatalf("failed waiting for replicaset adoption by deployment %q to complete: %v", deploymentName, err) t.Fatalf("failed waiting for replicaset adoption by deployment %q to complete: %v", deploymentName, err)
} }
} }
func TestTerminatingReplicasDeploymentStatus(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, false)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
closeFn, rm, dc, informers, c := dcSetup(ctx, t)
defer closeFn()
name := "test-terminating-replica-status"
ns := framework.CreateNamespaceOrDie(c, name, t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
deploymentName := "deployment"
replicas := int32(6)
tester := &deploymentTester{t: t, c: c, deployment: newDeployment(deploymentName, ns.Name, replicas)}
tester.deployment.Spec.Strategy.Type = apps.RecreateDeploymentStrategyType
tester.deployment.Spec.Strategy.RollingUpdate = nil
tester.deployment.Spec.Template.Spec.NodeName = "fake-node"
tester.deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(300))
var err error
tester.deployment, err = c.AppsV1().Deployments(ns.Name).Create(context.TODO(), tester.deployment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create deployment %q: %v", deploymentName, err)
}
// Start informer and controllers
stopControllers := runControllersAndInformers(t, rm, dc, informers)
defer stopControllers()
// Ensure the deployment completes while marking its pods as ready simultaneously
if err := tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
t.Fatal(err)
}
// Should not update terminating replicas when feature gate is disabled
// Verify all replicas fields of DeploymentStatus have desired counts
if err = tester.checkDeploymentStatusReplicasFields(6, 6, 6, 6, 0, nil); err != nil {
t.Fatal(err)
}
// Scale down the deployment
tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
update.Spec.Replicas = ptr.To(int32(4))
})
if err != nil {
t.Fatalf("failed updating deployment %q: %v", deploymentName, err)
}
// Wait for number of ready replicas to equal number of replicas.
if err = tester.waitForReadyReplicas(); err != nil {
t.Fatal(err)
}
// Verify all replicas fields of DeploymentStatus have desired counts
if err = tester.checkDeploymentStatusReplicasFields(4, 4, 4, 4, 0, nil); err != nil {
t.Fatal(err)
}
// should update terminating replicas when feature gate is enabled
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, true)
// Scale down the deployment
tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
update.Spec.Replicas = ptr.To(int32(3))
})
if err != nil {
t.Fatalf("failed updating deployment %q: %v", deploymentName, err)
}
// Wait for number of ready replicas to equal number of replicas.
if err = tester.waitForReadyReplicas(); err != nil {
t.Fatal(err)
}
// Verify all replicas fields of DeploymentStatus have desired counts
if err = tester.checkDeploymentStatusReplicasFields(3, 3, 3, 3, 0, ptr.To[int32](3)); err != nil {
t.Fatal(err)
}
// should not update terminating replicas when feature gate is disabled
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, false)
// Scale down the deployment
tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
update.Spec.Replicas = ptr.To(int32(2))
})
if err != nil {
t.Fatalf("failed updating deployment %q: %v", deploymentName, err)
}
// Wait for number of ready replicas to equal number of replicas.
if err = tester.waitForReadyReplicas(); err != nil {
t.Fatal(err)
}
// Verify all replicas fields of DeploymentStatus have desired counts
if err = tester.checkDeploymentStatusReplicasFields(2, 2, 2, 2, 0, nil); err != nil {
t.Fatal(err)
}
}

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/controller/replicaset" "k8s.io/kubernetes/pkg/controller/replicaset"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
testutil "k8s.io/kubernetes/test/utils" testutil "k8s.io/kubernetes/test/utils"
"k8s.io/utils/ptr"
) )
const ( const (
@ -433,7 +434,7 @@ func (d *deploymentTester) markUpdatedPodsReadyWithoutComplete() error {
// Verify all replicas fields of DeploymentStatus have desired count. // Verify all replicas fields of DeploymentStatus have desired count.
// Immediately return an error when found a non-matching replicas field. // Immediately return an error when found a non-matching replicas field.
func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32) error { func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32, terminatingReplicas *int32) error {
deployment, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{}) deployment, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{})
if err != nil { if err != nil {
return fmt.Errorf("failed to get deployment %q: %v", d.deployment.Name, err) return fmt.Errorf("failed to get deployment %q: %v", d.deployment.Name, err)
@ -448,10 +449,13 @@ func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updated
return fmt.Errorf("unexpected .readyReplicas: expect %d, got %d", readyReplicas, deployment.Status.ReadyReplicas) return fmt.Errorf("unexpected .readyReplicas: expect %d, got %d", readyReplicas, deployment.Status.ReadyReplicas)
} }
if deployment.Status.AvailableReplicas != availableReplicas { if deployment.Status.AvailableReplicas != availableReplicas {
return fmt.Errorf("unexpected .replicas: expect %d, got %d", availableReplicas, deployment.Status.AvailableReplicas) return fmt.Errorf("unexpected .availableReplicas: expect %d, got %d", availableReplicas, deployment.Status.AvailableReplicas)
} }
if deployment.Status.UnavailableReplicas != unavailableReplicas { if deployment.Status.UnavailableReplicas != unavailableReplicas {
return fmt.Errorf("unexpected .replicas: expect %d, got %d", unavailableReplicas, deployment.Status.UnavailableReplicas) return fmt.Errorf("unexpected .unavailableReplicas: expect %d, got %d", unavailableReplicas, deployment.Status.UnavailableReplicas)
}
if !ptr.Equal(deployment.Status.TerminatingReplicas, terminatingReplicas) {
return fmt.Errorf("unexpected .terminatingReplicas: expect %v, got %v", ptr.Deref(terminatingReplicas, -1), ptr.Deref(deployment.Status.TerminatingReplicas, -1))
} }
return nil return nil
} }