diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 97ece6448d2..3ab1aadf16b 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -531,6 +532,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { extenders = append(extenders, &test.extenders[ii]) } cache := internalcache.New(time.Duration(0), wait.NeverStop) + fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) for _, name := range test.nodes { cache.AddNode(createNode(name)) } @@ -542,7 +544,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { predicates.EmptyPredicateMetadataProducer, test.prioritizers, priorities.EmptyPriorityMetadataProducer, - emptyPluginSet, + fwk, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 851daaa4bbd..88592446cff 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -39,11 +39,11 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" - pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" utiltrace "k8s.io/utils/trace" @@ -158,11 +158,11 @@ type genericScheduler struct { priorityMetaProducer priorities.PriorityMetadataProducer predicateMetaProducer predicates.PredicateMetadataProducer prioritizers []priorities.PriorityConfig - pluginSet pluginsv1alpha1.PluginSet + framework framework.Framework extenders []algorithm.SchedulerExtender lastNodeIndex uint64 alwaysCheckAllPredicates bool - nodeInfoSnapshot internalcache.NodeInfoSnapshot + nodeInfoSnapshot *internalcache.NodeInfoSnapshot volumeBinder *volumebinder.VolumeBinder pvcLister corelisters.PersistentVolumeClaimLister pdbLister algorithm.PDBLister @@ -174,7 +174,7 @@ type genericScheduler struct { // functions. func (g *genericScheduler) snapshot() error { // Used for all fit and priority funcs. - return g.cache.UpdateNodeInfoSnapshot(&g.nodeInfoSnapshot) + return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot) } // Schedule tries to schedule the given pod to one of the nodes in the node list. @@ -1206,7 +1206,7 @@ func NewGenericScheduler( predicateMetaProducer predicates.PredicateMetadataProducer, prioritizers []priorities.PriorityConfig, priorityMetaProducer priorities.PriorityMetadataProducer, - pluginSet pluginsv1alpha1.PluginSet, + framework framework.Framework, extenders []algorithm.SchedulerExtender, volumeBinder *volumebinder.VolumeBinder, pvcLister corelisters.PersistentVolumeClaimLister, @@ -1222,9 +1222,9 @@ func NewGenericScheduler( predicateMetaProducer: predicateMetaProducer, prioritizers: prioritizers, priorityMetaProducer: priorityMetaProducer, - pluginSet: pluginSet, + framework: framework, extenders: extenders, - nodeInfoSnapshot: internalcache.NewNodeInfoSnapshot(), + nodeInfoSnapshot: framework.NodeInfoSnapshot(), volumeBinder: volumeBinder, pvcLister: pvcLister, pdbLister: pdbLister, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 2a7d9cf424a..86edcbceb23 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -38,10 +38,10 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" - plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -134,27 +134,9 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str return nil } -// EmptyPluginSet is a test plugin set used by the default scheduler. -type EmptyPluginSet struct{} - -var _ plugins.PluginSet = EmptyPluginSet{} - -// ReservePlugins returns a slice of default reserve plugins. -func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin { - return []plugins.ReservePlugin{} -} - -// PrebindPlugins returns a slice of default prebind plugins. -func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin { - return []plugins.PrebindPlugin{} -} - -// Data returns a pointer to PluginData. -func (r EmptyPluginSet) Data() *plugins.PluginData { - return &plugins.PluginData{} -} - -var emptyPluginSet = &EmptyPluginSet{} +// EmptyPluginRegistry is a test plugin set used by the default scheduler. +var EmptyPluginRegistry = framework.Registry{} +var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil) func makeNodeList(nodeNames []string) []*v1.Node { result := make([]*v1.Node, 0, len(nodeNames)) @@ -456,6 +438,7 @@ func TestGenericScheduler(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { cache := internalcache.New(time.Duration(0), wait.NeverStop) + fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) for _, pod := range test.pods { cache.AddPod(pod) } @@ -474,7 +457,7 @@ func TestGenericScheduler(t *testing.T) { algorithmpredicates.EmptyPredicateMetadataProducer, test.prioritizers, priorities.EmptyPriorityMetadataProducer, - emptyPluginSet, + fwk, []algorithm.SchedulerExtender{}, nil, pvcLister, @@ -498,6 +481,7 @@ func TestGenericScheduler(t *testing.T) { func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes []*v1.Node) *genericScheduler { algorithmpredicates.SetPredicatesOrdering(order) cache := internalcache.New(time.Duration(0), wait.NeverStop) + fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) for _, n := range nodes { cache.AddNode(n) } @@ -510,10 +494,10 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes algorithmpredicates.EmptyPredicateMetadataProducer, prioritizers, priorities.EmptyPriorityMetadataProducer, - emptyPluginSet, + fwk, nil, nil, nil, nil, false, false, schedulerapi.DefaultPercentageOfNodesToScore) - cache.UpdateNodeInfoSnapshot(&s.(*genericScheduler).nodeInfoSnapshot) + cache.UpdateNodeInfoSnapshot(s.(*genericScheduler).nodeInfoSnapshot) return s.(*genericScheduler) } @@ -1483,6 +1467,7 @@ func TestPreempt(t *testing.T) { t.Logf("===== Running test %v", t.Name()) stop := make(chan struct{}) cache := internalcache.New(time.Duration(0), stop) + fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) for _, pod := range test.pods { cache.AddPod(pod) } @@ -1509,7 +1494,7 @@ func TestPreempt(t *testing.T) { algorithmpredicates.EmptyPredicateMetadataProducer, []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}}, priorities.EmptyPriorityMetadataProducer, - emptyPluginSet, + fwk, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 0c49942230a..46ab49386de 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -51,11 +51,10 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/api/validation" "k8s.io/kubernetes/pkg/scheduler/core" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" - "k8s.io/kubernetes/pkg/scheduler/plugins" - pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -92,8 +91,8 @@ type Config struct { // PodPreemptor is used to evict pods and update 'NominatedNode' field of // the preemptor pod. PodPreemptor PodPreemptor - // PlugingSet has a set of plugins and data used to run them. - PluginSet pluginsv1alpha1.PluginSet + // Framework runs scheduler plugins at configured extension points. + Framework framework.Framework // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling @@ -183,8 +182,8 @@ type configFactory struct { pdbLister policylisters.PodDisruptionBudgetLister // a means to list all StorageClasses storageClassLister storagelisters.StorageClassLister - // pluginRunner has a set of plugins and the context used for running them. - pluginSet pluginsv1alpha1.PluginSet + // framework has a set of plugins and the context used for running them. + framework framework.Framework // Close this to stop all reflectors StopEverything <-chan struct{} @@ -238,6 +237,7 @@ type ConfigFactoryArgs struct { PercentageOfNodesToScore int32 BindTimeoutSeconds int64 StopCh <-chan struct{} + Registry framework.Registry } // NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only @@ -248,6 +248,11 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { stopEverything = wait.NeverStop } schedulerCache := internalcache.New(30*time.Second, stopEverything) + // TODO(bsalamat): config files should be passed to the framework. + framework, err := framework.NewFramework(args.Registry, nil) + if err != nil { + klog.Fatalf("error initializing the scheduling framework: %v", err) + } // storageClassInformer is only enabled through VolumeScheduling feature gate var storageClassLister storagelisters.StorageClassLister @@ -267,6 +272,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { statefulSetLister: args.StatefulSetInformer.Lister(), pdbLister: args.PdbInformer.Lister(), storageClassLister: storageClassLister, + framework: framework, schedulerCache: schedulerCache, StopEverything: stopEverything, schedulerName: args.SchedulerName, @@ -435,9 +441,6 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return nil, err } - // TODO(bsalamat): the default registrar should be able to process config files. - c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache) - algo := core.NewGenericScheduler( c.schedulerCache, c.podQueue, @@ -445,7 +448,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, predicateMetaProducer, priorityConfigs, priorityMetaProducer, - c.pluginSet, + c.framework, extenders, c.volumeBinder, c.pVCLister, @@ -463,7 +466,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, GetBinder: getBinderFunc(c.client, extenders), PodConditionUpdater: &podConditionUpdater{c.client}, PodPreemptor: &podPreemptor{c.client}, - PluginSet: c.pluginSet, + Framework: c.framework, WaitForCacheSync: func() bool { return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) }, diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index 7e039536994..631598090e8 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -494,6 +495,7 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight schedulerapi.DefaultPercentageOfNodesToScore, bindTimeoutSeconds, stopCh, + framework.NewRegistry(), }) } diff --git a/pkg/scheduler/plugins/BUILD b/pkg/scheduler/framework/BUILD similarity index 100% rename from pkg/scheduler/plugins/BUILD rename to pkg/scheduler/framework/BUILD diff --git a/pkg/scheduler/plugins/examples/BUILD b/pkg/scheduler/framework/plugins/examples/BUILD similarity index 100% rename from pkg/scheduler/plugins/examples/BUILD rename to pkg/scheduler/framework/plugins/examples/BUILD diff --git a/pkg/scheduler/framework/plugins/examples/multipoint/multipoint.go b/pkg/scheduler/framework/plugins/examples/multipoint/multipoint.go new file mode 100644 index 00000000000..bf2d6131cef --- /dev/null +++ b/pkg/scheduler/framework/plugins/examples/multipoint/multipoint.go @@ -0,0 +1,68 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multipoint + +import ( + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +// CommunicatingPlugin is an example of a plugin that implements two +// extension points. It communicates through pluginContext with another function. +type CommunicatingPlugin struct{} + +var _ = framework.ReservePlugin(CommunicatingPlugin{}) + +// Name is the name of the plug used in Registry and configurations. +const Name = "multipoint-communicating-plugin" + +// Name returns name of the plugin. It is used in logs, etc. +func (mc CommunicatingPlugin) Name() string { + return Name +} + +// Reserve is the functions invoked by the framework at "reserve" extension point. +func (mc CommunicatingPlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + if pod == nil { + return framework.NewStatus(framework.Error, "pod cannot be nil") + } + if pod.Name == "my-test-pod" { + pc.Lock() + pc.Write(framework.ContextKey(pod.Name), "never bind") + pc.Unlock() + } + return nil +} + +// Prebind is the functions invoked by the framework at "prebind" extension point. +func (mc CommunicatingPlugin) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + if pod == nil { + return framework.NewStatus(framework.Error, "pod cannot be nil") + } + pc.RLock() + defer pc.RUnlock() + if v, e := pc.Read(framework.ContextKey(pod.Name)); e == nil && v == "never bind" { + return framework.NewStatus(framework.Unschedulable, "pod is not permitted") + } + return nil +} + +// New initializes a new plugin and returns it. +func New(_ *runtime.Unknown, _ framework.Framework) (framework.Plugin, error) { + return &CommunicatingPlugin{}, nil +} diff --git a/pkg/scheduler/plugins/examples/prebind.go b/pkg/scheduler/framework/plugins/examples/prebind/prebind.go similarity index 52% rename from pkg/scheduler/plugins/examples/prebind.go rename to pkg/scheduler/framework/plugins/examples/prebind/prebind.go index 13e71eb0bd3..cb7217447e8 100644 --- a/pkg/scheduler/plugins/examples/prebind.go +++ b/pkg/scheduler/framework/plugins/examples/prebind/prebind.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,35 +14,42 @@ See the License for the specific language governing permissions and limitations under the License. */ -package examples +package prebind import ( "fmt" "k8s.io/api/core/v1" - plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) // StatelessPrebindExample is an example of a simple plugin that has no state // and implements only one hook for prebind. type StatelessPrebindExample struct{} -var _ = plugins.PrebindPlugin(StatelessPrebindExample{}) +var _ = framework.PrebindPlugin(StatelessPrebindExample{}) + +// Name is the name of the plugin used in Registry and configurations. +const Name = "stateless-prebind-plugin-example" // Name returns name of the plugin. It is used in logs, etc. func (sr StatelessPrebindExample) Name() string { - return "stateless-prebind-plugin-example" + return Name } // Prebind is the functions invoked by the framework at "prebind" extension point. -func (sr StatelessPrebindExample) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) { +func (sr StatelessPrebindExample) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { if pod == nil { - return false, fmt.Errorf("pod cannot be nil") + return framework.NewStatus(framework.Error, fmt.Sprintf("pod cannot be nil")) } - return true, nil + if pod.Namespace != "foo" { + return framework.NewStatus(framework.Unschedulable, "only pods from 'foo' namespace are allowed") + } + return nil } -// NewStatelessPrebindExample initializes a new plugin and returns it. -func NewStatelessPrebindExample() *StatelessPrebindExample { - return &StatelessPrebindExample{} +// New initializes a new plugin and returns it. +func New(_ *runtime.Unknown, _ framework.Framework) (framework.Plugin, error) { + return &StatelessPrebindExample{}, nil } diff --git a/pkg/scheduler/framework/plugins/examples/stateful/stateful.go b/pkg/scheduler/framework/plugins/examples/stateful/stateful.go new file mode 100644 index 00000000000..e42022a48bb --- /dev/null +++ b/pkg/scheduler/framework/plugins/examples/stateful/stateful.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stateful + +import ( + "fmt" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +// MultipointExample is an example plugin that is executed at multiple extension points. +// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin) +// and changes its state when it is executed. +type MultipointExample struct { + mpState map[int]string + numRuns int +} + +var _ = framework.ReservePlugin(&MultipointExample{}) +var _ = framework.PrebindPlugin(&MultipointExample{}) + +// Name is the name of the plug used in Registry and configurations. +const Name = "multipoint-plugin-example" + +// Name returns name of the plugin. It is used in logs, etc. +func (mp *MultipointExample) Name() string { + return Name +} + +// Reserve is the functions invoked by the framework at "reserve" extension point. +func (mp *MultipointExample) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + mp.numRuns++ + return nil +} + +// Prebind is the functions invoked by the framework at "prebind" extension point. +func (mp *MultipointExample) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + mp.numRuns++ + if pod == nil { + return framework.NewStatus(framework.Error, "pod must not be nil") + } + return nil +} + +// New initializes a new plugin and returns it. +func New(config *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + if config == nil { + klog.Error("MultipointExample configuration cannot be empty") + return nil, fmt.Errorf("MultipointExample configuration cannot be empty") + } + mp := MultipointExample{ + mpState: make(map[int]string), + } + return &mp, nil +} diff --git a/pkg/scheduler/plugins/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD similarity index 100% rename from pkg/scheduler/plugins/v1alpha1/BUILD rename to pkg/scheduler/framework/v1alpha1/BUILD diff --git a/pkg/scheduler/plugins/v1alpha1/context.go b/pkg/scheduler/framework/v1alpha1/context.go similarity index 73% rename from pkg/scheduler/plugins/v1alpha1/context.go rename to pkg/scheduler/framework/v1alpha1/context.go index 0631b5f0d8b..0dfe6a47335 100644 --- a/pkg/scheduler/plugins/v1alpha1/context.go +++ b/pkg/scheduler/framework/v1alpha1/context.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ type ContextKey string // PluginContext does not provide any data protection, as all plugins are assumed to be // trusted. type PluginContext struct { - Mx sync.RWMutex + mx sync.RWMutex storage map[ContextKey]ContextData } @@ -50,6 +50,8 @@ func NewPluginContext() *PluginContext { // Read retrieves data with the given "key" from PluginContext. If the key is not // present an error is returned. +// This function is not thread safe. In multi-threaded code, lock should be +// acquired first. func (c *PluginContext) Read(key ContextKey) (ContextData, error) { if v, ok := c.storage[key]; ok { return v, nil @@ -57,38 +59,36 @@ func (c *PluginContext) Read(key ContextKey) (ContextData, error) { return nil, errors.New(NotFound) } -// SyncRead is the thread safe version of Read(...). -func (c *PluginContext) SyncRead(key ContextKey) (ContextData, error) { - c.Mx.RLock() - defer c.Mx.RUnlock() - return c.Read(key) -} - // Write stores the given "val" in PluginContext with the given "key". +// This function is not thread safe. In multi-threaded code, lock should be +// acquired first. func (c *PluginContext) Write(key ContextKey, val ContextData) { c.storage[key] = val } -// SyncWrite is the thread safe version of Write(...). -func (c *PluginContext) SyncWrite(key ContextKey, val ContextData) { - c.Mx.Lock() - defer c.Mx.Unlock() - c.Write(key, val) -} - // Delete deletes data with the given key from PluginContext. +// This function is not thread safe. In multi-threaded code, lock should be +// acquired first. func (c *PluginContext) Delete(key ContextKey) { delete(c.storage, key) } -// SyncDelete is the thread safe version of Write(...). -func (c *PluginContext) SyncDelete(key ContextKey) { - c.Mx.Lock() - defer c.Mx.Unlock() - c.Delete(key) +// Lock acquires PluginContext lock. +func (c *PluginContext) Lock() { + c.mx.Lock() } -// Reset removes all the information in the PluginContext. -func (c *PluginContext) Reset() { - c.storage = make(map[ContextKey]ContextData) +// Unlock releases PluginContext lock. +func (c *PluginContext) Unlock() { + c.mx.Unlock() +} + +// RLock acquires PluginContext read lock. +func (c *PluginContext) RLock() { + c.mx.RLock() +} + +// RUnlock releases PluginContext read lock. +func (c *PluginContext) RUnlock() { + c.mx.RUnlock() } diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go new file mode 100644 index 00000000000..49b4a235169 --- /dev/null +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -0,0 +1,142 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file defines the scheduling framework plugin interfaces. + +package v1alpha1 + +import ( + "errors" + + "k8s.io/api/core/v1" + internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" +) + +// Code is the Status code/type which is returned from plugins. +type Code int + +// These are predefined codes used in a Status. +const ( + // Success means that plugin ran correctly and found pod schedulable. + // NOTE: A nil status is also considered as "Success". + Success Code = 0 + // Error is used for internal plugin errors, unexpected input, etc. + Error Code = 1 + // Unschedulable is used when a plugin finds a pod unschedulable. + // The accompanying status message should explain why the pod is unschedulable. + Unschedulable Code = 2 +) + +// Status indicates the result of running a plugin. It consists of a code and a +// message. When the status code is not `Success`, the status message should +// explain why. +// NOTE: A nil Status is also considered as Success. +type Status struct { + code Code + message string +} + +// Code returns code of the Status. +func (s *Status) Code() Code { + if s == nil { + return Success + } + return s.code +} + +// Message returns message of the Status. +func (s *Status) Message() string { + return s.message +} + +// IsSuccess returns true if and only if "Status" is nil or Code is "Success". +func (s *Status) IsSuccess() bool { + if s == nil || s.code == Success { + return true + } + return false +} + +// AsError returns an "error" object with the same message as that of the Status. +func (s *Status) AsError() error { + if s.IsSuccess() { + return nil + } + return errors.New(s.message) +} + +// NewStatus makes a Status out of the given arguments and returns its pointer. +func NewStatus(code Code, msg string) *Status { + return &Status{ + code: code, + message: msg, + } +} + +// Plugin is the parent type for all the scheduling framework plugins. +type Plugin interface { + Name() string +} + +// ReservePlugin is an interface for Reserve plugins. These plugins are called +// at the reservation point. These are meant to update the state of the plugin. +// This concept used to be called 'assume' in the original scheduler. +// These plugins should return only Success or Error in Status.code. However, +// the scheduler accepts other valid codes as well. Anything other than Success +// will lead to rejection of the pod. +type ReservePlugin interface { + Plugin + // Reserve is called by the scheduling framework when the scheduler cache is + // updated. + Reserve(pc *PluginContext, p *v1.Pod, nodeName string) *Status +} + +// PrebindPlugin is an interface that must be implemented by "prebind" plugins. +// These plugins are called before a pod being scheduled +type PrebindPlugin interface { + Plugin + // Prebind is called before binding a pod. All prebind plugins must return + // success or the pod will be rejected and won't be sent for binding. + Prebind(pc *PluginContext, p *v1.Pod, nodeName string) *Status +} + +// Framework manages the set of plugins in use by the scheduling framework. +// Configured plugins are called at specified points in a scheduling context. +type Framework interface { + FrameworkHandle + // RunPrebindPlugins runs the set of configured prebind plugins. It returns + // *Status and its code is set to non-success if any of the plugins returns + // anything but Success. If the Status code is "Unschedulable", it is + // considered as a scheduling check failure, otherwise, it is considered as an + // internal error. In either case the pod is not going to be bound. + RunPrebindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status + + // RunReservePlugins runs the set of configured reserve plugins. If any of these + // plugins returns an error, it does not continue running the remaining ones and + // returns the error. In such case, pod will not be scheduled. + RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status +} + +// FrameworkHandle provides data and some tools that plugins can use. It is +// passed to the plugin factories at the time of plugin initialization. Plugins +// must store and use this handle to call framework functions. +type FrameworkHandle interface { + // NodeInfoSnapshot return the latest NodeInfo snapshot. The snapshot + // is taken at the beginning of a scheduling cycle and remains unchanged until + // a pod finishes "Reserve" point. There is no guarantee that the information + // remains unchanged in the binding phase of scheduling. + NodeInfoSnapshot() *internalcache.NodeInfoSnapshot +} diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 790a3bcc6e3..a663c0c0e22 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -122,8 +122,8 @@ func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem { } // NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it. -func NewNodeInfoSnapshot() NodeInfoSnapshot { - return NodeInfoSnapshot{ +func NewNodeInfoSnapshot() *NodeInfoSnapshot { + return &NodeInfoSnapshot{ NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo), } } diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index 8bb3555b212..70e2f9f9ca0 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -1078,7 +1078,7 @@ func TestNodeOperators(t *testing.T) { // Case 2: dump cached nodes successfully. cachedNodes := NewNodeInfoSnapshot() - cache.UpdateNodeInfoSnapshot(&cachedNodes) + cache.UpdateNodeInfoSnapshot(cachedNodes) newNode, found := cachedNodes.NodeInfoMap[node.Name] if !found || len(cachedNodes.NodeInfoMap) != 1 { t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes) @@ -1180,7 +1180,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { } var cache *schedulerCache - var snapshot NodeInfoSnapshot + var snapshot *NodeInfoSnapshot type operation = func() addNode := func(i int) operation { @@ -1215,8 +1215,8 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { } updateSnapshot := func() operation { return func() { - cache.UpdateNodeInfoSnapshot(&snapshot) - if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil { + cache.UpdateNodeInfoSnapshot(snapshot) + if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil { t.Error(err) } } @@ -1356,8 +1356,8 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { } // Always update the snapshot at the end of operations and compare it. - cache.UpdateNodeInfoSnapshot(&snapshot) - if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil { + cache.UpdateNodeInfoSnapshot(snapshot) + if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil { t.Error(err) } }) @@ -1391,7 +1391,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { cachedNodes := NewNodeInfoSnapshot() - cache.UpdateNodeInfoSnapshot(&cachedNodes) + cache.UpdateNodeInfoSnapshot(cachedNodes) } } diff --git a/pkg/scheduler/plugins/examples/multipoint.go b/pkg/scheduler/plugins/examples/multipoint.go deleted file mode 100644 index 3b82f219802..00000000000 --- a/pkg/scheduler/plugins/examples/multipoint.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package examples - -import ( - "fmt" - - "k8s.io/api/core/v1" - plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" -) - -// MultipointCommunicatingPlugin is an example of a plugin that implements two -// extension points. It communicates through pluginContext with another function. -type MultipointCommunicatingPlugin struct{} - -var _ = plugins.ReservePlugin(MultipointCommunicatingPlugin{}) - -// Name returns name of the plugin. It is used in logs, etc. -func (mc MultipointCommunicatingPlugin) Name() string { - return "multipoint-communicating-plugin" -} - -// Reserve is the functions invoked by the framework at "reserve" extension point. -func (mc MultipointCommunicatingPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error { - if pod == nil { - return fmt.Errorf("pod cannot be nil") - } - if pod.Name == "my-test-pod" { - ps.Data().Ctx.SyncWrite(plugins.ContextKey(pod.Name), "never bind") - } - return nil -} - -// Prebind is the functions invoked by the framework at "prebind" extension point. -func (mc MultipointCommunicatingPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) { - if pod == nil { - return false, fmt.Errorf("pod cannot be nil") - } - if v, e := ps.Data().Ctx.SyncRead(plugins.ContextKey(pod.Name)); e == nil && v == "never bind" { - return false, nil - } - return true, nil -} - -// NewMultipointCommunicatingPlugin initializes a new plugin and returns it. -func NewMultipointCommunicatingPlugin() *MultipointCommunicatingPlugin { - return &MultipointCommunicatingPlugin{} -} diff --git a/pkg/scheduler/plugins/examples/stateful.go b/pkg/scheduler/plugins/examples/stateful.go deleted file mode 100644 index 2b8b210305e..00000000000 --- a/pkg/scheduler/plugins/examples/stateful.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package examples - -import ( - "fmt" - "k8s.io/klog" - - "k8s.io/api/core/v1" - plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" -) - -// StatefulMultipointExample is an example plugin that is executed at multiple extension points. -// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin) -// and changes its state when it is executed. -type StatefulMultipointExample struct { - mpState map[int]string - numRuns int -} - -var _ = plugins.ReservePlugin(&StatefulMultipointExample{}) -var _ = plugins.PrebindPlugin(&StatefulMultipointExample{}) - -// Name returns name of the plugin. It is used in logs, etc. -func (mp *StatefulMultipointExample) Name() string { - return "multipoint-plugin-example" -} - -// Reserve is the functions invoked by the framework at "reserve" extension point. -func (mp *StatefulMultipointExample) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error { - mp.numRuns++ - return nil -} - -// Prebind is the functions invoked by the framework at "prebind" extension point. -func (mp *StatefulMultipointExample) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) { - mp.numRuns++ - if pod == nil { - return false, fmt.Errorf("pod must not be nil") - } - return true, nil -} - -// NewStatefulMultipointExample initializes a new plugin and returns it. -func NewStatefulMultipointExample(initState ...interface{}) *StatefulMultipointExample { - if len(initState) == 0 { - klog.Error("StatefulMultipointExample needs exactly one argument for initialization") - return nil - } - mp := StatefulMultipointExample{ - mpState: initState[0].(map[int]string), - } - return &mp -} diff --git a/pkg/scheduler/plugins/registrar.go b/pkg/scheduler/plugins/registrar.go deleted file mode 100644 index 62de362d962..00000000000 --- a/pkg/scheduler/plugins/registrar.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package plugins - -import ( - "k8s.io/kubernetes/pkg/scheduler/internal/cache" - plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" -) - -// DefaultPluginSet is the default plugin registrar used by the default scheduler. -type DefaultPluginSet struct { - data *plugins.PluginData - reservePlugins []plugins.ReservePlugin - prebindPlugins []plugins.PrebindPlugin -} - -var _ = plugins.PluginSet(&DefaultPluginSet{}) - -// ReservePlugins returns a slice of default reserve plugins. -func (r *DefaultPluginSet) ReservePlugins() []plugins.ReservePlugin { - return r.reservePlugins -} - -// PrebindPlugins returns a slice of default prebind plugins. -func (r *DefaultPluginSet) PrebindPlugins() []plugins.PrebindPlugin { - return r.prebindPlugins -} - -// Data returns a pointer to PluginData. -func (r *DefaultPluginSet) Data() *plugins.PluginData { - return r.data -} - -// NewDefaultPluginSet initializes default plugin set and returns its pointer. -func NewDefaultPluginSet(ctx *plugins.PluginContext, schedulerCache *cache.Cache) *DefaultPluginSet { - defaultRegistrar := DefaultPluginSet{ - data: &plugins.PluginData{ - Ctx: ctx, - SchedulerCache: schedulerCache, - }, - } - defaultRegistrar.registerReservePlugins() - defaultRegistrar.registerPrebindPlugins() - return &defaultRegistrar -} - -func (r *DefaultPluginSet) registerReservePlugins() { - r.reservePlugins = []plugins.ReservePlugin{ - // Init functions of all reserve plugins go here. They are called in the - // same order that they are registered. - // Example: - // examples.NewStatefulMultipointExample(map[int]string{1: "test1", 2: "test2"}), - } -} - -func (r *DefaultPluginSet) registerPrebindPlugins() { - r.prebindPlugins = []plugins.PrebindPlugin{ - // Init functions of all prebind plugins go here. They are called in the - // same order that they are registered. - // Example: - // examples.NewStatelessPrebindExample(), - } -} diff --git a/pkg/scheduler/plugins/v1alpha1/interface.go b/pkg/scheduler/plugins/v1alpha1/interface.go deleted file mode 100644 index 0d0c90b43e6..00000000000 --- a/pkg/scheduler/plugins/v1alpha1/interface.go +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// This file defines the scheduling framework plugin interfaces. - -package v1alpha1 - -import ( - "k8s.io/api/core/v1" - "k8s.io/kubernetes/pkg/scheduler/internal/cache" -) - -// PluginData carries information that plugins may need. -type PluginData struct { - Ctx *PluginContext - SchedulerCache *cache.Cache - // We may want to add the scheduling queue here too. -} - -// Plugin is the parent type for all the scheduling framework plugins. -type Plugin interface { - Name() string -} - -// ReservePlugin is an interface for Reserve plugins. These plugins are called -// at the reservation point, AKA "assume". These are meant to updated the state -// of the plugin. They do not return any value (other than error). -type ReservePlugin interface { - Plugin - // Reserve is called by the scheduling framework when the scheduler cache is - // updated. - Reserve(ps PluginSet, p *v1.Pod, nodeName string) error -} - -// PrebindPlugin is an interface that must be implemented by "prebind" plugins. -// These plugins are called before a pod being scheduled -type PrebindPlugin interface { - Plugin - // Prebind is called before binding a pod. All prebind plugins must return - // or the pod will not be sent for binding. - Prebind(ps PluginSet, p *v1.Pod, nodeName string) (bool, error) -} - -// PluginSet registers plugins used by the scheduling framework. -// The plugins registered are called at specified points in an scheduling cycle. -type PluginSet interface { - Data() *PluginData - ReservePlugins() []ReservePlugin - PrebindPlugins() []PrebindPlugin -} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 10e1304fa27..4c2b44bcb2b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -17,7 +17,6 @@ limitations under the License. package scheduler import ( - "errors" "fmt" "io/ioutil" "os" @@ -40,6 +39,7 @@ import ( kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/factory" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" @@ -156,6 +156,7 @@ func New(client clientset.Interface, DisablePreemption: options.disablePreemption, PercentageOfNodesToScore: options.percentageOfNodesToScore, BindTimeoutSeconds: options.bindTimeoutSeconds, + Registry: framework.NewRegistry(), }) var config *factory.Config source := schedulerAlgorithmSource @@ -434,11 +435,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. func (sched *Scheduler) scheduleOne() { - plugins := sched.config.PluginSet - // Remove all plugin context data at the beginning of a scheduling cycle. - if plugins.Data().Ctx != nil { - plugins.Data().Ctx.Reset() - } + fwk := sched.config.Framework pod := sched.config.NextPod() // pod could be nil when schedulerQueue is closed @@ -455,6 +452,7 @@ func (sched *Scheduler) scheduleOne() { // Synchronously attempt to find a fit for the pod. start := time.Now() + pluginContext := framework.NewPluginContext() scheduleResult, err := sched.schedule(pod) if err != nil { // schedule() may have failed because the pod would not fit on any host, so we try to @@ -505,15 +503,12 @@ func (sched *Scheduler) scheduleOne() { } // Run "reserve" plugins. - for _, pl := range plugins.ReservePlugins() { - if err := pl.Reserve(plugins, assumedPod, scheduleResult.SuggestedHost); err != nil { - klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err) - sched.recordSchedulingFailure(assumedPod, err, SchedulerError, - fmt.Sprintf("reserve plugin %v failed", pl.Name())) - metrics.PodScheduleErrors.Inc() - return - } + if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { + sched.recordSchedulingFailure(assumedPod, sts.AsError(), SchedulerError, sts.Message()) + metrics.PodScheduleErrors.Inc() + return } + // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost err = sched.assume(assumedPod, scheduleResult.SuggestedHost) if err != nil { @@ -534,29 +529,20 @@ func (sched *Scheduler) scheduleOne() { } // Run "prebind" plugins. - for _, pl := range plugins.PrebindPlugins() { - approved, err := pl.Prebind(plugins, assumedPod, scheduleResult.SuggestedHost) - if err != nil { - approved = false - klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err) + prebindStatus := fwk.RunPrebindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost) + if !prebindStatus.IsSuccess() { + var reason string + if prebindStatus.Code() == framework.Unschedulable { + reason = v1.PodReasonUnschedulable + } else { metrics.PodScheduleErrors.Inc() + reason = SchedulerError } - if !approved { - if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { - klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) - } - var reason string - if err == nil { - msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name) - klog.V(4).Infof(msg) - err = errors.New(msg) - reason = v1.PodReasonUnschedulable - } else { - reason = SchedulerError - } - sched.recordSchedulingFailure(assumedPod, err, reason, err.Error()) - return + if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { + klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } + sched.recordSchedulingFailure(assumedPod, prebindStatus.AsError(), reason, prebindStatus.Message()) + return } err := sched.bind(assumedPod, &v1.Binding{ diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 86e187ba46d..e458020b575 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -50,6 +50,7 @@ import ( kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/factory" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -196,6 +197,7 @@ func TestSchedulerCreation(t *testing.T) { eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}), kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource}, stopCh, + EmptyPluginRegistry, WithBindTimeoutSeconds(defaultBindTimeout)) if err != nil { @@ -273,6 +275,7 @@ func TestScheduler(t *testing.T) { var gotAssumedPod *v1.Pod var gotBinding *v1.Binding + fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil) s := NewFromConfig(&factory.Config{ SchedulerCache: &fakecache.Cache{ ForgetFunc: func(pod *v1.Pod) { @@ -298,7 +301,7 @@ func TestScheduler(t *testing.T) { NextPod: func() *v1.Pod { return item.sendPod }, - PluginSet: &EmptyPluginSet{}, + Framework: fwk, Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}), VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}), }) @@ -636,6 +639,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { + framework, _ := framework.NewFramework(EmptyPluginRegistry, nil) algo := core.NewGenericScheduler( scache, internalqueue.NewSchedulingQueue(nil), @@ -643,7 +647,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C predicates.EmptyPredicateMetadataProducer, []priorities.PriorityConfig{}, priorities.EmptyPriorityMetadataProducer, - &EmptyPluginSet{}, + framework, []algorithm.SchedulerExtender{}, nil, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), @@ -674,7 +678,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C Recorder: &record.FakeRecorder{}, PodConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, - PluginSet: &EmptyPluginSet{}, + Framework: framework, VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}), } @@ -688,6 +692,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C } func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { + framework, _ := framework.NewFramework(EmptyPluginRegistry, nil) algo := core.NewGenericScheduler( scache, internalqueue.NewSchedulingQueue(nil), @@ -695,7 +700,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc predicates.EmptyPredicateMetadataProducer, []priorities.PriorityConfig{}, priorities.EmptyPriorityMetadataProducer, - &EmptyPluginSet{}, + framework, []algorithm.SchedulerExtender{}, nil, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), @@ -730,7 +735,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc PodConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, StopEverything: stop, - PluginSet: &EmptyPluginSet{}, + Framework: framework, VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}), }) diff --git a/pkg/scheduler/testutil.go b/pkg/scheduler/testutil.go index fb7ffeb826f..b893ae8fe87 100644 --- a/pkg/scheduler/testutil.go +++ b/pkg/scheduler/testutil.go @@ -27,8 +27,8 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/factory" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" - plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" ) // FakeConfigurator is an implementation for test. @@ -91,25 +91,5 @@ func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.Stri return fc.Config, nil } -// EmptyPluginSet is the default plugin registrar used by the default scheduler. -type EmptyPluginSet struct{} - -var _ = plugins.PluginSet(EmptyPluginSet{}) - -// ReservePlugins returns a slice of default reserve plugins. -func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin { - return []plugins.ReservePlugin{} -} - -// PrebindPlugins returns a slice of default prebind plugins. -func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin { - return []plugins.PrebindPlugin{} -} - -// Data returns a pointer to PluginData. -func (r EmptyPluginSet) Data() *plugins.PluginData { - return &plugins.PluginData{ - Ctx: nil, - SchedulerCache: nil, - } -} +// EmptyPluginRegistry is an empty plugin registry used in tests. +var EmptyPluginRegistry = framework.Registry{} diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go new file mode 100644 index 00000000000..c900bd6a7e9 --- /dev/null +++ b/test/integration/scheduler/framework_test.go @@ -0,0 +1,218 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "k8s.io/apimachinery/pkg/runtime" + "testing" + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +// TesterPlugin is common ancestor for a test plugin that allows injection of +// failures and some other test functionalities. +type TesterPlugin struct { + numReserveCalled int + numPrebindCalled int + failReserve bool + failPrebind bool + rejectPrebind bool +} + +type ReservePlugin struct { + TesterPlugin +} + +type PrebindPlugin struct { + TesterPlugin +} + +const ( + reservePluginName = "reserve-plugin" + prebindPluginName = "prebind-plugin" +) + +var _ = framework.ReservePlugin(&ReservePlugin{}) +var _ = framework.PrebindPlugin(&PrebindPlugin{}) + +// Name returns name of the plugin. +func (rp *ReservePlugin) Name() string { + return reservePluginName +} + +// Reserve is a test function that returns an error or nil, depending on the +// value of "failReserve". +func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + rp.numReserveCalled++ + if rp.failReserve { + return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) + } + return nil +} + +var resPlugin = &ReservePlugin{} +var pbdPlugin = &PrebindPlugin{} + +// NewReservePlugin is the factory for reserve plugin. +func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return resPlugin, nil +} + +// Name returns name of the plugin. +func (pp *PrebindPlugin) Name() string { + return prebindPluginName +} + +// Prebind is a test function that returns (true, nil) or errors for testing. +func (pp *PrebindPlugin) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + pp.numPrebindCalled++ + if pp.failPrebind { + return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) + } + if pp.rejectPrebind { + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) + } + return nil +} + +// NewPrebindPlugin is the factory for prebind plugin. +func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return pbdPlugin, nil +} + +// TestReservePlugin tests invocation of reserve plugins. +func TestReservePlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a reserve plugin. + registry := framework.Registry{reservePluginName: NewReservePlugin} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "reserve-plugin", nil), + false, nil, registry, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + for _, fail := range []bool{false, true} { + resPlugin.failReserve = fail + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if fail { + if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { + t.Errorf("Didn't expected the pod to be scheduled. error: %v", err) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("Expected the pod to be scheduled. error: %v", err) + } + } + + if resPlugin.numReserveCalled == 0 { + t.Errorf("Expected the reserve plugin to be called.") + } + + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} + +// TestPrebindPlugin tests invocation of prebind plugins. +func TestPrebindPlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a reserve plugin. + registry := framework.Registry{prebindPluginName: NewPrebindPlugin} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "prebind-plugin", nil), + false, nil, registry, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + tests := []struct { + fail bool + reject bool + }{ + { + fail: false, + reject: false, + }, + { + fail: true, + reject: false, + }, + { + fail: false, + reject: true, + }, + { + fail: true, + reject: true, + }, + } + + for i, test := range tests { + pbdPlugin.failPrebind = test.fail + pbdPlugin.rejectPrebind = test.reject + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if test.fail { + if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { + t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err) + } + } else { + if test.reject { + if err = waitForPodUnschedulable(cs, pod); err != nil { + t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err) + } + } + } + + if pbdPlugin.numPrebindCalled == 0 { + t.Errorf("Expected the prebind plugin to be called.") + } + + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} diff --git a/test/integration/scheduler/plugin_test.go b/test/integration/scheduler/plugin_test.go deleted file mode 100644 index b1bf97f673f..00000000000 --- a/test/integration/scheduler/plugin_test.go +++ /dev/null @@ -1,269 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package scheduler - -import ( - "fmt" - "testing" - "time" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" - plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" -) - -// StatefulMultipointExample is an example plugin that is executed at multiple extension points. -// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin) -// and changes its state when it is executed. -type TesterPlugin struct { - numReserveCalled int - numPrebindCalled int - failReserve bool - failPrebind bool - rejectPrebind bool -} - -var _ = plugins.ReservePlugin(&TesterPlugin{}) -var _ = plugins.PrebindPlugin(&TesterPlugin{}) - -// Name returns name of the plugin. -func (tp *TesterPlugin) Name() string { - return "tester-plugin" -} - -// Reserve is a test function that returns an error or nil, depending on the -// value of "failReserve". -func (tp *TesterPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error { - tp.numReserveCalled++ - if tp.failReserve { - return fmt.Errorf("injecting failure for pod %v", pod.Name) - } - return nil -} - -// Prebind is a test function that returns (true, nil) or errors for testing. -func (tp *TesterPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) { - var err error - tp.numPrebindCalled++ - if tp.failPrebind { - err = fmt.Errorf("injecting failure for pod %v", pod.Name) - } - if tp.rejectPrebind { - return false, err - } - return true, err -} - -// TestPluginSet is a plugin set used for testing purposes. -type TestPluginSet struct { - data *plugins.PluginData - reservePlugins []plugins.ReservePlugin - prebindPlugins []plugins.PrebindPlugin -} - -var _ = plugins.PluginSet(&TestPluginSet{}) - -// ReservePlugins returns a slice of default reserve plugins. -func (r *TestPluginSet) ReservePlugins() []plugins.ReservePlugin { - return r.reservePlugins -} - -// PrebindPlugins returns a slice of default prebind plugins. -func (r *TestPluginSet) PrebindPlugins() []plugins.PrebindPlugin { - return r.prebindPlugins -} - -// Data returns a pointer to PluginData. -func (r *TestPluginSet) Data() *plugins.PluginData { - return r.data -} - -// TestReservePlugin tests invocation of reserve plugins. -func TestReservePlugin(t *testing.T) { - // Create a plugin set for testing. Register only a reserve plugin. - testerPlugin := &TesterPlugin{} - testPluginSet := &TestPluginSet{ - data: &plugins.PluginData{ - Ctx: plugins.NewPluginContext(), - }, - reservePlugins: []plugins.ReservePlugin{testerPlugin}, - } - - // Create the master and the scheduler with the test plugin set. - context := initTestSchedulerWithOptions(t, - initTestMaster(t, "reserve-plugin", nil), - false, nil, testPluginSet, false, time.Second) - defer cleanupTest(t, context) - - cs := context.clientSet - // Add a few nodes. - _, err := createNodes(cs, "test-node", nil, 2) - if err != nil { - t.Fatalf("Cannot create nodes: %v", err) - } - - for _, fail := range []bool{false, true} { - testerPlugin.failReserve = fail - // Create a best effort pod. - pod, err := createPausePod(cs, - initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) - if err != nil { - t.Errorf("Error while creating a test pod: %v", err) - } - - if fail { - if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { - t.Errorf("Didn't expected the pod to be scheduled. error: %v", err) - } - } else { - if err = waitForPodToSchedule(cs, pod); err != nil { - t.Errorf("Expected the pod to be scheduled. error: %v", err) - } - } - - if testerPlugin.numReserveCalled == 0 { - t.Errorf("Expected the reserve plugin to be called.") - } - - cleanupPods(cs, t, []*v1.Pod{pod}) - } -} - -// TestPrebindPlugin tests invocation of prebind plugins. -func TestPrebindPlugin(t *testing.T) { - // Create a plugin set for testing. Register only a prebind plugin. - testerPlugin := &TesterPlugin{} - testPluginSet := &TestPluginSet{ - data: &plugins.PluginData{ - Ctx: plugins.NewPluginContext(), - }, - prebindPlugins: []plugins.PrebindPlugin{testerPlugin}, - } - - // Create the master and the scheduler with the test plugin set. - context := initTestSchedulerWithOptions(t, - initTestMaster(t, "prebind-plugin", nil), - false, nil, testPluginSet, false, time.Second) - defer cleanupTest(t, context) - - cs := context.clientSet - // Add a few nodes. - _, err := createNodes(cs, "test-node", nil, 2) - if err != nil { - t.Fatalf("Cannot create nodes: %v", err) - } - - tests := []struct { - fail bool - reject bool - }{ - { - fail: false, - reject: false, - }, - { - fail: true, - reject: false, - }, - { - fail: false, - reject: true, - }, - { - fail: true, - reject: true, - }, - } - - for i, test := range tests { - testerPlugin.failPrebind = test.fail - testerPlugin.rejectPrebind = test.reject - // Create a best effort pod. - pod, err := createPausePod(cs, - initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) - if err != nil { - t.Errorf("Error while creating a test pod: %v", err) - } - - if test.fail { - if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { - t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err) - } - } else { - if test.reject { - if err = waitForPodUnschedulable(cs, pod); err != nil { - t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err) - } - } else { - if err = waitForPodToSchedule(cs, pod); err != nil { - t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err) - } - } - } - - if testerPlugin.numPrebindCalled == 0 { - t.Errorf("Expected the prebind plugin to be called.") - } - - cleanupPods(cs, t, []*v1.Pod{pod}) - } -} - -// TestContextCleanup tests that data inserted in the pluginContext is removed -// after a scheduling cycle is over. -func TestContextCleanup(t *testing.T) { - // Create a plugin set for testing. - testerPlugin := &TesterPlugin{} - testPluginSet := &TestPluginSet{ - data: &plugins.PluginData{ - Ctx: plugins.NewPluginContext(), - }, - reservePlugins: []plugins.ReservePlugin{testerPlugin}, - prebindPlugins: []plugins.PrebindPlugin{testerPlugin}, - } - - // Create the master and the scheduler with the test plugin set. - context := initTestSchedulerWithOptions(t, - initTestMaster(t, "plugin-context-cleanup", nil), - false, nil, testPluginSet, false, time.Second) - defer cleanupTest(t, context) - - cs := context.clientSet - // Add a few nodes. - _, err := createNodes(cs, "test-node", nil, 2) - if err != nil { - t.Fatalf("Cannot create nodes: %v", err) - } - - // Insert something in the plugin context. - testPluginSet.Data().Ctx.Write("test", "foo") - - // Create and schedule a best effort pod. - pod, err := runPausePod(cs, - initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) - if err != nil { - t.Errorf("Error while creating or scheduling a test pod: %v", err) - } - - // Make sure the data inserted in the plugin context is removed. - _, err = testPluginSet.Data().Ctx.Read("test") - if err == nil || err.Error() != plugins.NotFound { - t.Errorf("Expected the plugin context to be cleaned up after a scheduling cycle. error: %v", err) - } - - cleanupPods(cs, t, []*v1.Pod{pod}) -} diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 1226aa16618..a3642662f61 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -43,6 +43,7 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/factory" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/test/integration/framework" ) @@ -595,7 +596,7 @@ func TestMultiScheduler(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, stopCh) + schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerframework.NewRegistry(), stopCh) schedulerConfig2, err := schedulerConfigFactory2.Create() if err != nil { t.Errorf("Couldn't create scheduler config: %v", err) diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 70cc08c2479..c33f7f180b6 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -49,7 +49,7 @@ import ( _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/factory" - plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" taintutils "k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/test/integration/framework" imageutils "k8s.io/kubernetes/test/utils/image" @@ -73,6 +73,7 @@ func createConfiguratorWithPodInformer( clientSet clientset.Interface, podInformer coreinformers.PodInformer, informerFactory informers.SharedInformerFactory, + pluginRegistry schedulerframework.Registry, stopCh <-chan struct{}, ) factory.Configurator { return factory.NewConfigFactory(&factory.ConfigFactoryArgs{ @@ -88,6 +89,7 @@ func createConfiguratorWithPodInformer( ServiceInformer: informerFactory.Core().V1().Services(), PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), + Registry: pluginRegistry, HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, DisablePreemption: false, PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, @@ -146,7 +148,7 @@ func initTestScheduler( ) *testContext { // Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority // feature gate is enabled at the same time. - return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, time.Second) + return initTestSchedulerWithOptions(t, context, setPodInformer, policy, schedulerframework.NewRegistry(), false, time.Second) } // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default @@ -156,7 +158,7 @@ func initTestSchedulerWithOptions( context *testContext, setPodInformer bool, policy *schedulerapi.Policy, - pluginSet plugins.PluginSet, + pluginRegistry schedulerframework.Registry, disablePreemption bool, resyncPeriod time.Duration, ) *testContext { @@ -173,7 +175,7 @@ func initTestSchedulerWithOptions( } context.schedulerConfigFactory = createConfiguratorWithPodInformer( - v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, context.stopCh) + v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, context.stopCh) var err error @@ -208,11 +210,6 @@ func initTestSchedulerWithOptions( controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced) } - // Set pluginSet if provided. DefaultPluginSet is used if this is not specified. - if pluginSet != nil { - context.schedulerConfig.PluginSet = pluginSet - } - eventBroadcaster := record.NewBroadcaster() context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder( legacyscheme.Scheme, @@ -259,7 +256,8 @@ func initTest(t *testing.T, nsPrefix string) *testContext { // configuration but with pod preemption disabled. func initTestDisablePreemption(t *testing.T, nsPrefix string) *testContext { return initTestSchedulerWithOptions( - t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, time.Second) + t, initTestMaster(t, nsPrefix, nil), true, nil, + schedulerframework.NewRegistry(), true, time.Second) } // cleanupTest deletes the scheduler and the test namespace. It should be called