From 6910e766991a4470fdf530ebbd5b2d9ba5f485e7 Mon Sep 17 00:00:00 2001
From: geyingqi
Date: Sat, 26 Oct 2024 12:33:57 +0800
Subject: [PATCH 1/4] test: Add podgcCtrl to createAdClients

---
 test/integration/volume/attach_detach_test.go | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go
index 73474676bb2..4f5c3340558 100644
--- a/test/integration/volume/attach_detach_test.go
+++ b/test/integration/volume/attach_detach_test.go
@@ -31,6 +31,7 @@ import (
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
+	"k8s.io/kubernetes/pkg/controller/podgc"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
 	volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
@@ -326,7 +327,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch
 	}
 }
 
-func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
+func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, *podgc.PodGCController, clientgoinformers.SharedInformerFactory) {
 	config := restclient.CopyConfig(server.ClientConfig)
 	config.QPS = 1000000
 	config.Burst = 1000000
@@ -383,11 +384,20 @@ func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertes
 		NodeInformer:              informers.Core().V1().Nodes(),
 		EnableDynamicProvisioning: false,
 	}
+	podgcCtrl := podgc.NewPodGCInternal(
+		ctx,
+		testClient,
+		informers.Core().V1().Pods(),
+		informers.Core().V1().Nodes(),
+		0,
+		500*time.Millisecond,
+		time.Second,
+	)
 	pvCtrl, err := persistentvolume.NewController(ctx, params)
 	if err != nil {
 		t.Fatalf("Failed to create PV controller: %v", err)
 	}
-	return testClient, ctrl, pvCtrl, informers
+	return testClient, ctrl, pvCtrl, podgcCtrl, informers
 }
 
 // Via integration test we can verify that if pod add

From 734c36851fb94d27cb021b392702cad512a039f4 Mon Sep 17 00:00:00 2001
From: geyingqi
Date: Sat, 26 Oct 2024 12:34:12 +0800
Subject: [PATCH 2/4] test: Run podgcCtrl in the attach/detach tests

---
 test/integration/volume/attach_detach_test.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go
index 4f5c3340558..ce89ea36bd7 100644
--- a/test/integration/volume/attach_detach_test.go
+++ b/test/integration/volume/attach_detach_test.go
@@ -158,7 +158,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
 
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -185,6 +185,7 @@
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -232,7 +233,7 @@
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
 
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -262,6 +263,7 @@
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -420,7 +422,7 @@
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
 
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -449,6 +451,7 @@
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -490,7 +493,7 @@
 
 	namespaceName := "test-pod-deletion"
 
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
 		ReconcilerLoopPeriod:                100 * time.Millisecond,
 		ReconcilerMaxWaitForUnmountDuration: 6 * time.Second,
 		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
@@ -538,6 +541,7 @@
 	initCSIObjects(tCtx.Done(), informers)
 	go ctrl.Run(tCtx)
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4)
 	// Give attachdetach controller enough time to populate pods into DSWP.

From 94f6422562e3c6ec343b4a2d3c220c323e1a62e4 Mon Sep 17 00:00:00 2001
From: geyingqi
Date: Sat, 26 Oct 2024 12:56:37 +0800
Subject: [PATCH 3/4] test: Fix typo in test name

---
 test/integration/volume/attach_detach_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go
index ce89ea36bd7..880b5fe24a1 100644
--- a/test/integration/volume/attach_detach_test.go
+++ b/test/integration/volume/attach_detach_test.go
@@ -216,7 +216,7 @@ func initCSIObjects(stopCh <-chan struct{}, informers clientgoinformers.SharedIn
 	go informers.Storage().V1().CSIDrivers().Informer().Run(stopCh)
 }
 
-func TestPodUpdateWithWithADC(t *testing.T) {
+func TestPodUpdateWithADC(t *testing.T) {
 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
 	defer server.TearDownFn()

From 22c41caca9f2d91d7cc1e20fab87f70caa0b3a06 Mon Sep 17 00:00:00 2001
From: geyingqi
Date: Sat, 26 Oct 2024 17:09:13 +0800
Subject: [PATCH 4/4] test: Add integration test for out-of-service node detach

---
 test/integration/volume/attach_detach_test.go | 163 ++++++++++++++++++
 1 file changed, 163 insertions(+)

diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go
index 880b5fe24a1..a851f28d453 100644
--- a/test/integration/volume/attach_detach_test.go
+++ b/test/integration/volume/attach_detach_test.go
@@ -30,10 +30,14 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	basemetric "k8s.io/component-base/metrics"
+	metricstestutil "k8s.io/component-base/metrics/testutil"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	"k8s.io/kubernetes/pkg/controller/podgc"
+	podgcmetrics "k8s.io/kubernetes/pkg/controller/podgc/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
 	volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
+	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
 	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
 	"k8s.io/kubernetes/pkg/volume"
@@ -138,6 +142,98 @@ var defaultTimerConfig = attachdetach.TimerConfig{
 	DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second,
 }
 
+// TestPodTerminationWithNodeOOSDetach verifies that when the `out-of-service` taint is applied to a node
+// that was shut down non-gracefully, all pods on that node are terminated immediately and their volumes
+// are detached, without waiting for the default timeout period.
+func TestPodTerminationWithNodeOOSDetach(t *testing.T) {
+	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	tCtx := ktesting.Init(t)
+	defer tCtx.Cancel("test has completed")
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
+		ReconcilerLoopPeriod:                        100 * time.Millisecond,
+		ReconcilerMaxWaitForUnmountDuration:         6 * time.Second,
+		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
+		// Use high duration to disable DesiredStateOfWorldPopulator.findAndAddActivePods loop in test.
+		DesiredStateOfWorldPopulatorListPodsRetryDuration: 24 * time.Hour,
+	})
+
+	namespaceName := "test-node-oos"
+	nodeName := "node-sandbox"
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: nodeName,
+			Annotations: map[string]string{
+				util.ControllerManagedAttachAnnotation: "true",
+			},
+		},
+	}
+	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
+	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
+
+	_, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create node: %v", err)
+	}
+
+	pod := fakePodWithVol(namespaceName)
+	if _, err := testClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
+		t.Errorf("Failed to create pod: %v", err)
+	}
+
+	// Start the controller loops.
+	podInformer := informers.Core().V1().Pods().Informer()
+	informers.Start(tCtx.Done())
+	informers.WaitForCacheSync(tCtx.Done())
+	go ctrl.Run(tCtx)
+	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
+
+	waitToObservePods(t, podInformer, 1)
+	// Wait for the volume to be attached.
+	waitForVolumeToBeAttached(tCtx, t, testClient, pod.Name, nodeName)
+
+	// Mark the volume as in use in the node status, since the attach-detach controller
+	// checks this to decide whether it is safe to detach.
+	node.Status.VolumesInUse = append(node.Status.VolumesInUse, "kubernetes.io/mock-provisioner/fake-mount")
+	node, err = testClient.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("error in updating volumesInUse status of node: %v", err)
+	}
+	// Delete the pod with a long grace period so that it is stuck in the Terminating state.
+	gracePeriod := int64(300)
+	err = testClient.CoreV1().Pods(namespaceName).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{
+		GracePeriodSeconds: &gracePeriod,
+	})
+	if err != nil {
+		t.Fatalf("error in deleting pod: %v", err)
+	}
+
+	// Verify that DeletionTimestamp is not nil, which means the pod is in the `Terminating` state.
+	waitForPodDeletionTimestampToSet(tCtx, t, testClient, pod.Name, pod.Namespace)
+
+	// Taint the node with `out-of-service`.
+	taint := v1.Taint{
+		Key:    v1.TaintNodeOutOfService,
+		Effect: v1.TaintEffectNoExecute,
+	}
+	node.Spec.Taints = append(node.Spec.Taints, taint)
+	if _, err := testClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("error in applying out-of-service taint to node: %v", err)
+	}
+	waitForNodeToBeTainted(tCtx, t, testClient, v1.TaintNodeOutOfService, nodeName)
+
+	// Verify that the pod was force deleted: this happens only when the node has the
+	// out-of-service taint, the node is NotReady, and the pod is Terminating.
+	waitForMetric(tCtx, t, podgcmetrics.DeletingPodsTotal.WithLabelValues(namespaceName, podgcmetrics.PodGCReasonTerminatingOutOfService), 1, "terminating-pod-metric")
+	// Verify that the volume was force detached.
+	// Note: metrics are cumulative, so only a minimum count is checked.
+	waitForMetric(tCtx, t, metrics.ForceDetachMetricCounter.WithLabelValues(metrics.ForceDetachReasonOutOfService), 1, "detach-metric")
+
+}
+
 // Via integration test we can verify that if pod delete
 // event is somehow missed by AttachDetach controller - it still
 // gets cleaned up by Desired State of World populator.
@@ -575,3 +671,70 @@ func createPVForPVC(t *testing.T, testClient *clientset.Clientset, pvc *v1.Persi
 		t.Errorf("Failed to create pv : %v", err)
 	}
 }
+
+// Wait for the DeletionTimestamp to be set on the pod.
+func waitForPodDeletionTimestampToSet(tCtx context.Context, t *testing.T, testingClient *clientset.Clientset, podName, podNamespace string) {
+	if err := wait.PollUntilContextCancel(tCtx, 100*time.Millisecond, false, func(ctx context.Context) (bool, error) {
+		pod, err := testingClient.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if pod.DeletionTimestamp != nil {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to observe DeletionTimestamp on pod: %s, namespace: %s: %v", podName, podNamespace, err)
+	}
+}
+
+// Wait for a volume to be attached to the node.
+func waitForVolumeToBeAttached(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, podName, nodeName string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 120*time.Second, false, func(ctx context.Context) (bool, error) {
+		node, err := testingClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("Failed to get node: %v", err)
+		}
+		if len(node.Status.VolumesAttached) >= 1 {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Volume for pod: %s was not attached to node: %s: %v", podName, nodeName, err)
+	}
+}
+
+// Wait for the taint to be added to the node.
+func waitForNodeToBeTainted(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, taintKey, nodeName string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(ctx context.Context) (bool, error) {
+		node, err := testingClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		for _, taint := range node.Spec.Taints {
+			if taint.Key == taintKey {
+				return true, nil
+			}
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to observe taint: %s on node: %s: %v", taintKey, nodeName, err)
+	}
+}
+
+func waitForMetric(ctx context.Context, t *testing.T, m basemetric.CounterMetric, expectedCount float64, identifier string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(ctx context.Context) (done bool, err error) {
+		gotCount, err := metricstestutil.GetCounterMetricValue(m)
+		if err != nil {
+			t.Fatalf("Failed to get value of metric %s: %v", identifier, err)
+		}
+		t.Logf("expected metric count %g but got %g for %s", expectedCount, gotCount, identifier)
+		// Metrics are global and cumulative, so only check that the count has reached the expected minimum ( >= ).
+		if gotCount >= expectedCount {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to reach expected count %g for metric %s: %v", expectedCount, identifier, err)
+	}
+}
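
The new test asserts the force detach only through global, cumulative counters (DeletingPodsTotal and ForceDetachMetricCounter). As a complement, the detach could also be observed directly on the API object. The following is a hypothetical sketch in the style of the helpers above; waitForVolumeToBeDetached is not part of this patch series:

// waitForVolumeToBeDetached waits until the node reports no attached volumes,
// i.e. the attach-detach controller has completed the force detach triggered
// by the out-of-service taint. (Hypothetical helper, not part of the patch.)
func waitForVolumeToBeDetached(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, nodeName string) {
	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(ctx context.Context) (bool, error) {
		node, err := testingClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			// Treat transient API errors as fatal for the poll; the caller reports the failure.
			return false, err
		}
		return len(node.Status.VolumesAttached) == 0, nil
	}); err != nil {
		t.Fatalf("Volume was not detached from node: %s: %v", nodeName, err)
	}
}

Called after the two waitForMetric checks, e.g. waitForVolumeToBeDetached(tCtx, t, testClient, nodeName), this would pin the assertion to node.Status.VolumesAttached rather than to a process-wide counter that other tests may also increment.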