Merge pull request #109694 from RyanAoh/fix-108837
Be sure to update the status of StatefulSet even if the new replica creation fails
Commit b74d023e70
@@ -109,16 +109,6 @@ func (ssc *defaultStatefulSetControl) performUpdate(
 	if err != nil {
 		return currentRevision, updateRevision, currentStatus, err
 	}
-	// update the set's status
-	err = ssc.updateStatefulSetStatus(ctx, set, currentStatus)
-	if err != nil {
-		return currentRevision, updateRevision, currentStatus, err
-	}
-	klog.V(4).InfoS("StatefulSet pod status", "statefulSet", klog.KObj(set),
-		"replicas", currentStatus.Replicas,
-		"readyReplicas", currentStatus.ReadyReplicas,
-		"currentReplicas", currentStatus.CurrentReplicas,
-		"updatedReplicas", currentStatus.UpdatedReplicas)

 	klog.V(4).InfoS("StatefulSet revisions", "statefulSet", klog.KObj(set),
 		"currentRevision", currentStatus.CurrentRevision,
@@ -274,7 +264,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 	currentRevision *apps.ControllerRevision,
 	updateRevision *apps.ControllerRevision,
 	collisionCount int32,
-	pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
+	pods []*v1.Pod) (statefulSetStatus *apps.StatefulSetStatus, updateErr error) {
 	// get the current and update revisions of the set.
 	currentSet, err := ApplyRevision(set, currentRevision)
 	if err != nil {
@@ -340,6 +330,23 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 		// If the ordinal could not be parsed (ord < 0), ignore the Pod.
 	}

+	// make sure to update the latest status even if there is an error later
+	defer func() {
+		// update the set's status
+		statusErr := ssc.updateStatefulSetStatus(ctx, set, &status)
+		if statusErr == nil {
+			klog.V(4).InfoS("Updated status", "statefulSet", klog.KObj(set),
+				"replicas", status.Replicas,
+				"readyReplicas", status.ReadyReplicas,
+				"currentReplicas", status.CurrentReplicas,
+				"updatedReplicas", status.UpdatedReplicas)
+		} else if updateErr == nil {
+			updateErr = statusErr
+		} else {
+			klog.V(4).InfoS("Could not update status", "statefulSet", klog.KObj(set), "err", statusErr)
+		}
+	}()
+
 	// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
 	for ord := 0; ord < replicaCount; ord++ {
 		if replicas[ord] == nil {
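The deferred function registered here runs on every return path of updateStatefulSet, so whatever status has been accumulated is written back even when a later pod creation fails, and a failed status write is promoted into updateErr only when no earlier error exists. A self-contained sketch of the same pattern, using stand-in helpers (updateStatus, createPods) rather than the controller's real methods:

    package main

    import (
    	"errors"
    	"fmt"
    )

    type status struct{ replicas int32 }

    // updateStatus stands in for ssc.updateStatefulSetStatus.
    func updateStatus(s *status) error {
    	fmt.Println("persisted status, replicas =", s.replicas)
    	return nil
    }

    // createPods stands in for the replica-creation loop; it fails after two
    // pods, much like the fake admission plugin in the integration test below.
    func createPods(s *status, want int32) error {
    	for i := int32(0); i < want; i++ {
    		if i >= 2 {
    			return errors.New("pod creation rejected")
    		}
    		s.replicas++
    	}
    	return nil
    }

    // reconcile mirrors the shape of updateStatefulSet after this change:
    // the deferred status write runs whether or not createPods succeeds.
    func reconcile(want int32) (updateErr error) {
    	var st status
    	defer func() {
    		if statusErr := updateStatus(&st); statusErr != nil && updateErr == nil {
    			updateErr = statusErr
    		}
    	}()
    	return createPods(&st, want)
    }

    func main() {
    	fmt.Println("err:", reconcile(4)) // status still records the 2 pods that were created
    }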
@@ -30,10 +30,13 @@ import (
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller/statefulset"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/test/integration/framework"
)
@@ -351,3 +354,57 @@ func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1
 		t.Fatalf("failed to mark all StatefulSet pods to ready: %v", err)
 	}
 }
+
+// add for issue: https://github.com/kubernetes/kubernetes/issues/108837
+func TestStatefulSetStatusWithPodFail(t *testing.T) {
+	limitedPodNumber := 2
+	controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
+	controlPlaneConfig.GenericConfig.AdmissionControl = &fakePodFailAdmission{
+		limitedPodNumber: limitedPodNumber,
+	}
+	_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
+	defer closeFn()
+
+	config := restclient.Config{Host: s.URL}
+	c, err := clientset.NewForConfig(&config)
+	if err != nil {
+		t.Fatalf("Could not create clientset: %v", err)
+	}
+	resyncPeriod := 12 * time.Hour
+	informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod)
+
+	ssc := statefulset.NewStatefulSetController(
+		informers.Core().V1().Pods(),
+		informers.Apps().V1().StatefulSets(),
+		informers.Core().V1().PersistentVolumeClaims(),
+		informers.Apps().V1().ControllerRevisions(),
+		clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")),
+	)
+
+	ns := framework.CreateTestingNamespace("test-pod-fail", s, t)
+	defer framework.DeleteTestingNamespace(ns, s, t)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	informers.Start(ctx.Done())
+	go ssc.Run(ctx, 5)
+
+	sts := newSTS("sts", ns.Name, 4)
+	_, err = c.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Could not create statefulSet %s: %v", sts.Name, err)
+	}
+
+	wantReplicas := limitedPodNumber
+	var gotReplicas int32
+	if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
+		newSTS, err := c.AppsV1().StatefulSets(sts.Namespace).Get(context.TODO(), sts.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		gotReplicas = newSTS.Status.Replicas
+		return gotReplicas == int32(wantReplicas), nil
+	}); err != nil {
+		t.Fatalf("StatefulSet %s status has %d replicas, want replicas %d: %v", sts.Name, gotReplicas, wantReplicas, err)
+	}
+}
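The final assertion relies on wait.PollImmediate from k8s.io/apimachinery: the condition is evaluated once right away and then on every interval until it reports done, returns an error, or the timeout expires. Roughly:

    package main

    import (
    	"fmt"
    	"time"

    	"k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
    	attempts := 0
    	// Poll every 10ms for up to 1s, starting immediately.
    	err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
    		attempts++
    		return attempts >= 3, nil // done after the third attempt
    	})
    	fmt.Println(attempts, err) // 3 <nil>
    }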
@@ -29,12 +29,14 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/admission"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
	typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/util/retry"
	api "k8s.io/kubernetes/pkg/apis/core"

	//svc "k8s.io/kubernetes/pkg/api/v1/service"
	"k8s.io/kubernetes/pkg/controller/statefulset"
@@ -309,3 +311,26 @@ func scaleSTS(t *testing.T, c clientset.Interface, sts *appsv1.StatefulSet, repl
 	}
 	waitSTSStable(t, c, sts)
 }
+
+var _ admission.ValidationInterface = &fakePodFailAdmission{}
+
+type fakePodFailAdmission struct {
+	limitedPodNumber int
+	succeedPodsCount int
+}
+
+func (f *fakePodFailAdmission) Handles(operation admission.Operation) bool {
+	return operation == admission.Create
+}
+
+func (f *fakePodFailAdmission) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) {
+	if attr.GetKind().GroupKind() != api.Kind("Pod") {
+		return nil
+	}
+
+	if f.succeedPodsCount >= f.limitedPodNumber {
+		return fmt.Errorf("fakePodFailAdmission error")
+	}
+	f.succeedPodsCount++
+	return nil
+}