mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 18:24:07 +00:00
ReplicaSet controller should count terminating pods in the status
This commit is contained in:
parent
28437797b5
commit
dc1914c61c
@ -973,6 +973,21 @@ func compareMaxContainerRestarts(pi *v1.Pod, pj *v1.Pod) *bool {
|
||||
return nil
|
||||
}
|
||||
|
||||
// FilterClaimedPods returns pods that are controlled by the controller and match the selector.
|
||||
func FilterClaimedPods(controller metav1.Object, selector labels.Selector, pods []*v1.Pod) []*v1.Pod {
|
||||
var result []*v1.Pod
|
||||
for _, pod := range pods {
|
||||
if !metav1.IsControlledBy(pod, controller) {
|
||||
// It's an orphan or owned by someone else.
|
||||
continue
|
||||
}
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
result = append(result, pod)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// FilterActivePods returns pods that have not terminated.
|
||||
func FilterActivePods(logger klog.Logger, pods []*v1.Pod) []*v1.Pod {
|
||||
var result []*v1.Pod
|
||||
|
@ -55,6 +55,7 @@ import (
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
"k8s.io/utils/pointer"
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -433,6 +434,128 @@ func TestCountTerminatingPods(t *testing.T) {
|
||||
assert.Len(t, terminatingList, int(2))
|
||||
}
|
||||
|
||||
func TestClaimedPodFiltering(t *testing.T) {
|
||||
rsUUID := uuid.NewUUID()
|
||||
|
||||
type podData struct {
|
||||
podName string
|
||||
ownerReferences []metav1.OwnerReference
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
type test struct {
|
||||
name string
|
||||
pods []podData
|
||||
wantPodNames []string
|
||||
}
|
||||
|
||||
tests := []test{
|
||||
{
|
||||
name: "Filters claimed pods",
|
||||
pods: []podData{
|
||||
// single owner reference
|
||||
{podName: "claimed-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: rsUUID, Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "wrong-selector-1", labels: map[string]string{"foo": "baz"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: rsUUID, Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "non-controller-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: rsUUID, Controller: nil},
|
||||
}},
|
||||
{podName: "other-controller-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{}},
|
||||
// additional controller owner reference set to controller=false
|
||||
{podName: "claimed-2", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(false)},
|
||||
{UID: rsUUID, Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "wrong-selector-2", labels: map[string]string{"foo": "baz"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(false)},
|
||||
{UID: rsUUID, Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "non-controller-2", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(false)},
|
||||
{UID: rsUUID, Controller: ptr.To(false)},
|
||||
}},
|
||||
{podName: "other-controller-2", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(false)},
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(false)},
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(false)},
|
||||
}},
|
||||
// additional controller owner reference set to controller=nil
|
||||
{podName: "claimed-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID()},
|
||||
{UID: rsUUID, Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "wrong-selector-3", labels: nil, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID()},
|
||||
{UID: rsUUID, Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "non-controller-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID()},
|
||||
{UID: rsUUID, Controller: nil},
|
||||
}},
|
||||
{podName: "other-controller-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID()},
|
||||
{UID: uuid.NewUUID(), Controller: ptr.To(true)},
|
||||
}},
|
||||
{podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID()},
|
||||
}},
|
||||
{podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{
|
||||
{UID: uuid.NewUUID()},
|
||||
}},
|
||||
},
|
||||
wantPodNames: []string{"claimed-1", "claimed-2", "claimed-3"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
|
||||
rs := newReplicaSet("test-claim", 3, rsUUID)
|
||||
var pods []*v1.Pod
|
||||
for _, p := range test.pods {
|
||||
pods = append(pods, &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: p.podName,
|
||||
Namespace: rs.Namespace,
|
||||
Labels: p.labels,
|
||||
OwnerReferences: p.ownerReferences,
|
||||
},
|
||||
Status: v1.PodStatus{Phase: v1.PodRunning},
|
||||
})
|
||||
}
|
||||
|
||||
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't get selector for object %#v: %v", rs, err)
|
||||
}
|
||||
got := FilterClaimedPods(rs, selector, pods)
|
||||
gotNames := sets.NewString()
|
||||
for _, pod := range got {
|
||||
gotNames.Insert(pod.Name)
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(test.wantPodNames, gotNames.List()); diff != "" {
|
||||
t.Errorf("Active pod names (-want,+got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivePodFiltering(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
type podData struct {
|
||||
|
@ -45,6 +45,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
appsinformers "k8s.io/client-go/informers/apps/v1"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
@ -60,6 +61,7 @@ import (
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/replicaset/metrics"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -564,10 +566,10 @@ func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
|
||||
}
|
||||
|
||||
// manageReplicas checks and updates replicas for the given ReplicaSet.
|
||||
// Does NOT modify <filteredPods>.
|
||||
// Does NOT modify <activePods>.
|
||||
// It will requeue the replica set in case of an error while creating/deleting pods.
|
||||
func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
|
||||
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
|
||||
func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, activePods []*v1.Pod, rs *apps.ReplicaSet) error {
|
||||
diff := len(activePods) - int(*(rs.Spec.Replicas))
|
||||
rsKey, err := controller.KeyFunc(rs)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
|
||||
@ -627,7 +629,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
|
||||
utilruntime.HandleError(err)
|
||||
|
||||
// Choose which Pods to delete, preferring those in earlier phases of startup.
|
||||
podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
|
||||
podsToDelete := getPodsToDelete(activePods, relatedPods, diff)
|
||||
|
||||
// Snapshot the UIDs (ns/name) of the pods we're expecting to see
|
||||
// deleted, so we know to record their expectations exactly once either
|
||||
@ -707,22 +709,27 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Ignore inactive pods.
|
||||
filteredPods := controller.FilterActivePods(logger, allPods)
|
||||
|
||||
// NOTE: filteredPods are pointing to objects from cache - if you need to
|
||||
// NOTE: activePods and terminatingPods are pointing to objects from cache - if you need to
|
||||
// modify them, you need to copy it first.
|
||||
filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
|
||||
allActivePods := controller.FilterActivePods(logger, allPods)
|
||||
activePods, err := rsc.claimPods(ctx, rs, selector, allActivePods)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var terminatingPods []*v1.Pod
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) {
|
||||
allTerminatingPods := controller.FilterTerminatingPods(allPods)
|
||||
terminatingPods = controller.FilterClaimedPods(rs, selector, allTerminatingPods)
|
||||
}
|
||||
|
||||
var manageReplicasErr error
|
||||
if rsNeedsSync && rs.DeletionTimestamp == nil {
|
||||
manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
|
||||
manageReplicasErr = rsc.manageReplicas(ctx, activePods, rs)
|
||||
}
|
||||
rs = rs.DeepCopy()
|
||||
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
|
||||
newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr)
|
||||
|
||||
// Always updates status as pods come up or die.
|
||||
updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
|
||||
|
@ -29,8 +29,11 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
// updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
|
||||
@ -42,6 +45,7 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
|
||||
rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
|
||||
rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
|
||||
rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
|
||||
ptr.Equal(rs.Status.TerminatingReplicas, newStatus.TerminatingReplicas) &&
|
||||
rs.Generation == rs.Status.ObservedGeneration &&
|
||||
reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
|
||||
return rs, nil
|
||||
@ -56,11 +60,16 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
|
||||
var getErr, updateErr error
|
||||
var updatedRS *apps.ReplicaSet
|
||||
for i, rs := 0, rs; ; i++ {
|
||||
terminatingReplicasUpdateInfo := ""
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) {
|
||||
terminatingReplicasUpdateInfo = fmt.Sprintf("terminatingReplicas %s->%s, ", derefInt32ToStr(rs.Status.TerminatingReplicas), derefInt32ToStr(newStatus.TerminatingReplicas))
|
||||
}
|
||||
logger.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
|
||||
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
|
||||
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
|
||||
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
|
||||
fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
|
||||
terminatingReplicasUpdateInfo +
|
||||
fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))
|
||||
|
||||
rs.Status = newStatus
|
||||
@ -83,18 +92,18 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
|
||||
return nil, updateErr
|
||||
}
|
||||
|
||||
func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
|
||||
func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
|
||||
newStatus := rs.Status
|
||||
// Count the number of pods that have labels matching the labels of the pod
|
||||
// template of the replica set, the matching pods may have more
|
||||
// labels than are in the template. Because the label of podTemplateSpec is
|
||||
// a superset of the selector of the replica set, so the possible
|
||||
// matching pods must be part of the filteredPods.
|
||||
// matching pods must be part of the activePods.
|
||||
fullyLabeledReplicasCount := 0
|
||||
readyReplicasCount := 0
|
||||
availableReplicasCount := 0
|
||||
templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
|
||||
for _, pod := range filteredPods {
|
||||
for _, pod := range activePods {
|
||||
if templateLabel.Matches(labels.Set(pod.Labels)) {
|
||||
fullyLabeledReplicasCount++
|
||||
}
|
||||
@ -106,10 +115,15 @@ func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicas
|
||||
}
|
||||
}
|
||||
|
||||
var terminatingReplicasCount *int32
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) {
|
||||
terminatingReplicasCount = ptr.To(int32(len(terminatingPods)))
|
||||
}
|
||||
|
||||
failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure)
|
||||
if manageReplicasErr != nil && failureCond == nil {
|
||||
var reason string
|
||||
if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 {
|
||||
if diff := len(activePods) - int(*(rs.Spec.Replicas)); diff < 0 {
|
||||
reason = "FailedCreate"
|
||||
} else if diff > 0 {
|
||||
reason = "FailedDelete"
|
||||
@ -120,10 +134,11 @@ func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicas
|
||||
RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure)
|
||||
}
|
||||
|
||||
newStatus.Replicas = int32(len(filteredPods))
|
||||
newStatus.Replicas = int32(len(activePods))
|
||||
newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
|
||||
newStatus.ReadyReplicas = int32(readyReplicasCount)
|
||||
newStatus.AvailableReplicas = int32(availableReplicasCount)
|
||||
newStatus.TerminatingReplicas = terminatingReplicasCount
|
||||
return newStatus
|
||||
}
|
||||
|
||||
@ -175,3 +190,10 @@ func filterOutCondition(conditions []apps.ReplicaSetCondition, condType apps.Rep
|
||||
}
|
||||
return newConditions
|
||||
}
|
||||
|
||||
func derefInt32ToStr(ptr *int32) string {
|
||||
if ptr == nil {
|
||||
return "nil"
|
||||
}
|
||||
return fmt.Sprintf("%d", *ptr)
|
||||
}
|
||||
|
@ -25,6 +25,11 @@ import (
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
func TestCalculateStatus(t *testing.T) {
|
||||
@ -36,113 +41,199 @@ func TestCalculateStatus(t *testing.T) {
|
||||
longMinReadySecondsRS := newReplicaSet(1, fullLabelMap)
|
||||
longMinReadySecondsRS.Spec.MinReadySeconds = 3600
|
||||
|
||||
asTerminating := func(pod *v1.Pod) *v1.Pod {
|
||||
pod.DeletionTimestamp = ptr.To(meta.Now())
|
||||
return pod
|
||||
}
|
||||
|
||||
rsStatusTests := []struct {
|
||||
name string
|
||||
replicaset *apps.ReplicaSet
|
||||
filteredPods []*v1.Pod
|
||||
expectedReplicaSetStatus apps.ReplicaSetStatus
|
||||
name string
|
||||
enableDeploymentPodReplacementPolicy bool
|
||||
replicaset *apps.ReplicaSet
|
||||
activePods []*v1.Pod
|
||||
terminatingPods []*v1.Pod
|
||||
expectedReplicaSetStatus apps.ReplicaSetStatus
|
||||
}{
|
||||
{
|
||||
"1 fully labelled pod",
|
||||
false,
|
||||
fullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
nil,
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 1,
|
||||
FullyLabeledReplicas: 1,
|
||||
ReadyReplicas: 1,
|
||||
AvailableReplicas: 1,
|
||||
TerminatingReplicas: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
"1 not fully labelled pod",
|
||||
false,
|
||||
notFullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", notFullyLabelledRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
nil,
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 1,
|
||||
FullyLabeledReplicas: 0,
|
||||
ReadyReplicas: 1,
|
||||
AvailableReplicas: 1,
|
||||
TerminatingReplicas: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
"2 fully labelled pods",
|
||||
false,
|
||||
fullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true),
|
||||
newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
nil,
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 2,
|
||||
FullyLabeledReplicas: 2,
|
||||
ReadyReplicas: 2,
|
||||
AvailableReplicas: 2,
|
||||
TerminatingReplicas: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
"2 fully labelled pods with DeploymentPodReplacementPolicy",
|
||||
true,
|
||||
fullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true),
|
||||
newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
nil,
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 2,
|
||||
FullyLabeledReplicas: 2,
|
||||
ReadyReplicas: 2,
|
||||
AvailableReplicas: 2,
|
||||
TerminatingReplicas: ptr.To[int32](0),
|
||||
},
|
||||
},
|
||||
{
|
||||
"2 not fully labelled pods",
|
||||
false,
|
||||
notFullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", notFullyLabelledRS, v1.PodRunning, nil, true),
|
||||
newPod("pod2", notFullyLabelledRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
nil,
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 2,
|
||||
FullyLabeledReplicas: 0,
|
||||
ReadyReplicas: 2,
|
||||
AvailableReplicas: 2,
|
||||
TerminatingReplicas: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
"1 fully labelled pod, 1 not fully labelled pod",
|
||||
false,
|
||||
notFullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", notFullyLabelledRS, v1.PodRunning, nil, true),
|
||||
newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
nil,
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 2,
|
||||
FullyLabeledReplicas: 1,
|
||||
ReadyReplicas: 2,
|
||||
AvailableReplicas: 2,
|
||||
TerminatingReplicas: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
"1 non-ready pod",
|
||||
false,
|
||||
fullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", fullyLabelledRS, v1.PodPending, nil, true),
|
||||
},
|
||||
nil,
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 1,
|
||||
FullyLabeledReplicas: 1,
|
||||
ReadyReplicas: 0,
|
||||
AvailableReplicas: 0,
|
||||
TerminatingReplicas: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
"1 ready but non-available pod",
|
||||
false,
|
||||
longMinReadySecondsRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", longMinReadySecondsRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
nil,
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 1,
|
||||
FullyLabeledReplicas: 1,
|
||||
ReadyReplicas: 1,
|
||||
AvailableReplicas: 0,
|
||||
TerminatingReplicas: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
"1 fully labelled pod and 1 terminating without DeploymentPodReplacementPolicy",
|
||||
false,
|
||||
fullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
[]*v1.Pod{
|
||||
asTerminating(newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true)),
|
||||
},
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 1,
|
||||
FullyLabeledReplicas: 1,
|
||||
ReadyReplicas: 1,
|
||||
AvailableReplicas: 1,
|
||||
TerminatingReplicas: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
"1 fully labelled pods and 2 terminating with DeploymentPodReplacementPolicy",
|
||||
true,
|
||||
fullyLabelledRS,
|
||||
[]*v1.Pod{
|
||||
newPod("pod1", fullyLabelledRS, v1.PodRunning, nil, true),
|
||||
},
|
||||
[]*v1.Pod{
|
||||
asTerminating(newPod("pod2", fullyLabelledRS, v1.PodRunning, nil, true)),
|
||||
asTerminating(newPod("pod3", fullyLabelledRS, v1.PodRunning, nil, true)),
|
||||
},
|
||||
apps.ReplicaSetStatus{
|
||||
Replicas: 1,
|
||||
FullyLabeledReplicas: 1,
|
||||
ReadyReplicas: 1,
|
||||
AvailableReplicas: 1,
|
||||
TerminatingReplicas: ptr.To[int32](2),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range rsStatusTests {
|
||||
replicaSetStatus := calculateStatus(test.replicaset, test.filteredPods, nil)
|
||||
if !reflect.DeepEqual(replicaSetStatus, test.expectedReplicaSetStatus) {
|
||||
t.Errorf("%s: unexpected replicaset status: expected %v, got %v", test.name, test.expectedReplicaSetStatus, replicaSetStatus)
|
||||
}
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, test.enableDeploymentPodReplacementPolicy)
|
||||
|
||||
replicaSetStatus := calculateStatus(test.replicaset, test.activePods, test.terminatingPods, nil)
|
||||
if !reflect.DeepEqual(replicaSetStatus, test.expectedReplicaSetStatus) {
|
||||
t.Errorf("unexpected replicaset status: expected %v, got %v", test.expectedReplicaSetStatus, replicaSetStatus)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -160,7 +251,7 @@ func TestCalculateStatusConditions(t *testing.T) {
|
||||
rsStatusConditionTests := []struct {
|
||||
name string
|
||||
replicaset *apps.ReplicaSet
|
||||
filteredPods []*v1.Pod
|
||||
activePods []*v1.Pod
|
||||
manageReplicasErr error
|
||||
expectedReplicaSetConditions []apps.ReplicaSetCondition
|
||||
}{
|
||||
@ -234,13 +325,15 @@ func TestCalculateStatusConditions(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range rsStatusConditionTests {
|
||||
replicaSetStatus := calculateStatus(test.replicaset, test.filteredPods, test.manageReplicasErr)
|
||||
// all test cases have at most 1 status condition
|
||||
if len(replicaSetStatus.Conditions) > 0 {
|
||||
test.expectedReplicaSetConditions[0].LastTransitionTime = replicaSetStatus.Conditions[0].LastTransitionTime
|
||||
}
|
||||
if !reflect.DeepEqual(replicaSetStatus.Conditions, test.expectedReplicaSetConditions) {
|
||||
t.Errorf("%s: unexpected replicaset status: expected %v, got %v", test.name, test.expectedReplicaSetConditions, replicaSetStatus.Conditions)
|
||||
}
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
replicaSetStatus := calculateStatus(test.replicaset, test.activePods, nil, test.manageReplicasErr)
|
||||
// all test cases have at most 1 status condition
|
||||
if len(replicaSetStatus.Conditions) > 0 {
|
||||
test.expectedReplicaSetConditions[0].LastTransitionTime = replicaSetStatus.Conditions[0].LastTransitionTime
|
||||
}
|
||||
if !reflect.DeepEqual(replicaSetStatus.Conditions, test.expectedReplicaSetConditions) {
|
||||
t.Errorf("unexpected replicaset status: expected %v, got %v", test.expectedReplicaSetConditions, replicaSetStatus.Conditions)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -208,6 +208,20 @@ func waitRSStable(t *testing.T, clientSet clientset.Interface, rs *apps.ReplicaS
|
||||
}
|
||||
}
|
||||
|
||||
// Verify .Status.TerminatingPods is equal to terminatingPods.
|
||||
func waitForTerminatingPods(clientSet clientset.Interface, ctx context.Context, rs *apps.ReplicaSet, terminatingPods *int32) error {
|
||||
if err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(c context.Context) (bool, error) {
|
||||
newRS, err := clientSet.AppsV1().ReplicaSets(rs.Namespace).Get(c, rs.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return ptr.Equal(newRS.Status.TerminatingReplicas, terminatingPods), nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to verify .Status.TerminatingPods is equal to %d for replicaset %q: %w", ptr.Deref(terminatingPods, -1), rs.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update .Spec.Replicas to replicas and verify .Status.Replicas is changed accordingly
|
||||
func scaleRS(t *testing.T, c clientset.Interface, rs *apps.ReplicaSet, replicas int32) {
|
||||
rsClient := c.AppsV1().ReplicaSets(rs.Namespace)
|
||||
@ -1056,3 +1070,72 @@ func TestReplicaSetsAppsV1DefaultGCPolicy(t *testing.T) {
|
||||
|
||||
_ = rsClient.Delete(tCtx, rs.Name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func TestTerminatingReplicas(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, false)
|
||||
|
||||
tCtx, closeFn, rm, informers, c := rmSetup(t)
|
||||
defer closeFn()
|
||||
ns := framework.CreateNamespaceOrDie(c, "test-terminating-replicas", t)
|
||||
defer framework.DeleteNamespaceOrDie(c, ns, t)
|
||||
stopControllers := runControllerAndInformers(t, rm, informers, 0)
|
||||
defer stopControllers()
|
||||
|
||||
rs := newRS("rs", ns.Name, 5)
|
||||
rs.Spec.Template.Spec.NodeName = "fake-node"
|
||||
rs.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(300))
|
||||
rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, []*v1.Pod{})
|
||||
rs = rss[0]
|
||||
waitRSStable(t, c, rs)
|
||||
if err := waitForTerminatingPods(c, tCtx, rs, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
podClient := c.CoreV1().Pods(ns.Name)
|
||||
pods := getPods(t, podClient, labelMap())
|
||||
if len(pods.Items) != 5 {
|
||||
t.Fatalf("len(pods) = %d, want 5", len(pods.Items))
|
||||
}
|
||||
setPodsReadyCondition(t, c, pods, v1.ConditionTrue, time.Now())
|
||||
|
||||
// should not update terminating pods when feature gate is disabled
|
||||
if err := podClient.Delete(tCtx, pods.Items[0].Name, metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
waitRSStable(t, c, rs)
|
||||
if err := waitForTerminatingPods(c, tCtx, rs, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// should update terminating pods when feature gate is enabled
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, true)
|
||||
if err := podClient.Delete(tCtx, pods.Items[1].Name, metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
waitRSStable(t, c, rs)
|
||||
if err := waitForTerminatingPods(c, tCtx, rs, ptr.To[int32](2)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// should update status when the pod is removed
|
||||
if err := podClient.Delete(tCtx, pods.Items[0].Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To(int64(0))}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := waitForTerminatingPods(c, tCtx, rs, ptr.To[int32](1)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// should revert terminating pods to 0 when feature gate is disabled
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentPodReplacementPolicy, false)
|
||||
if err := podClient.Delete(tCtx, pods.Items[2].Name, metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
waitRSStable(t, c, rs)
|
||||
if err := waitForTerminatingPods(c, tCtx, rs, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pods = getPods(t, podClient, labelMap())
|
||||
// 5 non-terminating, 2 terminating
|
||||
if len(pods.Items) != 7 {
|
||||
t.Fatalf("len(pods) = %d, want 7", len(pods.Items))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user