Merge pull request #128404 from mowangdk/test/add_integrate_test_of_node_OOS

Add non graceful shutdown integration test

Commit 5cfad4f858
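The taint at the center of this change is `node.kubernetes.io/out-of-service` with effect `NoExecute` (exposed as `v1.TaintNodeOutOfService` and used by the new test below). The following is a minimal illustrative sketch, not part of the patch, of how such a taint might be applied to a node with client-go; the kubeconfig-based clientset and the node name are assumptions made for the example only.

	package main

	import (
		"context"
		"fmt"

		v1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/client-go/kubernetes"
		"k8s.io/client-go/tools/clientcmd"
	)

	func main() {
		// Assumption: a kubeconfig at the default location; any rest.Config source works.
		cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
		if err != nil {
			panic(err)
		}
		client := kubernetes.NewForConfigOrDie(cfg)

		nodeName := "node-sandbox" // hypothetical node name, mirrors the test fixture below
		node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		if err != nil {
			panic(err)
		}

		// node.kubernetes.io/out-of-service with NoExecute marks a non-gracefully
		// shut-down node so its pods are force deleted and volumes force detached.
		node.Spec.Taints = append(node.Spec.Taints, v1.Taint{
			Key:    v1.TaintNodeOutOfService,
			Effect: v1.TaintEffectNoExecute,
		})
		if _, err := client.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
			panic(err)
		}
		fmt.Println("applied out-of-service taint to", nodeName)
	}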
@@ -30,9 +30,14 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	basemetric "k8s.io/component-base/metrics"
+	metricstestutil "k8s.io/component-base/metrics/testutil"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
+	"k8s.io/kubernetes/pkg/controller/podgc"
+	podgcmetrics "k8s.io/kubernetes/pkg/controller/podgc/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
 	volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
+	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
 	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
 	"k8s.io/kubernetes/pkg/volume"
@@ -137,6 +142,98 @@ var defaultTimerConfig = attachdetach.TimerConfig{
 	DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second,
 }
 
+// TestPodTerminationWithNodeOOSDetach verifies that when the `out-of-service` taint is applied to a node
+// that was shut down non-gracefully, all pods on it are terminated immediately and their volumes are
+// detached immediately, without waiting for the default timeout period.
+func TestPodTerminationWithNodeOOSDetach(t *testing.T) {
+	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	tCtx := ktesting.Init(t)
+	defer tCtx.Cancel("test has completed")
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
+		ReconcilerLoopPeriod:                        100 * time.Millisecond,
+		ReconcilerMaxWaitForUnmountDuration:         6 * time.Second,
+		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
+		// Use high duration to disable DesiredStateOfWorldPopulator.findAndAddActivePods loop in test.
+		DesiredStateOfWorldPopulatorListPodsRetryDuration: 24 * time.Hour,
+	})
+
+	namespaceName := "test-node-oos"
+	nodeName := "node-sandbox"
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: nodeName,
+			Annotations: map[string]string{
+				util.ControllerManagedAttachAnnotation: "true",
+			},
+		},
+	}
+	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
+	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
+
+	_, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create node: %v", err)
+	}
+
+	pod := fakePodWithVol(namespaceName)
+	if _, err := testClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
+		t.Errorf("Failed to create pod: %v", err)
+	}
+
+	// Start the controller loops.
+	podInformer := informers.Core().V1().Pods().Informer()
+	informers.Start(tCtx.Done())
+	informers.WaitForCacheSync(tCtx.Done())
+	go ctrl.Run(tCtx)
+	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
+
+	waitToObservePods(t, podInformer, 1)
+	// Wait for the volume to be attached.
+	waitForVolumeToBeAttached(tCtx, t, testClient, pod.Name, nodeName)
+
+	// Patch the node to mark the volume in use, as the attach-detach controller verifies whether it is
+	// safe to detach a volume based on that.
+	node.Status.VolumesInUse = append(node.Status.VolumesInUse, "kubernetes.io/mock-provisioner/fake-mount")
+	node, err = testClient.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("error in patching volumesInUse status of node: %s", err)
+	}
+	// Delete the pod with a long grace period so that it is stuck in the terminating state.
+	gracePeriod := int64(300)
+	err = testClient.CoreV1().Pods(namespaceName).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{
+		GracePeriodSeconds: &gracePeriod,
+	})
+	if err != nil {
+		t.Fatalf("error in deleting pod: %v", err)
+	}
+
+	// Verify that DeletionTimestamp is set, which means the pod is in the `Terminating` state.
+	waitForPodDeletionTimestampToSet(tCtx, t, testClient, pod.Name, pod.Namespace)
+
+	// Taint the node `out-of-service`.
+	taint := v1.Taint{
+		Key:    v1.TaintNodeOutOfService,
+		Effect: v1.TaintEffectNoExecute,
+	}
+	node.Spec.Taints = append(node.Spec.Taints, taint)
+	if _, err := testClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("error in patching oos taint to node: %v", err)
+	}
+	waitForNodeToBeTainted(tCtx, t, testClient, v1.TaintNodeOutOfService, nodeName)
+
+	// Verify that the pod was force deleted. Force deletion happens only when the node has the
+	// out-of-service taint, the node is NotReady, and the pod is Terminating.
+	waitForMetric(tCtx, t, podgcmetrics.DeletingPodsTotal.WithLabelValues(namespaceName, podgcmetrics.PodGCReasonTerminatingOutOfService), 1, "terminating-pod-metric")
+	// Verify that the volume was force detached. Note: metrics are cumulative.
+	waitForMetric(tCtx, t, metrics.ForceDetachMetricCounter.WithLabelValues(metrics.ForceDetachReasonOutOfService), 1, "detach-metric")
+}
+
 // Via integration test we can verify that if pod delete
 // event is somehow missed by AttachDetach controller - it still
 // gets cleaned up by Desired State of World populator.
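A note on the two waitForMetric assertions above: the counters they read are process-global and accumulate across tests, so the check is a floor (>=), not an equality. The helper itself is added further down in this patch; as a minimal sketch of what the check amounts to (assuming the same basemetric and metricstestutil imports the patch adds), a non-polling variant could look like this:

	// Sketch only, not part of the patch: read a counter once and assert a minimum value.
	func assertCounterAtLeast(t *testing.T, c basemetric.CounterMetric, min float64) {
		got, err := metricstestutil.GetCounterMetricValue(c)
		if err != nil {
			t.Fatal(err)
		}
		if got < min {
			t.Fatalf("expected counter >= %g, got %g", min, got)
		}
	}

A hypothetical call site, mirroring the pod GC assertion in the test:

	assertCounterAtLeast(t, podgcmetrics.DeletingPodsTotal.WithLabelValues(namespaceName, podgcmetrics.PodGCReasonTerminatingOutOfService), 1)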
@@ -157,7 +254,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
 
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -184,6 +281,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -214,7 +312,7 @@ func initCSIObjects(stopCh <-chan struct{}, informers clientgoinformers.SharedIn
 	go informers.Storage().V1().CSIDrivers().Informer().Run(stopCh)
 }
 
-func TestPodUpdateWithWithADC(t *testing.T) {
+func TestPodUpdateWithADC(t *testing.T) {
 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
 	defer server.TearDownFn()
@@ -231,7 +329,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
 
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -261,6 +359,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -326,7 +425,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch
 	}
 }
 
-func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
+func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, *podgc.PodGCController, clientgoinformers.SharedInformerFactory) {
 	config := restclient.CopyConfig(server.ClientConfig)
 	config.QPS = 1000000
 	config.Burst = 1000000
@@ -383,11 +482,20 @@ func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertes
 		NodeInformer:              informers.Core().V1().Nodes(),
 		EnableDynamicProvisioning: false,
 	}
+	podgcCtrl := podgc.NewPodGCInternal(
+		ctx,
+		testClient,
+		informers.Core().V1().Pods(),
+		informers.Core().V1().Nodes(),
+		0,
+		500*time.Millisecond,
+		time.Second,
+	)
 	pvCtrl, err := persistentvolume.NewController(ctx, params)
 	if err != nil {
 		t.Fatalf("Failed to create PV controller: %v", err)
 	}
-	return testClient, ctrl, pvCtrl, informers
+	return testClient, ctrl, pvCtrl, podgcCtrl, informers
 }
 
 // Via integration test we can verify that if pod add
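The positional arguments in the podgc.NewPodGCInternal call above are easy to misread, since the diff does not name them. The annotated sketch below restates the same call with the assumed parameter meanings (terminated-pod threshold, GC check period, quarantine time); the comments are assumptions for orientation, not taken from the patch:

	podgcCtrl := podgc.NewPodGCInternal(
		ctx,
		testClient,
		informers.Core().V1().Pods(),
		informers.Core().V1().Nodes(),
		0,                    // terminated-pod threshold: 0 disables count-based GC of terminated pods (assumed)
		500*time.Millisecond, // GC check period: how often the GC loop runs (assumed)
		time.Second,          // quarantine time: grace window before orphaned pods are cleaned up (assumed)
	)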
@@ -410,7 +518,7 @@ func TestPodAddedByDswp(t *testing.T) {
 
 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
 
 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -439,6 +547,7 @@ func TestPodAddedByDswp(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -480,7 +589,7 @@ func TestPVCBoundWithADC(t *testing.T) {
 
 	namespaceName := "test-pod-deletion"
 
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
 		ReconcilerLoopPeriod:                100 * time.Millisecond,
 		ReconcilerMaxWaitForUnmountDuration: 6 * time.Second,
 		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
@@ -528,6 +637,7 @@ func TestPVCBoundWithADC(t *testing.T) {
 	initCSIObjects(tCtx.Done(), informers)
 	go ctrl.Run(tCtx)
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
 
 	waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4)
 	// Give attachdetach controller enough time to populate pods into DSWP.
@@ -561,3 +671,70 @@ func createPVForPVC(t *testing.T, testClient *clientset.Clientset, pvc *v1.Persi
 		t.Errorf("Failed to create pv : %v", err)
 	}
 }
+
+// waitForPodDeletionTimestampToSet waits until the pod's DeletionTimestamp is set.
+func waitForPodDeletionTimestampToSet(tCtx context.Context, t *testing.T, testingClient *clientset.Clientset, podName, podNamespace string) {
+	if err := wait.PollUntilContextCancel(tCtx, 100*time.Millisecond, false, func(context.Context) (bool, error) {
+		pod, err := testingClient.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if pod.DeletionTimestamp != nil {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to get deletionTimestamp of pod: %s, namespace: %s", podName, podNamespace)
+	}
+}
+
+// waitForVolumeToBeAttached waits until at least one volume is reported as attached on the node.
+func waitForVolumeToBeAttached(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, podName, nodeName string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 120*time.Second, false, func(context.Context) (bool, error) {
+		node, err := testingClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("Failed to get the node : %v", err)
+		}
+		if len(node.Status.VolumesAttached) >= 1 {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to attach volume to pod: %s for node: %s", podName, nodeName)
+	}
+}
+
+// waitForNodeToBeTainted waits until the given taint key is present on the node.
+func waitForNodeToBeTainted(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, taintKey, nodeName string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(context.Context) (bool, error) {
+		node, err := testingClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		for _, taint := range node.Spec.Taints {
+			if taint.Key == taintKey {
+				return true, nil
+			}
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to add taint: %s to node: %s", taintKey, nodeName)
+	}
+}
+
+// waitForMetric waits until the counter reaches at least expectedCount.
+func waitForMetric(ctx context.Context, t *testing.T, m basemetric.CounterMetric, expectedCount float64, identifier string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(ctx context.Context) (done bool, err error) {
+		gotCount, err := metricstestutil.GetCounterMetricValue(m)
+		if err != nil {
+			t.Fatal(err, identifier)
+		}
+		t.Logf("expected metric count %g but got %g for %s", expectedCount, gotCount, identifier)
+		// As metrics are global, this condition ( >= ) is applied, just to check the minimum expectation.
+		if gotCount >= expectedCount {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to reach the expected metric count %v for %s", expectedCount, identifier)
+	}
+}