diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index a248fd81fba..6fb3579fce6 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -22,8 +22,10 @@ import ( apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -546,6 +548,16 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( return &status, nil } + if utilfeature.DefaultFeatureGate.Enabled(features.MaxUnavailableStatefulSet) { + return updateStatefulSetAfterInvariantEstablished(ctx, + ssc, + set, + replicas, + updateRevision, + status, + ) + } + // we compute the minimum ordinal of the target sequence for a destructive update based on the strategy. updateMin := 0 if set.Spec.UpdateStrategy.RollingUpdate != nil { @@ -574,6 +586,80 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( return &status, nil } +func updateStatefulSetAfterInvariantEstablished( + ctx context.Context, + ssc *defaultStatefulSetControl, + set *apps.StatefulSet, + replicas []*v1.Pod, + updateRevision *apps.ControllerRevision, + status apps.StatefulSetStatus, +) (*apps.StatefulSetStatus, error) { + + replicaCount := int(*set.Spec.Replicas) + + // we compute the minimum ordinal of the target sequence for a destructive update based on the strategy. + updateMin := 0 + maxUnavailable := 1 + if set.Spec.UpdateStrategy.RollingUpdate != nil { + updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition) + + // if the feature was enabled and then later disabled, MaxUnavailable may have a value + // other than 1. Ignore the passed in value and Use maxUnavailable as 1 to enforce + // expected behavior when feature gate is not enabled. + var err error + maxUnavailable, err = intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(set.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, intstrutil.FromInt(1)), int(replicaCount), false) + if err != nil { + return &status, err + } + + } + + // Collect all targets in the range between the 0 and Spec.Replicas. Count any targets in that range + // that are unhealthy i.e. terminated or not running and ready as unavailable). Select the + // (MaxUnavailable - Unavailable) Pods, in order with respect to their ordinal for termination. Delete + // those pods and count the successful deletions. Update the status with the correct number of deletions. + unavailablePods := 0 + for target := len(replicas) - 1; target >= 0; target-- { + if !isHealthy(replicas[target]) { + unavailablePods++ + } + } + + if unavailablePods >= maxUnavailable { + klog.V(2).Infof("StatefulSet %s/%s found unavailablePods %v, more than or equal to allowed maxUnavailable %v", + set.Namespace, + set.Name, + unavailablePods, + maxUnavailable) + return &status, nil + } + + // Now we need to delete MaxUnavailable- unavailablePods + // start deleting one by one starting from the highest ordinal first + podsToDelete := maxUnavailable - unavailablePods + + deletedPods := 0 + for target := len(replicas) - 1; target >= updateMin && deletedPods < podsToDelete; target-- { + + // delete the Pod if it is healthy and the revision doesnt match the target + if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) { + // delete the Pod if it is healthy and the revision doesnt match the target + klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update", + set.Namespace, + set.Name, + replicas[target].Name) + if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil { + if !errors.IsNotFound(err) { + return nil, err + } + } + deletedPods++ + status.CurrentReplicas-- + } + } + return &status, nil +} + // updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is // mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the // returned error is nil, the update is successful. diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 782215d5189..cda8f435d6d 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/intstr" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" appsinformers "k8s.io/client-go/informers/apps/v1" @@ -728,6 +729,325 @@ func TestStatefulSetControl_getSetRevisions(t *testing.T) { }) } +func setupPodManagementPolicy(podManagementPolicy apps.PodManagementPolicyType, set *apps.StatefulSet) *apps.StatefulSet { + set.Spec.PodManagementPolicy = podManagementPolicy + return set +} + +func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MaxUnavailableStatefulSet, true)() + + simpleParallelVerificationFn := func( + set *apps.StatefulSet, + spc *fakeObjectManager, + ssc StatefulSetControlInterface, + pods []*v1.Pod, + totalPods int, + selector labels.Selector, + ) []*v1.Pod { + // in burst mode, 2 pods got deleted, so 2 new pods will be created at the same time + if len(pods) != totalPods { + t.Fatalf("Expected create pods 4/5, got pods %v", len(pods)) + } + + // if pod 4 ready, start to update pod 3, even though 5 is not ready + spc.setPodRunning(set, 4) + spc.setPodRunning(set, 5) + originalPods, _ := spc.setPodReady(set, 4) + sort.Sort(ascendingOrdinal(originalPods)) + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(pods)) + // pods 0, 1,2, 4,5 should be present(note 3 is missing) + if !reflect.DeepEqual(pods, append(originalPods[:3], originalPods[4:]...)) { + t.Fatalf("Expected pods %v, got pods %v", append(originalPods[:3], originalPods[4:]...), pods) + } + + // create new pod 3 + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + if len(pods) != totalPods { + t.Fatalf("Expected create pods 2/3, got pods %v", pods) + } + + return pods + } + simpleOrderedVerificationFn := func( + set *apps.StatefulSet, + spc *fakeObjectManager, + ssc StatefulSetControlInterface, + pods []*v1.Pod, + totalPods int, + selector labels.Selector, + ) []*v1.Pod { + // only one pod gets created at a time due to OrderedReady + if len(pods) != 5 { + t.Fatalf("Expected create pods 5, got pods %v", len(pods)) + } + spc.setPodRunning(set, 4) + pods, _ = spc.setPodReady(set, 4) + + // create new pods 4(only one pod gets created at a time due to OrderedReady) + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Fatal(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + + if len(pods) != totalPods { + t.Fatalf("Expected create pods 4, got pods %v", len(pods)) + } + // if pod 4 ready, start to update pod 3 + spc.setPodRunning(set, 5) + originalPods, _ := spc.setPodReady(set, 5) + sort.Sort(ascendingOrdinal(originalPods)) + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(pods)) + + // verify the remaining pods are 0,1,2,4,5 (3 got deleted) + if !reflect.DeepEqual(pods, append(originalPods[:3], originalPods[4:]...)) { + t.Fatalf("Expected pods %v, got pods %v", append(originalPods[:3], originalPods[4:]...), pods) + } + + // create new pod 3 + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + if len(pods) != totalPods { + t.Fatalf("Expected create pods 2/3, got pods %v", pods) + } + + return pods + } + testCases := []struct { + policyType apps.PodManagementPolicyType + verifyFn func( + set *apps.StatefulSet, + spc *fakeObjectManager, + ssc StatefulSetControlInterface, + pods []*v1.Pod, + totalPods int, + selector labels.Selector, + ) []*v1.Pod + }{ + {apps.OrderedReadyPodManagement, simpleOrderedVerificationFn}, + {apps.ParallelPodManagement, simpleParallelVerificationFn}, + } + for _, tc := range testCases { + // Setup the statefulSet controller + totalPods := 6 + var partition int32 = 3 + var maxUnavailable = intstr.FromInt(2) + set := setupPodManagementPolicy(tc.policyType, newStatefulSet(totalPods)) + set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy { + return &apps.RollingUpdateStatefulSetStrategy{ + Partition: &partition, + MaxUnavailable: &maxUnavailable, + } + }(), + } + + client := fake.NewSimpleClientset() + spc, _, ssc := setupController(client) + if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil { + t.Fatal(err) + } + set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatal(err) + } + + // Change the image to trigger an update + set.Spec.Template.Spec.Containers[0].Image = "foo" + + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + originalPods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(originalPods)) + + // since maxUnavailable is 2, update pods 4 and 5, this will delete the pod 4 and 5, + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + + sort.Sort(ascendingOrdinal(pods)) + + // expected number of pod is 0,1,2,3 + if !reflect.DeepEqual(pods, originalPods[:4]) { + t.Fatalf("Expected pods %v, got pods %v", originalPods[:4], pods) + } + + // create new pods + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + + tc.verifyFn(set, spc, ssc, pods, totalPods, selector) + + // pods 3/4/5 ready, should not update other pods + spc.setPodRunning(set, 3) + spc.setPodRunning(set, 5) + spc.setPodReady(set, 5) + originalPods, _ = spc.setPodReady(set, 3) + sort.Sort(ascendingOrdinal(originalPods)) + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(pods)) + if !reflect.DeepEqual(pods, originalPods) { + t.Fatalf("Expected pods %v, got pods %v", originalPods, pods) + } + } + +} + +func setupForInvariant(t *testing.T) (*apps.StatefulSet, *fakeObjectManager, StatefulSetControlInterface, intstr.IntOrString, int) { + totalPods := 6 + set := newStatefulSet(totalPods) + // update all pods >=3(3,4,5) + var partition int32 = 3 + var maxUnavailable = intstr.FromInt(2) + set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy { + return &apps.RollingUpdateStatefulSetStrategy{ + Partition: &partition, + MaxUnavailable: &maxUnavailable, + } + }(), + } + + client := fake.NewSimpleClientset() + spc, _, ssc := setupController(client) + if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil { + t.Fatal(err) + } + set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatal(err) + } + + return set, spc, ssc, maxUnavailable, totalPods +} + +func TestStatefulSetControlRollingUpdateWithMaxUnavailableInOrderedModeVerifyInvariant(t *testing.T) { + // Make all pods in statefulset unavailable one by one + // and verify that RollingUpdate doesnt proceed with maxUnavailable set + // this could have been a simple loop, keeping it like this to be able + // to add more params here. + testCases := []struct { + ordinalOfPodToTerminate []int + }{ + + {[]int{}}, + {[]int{5}}, + {[]int{3}}, + {[]int{4}}, + {[]int{5, 4}}, + {[]int{5, 3}}, + {[]int{4, 3}}, + {[]int{5, 4, 3}}, + {[]int{2}}, // note this is an ordinal greater than partition(3) + {[]int{1}}, // note this is an ordinal greater than partition(3) + } + for _, tc := range testCases { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MaxUnavailableStatefulSet, true)() + set, spc, ssc, maxUnavailable, totalPods := setupForInvariant(t) + t.Run(fmt.Sprintf("terminating pod at ordinal %d", tc.ordinalOfPodToTerminate), func(t *testing.T) { + status := apps.StatefulSetStatus{Replicas: int32(totalPods)} + updateRevision := &apps.ControllerRevision{} + + for i := 0; i < len(tc.ordinalOfPodToTerminate); i++ { + // Ensure at least one pod is unavailable before trying to update + _, err := spc.addTerminatingPod(set, tc.ordinalOfPodToTerminate[i]) + if err != nil { + t.Fatal(err) + } + } + + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + + originalPods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + + sort.Sort(ascendingOrdinal(originalPods)) + + // start to update + set.Spec.Template.Spec.Containers[0].Image = "foo" + + // try to update the statefulset + // this function is only called in main code when feature gate is enabled + if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status); err != nil { + t.Fatal(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + + sort.Sort(ascendingOrdinal(pods)) + + expecteddPodsToBeDeleted := maxUnavailable.IntValue() - len(tc.ordinalOfPodToTerminate) + if expecteddPodsToBeDeleted < 0 { + expecteddPodsToBeDeleted = 0 + } + + expectedPodsAfterUpdate := totalPods - expecteddPodsToBeDeleted + + if len(pods) != expectedPodsAfterUpdate { + t.Errorf("Expected pods %v, got pods %v", expectedPodsAfterUpdate, len(pods)) + } + + }) + } +} + func TestStatefulSetControlRollingUpdate(t *testing.T) { type testcase struct { name string