diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 81e9ce4c4cd..01a723987f8 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -931,6 +931,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
 		Logger:        logger,
 		ProbeManager:  klet.probeManager,
+		VolumeManager: klet.volumeManager,
 		Recorder:      kubeDeps.Recorder,
 		NodeRef:       nodeRef,
 		GetPodsFunc:   klet.GetActivePods,
diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go
index 14c7c8bd406..64eca034457 100644
--- a/pkg/kubelet/kubelet_node_status_test.go
+++ b/pkg/kubelet/kubelet_node_status_test.go
@@ -1128,7 +1128,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
 			kubelet.setCachedMachineInfo(&cadvisorapi.MachineInfo{})
 
 			// override test volumeManager
-			fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes)
+			fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes, 0, nil)
 			kubelet.volumeManager = fakeVolumeManager
 
 			// Only test VolumesInUse setter
diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go
index aac590f967d..0b5a5a244c6 100644
--- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go
+++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go
@@ -26,6 +26,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
+	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 
 	"k8s.io/utils/clock"
 )
@@ -40,6 +41,7 @@ type Manager interface {
 type Config struct {
 	Logger        klog.Logger
 	ProbeManager  prober.Manager
+	VolumeManager volumemanager.VolumeManager
 	Recorder      record.EventRecorder
 	NodeRef       *v1.ObjectReference
 	GetPodsFunc   eviction.ActivePodsFunc
diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
index a5fc6f9583f..e56d03155c2 100644
--- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
+++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
@@ -21,6 +21,7 @@ limitations under the License.
 package nodeshutdown
 
 import (
+	"context"
 	"fmt"
 	"path/filepath"
 	"sort"
@@ -41,6 +42,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
+	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 
 	"k8s.io/utils/clock"
 )
@@ -73,6 +75,8 @@ type managerImpl struct {
 	nodeRef      *v1.ObjectReference
 	probeManager prober.Manager
 
+	volumeManager volumemanager.VolumeManager
+
 	shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
 
 	getPods eviction.ActivePodsFunc
@@ -123,6 +127,7 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
 		logger:        conf.Logger,
 		probeManager:  conf.ProbeManager,
 		recorder:      conf.Recorder,
+		volumeManager: conf.VolumeManager,
 		nodeRef:       conf.NodeRef,
 		getPods:       conf.GetPodsFunc,
 		killPodFunc:   conf.KillPodFunc,
@@ -395,19 +400,49 @@ func (m *managerImpl) processShutdownEvent() error {
 			}(pod, group)
 		}
 
+		// This duration determines how long the shutdown manager will wait for the pods
+		// in this group to terminate before proceeding to the next group.
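+		// The same duration also bounds the context used for the volume unmount
+		// wait below, so waiting for unmounts never extends the group's grace period.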
+		var groupTerminationWaitDuration = time.Duration(group.ShutdownGracePeriodSeconds) * time.Second
 		var (
-			doneCh = make(chan struct{})
-			timer  = m.clock.NewTimer(time.Duration(group.ShutdownGracePeriodSeconds) * time.Second)
+			doneCh         = make(chan struct{})
+			timer          = m.clock.NewTimer(groupTerminationWaitDuration)
+			ctx, ctxCancel = context.WithTimeout(context.Background(), groupTerminationWaitDuration)
 		)
 		go func() {
 			defer close(doneCh)
+			defer ctxCancel()
 			wg.Wait()
+
+			// The signal to kill the pods was sent successfully to all of them;
+			// now wait until all the volumes are unmounted from all the pods before
+			// continuing to the next group. This is done so that the CSI driver
+			// (assuming it is part of the highest-priority group) has a chance to
+			// perform the unmounts.
+			if err := m.volumeManager.WaitForAllPodsUnmount(ctx, group.Pods); err != nil {
+				var podIdentifiers []string
+				for _, pod := range group.Pods {
+					podIdentifiers = append(podIdentifiers, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
+				}
+
+				// Waiting for volume teardown is done on a best-effort basis;
+				// report the error and continue.
+				//
+				// Depending on the configured shutdown grace period, either the
+				// timer will fire and we'll continue to the next group, or
+				// WaitForAllPodsUnmount will time out, in which case this goroutine
+				// will close doneCh and we'll continue to the next group.
+				m.logger.Error(err, "Failed while waiting for all the volumes belonging to Pods in this group to unmount", "pods", podIdentifiers)
+			}
 		}()
 
 		select {
 		case <-doneCh:
 			timer.Stop()
 		case <-timer.C():
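+			// The group's grace period expired before termination completed;
+			// cancel the in-flight unmount wait before moving on to the next group.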
+			ctxCancel()
 			m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
 		}
 	}
diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
index 65bfe014c84..1566153b9e8 100644
--- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
+++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -45,6 +46,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
 	probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
+	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 	"k8s.io/utils/clock"
 	testingclock "k8s.io/utils/clock/testing"
 )
@@ -348,10 +350,12 @@ func TestManager(t *testing.T) {
 			proberManager := probetest.FakeManager{}
 			fakeRecorder := &record.FakeRecorder{}
+			fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
 			nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 
 			manager, _ := NewManager(&Config{
 				Logger:        logger,
 				ProbeManager:  proberManager,
+				VolumeManager: fakeVolumeManager,
 				Recorder:      fakeRecorder,
 				NodeRef:       nodeRef,
 				GetPodsFunc:   activePodsFunc,
@@ -452,11 +456,13 @@ func TestFeatureEnabled(t *testing.T) {
 			proberManager := probetest.FakeManager{}
 			fakeRecorder := &record.FakeRecorder{}
+			fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
 
 			nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 
 			manager, _ := NewManager(&Config{
 				Logger:        logger,
 				ProbeManager:  proberManager,
+				VolumeManager: fakeVolumeManager,
 				Recorder:      fakeRecorder,
 				NodeRef:       nodeRef,
 				GetPodsFunc:   activePodsFunc,
@@ -509,10 +515,12 @@ func TestRestart(t *testing.T) {
 	proberManager := probetest.FakeManager{}
 	fakeRecorder := &record.FakeRecorder{}
+	fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
 	nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 
 	manager, _ := NewManager(&Config{
 		Logger:        logger,
 		ProbeManager:  proberManager,
+		VolumeManager: fakeVolumeManager,
 		Recorder:      fakeRecorder,
 		NodeRef:       nodeRef,
 		GetPodsFunc:   activePodsFunc,
@@ -738,17 +746,19 @@ func Test_groupByPriority(t *testing.T) {
 func Test_managerImpl_processShutdownEvent(t *testing.T) {
 	var (
-		probeManager   = probetest.FakeManager{}
-		fakeRecorder   = &record.FakeRecorder{}
-		syncNodeStatus = func() {}
-		nodeRef        = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
-		fakeclock      = testingclock.NewFakeClock(time.Now())
+		probeManager      = probetest.FakeManager{}
+		fakeRecorder      = &record.FakeRecorder{}
+		fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+		syncNodeStatus    = func() {}
+		nodeRef           = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
+		fakeclock         = testingclock.NewFakeClock(time.Now())
 	)
 
 	type fields struct {
 		recorder                         record.EventRecorder
 		nodeRef                          *v1.ObjectReference
 		probeManager                     prober.Manager
+		volumeManager                    volumemanager.VolumeManager
 		shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
 		getPods                          eviction.ActivePodsFunc
 		killPodFunc                      eviction.KillPodFunc
@@ -767,9 +777,10 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
 		{
 			name: "kill pod func take too long",
 			fields: fields{
-				recorder:     fakeRecorder,
-				nodeRef:      nodeRef,
-				probeManager: probeManager,
+				recorder:      fakeRecorder,
+				nodeRef:       nodeRef,
+				probeManager:  probeManager,
+				volumeManager: fakeVolumeManager,
 				shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
 					{
 						Priority:                   1,
@@ -808,6 +819,7 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
 			)
 			m := &managerImpl{
 				logger:                           logger,
+				volumeManager:                    tt.fields.volumeManager,
 				recorder:                         tt.fields.recorder,
 				nodeRef:                          tt.fields.nodeRef,
 				probeManager:                     tt.fields.probeManager,
@@ -839,3 +851,66 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
 		})
 	}
 }
+
+func Test_processShutdownEvent_VolumeUnmountTimeout(t *testing.T) {
+	var (
+		probeManager               = probetest.FakeManager{}
+		fakeRecorder               = &record.FakeRecorder{}
+		syncNodeStatus             = func() {}
+		nodeRef                    = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
+		fakeclock                  = testingclock.NewFakeClock(time.Now())
+		shutdownGracePeriodSeconds = 2
+	)
+
+	fakeVolumeManager := volumemanager.NewFakeVolumeManager(
+		[]v1.UniqueVolumeName{},
+		3*time.Second, // Intentionally longer than shutdownGracePeriodSeconds (2s) to exercise
+		// volume unmount operations that outlast the allowed grace period.
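+		// The test asserts below that this error is logged rather than returned.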
+ fmt.Errorf("unmount timeout"), + ) + logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true))) + m := &managerImpl{ + logger: logger, + volumeManager: fakeVolumeManager, + recorder: fakeRecorder, + nodeRef: nodeRef, + probeManager: probeManager, + shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{ + { + Priority: 1, + ShutdownGracePeriodSeconds: int64(shutdownGracePeriodSeconds), + }, + }, + getPods: func() []*v1.Pod { + return []*v1.Pod{ + makePod("test-pod", 1, nil), + } + }, + killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error { + return nil + }, + syncNodeStatus: syncNodeStatus, + dbusCon: &fakeDbus{}, + clock: fakeclock, + } + + start := fakeclock.Now() + err := m.processShutdownEvent() + end := fakeclock.Now() + + require.NoError(t, err, "managerImpl.processShutdownEvent() should not return an error") + + // Check if processShutdownEvent completed within the expected time + actualDuration := int(end.Sub(start).Seconds()) + assert.LessOrEqual(t, actualDuration, shutdownGracePeriodSeconds, "processShutdownEvent took too long") + + underlier, ok := logger.GetSink().(ktesting.Underlier) + if !ok { + t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink()) + } + + log := underlier.GetBuffer().String() + expectedLogMessage := "Failed while waiting for all the volumes belonging to Pods in this group to unmount" + assert.Contains(t, log, expectedLogMessage, "Expected log message not found") +} diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 4c2f8eedb9c..abe2fd14e5b 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -23,6 +23,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -31,6 +32,7 @@ import ( v1 "k8s.io/api/core/v1" k8stypes "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -106,6 +108,12 @@ type VolumeManager interface { // the duration defined in podAttachAndMountTimeout. WaitForUnmount(ctx context.Context, pod *v1.Pod) error + // WaitForAllPodsUnmount is a version of WaitForUnmount that blocks and + // waits until all the volumes belonging to all the pods are unmounted. + // An error is returned if there's at least one Pod with volumes not unmounted + // within the duration defined in podAttachAndMountTimeout. + WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error + // GetMountedVolumesForPod returns a VolumeMap containing the volumes // referenced by the specified pod that are successfully attached and // mounted. The key in the map is the OuterVolumeSpecName (i.e. 
+func (vm *volumeManager) WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error {
+	var (
+		errors   []error
+		errorsMu sync.Mutex // guards errors against concurrent appends from the per-pod goroutines
+		wg       sync.WaitGroup
+	)
+	wg.Add(len(pods))
+	for _, pod := range pods {
+		go func(pod *v1.Pod) {
+			defer wg.Done()
+			if err := vm.WaitForUnmount(ctx, pod); err != nil {
+				errorsMu.Lock()
+				errors = append(errors, err)
+				errorsMu.Unlock()
+			}
+		}(pod)
+	}
+	wg.Wait()
+	return utilerrors.NewAggregate(errors)
+}
+
 func (vm *volumeManager) getVolumesNotInDSW(uniquePodName types.UniquePodName, expectedVolumes []string) []string {
 	volumesNotInDSW := sets.New[string](expectedVolumes...)
 
diff --git a/pkg/kubelet/volumemanager/volume_manager_fake.go b/pkg/kubelet/volumemanager/volume_manager_fake.go
index d7d4e1b22ca..02c424c3801 100644
--- a/pkg/kubelet/volumemanager/volume_manager_fake.go
+++ b/pkg/kubelet/volumemanager/volume_manager_fake.go
@@ -18,6 +18,7 @@ package volumemanager
 
 import (
 	"context"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/kubernetes/pkg/kubelet/config"
@@ -29,10 +30,14 @@
 type FakeVolumeManager struct {
 	volumes       map[v1.UniqueVolumeName]bool
 	reportedInUse map[v1.UniqueVolumeName]bool
+	unmountDelay  time.Duration
+	unmountError  error
 }
 
+var _ VolumeManager = &FakeVolumeManager{}
+
 // NewFakeVolumeManager creates a new VolumeManager test instance
-func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName) *FakeVolumeManager {
+func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName, unmountDelay time.Duration, unmountError error) *FakeVolumeManager {
 	volumes := map[v1.UniqueVolumeName]bool{}
 	for _, v := range initialVolumes {
 		volumes[v] = true
@@ -40,6 +45,8 @@ func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName) *FakeVolumeManag
 	return &FakeVolumeManager{
 		volumes:       volumes,
 		reportedInUse: map[v1.UniqueVolumeName]bool{},
+		unmountDelay:  unmountDelay,
+		unmountError:  unmountError,
 	}
 }
 
@@ -57,6 +64,17 @@ func (f *FakeVolumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) err
 	return nil
 }
 
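+// WaitForAllPodsUnmount simulates unmounting by returning the configured
+// unmountError after unmountDelay, or the context's error if it is done first.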
+func (f *FakeVolumeManager) WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-time.After(f.unmountDelay):
+		return f.unmountError
+	}
+}
+
 // GetMountedVolumesForPod is not implemented
 func (f *FakeVolumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
 	return nil
diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go
index 8c8a4372936..e46649b4307 100644
--- a/pkg/kubelet/volumemanager/volume_manager_test.go
+++ b/pkg/kubelet/volumemanager/volume_manager_test.go
@@ -25,6 +25,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubetypes "k8s.io/apimachinery/pkg/types"
@@ -86,7 +87,11 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
 			if err != nil {
 				t.Fatalf("can't make a temp dir: %v", err)
 			}
-			defer os.RemoveAll(tmpDir)
+			defer func() {
+				if err := os.RemoveAll(tmpDir); err != nil {
+					t.Fatalf("failed to remove temp dir: %v", err)
+				}
+			}()
 
 			podManager := kubepod.NewBasicPodManager()
 			node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
@@ -545,3 +550,65 @@ func delayClaimBecomesBound(
 	}
 	kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(context.TODO(), volumeClaim, metav1.UpdateOptions{})
 }
+
+func TestWaitForAllPodsUnmount(t *testing.T) {
+	tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
+	require.NoError(t, err, "Failed to create temp directory")
+	defer func() {
+		if err := os.RemoveAll(tmpDir); err != nil {
+			t.Errorf("Failed to remove temp directory: %v", err)
+		}
+	}()
+
+	tests := []struct {
+		name          string
+		podMode       v1.PersistentVolumeMode
+		expectedError bool
+	}{
+		{
+			name:          "successful unmount",
+			podMode:       "",
+			expectedError: false,
+		},
+		{
+			name:          "timeout waiting for unmount",
+			podMode:       v1.PersistentVolumeFilesystem,
+			expectedError: true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			podManager := kubepod.NewBasicPodManager()
+
+			node, pod, pv, claim := createObjects(test.podMode, test.podMode)
+			kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
+
+			manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
+
+			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+			defer cancel()
+			sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
+			go manager.Run(ctx, sourcesReady)
+
+			podManager.SetPods([]*v1.Pod{pod})
+
+			go simulateVolumeInUseUpdate(
+				v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
+				ctx.Done(),
+				manager)
+
+			err := manager.WaitForAttachAndMount(context.Background(), pod)
+			require.NoError(t, err, "Failed to wait for attach and mount")
+
+			err = manager.WaitForAllPodsUnmount(ctx, []*v1.Pod{pod})
+
+			if test.expectedError {
+				require.Error(t, err, "Expected error due to timeout")
+				require.Contains(t, err.Error(), "context deadline exceeded", "Expected deadline exceeded error")
+			} else {
+				require.NoError(t, err, "Expected no error")
+			}
+		})
+	}
+}