Avoid locking when computing predicates.

This commit is contained in:
Wojciech Tyczynski 2016-07-20 08:28:57 +02:00
parent cab7db3a64
commit fc6d38baa2

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -142,33 +143,39 @@ func findNodesThatFit(
nodes []*api.Node, nodes []*api.Node,
predicateFuncs map[string]algorithm.FitPredicate, predicateFuncs map[string]algorithm.FitPredicate,
extenders []algorithm.SchedulerExtender) ([]*api.Node, FailedPredicateMap, error) { extenders []algorithm.SchedulerExtender) ([]*api.Node, FailedPredicateMap, error) {
// Create filtered list with enough space to avoid growing it. var filtered []*api.Node
filtered := make([]*api.Node, 0, len(nodes))
failedPredicateMap := FailedPredicateMap{} failedPredicateMap := FailedPredicateMap{}
if len(predicateFuncs) == 0 { if len(predicateFuncs) == 0 {
filtered = nodes filtered = nodes
} else { } else {
predicateResultLock := sync.Mutex{} // Create filtered list with enough space to avoid growing it
errs := []error{} // and allow assigning.
filtered = make([]*api.Node, len(nodes))
meta := predicates.PredicateMetadata(pod) meta := predicates.PredicateMetadata(pod)
errs := []error{}
var predicateResultLock sync.Mutex
var filteredLen int32
checkNode := func(i int) { checkNode := func(i int) {
nodeName := nodes[i].Name nodeName := nodes[i].Name
fits, failedPredicate, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs) fits, failedPredicate, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
predicateResultLock.Lock()
defer predicateResultLock.Unlock()
if err != nil { if err != nil {
predicateResultLock.Lock()
errs = append(errs, err) errs = append(errs, err)
predicateResultLock.Unlock()
return return
} }
if fits { if fits {
filtered = append(filtered, nodes[i]) filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
} else { } else {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = failedPredicate failedPredicateMap[nodeName] = failedPredicate
predicateResultLock.Unlock()
} }
} }
workqueue.Parallelize(16, len(nodes), checkNode) workqueue.Parallelize(16, len(nodes), checkNode)
filtered = filtered[:filteredLen]
if len(errs) > 0 { if len(errs) > 0 {
return []*api.Node{}, FailedPredicateMap{}, errors.NewAggregate(errs) return []*api.Node{}, FailedPredicateMap{}, errors.NewAggregate(errs)
} }