Update scheduler framework plugins to align with the latest changes to the framework design

This commit is contained in:
Bobby (Babak) Salamat 2019-03-20 17:07:25 -07:00
parent b3ad4cd6b9
commit 404dc1ed79
26 changed files with 631 additions and 701 deletions

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -531,6 +532,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
extenders = append(extenders, &test.extenders[ii])
}
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, name := range test.nodes {
cache.AddNode(createNode(name))
}
@ -542,7 +544,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
predicates.EmptyPredicateMetadataProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptyPluginSet,
fwk,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},

View File

@ -39,11 +39,11 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
utiltrace "k8s.io/utils/trace"
@ -158,11 +158,11 @@ type genericScheduler struct {
priorityMetaProducer priorities.PriorityMetadataProducer
predicateMetaProducer predicates.PredicateMetadataProducer
prioritizers []priorities.PriorityConfig
pluginSet pluginsv1alpha1.PluginSet
framework framework.Framework
extenders []algorithm.SchedulerExtender
lastNodeIndex uint64
alwaysCheckAllPredicates bool
nodeInfoSnapshot internalcache.NodeInfoSnapshot
nodeInfoSnapshot *internalcache.NodeInfoSnapshot
volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister
pdbLister algorithm.PDBLister
@ -174,7 +174,7 @@ type genericScheduler struct {
// functions.
func (g *genericScheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateNodeInfoSnapshot(&g.nodeInfoSnapshot)
return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
}
// Schedule tries to schedule the given pod to one of the nodes in the node list.
@ -1206,7 +1206,7 @@ func NewGenericScheduler(
predicateMetaProducer predicates.PredicateMetadataProducer,
prioritizers []priorities.PriorityConfig,
priorityMetaProducer priorities.PriorityMetadataProducer,
pluginSet pluginsv1alpha1.PluginSet,
framework framework.Framework,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
@ -1222,9 +1222,9 @@ func NewGenericScheduler(
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
pluginSet: pluginSet,
framework: framework,
extenders: extenders,
nodeInfoSnapshot: internalcache.NewNodeInfoSnapshot(),
nodeInfoSnapshot: framework.NodeInfoSnapshot(),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,

View File

@ -38,10 +38,10 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)
@ -134,27 +134,9 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str
return nil
}
// EmptyPluginSet is a test plugin set used by the default scheduler.
type EmptyPluginSet struct{}
var _ plugins.PluginSet = EmptyPluginSet{}
// ReservePlugins returns a slice of default reserve plugins.
func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
return []plugins.ReservePlugin{}
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return []plugins.PrebindPlugin{}
}
// Data returns a pointer to PluginData.
func (r EmptyPluginSet) Data() *plugins.PluginData {
return &plugins.PluginData{}
}
var emptyPluginSet = &EmptyPluginSet{}
// EmptyPluginRegistry is a test plugin set used by the default scheduler.
var EmptyPluginRegistry = framework.Registry{}
var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil)
func makeNodeList(nodeNames []string) []*v1.Node {
result := make([]*v1.Node, 0, len(nodeNames))
@ -456,6 +438,7 @@ func TestGenericScheduler(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, pod := range test.pods {
cache.AddPod(pod)
}
@ -474,7 +457,7 @@ func TestGenericScheduler(t *testing.T) {
algorithmpredicates.EmptyPredicateMetadataProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptyPluginSet,
fwk,
[]algorithm.SchedulerExtender{},
nil,
pvcLister,
@ -498,6 +481,7 @@ func TestGenericScheduler(t *testing.T) {
func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes []*v1.Node) *genericScheduler {
algorithmpredicates.SetPredicatesOrdering(order)
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, n := range nodes {
cache.AddNode(n)
}
@ -510,10 +494,10 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
algorithmpredicates.EmptyPredicateMetadataProducer,
prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptyPluginSet,
fwk,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateNodeInfoSnapshot(&s.(*genericScheduler).nodeInfoSnapshot)
cache.UpdateNodeInfoSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
return s.(*genericScheduler)
}
@ -1483,6 +1467,7 @@ func TestPreempt(t *testing.T) {
t.Logf("===== Running test %v", t.Name())
stop := make(chan struct{})
cache := internalcache.New(time.Duration(0), stop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, pod := range test.pods {
cache.AddPod(pod)
}
@ -1509,7 +1494,7 @@ func TestPreempt(t *testing.T) {
algorithmpredicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
priorities.EmptyPriorityMetadataProducer,
emptyPluginSet,
fwk,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},

View File

@ -51,11 +51,10 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/api/validation"
"k8s.io/kubernetes/pkg/scheduler/core"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/plugins"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -92,8 +91,8 @@ type Config struct {
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
PodPreemptor PodPreemptor
// PlugingSet has a set of plugins and data used to run them.
PluginSet pluginsv1alpha1.PluginSet
// Framework runs scheduler plugins at configured extension points.
Framework framework.Framework
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
@ -183,8 +182,8 @@ type configFactory struct {
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
// pluginRunner has a set of plugins and the context used for running them.
pluginSet pluginsv1alpha1.PluginSet
// framework has a set of plugins and the context used for running them.
framework framework.Framework
// Close this to stop all reflectors
StopEverything <-chan struct{}
@ -238,6 +237,7 @@ type ConfigFactoryArgs struct {
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
StopCh <-chan struct{}
Registry framework.Registry
}
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
@ -248,6 +248,11 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything = wait.NeverStop
}
schedulerCache := internalcache.New(30*time.Second, stopEverything)
// TODO(bsalamat): config files should be passed to the framework.
framework, err := framework.NewFramework(args.Registry, nil)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
}
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
@ -267,6 +272,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
storageClassLister: storageClassLister,
framework: framework,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: args.SchedulerName,
@ -435,9 +441,6 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, err
}
// TODO(bsalamat): the default registrar should be able to process config files.
c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache)
algo := core.NewGenericScheduler(
c.schedulerCache,
c.podQueue,
@ -445,7 +448,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
c.pluginSet,
c.framework,
extenders,
c.volumeBinder,
c.pVCLister,
@ -463,7 +466,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
GetBinder: getBinderFunc(c.client, extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
PluginSet: c.pluginSet,
Framework: c.framework,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -494,6 +495,7 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight
schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds,
stopCh,
framework.NewRegistry(),
})
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multipoint
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// CommunicatingPlugin is an example of a plugin that implements two
// extension points. It communicates through pluginContext with another function.
type CommunicatingPlugin struct{}
var _ = framework.ReservePlugin(CommunicatingPlugin{})
// Name is the name of the plug used in Registry and configurations.
const Name = "multipoint-communicating-plugin"
// Name returns name of the plugin. It is used in logs, etc.
func (mc CommunicatingPlugin) Name() string {
return Name
}
// Reserve is the functions invoked by the framework at "reserve" extension point.
func (mc CommunicatingPlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
if pod == nil {
return framework.NewStatus(framework.Error, "pod cannot be nil")
}
if pod.Name == "my-test-pod" {
pc.Lock()
pc.Write(framework.ContextKey(pod.Name), "never bind")
pc.Unlock()
}
return nil
}
// Prebind is the functions invoked by the framework at "prebind" extension point.
func (mc CommunicatingPlugin) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
if pod == nil {
return framework.NewStatus(framework.Error, "pod cannot be nil")
}
pc.RLock()
defer pc.RUnlock()
if v, e := pc.Read(framework.ContextKey(pod.Name)); e == nil && v == "never bind" {
return framework.NewStatus(framework.Unschedulable, "pod is not permitted")
}
return nil
}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, _ framework.Framework) (framework.Plugin, error) {
return &CommunicatingPlugin{}, nil
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2018 The Kubernetes Authors.
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,35 +14,42 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package examples
package prebind
import (
"fmt"
"k8s.io/api/core/v1"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// StatelessPrebindExample is an example of a simple plugin that has no state
// and implements only one hook for prebind.
type StatelessPrebindExample struct{}
var _ = plugins.PrebindPlugin(StatelessPrebindExample{})
var _ = framework.PrebindPlugin(StatelessPrebindExample{})
// Name is the name of the plugin used in Registry and configurations.
const Name = "stateless-prebind-plugin-example"
// Name returns name of the plugin. It is used in logs, etc.
func (sr StatelessPrebindExample) Name() string {
return "stateless-prebind-plugin-example"
return Name
}
// Prebind is the functions invoked by the framework at "prebind" extension point.
func (sr StatelessPrebindExample) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
func (sr StatelessPrebindExample) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
if pod == nil {
return false, fmt.Errorf("pod cannot be nil")
return framework.NewStatus(framework.Error, fmt.Sprintf("pod cannot be nil"))
}
return true, nil
if pod.Namespace != "foo" {
return framework.NewStatus(framework.Unschedulable, "only pods from 'foo' namespace are allowed")
}
return nil
}
// NewStatelessPrebindExample initializes a new plugin and returns it.
func NewStatelessPrebindExample() *StatelessPrebindExample {
return &StatelessPrebindExample{}
// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, _ framework.Framework) (framework.Plugin, error) {
return &StatelessPrebindExample{}, nil
}

View File

@ -0,0 +1,72 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stateful
import (
"fmt"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// MultipointExample is an example plugin that is executed at multiple extension points.
// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin)
// and changes its state when it is executed.
type MultipointExample struct {
mpState map[int]string
numRuns int
}
var _ = framework.ReservePlugin(&MultipointExample{})
var _ = framework.PrebindPlugin(&MultipointExample{})
// Name is the name of the plug used in Registry and configurations.
const Name = "multipoint-plugin-example"
// Name returns name of the plugin. It is used in logs, etc.
func (mp *MultipointExample) Name() string {
return Name
}
// Reserve is the functions invoked by the framework at "reserve" extension point.
func (mp *MultipointExample) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
mp.numRuns++
return nil
}
// Prebind is the functions invoked by the framework at "prebind" extension point.
func (mp *MultipointExample) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
mp.numRuns++
if pod == nil {
return framework.NewStatus(framework.Error, "pod must not be nil")
}
return nil
}
// New initializes a new plugin and returns it.
func New(config *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
if config == nil {
klog.Error("MultipointExample configuration cannot be empty")
return nil, fmt.Errorf("MultipointExample configuration cannot be empty")
}
mp := MultipointExample{
mpState: make(map[int]string),
}
return &mp, nil
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2018 The Kubernetes Authors.
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -37,7 +37,7 @@ type ContextKey string
// PluginContext does not provide any data protection, as all plugins are assumed to be
// trusted.
type PluginContext struct {
Mx sync.RWMutex
mx sync.RWMutex
storage map[ContextKey]ContextData
}
@ -50,6 +50,8 @@ func NewPluginContext() *PluginContext {
// Read retrieves data with the given "key" from PluginContext. If the key is not
// present an error is returned.
// This function is not thread safe. In multi-threaded code, lock should be
// acquired first.
func (c *PluginContext) Read(key ContextKey) (ContextData, error) {
if v, ok := c.storage[key]; ok {
return v, nil
@ -57,38 +59,36 @@ func (c *PluginContext) Read(key ContextKey) (ContextData, error) {
return nil, errors.New(NotFound)
}
// SyncRead is the thread safe version of Read(...).
func (c *PluginContext) SyncRead(key ContextKey) (ContextData, error) {
c.Mx.RLock()
defer c.Mx.RUnlock()
return c.Read(key)
}
// Write stores the given "val" in PluginContext with the given "key".
// This function is not thread safe. In multi-threaded code, lock should be
// acquired first.
func (c *PluginContext) Write(key ContextKey, val ContextData) {
c.storage[key] = val
}
// SyncWrite is the thread safe version of Write(...).
func (c *PluginContext) SyncWrite(key ContextKey, val ContextData) {
c.Mx.Lock()
defer c.Mx.Unlock()
c.Write(key, val)
}
// Delete deletes data with the given key from PluginContext.
// This function is not thread safe. In multi-threaded code, lock should be
// acquired first.
func (c *PluginContext) Delete(key ContextKey) {
delete(c.storage, key)
}
// SyncDelete is the thread safe version of Write(...).
func (c *PluginContext) SyncDelete(key ContextKey) {
c.Mx.Lock()
defer c.Mx.Unlock()
c.Delete(key)
// Lock acquires PluginContext lock.
func (c *PluginContext) Lock() {
c.mx.Lock()
}
// Reset removes all the information in the PluginContext.
func (c *PluginContext) Reset() {
c.storage = make(map[ContextKey]ContextData)
// Unlock releases PluginContext lock.
func (c *PluginContext) Unlock() {
c.mx.Unlock()
}
// RLock acquires PluginContext read lock.
func (c *PluginContext) RLock() {
c.mx.RLock()
}
// RUnlock releases PluginContext read lock.
func (c *PluginContext) RUnlock() {
c.mx.RUnlock()
}

View File

@ -0,0 +1,142 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file defines the scheduling framework plugin interfaces.
package v1alpha1
import (
"errors"
"k8s.io/api/core/v1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
// Code is the Status code/type which is returned from plugins.
type Code int
// These are predefined codes used in a Status.
const (
// Success means that plugin ran correctly and found pod schedulable.
// NOTE: A nil status is also considered as "Success".
Success Code = 0
// Error is used for internal plugin errors, unexpected input, etc.
Error Code = 1
// Unschedulable is used when a plugin finds a pod unschedulable.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable Code = 2
)
// Status indicates the result of running a plugin. It consists of a code and a
// message. When the status code is not `Success`, the status message should
// explain why.
// NOTE: A nil Status is also considered as Success.
type Status struct {
code Code
message string
}
// Code returns code of the Status.
func (s *Status) Code() Code {
if s == nil {
return Success
}
return s.code
}
// Message returns message of the Status.
func (s *Status) Message() string {
return s.message
}
// IsSuccess returns true if and only if "Status" is nil or Code is "Success".
func (s *Status) IsSuccess() bool {
if s == nil || s.code == Success {
return true
}
return false
}
// AsError returns an "error" object with the same message as that of the Status.
func (s *Status) AsError() error {
if s.IsSuccess() {
return nil
}
return errors.New(s.message)
}
// NewStatus makes a Status out of the given arguments and returns its pointer.
func NewStatus(code Code, msg string) *Status {
return &Status{
code: code,
message: msg,
}
}
// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
}
// ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point. These are meant to update the state of the plugin.
// This concept used to be called 'assume' in the original scheduler.
// These plugins should return only Success or Error in Status.code. However,
// the scheduler accepts other valid codes as well. Anything other than Success
// will lead to rejection of the pod.
type ReservePlugin interface {
Plugin
// Reserve is called by the scheduling framework when the scheduler cache is
// updated.
Reserve(pc *PluginContext, p *v1.Pod, nodeName string) *Status
}
// PrebindPlugin is an interface that must be implemented by "prebind" plugins.
// These plugins are called before a pod being scheduled
type PrebindPlugin interface {
Plugin
// Prebind is called before binding a pod. All prebind plugins must return
// success or the pod will be rejected and won't be sent for binding.
Prebind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
}
// Framework manages the set of plugins in use by the scheduling framework.
// Configured plugins are called at specified points in a scheduling context.
type Framework interface {
FrameworkHandle
// RunPrebindPlugins runs the set of configured prebind plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If the Status code is "Unschedulable", it is
// considered as a scheduling check failure, otherwise, it is considered as an
// internal error. In either case the pod is not going to be bound.
RunPrebindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
// RunReservePlugins runs the set of configured reserve plugins. If any of these
// plugins returns an error, it does not continue running the remaining ones and
// returns the error. In such case, pod will not be scheduled.
RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
}
// FrameworkHandle provides data and some tools that plugins can use. It is
// passed to the plugin factories at the time of plugin initialization. Plugins
// must store and use this handle to call framework functions.
type FrameworkHandle interface {
// NodeInfoSnapshot return the latest NodeInfo snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Reserve" point. There is no guarantee that the information
// remains unchanged in the binding phase of scheduling.
NodeInfoSnapshot() *internalcache.NodeInfoSnapshot
}

View File

@ -122,8 +122,8 @@ func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem {
}
// NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it.
func NewNodeInfoSnapshot() NodeInfoSnapshot {
return NodeInfoSnapshot{
func NewNodeInfoSnapshot() *NodeInfoSnapshot {
return &NodeInfoSnapshot{
NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
}
}

View File

@ -1078,7 +1078,7 @@ func TestNodeOperators(t *testing.T) {
// Case 2: dump cached nodes successfully.
cachedNodes := NewNodeInfoSnapshot()
cache.UpdateNodeInfoSnapshot(&cachedNodes)
cache.UpdateNodeInfoSnapshot(cachedNodes)
newNode, found := cachedNodes.NodeInfoMap[node.Name]
if !found || len(cachedNodes.NodeInfoMap) != 1 {
t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
@ -1180,7 +1180,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
}
var cache *schedulerCache
var snapshot NodeInfoSnapshot
var snapshot *NodeInfoSnapshot
type operation = func()
addNode := func(i int) operation {
@ -1215,8 +1215,8 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
}
updateSnapshot := func() operation {
return func() {
cache.UpdateNodeInfoSnapshot(&snapshot)
if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil {
cache.UpdateNodeInfoSnapshot(snapshot)
if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
t.Error(err)
}
}
@ -1356,8 +1356,8 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
}
// Always update the snapshot at the end of operations and compare it.
cache.UpdateNodeInfoSnapshot(&snapshot)
if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil {
cache.UpdateNodeInfoSnapshot(snapshot)
if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
t.Error(err)
}
})
@ -1391,7 +1391,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
cachedNodes := NewNodeInfoSnapshot()
cache.UpdateNodeInfoSnapshot(&cachedNodes)
cache.UpdateNodeInfoSnapshot(cachedNodes)
}
}

View File

@ -1,62 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package examples
import (
"fmt"
"k8s.io/api/core/v1"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// MultipointCommunicatingPlugin is an example of a plugin that implements two
// extension points. It communicates through pluginContext with another function.
type MultipointCommunicatingPlugin struct{}
var _ = plugins.ReservePlugin(MultipointCommunicatingPlugin{})
// Name returns name of the plugin. It is used in logs, etc.
func (mc MultipointCommunicatingPlugin) Name() string {
return "multipoint-communicating-plugin"
}
// Reserve is the functions invoked by the framework at "reserve" extension point.
func (mc MultipointCommunicatingPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error {
if pod == nil {
return fmt.Errorf("pod cannot be nil")
}
if pod.Name == "my-test-pod" {
ps.Data().Ctx.SyncWrite(plugins.ContextKey(pod.Name), "never bind")
}
return nil
}
// Prebind is the functions invoked by the framework at "prebind" extension point.
func (mc MultipointCommunicatingPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
if pod == nil {
return false, fmt.Errorf("pod cannot be nil")
}
if v, e := ps.Data().Ctx.SyncRead(plugins.ContextKey(pod.Name)); e == nil && v == "never bind" {
return false, nil
}
return true, nil
}
// NewMultipointCommunicatingPlugin initializes a new plugin and returns it.
func NewMultipointCommunicatingPlugin() *MultipointCommunicatingPlugin {
return &MultipointCommunicatingPlugin{}
}

View File

@ -1,68 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package examples
import (
"fmt"
"k8s.io/klog"
"k8s.io/api/core/v1"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// StatefulMultipointExample is an example plugin that is executed at multiple extension points.
// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin)
// and changes its state when it is executed.
type StatefulMultipointExample struct {
mpState map[int]string
numRuns int
}
var _ = plugins.ReservePlugin(&StatefulMultipointExample{})
var _ = plugins.PrebindPlugin(&StatefulMultipointExample{})
// Name returns name of the plugin. It is used in logs, etc.
func (mp *StatefulMultipointExample) Name() string {
return "multipoint-plugin-example"
}
// Reserve is the functions invoked by the framework at "reserve" extension point.
func (mp *StatefulMultipointExample) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error {
mp.numRuns++
return nil
}
// Prebind is the functions invoked by the framework at "prebind" extension point.
func (mp *StatefulMultipointExample) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
mp.numRuns++
if pod == nil {
return false, fmt.Errorf("pod must not be nil")
}
return true, nil
}
// NewStatefulMultipointExample initializes a new plugin and returns it.
func NewStatefulMultipointExample(initState ...interface{}) *StatefulMultipointExample {
if len(initState) == 0 {
klog.Error("StatefulMultipointExample needs exactly one argument for initialization")
return nil
}
mp := StatefulMultipointExample{
mpState: initState[0].(map[int]string),
}
return &mp
}

View File

@ -1,77 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugins
import (
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// DefaultPluginSet is the default plugin registrar used by the default scheduler.
type DefaultPluginSet struct {
data *plugins.PluginData
reservePlugins []plugins.ReservePlugin
prebindPlugins []plugins.PrebindPlugin
}
var _ = plugins.PluginSet(&DefaultPluginSet{})
// ReservePlugins returns a slice of default reserve plugins.
func (r *DefaultPluginSet) ReservePlugins() []plugins.ReservePlugin {
return r.reservePlugins
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r *DefaultPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return r.prebindPlugins
}
// Data returns a pointer to PluginData.
func (r *DefaultPluginSet) Data() *plugins.PluginData {
return r.data
}
// NewDefaultPluginSet initializes default plugin set and returns its pointer.
func NewDefaultPluginSet(ctx *plugins.PluginContext, schedulerCache *cache.Cache) *DefaultPluginSet {
defaultRegistrar := DefaultPluginSet{
data: &plugins.PluginData{
Ctx: ctx,
SchedulerCache: schedulerCache,
},
}
defaultRegistrar.registerReservePlugins()
defaultRegistrar.registerPrebindPlugins()
return &defaultRegistrar
}
func (r *DefaultPluginSet) registerReservePlugins() {
r.reservePlugins = []plugins.ReservePlugin{
// Init functions of all reserve plugins go here. They are called in the
// same order that they are registered.
// Example:
// examples.NewStatefulMultipointExample(map[int]string{1: "test1", 2: "test2"}),
}
}
func (r *DefaultPluginSet) registerPrebindPlugins() {
r.prebindPlugins = []plugins.PrebindPlugin{
// Init functions of all prebind plugins go here. They are called in the
// same order that they are registered.
// Example:
// examples.NewStatelessPrebindExample(),
}
}

View File

@ -1,63 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file defines the scheduling framework plugin interfaces.
package v1alpha1
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
// PluginData carries information that plugins may need.
type PluginData struct {
Ctx *PluginContext
SchedulerCache *cache.Cache
// We may want to add the scheduling queue here too.
}
// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
}
// ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point, AKA "assume". These are meant to updated the state
// of the plugin. They do not return any value (other than error).
type ReservePlugin interface {
Plugin
// Reserve is called by the scheduling framework when the scheduler cache is
// updated.
Reserve(ps PluginSet, p *v1.Pod, nodeName string) error
}
// PrebindPlugin is an interface that must be implemented by "prebind" plugins.
// These plugins are called before a pod being scheduled
type PrebindPlugin interface {
Plugin
// Prebind is called before binding a pod. All prebind plugins must return
// or the pod will not be sent for binding.
Prebind(ps PluginSet, p *v1.Pod, nodeName string) (bool, error)
}
// PluginSet registers plugins used by the scheduling framework.
// The plugins registered are called at specified points in an scheduling cycle.
type PluginSet interface {
Data() *PluginData
ReservePlugins() []ReservePlugin
PrebindPlugins() []PrebindPlugin
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package scheduler
import (
"errors"
"fmt"
"io/ioutil"
"os"
@ -40,6 +39,7 @@ import (
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/factory"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
@ -156,6 +156,7 @@ func New(client clientset.Interface,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
Registry: framework.NewRegistry(),
})
var config *factory.Config
source := schedulerAlgorithmSource
@ -434,11 +435,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
plugins := sched.config.PluginSet
// Remove all plugin context data at the beginning of a scheduling cycle.
if plugins.Data().Ctx != nil {
plugins.Data().Ctx.Reset()
}
fwk := sched.config.Framework
pod := sched.config.NextPod()
// pod could be nil when schedulerQueue is closed
@ -455,6 +452,7 @@ func (sched *Scheduler) scheduleOne() {
// Synchronously attempt to find a fit for the pod.
start := time.Now()
pluginContext := framework.NewPluginContext()
scheduleResult, err := sched.schedule(pod)
if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to
@ -505,15 +503,12 @@ func (sched *Scheduler) scheduleOne() {
}
// Run "reserve" plugins.
for _, pl := range plugins.ReservePlugins() {
if err := pl.Reserve(plugins, assumedPod, scheduleResult.SuggestedHost); err != nil {
klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
sched.recordSchedulingFailure(assumedPod, err, SchedulerError,
fmt.Sprintf("reserve plugin %v failed", pl.Name()))
metrics.PodScheduleErrors.Inc()
return
}
if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(assumedPod, sts.AsError(), SchedulerError, sts.Message())
metrics.PodScheduleErrors.Inc()
return
}
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
@ -534,29 +529,20 @@ func (sched *Scheduler) scheduleOne() {
}
// Run "prebind" plugins.
for _, pl := range plugins.PrebindPlugins() {
approved, err := pl.Prebind(plugins, assumedPod, scheduleResult.SuggestedHost)
if err != nil {
approved = false
klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
prebindStatus := fwk.RunPrebindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
if !prebindStatus.IsSuccess() {
var reason string
if prebindStatus.Code() == framework.Unschedulable {
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if !approved {
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
var reason string
if err == nil {
msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name)
klog.V(4).Infof(msg)
err = errors.New(msg)
reason = v1.PodReasonUnschedulable
} else {
reason = SchedulerError
}
sched.recordSchedulingFailure(assumedPod, err, reason, err.Error())
return
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
sched.recordSchedulingFailure(assumedPod, prebindStatus.AsError(), reason, prebindStatus.Message())
return
}
err := sched.bind(assumedPod, &v1.Binding{

View File

@ -50,6 +50,7 @@ import (
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/factory"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -196,6 +197,7 @@ func TestSchedulerCreation(t *testing.T) {
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
stopCh,
EmptyPluginRegistry,
WithBindTimeoutSeconds(defaultBindTimeout))
if err != nil {
@ -273,6 +275,7 @@ func TestScheduler(t *testing.T) {
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
s := NewFromConfig(&factory.Config{
SchedulerCache: &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
@ -298,7 +301,7 @@ func TestScheduler(t *testing.T) {
NextPod: func() *v1.Pod {
return item.sendPod
},
PluginSet: &EmptyPluginSet{},
Framework: fwk,
Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
})
@ -636,6 +639,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil)
algo := core.NewGenericScheduler(
scache,
internalqueue.NewSchedulingQueue(nil),
@ -643,7 +647,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyPriorityMetadataProducer,
&EmptyPluginSet{},
framework,
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -674,7 +678,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
PluginSet: &EmptyPluginSet{},
Framework: framework,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
}
@ -688,6 +692,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
}
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil)
algo := core.NewGenericScheduler(
scache,
internalqueue.NewSchedulingQueue(nil),
@ -695,7 +700,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyPriorityMetadataProducer,
&EmptyPluginSet{},
framework,
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -730,7 +735,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
PluginSet: &EmptyPluginSet{},
Framework: framework,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
})

View File

@ -27,8 +27,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// FakeConfigurator is an implementation for test.
@ -91,25 +91,5 @@ func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.Stri
return fc.Config, nil
}
// EmptyPluginSet is the default plugin registrar used by the default scheduler.
type EmptyPluginSet struct{}
var _ = plugins.PluginSet(EmptyPluginSet{})
// ReservePlugins returns a slice of default reserve plugins.
func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
return []plugins.ReservePlugin{}
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return []plugins.PrebindPlugin{}
}
// Data returns a pointer to PluginData.
func (r EmptyPluginSet) Data() *plugins.PluginData {
return &plugins.PluginData{
Ctx: nil,
SchedulerCache: nil,
}
}
// EmptyPluginRegistry is an empty plugin registry used in tests.
var EmptyPluginRegistry = framework.Registry{}

View File

@ -0,0 +1,218 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// TesterPlugin is common ancestor for a test plugin that allows injection of
// failures and some other test functionalities.
type TesterPlugin struct {
numReserveCalled int
numPrebindCalled int
failReserve bool
failPrebind bool
rejectPrebind bool
}
type ReservePlugin struct {
TesterPlugin
}
type PrebindPlugin struct {
TesterPlugin
}
const (
reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin"
)
var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{})
// Name returns name of the plugin.
func (rp *ReservePlugin) Name() string {
return reservePluginName
}
// Reserve is a test function that returns an error or nil, depending on the
// value of "failReserve".
func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
rp.numReserveCalled++
if rp.failReserve {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
return nil
}
var resPlugin = &ReservePlugin{}
var pbdPlugin = &PrebindPlugin{}
// NewReservePlugin is the factory for reserve plugin.
func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return resPlugin, nil
}
// Name returns name of the plugin.
func (pp *PrebindPlugin) Name() string {
return prebindPluginName
}
// Prebind is a test function that returns (true, nil) or errors for testing.
func (pp *PrebindPlugin) Prebind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
pp.numPrebindCalled++
if pp.failPrebind {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
if pp.rejectPrebind {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
}
return nil
}
// NewPrebindPlugin is the factory for prebind plugin.
func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return pbdPlugin, nil
}
// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
registry := framework.Registry{reservePluginName: NewReservePlugin}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "reserve-plugin", nil),
false, nil, registry, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
for _, fail := range []bool{false, true} {
resPlugin.failReserve = fail
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Didn't expected the pod to be scheduled. error: %v", err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
}
if resPlugin.numReserveCalled == 0 {
t.Errorf("Expected the reserve plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestPrebindPlugin tests invocation of prebind plugins.
func TestPrebindPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
registry := framework.Registry{prebindPluginName: NewPrebindPlugin}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "prebind-plugin", nil),
false, nil, registry, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
tests := []struct {
fail bool
reject bool
}{
{
fail: false,
reject: false,
},
{
fail: true,
reject: false,
},
{
fail: false,
reject: true,
},
{
fail: true,
reject: true,
},
}
for i, test := range tests {
pbdPlugin.failPrebind = test.fail
pbdPlugin.rejectPrebind = test.reject
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if test.fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
}
} else {
if test.reject {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
}
}
}
if pbdPlugin.numPrebindCalled == 0 {
t.Errorf("Expected the prebind plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}

View File

@ -1,269 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// StatefulMultipointExample is an example plugin that is executed at multiple extension points.
// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin)
// and changes its state when it is executed.
type TesterPlugin struct {
numReserveCalled int
numPrebindCalled int
failReserve bool
failPrebind bool
rejectPrebind bool
}
var _ = plugins.ReservePlugin(&TesterPlugin{})
var _ = plugins.PrebindPlugin(&TesterPlugin{})
// Name returns name of the plugin.
func (tp *TesterPlugin) Name() string {
return "tester-plugin"
}
// Reserve is a test function that returns an error or nil, depending on the
// value of "failReserve".
func (tp *TesterPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error {
tp.numReserveCalled++
if tp.failReserve {
return fmt.Errorf("injecting failure for pod %v", pod.Name)
}
return nil
}
// Prebind is a test function that returns (true, nil) or errors for testing.
func (tp *TesterPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
var err error
tp.numPrebindCalled++
if tp.failPrebind {
err = fmt.Errorf("injecting failure for pod %v", pod.Name)
}
if tp.rejectPrebind {
return false, err
}
return true, err
}
// TestPluginSet is a plugin set used for testing purposes.
type TestPluginSet struct {
data *plugins.PluginData
reservePlugins []plugins.ReservePlugin
prebindPlugins []plugins.PrebindPlugin
}
var _ = plugins.PluginSet(&TestPluginSet{})
// ReservePlugins returns a slice of default reserve plugins.
func (r *TestPluginSet) ReservePlugins() []plugins.ReservePlugin {
return r.reservePlugins
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r *TestPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return r.prebindPlugins
}
// Data returns a pointer to PluginData.
func (r *TestPluginSet) Data() *plugins.PluginData {
return r.data
}
// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin set for testing. Register only a reserve plugin.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
reservePlugins: []plugins.ReservePlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "reserve-plugin", nil),
false, nil, testPluginSet, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
for _, fail := range []bool{false, true} {
testerPlugin.failReserve = fail
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Didn't expected the pod to be scheduled. error: %v", err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
}
if testerPlugin.numReserveCalled == 0 {
t.Errorf("Expected the reserve plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestPrebindPlugin tests invocation of prebind plugins.
func TestPrebindPlugin(t *testing.T) {
// Create a plugin set for testing. Register only a prebind plugin.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
prebindPlugins: []plugins.PrebindPlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "prebind-plugin", nil),
false, nil, testPluginSet, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
tests := []struct {
fail bool
reject bool
}{
{
fail: false,
reject: false,
},
{
fail: true,
reject: false,
},
{
fail: false,
reject: true,
},
{
fail: true,
reject: true,
},
}
for i, test := range tests {
testerPlugin.failPrebind = test.fail
testerPlugin.rejectPrebind = test.reject
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if test.fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
}
} else {
if test.reject {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
}
}
}
if testerPlugin.numPrebindCalled == 0 {
t.Errorf("Expected the prebind plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestContextCleanup tests that data inserted in the pluginContext is removed
// after a scheduling cycle is over.
func TestContextCleanup(t *testing.T) {
// Create a plugin set for testing.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
reservePlugins: []plugins.ReservePlugin{testerPlugin},
prebindPlugins: []plugins.PrebindPlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "plugin-context-cleanup", nil),
false, nil, testPluginSet, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
// Insert something in the plugin context.
testPluginSet.Data().Ctx.Write("test", "foo")
// Create and schedule a best effort pod.
pod, err := runPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating or scheduling a test pod: %v", err)
}
// Make sure the data inserted in the plugin context is removed.
_, err = testPluginSet.Data().Ctx.Read("test")
if err == nil || err.Error() != plugins.NotFound {
t.Errorf("Expected the plugin context to be cleaned up after a scheduling cycle. error: %v", err)
}
cleanupPods(cs, t, []*v1.Pod{pod})
}

View File

@ -43,6 +43,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/test/integration/framework"
)
@ -595,7 +596,7 @@ func TestMultiScheduler(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, stopCh)
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerframework.NewRegistry(), stopCh)
schedulerConfig2, err := schedulerConfigFactory2.Create()
if err != nil {
t.Errorf("Couldn't create scheduler config: %v", err)

View File

@ -49,7 +49,7 @@ import (
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/test/integration/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -73,6 +73,7 @@ func createConfiguratorWithPodInformer(
clientSet clientset.Interface,
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
pluginRegistry schedulerframework.Registry,
stopCh <-chan struct{},
) factory.Configurator {
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
@ -88,6 +89,7 @@ func createConfiguratorWithPodInformer(
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
Registry: pluginRegistry,
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
@ -146,7 +148,7 @@ func initTestScheduler(
) *testContext {
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, time.Second)
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, schedulerframework.NewRegistry(), false, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -156,7 +158,7 @@ func initTestSchedulerWithOptions(
context *testContext,
setPodInformer bool,
policy *schedulerapi.Policy,
pluginSet plugins.PluginSet,
pluginRegistry schedulerframework.Registry,
disablePreemption bool,
resyncPeriod time.Duration,
) *testContext {
@ -173,7 +175,7 @@ func initTestSchedulerWithOptions(
}
context.schedulerConfigFactory = createConfiguratorWithPodInformer(
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, context.stopCh)
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, context.stopCh)
var err error
@ -208,11 +210,6 @@ func initTestSchedulerWithOptions(
controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
}
// Set pluginSet if provided. DefaultPluginSet is used if this is not specified.
if pluginSet != nil {
context.schedulerConfig.PluginSet = pluginSet
}
eventBroadcaster := record.NewBroadcaster()
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
@ -259,7 +256,8 @@ func initTest(t *testing.T, nsPrefix string) *testContext {
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *testContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, time.Second)
t, initTestMaster(t, nsPrefix, nil), true, nil,
schedulerframework.NewRegistry(), true, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called