mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #95687 from tangwz/make_profile_an_interface
scheduler: make Profile an interface.
This commit is contained in:
commit
93fcb22e64
@ -1348,7 +1348,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
defProf := sched.Profiles["default-scheduler"]
|
defProf := sched.Profiles["default-scheduler"]
|
||||||
gotPlugins := defProf.Framework.ListPlugins()
|
gotPlugins := defProf.ListPlugins()
|
||||||
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
|
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
|
||||||
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
|
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,6 @@ go_library(
|
|||||||
"//pkg/scheduler/internal/cache:go_default_library",
|
"//pkg/scheduler/internal/cache:go_default_library",
|
||||||
"//pkg/scheduler/internal/parallelize:go_default_library",
|
"//pkg/scheduler/internal/parallelize:go_default_library",
|
||||||
"//pkg/scheduler/metrics:go_default_library",
|
"//pkg/scheduler/metrics:go_default_library",
|
||||||
"//pkg/scheduler/profile:go_default_library",
|
|
||||||
"//pkg/scheduler/util:go_default_library",
|
"//pkg/scheduler/util:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||||
@ -48,7 +47,6 @@ go_test(
|
|||||||
"//pkg/scheduler/framework/runtime:go_default_library",
|
"//pkg/scheduler/framework/runtime:go_default_library",
|
||||||
"//pkg/scheduler/internal/cache:go_default_library",
|
"//pkg/scheduler/internal/cache:go_default_library",
|
||||||
"//pkg/scheduler/internal/queue:go_default_library",
|
"//pkg/scheduler/internal/queue:go_default_library",
|
||||||
"//pkg/scheduler/profile:go_default_library",
|
|
||||||
"//pkg/scheduler/testing:go_default_library",
|
"//pkg/scheduler/testing:go_default_library",
|
||||||
"//pkg/scheduler/util:go_default_library",
|
"//pkg/scheduler/util:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
@ -36,7 +36,6 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/profile"
|
|
||||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -278,9 +277,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
prof := &profile.Profile{
|
|
||||||
Framework: fwk,
|
|
||||||
}
|
|
||||||
|
|
||||||
scheduler := NewGenericScheduler(
|
scheduler := NewGenericScheduler(
|
||||||
cache,
|
cache,
|
||||||
@ -288,7 +284,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
|||||||
extenders,
|
extenders,
|
||||||
schedulerapi.DefaultPercentageOfNodesToScore)
|
schedulerapi.DefaultPercentageOfNodesToScore)
|
||||||
podIgnored := &v1.Pod{}
|
podIgnored := &v1.Pod{}
|
||||||
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), podIgnored)
|
result, err := scheduler.Schedule(context.Background(), fwk, framework.NewCycleState(), podIgnored)
|
||||||
if test.expectsErr {
|
if test.expectsErr {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Unexpected non-error, result %+v", result)
|
t.Errorf("Unexpected non-error, result %+v", result)
|
||||||
|
@ -36,7 +36,6 @@ import (
|
|||||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
|
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/profile"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||||
utiltrace "k8s.io/utils/trace"
|
utiltrace "k8s.io/utils/trace"
|
||||||
)
|
)
|
||||||
@ -94,7 +93,7 @@ func (f *FitError) Error() string {
|
|||||||
// onto machines.
|
// onto machines.
|
||||||
// TODO: Rename this type.
|
// TODO: Rename this type.
|
||||||
type ScheduleAlgorithm interface {
|
type ScheduleAlgorithm interface {
|
||||||
Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
|
Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
|
||||||
// Extenders returns a slice of extender config. This is exposed for
|
// Extenders returns a slice of extender config. This is exposed for
|
||||||
// testing.
|
// testing.
|
||||||
Extenders() []framework.Extender
|
Extenders() []framework.Extender
|
||||||
@ -129,7 +128,7 @@ func (g *genericScheduler) snapshot() error {
|
|||||||
// Schedule tries to schedule the given pod to one of the nodes in the node list.
|
// Schedule tries to schedule the given pod to one of the nodes in the node list.
|
||||||
// If it succeeds, it will return the name of the node.
|
// If it succeeds, it will return the name of the node.
|
||||||
// If it fails, it will return a FitError error with reasons.
|
// If it fails, it will return a FitError error with reasons.
|
||||||
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
|
func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
|
||||||
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
|
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
|
||||||
defer trace.LogIfLong(100 * time.Millisecond)
|
defer trace.LogIfLong(100 * time.Millisecond)
|
||||||
|
|
||||||
@ -142,7 +141,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
|
|||||||
return result, ErrNoNodesAvailable
|
return result, ErrNoNodesAvailable
|
||||||
}
|
}
|
||||||
|
|
||||||
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
|
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
@ -165,7 +164,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
|
priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
@ -235,11 +234,11 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
|
|||||||
|
|
||||||
// Filters the nodes to find the ones that fit the pod based on the framework
|
// Filters the nodes to find the ones that fit the pod based on the framework
|
||||||
// filter plugins and filter extenders.
|
// filter plugins and filter extenders.
|
||||||
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
|
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
|
||||||
filteredNodesStatuses := make(framework.NodeToStatusMap)
|
filteredNodesStatuses := make(framework.NodeToStatusMap)
|
||||||
|
|
||||||
// Run "prefilter" plugins.
|
// Run "prefilter" plugins.
|
||||||
s := prof.RunPreFilterPlugins(ctx, state, pod)
|
s := fwk.RunPreFilterPlugins(ctx, state, pod)
|
||||||
if !s.IsSuccess() {
|
if !s.IsSuccess() {
|
||||||
if !s.IsUnschedulable() {
|
if !s.IsUnschedulable() {
|
||||||
return nil, nil, s.AsError()
|
return nil, nil, s.AsError()
|
||||||
@ -256,7 +255,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil
|
|||||||
return nil, filteredNodesStatuses, nil
|
return nil, filteredNodesStatuses, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
|
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -269,7 +268,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil
|
|||||||
}
|
}
|
||||||
|
|
||||||
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
|
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
|
||||||
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
|
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
|
||||||
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
|
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -281,7 +280,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
|
|||||||
// and allow assigning.
|
// and allow assigning.
|
||||||
feasibleNodes := make([]*v1.Node, numNodesToFind)
|
feasibleNodes := make([]*v1.Node, numNodesToFind)
|
||||||
|
|
||||||
if !prof.HasFilterPlugins() {
|
if !fwk.HasFilterPlugins() {
|
||||||
length := len(allNodes)
|
length := len(allNodes)
|
||||||
for i := range feasibleNodes {
|
for i := range feasibleNodes {
|
||||||
feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
|
feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
|
||||||
@ -298,7 +297,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
|
|||||||
// We check the nodes starting from where we left off in the previous scheduling cycle,
|
// We check the nodes starting from where we left off in the previous scheduling cycle,
|
||||||
// this is to make sure all nodes have the same chance of being examined across pods.
|
// this is to make sure all nodes have the same chance of being examined across pods.
|
||||||
nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
|
nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
|
||||||
fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo)
|
fits, status, err := PodPassesFiltersOnNode(ctx, fwk.PreemptHandle(), state, pod, nodeInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errCh.SendErrorWithCancel(err, cancel)
|
errCh.SendErrorWithCancel(err, cancel)
|
||||||
return
|
return
|
||||||
@ -326,7 +325,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
|
|||||||
// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
|
// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
|
||||||
// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
|
// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
|
||||||
// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
|
// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
|
||||||
metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), prof.Name).Observe(metrics.SinceInSeconds(beginCheckNode))
|
metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Stops searching for more nodes once the configured number of feasible nodes
|
// Stops searching for more nodes once the configured number of feasible nodes
|
||||||
@ -470,14 +469,14 @@ func PodPassesFiltersOnNode(
|
|||||||
// All scores are finally combined (added) to get the total weighted scores of all nodes
|
// All scores are finally combined (added) to get the total weighted scores of all nodes
|
||||||
func (g *genericScheduler) prioritizeNodes(
|
func (g *genericScheduler) prioritizeNodes(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
prof *profile.Profile,
|
fwk framework.Framework,
|
||||||
state *framework.CycleState,
|
state *framework.CycleState,
|
||||||
pod *v1.Pod,
|
pod *v1.Pod,
|
||||||
nodes []*v1.Node,
|
nodes []*v1.Node,
|
||||||
) (framework.NodeScoreList, error) {
|
) (framework.NodeScoreList, error) {
|
||||||
// If no priority configs are provided, then all nodes will have a score of one.
|
// If no priority configs are provided, then all nodes will have a score of one.
|
||||||
// This is required to generate the priority list in the required format
|
// This is required to generate the priority list in the required format
|
||||||
if len(g.extenders) == 0 && !prof.HasScorePlugins() {
|
if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
|
||||||
result := make(framework.NodeScoreList, 0, len(nodes))
|
result := make(framework.NodeScoreList, 0, len(nodes))
|
||||||
for i := range nodes {
|
for i := range nodes {
|
||||||
result = append(result, framework.NodeScore{
|
result = append(result, framework.NodeScore{
|
||||||
@ -489,13 +488,13 @@ func (g *genericScheduler) prioritizeNodes(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run PreScore plugins.
|
// Run PreScore plugins.
|
||||||
preScoreStatus := prof.RunPreScorePlugins(ctx, state, pod, nodes)
|
preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
|
||||||
if !preScoreStatus.IsSuccess() {
|
if !preScoreStatus.IsSuccess() {
|
||||||
return nil, preScoreStatus.AsError()
|
return nil, preScoreStatus.AsError()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the Score plugins.
|
// Run the Score plugins.
|
||||||
scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes)
|
scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
|
||||||
if !scoreStatus.IsSuccess() {
|
if !scoreStatus.IsSuccess() {
|
||||||
return nil, scoreStatus.AsError()
|
return nil, scoreStatus.AsError()
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,6 @@ import (
|
|||||||
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/profile"
|
|
||||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||||
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||||
)
|
)
|
||||||
@ -759,9 +758,6 @@ func TestGenericScheduler(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
prof := &profile.Profile{
|
|
||||||
Framework: fwk,
|
|
||||||
}
|
|
||||||
|
|
||||||
scheduler := NewGenericScheduler(
|
scheduler := NewGenericScheduler(
|
||||||
cache,
|
cache,
|
||||||
@ -772,7 +768,7 @@ func TestGenericScheduler(t *testing.T) {
|
|||||||
informerFactory.Start(ctx.Done())
|
informerFactory.Start(ctx.Done())
|
||||||
informerFactory.WaitForCacheSync(ctx.Done())
|
informerFactory.WaitForCacheSync(ctx.Done())
|
||||||
|
|
||||||
result, err := scheduler.Schedule(ctx, prof, framework.NewCycleState(), test.pod)
|
result, err := scheduler.Schedule(ctx, fwk, framework.NewCycleState(), test.pod)
|
||||||
if err != test.wErr && !strings.Contains(err.Error(), test.wErr.Error()) {
|
if err != test.wErr && !strings.Contains(err.Error(), test.wErr.Error()) {
|
||||||
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
|
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
|
||||||
}
|
}
|
||||||
@ -817,9 +813,8 @@ func TestFindFitAllError(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
prof := &profile.Profile{Framework: fwk}
|
|
||||||
|
|
||||||
_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{})
|
_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -858,10 +853,9 @@ func TestFindFitSomeError(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
prof := &profile.Profile{Framework: fwk}
|
|
||||||
|
|
||||||
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
|
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
|
||||||
_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), pod)
|
_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -928,7 +922,6 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
prof := &profile.Profile{Framework: fwk}
|
|
||||||
|
|
||||||
scheduler := makeScheduler(nodes)
|
scheduler := makeScheduler(nodes)
|
||||||
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
|
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
|
||||||
@ -936,7 +929,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
|
|||||||
}
|
}
|
||||||
fwk.PreemptHandle().AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
|
fwk.PreemptHandle().AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
|
||||||
|
|
||||||
_, _, err = scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), test.pod)
|
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -1084,7 +1077,6 @@ func TestZeroRequest(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating framework: %+v", err)
|
t.Fatalf("error creating framework: %+v", err)
|
||||||
}
|
}
|
||||||
prof := &profile.Profile{Framework: fwk}
|
|
||||||
|
|
||||||
scheduler := NewGenericScheduler(
|
scheduler := NewGenericScheduler(
|
||||||
nil,
|
nil,
|
||||||
@ -1095,12 +1087,12 @@ func TestZeroRequest(t *testing.T) {
|
|||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
_, _, err = scheduler.findNodesThatFitPod(ctx, prof, state, test.pod)
|
_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error filtering nodes: %+v", err)
|
t.Fatalf("error filtering nodes: %+v", err)
|
||||||
}
|
}
|
||||||
prof.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
|
fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
|
||||||
list, err := scheduler.prioritizeNodes(ctx, prof, state, test.pod, test.nodes)
|
list, err := scheduler.prioritizeNodes(ctx, fwk, state, test.pod, test.nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -1187,14 +1179,14 @@ func TestFairEvaluationForNodes(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
prof := &profile.Profile{Framework: fwk}
|
|
||||||
// To make numAllNodes % nodesToFind != 0
|
// To make numAllNodes % nodesToFind != 0
|
||||||
g.percentageOfNodesToScore = 30
|
g.percentageOfNodesToScore = 30
|
||||||
nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes)))
|
nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes)))
|
||||||
|
|
||||||
// Iterating over all nodes more than twice
|
// Iterating over all nodes more than twice
|
||||||
for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
|
for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
|
||||||
nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{})
|
nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -205,14 +205,14 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
|
|||||||
if err := sched.SchedulingQueue.Delete(pod); err != nil {
|
if err := sched.SchedulingQueue.Delete(pod); err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
|
utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
|
||||||
}
|
}
|
||||||
prof, err := sched.profileForPod(pod)
|
fwk, err := sched.frameworkForPod(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This shouldn't happen, because we only accept for scheduling the pods
|
// This shouldn't happen, because we only accept for scheduling the pods
|
||||||
// which specify a scheduler name that matches one of the profiles.
|
// which specify a scheduler name that matches one of the profiles.
|
||||||
klog.Error(err)
|
klog.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
prof.Framework.RejectWaitingPod(pod.UID)
|
fwk.RejectWaitingPod(pod.UID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sched *Scheduler) addPodToCache(obj interface{}) {
|
func (sched *Scheduler) addPodToCache(obj interface{}) {
|
||||||
|
@ -158,7 +158,7 @@ func (c *Configurator) create() (*Scheduler, error) {
|
|||||||
return nil, errors.New("at least one profile is required")
|
return nil, errors.New("at least one profile is required")
|
||||||
}
|
}
|
||||||
// Profiles are required to have equivalent queue sort plugins.
|
// Profiles are required to have equivalent queue sort plugins.
|
||||||
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
|
lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()
|
||||||
podQueue := internalqueue.NewSchedulingQueue(
|
podQueue := internalqueue.NewSchedulingQueue(
|
||||||
lessFn,
|
lessFn,
|
||||||
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
|
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
|
||||||
|
@ -504,6 +504,9 @@ type Framework interface {
|
|||||||
|
|
||||||
// ListPlugins returns a map of extension point name to list of configured Plugins.
|
// ListPlugins returns a map of extension point name to list of configured Plugins.
|
||||||
ListPlugins() map[string][]config.Plugin
|
ListPlugins() map[string][]config.Plugin
|
||||||
|
|
||||||
|
// ProfileName returns the profile name associated to this framework.
|
||||||
|
ProfileName() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle provides data and some tools that plugins can use. It is
|
// Handle provides data and some tools that plugins can use. It is
|
||||||
|
@ -1027,3 +1027,8 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config
|
|||||||
func (f *frameworkImpl) PreemptHandle() framework.PreemptHandle {
|
func (f *frameworkImpl) PreemptHandle() framework.PreemptHandle {
|
||||||
return f.preemptHandle
|
return f.preemptHandle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ProfileName returns the profile name associated to this framework.
|
||||||
|
func (f *frameworkImpl) ProfileName() string {
|
||||||
|
return f.profileName
|
||||||
|
}
|
||||||
|
@ -39,9 +39,7 @@ go_test(
|
|||||||
"//pkg/scheduler/framework:go_default_library",
|
"//pkg/scheduler/framework:go_default_library",
|
||||||
"//pkg/scheduler/framework/runtime:go_default_library",
|
"//pkg/scheduler/framework/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/events/v1:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -36,33 +36,22 @@ type RecorderFactory func(string) events.EventRecorder
|
|||||||
// FrameworkFactory builds a Framework for a given profile configuration.
|
// FrameworkFactory builds a Framework for a given profile configuration.
|
||||||
type FrameworkFactory func(config.KubeSchedulerProfile, ...frameworkruntime.Option) (framework.Framework, error)
|
type FrameworkFactory func(config.KubeSchedulerProfile, ...frameworkruntime.Option) (framework.Framework, error)
|
||||||
|
|
||||||
// Profile is a scheduling profile.
|
// newProfile builds a Profile for the given configuration.
|
||||||
type Profile struct {
|
func newProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
|
||||||
framework.Framework
|
opts ...frameworkruntime.Option) (framework.Framework, error) {
|
||||||
Recorder events.EventRecorder
|
|
||||||
Name string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewProfile builds a Profile for the given configuration.
|
|
||||||
func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
|
|
||||||
opts ...frameworkruntime.Option) (*Profile, error) {
|
|
||||||
recorder := recorderFact(cfg.SchedulerName)
|
recorder := recorderFact(cfg.SchedulerName)
|
||||||
opts = append(opts, frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithProfileName(cfg.SchedulerName))
|
opts = append(opts, frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithProfileName(cfg.SchedulerName))
|
||||||
fwk, err := frameworkFact(cfg, opts...)
|
fwk, err := frameworkFact(cfg, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Profile{
|
return fwk, nil
|
||||||
Name: cfg.SchedulerName,
|
|
||||||
Framework: fwk,
|
|
||||||
Recorder: recorder,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map holds profiles indexed by scheduler name.
|
// Map holds frameworks indexed by scheduler name.
|
||||||
type Map map[string]*Profile
|
type Map map[string]framework.Framework
|
||||||
|
|
||||||
// NewMap builds the profiles given by the configuration, indexed by name.
|
// NewMap builds the frameworks given by the configuration, indexed by name.
|
||||||
func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
|
func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
|
||||||
opts ...frameworkruntime.Option) (Map, error) {
|
opts ...frameworkruntime.Option) (Map, error) {
|
||||||
m := make(Map)
|
m := make(Map)
|
||||||
@ -72,7 +61,7 @@ func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory,
|
|||||||
if err := v.validate(cfg); err != nil {
|
if err := v.validate(cfg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
p, err := NewProfile(cfg, frameworkFact, recorderFact, opts...)
|
p, err := newProfile(cfg, frameworkFact, recorderFact, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
|
return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
|
||||||
}
|
}
|
||||||
|
@ -23,9 +23,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
eventsv1 "k8s.io/api/events/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
|
||||||
"k8s.io/client-go/tools/events"
|
"k8s.io/client-go/tools/events"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
@ -39,94 +37,6 @@ var fakeRegistry = frameworkruntime.Registry{
|
|||||||
"Another": newFakePlugin,
|
"Another": newFakePlugin,
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewProfile(t *testing.T) {
|
|
||||||
cases := []struct {
|
|
||||||
name string
|
|
||||||
cfg config.KubeSchedulerProfile
|
|
||||||
wantErr string
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "valid",
|
|
||||||
cfg: config.KubeSchedulerProfile{
|
|
||||||
SchedulerName: "valid-profile",
|
|
||||||
Plugins: &config.Plugins{
|
|
||||||
QueueSort: &config.PluginSet{
|
|
||||||
Enabled: []config.Plugin{
|
|
||||||
{Name: "QueueSort"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Bind: &config.PluginSet{
|
|
||||||
Enabled: []config.Plugin{
|
|
||||||
{Name: "Bind1"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
PluginConfig: []config.PluginConfig{
|
|
||||||
{
|
|
||||||
Name: "QueueSort",
|
|
||||||
Args: &runtime.Unknown{Raw: []byte("{}")},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "invalid framework configuration",
|
|
||||||
cfg: config.KubeSchedulerProfile{
|
|
||||||
SchedulerName: "invalid-profile",
|
|
||||||
Plugins: &config.Plugins{
|
|
||||||
QueueSort: &config.PluginSet{
|
|
||||||
Enabled: []config.Plugin{
|
|
||||||
{Name: "QueueSort"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
wantErr: "at least one bind plugin is needed",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "one queue sort plugin required for profile",
|
|
||||||
cfg: config.KubeSchedulerProfile{
|
|
||||||
SchedulerName: "profile-1",
|
|
||||||
Plugins: &config.Plugins{
|
|
||||||
Bind: &config.PluginSet{
|
|
||||||
Enabled: []config.Plugin{
|
|
||||||
{Name: "Bind1"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
wantErr: "no queue sort plugin is enabled",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, tc := range cases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
c := fake.NewSimpleClientset()
|
|
||||||
b := events.NewBroadcaster(&events.EventSinkImpl{Interface: c.EventsV1()})
|
|
||||||
p, err := NewProfile(tc.cfg, fakeFrameworkFactory, NewRecorderFactory(b))
|
|
||||||
if err := checkErr(err, tc.wantErr); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if len(tc.wantErr) != 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
called := make(chan struct{})
|
|
||||||
var ctrl string
|
|
||||||
stopFn := b.StartEventWatcher(func(obj runtime.Object) {
|
|
||||||
e, _ := obj.(*eventsv1.Event)
|
|
||||||
ctrl = e.ReportingController
|
|
||||||
close(called)
|
|
||||||
})
|
|
||||||
p.Recorder.Eventf(&v1.Pod{}, nil, v1.EventTypeNormal, "", "", "")
|
|
||||||
<-called
|
|
||||||
stopFn()
|
|
||||||
if ctrl != tc.cfg.SchedulerName {
|
|
||||||
t.Errorf("got controller name %q in event, want %q", ctrl, tc.cfg.SchedulerName)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNewMap(t *testing.T) {
|
func TestNewMap(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
@ -317,6 +227,22 @@ func TestNewMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
wantErr: "plugins required for profile",
|
wantErr: "plugins required for profile",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "invalid framework configuration",
|
||||||
|
cfgs: []config.KubeSchedulerProfile{
|
||||||
|
{
|
||||||
|
SchedulerName: "invalid-profile",
|
||||||
|
Plugins: &config.Plugins{
|
||||||
|
QueueSort: &config.PluginSet{
|
||||||
|
Enabled: []config.Plugin{
|
||||||
|
{Name: "QueueSort"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: "at least one bind plugin is needed",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
@ -307,7 +307,7 @@ func (sched *Scheduler) Run(ctx context.Context) {
|
|||||||
|
|
||||||
// recordSchedulingFailure records an event for the pod that indicates the
|
// recordSchedulingFailure records an event for the pod that indicates the
|
||||||
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
|
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
|
||||||
func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
|
func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
|
||||||
sched.Error(podInfo, err)
|
sched.Error(podInfo, err)
|
||||||
|
|
||||||
// Update the scheduling queue with the nominated pod information. Without
|
// Update the scheduling queue with the nominated pod information. Without
|
||||||
@ -319,7 +319,7 @@ func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *
|
|||||||
}
|
}
|
||||||
|
|
||||||
pod := podInfo.Pod
|
pod := podInfo.Pod
|
||||||
prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
|
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
|
||||||
if err := updatePod(sched.client, pod, &v1.PodCondition{
|
if err := updatePod(sched.client, pod, &v1.PodCondition{
|
||||||
Type: v1.PodScheduled,
|
Type: v1.PodScheduled,
|
||||||
Status: v1.ConditionFalse,
|
Status: v1.ConditionFalse,
|
||||||
@ -369,17 +369,17 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
|
|||||||
// bind binds a pod to a given node defined in a binding object.
|
// bind binds a pod to a given node defined in a binding object.
|
||||||
// The precedence for binding is: (1) extenders and (2) framework plugins.
|
// The precedence for binding is: (1) extenders and (2) framework plugins.
|
||||||
// We expect this to run asynchronously, so we handle binding metrics internally.
|
// We expect this to run asynchronously, so we handle binding metrics internally.
|
||||||
func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
|
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
sched.finishBinding(prof, assumed, targetNode, start, err)
|
sched.finishBinding(fwk, assumed, targetNode, start, err)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
bound, err := sched.extendersBinding(assumed, targetNode)
|
bound, err := sched.extendersBinding(assumed, targetNode)
|
||||||
if bound {
|
if bound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode)
|
bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode)
|
||||||
if bindStatus.IsSuccess() {
|
if bindStatus.IsSuccess() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -403,7 +403,7 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error)
|
|||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, targetNode string, start time.Time, err error) {
|
func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, start time.Time, err error) {
|
||||||
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
|
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
|
||||||
klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
|
klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
|
||||||
}
|
}
|
||||||
@ -413,7 +413,7 @@ func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, ta
|
|||||||
}
|
}
|
||||||
|
|
||||||
metrics.DeprecatedBindingLatency.Observe(metrics.SinceInSeconds(start))
|
metrics.DeprecatedBindingLatency.Observe(metrics.SinceInSeconds(start))
|
||||||
prof.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
|
fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
|
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
|
||||||
@ -424,14 +424,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
pod := podInfo.Pod
|
pod := podInfo.Pod
|
||||||
prof, err := sched.profileForPod(pod)
|
fwk, err := sched.frameworkForPod(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This shouldn't happen, because we only accept for scheduling the pods
|
// This shouldn't happen, because we only accept for scheduling the pods
|
||||||
// which specify a scheduler name that matches one of the profiles.
|
// which specify a scheduler name that matches one of the profiles.
|
||||||
klog.Error(err)
|
klog.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if sched.skipPodSchedule(prof, pod) {
|
if sched.skipPodSchedule(fwk, pod) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -443,7 +443,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
|
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
|
||||||
schedulingCycleCtx, cancel := context.WithCancel(ctx)
|
schedulingCycleCtx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
|
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Schedule() may have failed because the pod would not fit on any host, so we try to
|
// Schedule() may have failed because the pod would not fit on any host, so we try to
|
||||||
// preempt, with the expectation that the next time the pod is tried for scheduling it
|
// preempt, with the expectation that the next time the pod is tried for scheduling it
|
||||||
@ -451,11 +451,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
// into the resources that were preempted, but this is harmless.
|
// into the resources that were preempted, but this is harmless.
|
||||||
nominatedNode := ""
|
nominatedNode := ""
|
||||||
if fitError, ok := err.(*core.FitError); ok {
|
if fitError, ok := err.(*core.FitError); ok {
|
||||||
if !prof.HasPostFilterPlugins() {
|
if !fwk.HasPostFilterPlugins() {
|
||||||
klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
|
klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
|
||||||
} else {
|
} else {
|
||||||
// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
|
// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
|
||||||
result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
|
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
|
||||||
if status.Code() == framework.Error {
|
if status.Code() == framework.Error {
|
||||||
klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
|
klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
|
||||||
} else {
|
} else {
|
||||||
@ -468,15 +468,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
// Pod did not fit anywhere, so it is counted as a failure. If preemption
|
// Pod did not fit anywhere, so it is counted as a failure. If preemption
|
||||||
// succeeds, the pod should get counted as a success the next time we try to
|
// succeeds, the pod should get counted as a success the next time we try to
|
||||||
// schedule it. (hopefully)
|
// schedule it. (hopefully)
|
||||||
metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
} else if err == core.ErrNoNodesAvailable {
|
} else if err == core.ErrNoNodesAvailable {
|
||||||
// No nodes available is counted as unschedulable rather than an error.
|
// No nodes available is counted as unschedulable rather than an error.
|
||||||
metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
} else {
|
} else {
|
||||||
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
|
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
}
|
}
|
||||||
sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
|
sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
|
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
|
||||||
@ -487,45 +487,45 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
|
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
|
||||||
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
|
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
// This is most probably result of a BUG in retrying logic.
|
// This is most probably result of a BUG in retrying logic.
|
||||||
// We report an error here so that pod scheduling can be retried.
|
// We report an error here so that pod scheduling can be retried.
|
||||||
// This relies on the fact that Error will check if the pod has been bound
|
// This relies on the fact that Error will check if the pod has been bound
|
||||||
// to a node and if so will not add it back to the unscheduled pods queue
|
// to a node and if so will not add it back to the unscheduled pods queue
|
||||||
// (otherwise this would cause an infinite loop).
|
// (otherwise this would cause an infinite loop).
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
|
sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the Reserve method of reserve plugins.
|
// Run the Reserve method of reserve plugins.
|
||||||
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
// trigger un-reserve to clean up state associated with the reserved Pod
|
// trigger un-reserve to clean up state associated with the reserved Pod
|
||||||
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
|
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
|
||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||||
}
|
}
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
|
sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run "permit" plugins.
|
// Run "permit" plugins.
|
||||||
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
|
if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
|
||||||
var reason string
|
var reason string
|
||||||
if runPermitStatus.IsUnschedulable() {
|
if runPermitStatus.IsUnschedulable() {
|
||||||
metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
reason = v1.PodReasonUnschedulable
|
reason = v1.PodReasonUnschedulable
|
||||||
} else {
|
} else {
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
reason = SchedulerError
|
reason = SchedulerError
|
||||||
}
|
}
|
||||||
// One of the plugins returned status different than success or wait.
|
// One of the plugins returned status different than success or wait.
|
||||||
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
|
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
|
||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||||
}
|
}
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
|
sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -536,58 +536,58 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
|
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
|
||||||
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
|
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
|
||||||
|
|
||||||
waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
|
waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
|
||||||
if !waitOnPermitStatus.IsSuccess() {
|
if !waitOnPermitStatus.IsSuccess() {
|
||||||
var reason string
|
var reason string
|
||||||
if waitOnPermitStatus.IsUnschedulable() {
|
if waitOnPermitStatus.IsUnschedulable() {
|
||||||
metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
reason = v1.PodReasonUnschedulable
|
reason = v1.PodReasonUnschedulable
|
||||||
} else {
|
} else {
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
reason = SchedulerError
|
reason = SchedulerError
|
||||||
}
|
}
|
||||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
|
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
|
||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||||
}
|
}
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
|
sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run "prebind" plugins.
|
// Run "prebind" plugins.
|
||||||
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if !preBindStatus.IsSuccess() {
|
if !preBindStatus.IsSuccess() {
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
|
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
|
||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||||
}
|
}
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
|
sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
|
err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
|
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
|
||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
|
||||||
}
|
}
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
|
sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
|
||||||
} else {
|
} else {
|
||||||
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
|
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
|
||||||
if klog.V(2).Enabled() {
|
if klog.V(2).Enabled() {
|
||||||
klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
|
klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
|
||||||
}
|
}
|
||||||
metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
|
||||||
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
|
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
|
||||||
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
|
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
|
||||||
|
|
||||||
// Run "postbind" plugins.
|
// Run "postbind" plugins.
|
||||||
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -601,19 +601,19 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
|
|||||||
return strconv.Itoa(p.Attempts)
|
return strconv.Itoa(p.Attempts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sched *Scheduler) profileForPod(pod *v1.Pod) (*profile.Profile, error) {
|
func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
|
||||||
prof, ok := sched.Profiles[pod.Spec.SchedulerName]
|
fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
|
return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
|
||||||
}
|
}
|
||||||
return prof, nil
|
return fwk, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
|
// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
|
||||||
func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool {
|
func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool {
|
||||||
// Case 1: pod is being deleted.
|
// Case 1: pod is being deleted.
|
||||||
if pod.DeletionTimestamp != nil {
|
if pod.DeletionTimestamp != nil {
|
||||||
prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
|
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
|
||||||
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
|
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,7 @@ type mockScheduler struct {
|
|||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
|
func (es mockScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
|
||||||
return es.result, es.err
|
return es.result, es.err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -340,7 +340,10 @@ func TestSchedulerScheduleOne(t *testing.T) {
|
|||||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||||
)
|
)
|
||||||
fwk, err := st.NewFramework(registerPluginFuncs, frameworkruntime.WithClientSet(client))
|
fwk, err := st.NewFramework(registerPluginFuncs,
|
||||||
|
frameworkruntime.WithClientSet(client),
|
||||||
|
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
|
||||||
|
frameworkruntime.WithProfileName(testSchedulerName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -357,11 +360,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
|
|||||||
return &framework.QueuedPodInfo{Pod: item.sendPod}
|
return &framework.QueuedPodInfo{Pod: item.sendPod}
|
||||||
},
|
},
|
||||||
Profiles: profile.Map{
|
Profiles: profile.Map{
|
||||||
testSchedulerName: &profile.Profile{
|
testSchedulerName: fwk,
|
||||||
Framework: fwk,
|
|
||||||
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName),
|
|
||||||
Name: testSchedulerName,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
called := make(chan struct{})
|
called := make(chan struct{})
|
||||||
@ -808,23 +807,21 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
|||||||
return true, b, nil
|
return true, b, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var recorder events.EventRecorder
|
||||||
|
if broadcaster != nil {
|
||||||
|
recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
|
||||||
|
} else {
|
||||||
|
recorder = &events.FakeRecorder{}
|
||||||
|
}
|
||||||
|
|
||||||
fwk, _ := st.NewFramework(
|
fwk, _ := st.NewFramework(
|
||||||
fns,
|
fns,
|
||||||
frameworkruntime.WithClientSet(client),
|
frameworkruntime.WithClientSet(client),
|
||||||
|
frameworkruntime.WithEventRecorder(recorder),
|
||||||
frameworkruntime.WithInformerFactory(informerFactory),
|
frameworkruntime.WithInformerFactory(informerFactory),
|
||||||
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
|
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
|
||||||
|
frameworkruntime.WithProfileName(testSchedulerName),
|
||||||
)
|
)
|
||||||
prof := &profile.Profile{
|
|
||||||
Framework: fwk,
|
|
||||||
Recorder: &events.FakeRecorder{},
|
|
||||||
Name: testSchedulerName,
|
|
||||||
}
|
|
||||||
if broadcaster != nil {
|
|
||||||
prof.Recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
|
|
||||||
}
|
|
||||||
profiles := profile.Map{
|
|
||||||
testSchedulerName: prof,
|
|
||||||
}
|
|
||||||
|
|
||||||
algo := core.NewGenericScheduler(
|
algo := core.NewGenericScheduler(
|
||||||
scache,
|
scache,
|
||||||
@ -843,8 +840,10 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
|||||||
Error: func(p *framework.QueuedPodInfo, err error) {
|
Error: func(p *framework.QueuedPodInfo, err error) {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
},
|
},
|
||||||
Profiles: profiles,
|
Profiles: profile.Map{
|
||||||
client: client,
|
testSchedulerName: fwk,
|
||||||
|
},
|
||||||
|
client: client,
|
||||||
}
|
}
|
||||||
|
|
||||||
return sched, bindingChan, errChan
|
return sched, bindingChan, errChan
|
||||||
@ -1164,14 +1163,10 @@ func TestSchedulerBinding(t *testing.T) {
|
|||||||
fwk, err := st.NewFramework([]st.RegisterPluginFunc{
|
fwk, err := st.NewFramework([]st.RegisterPluginFunc{
|
||||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||||
}, frameworkruntime.WithClientSet(client))
|
}, frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
prof := &profile.Profile{
|
|
||||||
Framework: fwk,
|
|
||||||
Recorder: &events.FakeRecorder{},
|
|
||||||
}
|
|
||||||
stop := make(chan struct{})
|
stop := make(chan struct{})
|
||||||
defer close(stop)
|
defer close(stop)
|
||||||
scache := internalcache.New(100*time.Millisecond, stop)
|
scache := internalcache.New(100*time.Millisecond, stop)
|
||||||
@ -1185,7 +1180,7 @@ func TestSchedulerBinding(t *testing.T) {
|
|||||||
Algorithm: algo,
|
Algorithm: algo,
|
||||||
SchedulerCache: scache,
|
SchedulerCache: scache,
|
||||||
}
|
}
|
||||||
err = sched.bind(context.Background(), prof, pod, "node", nil)
|
err = sched.bind(context.Background(), fwk, pod, "node", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user