disable terminatingReplicas reconciliation in ReplicationController

Filip Křepinský
2025-05-16 15:09:57 +02:00
parent 02eb7d424a
commit b7d16fea7f
5 changed files with 71 additions and 11 deletions

View File

@@ -115,6 +115,21 @@ type ReplicaSetController struct {
// Controllers that need to be synced
queue workqueue.TypedRateLimitingInterface[string]
+// Controller specific features; see ReplicaSetControllerFeatures for details.
+controllerFeatures ReplicaSetControllerFeatures
}
+// ReplicaSetControllerFeatures are controller-level features that are set according to the controller type (GVK).
+// These do not have to correspond to FeatureGates or follow their stability levels, nor do they have to undergo graduation.
+type ReplicaSetControllerFeatures struct {
+EnableStatusTerminatingReplicas bool
+}
+func DefaultReplicaSetControllerFeatures() ReplicaSetControllerFeatures {
+return ReplicaSetControllerFeatures{
+EnableStatusTerminatingReplicas: true,
+}
+}
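Note: the new struct decouples controller-type behavior from cluster-wide feature gates. Both NewReplicaSetController and NewReplicationManager call the shared NewBaseController, each passing its own feature set. A minimal, self-contained sketch of that wiring (the constructor below is a hypothetical stand-in, not the real NewBaseController signature):

```go
package main

import "fmt"

// ReplicaSetControllerFeatures mirrors the struct added in this commit.
type ReplicaSetControllerFeatures struct {
	EnableStatusTerminatingReplicas bool
}

type baseController struct {
	kind     string
	features ReplicaSetControllerFeatures
}

// newBaseController is a hypothetical stand-in for NewBaseController: the
// caller decides which features this controller instance runs with.
func newBaseController(kind string, features ReplicaSetControllerFeatures) *baseController {
	return &baseController{kind: kind, features: features}
}

func main() {
	// ReplicaSet controller: defaults keep terminatingReplicas reconciliation on.
	rs := newBaseController("ReplicaSet", ReplicaSetControllerFeatures{EnableStatusTerminatingReplicas: true})
	// ReplicationController: same shared implementation, reconciliation off.
	rc := newBaseController("ReplicationController", ReplicaSetControllerFeatures{EnableStatusTerminatingReplicas: false})
	fmt.Println(rs.kind, rs.features.EnableStatusTerminatingReplicas) // ReplicaSet true
	fmt.Println(rc.kind, rc.features.EnableStatusTerminatingReplicas) // ReplicationController false
}
```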
// NewReplicaSetController configures a replica set controller with the specified event recorder
@@ -133,13 +148,14 @@ func NewReplicaSetController(ctx context.Context, rsInformer appsinformers.Repli
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
},
eventBroadcaster,
+DefaultReplicaSetControllerFeatures(),
)
}
// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
-gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {
+gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster, controllerFeatures ReplicaSetControllerFeatures) *ReplicaSetController {
rsc := &ReplicaSetController{
GroupVersionKind: gvk,
@@ -152,6 +168,7 @@ func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetIn
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: queueName},
),
+controllerFeatures: controllerFeatures,
}
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -743,7 +760,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
}
var terminatingPods []*v1.Pod
-if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentReplicaSetTerminatingReplicas) {
+if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentReplicaSetTerminatingReplicas) && rsc.controllerFeatures.EnableStatusTerminatingReplicas {
allTerminatingPods := controller.FilterTerminatingPods(allRSPods)
terminatingPods = controller.FilterClaimedPods(rs, selector, allTerminatingPods)
}
@@ -753,10 +770,10 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
manageReplicasErr = rsc.manageReplicas(ctx, activePods, rs)
}
rs = rs.DeepCopy()
-newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr)
+newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr, rsc.controllerFeatures)
// Always updates status as pods come up or die.
-updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
+updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus, rsc.controllerFeatures)
if err != nil {
// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop.
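Note: with the extra condition, terminating pods are collected only when both the cluster-scoped feature gate and the per-controller toggle are enabled; otherwise terminatingPods stays nil. A minimal sketch of this gating, assuming (as in Kubernetes) that a pod counts as terminating once its deletionTimestamp is set; filterTerminating is a stand-in for controller.FilterTerminatingPods:

```go
package main

import (
	"fmt"
	"time"
)

type pod struct {
	name              string
	deletionTimestamp *time.Time
}

// filterTerminating keeps only pods whose deletion timestamp is set.
func filterTerminating(pods []*pod) []*pod {
	var out []*pod
	for _, p := range pods {
		if p.deletionTimestamp != nil {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	now := time.Now()
	pods := []*pod{{name: "pod1"}, {name: "pod2", deletionTimestamp: &now}}

	gateEnabled, controllerEnabled := true, false // e.g. the ReplicationController case
	var terminating []*pod
	if gateEnabled && controllerEnabled {
		terminating = filterTerminating(pods)
	}
	// With the controller toggle off, terminating stays nil, so the computed
	// status will not carry a terminatingReplicas count.
	fmt.Println(len(terminating)) // 0
}
```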

View File

@@ -895,7 +895,10 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
numReplicas := int32(10)
newStatus := apps.ReplicaSetStatus{Replicas: numReplicas}
logger, _ := ktesting.NewTestContext(t)
-updateReplicaSetStatus(logger, fakeRSClient, rs, newStatus)
+_, err := updateReplicaSetStatus(logger, fakeRSClient, rs, newStatus, DefaultReplicaSetControllerFeatures())
+if err == nil {
+t.Errorf("expected update error, got nil")
+}
updates, gets := 0, 0
for _, a := range fakeClient.Actions() {
if a.GetResource().Resource != "replicasets" {
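Note: the call now asserts the returned error instead of discarding it. The fake client is presumably primed elsewhere in this test to fail ReplicaSet updates; a minimal sketch of that client-go testing pattern (the reactor wiring below is an assumption for illustration, not the test's actual setup):

```go
package main

import (
	"errors"
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	k8stesting "k8s.io/client-go/testing"
)

func main() {
	fakeClient := fake.NewSimpleClientset()
	// Make every update (including UpdateStatus) on replicasets fail.
	fakeClient.PrependReactor("update", "replicasets",
		func(action k8stesting.Action) (bool, runtime.Object, error) {
			return true, nil, errors.New("simulated update failure")
		})
	fmt.Println("reactor installed; replicaset status updates now return an error")
}
```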

View File

@@ -37,7 +37,7 @@ import (
)
// updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
-func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
+func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus, controllerFeatures ReplicaSetControllerFeatures) (*apps.ReplicaSet, error) {
// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
@@ -61,7 +61,7 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
var updatedRS *apps.ReplicaSet
for i, rs := 0, rs; ; i++ {
terminatingReplicasUpdateInfo := ""
-if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentReplicaSetTerminatingReplicas) {
+if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentReplicaSetTerminatingReplicas) && controllerFeatures.EnableStatusTerminatingReplicas {
terminatingReplicasUpdateInfo = fmt.Sprintf("terminatingReplicas %s->%s, ", derefInt32ToStr(rs.Status.TerminatingReplicas), derefInt32ToStr(newStatus.TerminatingReplicas))
}
logger.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
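Note: per its doc comment, updateReplicaSetStatus performs a single GET/PUT retry: if the first status PUT fails, it re-GETs the object and tries exactly once more. A self-contained sketch of that shape (the client type here is a hypothetical simplification of appsclient.ReplicaSetInterface):

```go
package main

import (
	"errors"
	"fmt"
)

const statusUpdateRetries = 1

// fakeClient is a hypothetical simplified client for illustration only.
type fakeClient struct{ failuresLeft int }

func (c *fakeClient) UpdateStatus(status int) (int, error) {
	if c.failuresLeft > 0 {
		c.failuresLeft--
		return 0, errors.New("conflict")
	}
	return status, nil
}

func (c *fakeClient) Get() (int, error) { return 0, nil }

// updateStatusWithRetry mirrors the single GET/PUT retry shape: on failure it
// re-GETs the object and retries the PUT exactly once before giving up.
func updateStatusWithRetry(c *fakeClient, newStatus int) (int, error) {
	var updateErr error
	for i := 0; ; i++ {
		updated, err := c.UpdateStatus(newStatus)
		if err == nil {
			return updated, nil
		}
		updateErr = err
		if i >= statusUpdateRetries {
			return 0, updateErr
		}
		if _, err := c.Get(); err != nil { // refresh before the single retry
			return 0, err
		}
	}
}

func main() {
	got, err := updateStatusWithRetry(&fakeClient{failuresLeft: 1}, 5)
	fmt.Println(got, err) // 5 <nil>: the first PUT fails, the retry succeeds
}
```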
@@ -92,7 +92,7 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
return nil, updateErr
}
-func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
+func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error, controllerFeatures ReplicaSetControllerFeatures) apps.ReplicaSetStatus {
newStatus := rs.Status
// Count the number of pods that have labels matching the labels of the pod
// template of the replica set, the matching pods may have more
@@ -116,7 +116,7 @@ func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods
}
var terminatingReplicasCount *int32
-if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentReplicaSetTerminatingReplicas) {
+if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentReplicaSetTerminatingReplicas) && controllerFeatures.EnableStatusTerminatingReplicas {
terminatingReplicasCount = ptr.To(int32(len(terminatingPods)))
}
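Note: because TerminatingReplicas is a *int32, leaving terminatingReplicasCount nil when the toggle is off means the field is omitted from the serialized status entirely rather than reported as 0. A small sketch of that nil-pointer semantics (ptrTo stands in for k8s.io/utils/ptr.To; the status struct is trimmed to the relevant fields):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func ptrTo[T any](v T) *T { return &v }

type status struct {
	Replicas            int32  `json:"replicas"`
	TerminatingReplicas *int32 `json:"terminatingReplicas,omitempty"`
}

func main() {
	terminatingPods := 2

	enabled := status{Replicas: 1, TerminatingReplicas: ptrTo(int32(terminatingPods))}
	disabled := status{Replicas: 1} // toggle off: the pointer stays nil

	on, _ := json.Marshal(enabled)
	off, _ := json.Marshal(disabled)
	fmt.Println(string(on))  // {"replicas":1,"terminatingReplicas":2}
	fmt.Println(string(off)) // {"replicas":1}
}
```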

View File

@@ -52,6 +52,7 @@ func TestCalculateStatus(t *testing.T) {
replicaset *apps.ReplicaSet
activePods []*v1.Pod
terminatingPods []*v1.Pod
+controllerFeatures *ReplicaSetControllerFeatures
expectedReplicaSetStatus apps.ReplicaSetStatus
}{
{
@@ -62,6 +63,7 @@ func TestCalculateStatus(t *testing.T) {
newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true),
},
nil,
+nil,
apps.ReplicaSetStatus{
Replicas: 1,
FullyLabeledReplicas: 1,
@@ -78,6 +80,7 @@ func TestCalculateStatus(t *testing.T) {
newPod("pod1", notFullyLabelledRS, v1.PodRunning, nil, true),
},
nil,
+nil,
apps.ReplicaSetStatus{
Replicas: 1,
FullyLabeledReplicas: 0,
@@ -95,6 +98,7 @@ func TestCalculateStatus(t *testing.T) {
newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true),
},
nil,
+nil,
apps.ReplicaSetStatus{
Replicas: 2,
FullyLabeledReplicas: 2,
@@ -112,6 +116,7 @@ func TestCalculateStatus(t *testing.T) {
newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true),
},
nil,
+nil,
apps.ReplicaSetStatus{
Replicas: 2,
FullyLabeledReplicas: 2,
@@ -129,6 +134,7 @@ func TestCalculateStatus(t *testing.T) {
newPod("pod2", notFullyLabelledRS, v1.PodRunning, nil, true),
},
nil,
+nil,
apps.ReplicaSetStatus{
Replicas: 2,
FullyLabeledReplicas: 0,
@@ -146,6 +152,7 @@ func TestCalculateStatus(t *testing.T) {
newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true),
},
nil,
+nil,
apps.ReplicaSetStatus{
Replicas: 2,
FullyLabeledReplicas: 1,
@@ -162,6 +169,7 @@ func TestCalculateStatus(t *testing.T) {
newPod("pod1", fullyLabelledRS, v1.PodPending, nil, true),
},
nil,
+nil,
apps.ReplicaSetStatus{
Replicas: 1,
FullyLabeledReplicas: 1,
@@ -178,6 +186,7 @@ func TestCalculateStatus(t *testing.T) {
newPod("pod1", longMinReadySecondsRS, v1.PodRunning, nil, true),
},
nil,
+nil,
apps.ReplicaSetStatus{
Replicas: 1,
FullyLabeledReplicas: 1,
@@ -196,6 +205,7 @@ func TestCalculateStatus(t *testing.T) {
[]*v1.Pod{
asTerminating(newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true)),
},
+nil,
apps.ReplicaSetStatus{
Replicas: 1,
FullyLabeledReplicas: 1,
@@ -215,6 +225,7 @@ func TestCalculateStatus(t *testing.T) {
asTerminating(newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true)),
asTerminating(newPod("pod3", fullyLabelledRS, v1.PodRunning, nil, true)),
},
+nil,
apps.ReplicaSetStatus{
Replicas: 1,
FullyLabeledReplicas: 1,
@@ -223,13 +234,37 @@ func TestCalculateStatus(t *testing.T) {
TerminatingReplicas: ptr.To[int32](2),
},
},
+{
+"1 fully labelled pod and 2 terminating with DeploymentReplicaSetTerminatingReplicas (ReplicationController)",
+true,
+fullyLabelledRS,
+[]*v1.Pod{
+newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true),
+},
+[]*v1.Pod{
+asTerminating(newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true)),
+asTerminating(newPod("pod3", fullyLabelledRS, v1.PodRunning, nil, true)),
+},
+&ReplicaSetControllerFeatures{
+EnableStatusTerminatingReplicas: false,
+},
+apps.ReplicaSetStatus{
+Replicas: 1,
+FullyLabeledReplicas: 1,
+ReadyReplicas: 1,
+AvailableReplicas: 1,
+TerminatingReplicas: nil,
+},
+},
}
for _, test := range rsStatusTests {
t.Run(test.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentReplicaSetTerminatingReplicas, test.enableDeploymentReplicaSetTerminatingReplicas)
-replicaSetStatus := calculateStatus(test.replicaset, test.activePods, test.terminatingPods, nil)
+// Test the ReplicaSet controller's default behavior unless the test case specifies otherwise.
+controllerFeatures := ptr.Deref(test.controllerFeatures, DefaultReplicaSetControllerFeatures())
+replicaSetStatus := calculateStatus(test.replicaset, test.activePods, test.terminatingPods, nil, controllerFeatures)
if !reflect.DeepEqual(replicaSetStatus, test.expectedReplicaSetStatus) {
t.Errorf("unexpected replicaset status: expected %v, got %v", test.expectedReplicaSetStatus, replicaSetStatus)
}
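Note: test cases leave controllerFeatures nil to mean "use the ReplicaSet controller defaults"; ptr.Deref returns the pointed-to value when the pointer is non-nil and the supplied default otherwise. A local sketch of that behavior (deref mirrors k8s.io/utils/ptr.Deref; the features struct is illustrative):

```go
package main

import "fmt"

// deref returns *p when p is non-nil, and def otherwise.
func deref[T any](p *T, def T) T {
	if p != nil {
		return *p
	}
	return def
}

type features struct{ enableStatusTerminatingReplicas bool }

func main() {
	defaults := features{enableStatusTerminatingReplicas: true}

	var unset *features // the test case says nothing: fall back to RS defaults
	rcCase := &features{enableStatusTerminatingReplicas: false}

	fmt.Println(deref(unset, defaults))  // {true}
	fmt.Println(deref(rcCase, defaults)) // {false}
}
```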
@@ -326,7 +361,7 @@ func TestCalculateStatusConditions(t *testing.T) {
for _, test := range rsStatusConditionTests {
t.Run(test.name, func(t *testing.T) {
-replicaSetStatus := calculateStatus(test.replicaset, test.activePods, nil, test.manageReplicasErr)
+replicaSetStatus := calculateStatus(test.replicaset, test.activePods, nil, test.manageReplicasErr, DefaultReplicaSetControllerFeatures())
// all test cases have at most 1 status condition
if len(replicaSetStatus.Conditions) > 0 {
test.expectedReplicaSetConditions[0].LastTransitionTime = replicaSetStatus.Conditions[0].LastTransitionTime

View File

@@ -63,6 +63,11 @@ func NewReplicationManager(ctx context.Context, podInformer coreinformers.PodInf
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
}},
eventBroadcaster,
+replicaset.ReplicaSetControllerFeatures{
+// The ReplicationController API does not support the .status.terminatingReplicas field.
+// ReplicaSets do support this field, which is then propagated to Deployments for higher-level features.
+EnableStatusTerminatingReplicas: false,
+},
),
}
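Note: the rationale for the hardcoded false is that core/v1's ReplicationControllerStatus has no terminatingReplicas field, while apps/v1's ReplicaSetStatus does, so the shared sync loop must not compute or log a value it can never publish for ReplicationControllers. A trimmed sketch of the two status shapes (fields reduced to the ones relevant here):

```go
package main

import "fmt"

type replicaSetStatus struct {
	Replicas            int32
	TerminatingReplicas *int32 // optional field in apps/v1 ReplicaSetStatus
}

type replicationControllerStatus struct {
	Replicas int32 // core/v1 defines no terminatingReplicas field at all
}

func main() {
	two := int32(2)
	fmt.Printf("%+v\n", replicaSetStatus{Replicas: 1, TerminatingReplicas: &two})
	fmt.Printf("%+v\n", replicationControllerStatus{Replicas: 1})
}
```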
}