Eliminate the Predicates execution paths in the scheduler

- eliminate the Predicates execution paths in the scheduler; use Filter plugins instead.
- refactor all unit tests
- adjust the TestPreemptWithPermitPlugin integration test
Wei Huang 2019-12-10 10:12:03 -08:00
parent 30a5db136f
commit dc3d1bd238
17 changed files with 720 additions and 505 deletions
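
For orientation, the Filter-plugin shape that replaces a map-based fit predicate looks roughly like the sketch below. This is illustrative, not code from this commit: it mirrors the TrueFilter/NewTrueFilterPlugin test plugin referenced in the unit-test diffs that follow, reusing the Filter signature and the framework.PluginFactory form that appear verbatim in those diffs (the package name is hypothetical).

package fakeplugins // hypothetical package, for illustration only

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// TrueFilterPlugin accepts every node; the tests below register it as
// "TrueFilter" where they previously registered a `truePredicate`.
type TrueFilterPlugin struct{}

// Name returns the name used to register the plugin.
func (pl *TrueFilterPlugin) Name() string { return "TrueFilter" }

// Filter is invoked at the filter extension point; a nil Status is
// treated as Success, i.e. the node passes the filter.
func (pl *TrueFilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
	return nil
}

// NewTrueFilterPlugin is a framework.PluginFactory, matching the factory
// signature used in the factory_test diff below.
func NewTrueFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
	return &TrueFilterPlugin{}, nil
}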


@ -77,13 +77,17 @@ go_test(
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/nodeports:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",


@ -71,6 +71,7 @@ var (
schedulerFactoryMutex sync.RWMutex
// maps that hold registered algorithm types
// TODO(Huang-Wei): remove this.
fitPredicateMap = make(map[string]FitPredicateFactory)
mandatoryFitPredicates = sets.NewString()
priorityFunctionMap = make(map[string]PriorityConfigFactory)
@ -143,6 +144,7 @@ func ApplyPredicatesAndPriorities(s *Snapshot) {
// RegisterFitPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
// TODO(Huang-Wei): remove this.
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {
return RegisterFitPredicateFactory(name, func(AlgorithmFactoryArgs) predicates.FitPredicate { return predicate })
}


@ -55,6 +55,9 @@ go_test(
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
@ -62,6 +65,7 @@ go_test(
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",


@ -42,6 +42,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -348,16 +349,16 @@ var _ algorithm.SchedulerExtender = &FakeExtender{}
func TestGenericSchedulerWithExtenders(t *testing.T) {
tests := []struct {
name string
predicates map[string]predicates.FitPredicate
prioritizers []priorities.PriorityConfig
extenders []FakeExtender
nodes []string
expectedResult ScheduleResult
expectsErr bool
name string
registerFilterPlugin st.RegisterFilterPluginFunc
prioritizers []priorities.PriorityConfig
extenders []FakeExtender
nodes []string
expectedResult ScheduleResult
expectsErr bool
}{
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -371,7 +372,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
name: "test 1",
},
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -385,7 +386,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
name: "test 2",
},
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -403,7 +404,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
name: "test 3",
},
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
extenders: []FakeExtender{
{
predicates: []fitPredicate{machine2PredicateExtender},
@ -417,7 +418,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
name: "test 4",
},
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -434,7 +435,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
name: "test 5",
},
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -456,8 +457,8 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
name: "test 6",
},
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 20}},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 20}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -481,8 +482,8 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
// If scheduler sends the pod by mistake, the test would fail
// because of the errors from errorPredicateExtender and/or
// errorPrioritizerExtender.
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 1}},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{errorPredicateExtender},
@ -505,7 +506,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
//
// If scheduler did not ignore the extender, the test would fail
// because of the errors from errorPredicateExtender.
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
extenders: []FakeExtender{
{
predicates: []fitPredicate{errorPredicateExtender},
@ -540,15 +541,24 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
cache.AddNode(createNode(name))
}
queue := internalqueue.NewSchedulingQueue(nil)
registry := framework.Registry{}
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
test.registerFilterPlugin(&registry, plugins, pluginConfigs)
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
scheduler := NewGenericScheduler(
cache,
queue,
test.predicates,
nil,
predicates.EmptyMetadataProducer,
test.prioritizers,
priorities.EmptyMetadataProducer,
emptySnapshot,
emptyFramework,
fwk,
extenders,
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),


@ -72,8 +72,9 @@ type FailedPredicateMap map[string][]predicates.PredicateFailureReason
// FitError describes a fit error of a pod.
type FitError struct {
Pod *v1.Pod
NumAllNodes int
Pod *v1.Pod
NumAllNodes int
// TODO(Huang-Wei): remove 'FailedPredicates'
FailedPredicates FailedPredicateMap
FilteredNodesStatuses framework.NodeToStatusMap
}
@ -476,12 +477,13 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
// TODO(Huang-Wei): remove 'FailedPredicateMap' from the return parameters.
func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
filteredNodesStatuses := framework.NodeToStatusMap{}
if len(g.predicates) == 0 && !g.framework.HasFilterPlugins() {
if !g.framework.HasFilterPlugins() {
filtered = g.nodeInfoSnapshot.ListNodes()
} else {
allNodes := len(g.nodeInfoSnapshot.NodeInfoList)
@ -506,7 +508,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
// We check the nodes starting from where we left off in the previous scheduling cycle,
// this is to make sure all nodes have the same chance of being examined across pods.
nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
fits, failedPredicates, status, err := g.podFitsOnNode(
fits, _, status, err := g.podFitsOnNode(
ctx,
state,
pod,
@ -531,9 +533,6 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
if !status.IsSuccess() {
filteredNodesStatuses[nodeInfo.Node().Name] = status
}
if len(failedPredicates) != 0 {
failedPredicateMap[nodeInfo.Node().Name] = failedPredicates
}
predicateResultLock.Unlock()
}
}
@ -566,6 +565,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err
}
// TODO(Huang-Wei): refactor this to fill 'filteredNodesStatuses' instead of 'failedPredicateMap'.
for failedNodeName, failedMsg := range failedMap {
if _, found := failedPredicateMap[failedNodeName]; !found {
failedPredicateMap[failedNodeName] = []predicates.PredicateFailureReason{}
@ -584,6 +584,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
any pod was added, 2) augmented metadata, 3) augmented CycleState, and 4) augmented nodeInfo.
// TODO(Huang-Wei): remove 'meta predicates.Metadata' from the signature.
func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, meta predicates.Metadata, state *framework.CycleState,
nodeInfo *schedulernodeinfo.NodeInfo) (bool, predicates.Metadata,
*framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
@ -662,12 +663,11 @@ func (g *genericScheduler) podFitsOnNode(
// nominated pods are running because they are not running right now and in fact,
// they may end up getting scheduled to a different node.
for i := 0; i < 2; i++ {
metaToUse := meta
stateToUse := state
nodeInfoToUse := info
if i == 0 {
var err error
podsAdded, metaToUse, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, meta, state, info)
podsAdded, _, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, meta, state, info)
if err != nil {
return false, []predicates.PredicateFailureReason{}, nil, err
}
@ -675,33 +675,6 @@ func (g *genericScheduler) podFitsOnNode(
break
}
for _, predicateKey := range predicates.Ordering() {
var (
fit bool
reasons []predicates.PredicateFailureReason
err error
)
if predicate, exist := g.predicates[predicateKey]; exist {
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return false, []predicates.PredicateFailureReason{}, nil, err
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
if !alwaysCheckAllPredicates {
klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
"evaluation is short circuited and there are chances " +
"of other predicates failing as well.")
break
}
}
}
}
status = g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
if !status.IsSuccess() && !status.IsUnschedulable() {
return false, failedPredicates, status, status.AsError()
@ -1270,6 +1243,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
}
// NewGenericScheduler creates a genericScheduler object.
// TODO(Huang-Wei): remove 'predicates' and 'alwaysCheckAllPredicates'.
func NewGenericScheduler(
cache internalcache.Cache,
podQueue internalqueue.SchedulingQueue,

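The net effect of the findNodesThatFit and podFitsOnNode changes above is that per-node failures now surface as framework.NodeToStatusMap entries rather than FailedPredicateMap entries. Below is a hedged sketch of the resulting FitError shape, assembled from the test expectations further down (the package name and helper are hypothetical):

package sketch // hypothetical, for illustration only

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/pkg/scheduler/core"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// unschedulableFitError shows the post-refactor failure shape: per-node
// framework.Status entries carry the reasons, while the legacy
// FailedPredicates map stays empty until it is removed entirely.
func unschedulableFitError(pod *v1.Pod, nodeName string) *core.FitError {
	return &core.FitError{
		Pod:              pod,
		NumAllNodes:      1,
		FailedPredicates: core.FailedPredicateMap{}, // legacy field, now always empty
		FilteredNodesStatuses: framework.NodeToStatusMap{
			nodeName: framework.NewStatus(
				framework.Unschedulable,
				predicates.ErrPodNotFitsHostPorts.GetReason(),
			),
		},
	}
}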
File diff suppressed because it is too large


@ -231,16 +231,28 @@ func TestCreateFromEmptyConfig(t *testing.T) {
// Test configures a scheduler from a policy that does not specify any
// predicate/priority.
// The predicate/priority from DefaultProvider will be used.
// TODO(Huang-Wei): refactor (or remove) this test along with eliminating 'RegisterFitPredicate()'.
func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
predicateOne := "PredicateOne"
client := fake.NewSimpleClientset()
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
factory.registry.Register(predicateOne, func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
return &TestPlugin{name: predicateOne}, nil
})
factory.pluginConfigProducerRegistry.RegisterPredicate(predicateOne, func(_ frameworkplugins.ConfigProducerArgs) (schedulerapi.Plugins, []schedulerapi.PluginConfig) {
return schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: predicateOne}},
},
}, nil
})
RegisterFitPredicate("PredicateOne", PredicateFunc)
RegisterFitPredicate(predicateOne, PredicateFunc)
RegisterPriorityMapReduceFunction("PriorityOne", PriorityFunc, nil, 1)
RegisterAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
RegisterAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName, sets.NewString(predicateOne), sets.NewString("PriorityOne"))
configData := []byte(`{
"kind" : "Policy",
@ -255,7 +267,7 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create scheduler from configuration: %v", err)
}
if _, found := c.Algorithm.Predicates()["PredicateOne"]; !found {
if !foundPlugin(c.Plugins.Filter.Enabled, predicateOne) {
t.Errorf("Expected predicate PredicateOne from %q", schedulerapi.SchedulerDefaultProviderName)
}
if len(c.Algorithm.Prioritizers()) != 1 || c.Algorithm.Prioritizers()[0].Name != "PriorityOne" {
@ -263,6 +275,15 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
}
}
func foundPlugin(plugins []schedulerapi.Plugin, name string) bool {
for _, plugin := range plugins {
if plugin.Name == name {
return true
}
}
return false
}
// Test configures a scheduler from a policy that contains empty
// predicate/priority.
// Empty predicate/priority sets will be used.


@ -48,7 +48,7 @@ func (pl *InterPodAffinity) Name() string {
// Filter invoked at the filter extension point.
func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
meta, ok := migration.PredicateMetadata(cycleState).(predicates.Metadata)
meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
if !ok {
return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.Metadata error", cycleState))
}


@ -124,3 +124,13 @@ func PredicateMetadata(state *framework.CycleState) interface{} {
}
return meta
}
// CovertStateRefToPredMeta returns (nil, true) if 'stateRef' is nil; otherwise it
// attempts to convert 'stateRef' to predicates metadata and reports whether that succeeded.
func CovertStateRefToPredMeta(stateRef interface{}) (predicates.Metadata, bool) {
if stateRef == nil {
return nil, true
}
meta, ok := stateRef.(predicates.Metadata)
return meta, ok
}


@ -43,7 +43,7 @@ func (f *Fit) Name() string {
// Filter invoked at the filter extension point.
func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
meta, ok := migration.PredicateMetadata(cycleState).(predicates.Metadata)
meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
if !ok {
return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.Metadata error", cycleState))
}


@ -47,7 +47,7 @@ func (pl *PodTopologySpread) Name() string {
// Filter invoked at the filter extension point.
func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
meta, ok := migration.PredicateMetadata(cycleState).(predicates.Metadata)
meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
if !ok {
return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.Metadata error", cycleState))
}


@ -85,7 +85,7 @@ func (pl *ServiceAffinity) Name() string {
// Filter invoked at the filter extension point.
func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
meta, ok := migration.PredicateMetadata(cycleState).(predicates.Metadata)
meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
if !ok {
return framework.NewStatus(framework.Error, "looking up Metadata")
}


@ -49,12 +49,16 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -368,8 +372,9 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
scache.AddNode(&node)
client := clientsetfake.NewSimpleClientset(&node)
informerFactory := informers.NewSharedInformerFactory(client, 0)
predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, pod, &node)
f := st.RegisterFilterPlugin("PodFitsHostPorts", nodeports.New)
scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, f, pod, &node)
waitPodExpireChan := make(chan struct{})
timeout := make(chan struct{})
@ -431,8 +436,8 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
scache.AddNode(&node)
client := clientsetfake.NewSimpleClientset(&node)
informerFactory := informers.NewSharedInformerFactory(client, 0)
predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, firstPod, &node)
f := st.RegisterFilterPlugin(nodeports.Name, nodeports.New)
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, f, firstPod, &node)
// We use conflicting pod ports to trigger a fit predicate failure.
secondPod := podWithPort("bar", "", 8080)
@ -444,12 +449,15 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
select {
case err := <-errChan:
expectErr := &core.FitError{
Pod: secondPod,
NumAllNodes: 1,
FailedPredicates: core.FailedPredicateMap{
node.Name: []predicates.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
Pod: secondPod,
NumAllNodes: 1,
FailedPredicates: core.FailedPredicateMap{},
FilteredNodesStatuses: framework.NodeToStatusMap{
node.Name: framework.NewStatus(
framework.Unschedulable,
predicates.ErrPodNotFitsHostPorts.GetReason(),
),
},
FilteredNodesStatuses: framework.NodeToStatusMap{},
}
if !reflect.DeepEqual(expectErr, err) {
t.Errorf("err want=%v, get=%v", expectErr, err)
@ -523,10 +531,10 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
client := clientsetfake.NewSimpleClientset(&node)
informerFactory := informers.NewSharedInformerFactory(client, 0)
predicateMap := map[string]predicates.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
f := st.RegisterFilterPlugin(nodeports.Name, nodeports.New)
scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
queuedPodStore, scache, informerFactory, predicateMap, stop, test.BindingDuration)
queuedPodStore, scache, informerFactory, f, stop, test.BindingDuration)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@ -558,9 +566,9 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
informerFactory informers.SharedInformerFactory, stop chan struct{}, predicateMap map[string]predicates.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
informerFactory informers.SharedInformerFactory, stop chan struct{}, f st.RegisterFilterPluginFunc, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, nil)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@ -630,19 +638,18 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
}
client := clientsetfake.NewSimpleClientset(objects...)
informerFactory := informers.NewSharedInformerFactory(client, 0)
predicateMap := map[string]predicates.FitPredicate{
"PodFitsResources": predicates.PodFitsResources,
}
// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
failedPredicatesMap := core.FailedPredicateMap{}
// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
failedNodeStatuses := framework.NodeToStatusMap{}
for _, node := range nodes {
failedPredicatesMap[node.Name] = []predicates.PredicateFailureReason{
predicates.NewInsufficientResourceError(v1.ResourceCPU, 4000, 0, 2000),
predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100),
}
failedNodeStatuses[node.Name] = framework.NewStatus(
framework.Unschedulable,
predicates.NewInsufficientResourceError(v1.ResourceCPU, 4000, 0, 2000).GetReason(),
predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100).GetReason(),
)
}
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)
f := st.RegisterFilterPlugin("PodFitsResources", noderesources.NewFit)
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, nil)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@ -654,8 +661,8 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
expectErr := &core.FitError{
Pod: podWithTooBigResourceRequests,
NumAllNodes: len(nodes),
FailedPredicates: failedPredicatesMap,
FilteredNodesStatuses: framework.NodeToStatusMap{},
FailedPredicates: core.FailedPredicateMap{},
FilteredNodesStatuses: failedNodeStatuses,
}
if len(fmt.Sprint(expectErr)) > 150 {
t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
@ -670,16 +677,23 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, f st.RegisterFilterPluginFunc, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
registry := framework.Registry{}
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
f(&registry, plugins, pluginConfigs)
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
algo := core.NewGenericScheduler(
scache,
internalqueue.NewSchedulingQueue(nil),
predicateMap,
nil,
predicates.EmptyMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyMetadataProducer,
nodeinfosnapshot.NewEmptySnapshot(),
emptyFramework,
fwk,
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -710,7 +724,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Recorder: &events.FakeRecorder{},
podConditionUpdater: fakePodConditionUpdater{},
podPreemptor: fakePodPreemptor{},
Framework: emptyFramework,
Framework: fwk,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
}
@ -721,17 +735,24 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
return sched, bindingChan, errChan
}
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, f st.RegisterFilterPluginFunc, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
registry := framework.Registry{}
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
f(&registry, plugins, pluginConfigs)
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
queue := internalqueue.NewSchedulingQueue(nil)
algo := core.NewGenericScheduler(
scache,
queue,
predicateMap,
nil,
predicates.EmptyMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyMetadataProducer,
nodeinfosnapshot.NewEmptySnapshot(),
emptyFramework,
fwk,
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -766,7 +787,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
podConditionUpdater: fakePodConditionUpdater{},
podPreemptor: fakePodPreemptor{},
StopEverything: stop,
Framework: emptyFramework,
Framework: fwk,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
SchedulingQueue: queue,
}
@ -788,12 +809,12 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
informerFactory := informers.NewSharedInformerFactory(client, 0)
predicateMap := map[string]predicates.FitPredicate{
predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
}
recorder := broadcaster.NewRecorder(scheme.Scheme, "scheduler")
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
volumeBindingNewFunc := func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return volumebinding.NewFromVolumeBinder(fakeVolumeBinder), nil
}
f := st.RegisterFilterPlugin(volumebinding.Name, volumeBindingNewFunc)
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, recorder)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
s.VolumeBinder = fakeVolumeBinder
@ -884,7 +905,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
FindErr: findErr,
},
eventReason: "FailedScheduling",
expectError: findErr,
expectError: fmt.Errorf("error while running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
},
{
name: "assume error",


@ -5,11 +5,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"framework_helpers.go",
"workload_prep.go",
"wrappers.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/testing",
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],


@ -0,0 +1,37 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// RegisterFilterPluginFunc is the signature of the function returned by
// RegisterFilterPlugin(); it registers a Filter plugin with a given registry.
type RegisterFilterPluginFunc func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig)
// RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry.
func RegisterFilterPlugin(pluginName string, pluginNewFunc framework.PluginFactory) RegisterFilterPluginFunc {
return func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig) {
reg.Register(pluginName, pluginNewFunc)
plugins.Filter.Enabled = append(plugins.Filter.Enabled, schedulerapi.Plugin{Name: pluginName})
//lint:ignore SA4006 this value of pluginConfigs is never used.
//lint:ignore SA4010 this result of append is never used.
pluginConfigs = append(pluginConfigs, schedulerapi.PluginConfig{Name: pluginName})
}
}
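A sketch of how the test files above consume this helper, mirroring the setupTestScheduler changes; the function name is hypothetical, the factory is passed in so any framework.PluginFactory (such as the tests' NewTrueFilterPlugin) fits, and error handling is elided.

// exampleFrameworkSetup registers one Filter plugin and builds a framework
// from the resulting registry and plugin set, as the unit tests above do.
func exampleFrameworkSetup(newPlugin framework.PluginFactory) framework.Framework {
	registry := framework.Registry{}
	plugins := &schedulerapi.Plugins{
		Filter: &schedulerapi.PluginSet{},
	}
	var pluginConfigs []schedulerapi.PluginConfig

	// Build a registration closure for one Filter plugin and apply it.
	f := RegisterFilterPlugin("TrueFilter", newPlugin)
	f(&registry, plugins, pluginConfigs)

	// The populated registry and plugin set feed straight into a framework instance.
	fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
	return fwk
}

Note that the append to pluginConfigs inside the closure never reaches the caller's slice; that is exactly what the //lint:ignore directives above acknowledge.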


@ -32,6 +32,7 @@ go_test(
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/testing:go_default_library",


@ -30,6 +30,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
scheduler "k8s.io/kubernetes/pkg/scheduler"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -1450,6 +1451,11 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
permitPlugin := &PermitPlugin{}
registry, plugins := initRegistryAndConfig(permitPlugin)
// The Fit filter plugin must be registered.
registry.Register(noderesources.FitName, noderesources.NewFit)
plugins.Filter = &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: noderesources.FitName}},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,