Merge pull request #93179 from chendave/preemption_improve

prefer nominated node - IMPL
Kubernetes Prow Robot 2021-01-26 21:03:40 -08:00 committed by GitHub
commit 21d3c73f54
5 changed files with 305 additions and 18 deletions


@@ -262,6 +262,16 @@ const (
// Enables generic ephemeral inline volume support for pods
GenericEphemeralVolume featuregate.Feature = "GenericEphemeralVolume"
// owner: @chendave
// alpha: v1.21
//
// PreferNominatedNode tells the scheduler whether the nominated node should be checked first, before
// iterating over all the other nodes in the cluster.
// Enabling this feature also means the preemptor pod might not be dispatched to the best candidate in
// some corner cases, e.g. when another node releases enough resources after the nominated node has been
// set and therefore becomes the better candidate.
PreferNominatedNode featuregate.Feature = "PreferNominatedNode"
// owner: @tallclair
// alpha: v1.12
// beta: v1.14
@@ -771,6 +781,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
GracefulNodeShutdown: {Default: false, PreRelease: featuregate.Alpha},
ServiceLBNodePortControl: {Default: false, PreRelease: featuregate.Alpha},
MixedProtocolLBService: {Default: false, PreRelease: featuregate.Alpha},
PreferNominatedNode: {Default: false, PreRelease: featuregate.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
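
The gate ships as alpha and defaults to false, so it has to be switched on explicitly, typically via the usual --feature-gates=PreferNominatedNode=true flag on kube-scheduler. As a minimal sketch of how the gate is consulted at runtime, using the same helpers this PR imports in generic_scheduler.go (the small main() below is only an illustration, not code from the PR):

package main

import (
	"fmt"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
)

func main() {
	// Mirrors the check added to findNodesThatFitPod: the nominated-node fast path
	// is taken only when this gate evaluates to true.
	if utilfeature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
		fmt.Println("PreferNominatedNode is enabled")
	}
}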


@@ -9,6 +9,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/core",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework:go_default_library",
"//pkg/scheduler/framework/runtime:go_default_library",
@@ -18,6 +19,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
@@ -34,6 +36,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework:go_default_library",
"//pkg/scheduler/framework/plugins/defaultbinder:go_default_library",
@@ -54,8 +57,10 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
],
)


@@ -27,7 +27,9 @@ import (
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/util/feature"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -195,6 +197,26 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
return numNodes
}
func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, filteredNodesStatuses framework.NodeToStatusMap) ([]*v1.Node, error) {
nnn := pod.Status.NominatedNodeName
nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
if err != nil {
return nil, err
}
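// Wrap the nominated node in a one-element slice so it runs through the same
// filter plugins and extenders as every other candidate.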
node := []*framework.NodeInfo{nodeInfo}
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses, node)
if err != nil {
return nil, err
}
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
if err != nil {
return nil, err
}
return feasibleNodes, nil
}
// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
@@ -202,27 +224,38 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor
// Run "prefilter" plugins.
s := fwk.RunPreFilterPlugins(ctx, state, pod)
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, nil, err
}
if !s.IsSuccess() {
if !s.IsUnschedulable() {
return nil, nil, s.AsError()
}
// All nodes will have the same status. Some non-trivial refactoring is
// needed to avoid this copy.
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, nil, err
}
for _, n := range allNodes {
filteredNodesStatuses[n.Node().Name] = s
}
return nil, filteredNodesStatuses, nil
}
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses)
// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, filteredNodesStatuses)
if err != nil {
klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
}
// The nominated node passes all the filters; the scheduler is good to assign this node to the pod.
if len(feasibleNodes) != 0 {
return feasibleNodes, filteredNodesStatuses, nil
}
}
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses, allNodes)
if err != nil {
return nil, nil, err
}
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
if err != nil {
return nil, nil, err
@@ -231,22 +264,23 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor
}
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, err
}
numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
func (g *genericScheduler) findNodesThatPassFilters(
ctx context.Context,
fwk framework.Framework,
state *framework.CycleState,
pod *v1.Pod,
statuses framework.NodeToStatusMap,
nodes []*framework.NodeInfo) ([]*v1.Node, error) {
numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
// Create feasible list with enough space to avoid growing it
// and allow assigning.
feasibleNodes := make([]*v1.Node, numNodesToFind)
if !fwk.HasFilterPlugins() {
length := len(allNodes)
length := len(nodes)
for i := range feasibleNodes {
feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
}
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
return feasibleNodes, nil
@@ -259,7 +293,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk fra
checkNode := func(i int) {
// We check the nodes starting from where we left off in the previous scheduling cycle;
// this is to make sure all nodes have the same chance of being examined across pods.
nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
if status.Code() == framework.Error {
errCh.SendErrorWithCancel(status.AsError(), cancel)
@@ -291,9 +325,9 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk fra
// Stops searching for more nodes once the configured number of feasible nodes
// is found.
parallelize.Until(ctx, len(allNodes), checkNode)
parallelize.Until(ctx, len(nodes), checkNode)
processedNodes := int(feasibleNodesLen) + len(statuses)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
feasibleNodes = feasibleNodes[:feasibleNodesLen]
if err := errCh.ReceiveError(); err != nil {
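
For intuition on the nextStartNodeIndex bookkeeping above, here is a tiny standalone sketch of the modular round-robin with made-up numbers (an illustration only, not scheduler code): with 5 nodes, a previous start index of 3 and 4 nodes processed this cycle, the nodes are visited in the order 3, 4, 0, 1, 2 and the next cycle resumes at index (3+4)%5 = 2.

package main

import "fmt"

func main() {
	// Hypothetical numbers: 5 nodes in the snapshot, the previous cycle stopped
	// at index 3, and 4 nodes get processed in this cycle.
	numNodes, nextStartNodeIndex, processedNodes := 5, 3, 4

	// Nodes are examined starting from where the previous cycle left off:
	// prints 3, 4, 0, 1, 2.
	for i := 0; i < numNodes; i++ {
		fmt.Println((nextStartNodeIndex + i) % numNodes)
	}

	// The next cycle resumes at (3+4)%5 = 2, which is what gives every node the
	// same chance of being examined across pods.
	nextStartNodeIndex = (nextStartNodeIndex + processedNodes) % numNodes
	fmt.Println(nextStartNodeIndex) // 2
}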


@@ -33,9 +33,12 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
featuregatetesting "k8s.io/component-base/featuregate/testing"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
@@ -1198,3 +1201,88 @@ func TestFairEvaluationForNodes(t *testing.T) {
}
}
}
func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
tests := []struct {
name string
feature bool
pod *v1.Pod
nodeReturnCodeMap map[string]framework.Code
expectedCount int32
expectedPatchRequests int
}{
{
name: "Enable the feature, pod has the nominated node set, filter is called only once",
feature: true,
pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
expectedCount: 1,
},
{
name: "Disable the feature, pod has the nominated node, filter is called for each node",
feature: false,
pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
expectedCount: 3,
},
{
name: "pod without the nominated pod, filter is called for each node",
feature: true,
pod: st.MakePod().Name("p_without_nominated_node").UID("p").Priority(highPriority).Obj(),
expectedCount: 3,
},
{
name: "nominated pod cannot pass the filter, filter is called for each node",
feature: true,
pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
nodeReturnCodeMap: map[string]framework.Code{"node1": framework.Unschedulable},
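// The nominated node is filtered once and fails, then all three nodes are filtered, hence 4 calls in total.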
expectedCount: 4,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, test.feature)()
// create three nodes in the cluster.
nodes := makeNodeList([]string{"node1", "node2", "node3"})
client := &clientsetfake.Clientset{}
cache := internalcache.New(time.Duration(0), wait.NeverStop)
for _, n := range nodes {
cache.AddNode(n)
}
plugin := st.FakeFilterPlugin{FailedNodeReturnCodeMap: test.nodeReturnCodeMap}
registerFakeFilterFunc := st.RegisterFilterPlugin(
"FakeFilter",
func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return &plugin, nil
},
)
registerPlugins := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
registerFakeFilterFunc,
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
fwk, err := st.NewFramework(
registerPlugins,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
)
if err != nil {
t.Fatal(err)
}
snapshot := internalcache.NewSnapshot(nil, nodes)
scheduler := NewGenericScheduler(
cache,
snapshot,
[]framework.Extender{},
schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if test.expectedCount != plugin.NumFilterCalled {
t.Errorf("predicate was called %d times, expected is %d", plugin.NumFilterCalled, test.expectedCount)
}
})
}
}


@@ -1314,3 +1314,152 @@ func TestPDBInPreemption(t *testing.T) {
})
}
}
func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext {
testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestMaster(t, nsPrefix, nil), nil, opts...)
testutils.SyncInformerFactory(testCtx)
// Wrap the NextPod() method to make it appear that preemption has already been done and the nominated node has been set.
f := testCtx.Scheduler.NextPod
testCtx.Scheduler.NextPod = func() (podInfo *framework.QueuedPodInfo) {
podInfo = f()
podInfo.Pod.Status.NominatedNodeName = "node-1"
return podInfo
}
go testCtx.Scheduler.Run(testCtx.Ctx)
return testCtx
}
// TestPreferNominatedNode tests that when the "PreferNominatedNode" feature is enabled, the overall scheduling logic is not changed.
// If the nominated node passes all the filters, the preemptor pod will run on the nominated node; otherwise, it will be scheduled
// to another node in the cluster that is able to pass all the filters.
// NOTE: This integration test is not intended to check the logic of preemption, but rather serves as a sanity check when the feature is
// enabled.
func TestPreferNominatedNode(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, true)()
testCtx := initTestPreferNominatedNode(t, "perfer-nominated-node")
t.Cleanup(func() {
testutils.CleanupTest(t, testCtx)
})
cs := testCtx.ClientSet
defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
}
defaultNodeRes := map[v1.ResourceName]string{
v1.ResourcePods: "32",
v1.ResourceCPU: "500m",
v1.ResourceMemory: "500",
}
type nodeConfig struct {
name string
res map[v1.ResourceName]string
}
tests := []struct {
name string
nodes []*nodeConfig
existingPods []*v1.Pod
pod *v1.Pod
runningNode string
}{
{
name: "nominated node released all resource, preemptor is scheduled to the nominated node",
nodes: []*nodeConfig{
{name: "node-1", res: defaultNodeRes},
{name: "node-2", res: defaultNodeRes},
},
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
Name: "low-pod1",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
NodeName: "node-2",
Resources: defaultPodRes,
}),
},
pod: initPausePod(&pausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
runnningNode: "node-1",
},
{
name: "nominated node cannot pass all the filters, preemptor should find a different node",
nodes: []*nodeConfig{
{name: "node-1", res: defaultNodeRes},
{name: "node-2", res: defaultNodeRes},
},
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
Name: "low-pod1",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-1",
}),
},
pod: initPausePod(&pausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
runnningNode: "node-2",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var err error
var preemptor *v1.Pod
for _, nodeConf := range test.nodes {
_, err := createNode(cs, st.MakeNode().Name(nodeConf.name).Capacity(nodeConf.res).Obj())
if err != nil {
t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
}
}
pods := make([]*v1.Pod, len(test.existingPods))
// Create and run existingPods.
for i, p := range test.existingPods {
pods[i], err = runPausePod(cs, p)
if err != nil {
t.Fatalf("Error running pause pod: %v", err)
}
}
preemptor, err = createPausePod(cs, test.pod)
if err != nil {
t.Errorf("Error while creating high priority pod: %v", err)
}
err = wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
preemptor, err = cs.CoreV1().Pods(test.pod.Namespace).Get(context.TODO(), test.pod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Error getting the preemptor pod info: %v", err)
}
if len(preemptor.Spec.NodeName) == 0 {
return false, err
}
return true, nil
})
if err != nil {
t.Errorf("Cannot schedule Pod %v/%v, error: %v", test.pod.Namespace, test.pod.Name, err)
}
// Make sure the pod has been scheduled to the right node.
if preemptor.Spec.NodeName != test.runningNode {
t.Errorf("Expected pod to run on %v, got %v.", test.runningNode, preemptor.Spec.NodeName)
}
pods = append(pods, preemptor)
// cleanup
defer testutils.CleanupPods(cs, t, pods)
cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
})
}
}