diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go new file mode 100644 index 00000000000..fb860d4209a --- /dev/null +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "fmt" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog" + "k8s.io/kubernetes/pkg/scheduler/internal/cache" +) + +// framework is the component responsible for initializing and running scheduler +// plugins. +type framework struct { + registry Registry + nodeInfoSnapshot *cache.NodeInfoSnapshot + plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance. + reservePlugins []ReservePlugin + prebindPlugins []PrebindPlugin +} + +var _ = Framework(&framework{}) + +// NewFramework initializes plugins given the configuration and the registry. +func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) { + f := &framework{ + registry: r, + nodeInfoSnapshot: cache.NewNodeInfoSnapshot(), + plugins: make(map[string]Plugin), + } + + // TODO: The framework needs to read the scheduler config and initialize only + // needed plugins. In this initial version of the code, we initialize all. + for name, factory := range r { + // TODO: 'nil' should be replaced by plugin config. + p, err := factory(nil, f) + if err != nil { + return nil, fmt.Errorf("error initializing plugin %v: %v", name, err) + } + f.plugins[name] = p + + // TODO: For now, we assume any plugins that implements an extension + // point wants to be called at that extension point. We should change this + // later and add these plugins based on the configuration. + if rp, ok := p.(ReservePlugin); ok { + f.reservePlugins = append(f.reservePlugins, rp) + } + if pp, ok := p.(PrebindPlugin); ok { + f.prebindPlugins = append(f.prebindPlugins, pp) + } + } + return f, nil +} + +// RunPrebindPlugins runs the set of configured prebind plugins. It returns a +// failure (bool) if any of the plugins returns an error. It also returns an +// error containing the rejection message or the error occurred in the plugin. +func (f *framework) RunPrebindPlugins( + pc *PluginContext, pod *v1.Pod, nodeName string) *Status { + for _, pl := range f.prebindPlugins { + status := pl.Prebind(pc, pod, nodeName) + if !status.IsSuccess() { + if status.Code() == Unschedulable { + msg := fmt.Sprintf("rejected by %v at prebind: %v", pl.Name(), status.Message()) + klog.V(4).Infof(msg) + return NewStatus(status.Code(), msg) + } + msg := fmt.Sprintf("error while running %v prebind plugin for pod %v: %v", pl.Name(), pod.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + return nil +} + +// RunReservePlugins runs the set of configured reserve plugins. If any of these +// plugins returns an error, it does not continue running the remaining ones and +// returns the error. In such case, pod will not be scheduled. +func (f *framework) RunReservePlugins( + pc *PluginContext, pod *v1.Pod, nodeName string) *Status { + for _, pl := range f.reservePlugins { + status := pl.Reserve(pc, pod, nodeName) + if !status.IsSuccess() { + msg := fmt.Sprintf("error while running %v reserve plugin for pod %v: %v", pl.Name(), pod.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + return nil +} + +// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot +// is taken at the beginning of a scheduling cycle and remains unchanged until a +// pod finishes "Reserve". There is no guarantee that the information remains +// unchanged after "Reserve". +func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot { + return f.nodeInfoSnapshot +} diff --git a/pkg/scheduler/framework/v1alpha1/registry.go b/pkg/scheduler/framework/v1alpha1/registry.go new file mode 100644 index 00000000000..ab92e96865f --- /dev/null +++ b/pkg/scheduler/framework/v1alpha1/registry.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/runtime" +) + +// PluginFactory is a function that builds a plugin. +type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error) + +// Registry is a collection of all available plugins. The framework uses a +// registry to enable and initialize configured plugins. +// All plugins must be in the registry before initializing the framework. +type Registry map[string]PluginFactory + +// Register adds a new plugin to the registry. If a plugin with the same name +// exists, it returns an error. +func (r Registry) Register(name string, factory PluginFactory) error { + if _, ok := r[name]; ok { + return fmt.Errorf("a plugin named %v already exists", name) + } + r[name] = factory + return nil +} + +// Unregister removes an existing plugin from the registry. If no plugin with +// the provided name exists, it returns an error. +func (r Registry) Unregister(name string) error { + if _, ok := r[name]; !ok { + return fmt.Errorf("no plugin named %v exists", name) + } + delete(r, name) + return nil +} + +// NewRegistry builds a default registry with all the default plugins. +// This is the registry that Kubernetes default scheduler uses. A scheduler that +// runs custom plugins, can pass a different Registry and when initializing the +// scheduler. +func NewRegistry() Registry { + return Registry{ + // FactoryMap: + // New plugins are registered here. + // example: + // { + // stateful_plugin.Name: stateful.NewStatefulMultipointExample, + // fooplugin.Name: fooplugin.New, + // } + } +}