Merge pull request #125070 from torredil/kublet-vm-race

Ensure volumes are unmounted during graceful node shutdown
Kubernetes Prow Robot 2024-10-02 00:33:48 +01:00 committed by GitHub
commit e2c17c09a4
GPG Key ID: B5690EEEBB952194
8 changed files with 229 additions and 13 deletions

View File

@@ -931,6 +931,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
Logger: logger,
ProbeManager: klet.probeManager,
VolumeManager: klet.volumeManager,
Recorder: kubeDeps.Recorder,
NodeRef: nodeRef,
GetPodsFunc: klet.GetActivePods,

View File

@@ -1128,7 +1128,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
kubelet.setCachedMachineInfo(&cadvisorapi.MachineInfo{})
// override test volumeManager
fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes)
fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes, 0, nil)
kubelet.volumeManager = fakeVolumeManager
// Only test VolumesInUse setter

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/utils/clock"
)
@@ -40,6 +41,7 @@ type Manager interface {
type Config struct {
Logger klog.Logger
ProbeManager prober.Manager
VolumeManager volumemanager.VolumeManager
Recorder record.EventRecorder
NodeRef *v1.ObjectReference
GetPodsFunc eviction.ActivePodsFunc

View File

@@ -21,6 +21,7 @@ limitations under the License.
package nodeshutdown
import (
"context"
"fmt"
"path/filepath"
"sort"
@@ -41,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/utils/clock"
)
@@ -73,6 +75,8 @@ type managerImpl struct {
nodeRef *v1.ObjectReference
probeManager prober.Manager
volumeManager volumemanager.VolumeManager
shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
getPods eviction.ActivePodsFunc
@@ -123,6 +127,7 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
logger: conf.Logger,
probeManager: conf.ProbeManager,
recorder: conf.Recorder,
volumeManager: conf.VolumeManager,
nodeRef: conf.NodeRef,
getPods: conf.GetPodsFunc,
killPodFunc: conf.KillPodFunc,
@@ -395,19 +400,44 @@ func (m *managerImpl) processShutdownEvent() error {
}(pod, group)
}
// This duration determines how long the shutdown manager will wait for the pods in this group
// to terminate before proceeding to the next group.
var groupTerminationWaitDuration = time.Duration(group.ShutdownGracePeriodSeconds) * time.Second
var (
doneCh = make(chan struct{})
timer = m.clock.NewTimer(time.Duration(group.ShutdownGracePeriodSeconds) * time.Second)
doneCh = make(chan struct{})
timer = m.clock.NewTimer(groupTerminationWaitDuration)
ctx, ctxCancel = context.WithTimeout(context.Background(), groupTerminationWaitDuration)
)
go func() {
defer close(doneCh)
defer ctxCancel()
wg.Wait()
// The signal to kill the pods was sent successfully to all of them;
// wait until their volumes are unmounted before continuing to the
// next group. This gives the CSI driver (assuming it runs in the
// highest-priority group) a chance to perform the unmounts.
if err := m.volumeManager.WaitForAllPodsUnmount(ctx, group.Pods); err != nil {
var podIdentifiers []string
for _, pod := range group.Pods {
podIdentifiers = append(podIdentifiers, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
}
// Waiting for volume teardown is done on a best-effort basis;
// report an error and continue.
//
// Depending on the user-provided kubelet configuration values,
// either the `timer` will fire and we'll move on to shutting down
// the next group, or WaitForAllPodsUnmount will time out, this
// goroutine will close doneCh, and we'll move on to shutting down
// the next group.
m.logger.Error(err, "Failed while waiting for all the volumes belonging to Pods in this group to unmount", "pods", podIdentifiers)
}
}()
select {
case <-doneCh:
timer.Stop()
case <-timer.C():
ctxCancel()
m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
}
}
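The select above races two mechanisms with the same deadline on purpose: the group timer and the context passed to WaitForAllPodsUnmount both expire after groupTerminationWaitDuration, so the loop advances as soon as teardown completes and never stalls past the group's grace period. Below is a minimal standalone sketch of that pattern; the helper names are hypothetical and not part of this PR:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForGroup mirrors processShutdownEvent's coordination: wait for
// teardown, but never longer than the group's grace period. Either the
// teardown goroutine closes doneCh first, or the timer fires first.
func waitForGroup(gracePeriod time.Duration, teardown func(context.Context) error) {
	ctx, cancel := context.WithTimeout(context.Background(), gracePeriod)
	doneCh := make(chan struct{})
	timer := time.NewTimer(gracePeriod)
	defer timer.Stop()

	go func() {
		defer close(doneCh)
		defer cancel()
		if err := teardown(ctx); err != nil {
			// Best-effort, as in the manager: report and continue.
			fmt.Println("teardown incomplete:", err)
		}
	}()

	select {
	case <-doneCh:
	case <-timer.C:
		cancel() // unblock the teardown goroutine before moving on
	}
}

func main() {
	// A teardown that would take 3s against a 1s grace period: the
	// context expires, and control returns after roughly one second.
	waitForGroup(1*time.Second, func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(3 * time.Second):
			return nil
		}
	})
	fmt.Println("proceeding to next priority group")
}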

View File

@@ -30,6 +30,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -45,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
"k8s.io/kubernetes/pkg/kubelet/prober"
probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
)
@@ -348,10 +350,12 @@ func TestManager(t *testing.T) {
proberManager := probetest.FakeManager{}
fakeRecorder := &record.FakeRecorder{}
fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
VolumeManager: fakeVolumeManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
GetPodsFunc: activePodsFunc,
@@ -452,11 +456,13 @@ func TestFeatureEnabled(t *testing.T) {
proberManager := probetest.FakeManager{}
fakeRecorder := &record.FakeRecorder{}
fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
VolumeManager: fakeVolumeManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
GetPodsFunc: activePodsFunc,
@@ -509,10 +515,12 @@ func TestRestart(t *testing.T) {
proberManager := probetest.FakeManager{}
fakeRecorder := &record.FakeRecorder{}
fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
VolumeManager: fakeVolumeManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
GetPodsFunc: activePodsFunc,
@@ -738,17 +746,19 @@ func Test_groupByPriority(t *testing.T) {
func Test_managerImpl_processShutdownEvent(t *testing.T) {
var (
probeManager = probetest.FakeManager{}
fakeRecorder = &record.FakeRecorder{}
syncNodeStatus = func() {}
nodeRef = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
fakeclock = testingclock.NewFakeClock(time.Now())
probeManager = probetest.FakeManager{}
fakeRecorder = &record.FakeRecorder{}
fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
syncNodeStatus = func() {}
nodeRef = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
fakeclock = testingclock.NewFakeClock(time.Now())
)
type fields struct {
recorder record.EventRecorder
nodeRef *v1.ObjectReference
probeManager prober.Manager
volumeManager volumemanager.VolumeManager
shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
getPods eviction.ActivePodsFunc
killPodFunc eviction.KillPodFunc
@@ -767,9 +777,10 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
{
name: "kill pod func take too long",
fields: fields{
recorder: fakeRecorder,
nodeRef: nodeRef,
probeManager: probeManager,
recorder: fakeRecorder,
nodeRef: nodeRef,
probeManager: probeManager,
volumeManager: fakeVolumeManager,
shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: 1,
@@ -808,6 +819,7 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
)
m := &managerImpl{
logger: logger,
volumeManager: tt.fields.volumeManager,
recorder: tt.fields.recorder,
nodeRef: tt.fields.nodeRef,
probeManager: tt.fields.probeManager,
@@ -839,3 +851,65 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
})
}
}
func Test_processShutdownEvent_VolumeUnmountTimeout(t *testing.T) {
var (
probeManager = probetest.FakeManager{}
fakeRecorder = &record.FakeRecorder{}
syncNodeStatus = func() {}
nodeRef = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
fakeclock = testingclock.NewFakeClock(time.Now())
shutdownGracePeriodSeconds = 2
)
fakeVolumeManager := volumemanager.NewFakeVolumeManager(
[]v1.UniqueVolumeName{},
3*time.Second, // This value is intentionally longer than the shutdownGracePeriodSeconds (2s) to test the behavior
// for volume unmount operations that take longer than the allowed grace period.
fmt.Errorf("unmount timeout"),
)
logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
m := &managerImpl{
logger: logger,
volumeManager: fakeVolumeManager,
recorder: fakeRecorder,
nodeRef: nodeRef,
probeManager: probeManager,
shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: 1,
ShutdownGracePeriodSeconds: int64(shutdownGracePeriodSeconds),
},
},
getPods: func() []*v1.Pod {
return []*v1.Pod{
makePod("test-pod", 1, nil),
}
},
killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
return nil
},
syncNodeStatus: syncNodeStatus,
dbusCon: &fakeDbus{},
clock: fakeclock,
}
start := fakeclock.Now()
err := m.processShutdownEvent()
end := fakeclock.Now()
require.NoError(t, err, "managerImpl.processShutdownEvent() should not return an error")
// Check if processShutdownEvent completed within the expected time
actualDuration := int(end.Sub(start).Seconds())
assert.LessOrEqual(t, actualDuration, shutdownGracePeriodSeconds, "processShutdownEvent took too long")
underlier, ok := logger.GetSink().(ktesting.Underlier)
if !ok {
t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink())
}
log := underlier.GetBuffer().String()
expectedLogMessage := "Failed while waiting for all the volumes belonging to Pods in this group to unmount"
assert.Contains(t, log, expectedLogMessage, "Expected log message not found")
}
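Assuming a checkout of kubernetes/kubernetes on Linux (the nodeshutdown manager and these tests are Linux-only), this case should be runnable on its own with a standard go test invocation:

go test -v -run Test_processShutdownEvent_VolumeUnmountTimeout ./pkg/kubelet/nodeshutdown/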

View File

@@ -23,6 +23,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -31,6 +32,7 @@ import (
v1 "k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -106,6 +108,12 @@ type VolumeManager interface {
// the duration defined in podAttachAndMountTimeout.
WaitForUnmount(ctx context.Context, pod *v1.Pod) error
// WaitForAllPodsUnmount is a version of WaitForUnmount that blocks and
// waits until all the volumes belonging to all the pods are unmounted.
// An error is returned if there's at least one Pod with volumes not unmounted
// within the duration defined in podAttachAndMountTimeout.
WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error
// GetMountedVolumesForPod returns a VolumeMap containing the volumes
// referenced by the specified pod that are successfully attached and
// mounted. The key in the map is the OuterVolumeSpecName (i.e.
@@ -479,6 +487,24 @@ func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error {
return nil
}
func (vm *volumeManager) WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error {
var (
mu sync.Mutex
errors []error
wg sync.WaitGroup
)
wg.Add(len(pods))
for _, pod := range pods {
go func(pod *v1.Pod) {
defer wg.Done()
if err := vm.WaitForUnmount(ctx, pod); err != nil {
// One goroutine runs per pod, so guard the shared error
// slice; unsynchronized appends would be a data race.
mu.Lock()
errors = append(errors, err)
mu.Unlock()
}
}(pod)
}
wg.Wait()
return utilerrors.NewAggregate(errors)
}
func (vm *volumeManager) getVolumesNotInDSW(uniquePodName types.UniquePodName, expectedVolumes []string) []string {
volumesNotInDSW := sets.New[string](expectedVolumes...)
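For reference, here is a self-contained sketch of the fan-out-and-aggregate pattern used by WaitForAllPodsUnmount: one goroutine per pod, errors collected under a lock, one combined error returned. The waitAll helper and pod names are hypothetical, and errors.Join (Go 1.20+) stands in for utilerrors.NewAggregate:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// waitAll fans out one goroutine per item and aggregates any failures.
// The mutex matters: appending to a shared slice from several
// goroutines without it is a data race.
func waitAll(ctx context.Context, items []string, wait func(context.Context, string) error) error {
	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	wg.Add(len(items))
	for _, item := range items {
		go func(item string) {
			defer wg.Done()
			if err := wait(ctx, item); err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("%s: %w", item, err))
				mu.Unlock()
			}
		}(item)
	}
	wg.Wait()
	return errors.Join(errs...) // nil when every wait succeeded
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Simulate unmounts that never finish: every pod reports the
	// context's deadline error, and the caller gets them aggregated.
	err := waitAll(ctx, []string{"default/pod-a", "default/pod-b"},
		func(ctx context.Context, _ string) error {
			<-ctx.Done()
			return ctx.Err()
		})
	fmt.Println(err)
}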

View File

@@ -18,6 +18,7 @@ package volumemanager
import (
"context"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -29,10 +30,14 @@ import (
type FakeVolumeManager struct {
volumes map[v1.UniqueVolumeName]bool
reportedInUse map[v1.UniqueVolumeName]bool
unmountDelay time.Duration
unmountError error
}
var _ VolumeManager = &FakeVolumeManager{}
// NewFakeVolumeManager creates a new VolumeManager test instance
func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName) *FakeVolumeManager {
func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName, unmountDelay time.Duration, unmountError error) *FakeVolumeManager {
volumes := map[v1.UniqueVolumeName]bool{}
for _, v := range initialVolumes {
volumes[v] = true
@@ -40,6 +45,8 @@ func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName) *FakeVolumeManager {
return &FakeVolumeManager{
volumes: volumes,
reportedInUse: map[v1.UniqueVolumeName]bool{},
unmountDelay: unmountDelay,
unmountError: unmountError,
}
}
@@ -57,6 +64,15 @@ func (f *FakeVolumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error {
return nil
}
func (f *FakeVolumeManager) WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(f.unmountDelay):
return f.unmountError
}
}
// GetMountedVolumesForPod is not implemented
func (f *FakeVolumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
return nil
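The fake's WaitForAllPodsUnmount reduces to a race between the caller's context and the configured delay, which is what lets tests steer both outcomes. A small standalone sketch of that behavior; the fakeWait helper is hypothetical and not part of this PR:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fakeWait mimics FakeVolumeManager.WaitForAllPodsUnmount: return the
// injected error after delay, unless the context expires first.
func fakeWait(ctx context.Context, delay time.Duration, injected error) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(delay):
		return injected
	}
}

func main() {
	// No delay, no injected error: immediate success.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	fmt.Println(fakeWait(ctx, 0, nil)) // <nil>

	// Delay longer than the deadline: the context error wins, which is
	// the situation Test_processShutdownEvent_VolumeUnmountTimeout sets up.
	ctx2, cancel2 := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel2()
	err := fakeWait(ctx2, time.Hour, errors.New("unmount timeout"))
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}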

View File

@@ -25,6 +25,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
@@ -86,7 +87,11 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
defer func() {
if err := os.RemoveAll(tmpDir); err != nil {
t.Fatalf("failed to remove temp dir: %v", err)
}
}()
podManager := kubepod.NewBasicPodManager()
node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
@@ -545,3 +550,65 @@ func delayClaimBecomesBound(
}
kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(context.TODO(), volumeClaim, metav1.UpdateOptions{})
}
func TestWaitForAllPodsUnmount(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
require.NoError(t, err, "Failed to create temp directory")
defer func() {
if err := os.RemoveAll(tmpDir); err != nil {
t.Errorf("Failed to remove temp directory: %v", err)
}
}()
tests := []struct {
name string
podMode v1.PersistentVolumeMode
expectedError bool
}{
{
name: "successful unmount",
podMode: "",
expectedError: false,
},
{
name: "timeout waiting for unmount",
podMode: v1.PersistentVolumeFilesystem,
expectedError: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager()
node, pod, pv, claim := createObjects(test.podMode, test.podMode)
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
go manager.Run(ctx, sourcesReady)
podManager.SetPods([]*v1.Pod{pod})
go simulateVolumeInUseUpdate(
v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
ctx.Done(),
manager)
err := manager.WaitForAttachAndMount(context.Background(), pod)
require.NoError(t, err, "Failed to wait for attach and mount")
err = manager.WaitForAllPodsUnmount(ctx, []*v1.Pod{pod})
if test.expectedError {
require.Error(t, err, "Expected error due to timeout")
require.Contains(t, err.Error(), "context deadline exceeded", "Expected deadline exceeded error")
} else {
require.NoError(t, err, "Expected no error")
}
})
}
}