mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #44730 from kow3ns/fix-44229
Automatic merge from submit-queue (batch tested with PRs 44625, 43594, 44756, 44730) Check for terminating Pod prior to launching successor in StatefulSet Modifies sync loop for StatefulSet controller to check if a Pod is terminating before launching its successor. Fixes #44229. Should be cherry picked into 1.6 branch. **Which issue this PR fixes** fixes #44229 ```release-note NONE ```
This commit is contained in:
commit
3317957a33
@ -132,6 +132,13 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
|
|||||||
if !isCreated(replicas[i]) {
|
if !isCreated(replicas[i]) {
|
||||||
return ssc.podControl.CreateStatefulPod(set, replicas[i])
|
return ssc.podControl.CreateStatefulPod(set, replicas[i])
|
||||||
}
|
}
|
||||||
|
// If we find a Pod that is currently terminating, we must wait until graceful deletion
|
||||||
|
// completes before we continue to make progress.
|
||||||
|
if isTerminating(replicas[i]) {
|
||||||
|
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to Terminate",
|
||||||
|
set.Name, replicas[i].Name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
// If we have a Pod that has been created but is not running and ready we can not make progress.
|
// If we have a Pod that has been created but is not running and ready we can not make progress.
|
||||||
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
|
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
|
||||||
// ordinal, are Running and Ready.
|
// ordinal, are Running and Ready.
|
||||||
|
@ -404,6 +404,72 @@ func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) {
|
||||||
|
set := newStatefulSet(3)
|
||||||
|
client := fake.NewSimpleClientset(set)
|
||||||
|
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||||
|
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
|
||||||
|
ssc := NewDefaultStatefulSetControl(spc)
|
||||||
|
spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
|
||||||
|
|
||||||
|
stop := make(chan struct{})
|
||||||
|
defer close(stop)
|
||||||
|
informerFactory.Start(stop)
|
||||||
|
cache.WaitForCacheSync(
|
||||||
|
stop,
|
||||||
|
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
|
||||||
|
informerFactory.Core().V1().Pods().Informer().HasSynced,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error getting updated StatefulSet: %v", err)
|
||||||
|
}
|
||||||
|
if set.Status.Replicas != 3 {
|
||||||
|
t.Fatal("Failed to scale StatefulSet to 3 replicas")
|
||||||
|
}
|
||||||
|
// scale the set and add a terminated pod
|
||||||
|
*set.Spec.Replicas = 4
|
||||||
|
pods, err := spc.addTerminatingPod(set, 2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
pods, err = spc.podsLister.List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error listing pods: %v", err)
|
||||||
|
}
|
||||||
|
if len(pods) != 3 {
|
||||||
|
t.Fatalf("Expected 3 pods, got %d", len(pods))
|
||||||
|
}
|
||||||
|
sort.Sort(ascendingOrdinal(pods))
|
||||||
|
spc.DeleteStatefulPod(set, pods[2])
|
||||||
|
pods, err = spc.podsLister.List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error listing pods: %v", err)
|
||||||
|
}
|
||||||
|
if len(pods) != 2 {
|
||||||
|
t.Fatalf("Expected 3 pods, got %d", len(pods))
|
||||||
|
}
|
||||||
|
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error getting updated StatefulSet: %v", err)
|
||||||
|
}
|
||||||
|
if set.Status.Replicas != 4 {
|
||||||
|
t.Fatal("Failed to scale StatefulSet to 3 replicas")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) {
|
func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) {
|
||||||
set := newStatefulSet(3)
|
set := newStatefulSet(3)
|
||||||
client := fake.NewSimpleClientset(set)
|
client := fake.NewSimpleClientset(set)
|
||||||
@ -691,7 +757,7 @@ func (spc *fakeStatefulPodControl) setPodInitStatus(set *apps.StatefulSet, ordin
|
|||||||
return spc.podsLister.Pods(set.Namespace).List(selector)
|
return spc.podsLister.Pods(set.Namespace).List(selector)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (spc *fakeStatefulPodControl) addTerminatedPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
|
func (spc *fakeStatefulPodControl) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
|
||||||
pod := newStatefulSetPod(set, ordinal)
|
pod := newStatefulSetPod(set, ordinal)
|
||||||
pod.Status.Phase = v1.PodRunning
|
pod.Status.Phase = v1.PodRunning
|
||||||
deleted := metav1.NewTime(time.Now())
|
deleted := metav1.NewTime(time.Now())
|
||||||
@ -907,7 +973,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if pods, err = spc.addTerminatedPod(set, ordinal); err != nil {
|
if pods, err = spc.addTerminatingPod(set, ordinal); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
|
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
|
||||||
|
@ -91,11 +91,11 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) {
|
|||||||
if set.Status.Replicas != 3 {
|
if set.Status.Replicas != 3 {
|
||||||
t.Error("Falied to scale statefulset to 3 replicas")
|
t.Error("Falied to scale statefulset to 3 replicas")
|
||||||
}
|
}
|
||||||
pods, err := spc.addTerminatedPod(set, 3)
|
pods, err := spc.addTerminatingPod(set, 3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
pods, err = spc.addTerminatedPod(set, 4)
|
pods, err = spc.addTerminatingPod(set, 4)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -669,7 +669,7 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
|
|||||||
spc.setsIndexer.Add(set)
|
spc.setsIndexer.Add(set)
|
||||||
ssc.enqueueStatefulSet(set)
|
ssc.enqueueStatefulSet(set)
|
||||||
fakeWorker(ssc)
|
fakeWorker(ssc)
|
||||||
pods, err = spc.addTerminatedPod(set, ord)
|
pods, err = spc.addTerminatingPod(set, ord)
|
||||||
pod = getPodAtOrdinal(pods, ord)
|
pod = getPodAtOrdinal(pods, ord)
|
||||||
ssc.updatePod(&prev, pod)
|
ssc.updatePod(&prev, pod)
|
||||||
fakeWorker(ssc)
|
fakeWorker(ssc)
|
||||||
@ -679,7 +679,7 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
|
|||||||
for set.Status.Replicas > *set.Spec.Replicas {
|
for set.Status.Replicas > *set.Spec.Replicas {
|
||||||
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
|
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
|
||||||
ord := len(pods)
|
ord := len(pods)
|
||||||
pods, err = spc.addTerminatedPod(set, ord)
|
pods, err = spc.addTerminatingPod(set, ord)
|
||||||
pod = getPodAtOrdinal(pods, ord)
|
pod = getPodAtOrdinal(pods, ord)
|
||||||
ssc.updatePod(&prev, pod)
|
ssc.updatePod(&prev, pod)
|
||||||
fakeWorker(ssc)
|
fakeWorker(ssc)
|
||||||
|
@ -217,14 +217,14 @@ func isFailed(pod *v1.Pod) bool {
|
|||||||
return pod.Status.Phase == v1.PodFailed
|
return pod.Status.Phase == v1.PodFailed
|
||||||
}
|
}
|
||||||
|
|
||||||
// isTerminated returns true if pod's deletion Timestamp has been set
|
// isTerminating returns true if pod's DeletionTimestamp has been set
|
||||||
func isTerminated(pod *v1.Pod) bool {
|
func isTerminating(pod *v1.Pod) bool {
|
||||||
return pod.DeletionTimestamp != nil
|
return pod.DeletionTimestamp != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// isHealthy returns true if pod is running and ready and has not been terminated
|
// isHealthy returns true if pod is running and ready and has not been terminated
|
||||||
func isHealthy(pod *v1.Pod) bool {
|
func isHealthy(pod *v1.Pod) bool {
|
||||||
return isRunningAndReady(pod) && !isTerminated(pod)
|
return isRunningAndReady(pod) && !isTerminating(pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newControllerRef returns an ControllerRef pointing to a given StatefulSet.
|
// newControllerRef returns an ControllerRef pointing to a given StatefulSet.
|
||||||
|
Loading…
Reference in New Issue
Block a user