From 774df1b0af9e76736fef718233fdcaea9c7c2d4f Mon Sep 17 00:00:00 2001 From: Aohan Yang Date: Wed, 27 Apr 2022 20:10:07 +0800 Subject: [PATCH] Be sure to update the status of StatefulSet even if the new replica creation fails --- .../statefulset/stateful_set_control.go | 29 ++++++---- .../statefulset/statefulset_test.go | 57 +++++++++++++++++++ test/integration/statefulset/util.go | 25 ++++++++ 3 files changed, 100 insertions(+), 11 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 6fb3579fce6..ba18fb0a5f1 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -109,16 +109,6 @@ func (ssc *defaultStatefulSetControl) performUpdate( if err != nil { return currentRevision, updateRevision, currentStatus, err } - // update the set's status - err = ssc.updateStatefulSetStatus(ctx, set, currentStatus) - if err != nil { - return currentRevision, updateRevision, currentStatus, err - } - klog.V(4).InfoS("StatefulSet pod status", "statefulSet", klog.KObj(set), - "replicas", currentStatus.Replicas, - "readyReplicas", currentStatus.ReadyReplicas, - "currentReplicas", currentStatus.CurrentReplicas, - "updatedReplicas", currentStatus.UpdatedReplicas) klog.V(4).InfoS("StatefulSet revisions", "statefulSet", klog.KObj(set), "currentRevision", currentStatus.CurrentRevision, @@ -274,7 +264,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( currentRevision *apps.ControllerRevision, updateRevision *apps.ControllerRevision, collisionCount int32, - pods []*v1.Pod) (*apps.StatefulSetStatus, error) { + pods []*v1.Pod) (statefulSetStatus *apps.StatefulSetStatus, updateErr error) { // get the current and update revisions of the set. 
currentSet, err := ApplyRevision(set, currentRevision) if err != nil { @@ -340,6 +330,23 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // If the ordinal could not be parsed (ord < 0), ignore the Pod. } + // make sure to update the latest status even if there is an error later + defer func() { + // update the set's status + statusErr := ssc.updateStatefulSetStatus(ctx, set, &status) + if statusErr == nil { + klog.V(4).InfoS("Updated status", "statefulSet", klog.KObj(set), + "replicas", status.Replicas, + "readyReplicas", status.ReadyReplicas, + "currentReplicas", status.CurrentReplicas, + "updatedReplicas", status.UpdatedReplicas) + } else if updateErr == nil { + updateErr = statusErr + } else { + klog.V(4).InfoS("Could not update status", "statefulSet", klog.KObj(set), "err", statusErr) + } + }() + // for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision for ord := 0; ord < replicaCount; ord++ { if replicas[ord] == nil { diff --git a/test/integration/statefulset/statefulset_test.go b/test/integration/statefulset/statefulset_test.go index 678e07eae91..01d374eed00 100644 --- a/test/integration/statefulset/statefulset_test.go +++ b/test/integration/statefulset/statefulset_test.go @@ -30,10 +30,13 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" featuregatetesting "k8s.io/component-base/featuregate/testing" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/controller/statefulset" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" ) @@ -349,3 +352,57 @@ func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1 t.Fatalf("failed to mark all StatefulSet pods to ready: %v", err) } } + +// add for 
issue: https://github.com/kubernetes/kubernetes/issues/108837 +func TestStatefulSetStatusWithPodFail(t *testing.T) { + limitedPodNumber := 2 + controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() + controlPlaneConfig.GenericConfig.AdmissionControl = &fakePodFailAdmission{ + limitedPodNumber: limitedPodNumber, + } + _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) + defer closeFn() + + config := restclient.Config{Host: s.URL} + c, err := clientset.NewForConfig(&config) + if err != nil { + t.Fatalf("Could not create clientset: %v", err) + } + resyncPeriod := 12 * time.Hour + informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod) + + ssc := statefulset.NewStatefulSetController( + informers.Core().V1().Pods(), + informers.Apps().V1().StatefulSets(), + informers.Core().V1().PersistentVolumeClaims(), + informers.Apps().V1().ControllerRevisions(), + clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")), + ) + + ns := framework.CreateTestingNamespace("test-pod-fail", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + informers.Start(ctx.Done()) + go ssc.Run(ctx, 5) + + sts := newSTS("sts", ns.Name, 4) + _, err = c.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create statefulSet %s: %v", sts.Name, err) + } + + wantReplicas := limitedPodNumber + var gotReplicas int32 + if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { + newSTS, err := c.AppsV1().StatefulSets(sts.Namespace).Get(context.TODO(), sts.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + gotReplicas = newSTS.Status.Replicas + return gotReplicas == int32(wantReplicas), nil + }); err != nil { + t.Fatalf("StatefulSet %s status has %d replicas, want
replicas %d: %v", sts.Name, gotReplicas, wantReplicas, err) + } +} diff --git a/test/integration/statefulset/util.go b/test/integration/statefulset/util.go index b2ef1389c39..6d1705e855d 100644 --- a/test/integration/statefulset/util.go +++ b/test/integration/statefulset/util.go @@ -29,12 +29,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/admission" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/util/retry" + api "k8s.io/kubernetes/pkg/apis/core" //svc "k8s.io/kubernetes/pkg/api/v1/service" "k8s.io/kubernetes/pkg/controller/statefulset" @@ -309,3 +311,26 @@ func scaleSTS(t *testing.T, c clientset.Interface, sts *appsv1.StatefulSet, repl } waitSTSStable(t, c, sts) } + +var _ admission.ValidationInterface = &fakePodFailAdmission{} + +type fakePodFailAdmission struct { + limitedPodNumber int + succeedPodsCount int +} + +func (f *fakePodFailAdmission) Handles(operation admission.Operation) bool { + return operation == admission.Create +} + +func (f *fakePodFailAdmission) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) { + if attr.GetKind().GroupKind() != api.Kind("Pod") { + return nil + } + + if f.succeedPodsCount >= f.limitedPodNumber { + return fmt.Errorf("fakePodFailAdmission error") + } + f.succeedPodsCount++ + return nil +}