diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ea15940d807..2be4e6be224 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -929,7 +929,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName))) // setup node shutdown manager - shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{ + shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{ Logger: logger, ProbeManager: klet.probeManager, VolumeManager: klet.volumeManager, @@ -948,7 +948,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, if err != nil { return nil, fmt.Errorf("create user namespace manager: %w", err) } - klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler) + klet.admitHandlers.AddPodAdmitHandler(shutdownManager) // Finally, put the most recent version of the config on the Kubelet, so // people can see how it was configured. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 27c47ed9015..c8d493b973b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -351,7 +351,7 @@ func newTestKubeletWithImageList( kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) // setup shutdown manager - shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{ + shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{ Logger: logger, ProbeManager: kubelet.probeManager, Recorder: fakeRecorder, @@ -363,7 +363,7 @@ func newTestKubeletWithImageList( ShutdownGracePeriodCriticalPods: 0, }) kubelet.shutdownManager = shutdownManager - kubelet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler) + kubelet.admitHandlers.AddPodAdmitHandler(shutdownManager) // Add this as cleanup predicate pod admitter kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources)) diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go index 0b5a5a244c6..15e8582fdad 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go @@ -17,11 +17,19 @@ limitations under the License. package nodeshutdown import ( + "context" + "fmt" + "sort" + "sync" "time" v1 "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -32,6 +40,8 @@ import ( // Manager interface provides methods for Kubelet to manage node shutdown. type Manager interface { + lifecycle.PodAdmitHandler + Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult Start() error ShutdownStatus() error @@ -71,3 +81,211 @@ func (managerStub) Start() error { func (managerStub) ShutdownStatus() error { return nil } + +const ( + nodeShutdownReason = "Terminated" + nodeShutdownMessage = "Pod was terminated in response to imminent node shutdown." +) + +// podManager is responsible for killing active pods by priority. 
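+// It derives a per-priority-group grace period from the kubelet configuration,
+// then terminates the groups one at a time in ascending priority order, waiting
+// for each group's pods to exit (and their volumes to unmount) before moving on.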
+type podManager struct {
+	logger                           klog.Logger
+	shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
+	clock                            clock.Clock
+	killPodFunc                      eviction.KillPodFunc
+	volumeManager                    volumemanager.VolumeManager
+}
+
+func newPodManager(conf *Config) *podManager {
+	shutdownGracePeriodByPodPriority := conf.ShutdownGracePeriodByPodPriority
+
+	// Migration from the original configuration
+	if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) ||
+		len(shutdownGracePeriodByPodPriority) == 0 {
+		shutdownGracePeriodByPodPriority = migrateConfig(conf.ShutdownGracePeriodRequested, conf.ShutdownGracePeriodCriticalPods)
+	}
+
+	// Sort by priority from low to high
+	sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool {
+		return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority
+	})
+
+	if conf.Clock == nil {
+		conf.Clock = clock.RealClock{}
+	}
+
+	return &podManager{
+		logger:                           conf.Logger,
+		shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
+		clock:                            conf.Clock,
+		killPodFunc:                      conf.KillPodFunc,
+		volumeManager:                    conf.VolumeManager,
+	}
+}
+
+// killPods terminates the given pods group by group, in ascending priority order.
+func (m *podManager) killPods(activePods []*v1.Pod) error {
+	groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods)
+	for _, group := range groups {
+		// If there are no pods in a particular priority range,
+		// then do not wait for pods in that priority range.
+		if len(group.Pods) == 0 {
+			continue
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(len(group.Pods))
+		for _, pod := range group.Pods {
+			go func(pod *v1.Pod, group podShutdownGroup) {
+				defer wg.Done()
+
+				gracePeriodOverride := group.ShutdownGracePeriodSeconds
+
+				// If the pod's spec specifies a termination grace period that is less than or equal to
+				// the calculated gracePeriodOverride, use the termination grace period from the pod spec.
+				if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
+					gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
+				}
+
+				m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
+
+				if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
+					// Set the pod status to failed (unless it was already in a successful terminal phase).
+					if status.Phase != v1.PodSucceeded {
+						status.Phase = v1.PodFailed
+					}
+					status.Message = nodeShutdownMessage
+					status.Reason = nodeShutdownReason
+					podutil.UpdatePodCondition(status, &v1.PodCondition{
+						Type:    v1.DisruptionTarget,
+						Status:  v1.ConditionTrue,
+						Reason:  v1.PodReasonTerminationByKubelet,
+						Message: nodeShutdownMessage,
+					})
+				}); err != nil {
+					m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
+				} else {
+					m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
+				}
+			}(pod, group)
+		}
+
+		// This duration determines how long the shutdown manager will wait for the pods in this group
+		// to terminate before proceeding to the next group.
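+		// Both the timer and the context below are bounded by this same duration,
+		// so whichever fires first moves the shutdown on to the next group.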
+		var groupTerminationWaitDuration = time.Duration(group.ShutdownGracePeriodSeconds) * time.Second
+		var (
+			doneCh         = make(chan struct{})
+			timer          = m.clock.NewTimer(groupTerminationWaitDuration)
+			ctx, ctxCancel = context.WithTimeout(context.Background(), groupTerminationWaitDuration)
+		)
+		go func() {
+			defer close(doneCh)
+			defer ctxCancel()
+			wg.Wait()
+			// The signal to kill a Pod was sent successfully to all the pods,
+			// let's wait until all the volumes are unmounted from all the pods before
+			// continuing to the next group. This is done so that the CSI Driver (assuming
+			// that it's part of the highest group) has a chance to perform unmounts.
+			if err := m.volumeManager.WaitForAllPodsUnmount(ctx, group.Pods); err != nil {
+				var podIdentifiers []string
+				for _, pod := range group.Pods {
+					podIdentifiers = append(podIdentifiers, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
+				}
+
+				// Waiting for volume teardown is done on a best-effort basis;
+				// report an error and continue.
+				//
+				// Depending on the kubelet configuration value provided by the user,
+				// either the `timer` will tick and we'll continue to shut down the next group,
+				// or WaitForAllPodsUnmount will time out, in which case this goroutine
+				// will close doneCh and we'll continue to shut down the next group.
+				m.logger.Error(err, "Failed while waiting for all the volumes belonging to Pods in this group to unmount", "pods", podIdentifiers)
+			}
+		}()
+
+		select {
+		case <-doneCh:
+			timer.Stop()
+			m.logger.V(1).Info("Done waiting for all pods in group to terminate", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
+		case <-timer.C():
+			ctxCancel()
+			m.logger.V(1).Info("Shutdown manager pod killing timed out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
+		}
+	}
+
+	return nil
+}
+
+func (m *podManager) periodRequested() time.Duration {
+	var sum int64
+	for _, period := range m.shutdownGracePeriodByPodPriority {
+		sum += period.ShutdownGracePeriodSeconds
+	}
+	return time.Duration(sum) * time.Second
+}
+
+func migrateConfig(shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) []kubeletconfig.ShutdownGracePeriodByPodPriority {
+	if shutdownGracePeriodRequested == 0 {
+		return nil
+	}
+	defaultPriority := shutdownGracePeriodRequested - shutdownGracePeriodCriticalPods
+	if defaultPriority < 0 {
+		return nil
+	}
+	criticalPriority := shutdownGracePeriodRequested - defaultPriority
+	if criticalPriority < 0 {
+		return nil
+	}
+	return []kubeletconfig.ShutdownGracePeriodByPodPriority{
+		{
+			Priority:                   scheduling.DefaultPriorityWhenNoDefaultClassExists,
+			ShutdownGracePeriodSeconds: int64(defaultPriority / time.Second),
+		},
+		{
+			Priority:                   scheduling.SystemCriticalPriority,
+			ShutdownGracePeriodSeconds: int64(criticalPriority / time.Second),
+		},
+	}
+}
+
+type podShutdownGroup struct {
+	kubeletconfig.ShutdownGracePeriodByPodPriority
+	Pods []*v1.Pod
+}
+
+func groupByPriority(shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority, pods []*v1.Pod) []podShutdownGroup {
+	groups := make([]podShutdownGroup, 0, len(shutdownGracePeriodByPodPriority))
+	for _, period := range shutdownGracePeriodByPodPriority {
+		groups = append(groups, podShutdownGroup{
+			ShutdownGracePeriodByPodPriority: period,
+		})
+	}
+
+	for _, pod := range pods {
+		var priority int32
+		if pod.Spec.Priority != nil {
+			priority = *pod.Spec.Priority
+		}
+
+		// Find the group index according to the priority.
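+		// sort.Search returns the smallest index i for which
+		// groups[i].Priority >= priority, or len(groups) if no such group exists,
+		// so the index still needs to be clamped to a valid group below.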
+		index := sort.Search(len(groups), func(i int) bool {
+			return groups[i].Priority >= priority
+		})
+
+		// 1. Pods with a priority higher than the highest configured priority
+		//    default to the highest priority group.
+		// 2. Pods with a priority lower than the lowest configured priority
+		//    default to the lowest priority group.
+		// 3. A pod whose priority falls between two configured priorities, i.e.
+		//    groups[index-1].Priority <= pod priority < groups[index].Priority,
+		//    is placed in the lower group (index-1).
+		if index == len(groups) {
+			index = len(groups) - 1
+		} else if index < 0 {
+			index = 0
+		} else if index > 0 && groups[index].Priority > priority {
+			index--
+		}
+
+		groups[index].Pods = append(groups[index].Pods, pod)
+	}
+	return groups
+}
diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
index e56d03155c2..79299651e9c 100644
--- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
+++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
@@ -21,10 +21,8 @@ limitations under the License.
 package nodeshutdown
 
 import (
-	"context"
 	"fmt"
 	"path/filepath"
-	"sort"
 	"sync"
 	"time"
 
@@ -32,23 +30,16 @@ import (
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
-	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-	"k8s.io/kubernetes/pkg/apis/scheduling"
 	"k8s.io/kubernetes/pkg/features"
-	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
-	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
-	"k8s.io/utils/clock"
 )
 
 const (
-	nodeShutdownReason             = "Terminated"
-	nodeShutdownMessage            = "Pod was terminated in response to imminent node shutdown."
 	nodeShutdownNotAdmittedReason  = "NodeShutdown"
 	nodeShutdownNotAdmittedMessage = "Pod was rejected as the node is shutting down."
 	dbusReconnectPeriod            = 1 * time.Second
@@ -75,12 +66,7 @@ type managerImpl struct {
 	nodeRef      *v1.ObjectReference
 	probeManager prober.Manager
 
-	volumeManager volumemanager.VolumeManager
-
-	shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
-
 	getPods        eviction.ActivePodsFunc
-	killPodFunc    eviction.KillPodFunc
 	syncNodeStatus func()
 
 	dbusCon     dbusInhibiter
@@ -88,53 +74,36 @@
 	nodeShuttingDownMutex sync.Mutex
 	nodeShuttingDownNow   bool
-
-	clock clock.Clock
+	podManager *podManager
 
 	enableMetrics bool
 	storage       storage
 }
 
 // NewManager returns a new node shutdown manager.
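+// The returned Manager also serves as the kubelet's PodAdmitHandler: it
+// rejects new pods once node shutdown has begun, so callers register the
+// manager itself instead of a separately returned admit handler.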
-func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) { +func NewManager(conf *Config) Manager { if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) { m := managerStub{} - return m, m + return m } - shutdownGracePeriodByPodPriority := conf.ShutdownGracePeriodByPodPriority - // Migration from the original configuration - if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) || - len(shutdownGracePeriodByPodPriority) == 0 { - shutdownGracePeriodByPodPriority = migrateConfig(conf.ShutdownGracePeriodRequested, conf.ShutdownGracePeriodCriticalPods) - } + podManager := newPodManager(conf) // Disable if the configuration is empty - if len(shutdownGracePeriodByPodPriority) == 0 { + if len(podManager.shutdownGracePeriodByPodPriority) == 0 { m := managerStub{} - return m, m + return m } - // Sort by priority from low to high - sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool { - return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority - }) - - if conf.Clock == nil { - conf.Clock = clock.RealClock{} - } manager := &managerImpl{ - logger: conf.Logger, - probeManager: conf.ProbeManager, - recorder: conf.Recorder, - volumeManager: conf.VolumeManager, - nodeRef: conf.NodeRef, - getPods: conf.GetPodsFunc, - killPodFunc: conf.KillPodFunc, - syncNodeStatus: conf.SyncNodeStatusFunc, - shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority, - clock: conf.Clock, - enableMetrics: utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority), + logger: conf.Logger, + probeManager: conf.ProbeManager, + recorder: conf.Recorder, + nodeRef: conf.NodeRef, + getPods: conf.GetPodsFunc, + syncNodeStatus: conf.SyncNodeStatusFunc, + podManager: podManager, + enableMetrics: utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority), storage: localStorage{ Path: filepath.Join(conf.StateDirectory, localStorageStateFile), }, @@ -142,9 +111,9 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) { manager.logger.Info("Creating node shutdown manager", "shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested, "shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods, - "shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority, + "shutdownGracePeriodByPodPriority", podManager.shutdownGracePeriodByPodPriority, ) - return manager, manager + return manager } // Admit rejects all pods if node is shutting @@ -217,7 +186,7 @@ func (m *managerImpl) start() (chan struct{}, error) { } // If the logind's InhibitDelayMaxUSec as configured in (logind.conf) is less than periodRequested, attempt to update the value to periodRequested. - if periodRequested := m.periodRequested(); periodRequested > currentInhibitDelay { + if periodRequested := m.podManager.periodRequested(); periodRequested > currentInhibitDelay { err := m.dbusCon.OverrideInhibitDelay(periodRequested) if err != nil { return nil, fmt.Errorf("unable to override inhibit delay by shutdown manager: %v", err) @@ -356,166 +325,5 @@ func (m *managerImpl) processShutdownEvent() error { }() } - groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods) - for _, group := range groups { - // If there are no pods in a particular range, - // then do not wait for pods in that priority range. 
- if len(group.Pods) == 0 { - continue - } - - var wg sync.WaitGroup - wg.Add(len(group.Pods)) - for _, pod := range group.Pods { - go func(pod *v1.Pod, group podShutdownGroup) { - defer wg.Done() - - gracePeriodOverride := group.ShutdownGracePeriodSeconds - - // If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod. - if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride { - gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds - } - - m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride) - - if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) { - // set the pod status to failed (unless it was already in a successful terminal phase) - if status.Phase != v1.PodSucceeded { - status.Phase = v1.PodFailed - } - status.Message = nodeShutdownMessage - status.Reason = nodeShutdownReason - podutil.UpdatePodCondition(status, &v1.PodCondition{ - Type: v1.DisruptionTarget, - Status: v1.ConditionTrue, - Reason: v1.PodReasonTerminationByKubelet, - Message: nodeShutdownMessage, - }) - }); err != nil { - m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err) - } else { - m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod)) - } - }(pod, group) - } - - // This duration determines how long the shutdown manager will wait for the pods in this group - // to terminate before proceeding to the next group. - var groupTerminationWaitDuration = time.Duration(group.ShutdownGracePeriodSeconds) * time.Second - var ( - doneCh = make(chan struct{}) - timer = m.clock.NewTimer(groupTerminationWaitDuration) - ctx, ctxCancel = context.WithTimeout(context.Background(), groupTerminationWaitDuration) - ) - go func() { - defer close(doneCh) - defer ctxCancel() - wg.Wait() - // The signal to kill a Pod was sent successfully to all the pods, - // let's wait until all the volumes are unmounted from all the pods before - // continuing to the next group. This is done so that the CSI Driver (assuming - // that it's part of the highest group) has a chance to perform unmounts. - if err := m.volumeManager.WaitForAllPodsUnmount(ctx, group.Pods); err != nil { - var podIdentifiers []string - for _, pod := range group.Pods { - podIdentifiers = append(podIdentifiers, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)) - } - - // Waiting for volume teardown is done on a best basis effort, - // report an error and continue. - // - // Depending on the user provided kubelet configuration value - // either the `timer` will tick and we'll continue to shutdown the next group, or, - // WaitForAllPodsUnmount will timeout, therefore this goroutine - // will close doneCh and we'll continue to shutdown the next group. 
- m.logger.Error(err, "Failed while waiting for all the volumes belonging to Pods in this group to unmount", "pods", podIdentifiers) - } - }() - - select { - case <-doneCh: - timer.Stop() - case <-timer.C(): - ctxCancel() - m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority) - } - } - - return nil -} - -func (m *managerImpl) periodRequested() time.Duration { - var sum int64 - for _, period := range m.shutdownGracePeriodByPodPriority { - sum += period.ShutdownGracePeriodSeconds - } - return time.Duration(sum) * time.Second -} - -func migrateConfig(shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) []kubeletconfig.ShutdownGracePeriodByPodPriority { - if shutdownGracePeriodRequested == 0 { - return nil - } - defaultPriority := shutdownGracePeriodRequested - shutdownGracePeriodCriticalPods - if defaultPriority < 0 { - return nil - } - criticalPriority := shutdownGracePeriodRequested - defaultPriority - if criticalPriority < 0 { - return nil - } - return []kubeletconfig.ShutdownGracePeriodByPodPriority{ - { - Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists, - ShutdownGracePeriodSeconds: int64(defaultPriority / time.Second), - }, - { - Priority: scheduling.SystemCriticalPriority, - ShutdownGracePeriodSeconds: int64(criticalPriority / time.Second), - }, - } -} - -func groupByPriority(shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority, pods []*v1.Pod) []podShutdownGroup { - groups := make([]podShutdownGroup, 0, len(shutdownGracePeriodByPodPriority)) - for _, period := range shutdownGracePeriodByPodPriority { - groups = append(groups, podShutdownGroup{ - ShutdownGracePeriodByPodPriority: period, - }) - } - - for _, pod := range pods { - var priority int32 - if pod.Spec.Priority != nil { - priority = *pod.Spec.Priority - } - - // Find the group index according to the priority. - index := sort.Search(len(groups), func(i int) bool { - return groups[i].Priority >= priority - }) - - // 1. Those higher than the highest priority default to the highest priority - // 2. Those lower than the lowest priority default to the lowest priority - // 3. 
Those boundary priority default to the lower priority - // if priority of pod is: - // groups[index-1].Priority <= pod priority < groups[index].Priority - // in which case we want to pick lower one (i.e index-1) - if index == len(groups) { - index = len(groups) - 1 - } else if index < 0 { - index = 0 - } else if index > 0 && groups[index].Priority > priority { - index-- - } - - groups[index].Pods = append(groups[index].Pods, pod) - } - return groups -} - -type podShutdownGroup struct { - kubeletconfig.ShutdownGracePeriodByPodPriority - Pods []*v1.Pod + return m.podManager.killPods(activePods) } diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go index 1566153b9e8..625856803a6 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go @@ -92,19 +92,6 @@ func (f *fakeDbus) OverrideInhibitDelay(inhibitDelayMax time.Duration) error { return nil } -func makePod(name string, priority int32, terminationGracePeriod *int64) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(name), - }, - Spec: v1.PodSpec{ - Priority: &priority, - TerminationGracePeriodSeconds: terminationGracePeriod, - }, - } -} - func TestManager(t *testing.T) { systemDbusTmp := systemDbus defer func() { @@ -352,7 +339,7 @@ func TestManager(t *testing.T) { fakeRecorder := &record.FakeRecorder{} fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil) nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} - manager, _ := NewManager(&Config{ + manager := NewManager(&Config{ Logger: logger, ProbeManager: proberManager, VolumeManager: fakeVolumeManager, @@ -459,7 +446,7 @@ func TestFeatureEnabled(t *testing.T) { fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil) nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} - manager, _ := NewManager(&Config{ + manager := NewManager(&Config{ Logger: logger, ProbeManager: proberManager, VolumeManager: fakeVolumeManager, @@ -517,7 +504,7 @@ func TestRestart(t *testing.T) { fakeRecorder := &record.FakeRecorder{} fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil) nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} - manager, _ := NewManager(&Config{ + manager := NewManager(&Config{ Logger: logger, ProbeManager: proberManager, VolumeManager: fakeVolumeManager, @@ -551,199 +538,6 @@ func TestRestart(t *testing.T) { } } -func Test_migrateConfig(t *testing.T) { - type shutdownConfig struct { - shutdownGracePeriodRequested time.Duration - shutdownGracePeriodCriticalPods time.Duration - } - tests := []struct { - name string - args shutdownConfig - want []kubeletconfig.ShutdownGracePeriodByPodPriority - }{ - { - name: "both shutdownGracePeriodRequested and shutdownGracePeriodCriticalPods", - args: shutdownConfig{ - shutdownGracePeriodRequested: 300 * time.Second, - shutdownGracePeriodCriticalPods: 120 * time.Second, - }, - want: []kubeletconfig.ShutdownGracePeriodByPodPriority{ - { - Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists, - ShutdownGracePeriodSeconds: 180, - }, - { - Priority: scheduling.SystemCriticalPriority, - ShutdownGracePeriodSeconds: 120, - }, - }, - }, - { - name: "only shutdownGracePeriodRequested", - args: shutdownConfig{ - 
shutdownGracePeriodRequested: 100 * time.Second, - shutdownGracePeriodCriticalPods: 0 * time.Second, - }, - want: []kubeletconfig.ShutdownGracePeriodByPodPriority{ - { - Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists, - ShutdownGracePeriodSeconds: 100, - }, - { - Priority: scheduling.SystemCriticalPriority, - ShutdownGracePeriodSeconds: 0, - }, - }, - }, - { - name: "empty configuration", - args: shutdownConfig{ - shutdownGracePeriodRequested: 0 * time.Second, - shutdownGracePeriodCriticalPods: 0 * time.Second, - }, - want: nil, - }, - { - name: "wrong configuration", - args: shutdownConfig{ - shutdownGracePeriodRequested: 1 * time.Second, - shutdownGracePeriodCriticalPods: 100 * time.Second, - }, - want: nil, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := migrateConfig(tt.args.shutdownGracePeriodRequested, tt.args.shutdownGracePeriodCriticalPods); !assert.Equal(t, tt.want, got) { - t.Errorf("migrateConfig() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_groupByPriority(t *testing.T) { - type args struct { - shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority - pods []*v1.Pod - } - tests := []struct { - name string - args args - want []podShutdownGroup - }{ - { - name: "migrate config", - args: args{ - shutdownGracePeriodByPodPriority: migrateConfig(300*time.Second /* shutdownGracePeriodRequested */, 120*time.Second /* shutdownGracePeriodCriticalPods */), - pods: []*v1.Pod{ - makePod("normal-pod", scheduling.DefaultPriorityWhenNoDefaultClassExists, nil), - makePod("highest-user-definable-pod", scheduling.HighestUserDefinablePriority, nil), - makePod("critical-pod", scheduling.SystemCriticalPriority, nil), - }, - }, - want: []podShutdownGroup{ - { - ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ - Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists, - ShutdownGracePeriodSeconds: 180, - }, - Pods: []*v1.Pod{ - makePod("normal-pod", scheduling.DefaultPriorityWhenNoDefaultClassExists, nil), - makePod("highest-user-definable-pod", scheduling.HighestUserDefinablePriority, nil), - }, - }, - { - ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ - Priority: scheduling.SystemCriticalPriority, - ShutdownGracePeriodSeconds: 120, - }, - Pods: []*v1.Pod{ - makePod("critical-pod", scheduling.SystemCriticalPriority, nil), - }, - }, - }, - }, - { - name: "pod priority", - args: args{ - shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{ - { - Priority: 1, - ShutdownGracePeriodSeconds: 10, - }, - { - Priority: 2, - ShutdownGracePeriodSeconds: 20, - }, - { - Priority: 3, - ShutdownGracePeriodSeconds: 30, - }, - { - Priority: 4, - ShutdownGracePeriodSeconds: 40, - }, - }, - pods: []*v1.Pod{ - makePod("pod-0", 0, nil), - makePod("pod-1", 1, nil), - makePod("pod-2", 2, nil), - makePod("pod-3", 3, nil), - makePod("pod-4", 4, nil), - makePod("pod-5", 5, nil), - }, - }, - want: []podShutdownGroup{ - { - ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ - Priority: 1, - ShutdownGracePeriodSeconds: 10, - }, - Pods: []*v1.Pod{ - makePod("pod-0", 0, nil), - makePod("pod-1", 1, nil), - }, - }, - { - ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ - Priority: 2, - ShutdownGracePeriodSeconds: 20, - }, - Pods: []*v1.Pod{ - makePod("pod-2", 2, nil), - }, - }, - { - ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ - Priority: 
3, - ShutdownGracePeriodSeconds: 30, - }, - Pods: []*v1.Pod{ - makePod("pod-3", 3, nil), - }, - }, - { - ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ - Priority: 4, - ShutdownGracePeriodSeconds: 40, - }, - Pods: []*v1.Pod{ - makePod("pod-4", 4, nil), - makePod("pod-5", 5, nil), - }, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := groupByPriority(tt.args.shutdownGracePeriodByPodPriority, tt.args.pods); !assert.Equal(t, tt.want, got) { - t.Errorf("groupByPriority() = %v, want %v", got, tt.want) - } - }) - } -} - func Test_managerImpl_processShutdownEvent(t *testing.T) { var ( probeManager = probetest.FakeManager{} @@ -818,20 +612,23 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) { ), ) m := &managerImpl{ - logger: logger, - volumeManager: tt.fields.volumeManager, - recorder: tt.fields.recorder, - nodeRef: tt.fields.nodeRef, - probeManager: tt.fields.probeManager, - shutdownGracePeriodByPodPriority: tt.fields.shutdownGracePeriodByPodPriority, - getPods: tt.fields.getPods, - killPodFunc: tt.fields.killPodFunc, - syncNodeStatus: tt.fields.syncNodeStatus, - dbusCon: tt.fields.dbusCon, - inhibitLock: tt.fields.inhibitLock, - nodeShuttingDownMutex: sync.Mutex{}, - nodeShuttingDownNow: tt.fields.nodeShuttingDownNow, - clock: tt.fields.clock, + logger: logger, + recorder: tt.fields.recorder, + nodeRef: tt.fields.nodeRef, + probeManager: tt.fields.probeManager, + getPods: tt.fields.getPods, + syncNodeStatus: tt.fields.syncNodeStatus, + dbusCon: tt.fields.dbusCon, + inhibitLock: tt.fields.inhibitLock, + nodeShuttingDownMutex: sync.Mutex{}, + nodeShuttingDownNow: tt.fields.nodeShuttingDownNow, + podManager: &podManager{ + logger: logger, + volumeManager: tt.fields.volumeManager, + shutdownGracePeriodByPodPriority: tt.fields.shutdownGracePeriodByPodPriority, + killPodFunc: tt.fields.killPodFunc, + clock: tt.fields.clock, + }, } if err := m.processShutdownEvent(); (err != nil) != tt.wantErr { t.Errorf("managerImpl.processShutdownEvent() error = %v, wantErr %v", err, tt.wantErr) @@ -870,28 +667,31 @@ func Test_processShutdownEvent_VolumeUnmountTimeout(t *testing.T) { ) logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true))) m := &managerImpl{ - logger: logger, - volumeManager: fakeVolumeManager, - recorder: fakeRecorder, - nodeRef: nodeRef, - probeManager: probeManager, - shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{ - { - Priority: 1, - ShutdownGracePeriodSeconds: int64(shutdownGracePeriodSeconds), - }, - }, + logger: logger, + recorder: fakeRecorder, + nodeRef: nodeRef, + probeManager: probeManager, getPods: func() []*v1.Pod { return []*v1.Pod{ makePod("test-pod", 1, nil), } }, - killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error { - return nil - }, syncNodeStatus: syncNodeStatus, dbusCon: &fakeDbus{}, - clock: fakeclock, + podManager: &podManager{ + logger: logger, + volumeManager: fakeVolumeManager, + shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{ + { + Priority: 1, + ShutdownGracePeriodSeconds: int64(shutdownGracePeriodSeconds), + }, + }, + killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error { + return nil + }, + clock: fakeclock, + }, } start := fakeclock.Now() diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_others.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_others.go index 
9ce8c0e2705..919b29745e9 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_others.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_others.go @@ -19,12 +19,8 @@ limitations under the License. package nodeshutdown -import ( - "k8s.io/kubernetes/pkg/kubelet/lifecycle" -) - // NewManager returns a fake node shutdown manager for non linux platforms. -func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) { +func NewManager(conf *Config) Manager { m := managerStub{} - return m, m + return m } diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_test.go new file mode 100644 index 00000000000..0fba64b2832 --- /dev/null +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_test.go @@ -0,0 +1,235 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeshutdown + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/apis/scheduling" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" +) + +func makePod(name string, priority int32, terminationGracePeriod *int64) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(name), + }, + Spec: v1.PodSpec{ + Priority: &priority, + TerminationGracePeriodSeconds: terminationGracePeriod, + }, + } +} + +func Test_migrateConfig(t *testing.T) { + type shutdownConfig struct { + shutdownGracePeriodRequested time.Duration + shutdownGracePeriodCriticalPods time.Duration + } + tests := []struct { + name string + args shutdownConfig + want []kubeletconfig.ShutdownGracePeriodByPodPriority + }{ + { + name: "both shutdownGracePeriodRequested and shutdownGracePeriodCriticalPods", + args: shutdownConfig{ + shutdownGracePeriodRequested: 300 * time.Second, + shutdownGracePeriodCriticalPods: 120 * time.Second, + }, + want: []kubeletconfig.ShutdownGracePeriodByPodPriority{ + { + Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists, + ShutdownGracePeriodSeconds: 180, + }, + { + Priority: scheduling.SystemCriticalPriority, + ShutdownGracePeriodSeconds: 120, + }, + }, + }, + { + name: "only shutdownGracePeriodRequested", + args: shutdownConfig{ + shutdownGracePeriodRequested: 100 * time.Second, + shutdownGracePeriodCriticalPods: 0 * time.Second, + }, + want: []kubeletconfig.ShutdownGracePeriodByPodPriority{ + { + Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists, + ShutdownGracePeriodSeconds: 100, + }, + { + Priority: scheduling.SystemCriticalPriority, + ShutdownGracePeriodSeconds: 0, + }, + }, + }, + { + name: "empty configuration", + args: shutdownConfig{ + shutdownGracePeriodRequested: 0 * time.Second, + shutdownGracePeriodCriticalPods: 0 * time.Second, + }, + want: nil, + }, + { + name: "wrong configuration", + args: shutdownConfig{ + shutdownGracePeriodRequested: 1 * time.Second, + shutdownGracePeriodCriticalPods: 100 * time.Second, + }, + want: 
nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := migrateConfig(tt.args.shutdownGracePeriodRequested, tt.args.shutdownGracePeriodCriticalPods); !assert.Equal(t, tt.want, got) { + t.Errorf("migrateConfig() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_groupByPriority(t *testing.T) { + type args struct { + shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority + pods []*v1.Pod + } + tests := []struct { + name string + args args + want []podShutdownGroup + }{ + { + name: "migrate config", + args: args{ + shutdownGracePeriodByPodPriority: migrateConfig(300*time.Second /* shutdownGracePeriodRequested */, 120*time.Second /* shutdownGracePeriodCriticalPods */), + pods: []*v1.Pod{ + makePod("normal-pod", scheduling.DefaultPriorityWhenNoDefaultClassExists, nil), + makePod("highest-user-definable-pod", scheduling.HighestUserDefinablePriority, nil), + makePod("critical-pod", scheduling.SystemCriticalPriority, nil), + }, + }, + want: []podShutdownGroup{ + { + ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ + Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists, + ShutdownGracePeriodSeconds: 180, + }, + Pods: []*v1.Pod{ + makePod("normal-pod", scheduling.DefaultPriorityWhenNoDefaultClassExists, nil), + makePod("highest-user-definable-pod", scheduling.HighestUserDefinablePriority, nil), + }, + }, + { + ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ + Priority: scheduling.SystemCriticalPriority, + ShutdownGracePeriodSeconds: 120, + }, + Pods: []*v1.Pod{ + makePod("critical-pod", scheduling.SystemCriticalPriority, nil), + }, + }, + }, + }, + { + name: "pod priority", + args: args{ + shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{ + { + Priority: 1, + ShutdownGracePeriodSeconds: 10, + }, + { + Priority: 2, + ShutdownGracePeriodSeconds: 20, + }, + { + Priority: 3, + ShutdownGracePeriodSeconds: 30, + }, + { + Priority: 4, + ShutdownGracePeriodSeconds: 40, + }, + }, + pods: []*v1.Pod{ + makePod("pod-0", 0, nil), + makePod("pod-1", 1, nil), + makePod("pod-2", 2, nil), + makePod("pod-3", 3, nil), + makePod("pod-4", 4, nil), + makePod("pod-5", 5, nil), + }, + }, + want: []podShutdownGroup{ + { + ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ + Priority: 1, + ShutdownGracePeriodSeconds: 10, + }, + Pods: []*v1.Pod{ + makePod("pod-0", 0, nil), + makePod("pod-1", 1, nil), + }, + }, + { + ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ + Priority: 2, + ShutdownGracePeriodSeconds: 20, + }, + Pods: []*v1.Pod{ + makePod("pod-2", 2, nil), + }, + }, + { + ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ + Priority: 3, + ShutdownGracePeriodSeconds: 30, + }, + Pods: []*v1.Pod{ + makePod("pod-3", 3, nil), + }, + }, + { + ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{ + Priority: 4, + ShutdownGracePeriodSeconds: 40, + }, + Pods: []*v1.Pod{ + makePod("pod-4", 4, nil), + makePod("pod-5", 5, nil), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := groupByPriority(tt.args.shutdownGracePeriodByPodPriority, tt.args.pods); !assert.Equal(t, tt.want, got) { + t.Errorf("groupByPriority() = %v, want %v", got, tt.want) + } + }) + } +}