Refactor node shutdown manager

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
Maksym Pavlenko 2024-10-23 17:36:22 -07:00
parent 352056f09d
commit 449f86b0ba
7 changed files with 515 additions and 458 deletions

View File

@@ -929,7 +929,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))
// setup node shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{
Logger: logger,
ProbeManager: klet.probeManager,
VolumeManager: klet.volumeManager,
@@ -948,7 +948,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if err != nil {
return nil, fmt.Errorf("create user namespace manager: %w", err)
}
klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
klet.admitHandlers.AddPodAdmitHandler(shutdownManager)
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.

View File

@@ -351,7 +351,7 @@ func newTestKubeletWithImageList(
kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
// setup shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{
Logger: logger,
ProbeManager: kubelet.probeManager,
Recorder: fakeRecorder,
@@ -363,7 +363,7 @@ func newTestKubeletWithImageList(
ShutdownGracePeriodCriticalPods: 0,
})
kubelet.shutdownManager = shutdownManager
kubelet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
kubelet.admitHandlers.AddPodAdmitHandler(shutdownManager)
// Add this as cleanup predicate pod admitter
kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))

View File

@@ -17,11 +17,19 @@ limitations under the License.
package nodeshutdown
import (
"context"
"fmt"
"sort"
"sync"
"time"
v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/scheduling"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -32,6 +40,8 @@ import (
// Manager interface provides methods for Kubelet to manage node shutdown.
type Manager interface {
lifecycle.PodAdmitHandler
Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult
Start() error
ShutdownStatus() error
@@ -71,3 +81,211 @@ func (managerStub) Start() error {
func (managerStub) ShutdownStatus() error {
return nil
}
const (
nodeShutdownReason = "Terminated"
nodeShutdownMessage = "Pod was terminated in response to imminent node shutdown."
)
// podManager is responsible for killing active pods by priority.
type podManager struct {
logger klog.Logger
shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
clock clock.Clock
killPodFunc eviction.KillPodFunc
volumeManager volumemanager.VolumeManager
}
func newPodManager(conf *Config) *podManager {
shutdownGracePeriodByPodPriority := conf.ShutdownGracePeriodByPodPriority
// Migration from the original configuration
if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) ||
len(shutdownGracePeriodByPodPriority) == 0 {
shutdownGracePeriodByPodPriority = migrateConfig(conf.ShutdownGracePeriodRequested, conf.ShutdownGracePeriodCriticalPods)
}
// Sort by priority from low to high
sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool {
return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority
})
if conf.Clock == nil {
conf.Clock = clock.RealClock{}
}
return &podManager{
logger: conf.Logger,
shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
clock: conf.Clock,
killPodFunc: conf.KillPodFunc,
volumeManager: conf.VolumeManager,
}
}
// killPods terminates pods by priority.
func (m *podManager) killPods(activePods []*v1.Pod) error {
groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods)
for _, group := range groups {
// If there are no pods in a particular range,
// then do not wait for pods in that priority range.
if len(group.Pods) == 0 {
continue
}
var wg sync.WaitGroup
wg.Add(len(group.Pods))
for _, pod := range group.Pods {
go func(pod *v1.Pod, group podShutdownGroup) {
defer wg.Done()
gracePeriodOverride := group.ShutdownGracePeriodSeconds
// If the pod's spec specifies a termination gracePeriod that is less than or equal to the calculated gracePeriodOverride, use the pod's own termination gracePeriod.
if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
}
m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
// set the pod status to failed (unless it was already in a successful terminal phase)
if status.Phase != v1.PodSucceeded {
status.Phase = v1.PodFailed
}
status.Message = nodeShutdownMessage
status.Reason = nodeShutdownReason
podutil.UpdatePodCondition(status, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonTerminationByKubelet,
Message: nodeShutdownMessage,
})
}); err != nil {
m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
} else {
m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
}
}(pod, group)
}
// This duration determines how long the shutdown manager will wait for the pods in this group
// to terminate before proceeding to the next group.
var groupTerminationWaitDuration = time.Duration(group.ShutdownGracePeriodSeconds) * time.Second
var (
doneCh = make(chan struct{})
timer = m.clock.NewTimer(groupTerminationWaitDuration)
ctx, ctxCancel = context.WithTimeout(context.Background(), groupTerminationWaitDuration)
)
go func() {
defer close(doneCh)
defer ctxCancel()
wg.Wait()
// The kill signal was sent to every pod in the group; wait until all of
// their volumes are unmounted before continuing to the next group. This
// gives the CSI driver (assuming it runs in the highest-priority group)
// a chance to perform the unmounts.
if err := m.volumeManager.WaitForAllPodsUnmount(ctx, group.Pods); err != nil {
var podIdentifiers []string
for _, pod := range group.Pods {
podIdentifiers = append(podIdentifiers, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
}
// Waiting for volume teardown is done on a best-effort basis;
// log the error and continue.
//
// Depending on the user-provided kubelet configuration, either the
// `timer` will fire and we'll move on to shut down the next group, or
// WaitForAllPodsUnmount will time out, this goroutine will close
// doneCh, and we'll move on to shut down the next group.
m.logger.Error(err, "Failed while waiting for all the volumes belonging to Pods in this group to unmount", "pods", podIdentifiers)
}
}()
select {
case <-doneCh:
timer.Stop()
m.logger.V(1).Info("Done waiting for all pods in group to terminate", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
case <-timer.C():
ctxCancel()
m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
}
}
return nil
}
func (m *podManager) periodRequested() time.Duration {
var sum int64
for _, period := range m.shutdownGracePeriodByPodPriority {
sum += period.ShutdownGracePeriodSeconds
}
return time.Duration(sum) * time.Second
}
func migrateConfig(shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) []kubeletconfig.ShutdownGracePeriodByPodPriority {
if shutdownGracePeriodRequested == 0 {
return nil
}
defaultPriority := shutdownGracePeriodRequested - shutdownGracePeriodCriticalPods
if defaultPriority < 0 {
return nil
}
criticalPriority := shutdownGracePeriodRequested - defaultPriority
if criticalPriority < 0 {
return nil
}
return []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: int64(defaultPriority / time.Second),
},
{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: int64(criticalPriority / time.Second),
},
}
}
type podShutdownGroup struct {
kubeletconfig.ShutdownGracePeriodByPodPriority
Pods []*v1.Pod
}
func groupByPriority(shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority, pods []*v1.Pod) []podShutdownGroup {
groups := make([]podShutdownGroup, 0, len(shutdownGracePeriodByPodPriority))
for _, period := range shutdownGracePeriodByPodPriority {
groups = append(groups, podShutdownGroup{
ShutdownGracePeriodByPodPriority: period,
})
}
for _, pod := range pods {
var priority int32
if pod.Spec.Priority != nil {
priority = *pod.Spec.Priority
}
// Find the group index according to the priority.
index := sort.Search(len(groups), func(i int) bool {
return groups[i].Priority >= priority
})
// 1. Pods with priority above the highest group's priority go into the highest group.
// 2. Pods with priority below the lowest group's priority go into the lowest group.
// 3. Pods whose priority falls on or between group boundaries, i.e.
//    groups[index-1].Priority <= pod priority < groups[index].Priority,
//    go into the lower group (index-1).
if index == len(groups) {
index = len(groups) - 1
} else if index < 0 {
index = 0
} else if index > 0 && groups[index].Priority > priority {
index--
}
groups[index].Pods = append(groups[index].Pods, pod)
}
return groups
}
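
(Editorial aside, not part of the commit: the clamping rules above can be hard to follow from the comment alone. Below is a minimal, self-contained Go sketch of the same index-selection logic, using simplified stand-in types instead of kubeletconfig.ShutdownGracePeriodByPodPriority and *v1.Pod; the priorities and grace periods mirror the "pod priority" test case further down.)

package main

import (
	"fmt"
	"sort"
)

// group is a simplified stand-in for podShutdownGroup.
type group struct {
	Priority                   int32
	ShutdownGracePeriodSeconds int64
	Pods                       []string
}

// bucket mirrors groupByPriority: sort.Search finds the first group whose
// Priority is >= the pod's priority, then the index is clamped so that
// out-of-range pods land in the nearest group and in-between pods land in
// the lower group.
func bucket(groups []group, name string, priority int32) {
	index := sort.Search(len(groups), func(i int) bool {
		return groups[i].Priority >= priority
	})
	if index == len(groups) {
		index = len(groups) - 1
	} else if index > 0 && groups[index].Priority > priority {
		index--
	}
	groups[index].Pods = append(groups[index].Pods, name)
}

func main() {
	groups := []group{
		{Priority: 1, ShutdownGracePeriodSeconds: 10},
		{Priority: 2, ShutdownGracePeriodSeconds: 20},
		{Priority: 3, ShutdownGracePeriodSeconds: 30},
		{Priority: 4, ShutdownGracePeriodSeconds: 40},
	}
	pods := []struct {
		name     string
		priority int32
	}{
		{"pod-0", 0},
		{"pod-2", 2},
		{"pod-5", 5},
	}
	for _, p := range pods {
		bucket(groups, p.name, p.priority)
	}
	for _, g := range groups {
		fmt.Printf("priority %d (%ds): %v\n", g.Priority, g.ShutdownGracePeriodSeconds, g.Pods)
	}
}

Running it puts pod-0 in the priority-1 group, pod-2 in the priority-2 group, and pod-5 in the priority-4 group, matching the expectations in Test_groupByPriority.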

View File

@@ -21,10 +21,8 @@ limitations under the License.
package nodeshutdown
import (
"context"
"fmt"
"path/filepath"
"sort"
"sync"
"time"
@@ -32,23 +30,16 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/scheduling"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/utils/clock"
)
const (
nodeShutdownReason = "Terminated"
nodeShutdownMessage = "Pod was terminated in response to imminent node shutdown."
nodeShutdownNotAdmittedReason = "NodeShutdown"
nodeShutdownNotAdmittedMessage = "Pod was rejected as the node is shutting down."
dbusReconnectPeriod = 1 * time.Second
@@ -75,12 +66,7 @@ type managerImpl struct {
nodeRef *v1.ObjectReference
probeManager prober.Manager
volumeManager volumemanager.VolumeManager
shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
getPods eviction.ActivePodsFunc
killPodFunc eviction.KillPodFunc
syncNodeStatus func()
dbusCon dbusInhibiter
@@ -88,53 +74,36 @@ type managerImpl struct {
nodeShuttingDownMutex sync.Mutex
nodeShuttingDownNow bool
clock clock.Clock
podManager *podManager
enableMetrics bool
storage storage
}
// NewManager returns a new node shutdown manager.
func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
func NewManager(conf *Config) Manager {
if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) {
m := managerStub{}
return m, m
return m
}
shutdownGracePeriodByPodPriority := conf.ShutdownGracePeriodByPodPriority
// Migration from the original configuration
if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) ||
len(shutdownGracePeriodByPodPriority) == 0 {
shutdownGracePeriodByPodPriority = migrateConfig(conf.ShutdownGracePeriodRequested, conf.ShutdownGracePeriodCriticalPods)
}
podManager := newPodManager(conf)
// Disable if the configuration is empty
if len(shutdownGracePeriodByPodPriority) == 0 {
if len(podManager.shutdownGracePeriodByPodPriority) == 0 {
m := managerStub{}
return m, m
return m
}
// Sort by priority from low to high
sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool {
return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority
})
if conf.Clock == nil {
conf.Clock = clock.RealClock{}
}
manager := &managerImpl{
logger: conf.Logger,
probeManager: conf.ProbeManager,
recorder: conf.Recorder,
volumeManager: conf.VolumeManager,
nodeRef: conf.NodeRef,
getPods: conf.GetPodsFunc,
killPodFunc: conf.KillPodFunc,
syncNodeStatus: conf.SyncNodeStatusFunc,
shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
clock: conf.Clock,
enableMetrics: utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority),
logger: conf.Logger,
probeManager: conf.ProbeManager,
recorder: conf.Recorder,
nodeRef: conf.NodeRef,
getPods: conf.GetPodsFunc,
syncNodeStatus: conf.SyncNodeStatusFunc,
podManager: podManager,
enableMetrics: utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority),
storage: localStorage{
Path: filepath.Join(conf.StateDirectory, localStorageStateFile),
},
@@ -142,9 +111,9 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
manager.logger.Info("Creating node shutdown manager",
"shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
"shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods,
"shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority,
"shutdownGracePeriodByPodPriority", podManager.shutdownGracePeriodByPodPriority,
)
return manager, manager
return manager
}
// Admit rejects all pods if the node is shutting down.
@@ -217,7 +186,7 @@ func (m *managerImpl) start() (chan struct{}, error) {
}
// If logind's InhibitDelayMaxUSec (as configured in logind.conf) is less than periodRequested, attempt to update the value to periodRequested.
if periodRequested := m.periodRequested(); periodRequested > currentInhibitDelay {
if periodRequested := m.podManager.periodRequested(); periodRequested > currentInhibitDelay {
err := m.dbusCon.OverrideInhibitDelay(periodRequested)
if err != nil {
return nil, fmt.Errorf("unable to override inhibit delay by shutdown manager: %v", err)
@@ -356,166 +325,5 @@ func (m *managerImpl) processShutdownEvent() error {
}()
}
groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods)
for _, group := range groups {
// If there are no pods in a particular range,
// then do not wait for pods in that priority range.
if len(group.Pods) == 0 {
continue
}
var wg sync.WaitGroup
wg.Add(len(group.Pods))
for _, pod := range group.Pods {
go func(pod *v1.Pod, group podShutdownGroup) {
defer wg.Done()
gracePeriodOverride := group.ShutdownGracePeriodSeconds
// If the pod's spec specifies a termination gracePeriod that is less than or equal to the calculated gracePeriodOverride, use the pod's own termination gracePeriod.
if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
}
m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
// set the pod status to failed (unless it was already in a successful terminal phase)
if status.Phase != v1.PodSucceeded {
status.Phase = v1.PodFailed
}
status.Message = nodeShutdownMessage
status.Reason = nodeShutdownReason
podutil.UpdatePodCondition(status, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonTerminationByKubelet,
Message: nodeShutdownMessage,
})
}); err != nil {
m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
} else {
m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
}
}(pod, group)
}
// This duration determines how long the shutdown manager will wait for the pods in this group
// to terminate before proceeding to the next group.
var groupTerminationWaitDuration = time.Duration(group.ShutdownGracePeriodSeconds) * time.Second
var (
doneCh = make(chan struct{})
timer = m.clock.NewTimer(groupTerminationWaitDuration)
ctx, ctxCancel = context.WithTimeout(context.Background(), groupTerminationWaitDuration)
)
go func() {
defer close(doneCh)
defer ctxCancel()
wg.Wait()
// The kill signal was sent to every pod in the group; wait until all of
// their volumes are unmounted before continuing to the next group. This
// gives the CSI driver (assuming it runs in the highest-priority group)
// a chance to perform the unmounts.
if err := m.volumeManager.WaitForAllPodsUnmount(ctx, group.Pods); err != nil {
var podIdentifiers []string
for _, pod := range group.Pods {
podIdentifiers = append(podIdentifiers, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
}
// Waiting for volume teardown is done on a best-effort basis;
// log the error and continue.
//
// Depending on the user-provided kubelet configuration, either the
// `timer` will fire and we'll move on to shut down the next group, or
// WaitForAllPodsUnmount will time out, this goroutine will close
// doneCh, and we'll move on to shut down the next group.
m.logger.Error(err, "Failed while waiting for all the volumes belonging to Pods in this group to unmount", "pods", podIdentifiers)
}
}()
select {
case <-doneCh:
timer.Stop()
case <-timer.C():
ctxCancel()
m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
}
}
return nil
}
func (m *managerImpl) periodRequested() time.Duration {
var sum int64
for _, period := range m.shutdownGracePeriodByPodPriority {
sum += period.ShutdownGracePeriodSeconds
}
return time.Duration(sum) * time.Second
}
func migrateConfig(shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) []kubeletconfig.ShutdownGracePeriodByPodPriority {
if shutdownGracePeriodRequested == 0 {
return nil
}
defaultPriority := shutdownGracePeriodRequested - shutdownGracePeriodCriticalPods
if defaultPriority < 0 {
return nil
}
criticalPriority := shutdownGracePeriodRequested - defaultPriority
if criticalPriority < 0 {
return nil
}
return []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: int64(defaultPriority / time.Second),
},
{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: int64(criticalPriority / time.Second),
},
}
}
func groupByPriority(shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority, pods []*v1.Pod) []podShutdownGroup {
groups := make([]podShutdownGroup, 0, len(shutdownGracePeriodByPodPriority))
for _, period := range shutdownGracePeriodByPodPriority {
groups = append(groups, podShutdownGroup{
ShutdownGracePeriodByPodPriority: period,
})
}
for _, pod := range pods {
var priority int32
if pod.Spec.Priority != nil {
priority = *pod.Spec.Priority
}
// Find the group index according to the priority.
index := sort.Search(len(groups), func(i int) bool {
return groups[i].Priority >= priority
})
// 1. Pods with priority above the highest group's priority go into the highest group.
// 2. Pods with priority below the lowest group's priority go into the lowest group.
// 3. Pods whose priority falls on or between group boundaries, i.e.
//    groups[index-1].Priority <= pod priority < groups[index].Priority,
//    go into the lower group (index-1).
if index == len(groups) {
index = len(groups) - 1
} else if index < 0 {
index = 0
} else if index > 0 && groups[index].Priority > priority {
index--
}
groups[index].Pods = append(groups[index].Pods, pod)
}
return groups
}
type podShutdownGroup struct {
kubeletconfig.ShutdownGracePeriodByPodPriority
Pods []*v1.Pod
}
return m.podManager.killPods(activePods)
}
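
(Editorial aside, not part of the commit: the caller-visible change in this file is that NewManager now returns a single value, because Manager embeds lifecycle.PodAdmitHandler, so the same object serves as both the shutdown manager and the pod admit handler. A minimal sketch of that embedding pattern, with simplified stand-in types and signatures rather than the kubelet's own:)

package main

import "fmt"

// Simplified stand-ins for lifecycle.PodAdmitResult and
// lifecycle.PodAdmitHandler; illustrative only.
type PodAdmitResult struct {
	Admit   bool
	Reason  string
	Message string
}

type PodAdmitHandler interface {
	Admit(pod string) PodAdmitResult
}

// Manager embeds the admit-handler interface, so one concrete value can be
// stored as the shutdown manager and registered as an admit handler -- the
// pattern this refactor applies to nodeshutdown.Manager.
type Manager interface {
	PodAdmitHandler
	Start() error
}

type shutdownManager struct{ shuttingDown bool }

func (m *shutdownManager) Start() error { return nil }

// Admit rejects all pods while the node is shutting down.
func (m *shutdownManager) Admit(pod string) PodAdmitResult {
	if m.shuttingDown {
		return PodAdmitResult{Admit: false, Reason: "NodeShutdown",
			Message: "Pod was rejected as the node is shutting down."}
	}
	return PodAdmitResult{Admit: true}
}

func main() {
	var m Manager = &shutdownManager{shuttingDown: true}
	handlers := []PodAdmitHandler{m} // one value, both roles
	fmt.Printf("%+v\n", handlers[0].Admit("test-pod"))
}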

View File

@@ -92,19 +92,6 @@ func (f *fakeDbus) OverrideInhibitDelay(inhibitDelayMax time.Duration) error {
return nil
}
func makePod(name string, priority int32, terminationGracePeriod *int64) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name),
},
Spec: v1.PodSpec{
Priority: &priority,
TerminationGracePeriodSeconds: terminationGracePeriod,
},
}
}
func TestManager(t *testing.T) {
systemDbusTmp := systemDbus
defer func() {
@@ -352,7 +339,7 @@ func TestManager(t *testing.T) {
fakeRecorder := &record.FakeRecorder{}
fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
manager := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
VolumeManager: fakeVolumeManager,
@@ -459,7 +446,7 @@ func TestFeatureEnabled(t *testing.T) {
fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
manager := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
VolumeManager: fakeVolumeManager,
@@ -517,7 +504,7 @@ func TestRestart(t *testing.T) {
fakeRecorder := &record.FakeRecorder{}
fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
manager := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
VolumeManager: fakeVolumeManager,
@@ -551,199 +538,6 @@ func TestRestart(t *testing.T) {
}
}
func Test_migrateConfig(t *testing.T) {
type shutdownConfig struct {
shutdownGracePeriodRequested time.Duration
shutdownGracePeriodCriticalPods time.Duration
}
tests := []struct {
name string
args shutdownConfig
want []kubeletconfig.ShutdownGracePeriodByPodPriority
}{
{
name: "both shutdownGracePeriodRequested and shutdownGracePeriodCriticalPods",
args: shutdownConfig{
shutdownGracePeriodRequested: 300 * time.Second,
shutdownGracePeriodCriticalPods: 120 * time.Second,
},
want: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: 180,
},
{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: 120,
},
},
},
{
name: "only shutdownGracePeriodRequested",
args: shutdownConfig{
shutdownGracePeriodRequested: 100 * time.Second,
shutdownGracePeriodCriticalPods: 0 * time.Second,
},
want: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: 100,
},
{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: 0,
},
},
},
{
name: "empty configuration",
args: shutdownConfig{
shutdownGracePeriodRequested: 0 * time.Second,
shutdownGracePeriodCriticalPods: 0 * time.Second,
},
want: nil,
},
{
name: "wrong configuration",
args: shutdownConfig{
shutdownGracePeriodRequested: 1 * time.Second,
shutdownGracePeriodCriticalPods: 100 * time.Second,
},
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := migrateConfig(tt.args.shutdownGracePeriodRequested, tt.args.shutdownGracePeriodCriticalPods); !assert.Equal(t, tt.want, got) {
t.Errorf("migrateConfig() = %v, want %v", got, tt.want)
}
})
}
}
func Test_groupByPriority(t *testing.T) {
type args struct {
shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
pods []*v1.Pod
}
tests := []struct {
name string
args args
want []podShutdownGroup
}{
{
name: "migrate config",
args: args{
shutdownGracePeriodByPodPriority: migrateConfig(300*time.Second /* shutdownGracePeriodRequested */, 120*time.Second /* shutdownGracePeriodCriticalPods */),
pods: []*v1.Pod{
makePod("normal-pod", scheduling.DefaultPriorityWhenNoDefaultClassExists, nil),
makePod("highest-user-definable-pod", scheduling.HighestUserDefinablePriority, nil),
makePod("critical-pod", scheduling.SystemCriticalPriority, nil),
},
},
want: []podShutdownGroup{
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: 180,
},
Pods: []*v1.Pod{
makePod("normal-pod", scheduling.DefaultPriorityWhenNoDefaultClassExists, nil),
makePod("highest-user-definable-pod", scheduling.HighestUserDefinablePriority, nil),
},
},
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: 120,
},
Pods: []*v1.Pod{
makePod("critical-pod", scheduling.SystemCriticalPriority, nil),
},
},
},
},
{
name: "pod priority",
args: args{
shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: 1,
ShutdownGracePeriodSeconds: 10,
},
{
Priority: 2,
ShutdownGracePeriodSeconds: 20,
},
{
Priority: 3,
ShutdownGracePeriodSeconds: 30,
},
{
Priority: 4,
ShutdownGracePeriodSeconds: 40,
},
},
pods: []*v1.Pod{
makePod("pod-0", 0, nil),
makePod("pod-1", 1, nil),
makePod("pod-2", 2, nil),
makePod("pod-3", 3, nil),
makePod("pod-4", 4, nil),
makePod("pod-5", 5, nil),
},
},
want: []podShutdownGroup{
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: 1,
ShutdownGracePeriodSeconds: 10,
},
Pods: []*v1.Pod{
makePod("pod-0", 0, nil),
makePod("pod-1", 1, nil),
},
},
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: 2,
ShutdownGracePeriodSeconds: 20,
},
Pods: []*v1.Pod{
makePod("pod-2", 2, nil),
},
},
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: 3,
ShutdownGracePeriodSeconds: 30,
},
Pods: []*v1.Pod{
makePod("pod-3", 3, nil),
},
},
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: 4,
ShutdownGracePeriodSeconds: 40,
},
Pods: []*v1.Pod{
makePod("pod-4", 4, nil),
makePod("pod-5", 5, nil),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := groupByPriority(tt.args.shutdownGracePeriodByPodPriority, tt.args.pods); !assert.Equal(t, tt.want, got) {
t.Errorf("groupByPriority() = %v, want %v", got, tt.want)
}
})
}
}
func Test_managerImpl_processShutdownEvent(t *testing.T) {
var (
probeManager = probetest.FakeManager{}
@@ -818,20 +612,23 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
),
)
m := &managerImpl{
logger: logger,
volumeManager: tt.fields.volumeManager,
recorder: tt.fields.recorder,
nodeRef: tt.fields.nodeRef,
probeManager: tt.fields.probeManager,
shutdownGracePeriodByPodPriority: tt.fields.shutdownGracePeriodByPodPriority,
getPods: tt.fields.getPods,
killPodFunc: tt.fields.killPodFunc,
syncNodeStatus: tt.fields.syncNodeStatus,
dbusCon: tt.fields.dbusCon,
inhibitLock: tt.fields.inhibitLock,
nodeShuttingDownMutex: sync.Mutex{},
nodeShuttingDownNow: tt.fields.nodeShuttingDownNow,
clock: tt.fields.clock,
logger: logger,
recorder: tt.fields.recorder,
nodeRef: tt.fields.nodeRef,
probeManager: tt.fields.probeManager,
getPods: tt.fields.getPods,
syncNodeStatus: tt.fields.syncNodeStatus,
dbusCon: tt.fields.dbusCon,
inhibitLock: tt.fields.inhibitLock,
nodeShuttingDownMutex: sync.Mutex{},
nodeShuttingDownNow: tt.fields.nodeShuttingDownNow,
podManager: &podManager{
logger: logger,
volumeManager: tt.fields.volumeManager,
shutdownGracePeriodByPodPriority: tt.fields.shutdownGracePeriodByPodPriority,
killPodFunc: tt.fields.killPodFunc,
clock: tt.fields.clock,
},
}
if err := m.processShutdownEvent(); (err != nil) != tt.wantErr {
t.Errorf("managerImpl.processShutdownEvent() error = %v, wantErr %v", err, tt.wantErr)
@@ -870,28 +667,31 @@ func Test_processShutdownEvent_VolumeUnmountTimeout(t *testing.T) {
)
logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
m := &managerImpl{
logger: logger,
volumeManager: fakeVolumeManager,
recorder: fakeRecorder,
nodeRef: nodeRef,
probeManager: probeManager,
shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: 1,
ShutdownGracePeriodSeconds: int64(shutdownGracePeriodSeconds),
},
},
logger: logger,
recorder: fakeRecorder,
nodeRef: nodeRef,
probeManager: probeManager,
getPods: func() []*v1.Pod {
return []*v1.Pod{
makePod("test-pod", 1, nil),
}
},
killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
return nil
},
syncNodeStatus: syncNodeStatus,
dbusCon: &fakeDbus{},
clock: fakeclock,
podManager: &podManager{
logger: logger,
volumeManager: fakeVolumeManager,
shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: 1,
ShutdownGracePeriodSeconds: int64(shutdownGracePeriodSeconds),
},
},
killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
return nil
},
clock: fakeclock,
},
}
start := fakeclock.Now()

View File

@@ -19,12 +19,8 @@ limitations under the License.
package nodeshutdown
import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
// NewManager returns a fake node shutdown manager for non-Linux platforms.
func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
func NewManager(conf *Config) Manager {
m := managerStub{}
return m, m
return m
}

View File

@@ -0,0 +1,235 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeshutdown
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/apis/scheduling"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
func makePod(name string, priority int32, terminationGracePeriod *int64) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name),
},
Spec: v1.PodSpec{
Priority: &priority,
TerminationGracePeriodSeconds: terminationGracePeriod,
},
}
}
func Test_migrateConfig(t *testing.T) {
type shutdownConfig struct {
shutdownGracePeriodRequested time.Duration
shutdownGracePeriodCriticalPods time.Duration
}
tests := []struct {
name string
args shutdownConfig
want []kubeletconfig.ShutdownGracePeriodByPodPriority
}{
{
name: "both shutdownGracePeriodRequested and shutdownGracePeriodCriticalPods",
args: shutdownConfig{
shutdownGracePeriodRequested: 300 * time.Second,
shutdownGracePeriodCriticalPods: 120 * time.Second,
},
want: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: 180,
},
{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: 120,
},
},
},
{
name: "only shutdownGracePeriodRequested",
args: shutdownConfig{
shutdownGracePeriodRequested: 100 * time.Second,
shutdownGracePeriodCriticalPods: 0 * time.Second,
},
want: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: 100,
},
{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: 0,
},
},
},
{
name: "empty configuration",
args: shutdownConfig{
shutdownGracePeriodRequested: 0 * time.Second,
shutdownGracePeriodCriticalPods: 0 * time.Second,
},
want: nil,
},
{
name: "wrong configuration",
args: shutdownConfig{
shutdownGracePeriodRequested: 1 * time.Second,
shutdownGracePeriodCriticalPods: 100 * time.Second,
},
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := migrateConfig(tt.args.shutdownGracePeriodRequested, tt.args.shutdownGracePeriodCriticalPods); !assert.Equal(t, tt.want, got) {
t.Errorf("migrateConfig() = %v, want %v", got, tt.want)
}
})
}
}
func Test_groupByPriority(t *testing.T) {
type args struct {
shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
pods []*v1.Pod
}
tests := []struct {
name string
args args
want []podShutdownGroup
}{
{
name: "migrate config",
args: args{
shutdownGracePeriodByPodPriority: migrateConfig(300*time.Second /* shutdownGracePeriodRequested */, 120*time.Second /* shutdownGracePeriodCriticalPods */),
pods: []*v1.Pod{
makePod("normal-pod", scheduling.DefaultPriorityWhenNoDefaultClassExists, nil),
makePod("highest-user-definable-pod", scheduling.HighestUserDefinablePriority, nil),
makePod("critical-pod", scheduling.SystemCriticalPriority, nil),
},
},
want: []podShutdownGroup{
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: 180,
},
Pods: []*v1.Pod{
makePod("normal-pod", scheduling.DefaultPriorityWhenNoDefaultClassExists, nil),
makePod("highest-user-definable-pod", scheduling.HighestUserDefinablePriority, nil),
},
},
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: 120,
},
Pods: []*v1.Pod{
makePod("critical-pod", scheduling.SystemCriticalPriority, nil),
},
},
},
},
{
name: "pod priority",
args: args{
shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: 1,
ShutdownGracePeriodSeconds: 10,
},
{
Priority: 2,
ShutdownGracePeriodSeconds: 20,
},
{
Priority: 3,
ShutdownGracePeriodSeconds: 30,
},
{
Priority: 4,
ShutdownGracePeriodSeconds: 40,
},
},
pods: []*v1.Pod{
makePod("pod-0", 0, nil),
makePod("pod-1", 1, nil),
makePod("pod-2", 2, nil),
makePod("pod-3", 3, nil),
makePod("pod-4", 4, nil),
makePod("pod-5", 5, nil),
},
},
want: []podShutdownGroup{
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: 1,
ShutdownGracePeriodSeconds: 10,
},
Pods: []*v1.Pod{
makePod("pod-0", 0, nil),
makePod("pod-1", 1, nil),
},
},
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: 2,
ShutdownGracePeriodSeconds: 20,
},
Pods: []*v1.Pod{
makePod("pod-2", 2, nil),
},
},
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: 3,
ShutdownGracePeriodSeconds: 30,
},
Pods: []*v1.Pod{
makePod("pod-3", 3, nil),
},
},
{
ShutdownGracePeriodByPodPriority: kubeletconfig.ShutdownGracePeriodByPodPriority{
Priority: 4,
ShutdownGracePeriodSeconds: 40,
},
Pods: []*v1.Pod{
makePod("pod-4", 4, nil),
makePod("pod-5", 5, nil),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := groupByPriority(tt.args.shutdownGracePeriodByPodPriority, tt.args.pods); !assert.Equal(t, tt.want, got) {
t.Errorf("groupByPriority() = %v, want %v", got, tt.want)
}
})
}
}
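
(Editorial aside, not part of the commit: as a quick cross-check of the Test_migrateConfig expectations above, here is a self-contained sketch of the same grace-period split. The real migrateConfig returns kubeletconfig.ShutdownGracePeriodByPodPriority entries keyed to the default and system-critical priority classes; this version returns plain seconds.)

package main

import (
	"fmt"
	"time"
)

// migrate mirrors migrateConfig: the total requested grace period is split
// into a slice for ordinary pods and a slice for critical pods. A zero
// requested period disables the feature; requested < critical is a
// misconfiguration, and both map to a nil result upstream.
func migrate(requested, critical time.Duration) (defaultSecs, criticalSecs int64, ok bool) {
	if requested == 0 || requested < critical {
		return 0, 0, false
	}
	defaultPeriod := requested - critical
	return int64(defaultPeriod / time.Second), int64(critical / time.Second), true
}

func main() {
	d, c, ok := migrate(300*time.Second, 120*time.Second)
	fmt.Println(d, c, ok) // 180 120 true -- matches the first test case above

	_, _, ok = migrate(1*time.Second, 100*time.Second)
	fmt.Println(ok) // false -- the "wrong configuration" case, which yields nil
}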