diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD
index 56db5b3c7ec..8e09b6fe884 100644
--- a/pkg/scheduler/core/BUILD
+++ b/pkg/scheduler/core/BUILD
@@ -15,6 +15,7 @@ go_library(
         "//pkg/scheduler/algorithm/priorities:go_default_library",
         "//pkg/scheduler/api:go_default_library",
         "//pkg/scheduler/apis/extender/v1:go_default_library",
+        "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 9e52f714f6e..1dd238f6091 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -42,6 +42,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
+	migration "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@@ -177,7 +178,6 @@ func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (r
 	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
 		return result, err
 	}
-
 	// Run "prefilter" plugins.
 	preFilterStatus := g.framework.RunPreFilterPlugins(state, pod)
 	if !preFilterStatus.IsSuccess() {
@@ -473,6 +473,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1
 
 	// We can use the same metadata producer for all nodes.
 	meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
+	state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
 
 	checkNode := func(i int) {
 		nodeName := g.cache.NodeTree().Next()
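
For orientation: the reference injected above is what a migrated Filter plugin is expected to read back out of CycleState and hand to the legacy predicate it wraps. A minimal sketch of that consumer side, using a hypothetical plugin and predicate name (neither appears in this diff):

// Hypothetical migrated Filter plugin; SomePredicatePlugin and
// predicates.SomePredicate are illustrative names, not code from this change.
func (pl *SomePredicatePlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
	var meta predicates.PredicateMetadata
	// Recover the reference injected by findNodesThatFit and assert it back to
	// the legacy metadata type (most legacy predicates recompute on nil meta).
	if data, err := state.Read(migration.PredicatesStateKey); err == nil {
		if s, ok := data.(*migration.PredicatesStateData); ok {
			meta, _ = s.Reference.(predicates.PredicateMetadata)
		}
	}
	_, reasons, err := predicates.SomePredicate(pod, meta, nodeInfo)
	return migration.PredicateResultToFrameworkStatus(reasons, err)
}
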
@@ -555,36 +556,42 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1
 
 // addNominatedPods adds pods with equal or greater priority which are nominated
 // to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
-// any pod was added, 2) augmented meta data, 3) augmented nodeInfo.
-func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata,
+// any pod was added, 2) augmented metadata, 3) augmented CycleState, 4) augmented nodeInfo, and 5) an error if any.
+func (g *genericScheduler) addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata, state *framework.CycleState,
 	nodeInfo *schedulernodeinfo.NodeInfo, queue internalqueue.SchedulingQueue) (bool, predicates.PredicateMetadata,
-	*schedulernodeinfo.NodeInfo) {
+	*framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
 	if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
 		// This may happen only in tests.
-		return false, meta, nodeInfo
+		return false, meta, state, nodeInfo, nil
 	}
 	nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
-	if nominatedPods == nil || len(nominatedPods) == 0 {
-		return false, meta, nodeInfo
+	if len(nominatedPods) == 0 {
+		return false, meta, state, nodeInfo, nil
 	}
+	nodeInfoOut := nodeInfo.Clone()
 	var metaOut predicates.PredicateMetadata
 	if meta != nil {
 		metaOut = meta.ShallowCopy()
 	}
-	nodeInfoOut := nodeInfo.Clone()
+	stateOut := state.Clone()
+	stateOut.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaOut})
 	podsAdded := false
 	for _, p := range nominatedPods {
 		if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
 			nodeInfoOut.AddPod(p)
 			if metaOut != nil {
 				if err := metaOut.AddPod(p, nodeInfoOut.Node()); err != nil {
-					klog.Warningf("unable to add pod, nominated pod %s, incoming pod %s: %v", p.Name, pod.Name, err)
+					return false, meta, state, nodeInfo, err
 				}
 			}
+			status := g.framework.RunPreFilterExtensionAddPod(stateOut, pod, p, nodeInfoOut)
+			if !status.IsSuccess() {
+				return false, meta, state, nodeInfo, status.AsError()
+			}
 			podsAdded = true
 		}
 	}
-	return podsAdded, metaOut, nodeInfoOut
+	return podsAdded, metaOut, stateOut, nodeInfoOut, nil
 }
 
 // podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
@@ -630,19 +637,25 @@ func (g *genericScheduler) podFitsOnNode(
 	// they may end up getting scheduled to a different node.
 	for i := 0; i < 2; i++ {
 		metaToUse := meta
+		stateToUse := state
 		nodeInfoToUse := info
 		if i == 0 {
-			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
+			var err error
+			podsAdded, metaToUse, stateToUse, nodeInfoToUse, err = g.addNominatedPods(pod, meta, state, info, queue)
+			if err != nil {
+				return false, []predicates.PredicateFailureReason{}, nil, err
+			}
 		} else if !podsAdded || len(failedPredicates) != 0 {
 			break
 		}
+
 		for _, predicateKey := range predicates.Ordering() {
 			var (
 				fit     bool
 				reasons []predicates.PredicateFailureReason
 				err     error
 			)
-			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
+
 			if predicate, exist := predicateFuncs[predicateKey]; exist {
 				fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
 				if err != nil {
@@ -663,7 +676,7 @@ func (g *genericScheduler) podFitsOnNode(
 			}
 		}
 
-		status = g.framework.RunFilterPlugins(state, pod, nodeInfoToUse)
+		status = g.framework.RunFilterPlugins(stateToUse, pod, nodeInfoToUse)
 		if !status.IsSuccess() && !status.IsUnschedulable() {
 			return false, failedPredicates, status, status.AsError()
 		}
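
A note on the Clone-then-Write pair in addNominatedPods above: CycleState.Clone copies the state map, but the copied PredicatesStateData still holds the original meta reference, so the clone has to be overwritten with the ShallowCopy before any nominated pods are added. Illustrated as a standalone snippet using the same identifiers as the function above:

// After Clone, stateOut still aliases the caller's original metadata...
stateOut := state.Clone()
// ...so the shallow-copied metadata is injected explicitly. Filters that run
// against stateOut now observe metaOut (which will include nominated pods),
// while the caller's state keeps referencing the untouched meta.
stateOut.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaOut})
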
@@ -772,6 +785,7 @@ func PrioritizeNodes(
 	}
 
 	// Run the Score plugins.
+	state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
 	scoresMap, scoreStatus := fwk.RunScorePlugins(state, pod, nodes)
 	if !scoreStatus.IsSuccess() {
 		return framework.NodeScoreList{}, scoreStatus.AsError()
@@ -1002,13 +1016,18 @@ func (g *genericScheduler) selectNodesForPreemption(
 	meta := metadataProducer(pod, nodeNameToInfo)
 	checkNode := func(i int) {
 		nodeName := potentialNodes[i].Name
+		if nodeNameToInfo[nodeName] == nil {
+			return
+		}
+		nodeInfoCopy := nodeNameToInfo[nodeName].Clone()
 		var metaCopy predicates.PredicateMetadata
 		if meta != nil {
 			metaCopy = meta.ShallowCopy()
 		}
-		stateClone := state.Clone()
+		stateCopy := state.Clone()
+		stateCopy.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaCopy})
 		pods, numPDBViolations, fits := g.selectVictimsOnNode(
-			stateClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
+			stateCopy, pod, metaCopy, nodeInfoCopy, fitPredicates, queue, pdbs)
 		if fits {
 			resultLock.Lock()
 			victims := extenderv1.Victims{
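
The priorities side mirrors the predicates side: PrioritizeNodes injects the priority metadata under PrioritiesStateKey, and a migrated Score plugin would recover it the same way. A minimal sketch, where prioritiesMetadata is a hypothetical helper rather than code from this change:

// prioritiesMetadata (hypothetical) recovers the priority metadata that
// PrioritizeNodes injected into the cycle state; it returns nil when the
// key is absent or holds an unexpected type.
func prioritiesMetadata(state *framework.CycleState) interface{} {
	if data, err := state.Read(migration.PrioritiesStateKey); err == nil {
		if s, ok := data.(*migration.PrioritiesStateData); ok {
			return s.Reference
		}
	}
	return nil
}
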
@@ -1086,35 +1105,31 @@ func (g *genericScheduler) selectVictimsOnNode(
 	queue internalqueue.SchedulingQueue,
 	pdbs []*policy.PodDisruptionBudget,
 ) ([]*v1.Pod, int, bool) {
-	if nodeInfo == nil {
-		return nil, 0, false
-	}
 	potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
-	nodeInfoCopy := nodeInfo.Clone()
 
 	removePod := func(rp *v1.Pod) error {
-		if err := nodeInfoCopy.RemovePod(rp); err != nil {
+		if err := nodeInfo.RemovePod(rp); err != nil {
 			return err
 		}
 		if meta != nil {
-			if err := meta.RemovePod(rp, nodeInfoCopy.Node()); err != nil {
+			if err := meta.RemovePod(rp, nodeInfo.Node()); err != nil {
 				return err
 			}
 		}
-		status := g.framework.RunPreFilterExtensionRemovePod(state, pod, rp, nodeInfoCopy)
+		status := g.framework.RunPreFilterExtensionRemovePod(state, pod, rp, nodeInfo)
 		if !status.IsSuccess() {
 			return status.AsError()
 		}
 		return nil
 	}
 	addPod := func(ap *v1.Pod) error {
-		nodeInfoCopy.AddPod(ap)
+		nodeInfo.AddPod(ap)
 		if meta != nil {
-			if err := meta.AddPod(ap, nodeInfoCopy.Node()); err != nil {
+			if err := meta.AddPod(ap, nodeInfo.Node()); err != nil {
 				return err
 			}
 		}
-		status := g.framework.RunPreFilterExtensionAddPod(state, pod, ap, nodeInfoCopy)
+		status := g.framework.RunPreFilterExtensionAddPod(state, pod, ap, nodeInfo)
 		if !status.IsSuccess() {
 			return status.AsError()
 		}
@@ -1123,7 +1138,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 	// As the first step, remove all the lower priority pods from the node and
 	// check if the given pod can be scheduled.
 	podPriority := podutil.GetPodPriority(pod)
-	for _, p := range nodeInfoCopy.Pods() {
+	for _, p := range nodeInfo.Pods() {
 		if podutil.GetPodPriority(p) < podPriority {
 			potentialVictims.Items = append(potentialVictims.Items, p)
 			if err := removePod(p); err != nil {
@@ -1137,7 +1152,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 	// inter-pod affinity to one or more victims, but we have decided not to
 	// support this case for performance reasons. Having affinity to lower
 	// priority pods is not a recommended configuration anyway.
-	if fits, _, _, err := g.podFitsOnNode(state, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
+	if fits, _, _, err := g.podFitsOnNode(state, pod, meta, nodeInfo, fitPredicates, queue, false); !fits {
 		if err != nil {
 			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -1155,7 +1170,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 		if err := addPod(p); err != nil {
 			return false, err
 		}
-		fits, _, _, _ := g.podFitsOnNode(state, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
+		fits, _, _, _ := g.podFitsOnNode(state, pod, meta, nodeInfo, fitPredicates, queue, false)
 		if !fits {
 			if err := removePod(p); err != nil {
 				return false, err
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index a10e7ee8dcd..a9157f4a1f2 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -719,7 +719,7 @@ func TestFindFitAllError(t *testing.T) {
 	nodes := makeNodeList([]string{"3", "2", "1"})
 	scheduler := makeScheduler(predicates, nodes)
 
-	_, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{})
+	_, predicateMap, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), &v1.Pod{})
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -749,7 +749,7 @@ func TestFindFitSomeError(t *testing.T) {
 
 	scheduler := makeScheduler(predicates, nodes)
 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	_, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod)
+	_, predicateMap, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), pod)
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -830,7 +830,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 		cache.UpdateNodeInfoSnapshot(scheduler.nodeInfoSnapshot)
 		queue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("nominated")}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
 
-		_, _, _, err := scheduler.findNodesThatFit(nil, test.pod)
+		_, _, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), test.pod)
 
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -999,7 +999,7 @@ func TestZeroRequest(t *testing.T) {
 
 			list, err := PrioritizeNodes(
 				test.pod, nodeNameToInfo, metaData, priorityConfigs,
-				schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}, emptyFramework, nil)
+				schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}, emptyFramework, framework.NewCycleState())
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
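
The test updates above are forced by the new unconditional state.Write calls: findNodesThatFit and PrioritizeNodes now write into the CycleState they are given, so passing nil (which previously worked because the state was only forwarded) would panic inside Write. An empty state is all a caller needs to provide, e.g.:

// A fresh CycleState is sufficient; findNodesThatFit populates the
// migration key itself before running the filters.
state := framework.NewCycleState()
_, predicateMap, _, err := scheduler.findNodesThatFit(state, &v1.Pod{})
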
diff --git a/pkg/scheduler/framework/plugins/migration/utils.go b/pkg/scheduler/framework/plugins/migration/utils.go
index 5962a46f71c..fb6ba859a98 100644
--- a/pkg/scheduler/framework/plugins/migration/utils.go
+++ b/pkg/scheduler/framework/plugins/migration/utils.go
@@ -21,6 +21,14 @@ import (
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 )
 
+const (
+	// PredicatesStateKey is the key in CycleState to PredicatesStateData
+	PredicatesStateKey = "predicates"
+
+	// PrioritiesStateKey is the key in CycleState to PrioritiesStateData
+	PrioritiesStateKey = "priorities"
+)
+
 // PredicateResultToFrameworkStatus converts a predicate result (PredicateFailureReason + error)
 // to a framework status.
 func PredicateResultToFrameworkStatus(reasons []predicates.PredicateFailureReason, err error) *framework.Status {
@@ -47,3 +55,36 @@ func ErrorToFrameworkStatus(err error) *framework.Status {
 	}
 	return nil
 }
+
+// PredicatesStateData is a pointer to PredicateMetadata. In the normal case, StateData is supposed to
+// be generated and stored in CycleState by a framework plugin (like a PreFilter pre-computing data for
+// its corresponding Filter). However, during migration, the scheduler will inject a pointer to
+// PredicateMetadata into CycleState. This "hack" is necessary because during migration Filters that implement
+// predicates functionality will be calling into the existing predicate functions, and need
+// to pass PredicateMetadata.
+type PredicatesStateData struct {
+	Reference interface{}
+}
+
+// Clone is supposed to make a copy of the data, but since this is just a pointer, we are practically
+// just copying the pointer. This is ok because the actual reference to the PredicateMetadata
+// copy that is made by generic_scheduler during the preemption cycle will be injected again outside
+// the framework.
+func (p *PredicatesStateData) Clone() framework.StateData {
+	return &PredicatesStateData{
+		Reference: p.Reference,
+	}
+}
+
+// PrioritiesStateData is a pointer to PrioritiesMetadata.
+type PrioritiesStateData struct {
+	Reference interface{}
+}
+
+// Clone is supposed to make a copy of the data, but since this is just a pointer, we are practically
+// just copying the pointer.
+func (p *PrioritiesStateData) Clone() framework.StateData {
+	return &PrioritiesStateData{
+		Reference: p.Reference,
+	}
+}
diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go
index cc490b85a61..a0c5a3195a0 100644
--- a/pkg/scheduler/framework/v1alpha1/framework.go
+++ b/pkg/scheduler/framework/v1alpha1/framework.go
@@ -555,9 +555,8 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
 // point. Returns nil if no plugins where configred.
 func (f *framework) ListPlugins() map[string][]string {
 	m := make(map[string][]string)
-
-	insert := func(ptr interface{}) {
-		plugins := reflect.ValueOf(ptr).Elem()
+	for _, e := range f.getExtensionPoints(&config.Plugins{}) {
+		plugins := reflect.ValueOf(e.slicePtr).Elem()
 		var names []string
 		for i := 0; i < plugins.Len(); i++ {
 			name := plugins.Index(i).Interface().(Plugin).Name()
@@ -568,9 +567,6 @@
 			m[extName] = names
 		}
 	}
-	for _, e := range f.getExtensionPoints(&config.Plugins{}) {
-		insert(e.slicePtr)
-	}
 	if len(m) > 0 {
 		return m
 	}
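
To make the shallow-copy semantics of these Clone methods concrete: cloning copies only the Reference field, so both copies keep pointing at the same underlying metadata. A small illustration (hypothetical snippet, not part of the change; somePredicateMeta stands in for a concrete PredicateMetadata value):

meta := &somePredicateMeta{}
orig := &migration.PredicatesStateData{Reference: meta}
clone := orig.Clone().(*migration.PredicatesStateData)
// clone.Reference and orig.Reference are the same pointer, so mutations to
// the metadata are visible through both copies. This is why the preemption
// path re-injects its own ShallowCopy instead of relying on Clone alone.
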