Windows node graceful shutdown

This commit is contained in:
zylxjtu 2024-09-11 22:02:42 +00:00
parent 3140807126
commit 459952a067
10 changed files with 1243 additions and 11 deletions

View File

@ -89,7 +89,7 @@ func initForOS(windowsService bool, windowsPriorityClass string) error {
}
if windowsService {
return service.InitService(serviceName)
return service.InitServiceWithShutdown(serviceName)
}
return nil
}

View File

@ -703,6 +703,13 @@ const (
// Allows kube-proxy to create DSR loadbalancers for Windows
WinDSR featuregate.Feature = "WinDSR"
// owner: @zylxjtu
// kep: https://kep.k8s.io/4802
// alpha: v1.32
//
// Enables support for graceful node shutdown on Windows nodes.
WindowsGracefulNodeShutdown featuregate.Feature = "WindowsGracefulNodeShutdown"
// owner: @ksubrmnn
//
// Allows kube-proxy to run in Overlay mode for Windows

View File

@ -762,7 +762,9 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
WinDSR: {
{Version: version.MustParse("1.14"), Default: false, PreRelease: featuregate.Alpha},
},
WindowsGracefulNodeShutdown: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
WinOverlay: {
{Version: version.MustParse("1.14"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.20"), Default: true, PreRelease: featuregate.Beta},

View File

@ -1,5 +1,5 @@
//go:build !linux
// +build !linux
//go:build !linux && !windows
// +build !linux,!windows
/*
Copyright 2020 The Kubernetes Authors.
@ -19,7 +19,7 @@ limitations under the License.
package nodeshutdown
// NewManager returns a fake node shutdown manager for non linux platforms.
// NewManager returns a fake node shutdown manager for unsupported platforms.
func NewManager(conf *Config) Manager {
m := managerStub{}
return m

View File

@ -0,0 +1,381 @@
//go:build windows
// +build windows
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package nodeshutdown can watch for node level shutdown events and trigger graceful termination of pods running on the node prior to a system shutdown.
package nodeshutdown
import (
"fmt"
"path/filepath"
"strings"
"sync"
"time"
v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/windows/service"
"github.com/pkg/errors"
"golang.org/x/sys/windows/registry"
"golang.org/x/sys/windows/svc/mgr"
)
const (
// Kubelet service name
serviceKubelet = "kubelet"
shutdownOrderRegPath = `SYSTEM\CurrentControlSet\Control`
shutdownOrderStringValue = "PreshutdownOrder"
)
const (
nodeShutdownNotAdmittedReason = "NodeShutdown"
nodeShutdownNotAdmittedMessage = "Pod was rejected as the node is shutting down."
localStorageStateFile = "graceful_node_shutdown_state"
)
// managerImpl has functions that can be used to interact with the Node Shutdown Manager.
type managerImpl struct {
logger klog.Logger
recorder record.EventRecorder
nodeRef *v1.ObjectReference
probeManager prober.Manager
getPods eviction.ActivePodsFunc
syncNodeStatus func()
nodeShuttingDownMutex sync.Mutex
nodeShuttingDownNow bool
podManager *podManager
enableMetrics bool
storage storage
}
// NewManager returns a new node shutdown manager.
func NewManager(conf *Config) Manager {
	if !utilfeature.DefaultFeatureGate.Enabled(features.WindowsGracefulNodeShutdown) {
		conf.Logger.Info("Node shutdown manager is disabled as the feature gate is not enabled")
		return managerStub{}
	}

	podManager := newPodManager(conf)

	// Disable if the configuration is empty
	if len(podManager.shutdownGracePeriodByPodPriority) == 0 {
		conf.Logger.Info("Node shutdown manager is disabled as no shutdown grace period is configured")
		return managerStub{}
	}

	manager := &managerImpl{
		logger:         conf.Logger,
		probeManager:   conf.ProbeManager,
		recorder:       conf.Recorder,
		nodeRef:        conf.NodeRef,
		getPods:        conf.GetPodsFunc,
		syncNodeStatus: conf.SyncNodeStatusFunc,
		podManager:     podManager,
		// NOTE(review): metrics are gated on the Linux priority-based feature
		// gate, mirroring the Linux implementation — confirm this is intended
		// rather than WindowsGracefulNodeShutdown.
		enableMetrics: utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority),
		storage: localStorage{
			Path: filepath.Join(conf.StateDirectory, localStorageStateFile),
		},
	}
	manager.logger.Info("Creating node shutdown manager",
		"shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
		"shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods,
		"shutdownGracePeriodByPodPriority", podManager.shutdownGracePeriodByPodPriority,
	)
	return manager
}
// Admit rejects every pod while the node is in the process of shutting down.
func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	if m.ShutdownStatus() == nil {
		// Node is not shutting down; admit normally.
		return lifecycle.PodAdmitResult{Admit: true}
	}
	return lifecycle.PodAdmitResult{
		Admit:   false,
		Reason:  nodeShutdownNotAdmittedReason,
		Message: nodeShutdownNotAdmittedMessage,
	}
}
// setMetrics publishes any persisted graceful-shutdown start/end timestamps
// to the corresponding kubelet metrics.
func (m *managerImpl) setMetrics() {
	if !m.enableMetrics || m.storage == nil {
		return
	}
	var sta state
	if err := m.storage.Load(&sta); err != nil {
		m.logger.Error(err, "Failed to load graceful shutdown state")
		return
	}
	if !sta.StartTime.IsZero() {
		metrics.GracefulShutdownStartTime.Set(timestamp(sta.StartTime))
	}
	if !sta.EndTime.IsZero() {
		metrics.GracefulShutdownEndTime.Set(timestamp(sta.EndTime))
	}
}
// Start starts the node shutdown manager: it registers the kubelet for
// Windows preshutdown processing, installs this manager as the preshutdown
// handler, and publishes any previously persisted shutdown metrics.
func (m *managerImpl) Start() error {
	m.logger.V(1).Info("Shutdown manager get started")
	if _, err := m.start(); err != nil {
		return err
	}
	service.SetPreShutdownHandler(m)
	m.setMetrics()
	return nil
}
// start registers the kubelet for Windows preshutdown processing: it verifies
// the kubelet is running as a Windows service, inserts "kubelet" ahead of its
// dependent services in the system PreshutdownOrder registry value, and, if
// needed, raises the service preshutdown timeout to cover the configured
// shutdown grace period.
//
// TODO: the chan struct{} result is always nil; consider returning error only.
func (m *managerImpl) start() (chan struct{}, error) {
	// Process the shutdown only when it is running as a windows service
	if !service.IsServiceInitialized() {
		return nil, errors.Errorf("%s is NOT running as a Windows service", serviceKubelet)
	}

	// Update the registry key to add the kubelet dependencies to the existing order.
	// (Renamed from "mgr" to avoid shadowing the imported mgr package.)
	scm, err := mgr.Connect()
	if err != nil {
		return nil, errors.Wrapf(err, "Could not connect to service manager")
	}
	defer scm.Disconnect()

	s, err := scm.OpenService(serviceKubelet)
	if err != nil {
		return nil, errors.Wrapf(err, "Could not access service %s", serviceKubelet)
	}
	defer s.Close()

	preshutdownInfo, err := service.QueryPreShutdownInfo(s.Handle)
	if err != nil {
		return nil, errors.Wrapf(err, "Could not query preshutdown info")
	}
	m.logger.V(1).Info("Shutdown manager get current preshutdown info", "PreshutdownTimeout", preshutdownInfo.PreshutdownTimeout)

	config, err := s.Config()
	if err != nil {
		return nil, errors.Wrapf(err, "Could not access config of service %s", serviceKubelet)
	}

	// Open the registry key holding the system-wide preshutdown order.
	key, err := registry.OpenKey(registry.LOCAL_MACHINE, shutdownOrderRegPath, registry.QUERY_VALUE|registry.SET_VALUE)
	if err != nil {
		return nil, errors.Wrapf(err, "Could not access registry")
	}
	defer key.Close()

	// Read the existing values
	existingOrders, _, err := key.GetStringsValue(shutdownOrderStringValue)
	if err != nil {
		return nil, errors.Wrapf(err, "Could not access registry value %s", shutdownOrderStringValue)
	}
	m.logger.V(1).Info("Shutdown manager get current service preshutdown order", "Preshutdownorder", existingOrders)

	// Add the kubelet dependencies to the existing order.
	// Use the named constant rather than repeating the "PreshutdownOrder" literal.
	newOrders := addToExistingOrder(config.Dependencies, existingOrders)
	err = key.SetStringsValue(shutdownOrderStringValue, newOrders)
	if err != nil {
		return nil, errors.Wrapf(err, "Could not set registry %s to be new value %s", shutdownOrderStringValue, newOrders)
	}

	// If the preshutdown timeout is less than periodRequested, attempt to update the value to periodRequested.
	if periodRequested := m.periodRequested().Milliseconds(); periodRequested > int64(preshutdownInfo.PreshutdownTimeout) {
		m.logger.V(1).Info("Shutdown manager override preshutdown info", "ShutdownGracePeriod", periodRequested)
		err := service.UpdatePreShutdownInfo(s.Handle, uint32(periodRequested))
		if err != nil {
			return nil, fmt.Errorf("unable to override preshutdown config by shutdown manager: %v", err)
		}

		// Read the preshutdownInfo again, if the override was successful, preshutdownInfo will be equal to shutdownGracePeriodRequested.
		preshutdownInfo, err := service.QueryPreShutdownInfo(s.Handle)
		if err != nil {
			return nil, fmt.Errorf("unable to get preshutdown info after being overridden by shutdown manager: %v", err)
		}

		if periodRequested > int64(preshutdownInfo.PreshutdownTimeout) {
			return nil, fmt.Errorf("shutdown manager was unable to update preshutdown info to %v (ShutdownGracePeriod), current value of preshutdown info (%v) is less than requested ShutdownGracePeriod", periodRequested, preshutdownInfo.PreshutdownTimeout)
		}
	}

	return nil, nil
}
// ShutdownStatus returns a non-nil error while the node is shutting down,
// and nil otherwise.
func (m *managerImpl) ShutdownStatus() error {
	m.nodeShuttingDownMutex.Lock()
	shuttingDown := m.nodeShuttingDownNow
	m.nodeShuttingDownMutex.Unlock()

	if shuttingDown {
		return fmt.Errorf("node is shutting down")
	}
	return nil
}
// ProcessShutdownEvent is invoked when the node receives a Windows
// preshutdown notification. It marks the node as shutting down (so Admit
// rejects new pods), asynchronously refreshes the node status, records
// start/end timestamps for metrics, and gracefully kills the active pods.
func (m *managerImpl) ProcessShutdownEvent() error {
	m.logger.V(1).Info("Shutdown manager detected new preshutdown event", "event", "preshutdown")
	m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected preshutdown event")

	// Flip the flag consulted by ShutdownStatus()/Admit() under the mutex.
	m.nodeShuttingDownMutex.Lock()
	m.nodeShuttingDownNow = true
	m.nodeShuttingDownMutex.Unlock()

	// Refresh node status in the background; shutdown must not wait on it.
	go m.syncNodeStatus()

	m.logger.V(1).Info("Shutdown manager processing preshutdown event")
	activePods := m.getPods()

	defer func() {
		m.logger.V(1).Info("Shutdown manager completed processing preshutdown event, node will shutdown shortly")
	}()

	if m.enableMetrics && m.storage != nil {
		// Persist the start timestamp so it survives a restart, and reset the
		// end-time metric for this new shutdown.
		startTime := time.Now()
		err := m.storage.Store(state{
			StartTime: startTime,
		})
		if err != nil {
			m.logger.Error(err, "Failed to store graceful shutdown state")
		}
		metrics.GracefulShutdownStartTime.Set(timestamp(startTime))
		metrics.GracefulShutdownEndTime.Set(0)
		// Record the end timestamp once pod killing below has finished.
		defer func() {
			endTime := time.Now()
			err := m.storage.Store(state{
				StartTime: startTime,
				EndTime:   endTime,
			})
			if err != nil {
				m.logger.Error(err, "Failed to store graceful shutdown state")
			}
			metrics.GracefulShutdownEndTime.Set(timestamp(endTime))
		}()
	}

	return m.podManager.killPods(activePods)
}
// periodRequested returns the total shutdown grace period, i.e. the sum of
// the per-priority grace periods configured on the pod manager.
func (m *managerImpl) periodRequested() time.Duration {
	var totalSeconds int64
	for _, entry := range m.podManager.shutdownGracePeriodByPodPriority {
		totalSeconds += entry.ShutdownGracePeriodSeconds
	}
	return time.Duration(totalSeconds) * time.Second
}
// removeItemFromList drops every occurrence of item (case-insensitively) from
// stringlist, compacting in place, and returns the shortened slice. The input
// slice's backing array is reused and therefore modified.
func removeItemFromList(stringlist []string, item string) []string {
	kept := stringlist[:0]
	for _, candidate := range stringlist {
		if strings.EqualFold(candidate, item) {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
// insertAt returns slice with value inserted at index, shifting later
// elements one position right. An index at or past the end appends instead.
func insertAt(slice []string, index int, value string) []string {
	// Out-of-range index degrades to an append.
	if index >= len(slice) {
		return append(slice, value)
	}
	// Grow by one, shift the tail right, then drop the value in place.
	slice = append(slice, "")
	copy(slice[index+1:], slice[index:])
	slice[index] = value
	return slice
}
// addToExistingOrder merges the kubelet's service dependencies into the
// system preshutdown order and positions "kubelet" ahead of the first of its
// dependencies that appears in the (possibly extended) order.
//
// Example:
//
//	Dependencies:  ["a", "b", "c", "d"]
//	ExistingOrder: ["x", "b", "y", "c", "z"]
//	Result:        ["x", "kubelet", "b", "y", "c", "z", "a", "d"]
func addToExistingOrder(dependencies []string, existingOrder []string) []string {
	// Nothing to merge.
	if len(dependencies) == 0 {
		return existingOrder
	}

	// Drop any stale "kubelet" entry; it is re-inserted at the right spot below.
	existingOrder = removeItemFromList(existingOrder, serviceKubelet)

	// Track which entries are already present (case-sensitive, matching the
	// membership check used when appending).
	present := make(map[string]bool, len(existingOrder))
	for _, entry := range existingOrder {
		present[entry] = true
	}
	// Append each dependency the order does not already contain.
	for _, dep := range dependencies {
		if !present[dep] {
			existingOrder = append(existingOrder, dep)
		}
	}

	// Locate the first position holding one of the dependencies; kubelet must
	// shut down before all of them, honoring the existing order.
	firstCommonIndex := -1
scan:
	for i, entry := range existingOrder {
		for _, dep := range dependencies {
			if entry == dep {
				firstCommonIndex = i
				break scan
			}
		}
	}

	// If a common item is found, insert "kubelet" immediately before it.
	if firstCommonIndex != -1 {
		existingOrder = insertAt(existingOrder, firstCommonIndex, serviceKubelet)
	}
	return existingOrder
}

View File

@ -0,0 +1,358 @@
//go:build windows
// +build windows
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeshutdown
import (
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init" // activate ktesting command line flags
pkgfeatures "k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/prober"
probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
)
// TestFeatureEnabled verifies that the Windows node shutdown manager is only
// active when the WindowsGracefulNodeShutdown feature gate is on and a
// non-zero shutdown grace period has been configured.
func TestFeatureEnabled(t *testing.T) {
	cases := []struct {
		desc                         string
		shutdownGracePeriodRequested time.Duration
		featureGateEnabled           bool
		expectEnabled                bool
	}{
		{
			desc:                         "shutdownGracePeriodRequested 0; disables feature",
			shutdownGracePeriodRequested: time.Duration(0 * time.Second),
			featureGateEnabled:           true,
			expectEnabled:                false,
		},
		{
			desc:                         "feature gate disabled; disables feature",
			shutdownGracePeriodRequested: time.Duration(100 * time.Second),
			featureGateEnabled:           false,
			expectEnabled:                false,
		},
		{
			desc:                         "feature gate enabled; shutdownGracePeriodRequested > 0; enables feature",
			shutdownGracePeriodRequested: time.Duration(100 * time.Second),
			featureGateEnabled:           true,
			expectEnabled:                true,
		},
	}
	for _, tt := range cases {
		t.Run(tt.desc, func(t *testing.T) {
			logger, _ := ktesting.NewTestContext(t)
			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.WindowsGracefulNodeShutdown, tt.featureGateEnabled)

			manager := NewManager(&Config{
				Logger:        logger,
				ProbeManager:  probetest.FakeManager{},
				VolumeManager: volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil),
				Recorder:      &record.FakeRecorder{},
				NodeRef:       &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""},
				GetPodsFunc:   func() []*v1.Pod { return nil },
				KillPodFunc: func(pod *v1.Pod, evict bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
					return nil
				},
				SyncNodeStatusFunc:              func() {},
				ShutdownGracePeriodRequested:    tt.shutdownGracePeriodRequested,
				ShutdownGracePeriodCriticalPods: 0,
				StateDirectory:                  os.TempDir(),
			})
			// A real managerImpl (not the stub) means the feature is enabled.
			assert.Equal(t, tt.expectEnabled, manager != managerStub{})
		})
	}
}
// Test_managerImpl_ProcessShutdownEvent exercises ProcessShutdownEvent with a
// fake clock and fake volume manager, then scans the buffered ktesting log
// output to verify that pod-kill timeouts and volume-unmount failures are
// (or are not) reported.
func Test_managerImpl_ProcessShutdownEvent(t *testing.T) {
	var (
		probeManager      = probetest.FakeManager{}
		fakeRecorder      = &record.FakeRecorder{}
		fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
		syncNodeStatus    = func() {}
		nodeRef           = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
		fakeclock         = testingclock.NewFakeClock(time.Now())
	)

	// fields holds the managerImpl/podManager configuration for one case.
	type fields struct {
		recorder                         record.EventRecorder
		nodeRef                          *v1.ObjectReference
		probeManager                     prober.Manager
		volumeManager                    volumemanager.VolumeManager
		shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
		getPods                          eviction.ActivePodsFunc
		killPodFunc                      eviction.KillPodFunc
		syncNodeStatus                   func()
		nodeShuttingDownNow              bool
		clock                            clock.Clock
	}
	tests := []struct {
		name                      string
		fields                    fields
		wantErr                   bool
		expectedOutputContains    string
		expectedOutputNotContains string
	}{
		{
			name: "kill pod func finished in time",
			fields: fields{
				recorder:      fakeRecorder,
				nodeRef:       nodeRef,
				probeManager:  probeManager,
				volumeManager: fakeVolumeManager,
				shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
					{
						Priority:                   1,
						ShutdownGracePeriodSeconds: 10,
					},
					{
						Priority:                   2,
						ShutdownGracePeriodSeconds: 20,
					},
				},
				getPods: func() []*v1.Pod {
					return []*v1.Pod{
						makePod("normal-pod", 1, nil),
						makePod("critical-pod", 2, nil),
					}
				},
				// Advances the fake clock by less than the grace period, so no
				// timeout should be logged.
				killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
					fakeclock.Step(5 * time.Second)
					return nil
				},
				syncNodeStatus: syncNodeStatus,
				clock:          fakeclock,
			},
			wantErr:                   false,
			expectedOutputNotContains: "Shutdown manager pod killing time out",
		},
		{
			name: "kill pod func take too long",
			fields: fields{
				recorder:      fakeRecorder,
				nodeRef:       nodeRef,
				probeManager:  probeManager,
				volumeManager: fakeVolumeManager,
				shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
					{
						Priority:                   1,
						ShutdownGracePeriodSeconds: 10,
					},
					{
						Priority:                   2,
						ShutdownGracePeriodSeconds: 20,
					},
				},
				getPods: func() []*v1.Pod {
					return []*v1.Pod{
						makePod("normal-pod", 1, nil),
						makePod("critical-pod", 2, nil),
					}
				},
				// Advances the fake clock well past the grace period to force
				// the timeout log message.
				killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
					fakeclock.Step(60 * time.Second)
					return nil
				},
				syncNodeStatus: syncNodeStatus,
				clock:          fakeclock,
			},
			wantErr:                false,
			expectedOutputContains: "Shutdown manager pod killing time out",
		},
		{
			name: "volumeManager failed timed out",
			fields: fields{
				recorder:     fakeRecorder,
				nodeRef:      nodeRef,
				probeManager: probeManager,
				volumeManager: volumemanager.NewFakeVolumeManager(
					[]v1.UniqueVolumeName{},
					3*time.Second, // This value is intentionally longer than the shutdownGracePeriodSeconds (2s) to test the behavior
					// for volume unmount operations that take longer than the allowed grace period.
					fmt.Errorf("unmount timeout"),
				),
				shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
					{
						Priority:                   1,
						ShutdownGracePeriodSeconds: 2,
					},
				},
				getPods: func() []*v1.Pod {
					return []*v1.Pod{
						makePod("normal-pod", 1, nil),
						makePod("critical-pod", 2, nil),
					}
				},
				killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
					return nil
				},
				syncNodeStatus: syncNodeStatus,
				clock:          fakeclock,
			},
			wantErr:                false,
			expectedOutputContains: "Failed while waiting for all the volumes belonging to Pods in this group to unmount",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Buffer the logs so the assertions below can scan what the
			// manager wrote during shutdown processing.
			logger := ktesting.NewLogger(t,
				ktesting.NewConfig(
					ktesting.BufferLogs(true),
				),
			)
			m := &managerImpl{
				logger:                logger,
				recorder:              tt.fields.recorder,
				nodeRef:               tt.fields.nodeRef,
				probeManager:          tt.fields.probeManager,
				getPods:               tt.fields.getPods,
				syncNodeStatus:        tt.fields.syncNodeStatus,
				nodeShuttingDownMutex: sync.Mutex{},
				nodeShuttingDownNow:   tt.fields.nodeShuttingDownNow,
				podManager: &podManager{
					logger:                           logger,
					volumeManager:                    tt.fields.volumeManager,
					shutdownGracePeriodByPodPriority: tt.fields.shutdownGracePeriodByPodPriority,
					killPodFunc:                      tt.fields.killPodFunc,
					clock:                            tt.fields.clock,
				},
			}
			if err := m.ProcessShutdownEvent(); (err != nil) != tt.wantErr {
				t.Errorf("managerImpl.processShutdownEvent() error = %v, wantErr %v", err, tt.wantErr)
			}
			underlier, ok := logger.GetSink().(ktesting.Underlier)
			if !ok {
				t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink())
			}
			log := underlier.GetBuffer().String()
			if tt.expectedOutputContains != "" && !strings.Contains(log, tt.expectedOutputContains) {
				// Log will be shown on failure. To see it
				// during a successful run use "go test -v".
				t.Errorf("managerImpl.processShutdownEvent() should have logged %s, see actual output above.", tt.expectedOutputContains)
			}
			if tt.expectedOutputNotContains != "" && strings.Contains(log, tt.expectedOutputNotContains) {
				// Log will be shown on failure. To see it
				// during a successful run use "go test -v".
				t.Errorf("managerImpl.processShutdownEvent() should have not logged %s, see actual output above.", tt.expectedOutputNotContains)
			}
		})
	}
}
// Test_addToExistingOrder covers merging kubelet service dependencies into an
// existing Windows preshutdown order: empty inputs, overlap, a pre-existing
// "kubelet" entry, and (defensively) redundant input strings.
func Test_addToExistingOrder(t *testing.T) {
	var tests = []struct {
		desc          string
		dependencies  []string
		existingOrder []string
		expectedOrder []string
	}{
		{
			desc:          "dependencies and existingOrder are empty, expectedOrder to be empty",
			dependencies:  []string{},
			existingOrder: []string{},
			expectedOrder: []string{},
		},
		{
			desc:          "dependencies are empty, expectedOrder to be the same as existingOrder",
			dependencies:  []string{},
			existingOrder: []string{"kubelet", "a", "b", "c"},
			expectedOrder: []string{"kubelet", "a", "b", "c"},
		},
		{
			desc:          "existingOrder is empty, expectedOrder has the content of 'kubelet' and dependencies",
			dependencies:  []string{"a", "b", "c"},
			existingOrder: []string{},
			expectedOrder: []string{"kubelet", "a", "b", "c"},
		},
		{
			desc:          "dependencies and existingOrder have no overlap, expectedOrder having the 'kubelet' and dependencies to the end of the existingOrder",
			dependencies:  []string{"a", "b", "c"},
			existingOrder: []string{"d", "e", "f"},
			expectedOrder: []string{"d", "e", "f", "kubelet", "a", "b", "c"},
		},
		{
			desc:          "dependencies and existingOrder have overlaps, expectedOrder having the 'kubelet' and dependencies and honor the order in existingorder",
			dependencies:  []string{"a", "b", "c"},
			existingOrder: []string{"d", "b", "a", "f"},
			expectedOrder: []string{"d", "kubelet", "b", "a", "f", "c"},
		},
		{
			desc:          "existingOrder has 'kubelet', expectedOrder move the kubelet to the correct order",
			dependencies:  []string{"a", "b", "c"},
			existingOrder: []string{"d", "b", "kubelet", "a", "f"},
			expectedOrder: []string{"d", "kubelet", "b", "a", "f", "c"},
		},
		{
			desc:          "existingOrder has been in the correct order, expectedOrder keep the order",
			dependencies:  []string{"a", "b", "c"},
			existingOrder: []string{"d", "f", "kubelet", "a", "b", "c"},
			expectedOrder: []string{"d", "f", "kubelet", "a", "b", "c"},
		},
		// The following two should not happen in practice, but we should handle it gracefully
		{
			desc:          "dependencies has redundant string, expectedOrder remove the redundant string",
			dependencies:  []string{"a", "b", "b", "c"},
			existingOrder: []string{"d", "b", "a", "f"},
			expectedOrder: []string{"d", "kubelet", "b", "a", "f", "c"},
		},
		{
			desc:          "existingOrder has redundant string, expectedOrder remove the redundant string",
			dependencies:  []string{"a", "b", "c"},
			existingOrder: []string{"d", "b", "a", "f", "b"},
			expectedOrder: []string{"d", "kubelet", "b", "a", "f", "b", "c"},
		},
	}
	for _, tc := range tests {
		t.Run(tc.desc, func(t *testing.T) {
			order := addToExistingOrder(tc.dependencies, tc.existingOrder)
			// assert.Equal takes (t, expected, actual); the arguments were
			// previously swapped, which produces misleading failure output.
			assert.Equal(t, tc.expectedOrder, order)
		})
	}
}

View File

@ -0,0 +1,10 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-node-approvers
reviewers:
- sig-windows-api-reviewers
- sig-node-reviewers
labels:
- sig/windows
- sig/node

View File

@ -21,7 +21,9 @@ package service
import (
"os"
"syscall"
"time"
"unsafe"
"k8s.io/apiserver/pkg/server"
"k8s.io/klog/v2"
@ -31,20 +33,89 @@ import (
)
type handler struct {
tosvc chan bool
fromsvc chan error
tosvc chan bool
fromsvc chan error
acceptPreshutdown bool
preshutdownHandler PreshutdownHandler
}
type PreshutdownHandler interface {
ProcessShutdownEvent() error
}
// SERVICE_PRESHUTDOWN_INFO mirrors the Win32 SERVICE_PRESHUTDOWN_INFO
// structure used with the SERVICE_CONFIG_PRESHUTDOWN_INFO info level; the
// non-Go naming is kept to match the Windows API.
type SERVICE_PRESHUTDOWN_INFO struct {
	PreshutdownTimeout uint32 // The time-out value, in milliseconds.
}
// QueryPreShutdownInfo returns the preshutdown timeout configured for the
// service identified by handle h, growing the query buffer as required.
func QueryPreShutdownInfo(h windows.Handle) (*SERVICE_PRESHUTDOWN_INFO, error) {
	// Query the SERVICE_CONFIG_PRESHUTDOWN_INFO, retrying with a larger
	// buffer while the service manager reports the buffer is too small.
	n := uint32(1024)
	b := make([]byte, n)
	for {
		err := windows.QueryServiceConfig2(h, windows.SERVICE_CONFIG_PRESHUTDOWN_INFO, &b[0], n, &n)
		if err == nil {
			break
		}
		// Only ERROR_INSUFFICIENT_BUFFER is retryable. The previous unchecked
		// type assertion err.(syscall.Errno) would panic on any non-Errno
		// error; use the comma-ok form instead.
		errno, ok := err.(syscall.Errno)
		if !ok || errno != syscall.ERROR_INSUFFICIENT_BUFFER {
			return nil, err
		}
		if n <= uint32(len(b)) {
			// Reported size does not exceed the current buffer; bail out
			// rather than loop forever.
			return nil, err
		}
		b = make([]byte, n)
	}

	// Reinterpret the raw config buffer as the preshutdown info structure.
	info := (*SERVICE_PRESHUTDOWN_INFO)(unsafe.Pointer(&b[0]))
	return info, nil
}
// UpdatePreShutdownInfo sets the preshutdown timeout, in milliseconds, for
// the service identified by handle h.
func UpdatePreShutdownInfo(h windows.Handle, timeoutMilliSeconds uint32) error {
	info := SERVICE_PRESHUTDOWN_INFO{
		PreshutdownTimeout: timeoutMilliSeconds,
	}
	return windows.ChangeServiceConfig2(h, windows.SERVICE_CONFIG_PRESHUTDOWN_INFO, (*byte)(unsafe.Pointer(&info)))
}
// thehandler is the process-wide Windows service handler. This is,
// unfortunately, a global along with the service, which means only one
// service per process.
var thehandler *handler
// InitService is the entry point for running the daemon as a Windows service
// that does not accept the preshutdown notification. It blocks until the
// service handler has reported in and returns any error from starting the
// service.
func InitService(serviceName string) error {
	return initService(serviceName, false)
}
// InitServiceWithShutdown is the entry point for running the daemon as a
// Windows service that also accepts the preshutdown notification. It blocks
// until the service handler has reported in and returns any error from
// starting the service. (The comment previously named the wrong function.)
func InitServiceWithShutdown(serviceName string) error {
	return initService(serviceName, true)
}
// initService will try to run the daemon as a Windows
// service, with an option to indicate if the service will accept the preshutdown event.
func initService(serviceName string, acceptPreshutdown bool) error {
var err error
h := &handler{
tosvc: make(chan bool),
fromsvc: make(chan error),
tosvc: make(chan bool),
fromsvc: make(chan error),
acceptPreshutdown: acceptPreshutdown,
preshutdownHandler: nil,
}
var err error
thehandler = h
go func() {
err = svc.Run(serviceName, h)
h.fromsvc <- err
@ -53,18 +124,33 @@ func InitService(serviceName string) error {
// Wait for the first signal from the service handler.
err = <-h.fromsvc
if err != nil {
klog.Errorf("Running %s as a Windows has error %v!", serviceName, err)
return err
}
klog.Infof("Running %s as a Windows service!", serviceName)
return nil
}
// SetPreShutdownHandler registers the handler invoked when the service
// receives a preshutdown notification. It is a no-op when the process is not
// running as a Windows service (the global service handler was never
// initialized), instead of panicking on a nil pointer.
func SetPreShutdownHandler(preshutdownhandler PreshutdownHandler) {
	if thehandler == nil {
		return
	}
	thehandler.preshutdownHandler = preshutdownhandler
}
// IsServiceInitialized reports whether the process has been started as a
// Windows service, i.e. the global service handler has been created by
// initService.
func IsServiceInitialized() bool {
	return thehandler != nil
}
func (h *handler) Execute(_ []string, r <-chan svc.ChangeRequest, s chan<- svc.Status) (bool, uint32) {
s <- svc.Status{State: svc.StartPending, Accepts: 0}
// Unblock initService()
h.fromsvc <- nil
s <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop | svc.AcceptShutdown | svc.Accepted(windows.SERVICE_ACCEPT_PARAMCHANGE)}
if h.acceptPreshutdown {
s <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop | svc.AcceptPreShutdown | svc.Accepted(windows.SERVICE_ACCEPT_PARAMCHANGE)}
klog.Infof("Accept preshutdown")
} else {
s <- svc.Status{State: svc.Running, Accepts: svc.AcceptStop | svc.AcceptShutdown | svc.Accepted(windows.SERVICE_ACCEPT_PARAMCHANGE)}
}
klog.Infof("Service running")
Loop:
for {
@ -79,6 +165,8 @@ Loop:
s <- c.CurrentStatus
case svc.Stop, svc.Shutdown:
klog.Infof("Service stopping")
s <- svc.Status{State: svc.StopPending}
// We need to translate this request into a signal that can be handled by the signal handler
// handling shutdowns normally (currently apiserver/pkg/server/signal.go).
// If we do not do this, our main threads won't be notified of the upcoming shutdown.
@ -102,6 +190,15 @@ Loop:
os.Exit(0)
}()
}
break Loop
case svc.PreShutdown:
klog.Infof("Node pre-shutdown")
s <- svc.Status{State: svc.StopPending}
if h.preshutdownHandler != nil {
h.preshutdownHandler.ProcessShutdownEvent()
}
break Loop
}
}

View File

@ -0,0 +1,371 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package windows
import (
"context"
"fmt"
"strings"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubectl/pkg/util/podutils"
"k8s.io/kubernetes/pkg/apis/scheduling"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
)
// Windows counterpart of the Linux graceful node shutdown e2e test: verifies
// that when a node shutdown is signalled, the kubelet terminates non-critical
// pods first and critical pods only during the last
// nodeShutdownGracePeriodCriticalPods of the shutdown grace period.
var _ = sigDescribe(feature.Windows, "GracefulNodeShutdown", framework.WithSerial(), framework.WithDisruptive(), framework.WithSlow(), skipUnlessWindows(func() {
	f := framework.NewDefaultFramework("windows-node-graceful-shutdown")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
	ginkgo.It("should be able to gracefully shutdown pods with various grace periods", func(ctx context.Context) {
		const (
			pollInterval           = 1 * time.Second
			podStatusUpdateTimeout = 90 * time.Second
			// These must match the kubelet configuration of the node under
			// test — TODO confirm against the test cluster setup.
			nodeShutdownGracePeriod             = 20 * time.Second
			nodeShutdownGracePeriodCriticalPods = 10 * time.Second
		)
		ginkgo.By("selecting a Windows node")
		targetNode, err := findWindowsNode(ctx, f)
		framework.ExpectNoError(err, "Error finding Windows node")
		framework.Logf("Using node: %v", targetNode.Name)
		nodeName := targetNode.Name
		nodeSelector := fields.Set{
			"spec.nodeName": nodeName,
		}.AsSelector().String()
		// Define test pods: two non-critical and two node-critical pods with
		// assorted termination grace periods.
		pods := []*v1.Pod{
			getGracePeriodOverrideTestPod("period-20-"+string(uuid.NewUUID()), nodeName, 20, ""),
			getGracePeriodOverrideTestPod("period-25-"+string(uuid.NewUUID()), nodeName, 25, ""),
			getGracePeriodOverrideTestPod("period-critical-5-"+string(uuid.NewUUID()), nodeName, 5, scheduling.SystemNodeCritical),
			getGracePeriodOverrideTestPod("period-critical-10-"+string(uuid.NewUUID()), nodeName, 10, scheduling.SystemNodeCritical),
		}
		ginkgo.By("Creating batch pods")
		e2epod.NewPodClient(f).CreateBatch(ctx, pods)
		list, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{
			FieldSelector: nodeSelector,
		})
		framework.ExpectNoError(err)
		gomega.Expect(list.Items).To(gomega.HaveLen(len(pods)), "the number of pods is not as expected")
		// Derive the watch context from the test context so the watcher also
		// stops if the framework cancels the test early.
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		// Snapshot the resource version before starting the goroutine so it
		// does not read `list` concurrently with the main goroutine.
		resourceVersion := list.ResourceVersion
		go func() {
			defer ginkgo.GinkgoRecover()
			w := &cache.ListWatch{
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(ctx, options)
				},
			}
			// Setup watch to continuously monitor any pod events and detect invalid pod status updates.
			// NOTE: err is goroutine-local here; assigning the outer err from
			// this goroutine would be a data race with the main test flow.
			_, err := watchtools.Until(ctx, resourceVersion, w, func(event watch.Event) (bool, error) {
				if pod, ok := event.Object.(*v1.Pod); ok {
					if isPodReadyWithFailedStatus(pod) {
						return false, fmt.Errorf("failing test due to detecting invalid pod status")
					}
					// Watch will never terminate (only when the test ends due to context cancellation)
					return false, nil
				}
				return false, nil
			})
			// Ignore timeout error since the context will be explicitly cancelled and the watch will never return true
			if err != nil && !wait.Interrupted(err) {
				framework.Failf("watch for invalid pod status failed: %v", err.Error())
			}
		}()
		ginkgo.By("Verifying batch pods are running")
		for _, pod := range list.Items {
			if podReady, err := testutils.PodRunningReady(&pod); err != nil || !podReady {
				framework.Failf("Failed to start batch pod: %v", pod.Name)
			}
		}
		for _, pod := range list.Items {
			// %v on the value: %q on a *[]PodCondition printed a pointer-ish blob.
			framework.Logf("Pod (%v/%v) status conditions: %v", pod.Namespace, pod.Name, pod.Status.Conditions)
		}
		// use to keep the node active before testing critical pods reaching the terminate state
		delayPodName := "delay-shutdown-20-" + string(uuid.NewUUID())
		delayPod := getGracePeriodOverrideTestPod(delayPodName, nodeName, 20, scheduling.SystemNodeCritical)
		e2epod.NewPodClient(f).CreateSync(ctx, delayPod)
		ginkgo.By("Emitting shutdown signal")
		emitSignalPrepareForShutdown(nodeName, f, ctx)
		// listPeriodPods returns the "period-*" test pods currently on the
		// target node; shared by the three verification phases below.
		listPeriodPods := func(ctx context.Context) ([]v1.Pod, error) {
			podList, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{
				FieldSelector: nodeSelector,
			})
			if err != nil {
				return nil, err
			}
			// Find pods with names starting with "period"
			filteredPods := []v1.Pod{}
			for _, pod := range podList.Items {
				if strings.HasPrefix(pod.Name, "period") {
					filteredPods = append(filteredPods, pod)
				}
			}
			return filteredPods, nil
		}
		ginkgo.By("Verifying that non-critical pods are shutdown")
		// Non critical pod should be shutdown
		gomega.Eventually(ctx, func(ctx context.Context) error {
			filteredPods, err := listPeriodPods(ctx)
			if err != nil {
				return err
			}
			gomega.Expect(filteredPods).To(gomega.HaveLen(len(pods)), "the number of pods is not as expected")
			for _, pod := range filteredPods {
				if kubelettypes.IsCriticalPod(&pod) {
					if isPodShutdown(&pod) {
						framework.Logf("Expecting critical pod (%v/%v) to be running, but it's not currently. Pod Status %+v", pod.Namespace, pod.Name, pod.Status)
						return fmt.Errorf("critical pod (%v/%v) should not be shutdown, phase: %s", pod.Namespace, pod.Name, pod.Status.Phase)
					}
				} else {
					if !isPodShutdown(&pod) {
						framework.Logf("Expecting non-critical pod (%v/%v) to be shutdown, but it's not currently. Pod Status %+v", pod.Namespace, pod.Name, pod.Status)
						return fmt.Errorf("pod (%v/%v) should be shutdown, phase: %s", pod.Namespace, pod.Name, pod.Status.Phase)
					}
				}
			}
			return nil
		}, podStatusUpdateTimeout, pollInterval).Should(gomega.Succeed())
		ginkgo.By("Verifying that all pods are shutdown")
		// All pod should be shutdown
		gomega.Eventually(ctx, func(ctx context.Context) error {
			filteredPods, err := listPeriodPods(ctx)
			if err != nil {
				return err
			}
			gomega.Expect(filteredPods).To(gomega.HaveLen(len(pods)), "the number of pods is not as expected")
			for _, pod := range filteredPods {
				if !isPodShutdown(&pod) {
					framework.Logf("Expecting pod (%v/%v) to be shutdown, but it's not currently: Pod Status %+v", pod.Namespace, pod.Name, pod.Status)
					return fmt.Errorf("pod (%v/%v) should be shutdown, phase: %s", pod.Namespace, pod.Name, pod.Status.Phase)
				}
			}
			return nil
		},
			// Critical pod starts shutdown after (nodeShutdownGracePeriod-nodeShutdownGracePeriodCriticalPods)
			podStatusUpdateTimeout+(nodeShutdownGracePeriod-nodeShutdownGracePeriodCriticalPods),
			pollInterval).Should(gomega.Succeed())
		ginkgo.By("Verify that all pod ready to start condition are set to false after terminating")
		// All pod ready to start condition should set to false
		gomega.Eventually(ctx, func(ctx context.Context) error {
			filteredPods, err := listPeriodPods(ctx)
			if err != nil {
				return err
			}
			gomega.Expect(filteredPods).To(gomega.HaveLen(len(pods)), "the number of pods is not as expected")
			for _, pod := range filteredPods {
				if !isPodReadyToStartConditionSetToFalse(&pod) {
					framework.Logf("Expecting pod (%v/%v) 's ready to start condition set to false, "+
						"but it's not currently: Pod Condition %+v", pod.Namespace, pod.Name, pod.Status.Conditions)
					return fmt.Errorf("pod (%v/%v) 's ready to start condition should be false, condition: %s, phase: %s",
						pod.Namespace, pod.Name, pod.Status.Conditions, pod.Status.Phase)
				}
			}
			return nil
		},
			// Explicit timeout/interval: without them gomega falls back to its
			// 1s default timeout, which makes this final check flaky.
			podStatusUpdateTimeout, pollInterval,
		).Should(gomega.Succeed())
	})
}))
// getGracePeriodOverrideTestPod returns a new Pod object containing a single
// container that runs "agnhost netexec --delay-shutdown 9999", which keeps the
// process alive and delays its exit after a termination signal so that the pod
// stays in Terminating for its configured grace period.
// If priorityClassName is scheduling.SystemNodeCritical, the pod is given the
// kubelet file config-source annotation so kubelettypes.IsCriticalPod treats
// it as critical; the function fails the test if the resulting criticality
// does not match the request.
func getGracePeriodOverrideTestPod(name string, node string, gracePeriod int64, priorityClassName string) *v1.Pod {
	agnhostImage := imageutils.GetE2EImage(imageutils.Agnhost)
	pod := &v1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:    name,
					Image:   agnhostImage,
					Command: []string{"/agnhost", "netexec", "--delay-shutdown", "9999"},
				},
			},
			TerminationGracePeriodSeconds: &gracePeriod,
			NodeName:                      node,
		},
	}
	// Both branches set the priority class; only critical pods additionally
	// need the file config-source annotation to be considered critical.
	pod.Spec.PriorityClassName = priorityClassName
	if priorityClassName == scheduling.SystemNodeCritical {
		pod.ObjectMeta.Annotations = map[string]string{
			kubelettypes.ConfigSourceAnnotationKey: kubelettypes.FileSource,
		}
		if !kubelettypes.IsCriticalPod(pod) {
			framework.Failf("pod %q should be a critical pod", pod.Name)
		}
	} else {
		if kubelettypes.IsCriticalPod(pod) {
			framework.Failf("pod %q should not be a critical pod", pod.Name)
		}
	}
	return pod
}
// emitSignalPrepareForShutdown runs a HostProcess pod on the given node that
// records the host's boot/install timestamps into machine environment
// variables and schedules a reboot via shutdown.exe. The pending reboot causes
// the kubelet to react to an active shutdown event.
func emitSignalPrepareForShutdown(nodeName string, f *framework.Framework, ctx context.Context) {
	ginkgo.By("scheduling a pod with a container that emits a PrepareForShutdown signal")
	hostProcess := true
	runAsUser := "NT AUTHORITY\\SYSTEM"
	const rebootPodName = "reboot-host-test-pod"
	rebootPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: rebootPodName,
		},
		Spec: v1.PodSpec{
			// HostProcess container running as SYSTEM so it may touch machine
			// state and invoke shutdown.exe on the node itself.
			SecurityContext: &v1.PodSecurityContext{
				WindowsOptions: &v1.WindowsSecurityContextOptions{
					HostProcess:   &hostProcess,
					RunAsUserName: &runAsUser,
				},
			},
			HostNetwork: true,
			Containers: []v1.Container{
				{
					Image: imageutils.GetE2EImage(imageutils.Agnhost),
					Name:  "reboot-computer-test",
					Command: []string{
						"powershell.exe",
						"-Command",
						"$os = Get-WmiObject -Class win32_operatingsystem;",
						"[Environment]::SetEnvironmentVariable(\"TMP_BOOT_DATE\", $os.LastBootUpTime, \"Machine\");",
						"[Environment]::SetEnvironmentVariable(\"TMP_INSTALL_DATE\", $os.InstallDate, \"Machine\");",
						"shutdown.exe -r -t 60",
					},
				},
			},
			RestartPolicy: v1.RestartPolicyNever,
			NodeName:      nodeName,
		},
	}
	e2epod.NewPodClient(f).Create(ctx, rebootPod)
	ginkgo.By("Waiting for pod to run")
	e2epod.NewPodClient(f).WaitForFinish(ctx, rebootPodName, 3*time.Minute)
	ginkgo.By("Then ensuring pod finished running successfully")
	finished, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, rebootPodName, metav1.GetOptions{})
	framework.ExpectNoError(err, "Error retrieving pod")
	gomega.Expect(finished.Status.Phase).To(gomega.Equal(v1.PodSucceeded))
}
const (
	// Reason and message the kubelet's node shutdown manager writes into the
	// pod status when it terminates pods for an imminent shutdown; kept in
	// sync with the Linux implementation:
	// https://github.com/kubernetes/kubernetes/blob/1dd781ddcad454cc381806fbc6bd5eba8fa368d7/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go#L43-L44
	podShutdownReason  = "Terminated"
	podShutdownMessage = "Pod was terminated in response to imminent node shutdown."
)
// isPodShutdown reports whether the pod was terminated by the node shutdown
// manager: Failed phase, the manager's reason and message, and a
// ContainersReady condition that is False.
func isPodShutdown(pod *v1.Pod) bool {
	if pod == nil {
		return false
	}
	containersNotReady := false
	for _, cond := range pod.Status.Conditions {
		if cond.Type == v1.ContainersReady && cond.Status == v1.ConditionFalse {
			containersNotReady = true
			break
		}
	}
	if !containersNotReady {
		return false
	}
	return pod.Status.Phase == v1.PodFailed &&
		pod.Status.Reason == podShutdownReason &&
		pod.Status.Message == podShutdownMessage
}
// isPodReadyWithFailedStatus reports the invalid state of a pod being in the
// Failed phase while still carrying Ready=True.
// Pods should never report failed phase and have ready condition = true (https://github.com/kubernetes/kubernetes/issues/108594)
func isPodReadyWithFailedStatus(pod *v1.Pod) bool {
	if pod.Status.Phase != v1.PodFailed {
		return false
	}
	return podutils.IsPodReady(pod)
}
// isPodReadyToStartConditionSetToFalse reports whether the pod has at least
// one status condition whose Status is False.
// NOTE(review): despite the name, this matches ANY False condition, not
// specifically the PodReadyToStartContainers condition type — confirm the
// looseness is intended before tightening the check.
func isPodReadyToStartConditionSetToFalse(pod *v1.Pod) bool {
	if pod == nil {
		return false
	}
	for _, cond := range pod.Status.Conditions {
		if cond.Status == v1.ConditionFalse {
			return true
		}
	}
	return false
}

View File

@ -1430,6 +1430,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: WindowsGracefulNodeShutdown
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: WindowsHostNetwork
versionedSpecs:
- default: true