diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go
index 73474676bb2..a851f28d453 100644
--- a/test/integration/volume/attach_detach_test.go
+++ b/test/integration/volume/attach_detach_test.go
@@ -30,9 +30,14 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	basemetric "k8s.io/component-base/metrics"
+	metricstestutil "k8s.io/component-base/metrics/testutil"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
+	"k8s.io/kubernetes/pkg/controller/podgc"
+	podgcmetrics "k8s.io/kubernetes/pkg/controller/podgc/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
 	volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
+	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
 	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
 	"k8s.io/kubernetes/pkg/volume"
@@ -137,6 +142,98 @@ var defaultTimerConfig = attachdetach.TimerConfig{
 	DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second,
 }
 
+// TestPodTerminationWithNodeOOSDetach verifies that when the `out-of-service` taint is applied to a node
+// that was shut down non-gracefully, all pods on that node are terminated immediately and their volumes
+// are detached immediately, without waiting for the default timeout period.
+func TestPodTerminationWithNodeOOSDetach(t *testing.T) {
+	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	tCtx := ktesting.Init(t)
+	defer tCtx.Cancel("test has completed")
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
+		ReconcilerLoopPeriod:                        100 * time.Millisecond,
+		ReconcilerMaxWaitForUnmountDuration:         6 * time.Second,
+		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
+		// Use a high duration to disable the DesiredStateOfWorldPopulator.findAndAddActivePods loop in this test.
+		DesiredStateOfWorldPopulatorListPodsRetryDuration: 24 * time.Hour,
+	})
+
+	namespaceName := "test-node-oos"
+	nodeName := "node-sandbox"
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: nodeName,
+			Annotations: map[string]string{
+				util.ControllerManagedAttachAnnotation: "true",
+			},
+		},
+	}
+	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
+	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
+
+	_, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create node: %v", err)
+	}
+
+	pod := fakePodWithVol(namespaceName)
+	if _, err := testClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
+		t.Errorf("Failed to create pod: %v", err)
+	}
+
+	// Start the controller loops.
+	podInformer := informers.Core().V1().Pods().Informer()
+	informers.Start(tCtx.Done())
+	informers.WaitForCacheSync(tCtx.Done())
+	go ctrl.Run(tCtx)
+	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
+
+	waitToObservePods(t, podInformer, 1)
+	// Wait for the volume to be attached.
+	waitForVolumeToBeAttached(tCtx, t, testClient, pod.Name, nodeName)
+
+	// Patch the node status to mark the volume as in use, because the attach-detach controller
+	// checks this before deciding whether it is safe to detach the volume.
+	node.Status.VolumesInUse = append(node.Status.VolumesInUse, "kubernetes.io/mock-provisioner/fake-mount")
+	node, err = testClient.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("error updating volumesInUse status on node: %v", err)
+	}
+	// Delete the pod with a long grace period so that it stays stuck in the Terminating state.
+	gracePeriod := int64(300)
+	err = testClient.CoreV1().Pods(namespaceName).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{
+		GracePeriodSeconds: &gracePeriod,
+	})
+	if err != nil {
+		t.Fatalf("error deleting pod: %v", err)
+	}
+
+	// Verify that DeletionTimestamp is not nil, which means the pod is in the `Terminating` state.
+	waitForPodDeletionTimestampToSet(tCtx, t, testClient, pod.Name, pod.Namespace)
+
+	// Apply the `out-of-service` taint to the node.
+	taint := v1.Taint{
+		Key:    v1.TaintNodeOutOfService,
+		Effect: v1.TaintEffectNoExecute,
+	}
+	node.Spec.Taints = append(node.Spec.Taints, taint)
+	if _, err := testClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("error applying out-of-service taint to node: %v", err)
+	}
+	waitForNodeToBeTainted(tCtx, t, testClient, v1.TaintNodeOutOfService, nodeName)
+
+	// Verify that the pod was force-deleted. Force deletion happens only when the node has the
+	// out-of-service taint, the node is NotReady, and the pod is Terminating.
+	waitForMetric(tCtx, t, podgcmetrics.DeletingPodsTotal.WithLabelValues(namespaceName, podgcmetrics.PodGCReasonTerminatingOutOfService), 1, "terminating-pod-metric")
+	// Verify that the volume was force-detached.
+	// Note: metrics are cumulative across tests in the same process.
+	waitForMetric(tCtx, t, metrics.ForceDetachMetricCounter.WithLabelValues(metrics.ForceDetachReasonOutOfService), 1, "detach-metric")
+}
+
 // Via integration test we can verify that if pod delete
 // event is somehow missed by AttachDetach controller - it still
 // gets cleaned up by Desired State of World populator.
@@ -157,7 +254,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -184,6 +281,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -214,7 +312,7 @@ func initCSIObjects(stopCh <-chan struct{}, informers clientgoinformers.SharedIn
 	go informers.Storage().V1().CSIDrivers().Informer().Run(stopCh)
 }
 
-func TestPodUpdateWithWithADC(t *testing.T) {
+func TestPodUpdateWithADC(t *testing.T) {
 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
 	defer server.TearDownFn()
@@ -231,7 +329,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -261,6 +359,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -326,7 +425,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch
 	}
 }
 
-func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
+func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, *podgc.PodGCController, clientgoinformers.SharedInformerFactory) {
 	config := restclient.CopyConfig(server.ClientConfig)
 	config.QPS = 1000000
 	config.Burst = 1000000
@@ -383,11 +482,20 @@ func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertes
 		NodeInformer:              informers.Core().V1().Nodes(),
 		EnableDynamicProvisioning: false,
 	}
+	podgcCtrl := podgc.NewPodGCInternal(
+		ctx,
+		testClient,
+		informers.Core().V1().Pods(),
+		informers.Core().V1().Nodes(),
+		0,
+		500*time.Millisecond,
+		time.Second,
+	)
 	pvCtrl, err := persistentvolume.NewController(ctx, params)
 	if err != nil {
 		t.Fatalf("Failed to create PV controller: %v", err)
 	}
-	return testClient, ctrl, pvCtrl, informers
+	return testClient, ctrl, pvCtrl, podgcCtrl, informers
 }
 
 // Via integration test we can verify that if pod add
@@ -410,7 +518,7 @@ func TestPodAddedByDswp(t *testing.T) {
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -439,6 +547,7 @@ func TestPodAddedByDswp(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -480,7 +589,7 @@ func TestPVCBoundWithADC(t *testing.T) {
 	namespaceName := "test-pod-deletion"
 
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
 		ReconcilerLoopPeriod:                        100 * time.Millisecond,
 		ReconcilerMaxWaitForUnmountDuration:         6 * time.Second,
 		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
@@ -528,6 +637,7 @@ func TestPVCBoundWithADC(t *testing.T) {
 	initCSIObjects(tCtx.Done(), informers)
 	go ctrl.Run(tCtx)
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 	waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4)
 
 	// Give attachdetach controller enough time to populate pods into DSWP.
@@ -561,3 +671,70 @@ func createPVForPVC(t *testing.T, testClient *clientset.Clientset, pvc *v1.Persi
 		t.Errorf("Failed to create pv : %v", err)
 	}
 }
+
+// Wait for the DeletionTimestamp to be set on the pod.
+func waitForPodDeletionTimestampToSet(tCtx context.Context, t *testing.T, testingClient *clientset.Clientset, podName, podNamespace string) {
+	if err := wait.PollUntilContextCancel(tCtx, 100*time.Millisecond, false, func(context.Context) (bool, error) {
+		pod, err := testingClient.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if pod.DeletionTimestamp != nil {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to get deletionTimestamp of pod: %s, namespace: %s", podName, podNamespace)
+	}
+}
+
+// Wait for a volume to be reported as attached in the node status.
+func waitForVolumeToBeAttached(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, podName, nodeName string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 120*time.Second, false, func(context.Context) (bool, error) {
+		node, err := testingClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("Failed to get the node: %v", err)
+		}
+		if len(node.Status.VolumesAttached) >= 1 {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to attach volume for pod: %s on node: %s", podName, nodeName)
+	}
+}
+
+// Wait for the given taint key to be added to the node.
+func waitForNodeToBeTainted(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, taintKey, nodeName string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(context.Context) (bool, error) {
+		node, err := testingClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		for _, taint := range node.Spec.Taints {
+			if taint.Key == taintKey {
+				return true, nil
+			}
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to add taint: %s to node: %s", taintKey, nodeName)
+	}
+}
+
+// Wait until the counter metric reaches at least expectedCount.
+func waitForMetric(ctx context.Context, t *testing.T, m basemetric.CounterMetric, expectedCount float64, identifier string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(ctx context.Context) (done bool, err error) {
+		gotCount, err := metricstestutil.GetCounterMetricValue(m)
+		if err != nil {
+			t.Fatal(err, identifier)
+		}
+		t.Logf("expected metric count %g but got %g for %s", expectedCount, gotCount, identifier)
+		// Metrics are global and cumulative, so the check uses >= to assert only the minimum expected count.
+		if gotCount >= expectedCount {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to reach expected count %g for metric %s", expectedCount, identifier)
+	}
+}
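The metric assertions above rely on component-base counters being global and cumulative within the test process, which is why waitForMetric checks `>=` rather than `==`. The following standalone sketch illustrates that polling pattern outside the test, assuming a hypothetical demo counter registered on the legacy registry; the metric name, labels, and poll loop are illustrative only, not part of the change.

package main

import (
	"fmt"
	"time"

	basemetric "k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
	metricstestutil "k8s.io/component-base/metrics/testutil"
)

// demoCounter is a hypothetical counter; the test above polls real controller
// metrics such as metrics.ForceDetachMetricCounter instead.
var demoCounter = basemetric.NewCounterVec(
	&basemetric.CounterOpts{
		Name: "demo_force_detach_total",
		Help: "Counts simulated force-detach events.",
	},
	[]string{"reason"},
)

func main() {
	legacyregistry.MustRegister(demoCounter)
	demoCounter.WithLabelValues("out-of-service").Inc()

	// Counters are cumulative and shared process-wide, so assert a minimum (>=), not equality.
	const expected = 1.0
	deadline := time.Now().Add(10 * time.Second)
	for {
		got, err := metricstestutil.GetCounterMetricValue(demoCounter.WithLabelValues("out-of-service"))
		if err != nil {
			panic(err)
		}
		if got >= expected {
			fmt.Printf("observed %g >= %g\n", got, expected)
			return
		}
		if time.Now().After(deadline) {
			fmt.Println("timed out waiting for the counter")
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}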