fix StatefulSet pod recreation and event spam

- do not emit RecreatingFailedPod/RecreatingTerminatedPod events when a
  pod reaches a terminal phase
- do not try to recreate a pod until the old pod has been removed from
  etcd storage
This commit is contained in:
Filip Křepinský 2024-03-07 22:41:32 +01:00
parent a58d4f54cf
commit 7d9b913d10
4 changed files with 100 additions and 98 deletions

View File

@ -98,7 +98,6 @@ func NewStatefulSetController(
recorder), recorder),
NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()), NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
history.NewHistory(kubeClient, revInformer.Lister()), history.NewHistory(kubeClient, revInformer.Lister()),
recorder,
), ),
pvcListerSynced: pvcInformer.Informer().HasSynced, pvcListerSynced: pvcInformer.Informer().HasSynced,
revListerSynced: revInformer.Informer().HasSynced, revListerSynced: revInformer.Informer().HasSynced,
@ -235,6 +234,9 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interfa
return return
} }
logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta) logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
if oldPod.Status.Phase != curPod.Status.Phase {
logger.V(4).Info("StatefulSet Pod phase changed", "pod", klog.KObj(curPod), "statefulSet", klog.KObj(set), "podPhase", curPod.Status.Phase)
}
ssc.enqueueStatefulSet(set) ssc.enqueueStatefulSet(set)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus // the Pod status which in turn will trigger a requeue of the owning replica set thus

View File

@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
@ -61,16 +60,14 @@ type StatefulSetControlInterface interface {
func NewDefaultStatefulSetControl( func NewDefaultStatefulSetControl(
podControl *StatefulPodControl, podControl *StatefulPodControl,
statusUpdater StatefulSetStatusUpdaterInterface, statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface, controllerHistory history.Interface) StatefulSetControlInterface {
recorder record.EventRecorder) StatefulSetControlInterface { return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory}
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
} }
type defaultStatefulSetControl struct { type defaultStatefulSetControl struct {
podControl *StatefulPodControl podControl *StatefulPodControl
statusUpdater StatefulSetStatusUpdaterInterface statusUpdater StatefulSetStatusUpdaterInterface
controllerHistory history.Interface controllerHistory history.Interface
recorder record.EventRecorder
} }
// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and // UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
@ -367,45 +364,25 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current
func (ssc *defaultStatefulSetControl) processReplica( func (ssc *defaultStatefulSetControl) processReplica(
ctx context.Context, ctx context.Context,
set *apps.StatefulSet, set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
currentSet *apps.StatefulSet,
updateSet *apps.StatefulSet, updateSet *apps.StatefulSet,
monotonic bool, monotonic bool,
replicas []*v1.Pod, replicas []*v1.Pod,
i int) (bool, error) { i int) (bool, error) {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
// Delete and recreate pods which finished running.
//
// Note that pods with phase Succeeded will also trigger this event. This is // Note that pods with phase Succeeded will also trigger this event. This is
// because final pod phase of evicted or otherwise forcibly stopped pods // because final pod phase of evicted or otherwise forcibly stopped pods
// (e.g. terminated on node reboot) is determined by the exit code of the // (e.g. terminated on node reboot) is determined by the exit code of the
// container, not by the reason for pod termination. We should restart the pod // container, not by the reason for pod termination. We should restart the pod
// regardless of the exit code. // regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) { if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if isFailed(replicas[i]) { if replicas[i].DeletionTimestamp == nil {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
} else {
ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
"StatefulSet %s/%s is recreating terminated Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
}
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return true, err return true, err
} }
replicaOrd := i + getStartOrdinal(set) }
replicas[i] = newVersionedStatefulSetPod( // New pod should be generated on the next sync after the current pod is removed from etcd.
currentSet, return true, nil
updateSet,
currentRevision.Name,
updateRevision.Name,
replicaOrd)
} }
// If we find a Pod that has not been created we create the Pod // If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) { if !isCreated(replicas[i]) {
@ -637,7 +614,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode. // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
processReplicaFn := func(i int) (bool, error) { processReplicaFn := func(i int) (bool, error) {
return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i) return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i)
} }
if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil { if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)

View File

@ -60,8 +60,7 @@ func setupController(client clientset.Interface) (*fakeObjectManager, *fakeState
om := newFakeObjectManager(informerFactory) om := newFakeObjectManager(informerFactory)
spc := NewStatefulPodControlFromManager(om, &noopRecorder{}) spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
recorder := &noopRecorder{} ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()))
ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
// The informer is not started. The tests here manipulate the local cache (indexers) directly, and there is no waiting // The informer is not started. The tests here manipulate the local cache (indexers) directly, and there is no waiting
// for client state to sync. In fact, because the client is not updated during tests, informer updates will break tests // for client state to sync. In fact, because the client is not updated during tests, informer updates will break tests
@ -171,10 +170,11 @@ func TestStatefulSetControl(t *testing.T) {
{ReplacesPods, largeSetFn}, {ReplacesPods, largeSetFn},
{RecreatesFailedPod, simpleSetFn}, {RecreatesFailedPod, simpleSetFn},
{RecreatesSucceededPod, simpleSetFn}, {RecreatesSucceededPod, simpleSetFn},
{RecreatesFailedPodWithDeleteFailure, simpleSetFn},
{RecreatesSucceededPodWithDeleteFailure, simpleSetFn},
{CreatePodFailure, simpleSetFn}, {CreatePodFailure, simpleSetFn},
{UpdatePodFailure, simpleSetFn}, {UpdatePodFailure, simpleSetFn},
{UpdateSetStatusFailure, simpleSetFn}, {UpdateSetStatusFailure, simpleSetFn},
{PodRecreateDeleteFailure, simpleSetFn},
{NewRevisionDeletePodFailure, simpleSetFn}, {NewRevisionDeletePodFailure, simpleSetFn},
{RecreatesPVCForPendingPod, simpleSetFn}, {RecreatesPVCForPendingPod, simpleSetFn},
} }
@ -398,9 +398,10 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
} }
} }
func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) { func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, terminalPhase v1.PodPhase, testDeletePodFailure bool) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
om, _, ssc := setupController(client) om, _, ssc := setupController(client)
expectedNumOfDeleteRequests := 0
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -415,34 +416,105 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
if err := invariants(set, om); err != nil { if err := invariants(set, om); err != nil {
t.Error(err) t.Error(err)
} }
pods, err = om.podsLister.Pods(set.Namespace).List(selector) if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
if err != nil { t.Errorf("Found unexpected number of delete calls, got %v, expected 1", om.deletePodTracker.requests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err) t.Error(err)
} }
pods[0].Status.Phase = phase
om.podsIndexer.Update(pods[0]) terminalPodOrdinal := -1
for i, pod := range pods {
// Set at least Pending phase to acknowledge the creation of pods
newPhase := v1.PodPending
if i == 0 {
// Set terminal phase for the first pod
newPhase = terminalPhase
terminalPodOrdinal = getOrdinal(pod)
}
pod.Status.Phase = newPhase
if err = om.podsIndexer.Update(pod); err != nil {
t.Error(err)
}
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
if testDeletePodFailure {
// Expect pod deletion failure
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
expectedNumOfDeleteRequests++
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
}
// Expect pod deletion
expectedNumOfDeleteRequests++
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err) t.Errorf("Error updating StatefulSet %s", err)
} }
if err := invariants(set, om); err != nil { if err := invariants(set, om); err != nil {
t.Error(err) t.Error(err)
} }
pods, err = om.podsLister.Pods(set.Namespace).List(selector) if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
if err != nil { t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err) t.Error(err)
} }
if isCreated(pods[0]) {
// Expect no additional delete calls and expect pod creation
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
recreatedPod := findPodByOrdinal(pods, terminalPodOrdinal)
// new recreated pod should have empty phase
if recreatedPod == nil || isCreated(recreatedPod) {
t.Error("StatefulSet did not recreate failed Pod") t.Error("StatefulSet did not recreate failed Pod")
} }
expectedNumberOfCreateRequests := 2
if monotonic := !allowsBurst(set); !monotonic {
expectedNumberOfCreateRequests = int(*set.Spec.Replicas + 1)
}
if om.createPodTracker.requests != expectedNumberOfCreateRequests {
t.Errorf("Found unexpected number of create calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumberOfCreateRequests)
}
} }
func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodFailed) recreatesPod(t, set, invariants, v1.PodFailed, false)
} }
func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodSucceeded) recreatesPod(t, set, invariants, v1.PodSucceeded, false)
}
func RecreatesFailedPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodFailed, true)
}
func RecreatesSucceededPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodSucceeded, true)
} }
func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
@ -551,52 +623,6 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
} }
} }
func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
}
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
pods[0].Status.Phase = v1.PodFailed
om.podsIndexer.Update(pods[0])
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if isCreated(pods[0]) {
t.Error("StatefulSet did not recreate failed Pod")
}
}
func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client) om, _, ssc := setupController(client)
@ -834,8 +860,7 @@ func TestStatefulSetControl_getSetRevisions(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{}) spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
recorder := &noopRecorder{} ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())}
ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
stop := make(chan struct{}) stop := make(chan struct{})
defer close(stop) defer close(stop)

View File

@ -37,7 +37,6 @@ import (
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
@ -946,8 +945,7 @@ func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime
ssh := history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()) ssh := history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())
ssc.podListerSynced = alwaysReady ssc.podListerSynced = alwaysReady
ssc.setListerSynced = alwaysReady ssc.setListerSynced = alwaysReady
recorder := record.NewFakeRecorder(10) ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh)
ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh, recorder)
return ssc, spc, om, ssh return ssc, spc, om, ssh
} }