diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 0f51df87b0c..1121c0b416b 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -262,6 +262,16 @@ const (
 	// Enables generic ephemeral inline volume support for pods
 	GenericEphemeralVolume featuregate.Feature = "GenericEphemeralVolume"
 
+	// owner: @chendave
+	// alpha: v1.21
+	//
+	// PreferNominatedNode tells the scheduler whether the nominated node will be checked first before looping
+	// through all the other nodes in the cluster.
+	// Enabling this feature also implies that the preemptor pod might not be dispatched to the best candidate in
+	// some corner cases, e.g. when another node releases enough resources after the nominated node has been set
+	// and hence becomes the best candidate instead.
+	PreferNominatedNode featuregate.Feature = "PreferNominatedNode"
+
 	// owner: @tallclair
 	// alpha: v1.12
 	// beta: v1.14
@@ -771,6 +781,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 	GracefulNodeShutdown:     {Default: false, PreRelease: featuregate.Alpha},
 	ServiceLBNodePortControl: {Default: false, PreRelease: featuregate.Alpha},
 	MixedProtocolLBService:   {Default: false, PreRelease: featuregate.Alpha},
+	PreferNominatedNode:      {Default: false, PreRelease: featuregate.Alpha},
 
 	// inherited features from generic apiserver, relisted here to get a conflict if it is changed
 	// unintentionally on either side:
diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD
index 0e745285c5a..d8319e5258d 100644
--- a/pkg/scheduler/core/BUILD
+++ b/pkg/scheduler/core/BUILD
@@ -9,6 +9,7 @@ go_library(
     importpath = "k8s.io/kubernetes/pkg/scheduler/core",
    visibility = ["//visibility:public"],
     deps = [
+        "//pkg/features:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework:go_default_library",
         "//pkg/scheduler/framework/runtime:go_default_library",
@@ -18,6 +19,7 @@ go_library(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
@@ -34,6 +36,7 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/controller/volume/persistentvolume/util:go_default_library",
+        "//pkg/features:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework:go_default_library",
         "//pkg/scheduler/framework/plugins/defaultbinder:go_default_library",
@@ -54,8 +57,10 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
     ],
 )
 
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 6e278feb48d..168bdf6ae0d 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -27,7 +27,9 @@ import (
 	"k8s.io/klog/v2"
 
 	v1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/util/feature" extenderv1 "k8s.io/kube-scheduler/extender/v1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -195,6 +197,26 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i return numNodes } +func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, filteredNodesStatuses framework.NodeToStatusMap) ([]*v1.Node, error) { + nnn := pod.Status.NominatedNodeName + nodeInfo, err := g.nodeInfoSnapshot.Get(nnn) + if err != nil { + return nil, err + } + node := []*framework.NodeInfo{nodeInfo} + feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses, node) + if err != nil { + return nil, err + } + + feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses) + if err != nil { + return nil, err + } + + return feasibleNodes, nil +} + // Filters the nodes to find the ones that fit the pod based on the framework // filter plugins and filter extenders. func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) { @@ -202,27 +224,38 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor // Run "prefilter" plugins. s := fwk.RunPreFilterPlugins(ctx, state, pod) + allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() + if err != nil { + return nil, nil, err + } if !s.IsSuccess() { if !s.IsUnschedulable() { return nil, nil, s.AsError() } // All nodes will have the same status. Some non trivial refactoring is // needed to avoid this copy. - allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() - if err != nil { - return nil, nil, err - } for _, n := range allNodes { filteredNodesStatuses[n.Node().Name] = s } return nil, filteredNodesStatuses, nil } - feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses) + // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption. + // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes. + if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) { + feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, filteredNodesStatuses) + if err != nil { + klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName) + } + // Nominated node passes all the filters, scheduler is good to assign this node to the pod. + if len(feasibleNodes) != 0 { + return feasibleNodes, filteredNodesStatuses, nil + } + } + feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses, allNodes) if err != nil { return nil, nil, err } - feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses) if err != nil { return nil, nil, err @@ -231,22 +264,23 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor } // findNodesThatPassFilters finds the nodes that fit the filter plugins. 
-func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
-	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
-	if err != nil {
-		return nil, err
-	}
-
-	numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
+func (g *genericScheduler) findNodesThatPassFilters(
+	ctx context.Context,
+	fwk framework.Framework,
+	state *framework.CycleState,
+	pod *v1.Pod,
+	statuses framework.NodeToStatusMap,
+	nodes []*framework.NodeInfo) ([]*v1.Node, error) {
+	numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
 
 	// Create feasible list with enough space to avoid growing it
 	// and allow assigning.
 	feasibleNodes := make([]*v1.Node, numNodesToFind)
 
 	if !fwk.HasFilterPlugins() {
-		length := len(allNodes)
+		length := len(nodes)
 		for i := range feasibleNodes {
-			feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
+			feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
 		}
 		g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
 		return feasibleNodes, nil
@@ -259,7 +293,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk fra
 	checkNode := func(i int) {
 		// We check the nodes starting from where we left off in the previous scheduling cycle,
 		// this is to make sure all nodes have the same chance of being examined across pods.
-		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
+		nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
 		status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
 		if status.Code() == framework.Error {
 			errCh.SendErrorWithCancel(status.AsError(), cancel)
@@ -291,9 +325,9 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk fra
 
 	// Stops searching for more nodes once the configured number of feasible nodes
 	// are found.
-	parallelize.Until(ctx, len(allNodes), checkNode)
+	parallelize.Until(ctx, len(nodes), checkNode)
 	processedNodes := int(feasibleNodesLen) + len(statuses)
-	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
+	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
 
 	feasibleNodes = feasibleNodes[:feasibleNodesLen]
 	if err := errCh.ReceiveError(); err != nil {
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index 89ed3051cfe..97f082bcbea 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -33,9 +33,12 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	clientsetfake "k8s.io/client-go/kubernetes/fake"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
+	"k8s.io/kubernetes/pkg/features"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
@@ -1198,3 +1201,88 @@ func TestFairEvaluationForNodes(t *testing.T) {
 		}
 	}
 }
+
+func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
+	tests := []struct {
+		name                  string
+		feature               bool
+		pod                   *v1.Pod
+		nodeReturnCodeMap     map[string]framework.Code
+		expectedCount         int32
+		expectedPatchRequests int
+	}{
+		{
+			name:          "Enable the feature, pod has the nominated node set, filter is called only once",
+			feature:       true,
+			pod:           st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
+			expectedCount: 1,
+		},
+		{
+			name:          "Disable the feature, pod has the nominated node, filter is called for each node",
+			feature:       false,
+			pod:           st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
+			expectedCount: 3,
+		},
+		{
+			name:          "pod without the nominated node, filter is called for each node",
+			feature:       true,
+			pod:           st.MakePod().Name("p_without_nominated_node").UID("p").Priority(highPriority).Obj(),
+			expectedCount: 3,
+		},
+		{
+			name:              "nominated node cannot pass the filter, filter is called for each node",
+			feature:           true,
+			pod:               st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
+			nodeReturnCodeMap: map[string]framework.Code{"node1": framework.Unschedulable},
+			expectedCount:     4,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, test.feature)()
+			// Create three nodes in the cluster.
+			nodes := makeNodeList([]string{"node1", "node2", "node3"})
+			client := &clientsetfake.Clientset{}
+			cache := internalcache.New(time.Duration(0), wait.NeverStop)
+			for _, n := range nodes {
+				cache.AddNode(n)
+			}
+			plugin := st.FakeFilterPlugin{FailedNodeReturnCodeMap: test.nodeReturnCodeMap}
+			registerFakeFilterFunc := st.RegisterFilterPlugin(
+				"FakeFilter",
+				func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
+					return &plugin, nil
+				},
+			)
+			registerPlugins := []st.RegisterPluginFunc{
+				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
+				registerFakeFilterFunc,
+				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
+			}
+			fwk, err := st.NewFramework(
+				registerPlugins,
+				frameworkruntime.WithClientSet(client),
+				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
+			)
+			if err != nil {
+				t.Fatal(err)
+			}
+			snapshot := internalcache.NewSnapshot(nil, nodes)
+			scheduler := NewGenericScheduler(
+				cache,
+				snapshot,
+				[]framework.Extender{},
+				schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
+
+			_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
+
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			if test.expectedCount != plugin.NumFilterCalled {
+				t.Errorf("predicate was called %d times, expected %d times", plugin.NumFilterCalled, test.expectedCount)
+			}
+		})
+	}
+}
diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go
index 00a36f1cf4a..025190901fb 100644
--- a/test/integration/scheduler/preemption_test.go
+++ b/test/integration/scheduler/preemption_test.go
@@ -1314,3 +1314,152 @@ func TestPDBInPreemption(t *testing.T) {
 		})
 	}
 }
+
+func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext {
+	testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestMaster(t, nsPrefix, nil), nil, opts...)
+	testutils.SyncInformerFactory(testCtx)
+	// Wrap the NextPod() method to make it appear that preemption has already been done and the nominated node has been set.
+	f := testCtx.Scheduler.NextPod
+	testCtx.Scheduler.NextPod = func() (podInfo *framework.QueuedPodInfo) {
+		podInfo = f()
+		podInfo.Pod.Status.NominatedNodeName = "node-1"
+		return podInfo
+	}
+	go testCtx.Scheduler.Run(testCtx.Ctx)
+	return testCtx
+}
+
+// TestPreferNominatedNode tests that when the "PreferNominatedNode" feature is enabled, the overall scheduling logic is not changed.
+// If the nominated node passes all the filters, the preemptor pod will run on the nominated node; otherwise, it will be scheduled
+// to another node in the cluster that is able to pass all the filters.
+// NOTE: This integration test is not intended to check the logic of preemption, but rather is a sanity check when the feature is
+// enabled.
+func TestPreferNominatedNode(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, true)()
+	testCtx := initTestPreferNominatedNode(t, "prefer-nominated-node")
+	t.Cleanup(func() {
+		testutils.CleanupTest(t, testCtx)
+	})
+	cs := testCtx.ClientSet
+	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
+		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
+	}
+	defaultNodeRes := map[v1.ResourceName]string{
+		v1.ResourcePods:   "32",
+		v1.ResourceCPU:    "500m",
+		v1.ResourceMemory: "500",
+	}
+
+	type nodeConfig struct {
+		name string
+		res  map[v1.ResourceName]string
+	}
+
+	tests := []struct {
+		name         string
+		nodes        []*nodeConfig
+		existingPods []*v1.Pod
+		pod          *v1.Pod
+		runningNode  string
+	}{
+		{
+			name: "nominated node released all resources, preemptor is scheduled to the nominated node",
+			nodes: []*nodeConfig{
+				{name: "node-1", res: defaultNodeRes},
+				{name: "node-2", res: defaultNodeRes},
+			},
+			existingPods: []*v1.Pod{
+				initPausePod(&pausePodConfig{
+					Name:      "low-pod1",
+					Namespace: testCtx.NS.Name,
+					Priority:  &lowPriority,
+					NodeName:  "node-2",
+					Resources: defaultPodRes,
+				}),
+			},
+			pod: initPausePod(&pausePodConfig{
+				Name:      "preemptor-pod",
+				Namespace: testCtx.NS.Name,
+				Priority:  &highPriority,
+				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
+					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
+				},
+			}),
+			runningNode: "node-1",
+		},
+		{
+			name: "nominated node cannot pass all the filters, preemptor should find a different node",
+			nodes: []*nodeConfig{
+				{name: "node-1", res: defaultNodeRes},
+				{name: "node-2", res: defaultNodeRes},
+			},
+			existingPods: []*v1.Pod{
+				initPausePod(&pausePodConfig{
+					Name:      "low-pod1",
+					Namespace: testCtx.NS.Name,
+					Priority:  &lowPriority,
+					Resources: defaultPodRes,
+					NodeName:  "node-1",
+				}),
+			},
+			pod: initPausePod(&pausePodConfig{
+				Name:      "preemptor-pod",
+				Namespace: testCtx.NS.Name,
+				Priority:  &highPriority,
+				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
+					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
+				},
+			}),
+			runningNode: "node-2",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			var err error
+			var preemptor *v1.Pod
+			for _, nodeConf := range test.nodes {
+				_, err := createNode(cs, st.MakeNode().Name(nodeConf.name).Capacity(nodeConf.res).Obj())
+				if err != nil {
+					t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
+				}
+			}
+			pods := make([]*v1.Pod, len(test.existingPods))
+			// Create and run existingPods.
+			for i, p := range test.existingPods {
+				pods[i], err = runPausePod(cs, p)
+				if err != nil {
+					t.Fatalf("Error running pause pod: %v", err)
+				}
+			}
+			preemptor, err = createPausePod(cs, test.pod)
+			if err != nil {
+				t.Errorf("Error while creating high priority pod: %v", err)
+			}
+			err = wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
+				preemptor, err = cs.CoreV1().Pods(test.pod.Namespace).Get(context.TODO(), test.pod.Name, metav1.GetOptions{})
+				if err != nil {
+					t.Errorf("Error getting the preemptor pod info: %v", err)
+				}
+				if len(preemptor.Spec.NodeName) == 0 {
+					return false, err
+				}
+				return true, nil
+			})
+			if err != nil {
+				t.Errorf("Cannot schedule Pod %v/%v, error: %v", test.pod.Namespace, test.pod.Name, err)
+			}
+			// Make sure the pod has been scheduled to the right node.
+			if preemptor.Spec.NodeName != test.runningNode {
+				t.Errorf("Expected pod to run on %v, got %v.", test.runningNode, preemptor.Spec.NodeName)
+			}
+			pods = append(pods, preemptor)
+			// cleanup
+			defer testutils.CleanupPods(cs, t, pods)
+			cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
+		})
+	}
+}