Parallelize computing predicates

This commit is contained in:
Wojciech Tyczynski
2016-04-14 11:13:56 +02:00
parent 545bf184ef
commit 5a73a9d235
2 changed files with 106 additions and 39 deletions

View File

@@ -26,6 +26,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@@ -124,48 +125,32 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) {
predicateResultLock := sync.Mutex{}
filtered := []api.Node{}
failedPredicateMap := FailedPredicateMap{}
errs := []error{}
for _, node := range nodes.Items {
fits := true
for _, predicate := range predicateFuncs {
fit, err := predicate(pod, node.Name, nodeNameToInfo[node.Name])
if err != nil {
switch e := err.(type) {
case *predicates.InsufficientResourceError:
if fit {
err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e)
return api.NodeList{}, FailedPredicateMap{}, err
}
case *predicates.PredicateFailureError:
if fit {
err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e)
return api.NodeList{}, FailedPredicateMap{}, err
}
default:
return api.NodeList{}, FailedPredicateMap{}, err
}
}
if !fit {
fits = false
if re, ok := err.(*predicates.InsufficientResourceError); ok {
failedPredicateMap[node.Name] = fmt.Sprintf("Insufficient %s", re.ResourceName)
break
}
if re, ok := err.(*predicates.PredicateFailureError); ok {
failedPredicateMap[node.Name] = re.PredicateName
break
} else {
err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
return api.NodeList{}, FailedPredicateMap{}, err
}
}
checkNode := func(i int) {
nodeName := nodes.Items[i].Name
fits, failedPredicate, err := podFitsOnNode(pod, nodeName, nodeNameToInfo[nodeName], predicateFuncs)
predicateResultLock.Lock()
defer predicateResultLock.Unlock()
if err != nil {
errs = append(errs, err)
return
}
if fits {
filtered = append(filtered, node)
filtered = append(filtered, nodes.Items[i])
} else {
failedPredicateMap[nodeName] = failedPredicate
}
}
workqueue.Parallelize(16, len(nodes.Items), checkNode)
if len(errs) > 0 {
return api.NodeList{}, FailedPredicateMap{}, errors.NewAggregate(errs)
}
if len(filtered) > 0 && len(extenders) != 0 {
for _, extender := range extenders {
filteredList, err := extender.Filter(pod, &api.NodeList{Items: filtered})
@@ -181,6 +166,41 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
return api.NodeList{Items: filtered}, failedPredicateMap, nil
}
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
func podFitsOnNode(pod *api.Pod, nodeName string, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, string, error) {
for _, predicate := range predicateFuncs {
fit, err := predicate(pod, nodeName, info)
if err != nil {
switch e := err.(type) {
case *predicates.InsufficientResourceError:
if fit {
err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e)
return false, "", err
}
case *predicates.PredicateFailureError:
if fit {
err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e)
return false, "", err
}
default:
return false, "", err
}
}
if !fit {
if re, ok := err.(*predicates.InsufficientResourceError); ok {
return false, fmt.Sprintf("Insufficient %s", re.ResourceName), nil
}
if re, ok := err.(*predicates.PredicateFailureError); ok {
return false, re.PredicateName, nil
} else {
err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
return false, "", err
}
}
}
return true, "", nil
}
// Prioritizes the nodes by running the individual priority functions in parallel.
// Each priority function is expected to set a score of 0-10
// 0 is the lowest priority score (least preferred node) and 10 is the highest
@@ -221,18 +241,17 @@ func PrioritizeNodes(
weight := config.Weight
priorityFunc := config.Function
prioritizedList, err := priorityFunc(pod, nodeNameToInfo, nodeLister)
mu.Lock()
defer mu.Unlock()
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
return
}
mu.Lock()
for i := range prioritizedList {
host, score := prioritizedList[i].Host, prioritizedList[i].Score
combinedScores[host] += score * weight
}
mu.Unlock()
}(priorityConfig)
}
if len(errs) != 0 {