diff --git a/pkg/apis/core/types.go b/pkg/apis/core/types.go index 5bbd397fbaa..885b8ccbb46 100644 --- a/pkg/apis/core/types.go +++ b/pkg/apis/core/types.go @@ -2433,7 +2433,7 @@ const ( PodReasonSchedulingGated = "SchedulingGated" // ContainersReady indicates whether all containers in the pod are ready. ContainersReady PodConditionType = "ContainersReady" - // AlphaNoCompatGuaranteeDisruptionTarget indicates the pod is about to be deleted due to a + // AlphaNoCompatGuaranteeDisruptionTarget indicates the pod is about to be terminated due to a // disruption (such as preemption, eviction API or garbage-collection). // The constant is to be renamed once the name is accepted within the KEP-3329. AlphaNoCompatGuaranteeDisruptionTarget PodConditionType = "DisruptionTarget" diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index b69eb8c9481..ca8d24665a5 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -999,7 +999,10 @@ func (dc *DisruptionController) nonTerminatingPodHasStaleDisruptionCondition(pod return false, 0 } _, cond := apipod.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget) - if cond == nil || cond.Status != v1.ConditionTrue { + // Pod disruption conditions added by kubelet are never considered stale because the condition might take + // arbitrarily long before the pod is terminating (has deletion timestamp). Also, pod conditions present + // on pods in terminal phase are not stale to avoid unnecessary status updates. + if cond == nil || cond.Status != v1.ConditionTrue || cond.Reason == v1.AlphaNoCompatGuaranteePodReasonTerminationByKubelet || apipod.IsPodPhaseTerminal(pod.Status.Phase) { return false, 0 } waitFor := dc.stalePodDisruptionTimeout - dc.clock.Since(cond.LastTransitionTime.Time) diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index 4fc204200a6..8dfebe84ace 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -27,11 +27,14 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" v1helper "k8s.io/component-helpers/scheduling/corev1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" + "k8s.io/kubernetes/pkg/features" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" @@ -386,7 +389,16 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act gracePeriodOverride = m.config.MaxPodGracePeriodSeconds } message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc) - if m.evictPod(pod, gracePeriodOverride, message, annotations) { + var condition *v1.PodCondition + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + condition = &v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: v1.AlphaNoCompatGuaranteePodReasonTerminationByKubelet, + Message: message, + } + } + if m.evictPod(pod, gracePeriodOverride, message, annotations, condition) { metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc() return []*v1.Pod{pod} } @@ -492,7 +504,7 @@ func (m *managerImpl) emptyDirLimitEviction(podStats 
statsapi.PodStats, pod *v1. used := podVolumeUsed[pod.Spec.Volumes[i].Name] if used != nil && size != nil && size.Sign() == 1 && used.Cmp(*size) > 0 { // the emptyDir usage exceeds the size limit, evict the pod - if m.evictPod(pod, 0, fmt.Sprintf(emptyDirMessageFmt, pod.Spec.Volumes[i].Name, size.String()), nil) { + if m.evictPod(pod, 0, fmt.Sprintf(emptyDirMessageFmt, pod.Spec.Volumes[i].Name, size.String()), nil, nil) { metrics.Evictions.WithLabelValues(signalEmptyDirFsLimit).Inc() return true } @@ -519,7 +531,8 @@ func (m *managerImpl) podEphemeralStorageLimitEviction(podStats statsapi.PodStat podEphemeralStorageLimit := podLimits[v1.ResourceEphemeralStorage] if podEphemeralStorageTotalUsage.Cmp(podEphemeralStorageLimit) > 0 { // the total usage of pod exceeds the total size limit of containers, evict the pod - if m.evictPod(pod, 0, fmt.Sprintf(podEphemeralStorageMessageFmt, podEphemeralStorageLimit.String()), nil) { + message := fmt.Sprintf(podEphemeralStorageMessageFmt, podEphemeralStorageLimit.String()) + if m.evictPod(pod, 0, message, nil, nil) { metrics.Evictions.WithLabelValues(signalEphemeralPodFsLimit).Inc() return true } @@ -545,7 +558,7 @@ func (m *managerImpl) containerEphemeralStorageLimitEviction(podStats statsapi.P if ephemeralStorageThreshold, ok := thresholdsMap[containerStat.Name]; ok { if ephemeralStorageThreshold.Cmp(*containerUsed) < 0 { - if m.evictPod(pod, 0, fmt.Sprintf(containerEphemeralStorageMessageFmt, containerStat.Name, ephemeralStorageThreshold.String()), nil) { + if m.evictPod(pod, 0, fmt.Sprintf(containerEphemeralStorageMessageFmt, containerStat.Name, ephemeralStorageThreshold.String()), nil, nil) { metrics.Evictions.WithLabelValues(signalEphemeralContainerFsLimit).Inc() return true } @@ -556,7 +569,7 @@ func (m *managerImpl) containerEphemeralStorageLimitEviction(podStats statsapi.P return false } -func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg string, annotations map[string]string) bool { +func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg string, annotations map[string]string, condition *v1.PodCondition) bool { // If the pod is marked as critical and static, and support for critical pod annotations is enabled, // do not evict such pods. Static pods are not re-admitted after evictions. // https://github.com/kubernetes/kubernetes/issues/40573 has more details. 
@@ -572,6 +585,9 @@ func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg status.Phase = v1.PodFailed status.Reason = Reason status.Message = evictMsg + if condition != nil { + podutil.UpdatePodCondition(status, condition) + } }) if err != nil { klog.ErrorS(err, "Eviction manager: pod failed to evict", "pod", klog.KObj(pod)) diff --git a/pkg/kubelet/eviction/eviction_manager_test.go b/pkg/kubelet/eviction/eviction_manager_test.go index 578d586282c..1bd8d66c683 100644 --- a/pkg/kubelet/eviction/eviction_manager_test.go +++ b/pkg/kubelet/eviction/eviction_manager_test.go @@ -23,6 +23,8 @@ import ( "time" gomock "github.com/golang/mock/gomock" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" @@ -185,6 +187,206 @@ type podToMake struct { perLocalVolumeInodesUsed string } +func TestMemoryPressure_VerifyPodStatus(t *testing.T) { + testCases := map[string]struct { + wantPodStatus v1.PodStatus + }{ + "eviction due to memory pressure": { + wantPodStatus: v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "Evicted", + Message: "The node was low on resource: memory. ", + }, + }, + } + for name, tc := range testCases { + for _, enablePodDisruptionConditions := range []bool{false, true} { + t.Run(fmt.Sprintf("%s;PodDisruptionConditions=%v", name, enablePodDisruptionConditions), func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, enablePodDisruptionConditions)() + + podMaker := makePodWithMemoryStats + summaryStatsMaker := makeMemoryStats + podsToMake := []podToMake{ + {name: "below-requests", requests: newResourceList("", "1Gi", ""), limits: newResourceList("", "1Gi", ""), memoryWorkingSet: "900Mi"}, + {name: "above-requests", requests: newResourceList("", "100Mi", ""), limits: newResourceList("", "1Gi", ""), memoryWorkingSet: "700Mi"}, + } + pods := []*v1.Pod{} + podStats := map[*v1.Pod]statsapi.PodStats{} + for _, podToMake := range podsToMake { + pod, podStat := podMaker(podToMake.name, podToMake.priority, podToMake.requests, podToMake.limits, podToMake.memoryWorkingSet) + pods = append(pods, pod) + podStats[pod] = podStat + } + activePodsFunc := func() []*v1.Pod { + return pods + } + + fakeClock := testingclock.NewFakeClock(time.Now()) + podKiller := &mockPodKiller{} + diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} + diskGC := &mockDiskGC{err: nil} + nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} + + config := Config{ + PressureTransitionPeriod: time.Minute * 5, + Thresholds: []evictionapi.Threshold{ + { + Signal: evictionapi.SignalMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("2Gi"), + }, + }, + }, + } + summaryProvider := &fakeSummaryProvider{result: summaryStatsMaker("1500Mi", podStats)} + manager := &managerImpl{ + clock: fakeClock, + killPodFunc: podKiller.killPodNow, + imageGC: diskGC, + containerGC: diskGC, + config: config, + recorder: &record.FakeRecorder{}, + summaryProvider: summaryProvider, + nodeRef: nodeRef, + nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, + thresholdsFirstObservedAt: thresholdsObservedAt{}, + } + + // synchronize to detect the memory pressure + manager.synchronize(diskInfoProvider, activePodsFunc) + + // verify memory pressure is detected + if !manager.IsUnderMemoryPressure() { + 
t.Fatalf("Manager should have detected memory pressure") + } + + // verify a pod is selected for eviction + if podKiller.pod == nil { + t.Fatalf("Manager should have selected a pod for eviction") + } + + wantPodStatus := tc.wantPodStatus.DeepCopy() + if enablePodDisruptionConditions { + wantPodStatus.Conditions = append(wantPodStatus.Conditions, v1.PodCondition{ + Type: "DisruptionTarget", + Status: "True", + Reason: "TerminationByKubelet", + Message: "The node was low on resource: memory. ", + }) + } + + // verify the pod status after applying the status update function + podKiller.statusFn(&podKiller.pod.Status) + if diff := cmp.Diff(*wantPodStatus, podKiller.pod.Status, cmpopts.IgnoreFields(v1.PodCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { + t.Errorf("Unexpected pod status of the evicted pod (-want,+got):\n%s", diff) + } + }) + } + } +} + +func TestDiskPressureNodeFs_VerifyPodStatus(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LocalStorageCapacityIsolation, true)() + + testCases := map[string]struct { + wantPodStatus v1.PodStatus + }{ + "eviction due to disk pressure": { + wantPodStatus: v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "Evicted", + Message: "The node was low on resource: ephemeral-storage. ", + }, + }, + } + for name, tc := range testCases { + for _, enablePodDisruptionConditions := range []bool{false, true} { + t.Run(fmt.Sprintf("%s;PodDisruptionConditions=%v", name, enablePodDisruptionConditions), func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, enablePodDisruptionConditions)() + + podMaker := makePodWithDiskStats + summaryStatsMaker := makeDiskStats + podsToMake := []podToMake{ + {name: "below-requests", requests: newResourceList("", "", "1Gi"), limits: newResourceList("", "", "1Gi"), rootFsUsed: "900Mi"}, + {name: "above-requests", requests: newResourceList("", "", "100Mi"), limits: newResourceList("", "", "1Gi"), rootFsUsed: "700Mi"}, + } + pods := []*v1.Pod{} + podStats := map[*v1.Pod]statsapi.PodStats{} + for _, podToMake := range podsToMake { + pod, podStat := podMaker(podToMake.name, podToMake.priority, podToMake.requests, podToMake.limits, podToMake.rootFsUsed, podToMake.logsFsUsed, podToMake.perLocalVolumeUsed) + pods = append(pods, pod) + podStats[pod] = podStat + } + activePodsFunc := func() []*v1.Pod { + return pods + } + + fakeClock := testingclock.NewFakeClock(time.Now()) + podKiller := &mockPodKiller{} + diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} + diskGC := &mockDiskGC{err: nil} + nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} + + config := Config{ + PressureTransitionPeriod: time.Minute * 5, + Thresholds: []evictionapi.Threshold{ + { + Signal: evictionapi.SignalNodeFsAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("2Gi"), + }, + }, + }, + } + summaryProvider := &fakeSummaryProvider{result: summaryStatsMaker("1.5Gi", "200Gi", podStats)} + manager := &managerImpl{ + clock: fakeClock, + killPodFunc: podKiller.killPodNow, + imageGC: diskGC, + containerGC: diskGC, + config: config, + recorder: &record.FakeRecorder{}, + summaryProvider: summaryProvider, + nodeRef: nodeRef, + nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, + thresholdsFirstObservedAt: thresholdsObservedAt{}, + } + + // synchronize + 
manager.synchronize(diskInfoProvider, activePodsFunc) + + // verify manager detected disk pressure + if !manager.IsUnderDiskPressure() { + t.Fatalf("Manager should report disk pressure") + } + + // verify a pod is selected for eviction + if podKiller.pod == nil { + t.Fatalf("Manager should have selected a pod for eviction") + } + + wantPodStatus := tc.wantPodStatus.DeepCopy() + if enablePodDisruptionConditions { + wantPodStatus.Conditions = append(wantPodStatus.Conditions, v1.PodCondition{ + Type: "DisruptionTarget", + Status: "True", + Reason: "TerminationByKubelet", + Message: "The node was low on resource: ephemeral-storage. ", + }) + } + + // verify the pod status after applying the status update function + podKiller.statusFn(&podKiller.pod.Status) + if diff := cmp.Diff(*wantPodStatus, podKiller.pod.Status, cmpopts.IgnoreFields(v1.PodCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { + t.Errorf("Unexpected pod status of the evicted pod (-want,+got):\n%s", diff) + } + }) + } + } +} + // TestMemoryPressure func TestMemoryPressure(t *testing.T) { podMaker := makePodWithMemoryStats diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index ab9f0ee116e..c168204e3b7 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1510,6 +1510,22 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po s.Conditions = append(s.Conditions, c) } } + + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + // copy over the pod disruption conditions from state which is already + // updated during the eviction (due to either node resource pressure or + // node graceful shutdown). We do not re-generate the conditions based + // on the container statuses as they are added based on one-time events.
+ cType := v1.AlphaNoCompatGuaranteeDisruptionTarget + if _, condition := podutil.GetPodConditionFromList(oldPodStatus.Conditions, cType); condition != nil { + if i, _ := podutil.GetPodConditionFromList(s.Conditions, cType); i >= 0 { + s.Conditions[i] = *condition + } else { + s.Conditions = append(s.Conditions, *condition) + } + } + } + // set all Kubelet-owned conditions if utilfeature.DefaultFeatureGate.Enabled(features.PodHasNetworkCondition) { s.Conditions = append(s.Conditions, status.GeneratePodHasNetworkCondition(pod, podStatus)) diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 8fc06f6cc03..7af53dc40c8 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -2497,7 +2497,9 @@ func Test_generateAPIPodStatus(t *testing.T) { }, }, } + now := metav1.Now() + normalized_now := now.Rfc3339Copy() tests := []struct { name string @@ -2505,9 +2507,66 @@ func Test_generateAPIPodStatus(t *testing.T) { currentStatus *kubecontainer.PodStatus unreadyContainer []string previousStatus v1.PodStatus + enablePodDisruptionConditions bool expected v1.PodStatus + expectedPodDisruptionCondition v1.PodCondition expectedPodHasNetworkCondition v1.PodCondition }{ + { + name: "pod disruption condition is copied over; PodDisruptionConditions enabled", + pod: &v1.Pod{ + Spec: desiredState, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + runningState("containerA"), + runningState("containerB"), + }, + Conditions: []v1.PodCondition{{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + LastTransitionTime: normalized_now, + }}, + }, + ObjectMeta: metav1.ObjectMeta{Name: "my-pod", DeletionTimestamp: &now}, + }, + currentStatus: sandboxReadyStatus, + previousStatus: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + runningState("containerA"), + runningState("containerB"), + }, + Conditions: []v1.PodCondition{{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + LastTransitionTime: normalized_now, + }}, + }, + enablePodDisruptionConditions: true, + expected: v1.PodStatus{ + Phase: v1.PodRunning, + HostIP: "127.0.0.1", + QOSClass: v1.PodQOSBestEffort, + Conditions: []v1.PodCondition{ + {Type: v1.PodInitialized, Status: v1.ConditionTrue}, + {Type: v1.PodReady, Status: v1.ConditionTrue}, + {Type: v1.ContainersReady, Status: v1.ConditionTrue}, + {Type: v1.PodScheduled, Status: v1.ConditionTrue}, + }, + ContainerStatuses: []v1.ContainerStatus{ + ready(waitingWithLastTerminationUnknown("containerA", 0)), + ready(waitingWithLastTerminationUnknown("containerB", 0)), + }, + }, + expectedPodDisruptionCondition: v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + LastTransitionTime: normalized_now, + }, + expectedPodHasNetworkCondition: v1.PodCondition{ + Type: kubetypes.PodHasNetwork, + Status: v1.ConditionTrue, + }, + }, { name: "current status ready, with previous statuses and deletion", pod: &v1.Pod{ @@ -2876,6 +2935,7 @@ func Test_generateAPIPodStatus(t *testing.T) { for _, test := range tests { for _, enablePodHasNetworkCondition := range []bool{false, true} { t.Run(test.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodHasNetworkCondition, enablePodHasNetworkCondition)() testKubelet := 
newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -2884,12 +2944,16 @@ func Test_generateAPIPodStatus(t *testing.T) { for _, name := range test.unreadyContainer { kl.readinessManager.Set(kubecontainer.BuildContainerID("", findContainerStatusByName(test.expected, name).ContainerID), results.Failure, test.pod) } + expected := test.expected.DeepCopy() actual := kl.generateAPIPodStatus(test.pod, test.currentStatus) if enablePodHasNetworkCondition { - test.expected.Conditions = append([]v1.PodCondition{test.expectedPodHasNetworkCondition}, test.expected.Conditions...) + expected.Conditions = append([]v1.PodCondition{test.expectedPodHasNetworkCondition}, expected.Conditions...) } - if !apiequality.Semantic.DeepEqual(test.expected, actual) { - t.Fatalf("Unexpected status: %s", diff.ObjectReflectDiff(actual, test.expected)) + if test.enablePodDisruptionConditions { + expected.Conditions = append([]v1.PodCondition{test.expectedPodDisruptionCondition}, expected.Conditions...) + } + if !apiequality.Semantic.DeepEqual(*expected, actual) { + t.Fatalf("Unexpected status: %s", diff.ObjectReflectDiff(*expected, actual)) } }) } diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go index fb34f1bfb10..30f15b152f4 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go @@ -31,6 +31,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/scheduling" "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -380,6 +381,14 @@ func (m *managerImpl) processShutdownEvent() error { } status.Message = nodeShutdownMessage status.Reason = nodeShutdownReason + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + podutil.UpdatePodCondition(status, &v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: v1.AlphaNoCompatGuaranteePodReasonTerminationByKubelet, + Message: nodeShutdownMessage, + }) + } }); err != nil { m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err) } else { diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go index 652b4a32d8e..2372c87b215 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -123,10 +125,86 @@ func TestManager(t *testing.T) { shutdownGracePeriodCriticalPods time.Duration systemInhibitDelay time.Duration overrideSystemInhibitDelay time.Duration + enablePodDisruptionConditions bool expectedDidOverrideInhibitDelay bool expectedPodToGracePeriodOverride map[string]int64 expectedError error + expectedPodStatuses map[string]v1.PodStatus }{ + { + desc: "verify pod status; PodDisruptionConditions enabled", + activePods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{Name: "running-pod"}, + Spec: v1.PodSpec{}, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "failed-pod"}, + Spec: v1.PodSpec{}, + 
Status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "succeeded-pod"}, + Spec: v1.PodSpec{}, + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + }, + }, + shutdownGracePeriodRequested: time.Duration(30 * time.Second), + shutdownGracePeriodCriticalPods: time.Duration(10 * time.Second), + systemInhibitDelay: time.Duration(40 * time.Second), + overrideSystemInhibitDelay: time.Duration(40 * time.Second), + enablePodDisruptionConditions: true, + expectedDidOverrideInhibitDelay: false, + expectedPodToGracePeriodOverride: map[string]int64{"running-pod": 20, "failed-pod": 20, "succeeded-pod": 20}, + expectedPodStatuses: map[string]v1.PodStatus{ + "running-pod": { + Phase: v1.PodFailed, + Message: "Pod was terminated in response to imminent node shutdown.", + Reason: "Terminated", + Conditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + Message: "Pod was terminated in response to imminent node shutdown.", + }, + }, + }, + "failed-pod": { + Phase: v1.PodFailed, + Message: "Pod was terminated in response to imminent node shutdown.", + Reason: "Terminated", + Conditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + Message: "Pod was terminated in response to imminent node shutdown.", + }, + }, + }, + "succeeded-pod": { + Phase: v1.PodSucceeded, + Message: "Pod was terminated in response to imminent node shutdown.", + Reason: "Terminated", + Conditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + Message: "Pod was terminated in response to imminent node shutdown.", + }, + }, + }, + }, + }, { desc: "no override (total=30s, critical=10s)", activePods: []*v1.Pod{normalPodNoGracePeriod, criticalPodNoGracePeriod}, @@ -134,8 +212,21 @@ func TestManager(t *testing.T) { shutdownGracePeriodCriticalPods: time.Duration(10 * time.Second), systemInhibitDelay: time.Duration(40 * time.Second), overrideSystemInhibitDelay: time.Duration(40 * time.Second), + enablePodDisruptionConditions: false, expectedDidOverrideInhibitDelay: false, expectedPodToGracePeriodOverride: map[string]int64{"normal-pod-nil-grace-period": 20, "critical-pod-nil-grace-period": 10}, + expectedPodStatuses: map[string]v1.PodStatus{ + "normal-pod-nil-grace-period": { + Phase: v1.PodFailed, + Message: "Pod was terminated in response to imminent node shutdown.", + Reason: "Terminated", + }, + "critical-pod-nil-grace-period": { + Phase: v1.PodFailed, + Message: "Pod was terminated in response to imminent node shutdown.", + Reason: "Terminated", + }, + }, }, { desc: "no override (total=30s, critical=10s) pods with terminationGracePeriod and without", @@ -228,6 +319,7 @@ func TestManager(t *testing.T) { if gracePeriodOverride != nil { gracePeriod = *gracePeriodOverride } + fn(&pod.Status) podKillChan <- PodKillInfo{Name: pod.Name, GracePeriod: gracePeriod} return nil } @@ -239,6 +331,7 @@ func TestManager(t *testing.T) { systemDbus = func() (dbusInhibiter, error) { return fakeDbus, nil } + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.PodDisruptionConditions, tc.enablePodDisruptionConditions)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.GracefulNodeShutdown, true)() proberManager := probetest.FakeManager{} @@ -296,6 +389,13 @@ 
func TestManager(t *testing.T) { assert.Equal(t, manager.Admit(nil).Admit, false) assert.Equal(t, tc.expectedPodToGracePeriodOverride, killedPodsToGracePeriods) assert.Equal(t, tc.expectedDidOverrideInhibitDelay, fakeDbus.didOverrideInhibitDelay, "override system inhibit delay differs") + if tc.expectedPodStatuses != nil { + for _, pod := range tc.activePods { + if diff := cmp.Diff(tc.expectedPodStatuses[pod.Name], pod.Status, cmpopts.IgnoreFields(v1.PodCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { + t.Errorf("Unexpected PodStatus: (-want,+got):\n%s", diff) + } + } + } } }) } diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 5fd79754afe..64cdfec2aa6 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -34,8 +34,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/metrics" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" @@ -150,7 +152,8 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool { oldCopy := oldStatus.DeepCopy() for _, c := range status.Conditions { - if kubetypes.PodConditionByKubelet(c.Type) { + // both owned and shared conditions are used for kubelet status equality + if kubetypes.PodConditionByKubelet(c.Type) || kubetypes.PodConditionSharedByKubelet(c.Type) { _, oc := podutil.GetPodCondition(oldCopy, c.Type) if oc == nil || oc.Status != c.Status || oc.Message != c.Message || oc.Reason != c.Reason { return false @@ -501,6 +504,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp // Set PodScheduledCondition.LastTransitionTime. updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled) + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + // Set DisruptionTarget.LastTransitionTime. + updateLastTransitionTime(&status, &oldStatus, v1.AlphaNoCompatGuaranteeDisruptionTarget) + } + // ensure that the start time does not change across updates. 
if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() { status.StartTime = oldStatus.StartTime @@ -879,9 +887,17 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon podConditions = append(podConditions, c) } } + for _, c := range newPodStatus.Conditions { if kubetypes.PodConditionByKubelet(c.Type) { podConditions = append(podConditions, c) + } else if kubetypes.PodConditionSharedByKubelet(c.Type) { + // for shared conditions we update or append in podConditions + if i, _ := podutil.GetPodConditionFromList(podConditions, c.Type); i >= 0 { + podConditions[i] = c + } else { + podConditions = append(podConditions, c) + } } } newPodStatus.Conditions = podConditions @@ -900,6 +916,16 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon newPodStatus.Phase = oldPodStatus.Phase newPodStatus.Reason = oldPodStatus.Reason newPodStatus.Message = oldPodStatus.Message + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + // revert setting of the pod disruption condition until the pod is terminal in order to avoid issuing + // an unnecessary PATCH request + revertPodCondition(&oldPodStatus, &newPodStatus, v1.AlphaNoCompatGuaranteeDisruptionTarget) + } + } else { + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + // update the LastTransitionTime when transitioning into the failed state + updateLastTransitionTime(&newPodStatus, &oldPodStatus, v1.AlphaNoCompatGuaranteeDisruptionTarget) + } } } @@ -920,6 +946,18 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon return newPodStatus } +func revertPodCondition(oldPodStatus, newPodStatus *v1.PodStatus, cType v1.PodConditionType) { + if newIndex, newCondition := podutil.GetPodConditionFromList(newPodStatus.Conditions, cType); newCondition != nil { + if _, oldCondition := podutil.GetPodConditionFromList(oldPodStatus.Conditions, cType); oldCondition != nil { + // revert the new condition to what it was before + newPodStatus.Conditions[newIndex] = *oldCondition + } else { + // delete the new condition as it wasn't there before + newPodStatus.Conditions = append(newPodStatus.Conditions[:newIndex], newPodStatus.Conditions[newIndex+1:]...)
+ } + } +} + // NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile func NeedToReconcilePodReadiness(pod *v1.Pod) bool { if len(pod.Spec.ReadinessGates) == 0 { diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index aa1ca964f31..de6f643f540 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -34,11 +34,14 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + featuregatetesting "k8s.io/component-base/featuregate/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" kubeconfigmap "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" @@ -1400,22 +1403,204 @@ func deleteAction() core.DeleteAction { func TestMergePodStatus(t *testing.T) { useCases := []struct { - desc string - hasRunningContainers bool - oldPodStatus func(input v1.PodStatus) v1.PodStatus - newPodStatus func(input v1.PodStatus) v1.PodStatus - expectPodStatus v1.PodStatus + desc string + enablePodDisruptionConditions bool + hasRunningContainers bool + oldPodStatus func(input v1.PodStatus) v1.PodStatus + newPodStatus func(input v1.PodStatus) v1.PodStatus + expectPodStatus v1.PodStatus }{ { "no change", false, + false, func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { return input }, getPodStatus(), }, + { + "add DisruptionTarget condition when transitioning into failed phase; PodDisruptionConditions enabled", + true, + false, + func(input v1.PodStatus) v1.PodStatus { return input }, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodFailed + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }) + return input + }, + v1.PodStatus{ + Phase: v1.PodFailed, + Conditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }, + { + Type: v1.PodReady, + Status: v1.ConditionFalse, + Reason: "PodFailed", + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.ContainersReady, + Status: v1.ConditionFalse, + Reason: "PodFailed", + }, + }, + Message: "Message", + }, + }, + { + "don't add DisruptionTarget condition when transitioning into failed phase, but there might still be running containers; PodDisruptionConditions enabled", + true, + true, + func(input v1.PodStatus) v1.PodStatus { return input }, + func(input v1.PodStatus) v1.PodStatus { + input.Phase = v1.PodFailed + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }) + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + }, + Message: "Message", + }, + }, + { + "preserve DisruptionTarget condition; PodDisruptionConditions enabled", + true, + false, + func(input v1.PodStatus) v1.PodStatus { +
input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }, + }, + Message: "Message", + }, + }, + { + "preserve DisruptionTarget condition; PodDisruptionConditions disabled", + false, + false, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }, + }, + Message: "Message", + }, + }, + { + "override DisruptionTarget condition; PodDisruptionConditions enabled", + true, + false, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "EvictedByEvictionAPI", + }) + return input + }, + func(input v1.PodStatus) v1.PodStatus { + input.Conditions = append(input.Conditions, v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }) + return input + }, + v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + { + Type: v1.PodScheduled, + Status: v1.ConditionTrue, + }, + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "TerminationByKubelet", + }, + }, + Message: "Message", + }, + }, { "readiness changes", false, + false, func(input v1.PodStatus) v1.PodStatus { return input }, func(input v1.PodStatus) v1.PodStatus { input.Conditions[0].Status = v1.ConditionFalse @@ -1439,6 +1624,7 @@ func TestMergePodStatus(t *testing.T) { { "additional pod condition", false, + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1469,6 +1655,7 @@ func TestMergePodStatus(t *testing.T) { { "additional pod condition and readiness changes", false, + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1502,6 +1689,7 @@ func TestMergePodStatus(t *testing.T) { { "additional pod condition changes", false, + false, func(input v1.PodStatus) v1.PodStatus { input.Conditions = append(input.Conditions, v1.PodCondition{ Type: v1.PodConditionType("example.com/feature"), @@ -1538,6 +1726,7 @@ func TestMergePodStatus(t *testing.T) { { "phase is transitioning to failed and no containers running", false, + false, func(input v1.PodStatus) v1.PodStatus { input.Phase = v1.PodRunning input.Reason = "Unknown" @@ -1556,10 +1745,12 @@ func 
TestMergePodStatus(t *testing.T) { { Type: v1.PodReady, Status: v1.ConditionFalse, + Reason: "PodFailed", }, { Type: v1.ContainersReady, Status: v1.ConditionFalse, + Reason: "PodFailed", }, { Type: v1.PodScheduled, @@ -1572,6 +1763,7 @@ func TestMergePodStatus(t *testing.T) { }, { "phase is transitioning to failed and containers running", + false, true, func(input v1.PodStatus) v1.PodStatus { input.Phase = v1.PodRunning @@ -1605,6 +1797,7 @@ func TestMergePodStatus(t *testing.T) { for _, tc := range useCases { t.Run(tc.desc, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() output := mergePodStatus(tc.oldPodStatus(getPodStatus()), tc.newPodStatus(getPodStatus()), tc.hasRunningContainers) if !conditionsEqual(output.Conditions, tc.expectPodStatus.Conditions) || !statusEqual(output, tc.expectPodStatus) { t.Fatalf("unexpected output: %s", cmp.Diff(tc.expectPodStatus, output)) @@ -1630,7 +1823,7 @@ func conditionsEqual(left, right []v1.PodCondition) bool { for _, r := range right { if l.Type == r.Type { found = true - if l.Status != r.Status { + if l.Status != r.Status || l.Reason != r.Reason { return false } } diff --git a/pkg/kubelet/types/pod_status.go b/pkg/kubelet/types/pod_status.go index eb255eb8ef6..93a12c6165a 100644 --- a/pkg/kubelet/types/pod_status.go +++ b/pkg/kubelet/types/pod_status.go @@ -44,3 +44,13 @@ func PodConditionByKubelet(conditionType v1.PodConditionType) bool { } return false } + +// PodConditionSharedByKubelet returns if the pod condition type is shared by kubelet +func PodConditionSharedByKubelet(conditionType v1.PodConditionType) bool { + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + if conditionType == v1.AlphaNoCompatGuaranteeDisruptionTarget { + return true + } + } + return false +} diff --git a/pkg/kubelet/types/pod_status_test.go b/pkg/kubelet/types/pod_status_test.go index e03b6fb5a50..2e8bd26031c 100644 --- a/pkg/kubelet/types/pod_status_test.go +++ b/pkg/kubelet/types/pod_status_test.go @@ -27,6 +27,8 @@ import ( func TestPodConditionByKubelet(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodHasNetworkCondition, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, true)() + trueCases := []v1.PodConditionType{ v1.PodScheduled, v1.PodReady, @@ -52,3 +54,28 @@ func TestPodConditionByKubelet(t *testing.T) { } } } + +func TestPodConditionSharedByKubelet(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, true)() + + trueCases := []v1.PodConditionType{ + v1.AlphaNoCompatGuaranteeDisruptionTarget, + } + + for _, tc := range trueCases { + if !PodConditionSharedByKubelet(tc) { + t.Errorf("Expect %q to be condition shared by kubelet.", tc) + } + } + + falseCases := []v1.PodConditionType{ + v1.PodConditionType("abcd"), + v1.PodConditionType(v1.PodReasonUnschedulable), + } + + for _, tc := range falseCases { + if PodConditionSharedByKubelet(tc) { + t.Errorf("Expect %q NOT to be condition shared by kubelet.", tc) + } + } +} diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index 11cde28150b..d9ae784cbd5 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -2654,7 +2654,7 @@ const ( PodReady 
PodConditionType = "Ready" // PodScheduled represents status of the scheduling process for this pod. PodScheduled PodConditionType = "PodScheduled" - // AlphaNoCompatGuaranteeDisruptionTarget indicates the pod is about to be deleted due to a + // AlphaNoCompatGuaranteeDisruptionTarget indicates the pod is about to be terminated due to a // disruption (such as preemption, eviction API or garbage-collection). // The constant is to be renamed once the name is accepted within the KEP-3329. AlphaNoCompatGuaranteeDisruptionTarget PodConditionType = "DisruptionTarget" @@ -2673,6 +2673,10 @@ const ( // PodReasonSchedulerError reason in PodScheduled PodCondition means that some internal error happens // during scheduling, for example due to nodeAffinity parsing errors. PodReasonSchedulerError = "SchedulerError" + + // TerminationByKubelet reason in DisruptionTarget pod condition indicates that the termination + // is initiated by kubelet + AlphaNoCompatGuaranteePodReasonTerminationByKubelet = "TerminationByKubelet" ) // PodCondition contains details for the current condition of this pod. diff --git a/test/e2e/framework/pod/utils.go b/test/e2e/framework/pod/utils.go index 8851b27e42d..a62c29a62d7 100644 --- a/test/e2e/framework/pod/utils.go +++ b/test/e2e/framework/pod/utils.go @@ -231,3 +231,13 @@ func mixinRestrictedContainerSecurityContext(container *v1.Container) { } } } + +// FindPodConditionByType loops through all pod conditions in pod status and returns the specified condition. +func FindPodConditionByType(podStatus *v1.PodStatus, conditionType v1.PodConditionType) *v1.PodCondition { + for _, cond := range podStatus.Conditions { + if cond.Type == conditionType { + return &cond + } + } + return nil +} diff --git a/test/e2e_node/eviction_test.go b/test/e2e_node/eviction_test.go index 0d3ff72f946..174c2a5ef0b 100644 --- a/test/e2e_node/eviction_test.go +++ b/test/e2e_node/eviction_test.go @@ -31,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/eviction" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" @@ -500,6 +501,28 @@ var _ = SIGDescribe("PriorityPidEvictionOrdering [Slow] [Serial] [Disruptive][No specs[2].pod.Spec.PriorityClassName = highPriorityClassName runEvictionTest(f, pressureTimeout, expectedNodeCondition, expectedStarvedResource, logPidMetrics, specs) }) + + ginkgo.Context(fmt.Sprintf(testContextFmt, expectedNodeCondition)+"; PodDisruptionConditions enabled [NodeFeature:PodDisruptionConditions]", func() { + tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) { + pidsConsumed := int64(10000) + summary := eventuallyGetSummary() + availablePids := *(summary.Node.Rlimit.MaxPID) - *(summary.Node.Rlimit.NumOfRunningProcesses) + initialConfig.EvictionHard = map[string]string{string(evictionapi.SignalPIDAvailable): fmt.Sprintf("%d", availablePids-pidsConsumed)} + initialConfig.EvictionMinimumReclaim = map[string]string{} + initialConfig.FeatureGates = map[string]bool{ + string(features.PodDisruptionConditions): true, + } + }) + disruptionTarget := v1.AlphaNoCompatGuaranteeDisruptionTarget + specs := []podEvictSpec{ + { + evictionPriority: 1, + pod: pidConsumingPod("fork-bomb-container", 30000), + wantPodDisruptionCondition: &disruptionTarget, + }, + } + runEvictionTest(f, pressureTimeout, expectedNodeCondition, 
expectedStarvedResource, logPidMetrics, specs) + }) }) // Struct used by runEvictionTest that specifies the pod, and when that pod should be evicted, relative to other pods @@ -507,8 +530,9 @@ type podEvictSpec struct { // P0 should never be evicted, P1 shouldn't evict before P2, etc. // If two are ranked at P1, either is permitted to fail before the other. // The test ends when all pods other than p0 have been evicted - evictionPriority int - pod *v1.Pod + evictionPriority int + pod *v1.Pod + wantPodDisruptionCondition *v1.PodConditionType } // runEvictionTest sets up a testing environment given the provided pods, and checks a few things: @@ -560,6 +584,9 @@ func runEvictionTest(f *framework.Framework, pressureTimeout time.Duration, expe return verifyEvictionOrdering(f, testSpecs) }, pressureTimeout, evictionPollInterval).Should(gomega.BeNil()) + ginkgo.By("checking for the expected pod conditions for evicted pods") + verifyPodConditions(f, testSpecs) + // We observe pressure from the API server. The eviction manager observes pressure from the kubelet internal stats. // This means the eviction manager will observe pressure before we will, creating a delay between when the eviction manager // evicts a pod, and when we observe the pressure by querying the API server. Add a delay here to account for this delay @@ -725,6 +752,21 @@ func verifyEvictionOrdering(f *framework.Framework, testSpecs []podEvictSpec) er return fmt.Errorf("pods that should be evicted are still running: %#v", pendingPods) } +func verifyPodConditions(f *framework.Framework, testSpecs []podEvictSpec) { + for _, spec := range testSpecs { + if spec.wantPodDisruptionCondition != nil { + pod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(context.TODO(), spec.pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "Failed to get the recent pod object for name: %q", pod.Name) + + cType := *spec.wantPodDisruptionCondition + podDisruptionCondition := e2epod.FindPodConditionByType(&pod.Status, cType) + if podDisruptionCondition == nil { + framework.Failf("pod %q should have the condition: %q, pod status: %v", pod.Name, cType, pod.Status) + } + } + } +} + func verifyEvictionEvents(f *framework.Framework, testSpecs []podEvictSpec, expectedStarvedResource v1.ResourceName) { for _, spec := range testSpecs { pod := spec.pod diff --git a/test/e2e_node/node_shutdown_linux_test.go b/test/e2e_node/node_shutdown_linux_test.go index 98e3ba8516f..f25e50606a8 100644 --- a/test/e2e_node/node_shutdown_linux_test.go +++ b/test/e2e_node/node_shutdown_linux_test.go @@ -55,6 +55,109 @@ import ( var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeFeature:GracefulNodeShutdown] [NodeFeature:GracefulNodeShutdownBasedOnPodPriority]", func() { f := framework.NewDefaultFramework("graceful-node-shutdown") f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged + + ginkgo.Context("graceful node shutdown when PodDisruptionConditions are enabled [NodeFeature:PodDisruptionConditions]", func() { + + const ( + pollInterval = 1 * time.Second + podStatusUpdateTimeout = 30 * time.Second + nodeStatusUpdateTimeout = 30 * time.Second + nodeShutdownGracePeriod = 30 * time.Second + ) + + tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) { + initialConfig.FeatureGates = map[string]bool{ + string(features.GracefulNodeShutdown): true, + string(features.PodDisruptionConditions): true, + string(features.GracefulNodeShutdownBasedOnPodPriority): false, + } + initialConfig.ShutdownGracePeriod = 
metav1.Duration{Duration: nodeShutdownGracePeriod} + }) + + ginkgo.BeforeEach(func() { + ginkgo.By("Wait for the node to be ready") + waitForNodeReady() + }) + + ginkgo.AfterEach(func() { + ginkgo.By("Emitting Shutdown false signal; cancelling the shutdown") + err := emitSignalPrepareForShutdown(false) + framework.ExpectNoError(err) + }) + + ginkgo.It("should add the DisruptionTarget pod failure condition to the evicted pods", func() { + nodeName := getNodeName(f) + nodeSelector := fields.Set{ + "spec.nodeName": nodeName, + }.AsSelector().String() + + // Define test pods + pods := []*v1.Pod{ + getGracePeriodOverrideTestPod("pod-to-evict", nodeName, 5, ""), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ginkgo.By("Creating batch pods") + e2epod.NewPodClient(f).CreateBatch(pods) + + list, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{ + FieldSelector: nodeSelector, + }) + + framework.ExpectNoError(err) + framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected") + + list, err = e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{ + FieldSelector: nodeSelector, + }) + if err != nil { + framework.Failf("Failed to start batch pod: %q", err) + } + framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected") + + for _, pod := range list.Items { + framework.Logf("Pod %q status conditions: %q", pod.Name, &pod.Status.Conditions) + } + + ginkgo.By("Verifying batch pods are running") + for _, pod := range list.Items { + if podReady, err := testutils.PodRunningReady(&pod); err != nil || !podReady { + framework.Failf("Failed to start batch pod: %v", pod.Name) + } + } + + ginkgo.By("Emitting shutdown signal") + err = emitSignalPrepareForShutdown(true) + framework.ExpectNoError(err) + + ginkgo.By("Verifying that all pods are shutdown") + // All pods should be shut down + gomega.Eventually(func() error { + list, err = e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{ + FieldSelector: nodeSelector, + }) + if err != nil { + return err + } + framework.ExpectEqual(len(list.Items), len(pods), "the number of pods is not as expected") + + for _, pod := range list.Items { + if !isPodShutdown(&pod) { + framework.Logf("Expecting pod to be shutdown, but it's not currently: Pod: %q, Pod Status %+v", pod.Name, pod.Status) + return fmt.Errorf("pod should be shutdown, phase: %s", pod.Status.Phase) + } + podDisruptionCondition := e2epod.FindPodConditionByType(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget) + if podDisruptionCondition == nil { + framework.Failf("pod %q should have the condition: %q, pod status: %v", pod.Name, v1.AlphaNoCompatGuaranteeDisruptionTarget, pod.Status) + } + } + return nil + }, podStatusUpdateTimeout+(nodeShutdownGracePeriod), pollInterval).Should(gomega.BeNil()) + }) + }) + ginkgo.Context("when gracefully shutting down", func() { const ( diff --git a/test/integration/disruption/disruption_test.go b/test/integration/disruption/disruption_test.go index 3075b235c2f..62edf29cdbb 100644 --- a/test/integration/disruption/disruption_test.go +++ b/test/integration/disruption/disruption_test.go @@ -668,9 +668,12 @@ func TestStalePodDisruption(t *testing.T) { cases := map[string]struct { deletePod bool + podPhase v1.PodPhase + reason string wantConditions []v1.PodCondition }{ "stale-condition": { + podPhase: v1.PodRunning, wantConditions: []v1.PodCondition{ { Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, @@ -679,6 +682,7 @@ func TestStalePodDisruption(t *testing.T) { }, },
"deleted-pod": { + podPhase: v1.PodRunning, deletePod: true, wantConditions: []v1.PodCondition{ { @@ -687,6 +691,26 @@ func TestStalePodDisruption(t *testing.T) { }, }, }, + "disruption-condition-by-kubelet": { + podPhase: v1.PodFailed, + reason: v1.AlphaNoCompatGuaranteePodReasonTerminationByKubelet, + wantConditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: v1.AlphaNoCompatGuaranteePodReasonTerminationByKubelet, + }, + }, + }, + "disruption-condition-on-failed-pod": { + podPhase: v1.PodFailed, + wantConditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, } for name, tc := range cases { @@ -702,10 +726,11 @@ func TestStalePodDisruption(t *testing.T) { t.Fatalf("Failed creating pod: %v", err) } - pod.Status.Phase = v1.PodRunning + pod.Status.Phase = tc.podPhase pod.Status.Conditions = append(pod.Status.Conditions, v1.PodCondition{ Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, Status: v1.ConditionTrue, + Reason: tc.reason, LastTransitionTime: metav1.Now(), }) pod, err = clientSet.CoreV1().Pods(nsName).UpdateStatus(ctx, pod, metav1.UpdateOptions{})