mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #86133 from Huang-Wei/cleanup-predicate-path
Eliminate running paths of Predicates in scheduler
This commit is contained in:
commit
cfdc365525
@ -77,13 +77,17 @@ go_test(
|
||||
"//pkg/scheduler/core:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/nodeports:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/internal/cache:go_default_library",
|
||||
"//pkg/scheduler/internal/cache/fake:go_default_library",
|
||||
"//pkg/scheduler/internal/queue:go_default_library",
|
||||
"//pkg/scheduler/nodeinfo:go_default_library",
|
||||
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
|
||||
"//pkg/scheduler/testing:go_default_library",
|
||||
"//pkg/scheduler/volumebinder:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
|
||||
|
@ -71,6 +71,7 @@ var (
|
||||
schedulerFactoryMutex sync.RWMutex
|
||||
|
||||
// maps that hold registered algorithm types
|
||||
// TODO(Huang-Wei): remove this.
|
||||
fitPredicateMap = make(map[string]FitPredicateFactory)
|
||||
mandatoryFitPredicates = sets.NewString()
|
||||
priorityFunctionMap = make(map[string]PriorityConfigFactory)
|
||||
@ -143,6 +144,7 @@ func ApplyPredicatesAndPriorities(s *Snapshot) {
|
||||
|
||||
// RegisterFitPredicate registers a fit predicate with the algorithm
|
||||
// registry. Returns the name with which the predicate was registered.
|
||||
// TODO(Huang-Wei): remove this.
|
||||
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {
|
||||
return RegisterFitPredicateFactory(name, func(AlgorithmFactoryArgs) predicates.FitPredicate { return predicate })
|
||||
}
|
||||
|
@ -55,6 +55,9 @@ go_test(
|
||||
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
"//pkg/scheduler/apis/extender/v1:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/internal/cache:go_default_library",
|
||||
"//pkg/scheduler/internal/queue:go_default_library",
|
||||
@ -62,6 +65,7 @@ go_test(
|
||||
"//pkg/scheduler/listers/fake:go_default_library",
|
||||
"//pkg/scheduler/nodeinfo:go_default_library",
|
||||
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
|
||||
"//pkg/scheduler/testing:go_default_library",
|
||||
"//pkg/scheduler/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
@ -348,16 +349,16 @@ var _ algorithm.SchedulerExtender = &FakeExtender{}
|
||||
|
||||
func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
predicates map[string]predicates.FitPredicate
|
||||
prioritizers []priorities.PriorityConfig
|
||||
extenders []FakeExtender
|
||||
nodes []string
|
||||
expectedResult ScheduleResult
|
||||
expectsErr bool
|
||||
name string
|
||||
registerFilterPlugin st.RegisterFilterPluginFunc
|
||||
prioritizers []priorities.PriorityConfig
|
||||
extenders []FakeExtender
|
||||
nodes []string
|
||||
expectedResult ScheduleResult
|
||||
expectsErr bool
|
||||
}{
|
||||
{
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{truePredicateExtender},
|
||||
@ -371,7 +372,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
name: "test 1",
|
||||
},
|
||||
{
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{truePredicateExtender},
|
||||
@ -385,7 +386,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
name: "test 2",
|
||||
},
|
||||
{
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{truePredicateExtender},
|
||||
@ -403,7 +404,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
name: "test 3",
|
||||
},
|
||||
{
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{machine2PredicateExtender},
|
||||
@ -417,7 +418,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
name: "test 4",
|
||||
},
|
||||
{
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{truePredicateExtender},
|
||||
@ -434,7 +435,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
name: "test 5",
|
||||
},
|
||||
{
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{truePredicateExtender},
|
||||
@ -456,8 +457,8 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
name: "test 6",
|
||||
},
|
||||
{
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 20}},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 20}},
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{truePredicateExtender},
|
||||
@ -481,8 +482,8 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
// If scheduler sends the pod by mistake, the test would fail
|
||||
// because of the errors from errorPredicateExtender and/or
|
||||
// errorPrioritizerExtender.
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 1}},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 1}},
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{errorPredicateExtender},
|
||||
@ -505,7 +506,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
//
|
||||
// If scheduler did not ignore the extender, the test would fail
|
||||
// because of the errors from errorPredicateExtender.
|
||||
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
|
||||
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
|
||||
extenders: []FakeExtender{
|
||||
{
|
||||
predicates: []fitPredicate{errorPredicateExtender},
|
||||
@ -540,15 +541,24 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
cache.AddNode(createNode(name))
|
||||
}
|
||||
queue := internalqueue.NewSchedulingQueue(nil)
|
||||
|
||||
registry := framework.Registry{}
|
||||
plugins := &schedulerapi.Plugins{
|
||||
Filter: &schedulerapi.PluginSet{},
|
||||
}
|
||||
var pluginConfigs []schedulerapi.PluginConfig
|
||||
test.registerFilterPlugin(®istry, plugins, pluginConfigs)
|
||||
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
|
||||
|
||||
scheduler := NewGenericScheduler(
|
||||
cache,
|
||||
queue,
|
||||
test.predicates,
|
||||
nil,
|
||||
predicates.EmptyMetadataProducer,
|
||||
test.prioritizers,
|
||||
priorities.EmptyMetadataProducer,
|
||||
emptySnapshot,
|
||||
emptyFramework,
|
||||
fwk,
|
||||
extenders,
|
||||
nil,
|
||||
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
|
||||
|
@ -72,8 +72,9 @@ type FailedPredicateMap map[string][]predicates.PredicateFailureReason
|
||||
|
||||
// FitError describes a fit error of a pod.
|
||||
type FitError struct {
|
||||
Pod *v1.Pod
|
||||
NumAllNodes int
|
||||
Pod *v1.Pod
|
||||
NumAllNodes int
|
||||
// TODO(Huang-Wei): remove 'FailedPredicates'
|
||||
FailedPredicates FailedPredicateMap
|
||||
FilteredNodesStatuses framework.NodeToStatusMap
|
||||
}
|
||||
@ -476,12 +477,13 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
|
||||
|
||||
// Filters the nodes to find the ones that fit based on the given predicate functions
|
||||
// Each node is passed through the predicate functions to determine if it is a fit
|
||||
// TODO(Huang-Wei): remove 'FailedPredicateMap' from the return parameters.
|
||||
func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
|
||||
var filtered []*v1.Node
|
||||
failedPredicateMap := FailedPredicateMap{}
|
||||
filteredNodesStatuses := framework.NodeToStatusMap{}
|
||||
|
||||
if len(g.predicates) == 0 && !g.framework.HasFilterPlugins() {
|
||||
if !g.framework.HasFilterPlugins() {
|
||||
filtered = g.nodeInfoSnapshot.ListNodes()
|
||||
} else {
|
||||
allNodes := len(g.nodeInfoSnapshot.NodeInfoList)
|
||||
@ -506,7 +508,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
|
||||
// We check the nodes starting from where we left off in the previous scheduling cycle,
|
||||
// this is to make sure all nodes have the same chance of being examined across pods.
|
||||
nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
|
||||
fits, failedPredicates, status, err := g.podFitsOnNode(
|
||||
fits, _, status, err := g.podFitsOnNode(
|
||||
ctx,
|
||||
state,
|
||||
pod,
|
||||
@ -531,9 +533,6 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
|
||||
if !status.IsSuccess() {
|
||||
filteredNodesStatuses[nodeInfo.Node().Name] = status
|
||||
}
|
||||
if len(failedPredicates) != 0 {
|
||||
failedPredicateMap[nodeInfo.Node().Name] = failedPredicates
|
||||
}
|
||||
predicateResultLock.Unlock()
|
||||
}
|
||||
}
|
||||
@ -566,6 +565,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
|
||||
return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err
|
||||
}
|
||||
|
||||
// TODO(Huang-Wei): refactor this to fill 'filteredNodesStatuses' instead of 'failedPredicateMap'.
|
||||
for failedNodeName, failedMsg := range failedMap {
|
||||
if _, found := failedPredicateMap[failedNodeName]; !found {
|
||||
failedPredicateMap[failedNodeName] = []predicates.PredicateFailureReason{}
|
||||
@ -584,6 +584,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
|
||||
// addNominatedPods adds pods with equal or greater priority which are nominated
|
||||
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
|
||||
// any pod was added, 2) augmented metadata, 3) augmented CycleState 4) augmented nodeInfo.
|
||||
// TODO(Huang-Wei): remove 'meta predicates.Metadata' from the signature.
|
||||
func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, meta predicates.Metadata, state *framework.CycleState,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo) (bool, predicates.Metadata,
|
||||
*framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
|
||||
@ -662,12 +663,11 @@ func (g *genericScheduler) podFitsOnNode(
|
||||
// nominated pods are running because they are not running right now and in fact,
|
||||
// they may end up getting scheduled to a different node.
|
||||
for i := 0; i < 2; i++ {
|
||||
metaToUse := meta
|
||||
stateToUse := state
|
||||
nodeInfoToUse := info
|
||||
if i == 0 {
|
||||
var err error
|
||||
podsAdded, metaToUse, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, meta, state, info)
|
||||
podsAdded, _, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, meta, state, info)
|
||||
if err != nil {
|
||||
return false, []predicates.PredicateFailureReason{}, nil, err
|
||||
}
|
||||
@ -675,33 +675,6 @@ func (g *genericScheduler) podFitsOnNode(
|
||||
break
|
||||
}
|
||||
|
||||
for _, predicateKey := range predicates.Ordering() {
|
||||
var (
|
||||
fit bool
|
||||
reasons []predicates.PredicateFailureReason
|
||||
err error
|
||||
)
|
||||
|
||||
if predicate, exist := g.predicates[predicateKey]; exist {
|
||||
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
|
||||
if err != nil {
|
||||
return false, []predicates.PredicateFailureReason{}, nil, err
|
||||
}
|
||||
|
||||
if !fit {
|
||||
// eCache is available and valid, and predicates result is unfit, record the fail reasons
|
||||
failedPredicates = append(failedPredicates, reasons...)
|
||||
// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
|
||||
if !alwaysCheckAllPredicates {
|
||||
klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
|
||||
"evaluation is short circuited and there are chances " +
|
||||
"of other predicates failing as well.")
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
status = g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
|
||||
if !status.IsSuccess() && !status.IsUnschedulable() {
|
||||
return false, failedPredicates, status, status.AsError()
|
||||
@ -1270,6 +1243,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
|
||||
}
|
||||
|
||||
// NewGenericScheduler creates a genericScheduler object.
|
||||
// TODO(Huang-Wei): remove 'predicates' and 'alwaysCheckAllPredicates'.
|
||||
func NewGenericScheduler(
|
||||
cache internalcache.Cache,
|
||||
podQueue internalqueue.SchedulingQueue,
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -231,16 +231,28 @@ func TestCreateFromEmptyConfig(t *testing.T) {
|
||||
// Test configures a scheduler from a policy that does not specify any
|
||||
// predicate/priority.
|
||||
// The predicate/priority from DefaultProvider will be used.
|
||||
// TODO(Huang-Wei): refactor (or remove) this test along with eliminating 'RegisterFitPredicate()'.
|
||||
func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
|
||||
predicateOne := "PredicateOne"
|
||||
client := fake.NewSimpleClientset()
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
|
||||
factory.registry.Register(predicateOne, func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &TestPlugin{name: predicateOne}, nil
|
||||
})
|
||||
factory.pluginConfigProducerRegistry.RegisterPredicate(predicateOne, func(_ frameworkplugins.ConfigProducerArgs) (schedulerapi.Plugins, []schedulerapi.PluginConfig) {
|
||||
return schedulerapi.Plugins{
|
||||
Filter: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{{Name: predicateOne}},
|
||||
},
|
||||
}, nil
|
||||
})
|
||||
|
||||
RegisterFitPredicate("PredicateOne", PredicateFunc)
|
||||
RegisterFitPredicate(predicateOne, PredicateFunc)
|
||||
RegisterPriorityMapReduceFunction("PriorityOne", PriorityFunc, nil, 1)
|
||||
|
||||
RegisterAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
|
||||
RegisterAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName, sets.NewString(predicateOne), sets.NewString("PriorityOne"))
|
||||
|
||||
configData := []byte(`{
|
||||
"kind" : "Policy",
|
||||
@ -255,7 +267,7 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create scheduler from configuration: %v", err)
|
||||
}
|
||||
if _, found := c.Algorithm.Predicates()["PredicateOne"]; !found {
|
||||
if !foundPlugin(c.Plugins.Filter.Enabled, predicateOne) {
|
||||
t.Errorf("Expected predicate PredicateOne from %q", schedulerapi.SchedulerDefaultProviderName)
|
||||
}
|
||||
if len(c.Algorithm.Prioritizers()) != 1 || c.Algorithm.Prioritizers()[0].Name != "PriorityOne" {
|
||||
@ -263,6 +275,15 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func foundPlugin(plugins []schedulerapi.Plugin, name string) bool {
|
||||
for _, plugin := range plugins {
|
||||
if plugin.Name == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Test configures a scheduler from a policy that contains empty
|
||||
// predicate/priority.
|
||||
// Empty predicate/priority sets will be used.
|
||||
|
@ -48,7 +48,7 @@ func (pl *InterPodAffinity) Name() string {
|
||||
|
||||
// Filter invoked at the filter extension point.
|
||||
func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
|
||||
meta, ok := migration.PredicateMetadata(cycleState).(predicates.Metadata)
|
||||
meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
|
||||
if !ok {
|
||||
return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.Metadata error", cycleState))
|
||||
}
|
||||
|
@ -124,3 +124,13 @@ func PredicateMetadata(state *framework.CycleState) interface{} {
|
||||
}
|
||||
return meta
|
||||
}
|
||||
|
||||
// CovertStateRefToPredMeta checks if 'stateRef' is nil, if it is, return nil;
|
||||
// otherwise covert it to predicates metadata and return.
|
||||
func CovertStateRefToPredMeta(stateRef interface{}) (predicates.Metadata, bool) {
|
||||
if stateRef == nil {
|
||||
return nil, true
|
||||
}
|
||||
meta, ok := stateRef.(predicates.Metadata)
|
||||
return meta, ok
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ func (f *Fit) Name() string {
|
||||
|
||||
// Filter invoked at the filter extension point.
|
||||
func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
|
||||
meta, ok := migration.PredicateMetadata(cycleState).(predicates.Metadata)
|
||||
meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
|
||||
if !ok {
|
||||
return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.Metadata error", cycleState))
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ func (pl *PodTopologySpread) Name() string {
|
||||
|
||||
// Filter invoked at the filter extension point.
|
||||
func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
|
||||
meta, ok := migration.PredicateMetadata(cycleState).(predicates.Metadata)
|
||||
meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
|
||||
if !ok {
|
||||
return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.Metadata error", cycleState))
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ func (pl *ServiceAffinity) Name() string {
|
||||
|
||||
// Filter invoked at the filter extension point.
|
||||
func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
|
||||
meta, ok := migration.PredicateMetadata(cycleState).(predicates.Metadata)
|
||||
meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
|
||||
if !ok {
|
||||
return framework.NewStatus(framework.Error, "looking up Metadata")
|
||||
}
|
||||
|
@ -49,12 +49,16 @@ import (
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/core"
|
||||
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
|
||||
)
|
||||
|
||||
@ -368,8 +372,9 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
|
||||
scache.AddNode(&node)
|
||||
client := clientsetfake.NewSimpleClientset(&node)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
|
||||
scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, pod, &node)
|
||||
|
||||
f := st.RegisterFilterPlugin("PodFitsHostPorts", nodeports.New)
|
||||
scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, f, pod, &node)
|
||||
|
||||
waitPodExpireChan := make(chan struct{})
|
||||
timeout := make(chan struct{})
|
||||
@ -431,8 +436,8 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
|
||||
scache.AddNode(&node)
|
||||
client := clientsetfake.NewSimpleClientset(&node)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
|
||||
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, firstPod, &node)
|
||||
f := st.RegisterFilterPlugin(nodeports.Name, nodeports.New)
|
||||
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, f, firstPod, &node)
|
||||
|
||||
// We use conflicted pod ports to incur fit predicate failure.
|
||||
secondPod := podWithPort("bar", "", 8080)
|
||||
@ -444,12 +449,15 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
|
||||
select {
|
||||
case err := <-errChan:
|
||||
expectErr := &core.FitError{
|
||||
Pod: secondPod,
|
||||
NumAllNodes: 1,
|
||||
FailedPredicates: core.FailedPredicateMap{
|
||||
node.Name: []predicates.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
Pod: secondPod,
|
||||
NumAllNodes: 1,
|
||||
FailedPredicates: core.FailedPredicateMap{},
|
||||
FilteredNodesStatuses: framework.NodeToStatusMap{
|
||||
node.Name: framework.NewStatus(
|
||||
framework.Unschedulable,
|
||||
predicates.ErrPodNotFitsHostPorts.GetReason(),
|
||||
),
|
||||
},
|
||||
FilteredNodesStatuses: framework.NodeToStatusMap{},
|
||||
}
|
||||
if !reflect.DeepEqual(expectErr, err) {
|
||||
t.Errorf("err want=%v, get=%v", expectErr, err)
|
||||
@ -523,10 +531,10 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
|
||||
|
||||
client := clientsetfake.NewSimpleClientset(&node)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
|
||||
f := st.RegisterFilterPlugin(nodeports.Name, nodeports.New)
|
||||
|
||||
scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
|
||||
queuedPodStore, scache, informerFactory, predicateMap, stop, test.BindingDuration)
|
||||
queuedPodStore, scache, informerFactory, f, stop, test.BindingDuration)
|
||||
|
||||
informerFactory.Start(stop)
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
@ -558,9 +566,9 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
|
||||
// queuedPodStore: pods queued before processing.
|
||||
// cache: scheduler cache that might contain assumed pods.
|
||||
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
|
||||
informerFactory informers.SharedInformerFactory, stop chan struct{}, predicateMap map[string]predicates.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
|
||||
informerFactory informers.SharedInformerFactory, stop chan struct{}, f st.RegisterFilterPluginFunc, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
|
||||
|
||||
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)
|
||||
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, nil)
|
||||
|
||||
informerFactory.Start(stop)
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
@ -630,19 +638,18 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
|
||||
}
|
||||
client := clientsetfake.NewSimpleClientset(objects...)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
predicateMap := map[string]predicates.FitPredicate{
|
||||
"PodFitsResources": predicates.PodFitsResources,
|
||||
}
|
||||
|
||||
// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
|
||||
failedPredicatesMap := core.FailedPredicateMap{}
|
||||
// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
|
||||
failedNodeStatues := framework.NodeToStatusMap{}
|
||||
for _, node := range nodes {
|
||||
failedPredicatesMap[node.Name] = []predicates.PredicateFailureReason{
|
||||
predicates.NewInsufficientResourceError(v1.ResourceCPU, 4000, 0, 2000),
|
||||
predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100),
|
||||
}
|
||||
failedNodeStatues[node.Name] = framework.NewStatus(
|
||||
framework.Unschedulable,
|
||||
predicates.NewInsufficientResourceError(v1.ResourceCPU, 4000, 0, 2000).GetReason(),
|
||||
predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100).GetReason(),
|
||||
)
|
||||
}
|
||||
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)
|
||||
f := st.RegisterFilterPlugin("PodFitsResources", noderesources.NewFit)
|
||||
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, nil)
|
||||
|
||||
informerFactory.Start(stop)
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
@ -654,8 +661,8 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
|
||||
expectErr := &core.FitError{
|
||||
Pod: podWithTooBigResourceRequests,
|
||||
NumAllNodes: len(nodes),
|
||||
FailedPredicates: failedPredicatesMap,
|
||||
FilteredNodesStatuses: framework.NodeToStatusMap{},
|
||||
FailedPredicates: core.FailedPredicateMap{},
|
||||
FilteredNodesStatuses: failedNodeStatues,
|
||||
}
|
||||
if len(fmt.Sprint(expectErr)) > 150 {
|
||||
t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
|
||||
@ -670,16 +677,23 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
|
||||
|
||||
// queuedPodStore: pods queued before processing.
|
||||
// scache: scheduler cache that might contain assumed pods.
|
||||
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
|
||||
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, f st.RegisterFilterPluginFunc, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
|
||||
registry := framework.Registry{}
|
||||
plugins := &schedulerapi.Plugins{
|
||||
Filter: &schedulerapi.PluginSet{},
|
||||
}
|
||||
var pluginConfigs []schedulerapi.PluginConfig
|
||||
f(®istry, plugins, pluginConfigs)
|
||||
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
|
||||
algo := core.NewGenericScheduler(
|
||||
scache,
|
||||
internalqueue.NewSchedulingQueue(nil),
|
||||
predicateMap,
|
||||
nil,
|
||||
predicates.EmptyMetadataProducer,
|
||||
[]priorities.PriorityConfig{},
|
||||
priorities.EmptyMetadataProducer,
|
||||
nodeinfosnapshot.NewEmptySnapshot(),
|
||||
emptyFramework,
|
||||
fwk,
|
||||
[]algorithm.SchedulerExtender{},
|
||||
nil,
|
||||
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
|
||||
@ -710,7 +724,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
||||
Recorder: &events.FakeRecorder{},
|
||||
podConditionUpdater: fakePodConditionUpdater{},
|
||||
podPreemptor: fakePodPreemptor{},
|
||||
Framework: emptyFramework,
|
||||
Framework: fwk,
|
||||
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
|
||||
}
|
||||
|
||||
@ -721,17 +735,24 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
||||
return sched, bindingChan, errChan
|
||||
}
|
||||
|
||||
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
|
||||
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, f st.RegisterFilterPluginFunc, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
|
||||
registry := framework.Registry{}
|
||||
plugins := &schedulerapi.Plugins{
|
||||
Filter: &schedulerapi.PluginSet{},
|
||||
}
|
||||
var pluginConfigs []schedulerapi.PluginConfig
|
||||
f(®istry, plugins, pluginConfigs)
|
||||
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
|
||||
queue := internalqueue.NewSchedulingQueue(nil)
|
||||
algo := core.NewGenericScheduler(
|
||||
scache,
|
||||
queue,
|
||||
predicateMap,
|
||||
nil,
|
||||
predicates.EmptyMetadataProducer,
|
||||
[]priorities.PriorityConfig{},
|
||||
priorities.EmptyMetadataProducer,
|
||||
nodeinfosnapshot.NewEmptySnapshot(),
|
||||
emptyFramework,
|
||||
fwk,
|
||||
[]algorithm.SchedulerExtender{},
|
||||
nil,
|
||||
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
|
||||
@ -766,7 +787,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
|
||||
podConditionUpdater: fakePodConditionUpdater{},
|
||||
podPreemptor: fakePodPreemptor{},
|
||||
StopEverything: stop,
|
||||
Framework: emptyFramework,
|
||||
Framework: fwk,
|
||||
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
|
||||
SchedulingQueue: queue,
|
||||
}
|
||||
@ -788,12 +809,12 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
|
||||
client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
|
||||
predicateMap := map[string]predicates.FitPredicate{
|
||||
predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
|
||||
}
|
||||
|
||||
recorder := broadcaster.NewRecorder(scheme.Scheme, "scheduler")
|
||||
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
|
||||
volumeBindingNewFunc := func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return volumebinding.NewFromVolumeBinder(fakeVolumeBinder), nil
|
||||
}
|
||||
f := st.RegisterFilterPlugin(volumebinding.Name, volumeBindingNewFunc)
|
||||
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, recorder)
|
||||
informerFactory.Start(stop)
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
s.VolumeBinder = fakeVolumeBinder
|
||||
@ -884,7 +905,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
|
||||
FindErr: findErr,
|
||||
},
|
||||
eventReason: "FailedScheduling",
|
||||
expectError: findErr,
|
||||
expectError: fmt.Errorf("error while running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
|
||||
},
|
||||
{
|
||||
name: "assume error",
|
||||
|
@ -5,11 +5,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"framework_helpers.go",
|
||||
"workload_prep.go",
|
||||
"wrappers.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/scheduler/testing",
|
||||
deps = [
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
],
|
||||
|
37
pkg/scheduler/testing/framework_helpers.go
Normal file
37
pkg/scheduler/testing/framework_helpers.go
Normal file
@ -0,0 +1,37 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
)
|
||||
|
||||
// RegisterFilterPluginFunc is a function signature used in method RegisterFilterPlugin()
|
||||
// to register a Filter Plugin to a given registry.
|
||||
type RegisterFilterPluginFunc func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig)
|
||||
|
||||
// RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry.
|
||||
func RegisterFilterPlugin(pluginName string, pluginNewFunc framework.PluginFactory) RegisterFilterPluginFunc {
|
||||
return func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig) {
|
||||
reg.Register(pluginName, pluginNewFunc)
|
||||
plugins.Filter.Enabled = append(plugins.Filter.Enabled, schedulerapi.Plugin{Name: pluginName})
|
||||
//lint:ignore SA4006 this value of pluginConfigs is never used.
|
||||
//lint:ignore SA4010 this result of append is never used.
|
||||
pluginConfigs = append(pluginConfigs, schedulerapi.PluginConfig{Name: pluginName})
|
||||
}
|
||||
}
|
@ -32,6 +32,7 @@ go_test(
|
||||
"//pkg/scheduler/algorithmprovider:go_default_library",
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
"//pkg/scheduler/apis/extender/v1:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/nodeinfo:go_default_library",
|
||||
"//pkg/scheduler/testing:go_default_library",
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
scheduler "k8s.io/kubernetes/pkg/scheduler"
|
||||
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
)
|
||||
@ -1450,6 +1451,11 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
|
||||
// Create a plugin registry for testing. Register only a permit plugin.
|
||||
permitPlugin := &PermitPlugin{}
|
||||
registry, plugins := initRegistryAndConfig(permitPlugin)
|
||||
// Fit filter plugin must be registered.
|
||||
registry.Register(noderesources.FitName, noderesources.NewFit)
|
||||
plugins.Filter = &schedulerconfig.PluginSet{
|
||||
Enabled: []schedulerconfig.Plugin{{Name: noderesources.FitName}},
|
||||
}
|
||||
|
||||
// Create the master and the scheduler with the test plugin set.
|
||||
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
|
||||
|
Loading…
Reference in New Issue
Block a user