mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 01:40:07 +00:00
Merge pull request #121389 from aleksandra-malinowska/sts-restart-always
Resubmit "Make StatefulSet restart pods with phase Succeeded"
This commit is contained in:
commit
05765a851c
@ -376,13 +376,27 @@ func (ssc *defaultStatefulSetControl) processReplica(
|
|||||||
replicas []*v1.Pod,
|
replicas []*v1.Pod,
|
||||||
i int) (bool, error) {
|
i int) (bool, error) {
|
||||||
logger := klog.FromContext(ctx)
|
logger := klog.FromContext(ctx)
|
||||||
// delete and recreate failed pods
|
// Delete and recreate pods which finished running.
|
||||||
|
//
|
||||||
|
// Note that pods with phase Succeeded will also trigger this event. This is
|
||||||
|
// because final pod phase of evicted or otherwise forcibly stopped pods
|
||||||
|
// (e.g. terminated on node reboot) is determined by the exit code of the
|
||||||
|
// container, not by the reason for pod termination. We should restart the pod
|
||||||
|
// regardless of the exit code.
|
||||||
|
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
|
||||||
if isFailed(replicas[i]) {
|
if isFailed(replicas[i]) {
|
||||||
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
|
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
|
||||||
"StatefulSet %s/%s is recreating failed Pod %s",
|
"StatefulSet %s/%s is recreating failed Pod %s",
|
||||||
set.Namespace,
|
set.Namespace,
|
||||||
set.Name,
|
set.Name,
|
||||||
replicas[i].Name)
|
replicas[i].Name)
|
||||||
|
} else {
|
||||||
|
ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
|
||||||
|
"StatefulSet %s/%s is recreating terminated Pod %s",
|
||||||
|
set.Namespace,
|
||||||
|
set.Name,
|
||||||
|
replicas[i].Name)
|
||||||
|
}
|
||||||
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
|
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
|
@ -170,6 +170,7 @@ func TestStatefulSetControl(t *testing.T) {
|
|||||||
{ScalesDown, simpleSetFn},
|
{ScalesDown, simpleSetFn},
|
||||||
{ReplacesPods, largeSetFn},
|
{ReplacesPods, largeSetFn},
|
||||||
{RecreatesFailedPod, simpleSetFn},
|
{RecreatesFailedPod, simpleSetFn},
|
||||||
|
{RecreatesSucceededPod, simpleSetFn},
|
||||||
{CreatePodFailure, simpleSetFn},
|
{CreatePodFailure, simpleSetFn},
|
||||||
{UpdatePodFailure, simpleSetFn},
|
{UpdatePodFailure, simpleSetFn},
|
||||||
{UpdateSetStatusFailure, simpleSetFn},
|
{UpdateSetStatusFailure, simpleSetFn},
|
||||||
@ -397,7 +398,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
|
func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) {
|
||||||
client := fake.NewSimpleClientset()
|
client := fake.NewSimpleClientset()
|
||||||
om, _, ssc := setupController(client)
|
om, _, ssc := setupController(client)
|
||||||
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
||||||
@ -418,7 +419,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
pods[0].Status.Phase = v1.PodFailed
|
pods[0].Status.Phase = phase
|
||||||
om.podsIndexer.Update(pods[0])
|
om.podsIndexer.Update(pods[0])
|
||||||
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
|
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
|
||||||
t.Errorf("Error updating StatefulSet %s", err)
|
t.Errorf("Error updating StatefulSet %s", err)
|
||||||
@ -433,6 +434,15 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian
|
|||||||
if isCreated(pods[0]) {
|
if isCreated(pods[0]) {
|
||||||
t.Error("StatefulSet did not recreate failed Pod")
|
t.Error("StatefulSet did not recreate failed Pod")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
|
||||||
|
recreatesPod(t, set, invariants, v1.PodFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
|
||||||
|
recreatesPod(t, set, invariants, v1.PodSucceeded)
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
|
func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
|
||||||
|
@ -426,6 +426,11 @@ func isFailed(pod *v1.Pod) bool {
|
|||||||
return pod.Status.Phase == v1.PodFailed
|
return pod.Status.Phase == v1.PodFailed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isSucceeded returns true if pod has a Phase of PodSucceeded
|
||||||
|
func isSucceeded(pod *v1.Pod) bool {
|
||||||
|
return pod.Status.Phase == v1.PodSucceeded
|
||||||
|
}
|
||||||
|
|
||||||
// isTerminating returns true if pod's DeletionTimestamp has been set
|
// isTerminating returns true if pod's DeletionTimestamp has been set
|
||||||
func isTerminating(pod *v1.Pod) bool {
|
func isTerminating(pod *v1.Pod) bool {
|
||||||
return pod.DeletionTimestamp != nil
|
return pod.DeletionTimestamp != nil
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/json"
|
"k8s.io/apimachinery/pkg/util/json"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
@ -168,7 +169,7 @@ func TestSpecReplicasChange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeletingAndFailedPods(t *testing.T) {
|
func TestDeletingAndTerminatingPods(t *testing.T) {
|
||||||
_, ctx := ktesting.NewTestContext(t)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
closeFn, rm, informers, c := scSetup(ctx, t)
|
closeFn, rm, informers, c := scSetup(ctx, t)
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
@ -177,17 +178,19 @@ func TestDeletingAndFailedPods(t *testing.T) {
|
|||||||
cancel := runControllerAndInformers(rm, informers)
|
cancel := runControllerAndInformers(rm, informers)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
podCount := 3
|
||||||
|
|
||||||
labelMap := labelMap()
|
labelMap := labelMap()
|
||||||
sts := newSTS("sts", ns.Name, 2)
|
sts := newSTS("sts", ns.Name, podCount)
|
||||||
stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
|
stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
|
||||||
sts = stss[0]
|
sts = stss[0]
|
||||||
waitSTSStable(t, c, sts)
|
waitSTSStable(t, c, sts)
|
||||||
|
|
||||||
// Verify STS creates 2 pods
|
// Verify STS creates 3 pods
|
||||||
podClient := c.CoreV1().Pods(ns.Name)
|
podClient := c.CoreV1().Pods(ns.Name)
|
||||||
pods := getPods(t, podClient, labelMap)
|
pods := getPods(t, podClient, labelMap)
|
||||||
if len(pods.Items) != 2 {
|
if len(pods.Items) != podCount {
|
||||||
t.Fatalf("len(pods) = %d, want 2", len(pods.Items))
|
t.Fatalf("len(pods) = %d, want %d", len(pods.Items), podCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set first pod as deleting pod
|
// Set first pod as deleting pod
|
||||||
@ -206,23 +209,48 @@ func TestDeletingAndFailedPods(t *testing.T) {
|
|||||||
pod.Status.Phase = v1.PodFailed
|
pod.Status.Phase = v1.PodFailed
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Set third pod as succeeded pod
|
||||||
|
succeededPod := &pods.Items[2]
|
||||||
|
updatePodStatus(t, podClient, succeededPod.Name, func(pod *v1.Pod) {
|
||||||
|
pod.Status.Phase = v1.PodSucceeded
|
||||||
|
})
|
||||||
|
|
||||||
|
exists := func(pods []v1.Pod, uid types.UID) bool {
|
||||||
|
for _, pod := range pods {
|
||||||
|
if pod.UID == uid {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
||||||
// Verify only 2 pods exist: deleting pod and new pod replacing failed pod
|
// Verify only 3 pods exist: deleting pod and new pod replacing failed pod
|
||||||
pods = getPods(t, podClient, labelMap)
|
pods = getPods(t, podClient, labelMap)
|
||||||
if len(pods.Items) != 2 {
|
if len(pods.Items) != podCount {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify deleting pod still exists
|
// Verify deleting pod still exists
|
||||||
// Immediately return false with an error if it does not exist
|
// Immediately return false with an error if it does not exist
|
||||||
if pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID {
|
if !exists(pods.Items, deletingPod.UID) {
|
||||||
return false, fmt.Errorf("expected deleting pod %s still exists, but it is not found", deletingPod.Name)
|
return false, fmt.Errorf("expected deleting pod %s still exists, but it is not found", deletingPod.Name)
|
||||||
}
|
}
|
||||||
// Verify failed pod does not exist anymore
|
// Verify failed pod does not exist anymore
|
||||||
if pods.Items[0].UID == failedPod.UID || pods.Items[1].UID == failedPod.UID {
|
if exists(pods.Items, failedPod.UID) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
// Verify both pods have non-failed status
|
// Verify succeeded pod does not exist anymore
|
||||||
return pods.Items[0].Status.Phase != v1.PodFailed && pods.Items[1].Status.Phase != v1.PodFailed, nil
|
if exists(pods.Items, succeededPod.UID) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
// Verify all pods have non-terminated status
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatalf("failed to verify failed pod %s has been replaced with a new non-failed pod, and deleting pod %s survives: %v", failedPod.Name, deletingPod.Name, err)
|
t.Fatalf("failed to verify failed pod %s has been replaced with a new non-failed pod, and deleting pod %s survives: %v", failedPod.Name, deletingPod.Name, err)
|
||||||
}
|
}
|
||||||
@ -235,11 +263,11 @@ func TestDeletingAndFailedPods(t *testing.T) {
|
|||||||
if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
||||||
// Verify only 2 pods exist: new non-deleting pod replacing deleting pod and the non-failed pod
|
// Verify only 2 pods exist: new non-deleting pod replacing deleting pod and the non-failed pod
|
||||||
pods = getPods(t, podClient, labelMap)
|
pods = getPods(t, podClient, labelMap)
|
||||||
if len(pods.Items) != 2 {
|
if len(pods.Items) != podCount {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
// Verify deleting pod does not exist anymore
|
// Verify deleting pod does not exist anymore
|
||||||
return pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID, nil
|
return !exists(pods.Items, deletingPod.UID), nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err)
|
t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user