From 9e1adced5db4ccc5e36ef3617498919332e969dd Mon Sep 17 00:00:00 2001
From: carlory
Date: Sat, 8 Jul 2023 21:13:46 +0800
Subject: [PATCH] noderesourcefit: scheduler queueing hints

Co-authored-by: Kensei Nakada
---
 .../framework/plugins/noderesources/fit.go    | 118 ++++++++-
 .../plugins/noderesources/fit_test.go         | 239 ++++++++++++++++++
 2 files changed, 355 insertions(+), 2 deletions(-)

diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go
index ba09fa8628a..58b8fa593c1 100644
--- a/pkg/scheduler/framework/plugins/noderesources/fit.go
+++ b/pkg/scheduler/framework/plugins/noderesources/fit.go
@@ -21,9 +21,11 @@ import (
     "fmt"
     "strings"
 
+    "github.com/google/go-cmp/cmp"
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/api/v1/resource"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -31,6 +33,7 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
+    schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 var _ framework.PreFilterPlugin = &Fit{}
@@ -244,11 +247,122 @@ func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint {
         podActionType |= framework.Update
     }
     return []framework.ClusterEventWithHint{
-        {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}},
-        {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
+        {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange},
+        {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: f.isSchedulableAfterNodeChange},
     }
 }
 
+// isSchedulableAfterPodChange is invoked whenever a pod is deleted or updated. It checks whether
+// such a change could make a previously unschedulable pod schedulable.
+func (f *Fit) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
+    originalPod, modifiedPod, err := schedutil.As[*v1.Pod](oldObj, newObj)
+    if err != nil {
+        return framework.Queue, err
+    }
+
+    if modifiedPod == nil {
+        if originalPod.Spec.NodeName == "" {
+            logger.V(5).Info("the deleted pod was unscheduled and it wouldn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
+            return framework.QueueSkip, nil
+        }
+        logger.V(5).Info("another scheduled pod was deleted, and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
+        return framework.Queue, nil
+    }
+
+    if !f.enableInPlacePodVerticalScaling {
+        // If InPlacePodVerticalScaling (KEP 1287) is disabled, a pod update cannot free up any resources.
+        logger.V(5).Info("another pod was modified, but InPlacePodVerticalScaling is disabled, so it doesn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
+        return framework.QueueSkip, nil
+    }
+
+    // Modifications may or may not be relevant. We only care about modifications that
+    // change the other pod's request for a resource that is also requested by the
+    // pod we are trying to schedule.
+    if !f.isResourceScaleDown(pod, originalPod, modifiedPod) {
+        if loggerV := logger.V(10); loggerV.Enabled() {
+            // Log more information.
+ loggerV.Info("another Pod got modified, but the modification isn't related to the resource request", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod), "diff", cmp.Diff(originalPod, modifiedPod)) + } else { + logger.V(5).Info("another Pod got modified, but the modification isn't related to the resource request", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + } + return framework.QueueSkip, nil + } + + logger.V(5).Info("the max request resources of another scheduled pod got reduced and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.Queue, nil +} + +// isResourceScaleDown checks whether the resource request of the modified pod is less than the original pod +// for the resources requested by the pod we are trying to schedule. +func (f *Fit) isResourceScaleDown(targetPod, originalOtherPod, modifiedOtherPod *v1.Pod) bool { + if modifiedOtherPod.Spec.NodeName == "" { + // no resource is freed up whatever the pod is modified. + return false + } + + // the other pod was scheduled, so modification or deletion may free up some resources. + originalMaxResourceReq, modifiedMaxResourceReq := &framework.Resource{}, &framework.Resource{} + originalMaxResourceReq.SetMaxResource(resource.PodRequests(originalOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) + modifiedMaxResourceReq.SetMaxResource(resource.PodRequests(modifiedOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) + + // check whether the resource request of the modified pod is less than the original pod. + podRequests := resource.PodRequests(targetPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling}) + for rName, rValue := range podRequests { + if rValue.IsZero() { + // We only care about the resources requested by the pod we are trying to schedule. + continue + } + switch rName { + case v1.ResourceCPU: + if originalMaxResourceReq.MilliCPU > modifiedMaxResourceReq.MilliCPU { + return true + } + case v1.ResourceMemory: + if originalMaxResourceReq.Memory > modifiedMaxResourceReq.Memory { + return true + } + case v1.ResourceEphemeralStorage: + if originalMaxResourceReq.EphemeralStorage > modifiedMaxResourceReq.EphemeralStorage { + return true + } + default: + if schedutil.IsScalarResourceName(rName) && originalMaxResourceReq.ScalarResources[rName] > modifiedMaxResourceReq.ScalarResources[rName] { + return true + } + } + } + return false +} + +// isSchedulableAfterNodeChange is invoked whenever a node added or changed. It checks whether +// that change made a previously unschedulable pod schedulable. +func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + _, modifiedNode, err := schedutil.As[*v1.Node](oldObj, newObj) + if err != nil { + return framework.Queue, err + } + // TODO: also check if the original node meets the pod's resource requestments once preCheck is completely removed. 
+    // See: https://github.com/kubernetes/kubernetes/issues/110175
+    if isFit(pod, modifiedNode) {
+        logger.V(5).Info("node was created or updated, and it may fit the pod's resource requirements", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
+        return framework.Queue, nil
+    }
+
+    logger.V(5).Info("node was created or updated, but it doesn't have enough resource(s) to accommodate this pod", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
+    return framework.QueueSkip, nil
+}
+
+// isFit checks whether the pod fits the node. If the node is nil, it returns false.
+// It constructs a temporary NodeInfo object for the node and checks whether the pod fits it.
+func isFit(pod *v1.Pod, node *v1.Node) bool {
+    if node == nil {
+        return false
+    }
+    nodeInfo := framework.NewNodeInfo()
+    nodeInfo.SetNode(node)
+    return len(Fits(pod, nodeInfo)) == 0
+}
+
 // Filter invoked at the filter extension point.
 // Checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
 // It returns a list of insufficient resources, if empty, then the node has all the resources requested by the pod.
diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go
index fb454fe6757..a4d9e17e237 100644
--- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go
+++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go
@@ -23,9 +23,11 @@ import (
     "testing"
 
     "github.com/google/go-cmp/cmp"
+    "github.com/stretchr/testify/require"
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     "k8s.io/klog/v2/ktesting"
+    _ "k8s.io/klog/v2/ktesting/init"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -1109,9 +1111,246 @@ func TestEventsToRegister(t *testing.T) {
         t.Run(test.name, func(t *testing.T) {
             fp := &Fit{enableInPlacePodVerticalScaling: test.inPlacePodVerticalScalingEnabled}
             actualClusterEvents := fp.EventsToRegister()
+            for i := range actualClusterEvents {
+                actualClusterEvents[i].QueueingHintFn = nil
+            }
             if diff := cmp.Diff(test.expectedClusterEvents, actualClusterEvents); diff != "" {
                 t.Error("Cluster Events doesn't match extected events (-expected +actual):\n", diff)
             }
         })
     }
 }
+
+func Test_isSchedulableAfterPodChange(t *testing.T) {
+    testcases := map[string]struct {
+        pod *v1.Pod
+        oldObj, newObj interface{}
+        enableInPlacePodVerticalScaling bool
+        expectedHint framework.QueueingHint
+        expectedErr bool
+    }{
+        "backoff-wrong-old-object": {
+            pod: &v1.Pod{},
+            oldObj: "not-a-pod",
+            enableInPlacePodVerticalScaling: true,
+            expectedHint: framework.Queue,
+            expectedErr: true,
+        },
+        "backoff-wrong-new-object": {
+            pod: &v1.Pod{},
+            newObj: "not-a-pod",
+            enableInPlacePodVerticalScaling: true,
+            expectedHint: framework.Queue,
+            expectedErr: true,
+        },
+        "queue-on-deleted": {
+            pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
+            oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(),
+            enableInPlacePodVerticalScaling: true,
+            expectedHint: framework.Queue,
+        },
+        "skip-queue-on-unscheduled-pod-deleted": {
+            pod: &v1.Pod{},
+            oldObj: &v1.Pod{},
+            enableInPlacePodVerticalScaling: true,
+            expectedHint: framework.QueueSkip,
+        },
+        "skip-queue-on-disable-inplace-pod-vertical-scaling": {
+            pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
+            oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(),
"2"}).Node("fake").Obj(), + newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), + enableInPlacePodVerticalScaling: false, + expectedHint: framework.QueueSkip, + }, + "skip-queue-on-unscheduled-pod": { + pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), + newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + enableInPlacePodVerticalScaling: true, + expectedHint: framework.QueueSkip, + }, + "skip-queue-on-non-resource-changes": { + pod: &v1.Pod{}, + oldObj: st.MakePod().Label("k", "v").Node("fake").Obj(), + newObj: st.MakePod().Label("foo", "bar").Node("fake").Obj(), + enableInPlacePodVerticalScaling: true, + expectedHint: framework.QueueSkip, + }, + "skip-queue-on-unrelated-resource-changes": { + pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceMemory: "2"}).Node("fake").Obj(), + newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceMemory: "1"}).Node("fake").Obj(), + enableInPlacePodVerticalScaling: true, + expectedHint: framework.QueueSkip, + }, + "skip-queue-on-resource-scale-up": { + pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), + newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), + enableInPlacePodVerticalScaling: true, + expectedHint: framework.QueueSkip, + }, + "queue-on-some-resource-scale-down": { + pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), + newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), + enableInPlacePodVerticalScaling: true, + expectedHint: framework.Queue, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{ + EnableInPlacePodVerticalScaling: tc.enableInPlacePodVerticalScaling, + }) + if err != nil { + t.Fatal(err) + } + actualHint, err := p.(*Fit).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj) + if tc.expectedErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +} + +func Test_isSchedulableAfterNodeChange(t *testing.T) { + testcases := map[string]struct { + pod *v1.Pod + oldObj, newObj interface{} + expectedHint framework.QueueingHint + expectedErr bool + }{ + "backoff-wrong-new-object": { + pod: &v1.Pod{}, + newObj: "not-a-node", + expectedHint: framework.Queue, + expectedErr: true, + }, + "backoff-wrong-old-object": { + pod: &v1.Pod{}, + oldObj: "not-a-node", + newObj: &v1.Node{}, + expectedHint: framework.Queue, + expectedErr: true, + }, + "skip-queue-on-node-add-without-sufficient-resources": { + pod: newResourcePod(framework.Resource{Memory: 2}), + newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{ + v1.ResourceMemory: "1", + }).Obj(), + expectedHint: framework.QueueSkip, + }, + "skip-queue-on-node-add-without-required-resource-type": { + pod: newResourcePod(framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1}}, + ), + newObj: 
+                extendedResourceB: "1",
+            }).Obj(),
+            expectedHint: framework.QueueSkip,
+        },
+        "queue-on-node-add-with-sufficient-resources": {
+            pod: newResourcePod(framework.Resource{
+                Memory: 2,
+                ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1},
+            }),
+            newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
+                v1.ResourceMemory: "4",
+                extendedResourceA: "2",
+            }).Obj(),
+            expectedHint: framework.Queue,
+        },
+        // uncomment this case when isSchedulableAfterNodeChange also checks the
+        // original node's resources.
+        // "skip-queue-on-node-unrelated-changes": {
+        //     pod: &v1.Pod{},
+        //     oldObj: st.MakeNode().Obj(),
+        //     newObj: st.MakeNode().Label("foo", "bar").Obj(),
+        //     expectedHint: framework.QueueSkip,
+        // },
+        "skip-queue-on-node-changes-from-suitable-to-unsuitable": {
+            pod: newResourcePod(framework.Resource{
+                Memory: 2,
+                ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1},
+            }),
+            oldObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
+                v1.ResourceMemory: "4",
+                extendedResourceA: "2",
+            }).Obj(),
+            newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
+                v1.ResourceMemory: "1",
+                extendedResourceA: "2",
+            }).Obj(),
+            expectedHint: framework.QueueSkip,
+        },
+        "queue-on-node-changes-from-unsuitable-to-suitable": {
+            pod: newResourcePod(framework.Resource{
+                Memory: 2,
+                ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1},
+            }),
+            oldObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
+                v1.ResourceMemory: "1",
+                extendedResourceA: "2",
+            }).Obj(),
+            newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
+                v1.ResourceMemory: "4",
+                extendedResourceA: "2",
+            }).Obj(),
+            expectedHint: framework.Queue,
+        },
+    }
+
+    for name, tc := range testcases {
+        t.Run(name, func(t *testing.T) {
+            logger, ctx := ktesting.NewTestContext(t)
+            p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
+            if err != nil {
+                t.Fatal(err)
+            }
+            actualHint, err := p.(*Fit).isSchedulableAfterNodeChange(logger, tc.pod, tc.oldObj, tc.newObj)
+            if tc.expectedErr {
+                require.Error(t, err)
+                return
+            }
+            require.NoError(t, err)
+            require.Equal(t, tc.expectedHint, actualHint)
+        })
+    }
+}
+
+func TestIsFit(t *testing.T) {
+    testCases := map[string]struct {
+        pod *v1.Pod
+        node *v1.Node
+        expected bool
+    }{
+        "nil node": {
+            pod: &v1.Pod{},
+            expected: false,
+        },
+        "insufficient resource": {
+            pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
+            node: st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
+            expected: false,
+        },
+        "sufficient resource": {
+            pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
+            node: st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
+            expected: true,
+        },
+    }
+
+    for name, tc := range testCases {
+        t.Run(name, func(t *testing.T) {
+            if got := isFit(tc.pod, tc.node); got != tc.expected {
+                t.Errorf("expected: %v, got: %v", tc.expected, got)
+            }
+        })
+    }
+}
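
Appendix (not part of the patch): the pod-change hint above re-queues a pending pod only when another scheduled pod's request for a resource the pending pod actually needs has shrunk. The following is a minimal standalone Go sketch of that scale-down check, using simplified hypothetical types (plain int64 quantities instead of the scheduler's framework.Resource and resource.PodRequests).

package main

import "fmt"

// requests maps a resource name to a requested quantity (hypothetical simplified type).
type requests map[string]int64

// scaledDown reports whether any resource requested by the target pod
// decreased between the old and new requests of the other (scheduled) pod.
func scaledDown(target, oldOther, newOther requests) bool {
    for name, qty := range target {
        if qty == 0 {
            continue // the target pod doesn't request this resource
        }
        if oldOther[name] > newOther[name] {
            return true // the other pod freed up some of this resource
        }
    }
    return false
}

func main() {
    target := requests{"cpu": 1000}                      // pod waiting to be scheduled
    oldOther := requests{"cpu": 2000, "memory": 1 << 30} // other pod before the in-place resize
    newOther := requests{"cpu": 1000, "memory": 1 << 30} // other pod after the resize
    fmt.Println(scaledDown(target, oldOther, newOther))  // true -> hint Queue; false -> QueueSkip
}

Running the sketch prints true, i.e. the hint would be Queue; if only the memory request had changed, it would print false, matching the "skip-queue-on-unrelated-resource-changes" test case.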