Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-12 13:31:52 +00:00
Merge pull request #124563 from atiratree/automated-cherry-pick-of-#123809-upstream-release-1.30
Automated cherry pick of #123809: fix stateful set pod recreation and event spam
This commit is contained in commit 3b70df5ad2.
@@ -98,7 +98,6 @@ func NewStatefulSetController(
recorder),
NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
history.NewHistory(kubeClient, revInformer.Lister()),
recorder,
),
pvcListerSynced: pvcInformer.Informer().HasSynced,
revListerSynced: revInformer.Informer().HasSynced,

@@ -235,6 +234,9 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interfa
return
}
logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
if oldPod.Status.Phase != curPod.Status.Phase {
logger.V(4).Info("StatefulSet Pod phase changed", "pod", klog.KObj(curPod), "statefulSet", klog.KObj(set), "podPhase", curPod.Status.Phase)
}
ssc.enqueueStatefulSet(set)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus
@@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"

@@ -61,16 +60,14 @@ type StatefulSetControlInterface interface {
func NewDefaultStatefulSetControl(
podControl *StatefulPodControl,
statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface,
recorder record.EventRecorder) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
controllerHistory history.Interface) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory}
}

type defaultStatefulSetControl struct {
podControl *StatefulPodControl
statusUpdater StatefulSetStatusUpdaterInterface
controllerHistory history.Interface
recorder record.EventRecorder
}

// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
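Aside (a sketch, not part of this patch): the two hunks above remove the recorder from both NewDefaultStatefulSetControl and the defaultStatefulSetControl struct, leaving event recording to the StatefulPodControl layer that actually issues the pod API calls (the test setup further down constructs that layer with its own recorder). A minimal, self-contained illustration of that ownership move, using toy types rather than the real Kubernetes interfaces:

package main

import "fmt"

// eventSink stands in for record.EventRecorder (assumed simplification).
type eventSink interface {
	Eventf(format string, args ...any)
}

type logSink struct{}

func (logSink) Eventf(format string, args ...any) { fmt.Printf(format+"\n", args...) }

// podControl owns the recorder, so the delete call and its event stay in one place.
type podControl struct {
	events eventSink
}

func (pc podControl) DeletePod(name string) error {
	pc.events.Eventf("SuccessfulDelete: deleted pod %s", name)
	return nil
}

// control mirrors the slimmed-down defaultStatefulSetControl: no recorder field.
type control struct {
	pods podControl
}

func newControl(pods podControl) control { return control{pods: pods} }

func main() {
	c := newControl(podControl{events: logSink{}})
	_ = c.pods.DeletePod("web-0")
}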
@@ -367,45 +364,25 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current
func (ssc *defaultStatefulSetControl) processReplica(
ctx context.Context,
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
currentSet *apps.StatefulSet,
updateSet *apps.StatefulSet,
monotonic bool,
replicas []*v1.Pod,
i int) (bool, error) {
logger := klog.FromContext(ctx)
// Delete and recreate pods which finished running.
//
// Note that pods with phase Succeeded will also trigger this event. This is
// because final pod phase of evicted or otherwise forcibly stopped pods
// (e.g. terminated on node reboot) is determined by the exit code of the
// container, not by the reason for pod termination. We should restart the pod
// regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if isFailed(replicas[i]) {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
} else {
ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
"StatefulSet %s/%s is recreating terminated Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if replicas[i].DeletionTimestamp == nil {
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return true, err
}
}
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return true, err
}
replicaOrd := i + getStartOrdinal(set)
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
replicaOrd)
// New pod should be generated on the next sync after the current pod is removed from etcd.
return true, nil
}
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
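Aside (a sketch, not part of this patch): the rewritten processReplica above no longer emits a Recreating* event and no longer builds the replacement pod object in the same pass; it issues at most one deletion per terminal pod and then requeues. A hedged, self-contained rendering of that control flow with toy types:

package main

import (
	"fmt"
	"time"
)

// pod is a toy stand-in for v1.Pod with only the fields this flow needs.
type pod struct {
	name              string
	phase             string     // "Failed", "Succeeded", "Running", ...
	deletionTimestamp *time.Time // non-nil means a delete was already requested
}

func isTerminal(p *pod) bool { return p.phase == "Failed" || p.phase == "Succeeded" }

// processReplica mirrors the shape of the fixed logic: at most one delete request
// per terminal pod, no in-place replacement object, and a requeue so a later
// sync recreates the pod once the old object is gone.
func processReplica(p *pod, deletePod func(*pod) error) (requeue bool, err error) {
	if isTerminal(p) {
		if p.deletionTimestamp == nil {
			if err := deletePod(p); err != nil {
				return true, err
			}
		}
		// The replacement is created on a later sync, after the API object disappears.
		return true, nil
	}
	return false, nil
}

func main() {
	p := &pod{name: "web-0", phase: "Failed"}
	del := func(p *pod) error {
		fmt.Println("delete requested for", p.name)
		now := time.Now()
		p.deletionTimestamp = &now
		return nil
	}
	fmt.Println(processReplica(p, del)) // issues one delete, requeues
	// A second pass while the pod is still terminating issues no further
	// deletes and no further events, which is what stops the spam.
	fmt.Println(processReplica(p, func(*pod) error { panic("unexpected second delete") }))
}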
@@ -637,7 +614,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(

// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
processReplicaFn := func(i int) (bool, error) {
return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i)
}
if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
@@ -60,8 +60,7 @@ func setupController(client clientset.Interface) (*fakeObjectManager, *fakeState
om := newFakeObjectManager(informerFactory)
spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
recorder := &noopRecorder{}
ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()))

// The informer is not started. The tests here manipulate the local cache (indexers) directly, and there is no waiting
// for client state to sync. In fact, because the client is not updated during tests, informer updates will break tests

@@ -171,10 +170,11 @@ func TestStatefulSetControl(t *testing.T) {
{ReplacesPods, largeSetFn},
{RecreatesFailedPod, simpleSetFn},
{RecreatesSucceededPod, simpleSetFn},
{RecreatesFailedPodWithDeleteFailure, simpleSetFn},
{RecreatesSucceededPodWithDeleteFailure, simpleSetFn},
{CreatePodFailure, simpleSetFn},
{UpdatePodFailure, simpleSetFn},
{UpdateSetStatusFailure, simpleSetFn},
{PodRecreateDeleteFailure, simpleSetFn},
{NewRevisionDeletePodFailure, simpleSetFn},
{RecreatesPVCForPendingPod, simpleSetFn},
}

@@ -398,9 +398,10 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
}
}

func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) {
func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, terminalPhase v1.PodPhase, testDeletePodFailure bool) {
client := fake.NewSimpleClientset()
om, _, ssc := setupController(client)
expectedNumOfDeleteRequests := 0
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
@@ -415,34 +416,105 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected 1", om.deletePodTracker.requests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
pods[0].Status.Phase = phase
om.podsIndexer.Update(pods[0])

terminalPodOrdinal := -1
for i, pod := range pods {
// Set at least Pending phase to acknowledge the creation of pods
newPhase := v1.PodPending
if i == 0 {
// Set terminal phase for the first pod
newPhase = terminalPhase
terminalPodOrdinal = getOrdinal(pod)
}
pod.Status.Phase = newPhase
if err = om.podsIndexer.Update(pod); err != nil {
t.Error(err)
}
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
if testDeletePodFailure {
// Expect pod deletion failure
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
expectedNumOfDeleteRequests++
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
}

// Expect pod deletion
expectedNumOfDeleteRequests++
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
if isCreated(pods[0]) {

// Expect no additional delete calls and expect pod creation
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
recreatedPod := findPodByOrdinal(pods, terminalPodOrdinal)
// new recreated pod should have empty phase
if recreatedPod == nil || isCreated(recreatedPod) {
t.Error("StatefulSet did not recreate failed Pod")
}
expectedNumberOfCreateRequests := 2
if monotonic := !allowsBurst(set); !monotonic {
expectedNumberOfCreateRequests = int(*set.Spec.Replicas + 1)
}
if om.createPodTracker.requests != expectedNumberOfCreateRequests {
t.Errorf("Found unexpected number of create calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumberOfCreateRequests)
}

}

func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodFailed)
recreatesPod(t, set, invariants, v1.PodFailed, false)
}

func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodSucceeded)
recreatesPod(t, set, invariants, v1.PodSucceeded, false)
}

func RecreatesFailedPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodFailed, true)
}

func RecreatesSucceededPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodSucceeded, true)
}

func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
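Aside (a sketch, not part of this patch): the reworked recreatesPod helper above exercises the delete-failure path by injecting a one-shot error into the fake object manager and then asserting on the exact number of delete requests. The request-counting fake boils down to this pattern (toy types; the real requestTracker appears later in this diff):

package main

import (
	"errors"
	"fmt"
)

// flakyDeleter is a toy version of the fake delete tracker used by the tests:
// it counts every call and returns an injected error exactly once.
type flakyDeleter struct {
	requests int
	injected error // returned on the next call, then cleared
}

func (d *flakyDeleter) SetError(err error) { d.injected = err }

func (d *flakyDeleter) Delete(name string) error {
	d.requests++
	if d.injected != nil {
		err := d.injected
		d.injected = nil // one-shot: later calls succeed again
		return err
	}
	return nil
}

func main() {
	d := &flakyDeleter{}
	d.SetError(errors.New("API server failed"))
	fmt.Println(d.Delete("web-0"), d.requests) // injected failure, 1 request so far
	fmt.Println(d.Delete("web-0"), d.requests) // <nil>, 2 requests: exactly one retry needed
}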
@@ -450,8 +522,8 @@ func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
om, _, ssc := setupController(client)
om.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)

if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
if err := scaleUpStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
// Update so set.Status is set for the next scaleUpStatefulSetControl call.
var err error

@@ -514,8 +586,8 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
om.podsIndexer.Update(pods[0])

// now it should fail
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
}

@@ -524,8 +596,8 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
om, ssu, ssc := setupController(client)
ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)

if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
if err := scaleUpStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
// Update so set.Status is set for the next scaleUpStatefulSetControl call.
var err error
@@ -551,52 +623,6 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
}
}

func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)

selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
}
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
pods[0].Status.Phase = v1.PodFailed
om.podsIndexer.Update(pods[0])
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSet failed to %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if isCreated(pods[0]) {
t.Error("StatefulSet did not recreate failed Pod")
}
}

func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)

@@ -792,7 +818,7 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
}
*set.Spec.Replicas = 0
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
if err := scaleDownStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)

@@ -834,8 +860,7 @@ func TestStatefulSetControl_getSetRevisions(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
recorder := &noopRecorder{}
ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())}

stop := make(chan struct{})
defer close(stop)
@@ -2398,51 +2423,53 @@ type requestTracker struct {
err error
after int

parallelLock sync.Mutex
parallel int
maxParallel int

delay time.Duration
// this block should be updated consistently
parallelLock sync.Mutex
shouldTrackParallelRequests bool
parallelRequests int
maxParallelRequests int
parallelRequestDelay time.Duration
}

func (rt *requestTracker) errorReady() bool {
rt.Lock()
defer rt.Unlock()
return rt.err != nil && rt.requests >= rt.after
}

func (rt *requestTracker) inc() {
rt.parallelLock.Lock()
rt.parallel++
if rt.maxParallel < rt.parallel {
rt.maxParallel = rt.parallel
func (rt *requestTracker) trackParallelRequests() {
if !rt.shouldTrackParallelRequests {
// do not track parallel requests unless specifically enabled
return
}
rt.parallelLock.Unlock()
if rt.parallelLock.TryLock() {
// lock acquired: we are the only or the first concurrent request
// initialize the next set of parallel requests
rt.parallelRequests = 1
} else {
// lock is held by other requests
// now wait for the lock to increase the parallelRequests
rt.parallelLock.Lock()
rt.parallelRequests++
}
defer rt.parallelLock.Unlock()
// update the local maximum of parallel collisions
if rt.maxParallelRequests < rt.parallelRequests {
rt.maxParallelRequests = rt.parallelRequests
}
// increase the chance of collisions
if rt.parallelRequestDelay > 0 {
time.Sleep(rt.parallelRequestDelay)
}
}

func (rt *requestTracker) incWithOptionalError() error {
rt.Lock()
defer rt.Unlock()
rt.requests++
if rt.delay != 0 {
time.Sleep(rt.delay)
if rt.err != nil && rt.requests >= rt.after {
// reset and pass the error
defer func() {
rt.err = nil
rt.after = 0
}()
return rt.err
}
}

func (rt *requestTracker) reset() {
rt.parallelLock.Lock()
rt.parallel = 0
rt.parallelLock.Unlock()

rt.Lock()
defer rt.Unlock()
rt.err = nil
rt.after = 0
rt.delay = 0
}

func (rt *requestTracker) getErr() error {
rt.Lock()
defer rt.Unlock()
return rt.err
return nil
}

func newRequestTracker(requests int, err error, after int) requestTracker {
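Aside (a sketch, not part of this patch): trackParallelRequests above uses sync.Mutex.TryLock to measure how many requests pile up on one another: the first caller to get the lock starts a new count, callers arriving while it is held wait and increment it, and the running maximum is what the parallel-scale test later asserts against. A stripped-down, self-contained version of the same idea:

package main

import (
	"fmt"
	"sync"
	"time"
)

// collisionMeter approximates how many callers overlapped inside track(),
// using the same TryLock trick as requestTracker.trackParallelRequests.
type collisionMeter struct {
	mu      sync.Mutex
	current int
	max     int
	delay   time.Duration // widens the window so collisions actually happen
}

func (m *collisionMeter) track() {
	if m.mu.TryLock() {
		// Lock was free: we are the first (or only) caller in this burst.
		m.current = 1
	} else {
		// Someone else is inside track(): wait for the lock, then count ourselves.
		m.mu.Lock()
		m.current++
	}
	defer m.mu.Unlock()
	if m.current > m.max {
		m.max = m.current
	}
	if m.delay > 0 {
		time.Sleep(m.delay) // hold the lock briefly to raise the chance of collisions
	}
}

func main() {
	m := &collisionMeter{delay: time.Millisecond}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); m.track() }()
	}
	wg.Wait()
	// With burst-style callers this should report a value > 1, which is what
	// parallelScale checks via maxParallelRequests further down in the diff.
	fmt.Println("max overlapping calls observed:", m.max)
}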
@@ -2487,10 +2514,9 @@ func newFakeObjectManager(informerFactory informers.SharedInformerFactory) *fake
}

func (om *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
defer om.createPodTracker.inc()
if om.createPodTracker.errorReady() {
defer om.createPodTracker.reset()
return om.createPodTracker.getErr()
defer om.createPodTracker.trackParallelRequests()
if err := om.createPodTracker.incWithOptionalError(); err != nil {
return err
}
pod.SetUID(types.UID(pod.Name + "-uid"))
return om.podsIndexer.Update(pod)

@@ -2501,14 +2527,17 @@ func (om *fakeObjectManager) GetPod(namespace, podName string) (*v1.Pod, error)
}

func (om *fakeObjectManager) UpdatePod(pod *v1.Pod) error {
defer om.updatePodTracker.trackParallelRequests()
if err := om.updatePodTracker.incWithOptionalError(); err != nil {
return err
}
return om.podsIndexer.Update(pod)
}

func (om *fakeObjectManager) DeletePod(pod *v1.Pod) error {
defer om.deletePodTracker.inc()
if om.deletePodTracker.errorReady() {
defer om.deletePodTracker.reset()
return om.deletePodTracker.getErr()
defer om.deletePodTracker.trackParallelRequests()
if err := om.deletePodTracker.incWithOptionalError(); err != nil {
return err
}
if key, err := controller.KeyFunc(pod); err != nil {
return err

@@ -2703,10 +2732,9 @@ func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInform
}

func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error {
defer ssu.updateStatusTracker.inc()
if ssu.updateStatusTracker.errorReady() {
defer ssu.updateStatusTracker.reset()
return ssu.updateStatusTracker.err
defer ssu.updateStatusTracker.trackParallelRequests()
if err := ssu.updateStatusTracker.incWithOptionalError(); err != nil {
return err
}
set.Status = *status
ssu.setsIndexer.Update(set)
@@ -2955,7 +2983,8 @@ func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplica
diff := desiredReplicas - replicas
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)
om.createPodTracker.delay = time.Millisecond
om.createPodTracker.shouldTrackParallelRequests = true
om.createPodTracker.parallelRequestDelay = time.Millisecond

*set.Spec.Replicas = replicas
if err := parallelScaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {

@@ -2987,8 +3016,8 @@ func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplica
t.Errorf("Failed to scale statefulset to %v replicas, got %v replicas", desiredReplicas, set.Status.Replicas)
}

if (diff < -1 || diff > 1) && om.createPodTracker.maxParallel <= 1 {
t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallel)
if (diff < -1 || diff > 1) && om.createPodTracker.maxParallelRequests <= 1 {
t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallelRequests)
}
}
@@ -3356,6 +3385,16 @@ func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRev
}

func isOrHasInternalError(err error) bool {
agg, ok := err.(utilerrors.Aggregate)
return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0])
if err == nil {
return false
}
var agg utilerrors.Aggregate
if errors.As(err, &agg) {
for _, e := range agg.Errors() {
if apierrors.IsInternalError(e) {
return true
}
}
}
return apierrors.IsInternalError(err)
}
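Aside: the rewritten isOrHasInternalError above now returns false for nil errors and checks both the error itself and each member of an aggregate via errors.As. The same helper, lifted into a runnable snippet with a small usage check (assumes the k8s.io/apimachinery module is on the module path):

package main

import (
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// isOrHasInternalError reports whether err is an API-server InternalError,
// either directly or inside an aggregate.
func isOrHasInternalError(err error) bool {
	if err == nil {
		return false
	}
	var agg utilerrors.Aggregate
	if errors.As(err, &agg) {
		for _, e := range agg.Errors() {
			if apierrors.IsInternalError(e) {
				return true
			}
		}
	}
	return apierrors.IsInternalError(err)
}

func main() {
	plain := apierrors.NewInternalError(errors.New("API server failed"))
	wrapped := utilerrors.NewAggregate([]error{errors.New("other"), plain})
	fmt.Println(isOrHasInternalError(nil))                 // false
	fmt.Println(isOrHasInternalError(plain))               // true
	fmt.Println(isOrHasInternalError(wrapped))             // true
	fmt.Println(isOrHasInternalError(errors.New("boom"))) // false
}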
@@ -37,7 +37,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"

@@ -946,8 +945,7 @@ func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime
ssh := history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())
ssc.podListerSynced = alwaysReady
ssc.setListerSynced = alwaysReady
recorder := record.NewFakeRecorder(10)
ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh, recorder)
ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh)

return ssc, spc, om, ssh
}
@@ -25,12 +25,12 @@ import (

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/kubectl/pkg/util/podutils"
"k8s.io/kubernetes/test/e2e/framework"
e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest"
@@ -247,21 +247,23 @@ func ExecInStatefulPods(ctx context.Context, c clientset.Interface, ss *appsv1.S
}

// update updates a statefulset, and it is only used within rest.go
func update(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) *appsv1.StatefulSet {
for i := 0; i < 3; i++ {
ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
func update(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) (ss *appsv1.StatefulSet) {
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var err error
ss, err = c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
framework.Failf("failed to get statefulset %q: %v", name, err)
}
if *(ss.Spec.Replicas) == replicas {
return nil
}
*(ss.Spec.Replicas) = replicas
ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
if err == nil {
return ss
}
if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
framework.Failf("failed to update statefulset %q: %v", name, err)
}
return err
})
if err == nil {
return ss
}
framework.Failf("too many retries draining statefulset %q", name)
framework.Failf("failed to update statefulset %q: %v", name, err)
return nil
}
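Aside (a sketch, not part of this patch): the e2e update helper above swaps a hand-rolled three-attempt loop for client-go's retry.RetryOnConflict, which re-runs the whole get-mutate-update closure whenever the Update returns a Conflict. A hedged usage sketch of that pattern against a plain clientset (the kubeconfig path, namespace, and the "web" StatefulSet name are placeholders):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/retry"
)

// scaleStatefulSet sets spec.replicas with conflict-aware retries, following
// the same read-modify-write pattern the e2e helper switched to.
func scaleStatefulSet(ctx context.Context, c kubernetes.Interface, ns, name string, replicas int32) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err // non-conflict errors abort the retry loop
		}
		if *ss.Spec.Replicas == replicas {
			return nil
		}
		*ss.Spec.Replicas = replicas
		_, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
		return err // a Conflict here makes RetryOnConflict refetch and try again
	})
}

func main() {
	// Assumes a reachable cluster via the default kubeconfig; illustrative only.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		fmt.Println("no kubeconfig:", err)
		return
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	if err := scaleStatefulSet(context.Background(), client, "default", "web", 3); err != nil {
		fmt.Println("scale failed:", err)
	}
}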