Merge pull request #96129 from bobbypage/graceful-node-shutdown

Implement Graceful Node Shutdown in Kubelet
This commit is contained in:
Kubernetes Prow Robot 2020-11-12 14:58:25 -08:00 committed by GitHub
commit b2dc35dab2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1360 additions and 123 deletions

1
go.mod
View File

@ -49,6 +49,7 @@ require (
github.com/go-openapi/strfmt v0.19.3
github.com/go-openapi/validate v0.19.5
github.com/go-ozzo/ozzo-validation v3.5.0+incompatible // indirect
github.com/godbus/dbus/v5 v5.0.3
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7
github.com/golang/mock v1.3.1

View File

@ -701,6 +701,11 @@ const (
// Enable kubelet to pass pod's service account token to NodePublishVolume
// call of CSI driver which is mounting volumes for that pod.
CSIServiceAccountToken featuregate.Feature = "CSIServiceAccountToken"
// owner: @bobbypage
// alpha: v1.20
// Adds support for kubelet to detect node shutdown and gracefully terminate pods prior to the node being shutdown.
GracefulNodeShutdown featuregate.Feature = "GracefulNodeShutdown"
)
func init() {
@ -806,6 +811,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
LoadBalancerIPMode: {Default: false, PreRelease: featuregate.Alpha},
ExecProbeTimeout: {Default: true, PreRelease: featuregate.GA}, // lock to default in v1.21 and remove in v1.22
KubeletCredentialProviders: {Default: false, PreRelease: featuregate.Alpha},
GracefulNodeShutdown: {Default: false, PreRelease: featuregate.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -71,6 +71,7 @@ go_library(
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/metrics/collectors:go_default_library",
"//pkg/kubelet/network/dns:go_default_library",
"//pkg/kubelet/nodeshutdown:go_default_library",
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/oom:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
@ -308,6 +309,7 @@ filegroup(
"//pkg/kubelet/logs:all-srcs",
"//pkg/kubelet/metrics:all-srcs",
"//pkg/kubelet/network:all-srcs",
"//pkg/kubelet/nodeshutdown:all-srcs",
"//pkg/kubelet/nodestatus:all-srcs",
"//pkg/kubelet/oom:all-srcs",
"//pkg/kubelet/pleg:all-srcs",

View File

@ -234,5 +234,7 @@ var (
"TypeMeta.Kind",
"VolumeStatsAggPeriod.Duration",
"VolumePluginDir",
"ShutdownGracePeriod.Duration",
"ShutdownGracePeriodCriticalPods.Duration",
)
)

View File

@ -67,6 +67,8 @@ registryPullQPS: 5
resolvConf: /etc/resolv.conf
runtimeRequestTimeout: 2m0s
serializeImagePulls: true
shutdownGracePeriod: 0s
shutdownGracePeriodCriticalPods: 0s
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
topologyManagerPolicy: none

View File

@ -67,6 +67,8 @@ registryPullQPS: 5
resolvConf: /etc/resolv.conf
runtimeRequestTimeout: 2m0s
serializeImagePulls: true
shutdownGracePeriod: 0s
shutdownGracePeriodCriticalPods: 0s
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
topologyManagerPolicy: none

View File

@ -375,6 +375,13 @@ type KubeletConfiguration struct {
Logging componentbaseconfig.LoggingConfiguration
// EnableSystemLogHandler enables /logs handler.
EnableSystemLogHandler bool
// ShutdownGracePeriod specifies the total duration that the node should delay the shutdown and total grace period for pod termination during a node shutdown.
// Defaults to 0 seconds (graceful node shutdown handling is disabled when zero), requires GracefulNodeShutdown feature gate to be enabled.
ShutdownGracePeriod metav1.Duration
// ShutdownGracePeriodCriticalPods specifies the duration used to terminate critical pods during a node shutdown. This should be less than ShutdownGracePeriod.
// Defaults to 0 seconds, requires GracefulNodeShutdown feature gate to be enabled.
// For example, if ShutdownGracePeriod=30s, and ShutdownGracePeriodCriticalPods=10s, during a node shutdown the first 20 seconds would be reserved for gracefully terminating normal pods, and the last 10 seconds would be reserved for terminating critical pods.
ShutdownGracePeriodCriticalPods metav1.Duration
}
// KubeletAuthorizationMode denotes the authorization mode for the kubelet

View File

@ -350,6 +350,8 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
if err := v1.Convert_Pointer_bool_To_bool(&in.EnableSystemLogHandler, &out.EnableSystemLogHandler, s); err != nil {
return err
}
out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
return nil
}
@ -501,6 +503,8 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
if err := v1.Convert_bool_To_Pointer_bool(&in.EnableSystemLogHandler, &out.EnableSystemLogHandler, s); err != nil {
return err
}
out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
return nil
}

View File

@ -140,6 +140,21 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration) error
allErrors = append(allErrors, fmt.Errorf("invalid configuration: topologyManagerScope non-allowable value: %v", kc.TopologyManagerScope))
}
if localFeatureGate.Enabled(features.GracefulNodeShutdown) {
if kc.ShutdownGracePeriod.Duration < 0 || kc.ShutdownGracePeriodCriticalPods.Duration < 0 || kc.ShutdownGracePeriodCriticalPods.Duration > kc.ShutdownGracePeriod.Duration {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: ShutdownGracePeriod %v must be >= 0, ShutdownGracePeriodCriticalPods %v must be >= 0, and ShutdownGracePeriodCriticalPods %v must be <= ShutdownGracePeriod %v", kc.ShutdownGracePeriod, kc.ShutdownGracePeriodCriticalPods, kc.ShutdownGracePeriodCriticalPods, kc.ShutdownGracePeriod))
}
if kc.ShutdownGracePeriod.Duration > 0 && kc.ShutdownGracePeriod.Duration < time.Duration(time.Second) {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: ShutdownGracePeriod %v must be either zero or otherwise >= 1 sec", kc.ShutdownGracePeriod))
}
if kc.ShutdownGracePeriodCriticalPods.Duration > 0 && kc.ShutdownGracePeriodCriticalPods.Duration < time.Duration(time.Second) {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: ShutdownGracePeriodCriticalPods %v must be either zero or otherwise >= 1 sec", kc.ShutdownGracePeriodCriticalPods))
}
}
if (kc.ShutdownGracePeriod.Duration > 0 || kc.ShutdownGracePeriodCriticalPods.Duration > 0) && !localFeatureGate.Enabled(features.GracefulNodeShutdown) {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: Specifying ShutdownGracePeriod or ShutdownGracePeriodCriticalPods requires feature gate GracefulNodeShutdown"))
}
for _, val := range kc.EnforceNodeAllocatable {
switch val {
case kubetypes.NodeAllocatableEnforcementKey:

View File

@ -27,36 +27,39 @@ import (
func TestValidateKubeletConfiguration(t *testing.T) {
successCase1 := &kubeletconfig.KubeletConfiguration{
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved"},
SystemReservedCgroup: "/system.slice",
KubeReservedCgroup: "/kubelet.service",
SystemCgroups: "",
CgroupRoot: "",
EventBurst: 10,
EventRecordQPS: 5,
HealthzPort: 10248,
ImageGCHighThresholdPercent: 85,
ImageGCLowThresholdPercent: 80,
IPTablesDropBit: 15,
IPTablesMasqueradeBit: 14,
KubeAPIBurst: 10,
KubeAPIQPS: 5,
MaxOpenFiles: 1000000,
MaxPods: 110,
OOMScoreAdj: -999,
PodsPerCore: 100,
Port: 65535,
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 25 * time.Millisecond},
TopologyManagerScope: kubeletconfig.PodTopologyManagerScope,
TopologyManagerPolicy: kubeletconfig.SingleNumaNodeTopologyManagerPolicy,
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved"},
SystemReservedCgroup: "/system.slice",
KubeReservedCgroup: "/kubelet.service",
SystemCgroups: "",
CgroupRoot: "",
EventBurst: 10,
EventRecordQPS: 5,
HealthzPort: 10248,
ImageGCHighThresholdPercent: 85,
ImageGCLowThresholdPercent: 80,
IPTablesDropBit: 15,
IPTablesMasqueradeBit: 14,
KubeAPIBurst: 10,
KubeAPIQPS: 5,
MaxOpenFiles: 1000000,
MaxPods: 110,
OOMScoreAdj: -999,
PodsPerCore: 100,
Port: 65535,
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 25 * time.Millisecond},
TopologyManagerScope: kubeletconfig.PodTopologyManagerScope,
TopologyManagerPolicy: kubeletconfig.SingleNumaNodeTopologyManagerPolicy,
ShutdownGracePeriod: metav1.Duration{Duration: 30 * time.Second},
ShutdownGracePeriodCriticalPods: metav1.Duration{Duration: 10 * time.Second},
FeatureGates: map[string]bool{
"CustomCPUCFSQuotaPeriod": true,
"GracefulNodeShutdown": true,
},
}
if allErrors := ValidateKubeletConfiguration(successCase1); allErrors != nil {
@ -64,37 +67,40 @@ func TestValidateKubeletConfiguration(t *testing.T) {
}
successCase2 := &kubeletconfig.KubeletConfiguration{
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods"},
SystemReservedCgroup: "",
KubeReservedCgroup: "",
SystemCgroups: "",
CgroupRoot: "",
EventBurst: 10,
EventRecordQPS: 5,
HealthzPort: 10248,
ImageGCHighThresholdPercent: 85,
ImageGCLowThresholdPercent: 80,
IPTablesDropBit: 15,
IPTablesMasqueradeBit: 14,
KubeAPIBurst: 10,
KubeAPIQPS: 5,
MaxOpenFiles: 1000000,
MaxPods: 110,
OOMScoreAdj: -999,
PodsPerCore: 100,
Port: 65535,
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 50 * time.Millisecond},
ReservedSystemCPUs: "0-3",
TopologyManagerScope: kubeletconfig.ContainerTopologyManagerScope,
TopologyManagerPolicy: kubeletconfig.NoneTopologyManagerPolicy,
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods"},
SystemReservedCgroup: "",
KubeReservedCgroup: "",
SystemCgroups: "",
CgroupRoot: "",
EventBurst: 10,
EventRecordQPS: 5,
HealthzPort: 10248,
ImageGCHighThresholdPercent: 85,
ImageGCLowThresholdPercent: 80,
IPTablesDropBit: 15,
IPTablesMasqueradeBit: 14,
KubeAPIBurst: 10,
KubeAPIQPS: 5,
MaxOpenFiles: 1000000,
MaxPods: 110,
OOMScoreAdj: -999,
PodsPerCore: 100,
Port: 65535,
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 50 * time.Millisecond},
ReservedSystemCPUs: "0-3",
TopologyManagerScope: kubeletconfig.ContainerTopologyManagerScope,
TopologyManagerPolicy: kubeletconfig.NoneTopologyManagerPolicy,
ShutdownGracePeriod: metav1.Duration{Duration: 10 * time.Minute},
ShutdownGracePeriodCriticalPods: metav1.Duration{Duration: 0},
FeatureGates: map[string]bool{
"CustomCPUCFSQuotaPeriod": true,
"GracefulNodeShutdown": true,
},
}
if allErrors := ValidateKubeletConfiguration(successCase2); allErrors != nil {
@ -102,68 +108,73 @@ func TestValidateKubeletConfiguration(t *testing.T) {
}
errorCase1 := &kubeletconfig.KubeletConfiguration{
CgroupsPerQOS: false,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved", "illegal-key"},
SystemCgroups: "/",
CgroupRoot: "",
EventBurst: -10,
EventRecordQPS: -10,
HealthzPort: -10,
ImageGCHighThresholdPercent: 101,
ImageGCLowThresholdPercent: 101,
IPTablesDropBit: -10,
IPTablesMasqueradeBit: -10,
KubeAPIBurst: -10,
KubeAPIQPS: -10,
MaxOpenFiles: -10,
MaxPods: -10,
OOMScoreAdj: -1001,
PodsPerCore: -10,
Port: 0,
ReadOnlyPort: -10,
RegistryBurst: -10,
RegistryPullQPS: -10,
HairpinMode: "foo",
NodeLeaseDurationSeconds: -1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond},
CgroupsPerQOS: false,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved", "illegal-key"},
SystemCgroups: "/",
CgroupRoot: "",
EventBurst: -10,
EventRecordQPS: -10,
HealthzPort: -10,
ImageGCHighThresholdPercent: 101,
ImageGCLowThresholdPercent: 101,
IPTablesDropBit: -10,
IPTablesMasqueradeBit: -10,
KubeAPIBurst: -10,
KubeAPIQPS: -10,
MaxOpenFiles: -10,
MaxPods: -10,
OOMScoreAdj: -1001,
PodsPerCore: -10,
Port: 0,
ReadOnlyPort: -10,
RegistryBurst: -10,
RegistryPullQPS: -10,
HairpinMode: "foo",
NodeLeaseDurationSeconds: -1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 100 * time.Millisecond},
ShutdownGracePeriod: metav1.Duration{Duration: 30 * time.Second},
ShutdownGracePeriodCriticalPods: metav1.Duration{Duration: 10 * time.Second},
}
const numErrsErrorCase1 = 27
const numErrsErrorCase1 = 28
if allErrors := ValidateKubeletConfiguration(errorCase1); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrsErrorCase1 {
t.Errorf("expect %d errors, got %v", numErrsErrorCase1, len(allErrors.(utilerrors.Aggregate).Errors()))
}
errorCase2 := &kubeletconfig.KubeletConfiguration{
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved"},
SystemReservedCgroup: "/system.slice",
KubeReservedCgroup: "/kubelet.service",
SystemCgroups: "",
CgroupRoot: "",
EventBurst: 10,
EventRecordQPS: 5,
HealthzPort: 10248,
ImageGCHighThresholdPercent: 85,
ImageGCLowThresholdPercent: 80,
IPTablesDropBit: 15,
IPTablesMasqueradeBit: 14,
KubeAPIBurst: 10,
KubeAPIQPS: 5,
MaxOpenFiles: 1000000,
MaxPods: 110,
OOMScoreAdj: -999,
PodsPerCore: 100,
Port: 65535,
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 50 * time.Millisecond},
ReservedSystemCPUs: "0-3",
TopologyManagerScope: "invalid",
TopologyManagerPolicy: "invalid",
CgroupsPerQOS: true,
EnforceNodeAllocatable: []string{"pods", "system-reserved", "kube-reserved"},
SystemReservedCgroup: "/system.slice",
KubeReservedCgroup: "/kubelet.service",
SystemCgroups: "",
CgroupRoot: "",
EventBurst: 10,
EventRecordQPS: 5,
HealthzPort: 10248,
ImageGCHighThresholdPercent: 85,
ImageGCLowThresholdPercent: 80,
IPTablesDropBit: 15,
IPTablesMasqueradeBit: 14,
KubeAPIBurst: 10,
KubeAPIQPS: 5,
MaxOpenFiles: 1000000,
MaxPods: 110,
OOMScoreAdj: -999,
PodsPerCore: 100,
Port: 65535,
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 50 * time.Millisecond},
ReservedSystemCPUs: "0-3",
TopologyManagerScope: "invalid",
TopologyManagerPolicy: "invalid",
ShutdownGracePeriod: metav1.Duration{Duration: 40 * time.Second},
ShutdownGracePeriodCriticalPods: metav1.Duration{Duration: 10 * time.Second},
FeatureGates: map[string]bool{
"CustomCPUCFSQuotaPeriod": true,
"GracefulNodeShutdown": true,
},
}
const numErrsErrorCase2 = 3

View File

@ -271,6 +271,8 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
copy(*out, *in)
}
out.Logging = in.Logging
out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
return
}

View File

@ -84,6 +84,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown"
oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
"k8s.io/kubernetes/pkg/kubelet/pleg"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
@ -794,6 +795,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
v1.NamespaceNodeLease,
util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))
klet.shutdownManager = nodeshutdown.NewManager(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeCfg.ShutdownGracePeriod.Duration, kubeCfg.ShutdownGracePeriodCriticalPods.Duration)
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.
klet.kubeletConfiguration = *kubeCfg
@ -1137,6 +1140,9 @@ type Kubelet struct {
// Handles RuntimeClass objects for the Kubelet.
runtimeClassManager *runtimeclass.Manager
// Handles node shutdown events for the Node.
shutdownManager *nodeshutdown.Manager
}
// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
@ -1353,6 +1359,12 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// Start the plugin manager
klog.V(4).Infof("starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
err = kl.shutdownManager.Start()
if err != nil {
// The shutdown manager is not critical for kubelet, so log failure, but don't block Kubelet startup if there was a failure starting it.
klog.Errorf("Failed to start node shutdown manager: %v", err)
}
}
// Run starts the kubelet reacting to config updates

View File

@ -600,7 +600,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
// and record state back to the Kubelet runtime object. In the future, I'd like to isolate

View File

@ -0,0 +1,127 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"nodeshutdown_manager_linux.go",
"nodeshutdown_manager_others.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/nodeshutdown",
visibility = ["//visibility:public"],
deps = select({
"@io_bazel_rules_go//go/platform:aix": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:android": [
"//pkg/features:go_default_library",
"//pkg/kubelet/eviction:go_default_library",
"//pkg/kubelet/nodeshutdown/systemd:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
"@io_bazel_rules_go//go/platform:darwin": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:dragonfly": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:illumos": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:ios": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:js": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//pkg/features:go_default_library",
"//pkg/kubelet/eviction:go_default_library",
"//pkg/kubelet/nodeshutdown/systemd:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
"@io_bazel_rules_go//go/platform:nacl": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:netbsd": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:openbsd": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:plan9": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:solaris": [
"//pkg/kubelet/eviction:go_default_library",
],
"@io_bazel_rules_go//go/platform:windows": [
"//pkg/kubelet/eviction:go_default_library",
],
"//conditions:default": [],
}),
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/nodeshutdown/systemd:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["nodeshutdown_manager_linux_test.go"],
embed = [":go_default_library"],
deps = select({
"@io_bazel_rules_go//go/platform:android": [
"//pkg/apis/scheduling:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/nodeshutdown/systemd:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//pkg/apis/scheduling:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/nodeshutdown/systemd:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"//conditions:default": [],
}),
)

View File

@ -0,0 +1,255 @@
// +build linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package nodeshutdown can watch for node level shutdown events and trigger graceful termination of pods running on the node prior to a system shutdown.
package nodeshutdown
import (
"fmt"
"sync"
"time"
"github.com/godbus/dbus/v5"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
)
const (
	// nodeShutdownReason is the Reason recorded in the status of pods that are
	// terminated because the node is shutting down.
	nodeShutdownReason = "Shutdown"
	// nodeShutdownMessage is the human-readable Message recorded in the status
	// of pods that are terminated because the node is shutting down.
	nodeShutdownMessage = "Node is shutting down, evicting pods"
)
// systemDbus connects to the system D-Bus and wraps the connection in a
// systemd.DBusCon. It is a package-level variable rather than a plain function
// so that the connection factory can be swapped out.
var systemDbus = func() (dbusInhibiter, error) {
	conn, err := dbus.SystemBus()
	if err == nil {
		return &systemd.DBusCon{SystemBus: conn}, nil
	}
	return nil, err
}
// dbusInhibiter is the set of shutdown-inhibition operations the node shutdown
// manager needs from its D-Bus connection. The value returned by systemDbus
// must satisfy it.
type dbusInhibiter interface {
// CurrentInhibitDelay returns the inhibit delay currently in effect. Start
// compares it against the requested shutdown grace period.
CurrentInhibitDelay() (time.Duration, error)
// InhibitShutdown takes an inhibit lock and returns it to the caller.
InhibitShutdown() (systemd.InhibitLock, error)
// ReleaseInhibitLock releases a lock previously returned by InhibitShutdown.
ReleaseInhibitLock(lock systemd.InhibitLock) error
// ReloadLogindConf is called by Start right after OverrideInhibitDelay;
// presumably it makes logind pick up the new setting — confirm against the
// systemd package.
ReloadLogindConf() error
// MonitorShutdown returns a channel of shutdown events: true when a shutdown
// has started, false when a previously announced shutdown was cancelled.
MonitorShutdown() (<-chan bool, error)
// OverrideInhibitDelay requests that the maximum inhibit delay be set to
// inhibitDelayMax.
OverrideInhibitDelay(inhibitDelayMax time.Duration) error
}
// Manager has functions that can be used to interact with the Node Shutdown Manager.
type Manager struct {
// shutdownGracePeriodRequested is the total time allotted to pod termination
// during a node shutdown. Start does nothing when it is zero.
shutdownGracePeriodRequested time.Duration
// shutdownGracePeriodCriticalPods is the part of shutdownGracePeriodRequested
// that is reserved for terminating critical pods.
shutdownGracePeriodCriticalPods time.Duration
// getPods returns the pods that are terminated when a shutdown event is processed.
getPods eviction.ActivePodsFunc
// killPod terminates a single pod with a given status and grace period override.
killPod eviction.KillPodFunc
// dbusCon is the D-Bus connection; it is set by Start.
dbusCon dbusInhibiter
// inhibitLock is the most recently acquired inhibit lock. It is released
// once a shutdown event has been processed.
inhibitLock systemd.InhibitLock
// nodeShuttingDownMutex guards nodeShuttingDownNow.
nodeShuttingDownMutex sync.Mutex
// nodeShuttingDownNow holds the value of the last shutdown event received.
nodeShuttingDownNow bool
// clock is used to delay the termination of critical pods; NewManager sets
// it to the real clock.
clock clock.Clock
}
// NewManager returns a new node shutdown manager.
//
// getPodsFunc supplies the pods to terminate on shutdown and killPodFunc
// terminates a single pod. shutdownGracePeriodRequested is the total grace
// period for pod termination, of which shutdownGracePeriodCriticalPods is
// reserved for critical pods. The manager uses the real clock.
func NewManager(getPodsFunc eviction.ActivePodsFunc, killPodFunc eviction.KillPodFunc, shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) *Manager {
	manager := new(Manager)
	manager.getPods = getPodsFunc
	manager.killPod = killPodFunc
	manager.shutdownGracePeriodRequested = shutdownGracePeriodRequested
	manager.shutdownGracePeriodCriticalPods = shutdownGracePeriodCriticalPods
	manager.clock = clock.RealClock{}
	return manager
}
// Start starts the node shutdown manager and will start watching the node for shutdown events.
//
// It returns nil without doing anything when the GracefulNodeShutdown feature
// gate is disabled or when no shutdown grace period was requested. Otherwise it
// connects to the system D-Bus, makes sure the inhibit delay is at least the
// requested grace period, takes an inhibit lock and spawns a goroutine that
// reacts to shutdown events. An error is returned if any of the setup steps
// fail; in that case no goroutine is started.
func (m *Manager) Start() error {
	if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) {
		return nil
	}
	if m.shutdownGracePeriodRequested == 0 {
		return nil
	}

	systemBus, err := systemDbus()
	if err != nil {
		return err
	}
	m.dbusCon = systemBus

	currentInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
	if err != nil {
		return err
	}

	// If the logind's InhibitDelayMaxUSec as configured in (logind.conf) is less than shutdownGracePeriodRequested, attempt to update the value to shutdownGracePeriodRequested.
	if m.shutdownGracePeriodRequested > currentInhibitDelay {
		if err := m.dbusCon.OverrideInhibitDelay(m.shutdownGracePeriodRequested); err != nil {
			return fmt.Errorf("unable to override inhibit delay by shutdown manager: %v", err)
		}
		if err := m.dbusCon.ReloadLogindConf(); err != nil {
			return err
		}

		// Read the current inhibitDelay again, if the override was successful, currentInhibitDelay will be equal to shutdownGracePeriodRequested.
		updatedInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
		if err != nil {
			return err
		}
		if updatedInhibitDelay != m.shutdownGracePeriodRequested {
			return fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", m.shutdownGracePeriodRequested, updatedInhibitDelay)
		}
	}

	if err := m.aquireInhibitLock(); err != nil {
		return err
	}

	events, err := m.dbusCon.MonitorShutdown()
	if err != nil {
		// Do not keep holding the inhibit lock if shutdown events cannot be observed.
		releaseErr := m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
		if releaseErr != nil {
			return fmt.Errorf("failed releasing inhibitLock: %v and failed monitoring shutdown: %v", releaseErr, err)
		}
		return fmt.Errorf("failed to monitor shutdown: %v", err)
	}

	go func() {
		// Monitor for shutdown events. This follows the logind Inhibit Delay pattern described on https://www.freedesktop.org/wiki/Software/systemd/inhibit/
		// 1. When shutdown manager starts, an inhibit lock is taken.
		// 2. When shutdown(true) event is received, process the shutdown and release the inhibit lock.
		// 3. When shutdown(false) event is received, this indicates a previous shutdown was cancelled. In this case, acquire the inhibit lock again.
		//
		// Ranging over the channel (instead of an unconditional receive) ends the
		// goroutine when the channel is closed. A plain receive on a closed channel
		// would yield false forever and spin, re-taking inhibit locks in a tight loop.
		for isShuttingDown := range events {
			klog.V(1).Infof("Shutdown manager detected new shutdown event, isNodeShuttingDownNow: %t", isShuttingDown)

			m.nodeShuttingDownMutex.Lock()
			m.nodeShuttingDownNow = isShuttingDown
			m.nodeShuttingDownMutex.Unlock()

			if isShuttingDown {
				if err := m.processShutdownEvent(); err != nil {
					klog.Errorf("Shutdown manager failed processing shutdown event: %v", err)
				}
				continue
			}
			// Without a fresh lock a later shutdown would not be delayed, so make the failure visible.
			if err := m.aquireInhibitLock(); err != nil {
				klog.Errorf("Shutdown manager failed to re-acquire inhibit lock after a cancelled shutdown: %v", err)
			}
		}
		klog.Errorf("Shutdown manager stopped watching for shutdown events: event channel was closed")
	}()
	return nil
}
// aquireInhibitLock takes a new inhibit lock over the D-Bus connection and
// remembers it on the manager. On failure the previously stored lock is left
// untouched and the error is returned.
func (m *Manager) aquireInhibitLock() error {
	inhibitLock, err := m.dbusCon.InhibitShutdown()
	if err == nil {
		m.inhibitLock = inhibitLock
	}
	return err
}
// ShutdownStatus will return an error if the node is currently shutting down.
// It always returns nil while the GracefulNodeShutdown feature gate is disabled.
func (m *Manager) ShutdownStatus() error {
	if utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) {
		// Take a snapshot of the flag under the mutex; the event goroutine writes it.
		m.nodeShuttingDownMutex.Lock()
		shuttingDown := m.nodeShuttingDownNow
		m.nodeShuttingDownMutex.Unlock()

		if shuttingDown {
			return fmt.Errorf("node is shutting down")
		}
	}
	return nil
}
// processShutdownEvent terminates all active pods in response to a node
// shutdown and then releases the inhibit lock so the shutdown can proceed.
//
// All pods are killed concurrently. Non-critical pods are killed immediately
// with a grace period of (shutdownGracePeriodRequested -
// shutdownGracePeriodCriticalPods); critical pods wait for that same duration
// first and are then killed with a grace period of
// shutdownGracePeriodCriticalPods. A shorter (or equal) grace period from the
// pod spec takes precedence over the computed one.
//
// The function waits for the kills to finish, but for no longer than
// shutdownGracePeriodRequested. It returns an error if the inhibit lock could
// not be released; failures to kill individual pods are only logged.
func (m *Manager) processShutdownEvent() error {
	klog.V(1).Infof("Shutdown manager processing shutdown event")
	activePods := m.getPods()

	nonCriticalPodGracePeriod := m.shutdownGracePeriodRequested - m.shutdownGracePeriodCriticalPods

	var wg sync.WaitGroup
	wg.Add(len(activePods))
	for _, pod := range activePods {
		go func(pod *v1.Pod) {
			defer wg.Done()

			var gracePeriodOverride int64
			if kubelettypes.IsCriticalPod(pod) {
				gracePeriodOverride = int64(m.shutdownGracePeriodCriticalPods.Seconds())
				// Critical pods are terminated last: hold off until the window given to non-critical pods has passed.
				m.clock.Sleep(nonCriticalPodGracePeriod)
			} else {
				gracePeriodOverride = int64(nonCriticalPodGracePeriod.Seconds())
			}

			// If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod.
			if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
				gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
			}

			klog.V(1).Infof("Shutdown manager killing pod %q with gracePeriod: %v seconds", format.Pod(pod), gracePeriodOverride)

			status := v1.PodStatus{
				Phase:   v1.PodFailed,
				Message: nodeShutdownMessage,
				Reason:  nodeShutdownReason,
			}

			err := m.killPod(pod, status, &gracePeriodOverride)
			if err != nil {
				klog.V(1).Infof("Shutdown manager failed killing pod %q: %v", format.Pod(pod), err)
			} else {
				klog.V(1).Infof("Shutdown manager finished killing pod %q", format.Pod(pod))
			}
		}(pod)
	}

	// c is closed once every pod-killing goroutine has returned.
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()

	// We want to ensure that inhibitLock is released, so only wait up to the shutdownGracePeriodRequested timeout.
	select {
	case <-c:
	case <-time.After(m.shutdownGracePeriodRequested):
		klog.V(1).Infof("Shutdown manager pod killing did not complete in %v", m.shutdownGracePeriodRequested)
	}

	// Report a failed release instead of claiming success: while the lock is held the shutdown stays delayed.
	if err := m.dbusCon.ReleaseInhibitLock(m.inhibitLock); err != nil {
		return fmt.Errorf("failed releasing inhibit lock after processing shutdown event: %w", err)
	}
	klog.V(1).Infof("Shutdown manager completed processing shutdown event, node will shutdown shortly")

	return nil
}

View File

@ -0,0 +1,261 @@
// +build linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeshutdown
import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/apis/scheduling"
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
)
// fakeDbus is an in-memory stand-in for the D-Bus connection used by the
// shutdown manager in tests. It records which operations were invoked.
type fakeDbus struct {
// currentInhibitDelay is reported by CurrentInhibitDelay until an override has been requested.
currentInhibitDelay time.Duration
// overrideSystemInhibitDelay is reported by CurrentInhibitDelay after OverrideInhibitDelay was called.
overrideSystemInhibitDelay time.Duration
// shutdownChan is the event channel handed out by MonitorShutdown.
shutdownChan chan bool
// didInhibitShutdown is set once InhibitShutdown has been called.
didInhibitShutdown bool
// didOverrideInhibitDelay is set once OverrideInhibitDelay has been called.
didOverrideInhibitDelay bool
}
// CurrentInhibitDelay reports the configured override delay once
// OverrideInhibitDelay has been called, and the initial delay before that.
// It never fails.
func (f *fakeDbus) CurrentInhibitDelay() (time.Duration, error) {
	delay := f.currentInhibitDelay
	if f.didOverrideInhibitDelay {
		delay = f.overrideSystemInhibitDelay
	}
	return delay, nil
}
// InhibitShutdown records that a lock was requested and returns a dummy lock with value 0.
func (f *fakeDbus) InhibitShutdown() (systemd.InhibitLock, error) {
f.didInhibitShutdown = true
return systemd.InhibitLock(0), nil
}
// ReleaseInhibitLock is a no-op that always succeeds; the lock argument is ignored.
func (f *fakeDbus) ReleaseInhibitLock(lock systemd.InhibitLock) error {
return nil
}
// ReloadLogindConf is a no-op that always succeeds.
func (f *fakeDbus) ReloadLogindConf() error {
return nil
}
// MonitorShutdown returns the fake's shutdownChan, letting a test inject shutdown events by sending on it.
func (f *fakeDbus) MonitorShutdown() (<-chan bool, error) {
return f.shutdownChan, nil
}
// OverrideInhibitDelay only records that an override was requested. The
// inhibitDelayMax argument is ignored: afterwards CurrentInhibitDelay reports
// the preconfigured overrideSystemInhibitDelay instead.
func (f *fakeDbus) OverrideInhibitDelay(inhibitDelayMax time.Duration) error {
f.didOverrideInhibitDelay = true
return nil
}
// makePod builds a minimal pod for tests. The pod's name doubles as its UID.
// Critical pods get the system-critical priority, all other pods priority 0.
// terminationGracePeriod is stored on the spec as given and may be nil.
func makePod(name string, criticalPod bool, terminationGracePeriod *int64) *v1.Pod {
	priority := new(int32)
	if criticalPod {
		*priority = scheduling.SystemCriticalPriority
	}

	pod := &v1.Pod{}
	pod.ObjectMeta.Name = name
	pod.ObjectMeta.UID = types.UID(name)
	pod.Spec.Priority = priority
	pod.Spec.TerminationGracePeriodSeconds = terminationGracePeriod
	return pod
}
// TestManager drives the node shutdown manager against a fake logind dbus
// connection. For every table entry it starts the manager and then either
// checks the error returned by Start, or sends a fake shutdown event and
// verifies the grace period each active pod was killed with, as well as
// whether the system inhibit delay was overridden.
func TestManager(t *testing.T) {
	normalPodNoGracePeriod := makePod("normal-pod-nil-grace-period", false /* criticalPod */, nil /* terminationGracePeriod */)
	criticalPodNoGracePeriod := makePod("critical-pod-nil-grace-period", true /* criticalPod */, nil /* terminationGracePeriod */)

	shortGracePeriod := int64(2)
	normalPodGracePeriod := makePod("normal-pod-grace-period", false /* criticalPod */, &shortGracePeriod /* terminationGracePeriod */)
	criticalPodGracePeriod := makePod("critical-pod-grace-period", true /* criticalPod */, &shortGracePeriod /* terminationGracePeriod */)

	longGracePeriod := int64(1000)
	normalPodLongGracePeriod := makePod("normal-pod-long-grace-period", false /* criticalPod */, &longGracePeriod /* terminationGracePeriod */)

	var tests = []struct {
		desc                             string
		activePods                       []*v1.Pod
		shutdownGracePeriodRequested     time.Duration
		shutdownGracePeriodCriticalPods  time.Duration
		systemInhibitDelay               time.Duration
		overrideSystemInhibitDelay       time.Duration
		expectedDidOverrideInhibitDelay  bool
		expectedPodToGracePeriodOverride map[string]int64
		expectedError                    error
	}{
		{
			desc:                             "no override (total=30s, critical=10s)",
			activePods:                       []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod},
			shutdownGracePeriodRequested:     30 * time.Second,
			shutdownGracePeriodCriticalPods:  10 * time.Second,
			systemInhibitDelay:               40 * time.Second,
			overrideSystemInhibitDelay:       40 * time.Second,
			expectedDidOverrideInhibitDelay:  false,
			expectedPodToGracePeriodOverride: map[string]int64{"normal-pod-nil-grace-period": 20, "critical-pod-nil-grace-period": 10},
		},
		{
			desc:                             "no override (total=30s, critical=10s) pods with terminationGracePeriod and without",
			activePods:                       []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod, normalPodGracePeriod, criticalPodGracePeriod},
			shutdownGracePeriodRequested:     30 * time.Second,
			shutdownGracePeriodCriticalPods:  10 * time.Second,
			systemInhibitDelay:               40 * time.Second,
			overrideSystemInhibitDelay:       40 * time.Second,
			expectedDidOverrideInhibitDelay:  false,
			expectedPodToGracePeriodOverride: map[string]int64{"normal-pod-nil-grace-period": 20, "critical-pod-nil-grace-period": 10, "normal-pod-grace-period": 2, "critical-pod-grace-period": 2},
		},
		{
			desc:                             "no override (total=30s, critical=10s) pod with long terminationGracePeriod is overridden",
			activePods:                       []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod, normalPodGracePeriod, criticalPodGracePeriod, normalPodLongGracePeriod},
			shutdownGracePeriodRequested:     30 * time.Second,
			shutdownGracePeriodCriticalPods:  10 * time.Second,
			systemInhibitDelay:               40 * time.Second,
			overrideSystemInhibitDelay:       40 * time.Second,
			expectedDidOverrideInhibitDelay:  false,
			expectedPodToGracePeriodOverride: map[string]int64{"normal-pod-nil-grace-period": 20, "critical-pod-nil-grace-period": 10, "normal-pod-grace-period": 2, "critical-pod-grace-period": 2, "normal-pod-long-grace-period": 20},
		},
		{
			desc:                             "no override (total=30, critical=0)",
			activePods:                       []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod},
			shutdownGracePeriodRequested:     30 * time.Second,
			shutdownGracePeriodCriticalPods:  0 * time.Second,
			systemInhibitDelay:               40 * time.Second,
			overrideSystemInhibitDelay:       40 * time.Second,
			expectedDidOverrideInhibitDelay:  false,
			expectedPodToGracePeriodOverride: map[string]int64{"normal-pod-nil-grace-period": 30, "critical-pod-nil-grace-period": 0},
		},
		{
			desc:                             "override successful (total=30, critical=10)",
			activePods:                       []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod},
			shutdownGracePeriodRequested:     30 * time.Second,
			shutdownGracePeriodCriticalPods:  10 * time.Second,
			systemInhibitDelay:               5 * time.Second,
			overrideSystemInhibitDelay:       30 * time.Second,
			expectedDidOverrideInhibitDelay:  true,
			expectedPodToGracePeriodOverride: map[string]int64{"normal-pod-nil-grace-period": 20, "critical-pod-nil-grace-period": 10},
		},
		{
			desc:                             "override unsuccessful",
			activePods:                       []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod},
			shutdownGracePeriodRequested:     30 * time.Second,
			shutdownGracePeriodCriticalPods:  10 * time.Second,
			systemInhibitDelay:               5 * time.Second,
			overrideSystemInhibitDelay:       5 * time.Second,
			expectedDidOverrideInhibitDelay:  true,
			expectedPodToGracePeriodOverride: map[string]int64{"normal-pod-nil-grace-period": 5, "critical-pod-nil-grace-period": 0},
			expectedError:                    fmt.Errorf("unable to update logind InhibitDelayMaxSec to 30s (ShutdownGracePeriod), current value of InhibitDelayMaxSec (5s) is less than requested ShutdownGracePeriod"),
		},
		{
			desc:                            "override unsuccessful, zero time",
			activePods:                      []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod},
			shutdownGracePeriodRequested:    5 * time.Second,
			shutdownGracePeriodCriticalPods: 5 * time.Second,
			systemInhibitDelay:              0 * time.Second,
			overrideSystemInhibitDelay:      0 * time.Second,
			expectedError:                   fmt.Errorf("unable to update logind InhibitDelayMaxSec to 5s (ShutdownGracePeriod), current value of InhibitDelayMaxSec (0s) is less than requested ShutdownGracePeriod"),
		},
		{
			desc:                             "no override, all time to critical pods",
			activePods:                       []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod},
			shutdownGracePeriodRequested:     5 * time.Second,
			shutdownGracePeriodCriticalPods:  5 * time.Second,
			systemInhibitDelay:               5 * time.Second,
			overrideSystemInhibitDelay:       5 * time.Second,
			expectedDidOverrideInhibitDelay:  false,
			expectedPodToGracePeriodOverride: map[string]int64{"normal-pod-nil-grace-period": 0, "critical-pod-nil-grace-period": 5},
		},
	}

	for _, tc := range tests {
		t.Run(tc.desc, func(t *testing.T) {
			activePodsFunc := func() []*v1.Pod {
				return tc.activePods
			}

			type PodKillInfo struct {
				Name        string
				GracePeriod int64
			}

			podKillChan := make(chan PodKillInfo)
			killPodsFunc := func(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
				var gracePeriod int64
				if gracePeriodOverride != nil {
					gracePeriod = *gracePeriodOverride
				}
				podKillChan <- PodKillInfo{Name: pod.Name, GracePeriod: gracePeriod}
				return nil
			}

			fakeShutdownChan := make(chan bool)
			fakeDbus := &fakeDbus{currentInhibitDelay: tc.systemInhibitDelay, shutdownChan: fakeShutdownChan, overrideSystemInhibitDelay: tc.overrideSystemInhibitDelay}

			// systemDbus is package-level state; put the previous factory back
			// so the fake does not leak into other tests in this package.
			previousSystemDbus := systemDbus
			defer func() { systemDbus = previousSystemDbus }()
			systemDbus = func() (dbusInhibiter, error) {
				return fakeDbus, nil
			}
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.GracefulNodeShutdown, true)()

			manager := NewManager(activePodsFunc, killPodsFunc, tc.shutdownGracePeriodRequested, tc.shutdownGracePeriodCriticalPods)
			manager.clock = clock.NewFakeClock(time.Now())

			err := manager.Start()
			if tc.expectedError != nil {
				// Guard against a nil error: calling err.Error() on it would
				// panic instead of reporting a test failure.
				if err == nil {
					t.Fatalf("expected manager.Start() to return an error containing %q, got nil", tc.expectedError.Error())
				}
				if !strings.Contains(err.Error(), tc.expectedError.Error()) {
					t.Errorf("unexpected error message. Got: %s want %s", err.Error(), tc.expectedError.Error())
				}
				return
			}

			assert.NoError(t, err, "expected manager.Start() to not return error")
			assert.True(t, fakeDbus.didInhibitShutdown, "expected that manager inhibited shutdown")
			assert.NoError(t, manager.ShutdownStatus(), "expected that manager does not return error since shutdown is not active")

			// Send fake shutdown event
			fakeShutdownChan <- true

			// Wait for all the pods to be killed
			killedPodsToGracePeriods := map[string]int64{}
			for i := 0; i < len(tc.activePods); i++ {
				select {
				case podKillInfo := <-podKillChan:
					killedPodsToGracePeriods[podKillInfo.Name] = podKillInfo.GracePeriod
				case <-time.After(1 * time.Second):
					t.Fatalf("timed out waiting for pod kill %d of %d", i+1, len(tc.activePods))
				}
			}

			assert.Error(t, manager.ShutdownStatus(), "expected that manager returns error since shutdown is active")
			assert.Equal(t, tc.expectedPodToGracePeriodOverride, killedPodsToGracePeriods)
			assert.Equal(t, tc.expectedDidOverrideInhibitDelay, fakeDbus.didOverrideInhibitDelay, "override system inhibit delay differs")
		})
	}
}

View File

@ -0,0 +1,43 @@
// +build !linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeshutdown
import (
"time"
"k8s.io/kubernetes/pkg/kubelet/eviction"
)
// Manager is the node shutdown manager used on non linux platforms. It
// carries no state and every operation on it is a no-op.
type Manager struct{}

// NewManager returns the no-op node shutdown manager for non linux platforms.
// All arguments are accepted for signature compatibility and ignored.
func NewManager(getPodsFunc eviction.ActivePodsFunc, killPodFunc eviction.KillPodFunc, shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) *Manager {
	return new(Manager)
}

// Start does nothing on non linux platforms and always returns nil.
func (m *Manager) Start() error {
	return nil
}

// ShutdownStatus does nothing on non linux platforms and always returns nil.
func (m *Manager) ShutdownStatus() error {
	return nil
}

View File

@ -0,0 +1,54 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
# Library target for the systemd helper package. The dbus and klog
# dependencies are only selected for the android and linux platforms; every
# other platform gets an empty dependency list.
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"inhibit_linux.go",
"inhibit_others.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd",
visibility = ["//visibility:public"],
deps = select({
"@io_bazel_rules_go//go/platform:android": [
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
"//conditions:default": [],
}),
)
# Test target for the systemd helper package; it embeds the library above.
# The dbus and testify dependencies are only selected for the android and
# linux platforms.
go_test(
name = "go_default_test",
srcs = ["inhibit_linux_test.go"],
embed = [":go_default_library"],
deps = select({
"@io_bazel_rules_go//go/platform:android": [
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/github.com/godbus/dbus/v5:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"//conditions:default": [],
}),
)
# Every file under this package, visible only within the package.
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
# Publicly visible wrapper around :package-srcs.
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package systemd provides utility functions for kubelet to perform systemd related operations.
package systemd

View File

@ -0,0 +1,186 @@
// +build linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemd
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"syscall"
"time"
"github.com/godbus/dbus/v5"
"k8s.io/klog/v2"
)
// Identifiers used to address logind over dbus.
const (
// logindService is the dbus destination passed to Object() for logind calls.
logindService = "org.freedesktop.login1"
// logindObject is the object path passed to Object() for logind calls.
logindObject = dbus.ObjectPath("/org/freedesktop/login1")
// logindInterface is the interface name used as the prefix for logind
// properties and as the signal match interface.
logindInterface = "org.freedesktop.login1.Manager"
)
// dBusConnector is the subset of dbus connection behavior that DBusCon uses:
// looking up remote objects, registering signal match rules, and registering
// a channel to receive signals on. Keeping it as an interface lets tests
// substitute a fake connection.
type dBusConnector interface {
Object(dest string, path dbus.ObjectPath) dbus.BusObject
AddMatchSignal(options ...dbus.MatchOption) error
Signal(ch chan<- *dbus.Signal)
}
// DBusCon has functions that can be used to interact with systemd and logind over dbus.
type DBusCon struct {
// SystemBus is the connection every method of DBusCon issues its dbus
// operations on.
SystemBus dBusConnector
}
// InhibitLock is a lock obtained after creating a systemd inhibitor by calling InhibitShutdown().
// ReleaseInhibitLock closes it as a file descriptor.
type InhibitLock uint32
// CurrentInhibitDelay reads logind's InhibitDelayMaxUSec property and returns
// it as a time.Duration. This is the delay inhibitor timeout configured in
// logind.conf(5).
// see https://www.freedesktop.org/software/systemd/man/logind.conf.html for more details.
//
// An error is returned when the property cannot be read or does not hold a uint64.
func (bus *DBusCon) CurrentInhibitDelay() (time.Duration, error) {
	logind := bus.SystemBus.Object(logindService, logindObject)

	property, err := logind.GetProperty(logindInterface + ".InhibitDelayMaxUSec")
	if err != nil {
		return 0, fmt.Errorf("failed reading InhibitDelayMaxUSec property from logind: %v", err)
	}

	switch usec := property.Value().(type) {
	case uint64:
		// InhibitDelayMaxUSec is in microseconds
		return time.Duration(usec) * time.Microsecond, nil
	default:
		return 0, fmt.Errorf("InhibitDelayMaxUSec from logind is not a uint64 as expected")
	}
}
// InhibitShutdown creates a systemd inhibitor by calling logind's Inhibit() and returns the inhibitor lock
// see https://www.freedesktop.org/wiki/Software/systemd/inhibit/ for more details.
//
// The inhibitor is requested for "shutdown" in "delay" mode on behalf of
// "kubelet". An error is returned when the dbus call fails or when its reply
// cannot be stored as the lock's file descriptor.
func (bus *DBusCon) InhibitShutdown() (InhibitLock, error) {
	obj := bus.SystemBus.Object(logindService, logindObject)
	what := "shutdown"
	who := "kubelet"
	why := "Kubelet needs time to handle node shutdown"
	mode := "delay"

	// Build the method name from logindInterface, the same way the logind
	// property name is built, instead of repeating the interface literal.
	call := obj.Call(logindInterface+".Inhibit", 0, what, who, why, mode)
	if call.Err != nil {
		return InhibitLock(0), fmt.Errorf("failed creating systemd inhibitor: %v", call.Err)
	}

	var fd uint32
	if err := call.Store(&fd); err != nil {
		return InhibitLock(0), fmt.Errorf("failed storing inhibit lock file descriptor: %v", err)
	}

	return InhibitLock(fd), nil
}
// ReleaseInhibitLock closes the file descriptor behind the given inhibit
// lock, releasing the lock, which will cause the shutdown to start.
func (bus *DBusCon) ReleaseInhibitLock(lock InhibitLock) error {
	if closeErr := syscall.Close(int(lock)); closeErr != nil {
		return fmt.Errorf("unable to close systemd inhibitor lock: %v", closeErr)
	}
	return nil
}
// ReloadLogindConf uses dbus to send a SIGHUP to the systemd-logind service causing logind to reload its configuration.
// The signal is delivered through the systemd manager's KillUnit method.
func (bus *DBusCon) ReloadLogindConf() error {
	const (
		systemdService   = "org.freedesktop.systemd1"
		systemdObject    = "/org/freedesktop/systemd1"
		systemdInterface = "org.freedesktop.systemd1.Manager"
	)
	systemd := bus.SystemBus.Object(systemdService, dbus.ObjectPath(systemdObject))

	var (
		unit         = "systemd-logind.service"
		who          = "all"
		sighup int32 = 1 // SIGHUP
	)
	if call := systemd.Call(systemdInterface+".KillUnit", 0, unit, who, sighup); call.Err != nil {
		return fmt.Errorf("unable to reload logind conf: %v", call.Err)
	}
	return nil
}
// MonitorShutdown detects a node shutdown by watching for "PrepareForShutdown" logind events.
// see https://www.freedesktop.org/wiki/Software/systemd/inhibit/ for more details.
//
// It registers a match rule for the signal and starts a goroutine that
// forwards the boolean payload of each received signal to the returned
// channel. Signals that are nil, have an empty body, or whose first body
// element is not a bool are logged and skipped. The goroutine exits when the
// signal channel registered with the bus is closed; the returned channel is
// left open in that case.
//
// An error is returned only when the match rule cannot be registered.
func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) {
	err := bus.SystemBus.AddMatchSignal(dbus.WithMatchInterface(logindInterface), dbus.WithMatchMember("PrepareForShutdown"), dbus.WithMatchObjectPath(logindObject))
	if err != nil {
		return nil, err
	}

	busChan := make(chan *dbus.Signal, 1)
	bus.SystemBus.Signal(busChan)

	shutdownChan := make(chan bool, 1)

	go func() {
		// Ranging over the channel ends the goroutine once the channel is
		// closed, instead of spinning on (or dereferencing) nil events.
		for event := range busChan {
			if event == nil || len(event.Body) == 0 {
				klog.Errorf("Failed obtaining shutdown event, PrepareForShutdown event was empty")
				// Skip the event: indexing Body below would panic.
				continue
			}

			shutdownActive, ok := event.Body[0].(bool)
			if !ok {
				klog.Errorf("Failed obtaining shutdown event, PrepareForShutdown event was not bool type as expected")
				// Keep monitoring: one malformed signal must not disable
				// shutdown detection for the rest of the process lifetime.
				continue
			}
			shutdownChan <- shutdownActive
		}
	}()

	return shutdownChan, nil
}
// Location of the logind drop-in written by OverrideInhibitDelay.
const (
// logindConfigDirectory is the directory the override file is written to;
// it is created if missing.
logindConfigDirectory = "/etc/systemd/logind.conf.d/"
// kubeletLogindConf is the file name of the override inside logindConfigDirectory.
kubeletLogindConf = "99-kubelet.conf"
)
// OverrideInhibitDelay writes a config file to logind overriding InhibitDelayMaxSec to the value desired.
//
// The file is written to logindConfigDirectory (created if missing) under the
// name kubeletLogindConf, replacing any previous content. inhibitDelayMax is
// written in seconds, formatted without decimals. This only writes the file;
// see ReloadLogindConf for making logind reload its configuration.
func (bus *DBusCon) OverrideInhibitDelay(inhibitDelayMax time.Duration) error {
	err := os.MkdirAll(logindConfigDirectory, 0755)
	if err != nil {
		return fmt.Errorf("failed creating %v directory: %v", logindConfigDirectory, err)
	}

	// This attempts to set the `InhibitDelayMaxUSec` dbus property of logind which is MaxInhibitDelay measured in microseconds.
	// The corresponding logind config file property is named `InhibitDelayMaxSec` and is measured in seconds which is set via logind.conf config.
	// Refer to https://www.freedesktop.org/software/systemd/man/logind.conf.html for more details.
	inhibitOverride := fmt.Sprintf(`# Kubelet logind override
[Login]
InhibitDelayMaxSec=%.0f
`, inhibitDelayMax.Seconds())

	logindOverridePath := filepath.Join(logindConfigDirectory, kubeletLogindConf)
	// The override is a plain configuration file, so it is written without
	// executable permission bits (0644 rather than 0755).
	if err := ioutil.WriteFile(logindOverridePath, []byte(inhibitOverride), 0644); err != nil {
		return fmt.Errorf("failed writing logind shutdown inhibit override file %v: %v", logindOverridePath, err)
	}

	return nil
}

View File

@ -0,0 +1,185 @@
// +build linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemd
import (
"context"
"fmt"
"testing"
"time"
"github.com/godbus/dbus/v5"
"github.com/stretchr/testify/assert"
)
// fakeDBusObject is a canned dbus.BusObject for tests. Only Call and
// GetProperty carry behavior; every other method is an inert stub.
type fakeDBusObject struct {
	// properties backs GetProperty lookups, keyed by full property name.
	properties map[string]interface{}
	// bodyValue is returned as the single body element of every Call reply.
	bodyValue interface{}
}

// Call ignores its arguments and returns a successful call whose body holds bodyValue.
func (obj *fakeDBusObject) Call(method string, flags dbus.Flags, args ...interface{}) *dbus.Call {
	reply := &dbus.Call{}
	reply.Body = []interface{}{obj.bodyValue}
	return reply
}

// CallWithContext is an unused stub returning nil.
func (obj *fakeDBusObject) CallWithContext(ctx context.Context, method string, flags dbus.Flags, args ...interface{}) *dbus.Call {
	return nil
}

// Go is an unused stub returning nil.
func (obj *fakeDBusObject) Go(method string, flags dbus.Flags, ch chan *dbus.Call, args ...interface{}) *dbus.Call {
	return nil
}

// GoWithContext is an unused stub returning nil.
func (obj *fakeDBusObject) GoWithContext(ctx context.Context, method string, flags dbus.Flags, ch chan *dbus.Call, args ...interface{}) *dbus.Call {
	return nil
}

// AddMatchSignal is an unused stub returning nil.
func (obj *fakeDBusObject) AddMatchSignal(iface, member string, options ...dbus.MatchOption) *dbus.Call {
	return nil
}

// RemoveMatchSignal is an unused stub returning nil.
func (obj *fakeDBusObject) RemoveMatchSignal(iface, member string, options ...dbus.MatchOption) *dbus.Call {
	return nil
}

// GetProperty returns the value registered under p wrapped in a variant, or
// an error when p is not present in properties.
func (obj *fakeDBusObject) GetProperty(p string) (dbus.Variant, error) {
	if value, found := obj.properties[p]; found {
		return dbus.MakeVariant(value), nil
	}
	return dbus.Variant{}, fmt.Errorf("property %q does not exist in properties: %+v", p, obj.properties)
}

// SetProperty is a stub that discards the value and reports success.
func (obj *fakeDBusObject) SetProperty(p string, v interface{}) error {
	return nil
}

// Destination is a stub returning the empty string.
func (obj *fakeDBusObject) Destination() string {
	return ""
}

// Path is a stub returning the empty object path.
func (obj *fakeDBusObject) Path() dbus.ObjectPath {
	return ""
}
// fakeSystemDBus is a test double for dBusConnector.
type fakeSystemDBus struct {
	// fakeDBusObject is returned by every Object lookup.
	fakeDBusObject *fakeDBusObject
	// signalChannel is the channel most recently registered through Signal;
	// tests send on it to simulate incoming dbus signals.
	signalChannel chan<- *dbus.Signal
}

// Object returns the configured fake object regardless of destination and path.
func (f *fakeSystemDBus) Object(dest string, path dbus.ObjectPath) dbus.BusObject {
	return f.fakeDBusObject
}

// Signal remembers the channel so that tests can deliver signals on it.
func (f *fakeSystemDBus) Signal(ch chan<- *dbus.Signal) {
	f.signalChannel = ch
}

// AddMatchSignal accepts any match options and reports success.
func (f *fakeSystemDBus) AddMatchSignal(options ...dbus.MatchOption) error {
	return nil
}
func TestCurrentInhibitDelay(t *testing.T) {
thirtySeconds := time.Duration(30) * time.Second
bus := DBusCon{
SystemBus: &fakeSystemDBus{
fakeDBusObject: &fakeDBusObject{
properties: map[string]interface{}{
"org.freedesktop.login1.Manager.InhibitDelayMaxUSec": uint64(thirtySeconds / time.Microsecond),
},
},
},
}
delay, err := bus.CurrentInhibitDelay()
assert.NoError(t, err)
assert.Equal(t, thirtySeconds, delay)
}
func TestInhibitShutdown(t *testing.T) {
var fakeFd uint32 = 42
bus := DBusCon{
SystemBus: &fakeSystemDBus{
fakeDBusObject: &fakeDBusObject{
bodyValue: fakeFd,
},
},
}
fdLock, err := bus.InhibitShutdown()
assert.Equal(t, InhibitLock(fakeFd), fdLock)
assert.NoError(t, err)
}
func TestReloadLogindConf(t *testing.T) {
bus := DBusCon{
SystemBus: &fakeSystemDBus{
fakeDBusObject: &fakeDBusObject{},
},
}
assert.NoError(t, bus.ReloadLogindConf())
}
// TestMonitorShutdown checks that the boolean payload of a signal delivered
// on the registered signal channel is forwarded unchanged on the channel
// returned by MonitorShutdown.
func TestMonitorShutdown(t *testing.T) {
	testCases := []struct {
		desc           string
		shutdownActive bool
	}{
		{desc: "shutdown is active", shutdownActive: true},
		{desc: "shutdown is not active", shutdownActive: false},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			fakeBus := &fakeSystemDBus{}
			con := DBusCon{SystemBus: fakeBus}

			shutdownEvents, err := con.MonitorShutdown()
			assert.NoError(t, err)

			// The receiver runs concurrently with the send below and closes
			// finished once it has seen the event or given up waiting.
			finished := make(chan struct{})
			go func() {
				defer close(finished)
				select {
				case got := <-shutdownEvents:
					assert.Equal(t, tc.shutdownActive, got)
				case <-time.After(5 * time.Second):
					t.Errorf("Timed out waiting for shutdown message")
				}
			}()

			fakeBus.signalChannel <- &dbus.Signal{Body: []interface{}{tc.shutdownActive}}
			<-finished
		})
	}
}

View File

@ -0,0 +1,19 @@
// +build !linux
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemd

View File

@ -498,6 +498,7 @@ func ReadyCondition(
storageErrorsFunc func() error, // typically Kubelet.runtimeState.storageErrors
appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator
cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
nodeShutdownManagerErrorsFunc func() error, // typically kubelet.shutdownManager.errors.
recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
) Setter {
return func(node *v1.Node) error {
@ -512,7 +513,7 @@ func ReadyCondition(
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc()}
errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc(), nodeShutdownManagerErrorsFunc()}
requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)

View File

@ -1109,15 +1109,16 @@ func TestReadyCondition(t *testing.T) {
}
cases := []struct {
desc string
node *v1.Node
runtimeErrors error
networkErrors error
storageErrors error
appArmorValidateHostFunc func() error
cmStatus cm.Status
expectConditions []v1.NodeCondition
expectEvents []testEvent
desc string
node *v1.Node
runtimeErrors error
networkErrors error
storageErrors error
appArmorValidateHostFunc func() error
cmStatus cm.Status
nodeShutdownManagerErrors error
expectConditions []v1.NodeCondition
expectEvents []testEvent
}{
{
desc: "new, ready",
@ -1154,6 +1155,12 @@ func TestReadyCondition(t *testing.T) {
storageErrors: errors.New("some storage error"),
expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "some storage error", now, now)},
},
{
desc: "new, not ready: shutdown active",
node: withCapacity.DeepCopy(),
nodeShutdownManagerErrors: errors.New("node is shutting down"),
expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "node is shutting down", now, now)},
},
{
desc: "new, not ready: runtime and network errors",
node: withCapacity.DeepCopy(),
@ -1234,6 +1241,9 @@ func TestReadyCondition(t *testing.T) {
cmStatusFunc := func() cm.Status {
return tc.cmStatus
}
nodeShutdownErrorsFunc := func() error {
return tc.nodeShutdownManagerErrors
}
events := []testEvent{}
recordEventFunc := func(eventType, event string) {
events = append(events, testEvent{
@ -1242,7 +1252,7 @@ func TestReadyCondition(t *testing.T) {
})
}
// construct setter
setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc)
setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, nodeShutdownErrorsFunc, recordEventFunc)
// call setter on node
if err := setter(tc.node); err != nil {
t.Fatalf("unexpected error: %v", err)

View File

@ -815,6 +815,15 @@ type KubeletConfiguration struct {
// Default: true
// +optional
EnableSystemLogHandler *bool `json:"enableSystemLogHandler,omitempty"`
// ShutdownGracePeriod specifies the total duration that the node should delay the shutdown and total grace period for pod termination during a node shutdown.
// Default: "30s"
// +optional
ShutdownGracePeriod metav1.Duration `json:"shutdownGracePeriod,omitempty"`
// ShutdownGracePeriodCriticalPods specifies the duration used to terminate critical pods during a node shutdown. This should be less than ShutdownGracePeriod.
// For example, if ShutdownGracePeriod=30s, and ShutdownGracePeriodCriticalPods=10s, during a node shutdown the first 20 seconds would be reserved for gracefully terminating normal pods, and the last 10 seconds would be reserved for terminating critical pods.
// Default: "10s"
// +optional
ShutdownGracePeriodCriticalPods metav1.Duration `json:"shutdownGracePeriodCriticalPods,omitempty"`
}
type KubeletAuthorizationMode string

View File

@ -301,6 +301,8 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
*out = new(bool)
**out = **in
}
out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
return
}

1
vendor/modules.txt vendored
View File

@ -459,6 +459,7 @@ github.com/go-ozzo/ozzo-validation/is
github.com/go-stack/stack
# github.com/go-stack/stack => github.com/go-stack/stack v1.8.0
# github.com/godbus/dbus/v5 v5.0.3 => github.com/godbus/dbus/v5 v5.0.3
## explicit
github.com/godbus/dbus/v5
# github.com/godbus/dbus/v5 => github.com/godbus/dbus/v5 v5.0.3
# github.com/gogo/protobuf v1.3.1 => github.com/gogo/protobuf v1.3.1