diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go
index d8a2f67dcf8..ffcebcb17db 100644
--- a/pkg/controller/daemon/daemon_controller.go
+++ b/pkg/controller/daemon/daemon_controller.go
@@ -752,7 +752,7 @@ func (dsc *DaemonSetsController) getDaemonPods(ctx context.Context, ds *apps.Dae
 // This also reconciles ControllerRef by adopting/orphaning.
 // Note that returned Pods are pointers to objects in the cache.
 // If you want to modify one, you need to deep-copy it first.
-func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
+func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet, includeDeletedTerminal bool) (map[string][]*v1.Pod, error) {
 	claimedPods, err := dsc.getDaemonPods(ctx, ds)
 	if err != nil {
 		return nil, err
@@ -761,6 +761,12 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *a
 	nodeToDaemonPods := make(map[string][]*v1.Pod)
 	logger := klog.FromContext(ctx)
 	for _, pod := range claimedPods {
+		if !includeDeletedTerminal && podutil.IsPodTerminal(pod) && pod.DeletionTimestamp != nil {
+			// This Pod has a finalizer or is already scheduled for deletion from the
+			// store by the kubelet or the Pod GC. The DS controller doesn't have
+			// anything else to do with it.
+			continue
+		}
 		nodeName, err := util.GetTargetNodeName(pod)
 		if err != nil {
 			logger.Info("Failed to get target node name of Pod in DaemonSet",
@@ -953,7 +959,7 @@ func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.D
 // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
 func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
 	// Find out the pods which are created for the nodes by DaemonSet.
-	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
 	if err != nil {
 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
 	}
@@ -1154,7 +1160,7 @@ func storeDaemonSetStatus(
 func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
 	logger := klog.FromContext(ctx)
 	logger.V(4).Info("Updating daemon set status")
-	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
 	if err != nil {
 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
 	}
diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go
index 72646863cd3..bfaff5b6316 100644
--- a/pkg/controller/daemon/daemon_controller_test.go
+++ b/pkg/controller/daemon/daemon_controller_test.go
@@ -2739,75 +2739,128 @@ func TestDeleteUnscheduledPodForNotExistingNode(t *testing.T) {
 }
 
 func TestGetNodesToDaemonPods(t *testing.T) {
-	for _, strategy := range updateStrategies() {
-		ds := newDaemonSet("foo")
-		ds.Spec.UpdateStrategy = *strategy
-		ds2 := newDaemonSet("foo2")
-		ds2.Spec.UpdateStrategy = *strategy
-		_, ctx := ktesting.NewTestContext(t)
-		manager, _, _, err := newTestController(ctx, ds, ds2)
-		if err != nil {
-			t.Fatalf("error creating DaemonSets controller: %v", err)
-		}
-		err = manager.dsStore.Add(ds)
-		if err != nil {
-			t.Fatal(err)
-		}
-		err = manager.dsStore.Add(ds2)
-		if err != nil {
-			t.Fatal(err)
-		}
-		addNodes(manager.nodeStore, 0, 2, nil)
-
-		// These pods should be returned.
-		wantedPods := []*v1.Pod{
-			newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
-			newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
-			newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
-			newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
-		}
-		failedPod := newPod("matching-owned-failed-pod-1-", "node-1", simpleDaemonSetLabel, ds)
-		failedPod.Status = v1.PodStatus{Phase: v1.PodFailed}
-		wantedPods = append(wantedPods, failedPod)
-		for _, pod := range wantedPods {
-			manager.podStore.Add(pod)
-		}
-
-		// These pods should be ignored.
-		ignoredPods := []*v1.Pod{
-			newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
-			newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
-			newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
-		}
-		for _, pod := range ignoredPods {
-			err = manager.podStore.Add(pod)
+	ds := newDaemonSet("foo")
+	ds2 := newDaemonSet("foo2")
+	cases := map[string]struct {
+		includeDeletedTerminal bool
+		wantedPods             []*v1.Pod
+		ignoredPods            []*v1.Pod
+	}{
+		"exclude deleted terminal pods": {
+			wantedPods: []*v1.Pod{
+				newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
+				newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
+				newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
+				newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
+				func() *v1.Pod {
+					pod := newPod("matching-owned-succeeded-pod-0-", "node-0", simpleDaemonSetLabel, ds)
+					pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
+					return pod
+				}(),
+				func() *v1.Pod {
+					pod := newPod("matching-owned-failed-pod-1-", "node-1", simpleDaemonSetLabel, ds)
+					pod.Status = v1.PodStatus{Phase: v1.PodFailed}
+					return pod
+				}(),
+			},
+			ignoredPods: []*v1.Pod{
+				newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
+				newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
+				newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
+				func() *v1.Pod {
+					pod := newPod("matching-owned-succeeded-deleted-pod-0-", "node-0", simpleDaemonSetLabel, ds)
+					now := metav1.Now()
+					pod.DeletionTimestamp = &now
+					pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
+					return pod
+				}(),
+				func() *v1.Pod {
+					pod := newPod("matching-owned-failed-deleted-pod-1-", "node-1", simpleDaemonSetLabel, ds)
+					now := metav1.Now()
+					pod.DeletionTimestamp = &now
+					pod.Status = v1.PodStatus{Phase: v1.PodFailed}
+					return pod
+				}(),
+			},
+		},
+		"include deleted terminal pods": {
+			includeDeletedTerminal: true,
+			wantedPods: []*v1.Pod{
+				newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds),
+				newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil),
+				newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds),
+				newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil),
+				func() *v1.Pod {
+					pod := newPod("matching-owned-succeeded-pod-0-", "node-0", simpleDaemonSetLabel, ds)
+					pod.Status = v1.PodStatus{Phase: v1.PodSucceeded}
+					return pod
+				}(),
+				func() *v1.Pod {
+					pod := newPod("matching-owned-failed-deleted-pod-1-", "node-1", simpleDaemonSetLabel, ds)
+					now := metav1.Now()
+					pod.DeletionTimestamp = &now
+					pod.Status = v1.PodStatus{Phase: v1.PodFailed}
+					return pod
+				}(),
+			},
+			ignoredPods: []*v1.Pod{
+				newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds),
+				newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil),
+				newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2),
+			},
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			manager, _, _, err := newTestController(ctx, ds, ds2)
+			if err != nil {
+				t.Fatalf("error creating DaemonSets controller: %v", err)
+			}
+			err = manager.dsStore.Add(ds)
 			if err != nil {
 				t.Fatal(err)
 			}
-		}
+			err = manager.dsStore.Add(ds2)
+			if err != nil {
+				t.Fatal(err)
+			}
+			addNodes(manager.nodeStore, 0, 2, nil)
 
-		nodesToDaemonPods, err := manager.getNodesToDaemonPods(context.TODO(), ds)
-		if err != nil {
-			t.Fatalf("getNodesToDaemonPods() error: %v", err)
-		}
-		gotPods := map[string]bool{}
-		for node, pods := range nodesToDaemonPods {
-			for _, pod := range pods {
-				if pod.Spec.NodeName != node {
-					t.Errorf("pod %v grouped into %v but belongs in %v", pod.Name, node, pod.Spec.NodeName)
+			for _, pod := range tc.wantedPods {
+				manager.podStore.Add(pod)
+			}
+
+			for _, pod := range tc.ignoredPods {
+				err = manager.podStore.Add(pod)
+				if err != nil {
+					t.Fatal(err)
 				}
-				gotPods[pod.Name] = true
 			}
-		}
-		for _, pod := range wantedPods {
-			if !gotPods[pod.Name] {
-				t.Errorf("expected pod %v but didn't get it", pod.Name)
+
+			nodesToDaemonPods, err := manager.getNodesToDaemonPods(context.TODO(), ds, tc.includeDeletedTerminal)
+			if err != nil {
+				t.Fatalf("getNodesToDaemonPods() error: %v", err)
 			}
-			delete(gotPods, pod.Name)
-		}
-		for podName := range gotPods {
-			t.Errorf("unexpected pod %v was returned", podName)
-		}
+			gotPods := map[string]bool{}
+			for node, pods := range nodesToDaemonPods {
+				for _, pod := range pods {
+					if pod.Spec.NodeName != node {
+						t.Errorf("pod %v grouped into %v but belongs in %v", pod.Name, node, pod.Spec.NodeName)
+					}
+					gotPods[pod.Name] = true
+				}
+			}
+			for _, pod := range tc.wantedPods {
+				if !gotPods[pod.Name] {
+					t.Errorf("expected pod %v but didn't get it", pod.Name)
+				}
+				delete(gotPods, pod.Name)
+			}
+			for podName := range gotPods {
+				t.Errorf("unexpected pod %v was returned", podName)
+			}
+		})
 	}
 }
diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go
index 2665b170c04..d7755da95d6 100644
--- a/pkg/controller/daemon/update.go
+++ b/pkg/controller/daemon/update.go
@@ -43,7 +43,7 @@ import (
 // remaining within the constraints imposed by the update strategy.
 func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
 	logger := klog.FromContext(ctx)
-	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
 	if err != nil {
 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
 	}
@@ -294,7 +294,8 @@ func (dsc *DaemonSetsController) constructHistory(ctx context.Context, ds *apps.
 }
 
 func (dsc *DaemonSetsController) cleanupHistory(ctx context.Context, ds *apps.DaemonSet, old []*apps.ControllerRevision) error {
-	nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
+	// Include deleted terminal pods when maintaining history.
+	nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, true)
 	if err != nil {
 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
 	}
diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go
index 4c22d1cc525..c1d84951745 100644
--- a/test/integration/daemonset/daemonset_test.go
+++ b/test/integration/daemonset/daemonset_test.go
@@ -49,6 +49,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/profile"
 	labelsutil "k8s.io/kubernetes/pkg/util/labels"
 	"k8s.io/kubernetes/test/integration/framework"
+	testutils "k8s.io/kubernetes/test/integration/util"
 	"k8s.io/kubernetes/test/utils/ktesting"
 )
 
@@ -155,6 +156,7 @@ func newDaemonSet(name, namespace string) *apps.DaemonSet {
 }
 
 func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet) {
+	t.Helper()
 	ds, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Errorf("Failed to get DaemonSet %s/%s: %v", ds.Namespace, ds.Name, err)
@@ -176,6 +178,10 @@ func cleanupDaemonSets(t *testing.T, cs clientset.Interface, ds *apps.DaemonSet)
 		return
 	}
 
+	if len(ds.Spec.Template.Finalizers) > 0 {
+		testutils.RemovePodFinalizersInNamespace(context.TODO(), cs, t, ds.Namespace)
+	}
+
 	// Wait for the daemon set controller to kill all the daemon pods.
 	if err := wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
 		updatedDS, err := cs.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
@@ -275,9 +281,7 @@ func validateDaemonSetPodsAndMarkReady(
 ) {
 	if err := wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
 		objects := podInformer.GetIndexer().List()
-		if len(objects) != numberPods {
-			return false, nil
-		}
+		nonTerminatedPods := 0
 
 		for _, object := range objects {
 			pod := object.(*v1.Pod)
@@ -294,6 +298,10 @@ func validateDaemonSetPodsAndMarkReady(
 				t.Errorf("controllerRef.Controller is not set to true")
 			}
 
+			if podutil.IsPodPhaseTerminal(pod.Status.Phase) {
+				continue
+			}
+			nonTerminatedPods++
 			if !podutil.IsPodReady(pod) && len(pod.Spec.NodeName) != 0 {
 				podCopy := pod.DeepCopy()
 				podCopy.Status = v1.PodStatus{
@@ -307,7 +315,7 @@ func validateDaemonSetPodsAndMarkReady(
 			}
 		}
 
-		return true, nil
+		return nonTerminatedPods == numberPods, nil
 	}); err != nil {
 		t.Fatal(err)
 	}
@@ -536,8 +544,23 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
 }
 
 func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
-	for _, podPhase := range []v1.PodPhase{v1.PodSucceeded, v1.PodFailed} {
-		t.Run(string(podPhase), func(t *testing.T) {
+	cases := map[string]struct {
+		phase     v1.PodPhase
+		finalizer bool
+	}{
+		"Succeeded": {
+			phase: v1.PodSucceeded,
+		},
+		"Failed": {
+			phase: v1.PodFailed,
+		},
+		"Succeeded with finalizer": {
+			phase:     v1.PodSucceeded,
+			finalizer: true,
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
 			forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
 				ctx, closeFn, dc, informers, clientset := setup(t)
 				defer closeFn()
@@ -553,6 +576,9 @@ func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
 				go dc.Run(ctx, 2)
 
 				ds := newDaemonSet("restart-terminal-pod", ns.Name)
+				if tc.finalizer {
+					ds.Spec.Template.Finalizers = append(ds.Spec.Template.Finalizers, "test.k8s.io/finalizer")
+				}
 				ds.Spec.UpdateStrategy = *strategy
 				if _, err := dsClient.Create(ctx, ds, metav1.CreateOptions{}); err != nil {
 					t.Fatalf("Failed to create DaemonSet: %v", err)
@@ -566,9 +592,9 @@ func TestSimpleDaemonSetRestartsPodsOnTerminalPhase(t *testing.T) {
 				validateDaemonSetStatus(dsClient, ds.Name, int32(numNodes), t)
 				podToMarkAsTerminal := podInformer.GetIndexer().List()[0].(*v1.Pod)
 				podCopy := podToMarkAsTerminal.DeepCopy()
-				podCopy.Status.Phase = podPhase
+				podCopy.Status.Phase = tc.phase
 				if _, err := podClient.UpdateStatus(ctx, podCopy, metav1.UpdateOptions{}); err != nil {
-					t.Fatalf("Failed to mark the pod as terminal with phase: %v. Error: %v", podPhase, err)
+					t.Fatalf("Failed to mark the pod as terminal with phase: %v. Error: %v", tc.phase, err)
 				}
 				// verify all pods are active. They either continue Running or are Pending after restart
 				validateDaemonSetPodsActive(podClient, podInformer, numNodes, t)
diff --git a/test/integration/podgc/podgc_test.go b/test/integration/podgc/podgc_test.go
index 76ca9b9ed73..4957e9fc443 100644
--- a/test/integration/podgc/podgc_test.go
+++ b/test/integration/podgc/podgc_test.go
@@ -116,7 +116,7 @@ func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod))
 	}
-	defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod})
+	defer testutils.RemovePodFinalizers(testCtx.Ctx, testCtx.ClientSet, t, *pod)
 
 	pod.Status.Phase = test.phase
 	if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).UpdateStatus(testCtx.Ctx, pod, metav1.UpdateOptions{}); err != nil {
@@ -224,7 +224,7 @@ func TestTerminatingOnOutOfServiceNode(t *testing.T) {
 		t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod))
 	}
 	if test.withFinalizer {
-		defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod})
+		defer testutils.RemovePodFinalizers(testCtx.Ctx, testCtx.ClientSet, t, *pod)
 	}
 
 	// trigger termination of the pod, but with long grace period so that it is not removed immediately
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index 359459e6846..69f24213fa2 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -18,6 +18,7 @@ package util
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"testing"
@@ -27,6 +28,7 @@ import (
 	policy "k8s.io/api/policy/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/admission"
@@ -229,17 +231,33 @@ func CleanupTest(t *testing.T, testCtx *TestContext) {
 	testCtx.CloseFn()
 }
 
+func RemovePodFinalizersInNamespace(ctx context.Context, cs clientset.Interface, t *testing.T, ns string) {
+	t.Helper()
+	pods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed obtaining list of pods: %v", err)
+	}
+	RemovePodFinalizers(ctx, cs, t, pods.Items...)
+}
+
 // RemovePodFinalizers removes pod finalizers for the pods
-func RemovePodFinalizers(cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
+func RemovePodFinalizers(ctx context.Context, cs clientset.Interface, t *testing.T, pods ...v1.Pod) {
+	t.Helper()
 	for _, p := range pods {
-		pod, err := cs.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
+		pod, err := cs.CoreV1().Pods(p.Namespace).Get(ctx, p.Name, metav1.GetOptions{})
 		if err != nil && !apierrors.IsNotFound(err) {
-			t.Errorf("error while removing pod finalizers for %v: %v", klog.KObj(p), err)
-		} else if pod != nil {
-			pod.ObjectMeta.Finalizers = nil
-			_, err = cs.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
+			t.Errorf("error while removing pod finalizers for %v: %v", klog.KObj(&p), err)
+		} else if pod != nil && len(pod.Finalizers) > 0 {
+			// Use Patch to remove finalizer, instead of Update, to avoid transient
+			// conflicts.
+			patchBytes, _ := json.Marshal(map[string]interface{}{
+				"metadata": map[string]interface{}{
+					"$deleteFromPrimitiveList/finalizers": pod.Finalizers,
+				},
+			})
+			_, err = cs.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
 			if err != nil {
-				t.Errorf("error while updating pod status for %v: %v", klog.KObj(p), err)
+				t.Errorf("error while removing pod finalizers for %v: %v", klog.KObj(&p), err)
 			}
 		}
 	}
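
// Illustrative sketch (not part of the patch above): a minimal, self-contained
// example of the filtering rule controlled by the new includeDeletedTerminal
// parameter of getNodesToDaemonPods. The helpers isTerminal and skipPod are
// hypothetical names standing in for podutil.IsPodTerminal and the inline check
// added to the controller; the real controller does more work around this.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isTerminal mirrors podutil.IsPodTerminal: a pod is terminal once its phase
// is Succeeded or Failed.
func isTerminal(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
}

// skipPod reports whether the pod is dropped from the node -> daemon pods
// mapping, matching the condition added to getNodesToDaemonPods.
func skipPod(pod *v1.Pod, includeDeletedTerminal bool) bool {
	return !includeDeletedTerminal && isTerminal(pod) && pod.DeletionTimestamp != nil
}

func main() {
	now := metav1.Now()
	deletedFailed := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "daemon-pod-example", DeletionTimestamp: &now},
		Status:     v1.PodStatus{Phase: v1.PodFailed},
	}
	// manage, updateDaemonSetStatus and rollingUpdate pass false: a terminal pod
	// that is being deleted (for example, held back by a finalizer) is skipped,
	// so it no longer blocks creation of a replacement pod on the node.
	fmt.Println(skipPod(deletedFailed, false)) // true
	// cleanupHistory passes true: such pods still count as users of their
	// ControllerRevision, so history is not pruned out from under them.
	fmt.Println(skipPod(deletedFailed, true)) // false
}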