diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go index 1e857528f6a..a7d62048f57 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go @@ -330,15 +330,19 @@ func (m *managerImpl) processShutdownEvent() error { }(pod, group) } - c := make(chan struct{}) + var ( + doneCh = make(chan struct{}) + timer = m.clock.NewTimer(time.Duration(group.ShutdownGracePeriodSeconds) * time.Second) + ) go func() { - defer close(c) + defer close(doneCh) wg.Wait() }() select { - case <-c: - case <-time.After(time.Duration(group.ShutdownGracePeriodSeconds) * time.Second): + case <-doneCh: + timer.Stop() + case <-timer.C(): klog.V(1).InfoS("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority) } } diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go index 028364c2367..0ed27932541 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go @@ -20,6 +20,7 @@ limitations under the License. package nodeshutdown import ( + "bytes" "fmt" "strings" "sync" @@ -33,11 +34,15 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/scheduling" pkgfeatures "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd" + "k8s.io/kubernetes/pkg/kubelet/prober" probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing" + "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" ) @@ -608,3 +613,101 @@ func Test_groupByPriority(t *testing.T) { }) } } + +func Test_managerImpl_processShutdownEvent(t *testing.T) { + var ( + probeManager = probetest.FakeManager{} + fakeRecorder = &record.FakeRecorder{} + syncNodeStatus = func() {} + nodeRef = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} + fakeclock = testingclock.NewFakeClock(time.Now()) + ) + + type fields struct { + recorder record.EventRecorder + nodeRef *v1.ObjectReference + probeManager prober.Manager + shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority + getPods eviction.ActivePodsFunc + killPodFunc eviction.KillPodFunc + syncNodeStatus func() + dbusCon dbusInhibiter + inhibitLock systemd.InhibitLock + nodeShuttingDownNow bool + clock clock.Clock + } + tests := []struct { + name string + fields fields + wantErr bool + exceptOutputContains string + }{ + { + name: "kill pod func take too long", + fields: fields{ + recorder: fakeRecorder, + nodeRef: nodeRef, + probeManager: probeManager, + shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{ + { + Priority: 1, + ShutdownGracePeriodSeconds: 10, + }, + { + Priority: 2, + ShutdownGracePeriodSeconds: 20, + }, + }, + getPods: func() []*v1.Pod { + return []*v1.Pod{ + makePod("normal-pod", 1, nil), + makePod("critical-pod", 2, nil), + } + }, + killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error { + fakeclock.Step(60 * time.Second) + return nil + }, + syncNodeStatus: syncNodeStatus, + clock: fakeclock, + dbusCon: &fakeDbus{}, + }, + wantErr: false, + exceptOutputContains: "Shutdown manager pod killing time out", + }, + } + for _, tt := range tests { + + l := klog.Level(1) + l.Set("1") + tmpWriteBuffer := bytes.NewBuffer(nil) + klog.SetOutput(tmpWriteBuffer) + klog.LogToStderr(false) + + t.Run(tt.name, func(t *testing.T) { + m := &managerImpl{ + recorder: tt.fields.recorder, + nodeRef: tt.fields.nodeRef, + probeManager: tt.fields.probeManager, + shutdownGracePeriodByPodPriority: tt.fields.shutdownGracePeriodByPodPriority, + getPods: tt.fields.getPods, + killPodFunc: tt.fields.killPodFunc, + syncNodeStatus: tt.fields.syncNodeStatus, + dbusCon: tt.fields.dbusCon, + inhibitLock: tt.fields.inhibitLock, + nodeShuttingDownMutex: sync.Mutex{}, + nodeShuttingDownNow: tt.fields.nodeShuttingDownNow, + clock: tt.fields.clock, + } + if err := m.processShutdownEvent(); (err != nil) != tt.wantErr { + t.Errorf("managerImpl.processShutdownEvent() error = %v, wantErr %v", err, tt.wantErr) + } + klog.Flush() + + log := tmpWriteBuffer.String() + if !strings.Contains(log, tt.exceptOutputContains) { + t.Errorf("managerImpl.processShutdownEvent() should log %s, got %s", tt.exceptOutputContains, log) + } + }) + } +}