Merge pull request #83537 from ahg-g/ahg-metadata

Added PredicateMetadata and PriorityMetadata to framework's CycleState
Kubernetes Prow Robot 2019-10-08 18:37:50 -07:00 committed by GitHub
commit 0110db3a14
5 changed files with 91 additions and 38 deletions
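At a high level, this commit threads the pre-computed predicate/priority metadata through the framework's CycleState, so that plugins migrated from the old predicates/priorities code can keep calling the legacy functions. The sketch below is a minimal, self-contained model of that round trip; its types (a plain map for the cycle state, a string standing in for the metadata) are deliberate simplifications, not the real framework API.

package main

import "fmt"

// stateData mirrors framework.StateData: anything stored in the cycle
// state must know how to clone itself.
type stateData interface{ Clone() stateData }

// predicatesStateData wraps an opaque reference to the legacy
// PredicateMetadata, like the migration type added by this commit.
type predicatesStateData struct{ reference interface{} }

// Clone copies only the wrapper; the referenced metadata is shared,
// matching the shallow Clone documented in the migration package.
func (p *predicatesStateData) Clone() stateData {
    return &predicatesStateData{reference: p.reference}
}

// cycleState is a toy stand-in for framework.CycleState.
type cycleState map[string]stateData

func main() {
    meta := "opaque predicate metadata" // stand-in for the real metadata

    // Write side: the scheduler injects the metadata before filtering,
    // as findNodesThatFit does under migration.PredicatesStateKey.
    state := cycleState{"predicates": &predicatesStateData{reference: meta}}

    // Read side: a migrated Filter pulls the reference back out and
    // hands it to the legacy predicate function.
    got := state["predicates"].(*predicatesStateData).reference
    fmt.Println(got == meta) // true: both sides see the same metadata
}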

View File

@@ -15,6 +15,7 @@ go_library(
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",

View File

@@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
migration "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@@ -177,7 +178,6 @@ func (g *genericScheduler) Schedule(state *framework.CycleState, pod *v1.Pod) (r
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
// Run "prefilter" plugins.
preFilterStatus := g.framework.RunPreFilterPlugins(state, pod)
if !preFilterStatus.IsSuccess() {
@@ -473,6 +473,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1
// We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
checkNode := func(i int) {
nodeName := g.cache.NodeTree().Next()
@@ -555,36 +556,42 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
// any pod was added, 2) augmented meta data, 3) augmented nodeInfo.
func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata,
// any pod was added, 2) augmented metadata, 3) augmented CycleState, 4) augmented nodeInfo.
func (g *genericScheduler) addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata, state *framework.CycleState,
nodeInfo *schedulernodeinfo.NodeInfo, queue internalqueue.SchedulingQueue) (bool, predicates.PredicateMetadata,
*schedulernodeinfo.NodeInfo) {
*framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen only in tests.
return false, meta, nodeInfo
return false, meta, state, nodeInfo, nil
}
nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
if nominatedPods == nil || len(nominatedPods) == 0 {
return false, meta, nodeInfo
if len(nominatedPods) == 0 {
return false, meta, state, nodeInfo, nil
}
nodeInfoOut := nodeInfo.Clone()
var metaOut predicates.PredicateMetadata
if meta != nil {
metaOut = meta.ShallowCopy()
}
nodeInfoOut := nodeInfo.Clone()
stateOut := state.Clone()
stateOut.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaOut})
podsAdded := false
for _, p := range nominatedPods {
if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
nodeInfoOut.AddPod(p)
if metaOut != nil {
if err := metaOut.AddPod(p, nodeInfoOut.Node()); err != nil {
klog.Warningf("unable to add pod, nominated pod %s, incoming pod %s: %v", p.Name, pod.Name, err)
return false, meta, state, nodeInfo, err
}
}
status := g.framework.RunPreFilterExtensionAddPod(stateOut, pod, p, nodeInfoOut)
if !status.IsSuccess() {
return false, meta, state, nodeInfo, status.AsError()
}
podsAdded = true
}
}
return podsAdded, metaOut, nodeInfoOut
return podsAdded, metaOut, stateOut, nodeInfoOut, nil
}
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
@@ -630,19 +637,25 @@ func (g *genericScheduler) podFitsOnNode(
// they may end up getting scheduled to a different node.
for i := 0; i < 2; i++ {
metaToUse := meta
stateToUse := state
nodeInfoToUse := info
if i == 0 {
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
var err error
podsAdded, metaToUse, stateToUse, nodeInfoToUse, err = g.addNominatedPods(pod, meta, state, info, queue)
if err != nil {
return false, []predicates.PredicateFailureReason{}, nil, err
}
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
for _, predicateKey := range predicates.Ordering() {
var (
fit bool
reasons []predicates.PredicateFailureReason
err error
)
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate, exist := predicateFuncs[predicateKey]; exist {
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
@@ -663,7 +676,7 @@ func (g *genericScheduler) podFitsOnNode(
}
}
status = g.framework.RunFilterPlugins(state, pod, nodeInfoToUse)
status = g.framework.RunFilterPlugins(stateToUse, pod, nodeInfoToUse)
if !status.IsSuccess() && !status.IsUnschedulable() {
return false, failedPredicates, status, status.AsError()
}
@@ -772,6 +785,7 @@ func PrioritizeNodes(
}
// Run the Score plugins.
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
scoresMap, scoreStatus := fwk.RunScorePlugins(state, pod, nodes)
if !scoreStatus.IsSuccess() {
return framework.NodeScoreList{}, scoreStatus.AsError()
@@ -1002,13 +1016,18 @@ func (g *genericScheduler) selectNodesForPreemption(
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {
nodeName := potentialNodes[i].Name
if nodeNameToInfo[nodeName] == nil {
return
}
nodeInfoCopy := nodeNameToInfo[nodeName].Clone()
var metaCopy predicates.PredicateMetadata
if meta != nil {
metaCopy = meta.ShallowCopy()
}
stateClone := state.Clone()
stateCopy := state.Clone()
stateCopy.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaCopy})
pods, numPDBViolations, fits := g.selectVictimsOnNode(
stateClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
stateCopy, pod, metaCopy, nodeInfoCopy, fitPredicates, queue, pdbs)
if fits {
resultLock.Lock()
victims := extenderv1.Victims{
@@ -1086,35 +1105,31 @@ func (g *genericScheduler) selectVictimsOnNode(
queue internalqueue.SchedulingQueue,
pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
if nodeInfo == nil {
return nil, 0, false
}
potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
nodeInfoCopy := nodeInfo.Clone()
removePod := func(rp *v1.Pod) error {
if err := nodeInfoCopy.RemovePod(rp); err != nil {
if err := nodeInfo.RemovePod(rp); err != nil {
return err
}
if meta != nil {
if err := meta.RemovePod(rp, nodeInfoCopy.Node()); err != nil {
if err := meta.RemovePod(rp, nodeInfo.Node()); err != nil {
return err
}
}
status := g.framework.RunPreFilterExtensionRemovePod(state, pod, rp, nodeInfoCopy)
status := g.framework.RunPreFilterExtensionRemovePod(state, pod, rp, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
return nil
}
addPod := func(ap *v1.Pod) error {
nodeInfoCopy.AddPod(ap)
nodeInfo.AddPod(ap)
if meta != nil {
if err := meta.AddPod(ap, nodeInfoCopy.Node()); err != nil {
if err := meta.AddPod(ap, nodeInfo.Node()); err != nil {
return err
}
}
status := g.framework.RunPreFilterExtensionAddPod(state, pod, ap, nodeInfoCopy)
status := g.framework.RunPreFilterExtensionAddPod(state, pod, ap, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
@@ -1123,7 +1138,7 @@ func (g *genericScheduler) selectVictimsOnNode(
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
for _, p := range nodeInfo.Pods() {
if podutil.GetPodPriority(p) < podPriority {
potentialVictims.Items = append(potentialVictims.Items, p)
if err := removePod(p); err != nil {
@@ -1137,7 +1152,7 @@ func (g *genericScheduler) selectVictimsOnNode(
// inter-pod affinity to one or more victims, but we have decided not to
// support this case for performance reasons. Having affinity to lower
// priority pods is not a recommended configuration anyway.
if fits, _, _, err := g.podFitsOnNode(state, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
if fits, _, _, err := g.podFitsOnNode(state, pod, meta, nodeInfo, fitPredicates, queue, false); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
@@ -1155,7 +1170,7 @@ func (g *genericScheduler) selectVictimsOnNode(
if err := addPod(p); err != nil {
return false, err
}
fits, _, _, _ := g.podFitsOnNode(state, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
fits, _, _, _ := g.podFitsOnNode(state, pod, meta, nodeInfo, fitPredicates, queue, false)
if !fits {
if err := removePod(p); err != nil {
return false, err
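
A subtlety in the hunks above: because the injected state data only holds a pointer, state.Clone() by itself does not isolate the metadata per node. That is why addNominatedPods and selectNodesForPreemption write a fresh meta.ShallowCopy() into the cloned state before mutating anything. The toy program below demonstrates the sharing the re-injection avoids; the types are simplified stand-ins, not the real scheduler structures.

package main

import "fmt"

// meta stands in for PredicateMetadata; addedPods models per-node mutation.
type meta struct{ addedPods int }

func (m *meta) shallowCopy() *meta {
    c := *m
    return &c
}

// wrapper plays the role of PredicatesStateData: it holds only a pointer.
type wrapper struct{ ref *meta }

func main() {
    shared := &meta{}
    state := map[string]*wrapper{"predicates": {ref: shared}}

    // Cloning the state clones the wrapper, but the clone still points
    // at the shared metadata.
    clone := map[string]*wrapper{"predicates": {ref: state["predicates"].ref}}

    // Re-inject a per-node copy, as the scheduler does with
    // meta.ShallowCopy() and stateCopy.Write(...).
    clone["predicates"].ref = shared.shallowCopy()

    clone["predicates"].ref.addedPods++ // mutate for this node only
    fmt.Println(shared.addedPods)       // 0: the shared metadata is untouched
}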

View File

@@ -719,7 +719,7 @@ func TestFindFitAllError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(predicates, nodes)
_, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{})
_, predicateMap, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), &v1.Pod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -749,7 +749,7 @@ func TestFindFitSomeError(t *testing.T) {
scheduler := makeScheduler(predicates, nodes)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
_, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod)
_, predicateMap, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -830,7 +830,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
cache.UpdateNodeInfoSnapshot(scheduler.nodeInfoSnapshot)
queue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("nominated")}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
_, _, _, err := scheduler.findNodesThatFit(nil, test.pod)
_, _, _, err := scheduler.findNodesThatFit(framework.NewCycleState(), test.pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -999,7 +999,7 @@ func TestZeroRequest(t *testing.T) {
list, err := PrioritizeNodes(
test.pod, nodeNameToInfo, metaData, priorityConfigs,
schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}, emptyFramework, nil)
schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}, emptyFramework, framework.NewCycleState())
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@@ -21,6 +21,14 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
const (
// PredicatesStateKey is the key in CycleState to PredicatesStateData
PredicatesStateKey = "predicates"
// PrioritiesStateKey is the key in CycleState to PrioritiesStateData
PrioritiesStateKey = "priorities"
)
// PredicateResultToFrameworkStatus converts a predicate result (PredicateFailureReason + error)
// to a framework status.
func PredicateResultToFrameworkStatus(reasons []predicates.PredicateFailureReason, err error) *framework.Status {
@@ -47,3 +55,36 @@ func ErrorToFrameworkStatus(err error) *framework.Status {
}
return nil
}
// PredicatesStateData holds a reference to PredicateMetadata. In the normal case, StateData is
// generated and stored in CycleState by a framework plugin (like a PreFilter pre-computing data for
// its corresponding Filter). During migration, however, the scheduler injects a pointer to
// PredicateMetadata into CycleState. This "hack" is necessary because Filters that implement
// predicate functionality call into the existing predicate functions and need to pass
// PredicateMetadata along.
type PredicatesStateData struct {
Reference interface{}
}
// Clone is supposed to make a deep copy of the data, but since this is just a pointer, only the
// pointer itself is copied. This is OK because the PredicateMetadata copy that generic_scheduler
// makes during the preemption cycle is injected again from outside the framework.
func (p *PredicatesStateData) Clone() framework.StateData {
return &PredicatesStateData{
Reference: p.Reference,
}
}
// PrioritiesStateData holds a reference to PriorityMetadata.
type PrioritiesStateData struct {
Reference interface{}
}
// Clone is supposed to make a deep copy of the data, but since this is just a pointer, only the
// pointer itself is copied.
func (p *PrioritiesStateData) Clone() framework.StateData {
return &PrioritiesStateData{
Reference: p.Reference,
}
}
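
For context, a migrated Filter plugin would consume the injected data roughly as follows. This is a hedged sketch: legacyPredicatePlugin and its legacyPredicate field are hypothetical names invented here, the Filter signature and status helpers are paraphrased from the v1alpha1 framework of this era rather than taken from this commit, and imports are assumed to be those shown in the generic_scheduler.go hunk above.

// Hypothetical plugin wrapping one legacy predicate; not part of this commit.
type legacyPredicatePlugin struct {
    legacyPredicate predicates.FitPredicate
}

func (pl *legacyPredicatePlugin) Filter(state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
    data, err := state.Read(migration.PredicatesStateKey)
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }
    stateData, ok := data.(*migration.PredicatesStateData)
    if !ok {
        return framework.NewStatus(framework.Error, "unexpected state data")
    }
    // Unwrap the injected reference; nil metadata is tolerated by the
    // legacy predicates.
    meta, _ := stateData.Reference.(predicates.PredicateMetadata)
    fit, reasons, err := pl.legacyPredicate(pod, meta, nodeInfo)
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }
    if !fit {
        return migration.PredicateResultToFrameworkStatus(reasons, nil)
    }
    return nil
}

The type assertions are the cost of the migration "hack": the framework stores only opaque StateData, so each migrated plugin must unwrap the reference itself.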

View File

@@ -555,9 +555,8 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
// point. Returns nil if no plugins were configured.
func (f *framework) ListPlugins() map[string][]string {
m := make(map[string][]string)
insert := func(ptr interface{}) {
plugins := reflect.ValueOf(ptr).Elem()
for _, e := range f.getExtensionPoints(&config.Plugins{}) {
plugins := reflect.ValueOf(e.slicePtr).Elem()
var names []string
for i := 0; i < plugins.Len(); i++ {
name := plugins.Index(i).Interface().(Plugin).Name()
@@ -568,9 +567,6 @@ func (f *framework) ListPlugins() map[string][]string {
m[extName] = names
}
}
for _, e := range f.getExtensionPoints(&config.Plugins{}) {
insert(e.slicePtr)
}
if len(m) > 0 {
return m
}