mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #83430 from draveness/feature/rename-plugincontext-to-cyclestate
feat(scheduler): rename PluginContext to CycleState
This commit is contained in:
commit
eba349b855
@ -557,7 +557,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
schedulerapi.DefaultPercentageOfNodesToScore,
|
||||
false)
|
||||
podIgnored := &v1.Pod{}
|
||||
result, err := scheduler.Schedule(framework.NewPluginContext(), podIgnored)
|
||||
result, err := scheduler.Schedule(framework.NewCycleState(), podIgnored)
|
||||
if test.expectsErr {
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected non-error, result %+v", result)
|
||||
|
@ -134,12 +134,12 @@ func (f *FitError) Error() string {
|
||||
// onto machines.
|
||||
// TODO: Rename this type.
|
||||
type ScheduleAlgorithm interface {
|
||||
Schedule(*framework.PluginContext, *v1.Pod) (scheduleResult ScheduleResult, err error)
|
||||
Schedule(*framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
|
||||
// Preempt receives scheduling errors for a pod and tries to create room for
|
||||
// the pod by preempting lower priority pods if possible.
|
||||
// It returns the node where preemption happened, a list of preempted pods, a
|
||||
// list of pods whose nominated node name should be removed, and error if any.
|
||||
Preempt(*framework.PluginContext, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
|
||||
Preempt(*framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
|
||||
// Predicates() returns a pointer to a map of predicate functions. This is
|
||||
// exposed for testing.
|
||||
Predicates() map[string]predicates.FitPredicate
|
||||
@ -191,7 +191,7 @@ func (g *genericScheduler) snapshot() error {
|
||||
// Schedule tries to schedule the given pod to one of the nodes in the node list.
|
||||
// If it succeeds, it will return the name of the node.
|
||||
// If it fails, it will return a FitError error with reasons.
|
||||
func (g *genericScheduler) Schedule(pluginContext *framework.PluginContext, pod *v1.Pod) (result ScheduleResult, err error) {
|
||||
func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
|
||||
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
|
||||
defer trace.LogIfLong(100 * time.Millisecond)
|
||||
|
||||
@ -200,7 +200,7 @@ func (g *genericScheduler) Schedule(pluginContext *framework.PluginContext, pod
|
||||
}
|
||||
|
||||
// Run "prefilter" plugins.
|
||||
preFilterStatus := g.framework.RunPreFilterPlugins(pluginContext, pod)
|
||||
preFilterStatus := g.framework.RunPreFilterPlugins(state, pod)
|
||||
if !preFilterStatus.IsSuccess() {
|
||||
return result, preFilterStatus.AsError()
|
||||
}
|
||||
@ -216,13 +216,13 @@ func (g *genericScheduler) Schedule(pluginContext *framework.PluginContext, pod
|
||||
|
||||
trace.Step("Basic checks done")
|
||||
startPredicateEvalTime := time.Now()
|
||||
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
|
||||
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(state, pod)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Run "postfilter" plugins.
|
||||
postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses)
|
||||
postfilterStatus := g.framework.RunPostFilterPlugins(state, pod, filteredNodes, filteredNodesStatuses)
|
||||
if !postfilterStatus.IsSuccess() {
|
||||
return result, postfilterStatus.AsError()
|
||||
}
|
||||
@ -254,7 +254,7 @@ func (g *genericScheduler) Schedule(pluginContext *framework.PluginContext, pod
|
||||
}
|
||||
|
||||
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
|
||||
priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, pluginContext)
|
||||
priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
@ -326,7 +326,7 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
|
||||
// other pods with the same priority. The nominated pod prevents other pods from
|
||||
// using the nominated resources and the nominated pod could take a long time
|
||||
// before it is retried after many other pending pods.
|
||||
func (g *genericScheduler) Preempt(pluginContext *framework.PluginContext, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
|
||||
func (g *genericScheduler) Preempt(state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
|
||||
// Scheduler may return various types of errors. Consider preemption only if
|
||||
// the error is of type FitError.
|
||||
fitError, ok := scheduleErr.(*FitError)
|
||||
@ -351,7 +351,7 @@ func (g *genericScheduler) Preempt(pluginContext *framework.PluginContext, pod *
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
nodeToVictims, err := g.selectNodesForPreemption(pluginContext, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
|
||||
nodeToVictims, err := g.selectNodesForPreemption(state, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
|
||||
g.predicateMetaProducer, g.schedulingQueue, pdbs)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
@ -470,7 +470,7 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
|
||||
|
||||
// Filters the nodes to find the ones that fit based on the given predicate functions
|
||||
// Each node is passed through the predicate functions to determine if it is a fit
|
||||
func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
|
||||
func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
|
||||
var filtered []*v1.Node
|
||||
failedPredicateMap := FailedPredicateMap{}
|
||||
filteredNodesStatuses := framework.NodeToStatusMap{}
|
||||
@ -499,7 +499,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
|
||||
nodeName := g.cache.NodeTree().Next()
|
||||
|
||||
fits, failedPredicates, status, err := g.podFitsOnNode(
|
||||
pluginContext,
|
||||
state,
|
||||
pod,
|
||||
meta,
|
||||
g.nodeInfoSnapshot.NodeInfoMap[nodeName],
|
||||
@ -619,7 +619,7 @@ func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata,
|
||||
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
|
||||
// It removes victims from meta and NodeInfo before calling this function.
|
||||
func (g *genericScheduler) podFitsOnNode(
|
||||
pluginContext *framework.PluginContext,
|
||||
state *framework.CycleState,
|
||||
pod *v1.Pod,
|
||||
meta predicates.PredicateMetadata,
|
||||
info *schedulernodeinfo.NodeInfo,
|
||||
@ -684,7 +684,7 @@ func (g *genericScheduler) podFitsOnNode(
|
||||
}
|
||||
}
|
||||
|
||||
status = g.framework.RunFilterPlugins(pluginContext, pod, nodeInfoToUse)
|
||||
status = g.framework.RunFilterPlugins(state, pod, nodeInfoToUse)
|
||||
if !status.IsSuccess() && !status.IsUnschedulable() {
|
||||
return false, failedPredicates, status, status.AsError()
|
||||
}
|
||||
@ -707,7 +707,7 @@ func PrioritizeNodes(
|
||||
nodes []*v1.Node,
|
||||
extenders []algorithm.SchedulerExtender,
|
||||
fwk framework.Framework,
|
||||
pluginContext *framework.PluginContext) (framework.NodeScoreList, error) {
|
||||
state *framework.CycleState) (framework.NodeScoreList, error) {
|
||||
// If no priority configs are provided, then the EqualPriority function is applied
|
||||
// This is required to generate the priority list in the required format
|
||||
if len(priorityConfigs) == 0 && len(extenders) == 0 {
|
||||
@ -793,7 +793,7 @@ func PrioritizeNodes(
|
||||
}
|
||||
|
||||
// Run the Score plugins.
|
||||
scoresMap, scoreStatus := fwk.RunScorePlugins(pluginContext, pod, nodes)
|
||||
scoresMap, scoreStatus := fwk.RunScorePlugins(state, pod, nodes)
|
||||
if !scoreStatus.IsSuccess() {
|
||||
return framework.NodeScoreList{}, scoreStatus.AsError()
|
||||
}
|
||||
@ -1005,7 +1005,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *
|
||||
// selectNodesForPreemption finds all the nodes with possible victims for
|
||||
// preemption in parallel.
|
||||
func (g *genericScheduler) selectNodesForPreemption(
|
||||
pluginContext *framework.PluginContext,
|
||||
state *framework.CycleState,
|
||||
pod *v1.Pod,
|
||||
nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
|
||||
potentialNodes []*v1.Node,
|
||||
@ -1025,9 +1025,9 @@ func (g *genericScheduler) selectNodesForPreemption(
|
||||
if meta != nil {
|
||||
metaCopy = meta.ShallowCopy()
|
||||
}
|
||||
pluginContextClone := pluginContext.Clone()
|
||||
stateClone := state.Clone()
|
||||
pods, numPDBViolations, fits := g.selectVictimsOnNode(
|
||||
pluginContextClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
|
||||
stateClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
|
||||
if fits {
|
||||
resultLock.Lock()
|
||||
victims := extenderv1.Victims{
|
||||
@ -1097,7 +1097,7 @@ func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruption
|
||||
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
|
||||
// these predicates can be satisfied by removing more pods from the node.
|
||||
func (g *genericScheduler) selectVictimsOnNode(
|
||||
pluginContext *framework.PluginContext,
|
||||
state *framework.CycleState,
|
||||
pod *v1.Pod,
|
||||
meta predicates.PredicateMetadata,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo,
|
||||
@ -1120,7 +1120,7 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
return err
|
||||
}
|
||||
}
|
||||
status := g.framework.RunPreFilterExtensionRemovePod(pluginContext, pod, rp, nodeInfoCopy)
|
||||
status := g.framework.RunPreFilterExtensionRemovePod(state, pod, rp, nodeInfoCopy)
|
||||
if !status.IsSuccess() {
|
||||
return status.AsError()
|
||||
}
|
||||
@ -1133,7 +1133,7 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
return err
|
||||
}
|
||||
}
|
||||
status := g.framework.RunPreFilterExtensionAddPod(pluginContext, pod, ap, nodeInfoCopy)
|
||||
status := g.framework.RunPreFilterExtensionAddPod(state, pod, ap, nodeInfoCopy)
|
||||
if !status.IsSuccess() {
|
||||
return status.AsError()
|
||||
}
|
||||
@ -1156,7 +1156,7 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
// inter-pod affinity to one or more victims, but we have decided not to
|
||||
// support this case for performance reasons. Having affinity to lower
|
||||
// priority pods is not a recommended configuration anyway.
|
||||
if fits, _, _, err := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
|
||||
if fits, _, _, err := g.podFitsOnNode(state, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
|
||||
if err != nil {
|
||||
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
|
||||
}
|
||||
@ -1174,7 +1174,7 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
if err := addPod(p); err != nil {
|
||||
return false, err
|
||||
}
|
||||
fits, _, _, _ := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
|
||||
fits, _, _, _ := g.podFitsOnNode(state, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
|
||||
if !fits {
|
||||
if err := removePod(p); err != nil {
|
||||
return false, err
|
||||
|
@ -161,7 +161,7 @@ func (fp *FakeFilterPlugin) reset() {
|
||||
|
||||
// Filter is a test function that returns an error or nil, depending on the
|
||||
// value of "failedNodeReturnCodeMap".
|
||||
func (fp *FakeFilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
func (fp *FakeFilterPlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
atomic.AddInt32(&fp.numFilterCalled, 1)
|
||||
|
||||
if returnCode, ok := fp.failedNodeReturnCodeMap[nodeInfo.Node().Name]; ok {
|
||||
@ -674,7 +674,7 @@ func TestGenericScheduler(t *testing.T) {
|
||||
false,
|
||||
schedulerapi.DefaultPercentageOfNodesToScore,
|
||||
false)
|
||||
result, err := scheduler.Schedule(framework.NewPluginContext(), test.pod)
|
||||
result, err := scheduler.Schedule(framework.NewCycleState(), test.pod)
|
||||
if !reflect.DeepEqual(err, test.wErr) {
|
||||
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
|
||||
}
|
||||
@ -1426,8 +1426,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
|
||||
newnode := makeNode("newnode", 1000*5, priorityutil.DefaultMemoryRequest*5)
|
||||
newnode.ObjectMeta.Labels = map[string]string{"hostname": "newnode"}
|
||||
nodes = append(nodes, newnode)
|
||||
pluginContext := framework.NewPluginContext()
|
||||
nodeToPods, err := g.selectNodesForPreemption(pluginContext, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
|
||||
state := framework.NewCycleState()
|
||||
nodeToPods, err := g.selectNodesForPreemption(state, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@ -1650,8 +1650,8 @@ func TestPickOneNodeForPreemption(t *testing.T) {
|
||||
nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5))
|
||||
}
|
||||
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, nodes)
|
||||
pluginContext := framework.NewPluginContext()
|
||||
candidateNodes, _ := g.selectNodesForPreemption(pluginContext, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
|
||||
state := framework.NewCycleState()
|
||||
candidateNodes, _ := g.selectNodesForPreemption(state, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
|
||||
node := pickOneNodeForPreemption(candidateNodes)
|
||||
found := false
|
||||
for _, nodeName := range test.expected {
|
||||
@ -2135,14 +2135,14 @@ func TestPreempt(t *testing.T) {
|
||||
false,
|
||||
schedulerapi.DefaultPercentageOfNodesToScore,
|
||||
true)
|
||||
pluginContext := framework.NewPluginContext()
|
||||
state := framework.NewCycleState()
|
||||
scheduler.(*genericScheduler).snapshot()
|
||||
// Call Preempt and check the expected results.
|
||||
failedPredMap := defaultFailedPredMap
|
||||
if test.failedPredMap != nil {
|
||||
failedPredMap = test.failedPredMap
|
||||
}
|
||||
node, victims, _, err := scheduler.Preempt(pluginContext, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
|
||||
node, victims, _, err := scheduler.Preempt(state, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error in preemption: %v", err)
|
||||
}
|
||||
@ -2172,7 +2172,7 @@ func TestPreempt(t *testing.T) {
|
||||
test.pod.Status.NominatedNodeName = node.Name
|
||||
}
|
||||
// Call preempt again and make sure it doesn't preempt any more pods.
|
||||
node, victims, _, err = scheduler.Preempt(pluginContext, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
|
||||
node, victims, _, err = scheduler.Preempt(state, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error in preemption: %v", err)
|
||||
}
|
||||
|
@ -631,7 +631,7 @@ func (t *TestPlugin) Name() string {
|
||||
return t.name
|
||||
}
|
||||
|
||||
func (t *TestPlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
func (t *TestPlugin) Score(state *framework.CycleState, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
@ -639,7 +639,7 @@ func (t *TestPlugin) Extensions() framework.ScoreExtensions {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
func (t *TestPlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
)
|
||||
|
||||
// CommunicatingPlugin is an example of a plugin that implements two
|
||||
// extension points. It communicates through pluginContext with another function.
|
||||
// extension points. It communicates through state with another function.
|
||||
type CommunicatingPlugin struct{}
|
||||
|
||||
var _ = framework.ReservePlugin(CommunicatingPlugin{})
|
||||
@ -37,39 +37,39 @@ func (mc CommunicatingPlugin) Name() string {
|
||||
return Name
|
||||
}
|
||||
|
||||
type contextData struct {
|
||||
type stateData struct {
|
||||
data string
|
||||
}
|
||||
|
||||
func (f *contextData) Clone() framework.ContextData {
|
||||
copy := &contextData{
|
||||
func (f *stateData) Clone() framework.StateData {
|
||||
copy := &stateData{
|
||||
data: f.data,
|
||||
}
|
||||
return copy
|
||||
}
|
||||
|
||||
// Reserve is the functions invoked by the framework at "reserve" extension point.
|
||||
func (mc CommunicatingPlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (mc CommunicatingPlugin) Reserve(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
if pod == nil {
|
||||
return framework.NewStatus(framework.Error, "pod cannot be nil")
|
||||
}
|
||||
if pod.Name == "my-test-pod" {
|
||||
pc.Lock()
|
||||
pc.Write(framework.ContextKey(pod.Name), &contextData{data: "never bind"})
|
||||
pc.Unlock()
|
||||
state.Lock()
|
||||
state.Write(framework.StateKey(pod.Name), &stateData{data: "never bind"})
|
||||
state.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreBind is the functions invoked by the framework at "prebind" extension point.
|
||||
func (mc CommunicatingPlugin) PreBind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (mc CommunicatingPlugin) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
if pod == nil {
|
||||
return framework.NewStatus(framework.Error, "pod cannot be nil")
|
||||
}
|
||||
pc.RLock()
|
||||
defer pc.RUnlock()
|
||||
if v, e := pc.Read(framework.ContextKey(pod.Name)); e == nil {
|
||||
if value, ok := v.(*contextData); ok && value.data == "never bind" {
|
||||
state.RLock()
|
||||
defer state.RUnlock()
|
||||
if v, e := state.Read(framework.StateKey(pod.Name)); e == nil {
|
||||
if value, ok := v.(*stateData); ok && value.data == "never bind" {
|
||||
return framework.NewStatus(framework.Unschedulable, "pod is not permitted")
|
||||
}
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ func (sr StatelessPreBindExample) Name() string {
|
||||
}
|
||||
|
||||
// PreBind is the functions invoked by the framework at "prebind" extension point.
|
||||
func (sr StatelessPreBindExample) PreBind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (sr StatelessPreBindExample) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
if pod == nil {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("pod cannot be nil"))
|
||||
}
|
||||
|
@ -47,14 +47,14 @@ func (mp *MultipointExample) Name() string {
|
||||
}
|
||||
|
||||
// Reserve is the functions invoked by the framework at "reserve" extension point.
|
||||
func (mp *MultipointExample) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (mp *MultipointExample) Reserve(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
// Reserve is not called concurrently, and so we don't need to lock.
|
||||
mp.numRuns++
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreBind is the functions invoked by the framework at "prebind" extension point.
|
||||
func (mp *MultipointExample) PreBind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (mp *MultipointExample) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
// PreBind could be called concurrently for different pods.
|
||||
mp.mu.Lock()
|
||||
defer mp.mu.Unlock()
|
||||
|
@ -17,7 +17,7 @@ limitations under the License.
|
||||
package noop
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
@ -41,7 +41,7 @@ func (n Filter) Name() string {
|
||||
}
|
||||
|
||||
// Filter invoked at the filter extension point.
|
||||
func (n Filter) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
|
||||
func (n Filter) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"context.go",
|
||||
"cycle_state.go",
|
||||
"framework.go",
|
||||
"interface.go",
|
||||
"registry.go",
|
||||
@ -43,7 +43,7 @@ filegroup(
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"context_test.go",
|
||||
"cycle_state_test.go",
|
||||
"framework_test.go",
|
||||
"interface_test.go",
|
||||
"registry_test.go",
|
||||
|
@ -1,112 +0,0 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v1alpha1
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
// NotFound is the not found error message.
|
||||
NotFound = "not found"
|
||||
)
|
||||
|
||||
// ContextData is a generic type for arbitrary data stored in PluginContext.
|
||||
type ContextData interface {
|
||||
// Clone is an interface to make a copy of ContextData. For performance reasons,
|
||||
// clone should make shallow copies for members (e.g., slices or maps) that are not
|
||||
// impacted by PreFilter's optional AddPod/RemovePod methods.
|
||||
Clone() ContextData
|
||||
}
|
||||
|
||||
// ContextKey is the type of keys stored in PluginContext.
|
||||
type ContextKey string
|
||||
|
||||
// PluginContext provides a mechanism for plugins to store and retrieve arbitrary data.
|
||||
// ContextData stored by one plugin can be read, altered, or deleted by another plugin.
|
||||
// PluginContext does not provide any data protection, as all plugins are assumed to be
|
||||
// trusted.
|
||||
type PluginContext struct {
|
||||
mx sync.RWMutex
|
||||
storage map[ContextKey]ContextData
|
||||
}
|
||||
|
||||
// NewPluginContext initializes a new PluginContext and returns its pointer.
|
||||
func NewPluginContext() *PluginContext {
|
||||
return &PluginContext{
|
||||
storage: make(map[ContextKey]ContextData),
|
||||
}
|
||||
}
|
||||
|
||||
// Clone creates a copy of PluginContext and returns its pointer. Clone returns
|
||||
// nil if the context being cloned is nil.
|
||||
func (c *PluginContext) Clone() *PluginContext {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
copy := NewPluginContext()
|
||||
for k, v := range c.storage {
|
||||
copy.Write(k, v.Clone())
|
||||
}
|
||||
return copy
|
||||
}
|
||||
|
||||
// Read retrieves data with the given "key" from PluginContext. If the key is not
|
||||
// present an error is returned.
|
||||
// This function is not thread safe. In multi-threaded code, lock should be
|
||||
// acquired first.
|
||||
func (c *PluginContext) Read(key ContextKey) (ContextData, error) {
|
||||
if v, ok := c.storage[key]; ok {
|
||||
return v, nil
|
||||
}
|
||||
return nil, errors.New(NotFound)
|
||||
}
|
||||
|
||||
// Write stores the given "val" in PluginContext with the given "key".
|
||||
// This function is not thread safe. In multi-threaded code, lock should be
|
||||
// acquired first.
|
||||
func (c *PluginContext) Write(key ContextKey, val ContextData) {
|
||||
c.storage[key] = val
|
||||
}
|
||||
|
||||
// Delete deletes data with the given key from PluginContext.
|
||||
// This function is not thread safe. In multi-threaded code, lock should be
|
||||
// acquired first.
|
||||
func (c *PluginContext) Delete(key ContextKey) {
|
||||
delete(c.storage, key)
|
||||
}
|
||||
|
||||
// Lock acquires PluginContext lock.
|
||||
func (c *PluginContext) Lock() {
|
||||
c.mx.Lock()
|
||||
}
|
||||
|
||||
// Unlock releases PluginContext lock.
|
||||
func (c *PluginContext) Unlock() {
|
||||
c.mx.Unlock()
|
||||
}
|
||||
|
||||
// RLock acquires PluginContext read lock.
|
||||
func (c *PluginContext) RLock() {
|
||||
c.mx.RLock()
|
||||
}
|
||||
|
||||
// RUnlock releases PluginContext read lock.
|
||||
func (c *PluginContext) RUnlock() {
|
||||
c.mx.RUnlock()
|
||||
}
|
112
pkg/scheduler/framework/v1alpha1/cycle_state.go
Normal file
112
pkg/scheduler/framework/v1alpha1/cycle_state.go
Normal file
@ -0,0 +1,112 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v1alpha1
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
// NotFound is the not found error message.
|
||||
NotFound = "not found"
|
||||
)
|
||||
|
||||
// StateData is a generic type for arbitrary data stored in CycleState.
|
||||
type StateData interface {
|
||||
// Clone is an interface to make a copy of StateData. For performance reasons,
|
||||
// clone should make shallow copies for members (e.g., slices or maps) that are not
|
||||
// impacted by PreFilter's optional AddPod/RemovePod methods.
|
||||
Clone() StateData
|
||||
}
|
||||
|
||||
// StateKey is the type of keys stored in CycleState.
|
||||
type StateKey string
|
||||
|
||||
// CycleState provides a mechanism for plugins to store and retrieve arbitrary data.
|
||||
// StateData stored by one plugin can be read, altered, or deleted by another plugin.
|
||||
// CycleState does not provide any data protection, as all plugins are assumed to be
|
||||
// trusted.
|
||||
type CycleState struct {
|
||||
mx sync.RWMutex
|
||||
storage map[StateKey]StateData
|
||||
}
|
||||
|
||||
// NewCycleState initializes a new CycleState and returns its pointer.
|
||||
func NewCycleState() *CycleState {
|
||||
return &CycleState{
|
||||
storage: make(map[StateKey]StateData),
|
||||
}
|
||||
}
|
||||
|
||||
// Clone creates a copy of CycleState and returns its pointer. Clone returns
|
||||
// nil if the context being cloned is nil.
|
||||
func (c *CycleState) Clone() *CycleState {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
copy := NewCycleState()
|
||||
for k, v := range c.storage {
|
||||
copy.Write(k, v.Clone())
|
||||
}
|
||||
return copy
|
||||
}
|
||||
|
||||
// Read retrieves data with the given "key" from CycleState. If the key is not
|
||||
// present an error is returned.
|
||||
// This function is not thread safe. In multi-threaded code, lock should be
|
||||
// acquired first.
|
||||
func (c *CycleState) Read(key StateKey) (StateData, error) {
|
||||
if v, ok := c.storage[key]; ok {
|
||||
return v, nil
|
||||
}
|
||||
return nil, errors.New(NotFound)
|
||||
}
|
||||
|
||||
// Write stores the given "val" in CycleState with the given "key".
|
||||
// This function is not thread safe. In multi-threaded code, lock should be
|
||||
// acquired first.
|
||||
func (c *CycleState) Write(key StateKey, val StateData) {
|
||||
c.storage[key] = val
|
||||
}
|
||||
|
||||
// Delete deletes data with the given key from CycleState.
|
||||
// This function is not thread safe. In multi-threaded code, lock should be
|
||||
// acquired first.
|
||||
func (c *CycleState) Delete(key StateKey) {
|
||||
delete(c.storage, key)
|
||||
}
|
||||
|
||||
// Lock acquires CycleState lock.
|
||||
func (c *CycleState) Lock() {
|
||||
c.mx.Lock()
|
||||
}
|
||||
|
||||
// Unlock releases CycleState lock.
|
||||
func (c *CycleState) Unlock() {
|
||||
c.mx.Unlock()
|
||||
}
|
||||
|
||||
// RLock acquires CycleState read lock.
|
||||
func (c *CycleState) RLock() {
|
||||
c.mx.RLock()
|
||||
}
|
||||
|
||||
// RUnlock releases CycleState read lock.
|
||||
func (c *CycleState) RUnlock() {
|
||||
c.mx.RUnlock()
|
||||
}
|
@ -24,26 +24,26 @@ type fakeData struct {
|
||||
data string
|
||||
}
|
||||
|
||||
func (f *fakeData) Clone() ContextData {
|
||||
func (f *fakeData) Clone() StateData {
|
||||
copy := &fakeData{
|
||||
data: f.data,
|
||||
}
|
||||
return copy
|
||||
}
|
||||
|
||||
func TestPluginContextClone(t *testing.T) {
|
||||
var key ContextKey = "key"
|
||||
func TestCycleStateClone(t *testing.T) {
|
||||
var key StateKey = "key"
|
||||
data1 := "value1"
|
||||
data2 := "value2"
|
||||
|
||||
pc := NewPluginContext()
|
||||
state := NewCycleState()
|
||||
originalValue := &fakeData{
|
||||
data: data1,
|
||||
}
|
||||
pc.Write(key, originalValue)
|
||||
pcCopy := pc.Clone()
|
||||
state.Write(key, originalValue)
|
||||
stateCopy := state.Clone()
|
||||
|
||||
valueCopy, err := pcCopy.Read(key)
|
||||
valueCopy, err := stateCopy.Read(key)
|
||||
if err != nil {
|
||||
t.Errorf("failed to read copied value: %v", err)
|
||||
}
|
||||
@ -52,7 +52,7 @@ func TestPluginContextClone(t *testing.T) {
|
||||
}
|
||||
|
||||
originalValue.data = data2
|
||||
original, err := pc.Read(key)
|
||||
original, err := state.Read(key)
|
||||
if err != nil {
|
||||
t.Errorf("failed to read original value: %v", err)
|
||||
}
|
||||
@ -65,10 +65,10 @@ func TestPluginContextClone(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPluginContextCloneNil(t *testing.T) {
|
||||
var pc *PluginContext
|
||||
pcCopy := pc.Clone()
|
||||
if pcCopy != nil {
|
||||
func TestCycleStateCloneNil(t *testing.T) {
|
||||
var state *CycleState
|
||||
stateCopy := state.Clone()
|
||||
if stateCopy != nil {
|
||||
t.Errorf("clone expected to be nil")
|
||||
}
|
||||
}
|
@ -87,9 +87,9 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
|
||||
}
|
||||
|
||||
// find the config args of a plugin
|
||||
pc := pluginConfig[name]
|
||||
state := pluginConfig[name]
|
||||
|
||||
p, err := factory(pc, f)
|
||||
p, err := factory(state, f)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
|
||||
}
|
||||
@ -205,9 +205,9 @@ func (f *framework) QueueSortFunc() LessFunc {
|
||||
// anything but Success. If a non-success status is returned, then the scheduling
|
||||
// cycle is aborted.
|
||||
func (f *framework) RunPreFilterPlugins(
|
||||
pc *PluginContext, pod *v1.Pod) *Status {
|
||||
state *CycleState, pod *v1.Pod) *Status {
|
||||
for _, pl := range f.preFilterPlugins {
|
||||
status := pl.PreFilter(pc, pod)
|
||||
status := pl.PreFilter(state, pod)
|
||||
if !status.IsSuccess() {
|
||||
if status.IsUnschedulable() {
|
||||
msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
|
||||
@ -226,13 +226,13 @@ func (f *framework) RunPreFilterPlugins(
|
||||
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
|
||||
// PreFilter plugins. It returns directly if any of the plugins return any
|
||||
// status other than Success.
|
||||
func (f *framework) RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule *v1.Pod,
|
||||
func (f *framework) RunPreFilterExtensionAddPod(state *CycleState, podToSchedule *v1.Pod,
|
||||
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
for _, pl := range f.preFilterPlugins {
|
||||
if pl.Extensions() == nil {
|
||||
continue
|
||||
}
|
||||
if status := pl.Extensions().AddPod(pc, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() {
|
||||
if status := pl.Extensions().AddPod(state, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
|
||||
pl.Name(), podToSchedule.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -246,13 +246,13 @@ func (f *framework) RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule
|
||||
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
|
||||
// PreFilter plugins. It returns directly if any of the plugins return any
|
||||
// status other than Success.
|
||||
func (f *framework) RunPreFilterExtensionRemovePod(pc *PluginContext, podToSchedule *v1.Pod,
|
||||
func (f *framework) RunPreFilterExtensionRemovePod(state *CycleState, podToSchedule *v1.Pod,
|
||||
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
for _, pl := range f.preFilterPlugins {
|
||||
if pl.Extensions() == nil {
|
||||
continue
|
||||
}
|
||||
if status := pl.Extensions().RemovePod(pc, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() {
|
||||
if status := pl.Extensions().RemovePod(state, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
|
||||
pl.Name(), podToSchedule.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -267,10 +267,10 @@ func (f *framework) RunPreFilterExtensionRemovePod(pc *PluginContext, podToSched
|
||||
// the given node. If any of these plugins doesn't return "Success", the
|
||||
// given node is not suitable for running pod.
|
||||
// Meanwhile, the failure message and status are set for the given node.
|
||||
func (f *framework) RunFilterPlugins(pc *PluginContext,
|
||||
func (f *framework) RunFilterPlugins(state *CycleState,
|
||||
pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
for _, pl := range f.filterPlugins {
|
||||
status := pl.Filter(pc, pod, nodeInfo)
|
||||
status := pl.Filter(state, pod, nodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
if !status.IsUnschedulable() {
|
||||
errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v",
|
||||
@ -289,13 +289,13 @@ func (f *framework) RunFilterPlugins(pc *PluginContext,
|
||||
// of these plugins returns any status other than "Success", the given node is
|
||||
// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.
|
||||
func (f *framework) RunPostFilterPlugins(
|
||||
pc *PluginContext,
|
||||
state *CycleState,
|
||||
pod *v1.Pod,
|
||||
nodes []*v1.Node,
|
||||
filteredNodesStatuses NodeToStatusMap,
|
||||
) *Status {
|
||||
for _, pl := range f.postFilterPlugins {
|
||||
status := pl.PostFilter(pc, pod, nodes, filteredNodesStatuses)
|
||||
status := pl.PostFilter(state, pod, nodes, filteredNodesStatuses)
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running %q postfilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -310,7 +310,7 @@ func (f *framework) RunPostFilterPlugins(
|
||||
// stores for each scoring plugin name the corresponding NodeScoreList(s).
|
||||
// It also returns *Status, which is set to non-success if any of the plugins returns
|
||||
// a non-success status.
|
||||
func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) {
|
||||
func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) {
|
||||
pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins))
|
||||
for _, pl := range f.scorePlugins {
|
||||
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
|
||||
@ -322,7 +322,7 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
|
||||
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
|
||||
for _, pl := range f.scorePlugins {
|
||||
nodeName := nodes[index].Name
|
||||
score, status := pl.Score(pc, pod, nodeName)
|
||||
score, status := pl.Score(state, pod, nodeName)
|
||||
if !status.IsSuccess() {
|
||||
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
|
||||
return
|
||||
@ -346,7 +346,7 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
|
||||
if pl.Extensions() == nil {
|
||||
return
|
||||
}
|
||||
if status := pl.Extensions().NormalizeScore(pc, pod, nodeScoreList); !status.IsSuccess() {
|
||||
if status := pl.Extensions().NormalizeScore(state, pod, nodeScoreList); !status.IsSuccess() {
|
||||
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
return
|
||||
@ -388,9 +388,9 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
|
||||
// failure (bool) if any of the plugins returns an error. It also returns an
|
||||
// error containing the rejection message or the error occurred in the plugin.
|
||||
func (f *framework) RunPreBindPlugins(
|
||||
pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
|
||||
state *CycleState, pod *v1.Pod, nodeName string) *Status {
|
||||
for _, pl := range f.preBindPlugins {
|
||||
status := pl.PreBind(pc, pod, nodeName)
|
||||
status := pl.PreBind(state, pod, nodeName)
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running %q prebind plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -401,13 +401,13 @@ func (f *framework) RunPreBindPlugins(
|
||||
}
|
||||
|
||||
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
|
||||
func (f *framework) RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
|
||||
func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status {
|
||||
if len(f.bindPlugins) == 0 {
|
||||
return NewStatus(Skip, "")
|
||||
}
|
||||
var status *Status
|
||||
for _, bp := range f.bindPlugins {
|
||||
status = bp.Bind(pc, pod, nodeName)
|
||||
status = bp.Bind(state, pod, nodeName)
|
||||
if status != nil && status.Code() == Skip {
|
||||
continue
|
||||
}
|
||||
@ -423,9 +423,9 @@ func (f *framework) RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName stri
|
||||
|
||||
// RunPostBindPlugins runs the set of configured postbind plugins.
|
||||
func (f *framework) RunPostBindPlugins(
|
||||
pc *PluginContext, pod *v1.Pod, nodeName string) {
|
||||
state *CycleState, pod *v1.Pod, nodeName string) {
|
||||
for _, pl := range f.postBindPlugins {
|
||||
pl.PostBind(pc, pod, nodeName)
|
||||
pl.PostBind(state, pod, nodeName)
|
||||
}
|
||||
}
|
||||
|
||||
@ -433,9 +433,9 @@ func (f *framework) RunPostBindPlugins(
|
||||
// plugins returns an error, it does not continue running the remaining ones and
|
||||
// returns the error. In such case, pod will not be scheduled.
|
||||
func (f *framework) RunReservePlugins(
|
||||
pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
|
||||
state *CycleState, pod *v1.Pod, nodeName string) *Status {
|
||||
for _, pl := range f.reservePlugins {
|
||||
status := pl.Reserve(pc, pod, nodeName)
|
||||
status := pl.Reserve(state, pod, nodeName)
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -447,9 +447,9 @@ func (f *framework) RunReservePlugins(
|
||||
|
||||
// RunUnreservePlugins runs the set of configured unreserve plugins.
|
||||
func (f *framework) RunUnreservePlugins(
|
||||
pc *PluginContext, pod *v1.Pod, nodeName string) {
|
||||
state *CycleState, pod *v1.Pod, nodeName string) {
|
||||
for _, pl := range f.unreservePlugins {
|
||||
pl.Unreserve(pc, pod, nodeName)
|
||||
pl.Unreserve(state, pod, nodeName)
|
||||
}
|
||||
}
|
||||
|
||||
@ -461,11 +461,11 @@ func (f *framework) RunUnreservePlugins(
|
||||
// Note that if multiple plugins asked to wait, then we wait for the minimum
|
||||
// timeout duration.
|
||||
func (f *framework) RunPermitPlugins(
|
||||
pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
|
||||
state *CycleState, pod *v1.Pod, nodeName string) *Status {
|
||||
timeout := maxTimeout
|
||||
statusCode := Success
|
||||
for _, pl := range f.permitPlugins {
|
||||
status, d := pl.Permit(pc, pod, nodeName)
|
||||
status, d := pl.Permit(state, pod, nodeName)
|
||||
if !status.IsSuccess() {
|
||||
if status.IsUnschedulable() {
|
||||
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
|
||||
@ -535,13 +535,13 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
|
||||
}
|
||||
|
||||
func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown {
|
||||
pc := make(map[string]*runtime.Unknown, 0)
|
||||
state := make(map[string]*runtime.Unknown, 0)
|
||||
for i := range args {
|
||||
// This is needed because the type of PluginConfig.Args is not pointer type.
|
||||
p := args[i]
|
||||
pc[p.Name] = &p.Args
|
||||
state[p.Name] = &p.Args
|
||||
}
|
||||
return pc
|
||||
return state
|
||||
}
|
||||
|
||||
func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
|
||||
|
@ -81,11 +81,11 @@ func (pl *TestScoreWithNormalizePlugin) Name() string {
|
||||
return pl.name
|
||||
}
|
||||
|
||||
func (pl *TestScoreWithNormalizePlugin) NormalizeScore(pc *PluginContext, pod *v1.Pod, scores NodeScoreList) *Status {
|
||||
func (pl *TestScoreWithNormalizePlugin) NormalizeScore(state *CycleState, pod *v1.Pod, scores NodeScoreList) *Status {
|
||||
return injectNormalizeRes(pl.inj, scores)
|
||||
}
|
||||
|
||||
func (pl *TestScoreWithNormalizePlugin) Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) {
|
||||
func (pl *TestScoreWithNormalizePlugin) Score(state *CycleState, p *v1.Pod, nodeName string) (int, *Status) {
|
||||
return setScoreRes(pl.inj)
|
||||
}
|
||||
|
||||
@ -103,7 +103,7 @@ func (pl *TestScorePlugin) Name() string {
|
||||
return pl.name
|
||||
}
|
||||
|
||||
func (pl *TestScorePlugin) Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) {
|
||||
func (pl *TestScorePlugin) Score(state *CycleState, p *v1.Pod, nodeName string) (int, *Status) {
|
||||
return setScoreRes(pl.inj)
|
||||
}
|
||||
|
||||
@ -127,7 +127,7 @@ func (pl *TestPreFilterPlugin) Name() string {
|
||||
return preFilterPluginName
|
||||
}
|
||||
|
||||
func (pl *TestPreFilterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
|
||||
func (pl *TestPreFilterPlugin) PreFilter(state *CycleState, p *v1.Pod) *Status {
|
||||
pl.PreFilterCalled++
|
||||
return nil
|
||||
}
|
||||
@ -147,18 +147,18 @@ func (pl *TestPreFilterWithExtensionsPlugin) Name() string {
|
||||
return preFilterWithExtensionsPluginName
|
||||
}
|
||||
|
||||
func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
|
||||
func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(state *CycleState, p *v1.Pod) *Status {
|
||||
pl.PreFilterCalled++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *TestPreFilterWithExtensionsPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod,
|
||||
func (pl *TestPreFilterWithExtensionsPlugin) AddPod(state *CycleState, podToSchedule *v1.Pod,
|
||||
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
pl.AddCalled++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *TestPreFilterWithExtensionsPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod,
|
||||
func (pl *TestPreFilterWithExtensionsPlugin) RemovePod(state *CycleState, podToSchedule *v1.Pod,
|
||||
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
pl.RemoveCalled++
|
||||
return nil
|
||||
@ -175,7 +175,7 @@ func (dp *TestDuplicatePlugin) Name() string {
|
||||
return duplicatePluginName
|
||||
}
|
||||
|
||||
func (dp *TestDuplicatePlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
|
||||
func (dp *TestDuplicatePlugin) PreFilter(state *CycleState, p *v1.Pod) *Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -206,7 +206,7 @@ var defaultWeights = map[string]int32{
|
||||
}
|
||||
|
||||
var emptyArgs []config.PluginConfig = make([]config.PluginConfig, 0)
|
||||
var pc = &PluginContext{}
|
||||
var state = &CycleState{}
|
||||
|
||||
// Pod is only used for logging errors.
|
||||
var pod = &v1.Pod{}
|
||||
@ -453,7 +453,7 @@ func TestRunScorePlugins(t *testing.T) {
|
||||
t.Fatalf("Failed to create framework for testing: %v", err)
|
||||
}
|
||||
|
||||
res, status := f.RunScorePlugins(pc, pod, nodes)
|
||||
res, status := f.RunScorePlugins(state, pod, nodes)
|
||||
|
||||
if tt.err {
|
||||
if status.IsSuccess() {
|
||||
|
@ -177,10 +177,10 @@ type QueueSortPlugin interface {
|
||||
type PreFilterExtensions interface {
|
||||
// AddPod is called by the framework while trying to evaluate the impact
|
||||
// of adding podToAdd to the node while scheduling podToSchedule.
|
||||
AddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
AddPod(state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
// RemovePod is called by the framework while trying to evaluate the impact
|
||||
// of removing podToRemove from the node while scheduling podToSchedule.
|
||||
RemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
RemovePod(state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
}
|
||||
|
||||
// PreFilterPlugin is an interface that must be implemented by "prefilter" plugins.
|
||||
@ -189,12 +189,12 @@ type PreFilterPlugin interface {
|
||||
Plugin
|
||||
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
|
||||
// plugins must return success or the pod will be rejected.
|
||||
PreFilter(pc *PluginContext, p *v1.Pod) *Status
|
||||
PreFilter(state *CycleState, p *v1.Pod) *Status
|
||||
// Extensions returns a PreFilterExtensions interface if the plugin implements one,
|
||||
// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
|
||||
// modify its pre-processed info. The framework guarantees that the extensions
|
||||
// AddPod/RemovePod will only be called after PreFilter, possibly on a cloned
|
||||
// PluginContext, and may call those functions more than once before calling
|
||||
// CycleState, and may call those functions more than once before calling
|
||||
// Filter again on a specific node.
|
||||
Extensions() PreFilterExtensions
|
||||
}
|
||||
@ -220,7 +220,7 @@ type FilterPlugin interface {
|
||||
// For example, during preemption, we may pass a copy of the original
|
||||
// nodeInfo object that has some pods removed from it to evaluate the
|
||||
// possibility of preempting them to schedule the target pod.
|
||||
Filter(pc *PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
Filter(state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
}
|
||||
|
||||
// PostFilterPlugin is an interface for Post-filter plugin. Post-filter is an
|
||||
@ -233,7 +233,7 @@ type PostFilterPlugin interface {
|
||||
// passed the filtering phase. All postfilter plugins must return success or
|
||||
// the pod will be rejected. The filteredNodesStatuses is the set of filtered nodes
|
||||
// and their filter status.
|
||||
PostFilter(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
|
||||
PostFilter(state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
|
||||
}
|
||||
|
||||
// ScoreExtensions is an interface for Score extended functionality.
|
||||
@ -241,7 +241,7 @@ type ScoreExtensions interface {
|
||||
// NormalizeScore is called for all node scores produced by the same plugin's "Score"
|
||||
// method. A successful run of NormalizeScore will update the scores list and return
|
||||
// a success status.
|
||||
NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status
|
||||
NormalizeScore(state *CycleState, p *v1.Pod, scores NodeScoreList) *Status
|
||||
}
|
||||
|
||||
// ScorePlugin is an interface that must be implemented by "score" plugins to rank
|
||||
@ -251,7 +251,7 @@ type ScorePlugin interface {
|
||||
// Score is called on each filtered node. It must return success and an integer
|
||||
// indicating the rank of the node. All scoring plugins must return success or
|
||||
// the pod will be rejected.
|
||||
Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status)
|
||||
Score(state *CycleState, p *v1.Pod, nodeName string) (int, *Status)
|
||||
|
||||
// Extensions returns a ScoreExtensions interface if it implements one, or nil if does not.
|
||||
Extensions() ScoreExtensions
|
||||
@ -267,7 +267,7 @@ type ReservePlugin interface {
|
||||
Plugin
|
||||
// Reserve is called by the scheduling framework when the scheduler cache is
|
||||
// updated.
|
||||
Reserve(pc *PluginContext, p *v1.Pod, nodeName string) *Status
|
||||
Reserve(state *CycleState, p *v1.Pod, nodeName string) *Status
|
||||
}
|
||||
|
||||
// PreBindPlugin is an interface that must be implemented by "prebind" plugins.
|
||||
@ -276,7 +276,7 @@ type PreBindPlugin interface {
|
||||
Plugin
|
||||
// PreBind is called before binding a pod. All prebind plugins must return
|
||||
// success or the pod will be rejected and won't be sent for binding.
|
||||
PreBind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
|
||||
PreBind(state *CycleState, p *v1.Pod, nodeName string) *Status
|
||||
}
|
||||
|
||||
// PostBindPlugin is an interface that must be implemented by "postbind" plugins.
|
||||
@ -287,7 +287,7 @@ type PostBindPlugin interface {
|
||||
// informational. A common application of this extension point is for cleaning
|
||||
// up. If a plugin needs to clean-up its state after a pod is scheduled and
|
||||
// bound, PostBind is the extension point that it should register.
|
||||
PostBind(pc *PluginContext, p *v1.Pod, nodeName string)
|
||||
PostBind(state *CycleState, p *v1.Pod, nodeName string)
|
||||
}
|
||||
|
||||
// UnreservePlugin is an interface for Unreserve plugins. This is an informational
|
||||
@ -298,7 +298,7 @@ type UnreservePlugin interface {
|
||||
Plugin
|
||||
// Unreserve is called by the scheduling framework when a reserved pod was
|
||||
// rejected in a later phase.
|
||||
Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)
|
||||
Unreserve(state *CycleState, p *v1.Pod, nodeName string)
|
||||
}
|
||||
|
||||
// PermitPlugin is an interface that must be implemented by "permit" plugins.
|
||||
@ -311,7 +311,7 @@ type PermitPlugin interface {
|
||||
// The pod will also be rejected if the wait timeout or the pod is rejected while
|
||||
// waiting. Note that if the plugin returns "wait", the framework will wait only
|
||||
// after running the remaining plugins given that no other plugin rejects the pod.
|
||||
Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration)
|
||||
Permit(state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
|
||||
}
|
||||
|
||||
// BindPlugin is an interface that must be implemented by "bind" plugins. Bind
|
||||
@ -324,7 +324,7 @@ type BindPlugin interface {
|
||||
// remaining bind plugins are skipped. When a bind plugin does not handle a pod,
|
||||
// it must return Skip in its Status code. If a bind plugin returns an Error, the
|
||||
// pod is rejected and will not be bound.
|
||||
Bind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
|
||||
Bind(state *CycleState, p *v1.Pod, nodeName string) *Status
|
||||
}
|
||||
|
||||
// Framework manages the set of plugins in use by the scheduling framework.
|
||||
@ -338,7 +338,7 @@ type Framework interface {
|
||||
// *Status and its code is set to non-success if any of the plugins returns
|
||||
// anything but Success. If a non-success status is returned, then the scheduling
|
||||
// cycle is aborted.
|
||||
RunPreFilterPlugins(pc *PluginContext, pod *v1.Pod) *Status
|
||||
RunPreFilterPlugins(state *CycleState, pod *v1.Pod) *Status
|
||||
|
||||
// RunFilterPlugins runs the set of configured filter plugins for pod on
|
||||
// the given node. It returns directly if any of the filter plugins
|
||||
@ -349,46 +349,46 @@ type Framework interface {
|
||||
// pass a copy of the original nodeInfo object that has some pods
|
||||
// removed from it to evaluate the possibility of preempting them to
|
||||
// schedule the target pod.
|
||||
RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
RunFilterPlugins(state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
|
||||
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
|
||||
// PreFilter plugins. It returns directly if any of the plugins return any
|
||||
// status other than Success.
|
||||
RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
RunPreFilterExtensionAddPod(state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
|
||||
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
|
||||
// PreFilter plugins. It returns directly if any of the plugins return any
|
||||
// status other than Success.
|
||||
RunPreFilterExtensionRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
RunPreFilterExtensionRemovePod(state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
|
||||
|
||||
// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
|
||||
// of these plugins returns any status other than "Success", the given node is
|
||||
// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.
|
||||
RunPostFilterPlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
|
||||
RunPostFilterPlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
|
||||
|
||||
// RunScorePlugins runs the set of configured scoring plugins. It returns a map that
|
||||
// stores for each scoring plugin name the corresponding NodeScoreList(s).
|
||||
// It also returns *Status, which is set to non-success if any of the plugins returns
|
||||
// a non-success status.
|
||||
RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status)
|
||||
RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status)
|
||||
|
||||
// RunPreBindPlugins runs the set of configured prebind plugins. It returns
|
||||
// *Status and its code is set to non-success if any of the plugins returns
|
||||
// anything but Success. If the Status code is "Unschedulable", it is
|
||||
// considered as a scheduling check failure, otherwise, it is considered as an
|
||||
// internal error. In either case the pod is not going to be bound.
|
||||
RunPreBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
|
||||
RunPreBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status
|
||||
|
||||
// RunPostBindPlugins runs the set of configured postbind plugins.
|
||||
RunPostBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
|
||||
RunPostBindPlugins(state *CycleState, pod *v1.Pod, nodeName string)
|
||||
|
||||
// RunReservePlugins runs the set of configured reserve plugins. If any of these
|
||||
// plugins returns an error, it does not continue running the remaining ones and
|
||||
// returns the error. In such case, pod will not be scheduled.
|
||||
RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
|
||||
RunReservePlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status
|
||||
|
||||
// RunUnreservePlugins runs the set of configured unreserve plugins.
|
||||
RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
|
||||
RunUnreservePlugins(state *CycleState, pod *v1.Pod, nodeName string)
|
||||
|
||||
// RunPermitPlugins runs the set of configured permit plugins. If any of these
|
||||
// plugins returns a status other than "Success" or "Wait", it does not continue
|
||||
@ -397,14 +397,14 @@ type Framework interface {
|
||||
// returned by the plugin, if the time expires, then it will return an error.
|
||||
// Note that if multiple plugins asked to wait, then we wait for the minimum
|
||||
// timeout duration.
|
||||
RunPermitPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
|
||||
RunPermitPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status
|
||||
|
||||
// RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose
|
||||
// whether or not to handle the given Pod. If a bind plugin chooses to skip the
|
||||
// binding, it should return code=4("skip") status. Otherwise, it should return "Error"
|
||||
// or "Success". If none of the plugins handled binding, RunBindPlugins returns
|
||||
// code=4("skip") status.
|
||||
RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
|
||||
RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status
|
||||
}
|
||||
|
||||
// FrameworkHandle provides data and some tools that plugins can use. It is
|
||||
|
@ -167,47 +167,47 @@ func (*fakeFramework) NodeInfoSnapshot() *schedulernodeinfo.Snapshot {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunPreFilterPlugins(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
|
||||
func (*fakeFramework) RunPreFilterPlugins(state *framework.CycleState, pod *v1.Pod) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
func (*fakeFramework) RunFilterPlugins(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunPreFilterExtensionAddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
func (*fakeFramework) RunPreFilterExtensionAddPod(state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunPreFilterExtensionRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
func (*fakeFramework) RunPreFilterExtensionRemovePod(state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) {
|
||||
func (*fakeFramework) RunScorePlugins(state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunPreBindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (*fakeFramework) RunPreBindPlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunBindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (*fakeFramework) RunBindPlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunPostBindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {}
|
||||
func (*fakeFramework) RunPostBindPlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) {}
|
||||
|
||||
func (*fakeFramework) RunPostFilterPlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {
|
||||
func (*fakeFramework) RunPostFilterPlugins(state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunReservePlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (*fakeFramework) RunReservePlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*fakeFramework) RunUnreservePlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {}
|
||||
func (*fakeFramework) RunUnreservePlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) {}
|
||||
|
||||
func (*fakeFramework) RunPermitPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (*fakeFramework) RunPermitPlugins(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -384,8 +384,8 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s
|
||||
|
||||
// schedule implements the scheduling algorithm and returns the suggested result(host,
|
||||
// evaluated nodes number,feasible nodes number).
|
||||
func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
|
||||
result, err := sched.Algorithm.Schedule(pluginContext, pod)
|
||||
func (sched *Scheduler) schedule(pod *v1.Pod, state *framework.CycleState) (core.ScheduleResult, error) {
|
||||
result, err := sched.Algorithm.Schedule(state, pod)
|
||||
if err != nil {
|
||||
pod = pod.DeepCopy()
|
||||
sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
|
||||
@ -397,14 +397,14 @@ func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginCon
|
||||
// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
|
||||
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
|
||||
// It returns the node name and an error if any.
|
||||
func (sched *Scheduler) preempt(pluginContext *framework.PluginContext, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
|
||||
func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
|
||||
preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor)
|
||||
if err != nil {
|
||||
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(pluginContext, preemptor, scheduleErr)
|
||||
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(state, preemptor, scheduleErr)
|
||||
if err != nil {
|
||||
klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
|
||||
return "", err
|
||||
@ -521,9 +521,9 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
|
||||
|
||||
// bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we
|
||||
// handle binding metrics internally.
|
||||
func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *framework.PluginContext) error {
|
||||
func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, state *framework.CycleState) error {
|
||||
bindingStart := time.Now()
|
||||
bindStatus := sched.Framework.RunBindPlugins(pluginContext, assumed, targetNode)
|
||||
bindStatus := sched.Framework.RunBindPlugins(state, assumed, targetNode)
|
||||
var err error
|
||||
if !bindStatus.IsSuccess() {
|
||||
if bindStatus.Code() == framework.Skip {
|
||||
@ -579,8 +579,8 @@ func (sched *Scheduler) scheduleOne() {
|
||||
|
||||
// Synchronously attempt to find a fit for the pod.
|
||||
start := time.Now()
|
||||
pluginContext := framework.NewPluginContext()
|
||||
scheduleResult, err := sched.schedule(pod, pluginContext)
|
||||
state := framework.NewCycleState()
|
||||
scheduleResult, err := sched.schedule(pod, state)
|
||||
if err != nil {
|
||||
// schedule() may have failed because the pod would not fit on any host, so we try to
|
||||
// preempt, with the expectation that the next time the pod is tried for scheduling it
|
||||
@ -592,7 +592,7 @@ func (sched *Scheduler) scheduleOne() {
|
||||
" No preemption is performed.")
|
||||
} else {
|
||||
preemptionStartTime := time.Now()
|
||||
sched.preempt(pluginContext, fwk, pod, fitError)
|
||||
sched.preempt(state, fwk, pod, fitError)
|
||||
metrics.PreemptionAttempts.Inc()
|
||||
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
|
||||
metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
|
||||
@ -630,7 +630,7 @@ func (sched *Scheduler) scheduleOne() {
|
||||
}
|
||||
|
||||
// Run "reserve" plugins.
|
||||
if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
||||
if sts := fwk.RunReservePlugins(state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
||||
sched.recordSchedulingFailure(assumedPod, sts.AsError(), SchedulerError, sts.Message())
|
||||
metrics.PodScheduleErrors.Inc()
|
||||
return
|
||||
@ -642,7 +642,7 @@ func (sched *Scheduler) scheduleOne() {
|
||||
klog.Errorf("error assuming pod: %v", err)
|
||||
metrics.PodScheduleErrors.Inc()
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||
fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost)
|
||||
return
|
||||
}
|
||||
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
|
||||
@ -654,13 +654,13 @@ func (sched *Scheduler) scheduleOne() {
|
||||
klog.Errorf("error binding volumes: %v", err)
|
||||
metrics.PodScheduleErrors.Inc()
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||
fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Run "permit" plugins.
|
||||
permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||
permitStatus := fwk.RunPermitPlugins(state, assumedPod, scheduleResult.SuggestedHost)
|
||||
if !permitStatus.IsSuccess() {
|
||||
var reason string
|
||||
if permitStatus.IsUnschedulable() {
|
||||
@ -674,13 +674,13 @@ func (sched *Scheduler) scheduleOne() {
|
||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||
}
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||
fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost)
|
||||
sched.recordSchedulingFailure(assumedPod, permitStatus.AsError(), reason, permitStatus.Message())
|
||||
return
|
||||
}
|
||||
|
||||
// Run "prebind" plugins.
|
||||
preBindStatus := fwk.RunPreBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||
preBindStatus := fwk.RunPreBindPlugins(state, assumedPod, scheduleResult.SuggestedHost)
|
||||
if !preBindStatus.IsSuccess() {
|
||||
var reason string
|
||||
metrics.PodScheduleErrors.Inc()
|
||||
@ -689,19 +689,19 @@ func (sched *Scheduler) scheduleOne() {
|
||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||
}
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||
fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost)
|
||||
sched.recordSchedulingFailure(assumedPod, preBindStatus.AsError(), reason, preBindStatus.Message())
|
||||
return
|
||||
}
|
||||
|
||||
err := sched.bind(assumedPod, scheduleResult.SuggestedHost, pluginContext)
|
||||
err := sched.bind(assumedPod, scheduleResult.SuggestedHost, state)
|
||||
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
|
||||
metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
|
||||
if err != nil {
|
||||
klog.Errorf("error binding pod: %v", err)
|
||||
metrics.PodScheduleErrors.Inc()
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||
fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost)
|
||||
sched.recordSchedulingFailure(assumedPod, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
|
||||
} else {
|
||||
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
|
||||
@ -713,7 +713,7 @@ func (sched *Scheduler) scheduleOne() {
|
||||
metrics.PodScheduleSuccesses.Inc()
|
||||
|
||||
// Run "postbind" plugins.
|
||||
fwk.RunPostBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
|
||||
fwk.RunPostBindPlugins(state, assumedPod, scheduleResult.SuggestedHost)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -151,7 +151,7 @@ type mockScheduler struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (es mockScheduler) Schedule(pc *framework.PluginContext, pod *v1.Pod) (core.ScheduleResult, error) {
|
||||
func (es mockScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
|
||||
return es.result, es.err
|
||||
}
|
||||
|
||||
@ -165,7 +165,7 @@ func (es mockScheduler) Extenders() []algorithm.SchedulerExtender {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es mockScheduler) Preempt(pc *framework.PluginContext, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
|
||||
func (es mockScheduler) Preempt(state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
|
||||
return nil, nil, nil, nil
|
||||
}
|
||||
|
||||
|
@ -148,7 +148,7 @@ func (sp *ScorePlugin) reset() {
|
||||
}
|
||||
|
||||
// Score returns the score of scheduling a pod on a specific node.
|
||||
func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
func (sp *ScorePlugin) Score(state *framework.CycleState, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
sp.numScoreCalled++
|
||||
if sp.failScore {
|
||||
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name))
|
||||
@ -179,13 +179,13 @@ func (sp *ScoreWithNormalizePlugin) reset() {
|
||||
}
|
||||
|
||||
// Score returns the score of scheduling a pod on a specific node.
|
||||
func (sp *ScoreWithNormalizePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
func (sp *ScoreWithNormalizePlugin) Score(state *framework.CycleState, p *v1.Pod, nodeName string) (int, *framework.Status) {
|
||||
sp.numScoreCalled++
|
||||
score := 10
|
||||
return score, nil
|
||||
}
|
||||
|
||||
func (sp *ScoreWithNormalizePlugin) NormalizeScore(pc *framework.PluginContext, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
|
||||
func (sp *ScoreWithNormalizePlugin) NormalizeScore(state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
|
||||
sp.numNormalizeScoreCalled++
|
||||
return nil
|
||||
}
|
||||
@ -207,7 +207,7 @@ func (fp *FilterPlugin) reset() {
|
||||
|
||||
// Filter is a test function that returns an error or nil, depending on the
|
||||
// value of "failFilter".
|
||||
func (fp *FilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
func (fp *FilterPlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
fp.numFilterCalled++
|
||||
|
||||
if fp.failFilter {
|
||||
@ -224,7 +224,7 @@ func (rp *ReservePlugin) Name() string {
|
||||
|
||||
// Reserve is a test function that returns an error or nil, depending on the
|
||||
// value of "failReserve".
|
||||
func (rp *ReservePlugin) Reserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (rp *ReservePlugin) Reserve(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
rp.numReserveCalled++
|
||||
if rp.failReserve {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
|
||||
@ -243,7 +243,7 @@ func (*PostFilterPlugin) Name() string {
|
||||
}
|
||||
|
||||
// PostFilter is a test function.
|
||||
func (pfp *PostFilterPlugin) PostFilter(_ *framework.PluginContext, pod *v1.Pod, _ []*v1.Node, _ framework.NodeToStatusMap) *framework.Status {
|
||||
func (pfp *PostFilterPlugin) PostFilter(_ *framework.CycleState, pod *v1.Pod, _ []*v1.Node, _ framework.NodeToStatusMap) *framework.Status {
|
||||
pfp.numPostFilterCalled++
|
||||
if pfp.failPostFilter {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
|
||||
@ -264,7 +264,7 @@ func (pp *PreBindPlugin) Name() string {
|
||||
}
|
||||
|
||||
// PreBind is a test function that returns (true, nil) or errors for testing.
|
||||
func (pp *PreBindPlugin) PreBind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
func (pp *PreBindPlugin) PreBind(state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
pp.numPreBindCalled++
|
||||
if pp.failPreBind {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
|
||||
@ -288,7 +288,7 @@ func (bp *BindPlugin) Name() string {
|
||||
return bp.PluginName
|
||||
}
|
||||
|
||||
func (bp *BindPlugin) Bind(pc *framework.PluginContext, p *v1.Pod, nodeName string) *framework.Status {
|
||||
func (bp *BindPlugin) Bind(state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
|
||||
bp.numBindCalled++
|
||||
if bp.pluginInvokeEventChan != nil {
|
||||
bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled}
|
||||
@ -318,7 +318,7 @@ func (pp *PostBindPlugin) Name() string {
|
||||
}
|
||||
|
||||
// PostBind is a test function, which counts the number of times called.
|
||||
func (pp *PostBindPlugin) PostBind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
|
||||
func (pp *PostBindPlugin) PostBind(state *framework.CycleState, pod *v1.Pod, nodeName string) {
|
||||
pp.numPostBindCalled++
|
||||
if pp.pluginInvokeEventChan != nil {
|
||||
pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled}
|
||||
@ -341,7 +341,7 @@ func (pp *PreFilterPlugin) Extensions() framework.PreFilterExtensions {
|
||||
}
|
||||
|
||||
// PreFilter is a test function that returns (true, nil) or errors for testing.
|
||||
func (pp *PreFilterPlugin) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
|
||||
func (pp *PreFilterPlugin) PreFilter(state *framework.CycleState, pod *v1.Pod) *framework.Status {
|
||||
pp.numPreFilterCalled++
|
||||
if pp.failPreFilter {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
|
||||
@ -366,7 +366,7 @@ func (up *UnreservePlugin) Name() string {
|
||||
|
||||
// Unreserve is a test function that returns an error or nil, depending on the
|
||||
// value of "failUnreserve".
|
||||
func (up *UnreservePlugin) Unreserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
|
||||
func (up *UnreservePlugin) Unreserve(state *framework.CycleState, pod *v1.Pod, nodeName string) {
|
||||
up.numUnreserveCalled++
|
||||
if up.pluginInvokeEventChan != nil {
|
||||
up.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: up.Name(), val: up.numUnreserveCalled}
|
||||
@ -384,7 +384,7 @@ func (pp *PermitPlugin) Name() string {
|
||||
}
|
||||
|
||||
// Permit implements the permit test plugin.
|
||||
func (pp *PermitPlugin) Permit(pc *framework.PluginContext, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
|
||||
func (pp *PermitPlugin) Permit(state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
|
||||
pp.numPermitCalled++
|
||||
if pp.failPermit {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0
|
||||
|
@ -85,7 +85,7 @@ func (fp *tokenFilter) Name() string {
|
||||
return tokenFilterName
|
||||
}
|
||||
|
||||
func (fp *tokenFilter) Filter(pc *framework.PluginContext, pod *v1.Pod,
|
||||
func (fp *tokenFilter) Filter(state *framework.CycleState, pod *v1.Pod,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
if fp.Tokens > 0 {
|
||||
fp.Tokens--
|
||||
@ -98,17 +98,17 @@ func (fp *tokenFilter) Filter(pc *framework.PluginContext, pod *v1.Pod,
|
||||
return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name))
|
||||
}
|
||||
|
||||
func (fp *tokenFilter) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
|
||||
func (fp *tokenFilter) PreFilter(state *framework.CycleState, pod *v1.Pod) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fp *tokenFilter) AddPod(pc *framework.PluginContext, podToSchedule *v1.Pod,
|
||||
func (fp *tokenFilter) AddPod(state *framework.CycleState, podToSchedule *v1.Pod,
|
||||
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
fp.Tokens--
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fp *tokenFilter) RemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod,
|
||||
func (fp *tokenFilter) RemovePod(state *framework.CycleState, podToSchedule *v1.Pod,
|
||||
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
|
||||
fp.Tokens++
|
||||
return nil
|
||||
|
Loading…
Reference in New Issue
Block a user