Skip terminal Pods with a deletion timestamp from the Daemonset sync (#118716)

* Skip terminal Pods with a deletion timestamp from the Daemonset sync

Change-Id: I64a347a87c02ee2bd48be10e6fff380c8c81f742

* Address review comments and fix the integration test

Change-Id: I3eb5ec62bce8b4b150726a1e9b2b517c4e993713

* Include deleted terminal pods in history

Change-Id: I8b921157e6be1c809dd59f8035ec259ea4d96301
Aldo Culquicondor 2023-06-27 11:56:33 -04:00 committed by GitHub
parent 6dbb1c6cf0
commit a4519665fe
6 changed files with 188 additions and 84 deletions
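
In short, this change makes the DaemonSet controller filter out pods that are both in a terminal phase and already marked for deletion when it builds the node-to-daemon-pods map, unless the caller explicitly asks to include them (as history cleanup now does). A minimal, standalone sketch of that predicate using only the core v1 API types; skipFromSync is an illustrative name, not the controller's actual helper:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// skipFromSync mirrors the filtering rule added in the diff below: a pod is
// dropped from the sync map when it is terminal (Succeeded or Failed) and
// already has a deletion timestamp, unless deleted terminal pods were
// explicitly requested (as cleanupHistory does).
func skipFromSync(pod *v1.Pod, includeDeletedTerminal bool) bool {
	terminal := pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
	return !includeDeletedTerminal && terminal && pod.DeletionTimestamp != nil
}

func main() {
	now := metav1.Now()
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "daemon-pod", DeletionTimestamp: &now},
		Status:     v1.PodStatus{Phase: v1.PodSucceeded},
	}
	fmt.Println(skipFromSync(pod, false)) // true: excluded from the regular sync
	fmt.Println(skipFromSync(pod, true))  // false: still visible to history cleanup
}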

View File

@@ -752,7 +752,7 @@ func (dsc *DaemonSetsController) getDaemonPods(ctx context.Context, ds *apps.Dae
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet, includeDeletedTerminal bool) (map[string][]*v1.Pod, error) {
claimedPods, err := dsc.getDaemonPods(ctx, ds)
if err != nil {
return nil, err
@@ -761,6 +761,12 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *a
nodeToDaemonPods := make(map[string][]*v1.Pod)
logger := klog.FromContext(ctx)
for _, pod := range claimedPods {
if !includeDeletedTerminal && podutil.IsPodTerminal(pod) && pod.DeletionTimestamp != nil {
// This Pod has a finalizer or is already scheduled for deletion from the
// store by the kubelet or the Pod GC. The DS controller doesn't have
// anything else to do with it.
continue
}
nodeName, err := util.GetTargetNodeName(pod)
if err != nil {
logger.Info("Failed to get target node name of Pod in DaemonSet",
@@ -953,7 +959,7 @@ func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.D
// syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
// Find out the pods which are created for the nodes by DaemonSet.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
@@ -1154,7 +1160,7 @@ func storeDaemonSetStatus(
func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
logger := klog.FromContext(ctx)
logger.V(4).Info("Updating daemon set status")
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}

View File

@@ -2739,75 +2739,128 @@ func TestDeleteUnscheduledPodForNotExistingNode(t *testing.T) {
}
func TestGetNodesToDaemonPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
ds2.Spec.UpdateStrategy = *strategy
_, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx, ds, ds2)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
err = manager.dsStore.Add(ds)
if err != nil {
t.Fatal(err)
}
err = manager.dsStore.Add(ds2)
if err != nil {
t.Fatal(err)
}
addNodes(manager.nodeStore, 0, 2, nil)
// These pods should be returned.
wantedPods := []*v1.Pod{
newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
}
failedPod := newPod("matching-owned-failed-pod-1-", "node-1", simpleDaemonSetLabel, ds)
failedPod.Status = v1.PodStatus{Phase: v1.PodFailed}
wantedPods = append(wantedPods, failedPod)
for _, pod := range wantedPods {
manager.podStore.Add(pod)
}
// These pods should be ignored.
ignoredPods := []*v1.Pod{
newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
}
for _, pod := range ignoredPods {
err = manager.podStore.Add(pod)
ds := newDaemonSet("foo")
ds2 := newDaemonSet("foo2")
cases := map[string]struct {
includeDeletedTerminal bool
wantedPods []*v1.Pod
ignoredPods []*v1.Pod
}{
"exclude deleted terminal pods": {
wantedPods: []*v1.Pod{
newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
func() *v1.Pod {
pod := newPod("matching-owned-succeeded-pod-0-", "node-0", simpleDaemonSetLabel, ds)
pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
return pod
}(),
func() *v1.Pod {
pod := newPod("matching-owned-failed-pod-1-", "node-1", simpleDaemonSetLabel, ds)
pod.Status = v1.PodStatus{Phase: v1.PodFailed}
return pod
}(),
},
ignoredPods: []*v1.Pod{
newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
func() *v1.Pod {
pod := newPod("matching-owned-succeeded-deleted-pod-0-", "node-0", simpleDaemonSetLabel, ds)
now := metav1.Now()
pod.DeletionTimestamp = &now
pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
return pod
}(),
func() *v1.Pod {
pod := newPod("matching-owned-failed-deleted-pod-1-", "node-1", simpleDaemonSetLabel, ds)
now := metav1.Now()
pod.DeletionTimestamp = &now
pod.Status = v1.PodStatus{Phase: v1.PodFailed}
return pod
}(),
},
},
"include deleted terminal pods": {
includeDeletedTerminal: true,
wantedPods: []*v1.Pod{
newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
func() *v1.Pod {
pod := newPod("matching-owned-succeeded-pod-0-", "node-0", simpleDaemonSetLabel, ds)
pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
return pod
}(),
func() *v1.Pod {
pod := newPod("matching-owned-failed-deleted-pod-1-", "node-1", simpleDaemonSetLabel, ds)
now := metav1.Now()
pod.DeletionTimestamp = &now
pod.Status = v1.PodStatus{Phase: v1.PodFailed}
return pod
}(),
},
ignoredPods: []*v1.Pod{
newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx, ds, ds2)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
err = manager.dsStore.Add(ds)
if err != nil {
t.Fatal(err)
}
}
err = manager.dsStore.Add(ds2)
if err != nil {
t.Fatal(err)
}
addNodes(manager.nodeStore, 0, 2, nil)
nodesToDaemonPods, err := manager.getNodesToDaemonPods(context.TODO(), ds)
if err != nil {
t.Fatalf("getNodesToDaemonPods() error: %v", err)
}
gotPods := map[string]bool{}
for node, pods := range nodesToDaemonPods {
for _, pod := range pods {
if pod.Spec.NodeName != node {
t.Errorf("pod %v grouped into %v but belongs in %v", pod.Name, node, pod.Spec.NodeName)
for _, pod := range tc.wantedPods {
manager.podStore.Add(pod)
}
for _, pod := range tc.ignoredPods {
err = manager.podStore.Add(pod)
if err != nil {
t.Fatal(err)
}
gotPods[pod.Name] = true
}
}
for _, pod := range wantedPods {
if !gotPods[pod.Name] {
t.Errorf("expected pod %v but didn't get it", pod.Name)
nodesToDaemonPods, err := manager.getNodesToDaemonPods(context.TODO(), ds, tc.includeDeletedTerminal)
if err != nil {
t.Fatalf("getNodesToDaemonPods() error: %v", err)
}
delete(gotPods, pod.Name)
}
for podName := range gotPods {
t.Errorf("unexpected pod %v was returned", podName)
}
gotPods := map[string]bool{}
for node, pods := range nodesToDaemonPods {
for _, pod := range pods {
if pod.Spec.NodeName != node {
t.Errorf("pod %v grouped into %v but belongs in %v", pod.Name, node, pod.Spec.NodeName)
}
gotPods[pod.Name] = true
}
}
for _, pod := range tc.wantedPods {
if !gotPods[pod.Name] {
t.Errorf("expected pod %v but didn't get it", pod.Name)
}
delete(gotPods, pod.Name)
}
for podName := range gotPods {
t.Errorf("unexpected pod %v was returned", podName)
}
})
}
}

View File

@@ -43,7 +43,7 @@ import (
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
logger := klog.FromContext(ctx)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
@@ -294,7 +294,8 @@ func (dsc *DaemonSetsController) constructHistory(ctx context.Context, ds *apps.
}
func (dsc *DaemonSetsController) cleanupHistory(ctx context.Context, ds *apps.DaemonSet, old []*apps.ControllerRevision) error {
nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
// Include deleted terminal pods when maintaining history.
nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, true)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}

View File

@@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/profile"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/integration/util"
"k8s.io/kubernetes/test/utils/ktesting"
)
@@ -155,6 +156,7 @@ func newDaemonSet(name, namespace string) *apps.DaemonSet {
}
func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet) {
t.Helper()
ds, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Failed to get DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
@@ -176,6 +178,10 @@ func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet)
return
}
if len(ds.Spec.Template.Finalizers) > 0 {
testutils.RemovePodFinalizersInNamespace(context.TODO(), cs, t, ds.Namespace)
}
// Wait for the daemon set controller to kill all the daemon pods.
if err := wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
updatedDS, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
@@ -275,9 +281,7 @@ func validateDaemonSetPodsAndMarkReady(
) {
if err := wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
objects := podInformer.GetIndexer().List()
if len(objects) != numberPods {
return false, nil
}
nonTerminatedPods := 0
for _, object := range objects {
pod := object.(*v1.Pod)
@@ -294,6 +298,10 @@ func validateDaemonSetPodsAndMarkReady(
t.Errorf("controllerRef.Controller is not set to true")
}
if podutil.IsPodPhaseTerminal(pod.Status.Phase) {
continue
}
nonTerminatedPods++
if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
podCopy := pod.DeepCopy()
podCopy.Status = v1.PodStatus{
@@ -307,7 +315,7 @@
}
}
return true, nil
return nonTerminatedPods == numberPods, nil
}); err != nil {
t.Fatal(err)
}
@@ -536,8 +544,23 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
}
func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
for _, podPhase := range []v1.PodPhase{v1.PodSucceeded, v1.PodFailed} {
t.Run(string(podPhase), func(t *testing.T) {
cases := map[string]struct {
phase v1.PodPhase
finalizer bool
}{
"Succeeded": {
phase: v1.PodSucceeded,
},
"Failed": {
phase: v1.PodFailed,
},
"Succeeded with finalizer": {
phase: v1.PodSucceeded,
finalizer: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
@@ -553,6 +576,9 @@ func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
go dc.Run(ctx, 2)
ds := newDaemonSet("restart-terminal-pod", ns.Name)
if tc.finalizer {
ds.Spec.Template.Finalizers = append(ds.Spec.Template.Finalizers, "test.k8s.io/finalizer")
}
ds.Spec.UpdateStrategy = *strategy
if _, err := dsClient.Create(ctx, ds, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err)
@@ -566,9 +592,9 @@
validateDaemonSetStatus(dsClient, ds.Name, int32(numNodes), t)
podToMarkAsTerminal := podInformer.GetIndexer().List()[0].(*v1.Pod)
podCopy := podToMarkAsTerminal.DeepCopy()
podCopy.Status.Phase = podPhase
podCopy.Status.Phase = tc.phase
if _, err := podClient.UpdateStatus(ctx, podCopy, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Failed to mark the pod as terminal with phase: %v. Error: %v", podPhase, err)
t.Fatalf("Failed to mark the pod as terminal with phase: %v. Error: %v", tc.phase, err)
}
// verify all pods are active. They either continue Running or are Pending after restart
validateDaemonSetPodsActive(podClient, podInformer, numNodes, t)

View File

@@ -116,7 +116,7 @@ func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) {
if err != nil {
t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod))
}
defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod})
defer testutils.RemovePodFinalizers(testCtx.Ctx, testCtx.ClientSet, t, *pod)
pod.Status.Phase = test.phase
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).UpdateStatus(testCtx.Ctx, pod, metav1.UpdateOptions{}); err != nil {
@@ -224,7 +224,7 @@ func TestTerminatingOnOutOfServiceNode(t *testing.T) {
t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod))
}
if test.withFinalizer {
defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod})
defer testutils.RemovePodFinalizers(testCtx.Ctx, testCtx.ClientSet, t, *pod)
}
// trigger termination of the pod, but with long grace period so that it is not removed immediately

View File

@@ -18,6 +18,7 @@ package util
import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"
@@ -27,6 +28,7 @@ import (
policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
@@ -229,17 +231,33 @@ func CleanupTest(t *testing.T, testCtx *TestContext) {
testCtx.CloseFn()
}
func RemovePodFinalizersInNamespace(ctx context.Context, cs clientset.Interface, t *testing.T, ns string) {
t.Helper()
pods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed obtaining list of pods: %v", err)
}
RemovePodFinalizers(ctx, cs, t, pods.Items...)
}
// RemovePodFinalizers removes pod finalizers for the pods
func RemovePodFinalizers(cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
func RemovePodFinalizers(ctx context.Context, cs clientset.Interface, t *testing.T, pods ...v1.Pod) {
t.Helper()
for _, p := range pods {
pod, err := cs.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
pod, err := cs.CoreV1().Pods(p.Namespace).Get(ctx, p.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
t.Errorf("error while removing pod finalizers for %v: %v", klog.KObj(p), err)
} else if pod != nil {
pod.ObjectMeta.Finalizers = nil
_, err = cs.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
t.Errorf("error while removing pod finalizers for %v: %v", klog.KObj(&p), err)
} else if pod != nil && len(pod.Finalizers) > 0 {
// Use Patch to remove finalizer, instead of Update, to avoid transient
// conflicts.
patchBytes, _ := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"$deleteFromPrimitiveList/finalizers": pod.Finalizers,
},
})
_, err = cs.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
t.Errorf("error while updating pod status for %v: %v", klog.KObj(p), err)
t.Errorf("error while updating pod status for %v: %v", klog.KObj(&p), err)
}
}
}
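
For reference, the $deleteFromPrimitiveList/finalizers directive used above is a strategic merge patch feature for removing items from a list of primitives. Below is a self-contained sketch of applying the same kind of patch with client-go outside the test helper; the namespace, pod name, finalizer value, and kubeconfig handling are illustrative assumptions, not part of this change:

package main

import (
	"context"
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// clearFinalizers removes the given finalizers from a pod with a strategic
// merge patch, mirroring RemovePodFinalizers above: patching only the
// finalizers list avoids the transient resourceVersion conflicts a full
// Update can hit when another component modifies the pod concurrently.
func clearFinalizers(ctx context.Context, cs kubernetes.Interface, ns, name string, finalizers []string) error {
	patch, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"$deleteFromPrimitiveList/finalizers": finalizers,
		},
	})
	if err != nil {
		return err
	}
	_, err = cs.CoreV1().Pods(ns).Patch(ctx, name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	return err
}

func main() {
	// Assumes a reachable cluster via the default kubeconfig; adjust as needed.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	cs := kubernetes.NewForConfigOrDie(cfg)
	err = clearFinalizers(context.Background(), cs, "default", "example-pod", []string{"test.k8s.io/finalizer"})
	if err != nil {
		panic(err)
	}
}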