mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Be sure to update the status of StatefulSet even if the new replica creation fails
This commit is contained in:
parent
537941765f
commit
774df1b0af
@ -109,16 +109,6 @@ func (ssc *defaultStatefulSetControl) performUpdate(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return currentRevision, updateRevision, currentStatus, err
|
return currentRevision, updateRevision, currentStatus, err
|
||||||
}
|
}
|
||||||
// update the set's status
|
|
||||||
err = ssc.updateStatefulSetStatus(ctx, set, currentStatus)
|
|
||||||
if err != nil {
|
|
||||||
return currentRevision, updateRevision, currentStatus, err
|
|
||||||
}
|
|
||||||
klog.V(4).InfoS("StatefulSet pod status", "statefulSet", klog.KObj(set),
|
|
||||||
"replicas", currentStatus.Replicas,
|
|
||||||
"readyReplicas", currentStatus.ReadyReplicas,
|
|
||||||
"currentReplicas", currentStatus.CurrentReplicas,
|
|
||||||
"updatedReplicas", currentStatus.UpdatedReplicas)
|
|
||||||
|
|
||||||
klog.V(4).InfoS("StatefulSet revisions", "statefulSet", klog.KObj(set),
|
klog.V(4).InfoS("StatefulSet revisions", "statefulSet", klog.KObj(set),
|
||||||
"currentRevision", currentStatus.CurrentRevision,
|
"currentRevision", currentStatus.CurrentRevision,
|
||||||
@ -274,7 +264,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
|
|||||||
currentRevision *apps.ControllerRevision,
|
currentRevision *apps.ControllerRevision,
|
||||||
updateRevision *apps.ControllerRevision,
|
updateRevision *apps.ControllerRevision,
|
||||||
collisionCount int32,
|
collisionCount int32,
|
||||||
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
|
pods []*v1.Pod) (statefulSetStatus *apps.StatefulSetStatus, updateErr error) {
|
||||||
// get the current and update revisions of the set.
|
// get the current and update revisions of the set.
|
||||||
currentSet, err := ApplyRevision(set, currentRevision)
|
currentSet, err := ApplyRevision(set, currentRevision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -340,6 +330,23 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
|
|||||||
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
|
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make sure to update the latest status even if there is an error later
|
||||||
|
defer func() {
|
||||||
|
// update the set's status
|
||||||
|
statusErr := ssc.updateStatefulSetStatus(ctx, set, &status)
|
||||||
|
if statusErr == nil {
|
||||||
|
klog.V(4).InfoS("Updated status", "statefulSet", klog.KObj(set),
|
||||||
|
"replicas", status.Replicas,
|
||||||
|
"readyReplicas", status.ReadyReplicas,
|
||||||
|
"currentReplicas", status.CurrentReplicas,
|
||||||
|
"updatedReplicas", status.UpdatedReplicas)
|
||||||
|
} else if updateErr == nil {
|
||||||
|
updateErr = statusErr
|
||||||
|
} else {
|
||||||
|
klog.V(4).InfoS("Could not update status", "statefulSet", klog.KObj(set), "err", statusErr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
|
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
|
||||||
for ord := 0; ord < replicaCount; ord++ {
|
for ord := 0; ord < replicaCount; ord++ {
|
||||||
if replicas[ord] == nil {
|
if replicas[ord] == nil {
|
||||||
|
@ -30,10 +30,13 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
restclient "k8s.io/client-go/rest"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/statefulset"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
)
|
)
|
||||||
@ -349,3 +352,57 @@ func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1
|
|||||||
t.Fatalf("failed to mark all StatefulSet pods to ready: %v", err)
|
t.Fatalf("failed to mark all StatefulSet pods to ready: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add for issue: https://github.com/kubernetes/kubernetes/issues/108837
|
||||||
|
func TestStatefulSetStatusWithPodFail(t *testing.T) {
|
||||||
|
limitedPodNumber := 2
|
||||||
|
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
|
||||||
|
controlPlaneConfig.GenericConfig.AdmissionControl = &fakePodFailAdmission{
|
||||||
|
limitedPodNumber: limitedPodNumber,
|
||||||
|
}
|
||||||
|
_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
|
||||||
|
defer closeFn()
|
||||||
|
|
||||||
|
config := restclient.Config{Host: s.URL}
|
||||||
|
c, err := clientset.NewForConfig(&config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not create clientset: %v", err)
|
||||||
|
}
|
||||||
|
resyncPeriod := 12 * time.Hour
|
||||||
|
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod)
|
||||||
|
|
||||||
|
ssc := statefulset.NewStatefulSetController(
|
||||||
|
informers.Core().V1().Pods(),
|
||||||
|
informers.Apps().V1().StatefulSets(),
|
||||||
|
informers.Core().V1().PersistentVolumeClaims(),
|
||||||
|
informers.Apps().V1().ControllerRevisions(),
|
||||||
|
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")),
|
||||||
|
)
|
||||||
|
|
||||||
|
ns := framework.CreateTestingNamespace("test-pod-fail", s, t)
|
||||||
|
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
informers.Start(ctx.Done())
|
||||||
|
go ssc.Run(ctx, 5)
|
||||||
|
|
||||||
|
sts := newSTS("sts", ns.Name, 4)
|
||||||
|
_, err = c.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not create statefuleSet %s: %v", sts.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantReplicas := limitedPodNumber
|
||||||
|
var gotReplicas int32
|
||||||
|
if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
||||||
|
newSTS, err := c.AppsV1().StatefulSets(sts.Namespace).Get(context.TODO(), sts.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
gotReplicas = newSTS.Status.Replicas
|
||||||
|
return gotReplicas == int32(wantReplicas), nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("StatefulSet %s status has %d replicas, want replicas %d: %v", sts.Name, gotReplicas, wantReplicas, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -29,12 +29,14 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apiserver/pkg/admission"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
|
typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
|
||||||
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/util/retry"
|
"k8s.io/client-go/util/retry"
|
||||||
|
api "k8s.io/kubernetes/pkg/apis/core"
|
||||||
|
|
||||||
//svc "k8s.io/kubernetes/pkg/api/v1/service"
|
//svc "k8s.io/kubernetes/pkg/api/v1/service"
|
||||||
"k8s.io/kubernetes/pkg/controller/statefulset"
|
"k8s.io/kubernetes/pkg/controller/statefulset"
|
||||||
@ -309,3 +311,26 @@ func scaleSTS(t *testing.T, c clientset.Interface, sts *appsv1.StatefulSet, repl
|
|||||||
}
|
}
|
||||||
waitSTSStable(t, c, sts)
|
waitSTSStable(t, c, sts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ admission.ValidationInterface = &fakePodFailAdmission{}
|
||||||
|
|
||||||
|
type fakePodFailAdmission struct {
|
||||||
|
limitedPodNumber int
|
||||||
|
succeedPodsCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakePodFailAdmission) Handles(operation admission.Operation) bool {
|
||||||
|
return operation == admission.Create
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakePodFailAdmission) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) {
|
||||||
|
if attr.GetKind().GroupKind() != api.Kind("Pod") {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.succeedPodsCount >= f.limitedPodNumber {
|
||||||
|
return fmt.Errorf("fakePodFailAdmission error")
|
||||||
|
}
|
||||||
|
f.succeedPodsCount++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user