Merge pull request #119177 from carlory/fix-118893-3

noderesourcefit: scheduler queueing hints
This commit is contained in:
Kubernetes Prow Robot 2023-12-15 17:17:19 +01:00 committed by GitHub
commit 195bb67d36
2 changed files with 355 additions and 2 deletions
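The two hint functions added below implement the scheduler framework's QueueingHintFn contract: for each registered cluster event, the scheduling queue calls the hint with the still-unschedulable pod and the event's old/new objects, and the plugin answers framework.Queue (retry the pod) or framework.QueueSkip (keep it in the unschedulable pool). A minimal sketch of that contract, using only the framework packages imported in the diff below (exampleHint is hypothetical and not part of this change):

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// exampleHint has the framework.QueueingHintFn signature. A real hint inspects
// oldObj/newObj, as isSchedulableAfterPodChange and isSchedulableAfterNodeChange do,
// and returns QueueSkip whenever the event cannot help the pod; Queue is the safe fallback.
func exampleHint(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	logger.V(5).Info("requeueing pod for another scheduling attempt", "pod", klog.KObj(pod))
	return framework.Queue, nil
}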


@@ -21,9 +21,11 @@ import (
"fmt"
"strings"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/v1/resource"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -31,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
var _ framework.PreFilterPlugin = &Fit{}
@@ -252,11 +255,122 @@ func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint {
podActionType |= framework.Update
}
return []framework.ClusterEventWithHint{
-{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}},
-{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: f.isSchedulableAfterNodeChange},
}
}
// isSchedulableAfterPodChange is invoked whenever a pod is deleted or updated. It checks whether
// that change made a previously unschedulable pod schedulable.
func (f *Fit) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalPod, modifiedPod, err := schedutil.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, err
}
if modifiedPod == nil {
if originalPod.Spec.NodeName == "" {
logger.V(5).Info("the deleted pod was unscheduled and it wouldn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
return framework.QueueSkip, nil
}
logger.V(5).Info("another scheduled pod was deleted, and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
return framework.Queue, nil
}
if !f.enableInPlacePodVerticalScaling {
// If InPlacePodVerticalScaling (KEP 1287) is disabled, an update to a scheduled pod cannot free up resources.
logger.V(5).Info("another pod was modified, but InPlacePodVerticalScaling is disabled, so it doesn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
return framework.QueueSkip, nil
}
// Modifications may or may not be relevant. We only care about modifications that
// reduce the other pod's request for a resource that is also requested by the
// pod we are trying to schedule.
if !f.isResourceScaleDown(pod, originalPod, modifiedPod) {
if loggerV := logger.V(10); loggerV.Enabled() {
// Log more information.
loggerV.Info("another Pod got modified, but the modification isn't related to the resource request", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod), "diff", cmp.Diff(originalPod, modifiedPod))
} else {
logger.V(5).Info("another Pod got modified, but the modification isn't related to the resource request", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
}
return framework.QueueSkip, nil
}
logger.V(5).Info("the max request resources of another scheduled pod got reduced and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
return framework.Queue, nil
}
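// Illustrative summary of the decisions above (not part of this change):
//   - a scheduled pod is deleted -> Queue (its requests are freed on its node)
//   - an unscheduled pod is deleted -> QueueSkip (nothing is freed)
//   - a pod is updated while InPlacePodVerticalScaling is disabled -> QueueSkip (a resize cannot free resources)
//   - a pod is updated without reducing a resource the target pod requests -> QueueSkip
//   - a scheduled pod's request is scaled down for a resource the target pod also requests -> Queue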
// isResourceScaleDown checks whether the modified pod's resource request is lower than the original
// pod's request for any resource that is also requested by the pod we are trying to schedule.
func (f *Fit) isResourceScaleDown(targetPod, originalOtherPod, modifiedOtherPod *v1.Pod) bool {
if modifiedOtherPod.Spec.NodeName == "" {
// the pod is not scheduled to any node, so no resources are freed up no matter how it is modified.
return false
}
// the other pod was scheduled, so modification or deletion may free up some resources.
originalMaxResourceReq, modifiedMaxResourceReq := &framework.Resource{}, &framework.Resource{}
originalMaxResourceReq.SetMaxResource(resource.PodRequests(originalOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling}))
modifiedMaxResourceReq.SetMaxResource(resource.PodRequests(modifiedOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling}))
// check whether the resource request of the modified pod is less than the original pod.
podRequests := resource.PodRequests(targetPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})
for rName, rValue := range podRequests {
if rValue.IsZero() {
// We only care about the resources requested by the pod we are trying to schedule.
continue
}
switch rName {
case v1.ResourceCPU:
if originalMaxResourceReq.MilliCPU > modifiedMaxResourceReq.MilliCPU {
return true
}
case v1.ResourceMemory:
if originalMaxResourceReq.Memory > modifiedMaxResourceReq.Memory {
return true
}
case v1.ResourceEphemeralStorage:
if originalMaxResourceReq.EphemeralStorage > modifiedMaxResourceReq.EphemeralStorage {
return true
}
default:
if schedutil.IsScalarResourceName(rName) && originalMaxResourceReq.ScalarResources[rName] > modifiedMaxResourceReq.ScalarResources[rName] {
return true
}
}
}
return false
}
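// Worked example (illustrative, mirroring the tests): the target pod requests cpu=1 and
// another pod scheduled to a node is resized from cpu=2 to cpu=1; then
// originalMaxResourceReq.MilliCPU (2000) > modifiedMaxResourceReq.MilliCPU (1000), so the
// function returns true. If only that pod's memory request had changed, the loop would
// skip memory because the target pod does not request it, and the function would return false.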
// isSchedulableAfterNodeChange is invoked whenever a node is added or changed. It checks whether
// that change made a previously unschedulable pod schedulable.
func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, modifiedNode, err := schedutil.As[*v1.Node](oldObj, newObj)
if err != nil {
return framework.Queue, err
}
// TODO: also check if the original node meets the pod's resource requirements once preCheck is completely removed.
// See: https://github.com/kubernetes/kubernetes/issues/110175
if isFit(pod, modifiedNode) {
logger.V(5).Info("node was updated, and may fit with the pod's resource requestments", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.Queue, nil
}
logger.V(5).Info("node was created or updated, but it doesn't have enough resource(s) to accommodate this pod", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.QueueSkip, nil
}
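// Example (illustrative, matching the tests): a node whose memory capacity grows from 1 to 4
// while the pod requests 2 yields Queue; a node that is still too small after the update yields
// QueueSkip. Because only the new node object is inspected (see the TODO above), a node that
// shrinks from suitable to unsuitable also yields QueueSkip, simply because the updated node no
// longer fits.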
// isFit checks if the pod fits the node. If the node is nil, it returns false.
// It constructs a temporary NodeInfo for the node and checks whether the pod's requests fit on it.
func isFit(pod *v1.Pod, node *v1.Node) bool {
if node == nil {
return false
}
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(node)
return len(Fits(pod, nodeInfo)) == 0
}
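// Illustrative usage (not part of this change), following TestIsFit below:
//   node := st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()
//   pod := st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj()
//   isFit(pod, nil)  // false: a nil node never fits
//   isFit(pod, node) // true: the node's capacity covers the pod's cpu request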
// Filter invoked at the filter extension point.
// Checks if a node has sufficient resources, such as cpu, memory, gpu, and opaque int resources, to run a pod.
// It returns a list of insufficient resources; if the list is empty, the node has all the resources requested by the pod.


@@ -23,9 +23,11 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -1112,9 +1114,246 @@ func TestEventsToRegister(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fp := &Fit{enableInPlacePodVerticalScaling: test.inPlacePodVerticalScalingEnabled}
actualClusterEvents := fp.EventsToRegister()
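// QueueingHintFn holds a function value that cannot be compared meaningfully by cmp.Diff,
// so clear it before diffing against the expected events.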
for i := range actualClusterEvents {
actualClusterEvents[i].QueueingHintFn = nil
}
if diff := cmp.Diff(test.expectedClusterEvents, actualClusterEvents); diff != "" {
t.Error("Cluster Events doesn't match extected events (-expected +actual):\n", diff)
}
})
}
}
func Test_isSchedulableAfterPodChange(t *testing.T) {
testcases := map[string]struct {
pod *v1.Pod
oldObj, newObj interface{}
enableInPlacePodVerticalScaling bool
expectedHint framework.QueueingHint
expectedErr bool
}{
"backoff-wrong-old-object": {
pod: &v1.Pod{},
oldObj: "not-a-pod",
enableInPlacePodVerticalScaling: true,
expectedHint: framework.Queue,
expectedErr: true,
},
"backoff-wrong-new-object": {
pod: &v1.Pod{},
newObj: "not-a-pod",
enableInPlacePodVerticalScaling: true,
expectedHint: framework.Queue,
expectedErr: true,
},
"queue-on-deleted": {
pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(),
enableInPlacePodVerticalScaling: true,
expectedHint: framework.Queue,
},
"skip-queue-on-unscheduled-pod-deleted": {
pod: &v1.Pod{},
oldObj: &v1.Pod{},
enableInPlacePodVerticalScaling: true,
expectedHint: framework.QueueSkip,
},
"skip-queue-on-disable-inplace-pod-vertical-scaling": {
pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(),
newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(),
enableInPlacePodVerticalScaling: false,
expectedHint: framework.QueueSkip,
},
"skip-queue-on-unscheduled-pod": {
pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
enableInPlacePodVerticalScaling: true,
expectedHint: framework.QueueSkip,
},
"skip-queue-on-non-resource-changes": {
pod: &v1.Pod{},
oldObj: st.MakePod().Label("k", "v").Node("fake").Obj(),
newObj: st.MakePod().Label("foo", "bar").Node("fake").Obj(),
enableInPlacePodVerticalScaling: true,
expectedHint: framework.QueueSkip,
},
"skip-queue-on-unrelated-resource-changes": {
pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceMemory: "2"}).Node("fake").Obj(),
newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceMemory: "1"}).Node("fake").Obj(),
enableInPlacePodVerticalScaling: true,
expectedHint: framework.QueueSkip,
},
"skip-queue-on-resource-scale-up": {
pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(),
newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(),
enableInPlacePodVerticalScaling: true,
expectedHint: framework.QueueSkip,
},
"queue-on-some-resource-scale-down": {
pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(),
newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(),
enableInPlacePodVerticalScaling: true,
expectedHint: framework.Queue,
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{
EnableInPlacePodVerticalScaling: tc.enableInPlacePodVerticalScaling,
})
if err != nil {
t.Fatal(err)
}
actualHint, err := p.(*Fit).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedHint, actualHint)
})
}
}
func Test_isSchedulableAfterNodeChange(t *testing.T) {
testcases := map[string]struct {
pod *v1.Pod
oldObj, newObj interface{}
expectedHint framework.QueueingHint
expectedErr bool
}{
"backoff-wrong-new-object": {
pod: &v1.Pod{},
newObj: "not-a-node",
expectedHint: framework.Queue,
expectedErr: true,
},
"backoff-wrong-old-object": {
pod: &v1.Pod{},
oldObj: "not-a-node",
newObj: &v1.Node{},
expectedHint: framework.Queue,
expectedErr: true,
},
"skip-queue-on-node-add-without-sufficient-resources": {
pod: newResourcePod(framework.Resource{Memory: 2}),
newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
v1.ResourceMemory: "1",
}).Obj(),
expectedHint: framework.QueueSkip,
},
"skip-queue-on-node-add-without-required-resource-type": {
pod: newResourcePod(framework.Resource{
ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1}},
),
newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
extendedResourceB: "1",
}).Obj(),
expectedHint: framework.QueueSkip,
},
"queue-on-node-add-with-sufficient-resources": {
pod: newResourcePod(framework.Resource{
Memory: 2,
ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1},
}),
newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
v1.ResourceMemory: "4",
extendedResourceA: "2",
}).Obj(),
expectedHint: framework.Queue,
},
// uncomment this case when isSchedulableAfterNodeChange also checks the
// original node's resources.
// "skip-queue-on-node-unrelated-changes": {
// pod: &v1.Pod{},
// oldObj: st.MakeNode().Obj(),
// newObj: st.MakeNode().Label("foo", "bar").Obj(),
// expectedHint: framework.QueueSkip,
// },
"skip-queue-on-node-changes-from-suitable-to-unsuitable": {
pod: newResourcePod(framework.Resource{
Memory: 2,
ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1},
}),
oldObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
v1.ResourceMemory: "4",
extendedResourceA: "2",
}).Obj(),
newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
v1.ResourceMemory: "1",
extendedResourceA: "2",
}).Obj(),
expectedHint: framework.QueueSkip,
},
"queue-on-node-changes-from-unsuitable-to-suitable": {
pod: newResourcePod(framework.Resource{
Memory: 2,
ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1},
}),
oldObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
v1.ResourceMemory: "1",
extendedResourceA: "2",
}).Obj(),
newObj: st.MakeNode().Capacity(map[v1.ResourceName]string{
v1.ResourceMemory: "4",
extendedResourceA: "2",
}).Obj(),
expectedHint: framework.Queue,
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
if err != nil {
t.Fatal(err)
}
actualHint, err := p.(*Fit).isSchedulableAfterNodeChange(logger, tc.pod, tc.oldObj, tc.newObj)
if tc.expectedErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedHint, actualHint)
})
}
}
func TestIsFit(t *testing.T) {
testCases := map[string]struct {
pod *v1.Pod
node *v1.Node
expected bool
}{
"nil node": {
pod: &v1.Pod{},
expected: false,
},
"insufficient resource": {
pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
node: st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
expected: false,
},
"sufficient resource": {
pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
node: st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
expected: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
if got := isFit(tc.pod, tc.node); got != tc.expected {
t.Errorf("expected: %v, got: %v", tc.expected, got)
}
})
}
}