diff --git a/pkg/util/workqueue/parallelizer.go b/pkg/util/workqueue/parallelizer.go
new file mode 100644
index 00000000000..9b773d53885
--- /dev/null
+++ b/pkg/util/workqueue/parallelizer.go
@@ -0,0 +1,48 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+	"sync"
+
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+)
+
+type DoWorkPieceFunc func(piece int)
+
+// Parallelize is a very simple framework that allow for parallelizing
+// N independent pieces of work.
+func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
+	toProcess := make(chan int, pieces)
+	for i := 0; i < pieces; i++ {
+		toProcess <- i
+	}
+	close(toProcess)
+
+	wg := sync.WaitGroup{}
+	wg.Add(workers)
+	for i := 0; i < workers; i++ {
+		go func() {
+			defer utilruntime.HandleCrash()
+			defer wg.Done()
+			for piece := range toProcess {
+				doWorkPiece(piece)
+			}
+		}()
+	}
+	wg.Wait()
+}
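
Note (reviewer aid, not part of the patch): a minimal usage sketch of the new helper. It only assumes the Parallelize signature and import path introduced above; the worker count, result slice, and squaring work are invented for illustration, and the snippet is meant to build inside the Kubernetes tree that contains this package.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	// Ten independent pieces of work handled by four workers. Each piece
	// writes only to its own slot, so no extra locking is needed, and
	// Parallelize only returns once every piece has been processed.
	results := make([]int, 10)
	workqueue.Parallelize(4, len(results), func(piece int) {
		results[piece] = piece * piece
	})
	fmt.Println(results)
}

Each worker pulls indices from the shared buffered channel until it is drained, so pieces are distributed dynamically rather than pre-partitioned among the workers.
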
diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go
index 85fb6f9593a..d1a8abc59e2 100644
--- a/plugin/pkg/scheduler/generic_scheduler.go
+++ b/plugin/pkg/scheduler/generic_scheduler.go
@@ -21,20 +21,19 @@ import (
 	"fmt"
 	"math/rand"
 	"sort"
-	"strings"
 	"sync"
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/util/errors"
-	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
-type FailedPredicateMap map[string]sets.String
+type FailedPredicateMap map[string]string
 
 type FitError struct {
 	Pod              *api.Pod
@@ -47,8 +46,8 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
 func (f *FitError) Error() string {
 	var buf bytes.Buffer
 	buf.WriteString(fmt.Sprintf("pod (%s) failed to fit in any node\n", f.Pod.Name))
-	for node, predicateList := range f.FailedPredicates {
-		reason := fmt.Sprintf("fit failure on node (%s): %s\n", node, strings.Join(predicateList.List(), ","))
+	for node, predicate := range f.FailedPredicates {
+		reason := fmt.Sprintf("fit failure on node (%s): %s\n", node, predicate)
 		buf.WriteString(reason)
 	}
 	return buf.String()
@@ -126,51 +125,32 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
 func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) {
+	predicateResultLock := sync.Mutex{}
 	filtered := []api.Node{}
 	failedPredicateMap := FailedPredicateMap{}
+	errs := []error{}
 
-	for _, node := range nodes.Items {
-		fits := true
-		for _, predicate := range predicateFuncs {
-			fit, err := predicate(pod, node.Name, nodeNameToInfo[node.Name])
-			if err != nil {
-				switch e := err.(type) {
-				case *predicates.InsufficientResourceError:
-					if fit {
-						err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e)
-						return api.NodeList{}, FailedPredicateMap{}, err
-					}
-				case *predicates.PredicateFailureError:
-					if fit {
-						err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e)
-						return api.NodeList{}, FailedPredicateMap{}, err
-					}
-				default:
-					return api.NodeList{}, FailedPredicateMap{}, err
-				}
-			}
-			if !fit {
-				fits = false
-				if _, found := failedPredicateMap[node.Name]; !found {
-					failedPredicateMap[node.Name] = sets.String{}
-				}
-				if re, ok := err.(*predicates.InsufficientResourceError); ok {
-					failedPredicateMap[node.Name].Insert(fmt.Sprintf("Insufficient %s", re.ResourceName))
-					break
-				}
-				if re, ok := err.(*predicates.PredicateFailureError); ok {
-					failedPredicateMap[node.Name].Insert(re.PredicateName)
-					break
-				} else {
-					err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
-					return api.NodeList{}, FailedPredicateMap{}, err
-				}
-			}
+	checkNode := func(i int) {
+		nodeName := nodes.Items[i].Name
+		fits, failedPredicate, err := podFitsOnNode(pod, nodeName, nodeNameToInfo[nodeName], predicateFuncs)
+
+		predicateResultLock.Lock()
+		defer predicateResultLock.Unlock()
+		if err != nil {
+			errs = append(errs, err)
+			return
 		}
 		if fits {
-			filtered = append(filtered, node)
+			filtered = append(filtered, nodes.Items[i])
+		} else {
+			failedPredicateMap[nodeName] = failedPredicate
 		}
 	}
+	workqueue.Parallelize(16, len(nodes.Items), checkNode)
+	if len(errs) > 0 {
+		return api.NodeList{}, FailedPredicateMap{}, errors.NewAggregate(errs)
+	}
+
 	if len(filtered) > 0 && len(extenders) != 0 {
 		for _, extender := range extenders {
 			filteredList, err := extender.Filter(pod, &api.NodeList{Items: filtered})
@@ -186,6 +166,41 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
 	return api.NodeList{Items: filtered}, failedPredicateMap, nil
 }
 
+// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
+func podFitsOnNode(pod *api.Pod, nodeName string, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, string, error) {
+	for _, predicate := range predicateFuncs {
+		fit, err := predicate(pod, nodeName, info)
+		if err != nil {
+			switch e := err.(type) {
+			case *predicates.InsufficientResourceError:
+				if fit {
+					err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e)
+					return false, "", err
+				}
+			case *predicates.PredicateFailureError:
+				if fit {
+					err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e)
+					return false, "", err
+				}
+			default:
+				return false, "", err
+			}
+		}
+		if !fit {
+			if re, ok := err.(*predicates.InsufficientResourceError); ok {
+				return false, fmt.Sprintf("Insufficient %s", re.ResourceName), nil
+			}
+			if re, ok := err.(*predicates.PredicateFailureError); ok {
+				return false, re.PredicateName, nil
+			} else {
+				err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
+				return false, "", err
+			}
+		}
+	}
+	return true, "", nil
+}
+
 // Prioritizes the nodes by running the individual priority functions in parallel.
 // Each priority function is expected to set a score of 0-10
 // 0 is the lowest priority score (least preferred node) and 10 is the highest
@@ -226,18 +241,17 @@ func PrioritizeNodes(
 			weight := config.Weight
 			priorityFunc := config.Function
 			prioritizedList, err := priorityFunc(pod, nodeNameToInfo, nodeLister)
+
+			mu.Lock()
+			defer mu.Unlock()
 			if err != nil {
-				mu.Lock()
 				errs = append(errs, err)
-				mu.Unlock()
 				return
 			}
-			mu.Lock()
 			for i := range prioritizedList {
 				host, score := prioritizedList[i].Host, prioritizedList[i].Score
 				combinedScores[host] += score * weight
 			}
-			mu.Unlock()
 		}(priorityConfig)
 	}
 	if len(errs) != 0 {
diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go
index 0bce1fedd32..4519d5f507c 100644
--- a/plugin/pkg/scheduler/generic_scheduler_test.go
+++ b/plugin/pkg/scheduler/generic_scheduler_test.go
@@ -309,12 +309,12 @@ func TestFindFitAllError(t *testing.T) {
 	}
 
 	for _, node := range nodes {
-		failures, found := predicateMap[node]
+		failure, found := predicateMap[node]
 		if !found {
 			t.Errorf("failed to find node: %s in %v", node, predicateMap)
 		}
-		if len(failures) != 1 || !failures.Has("false") {
-			t.Errorf("unexpected failures: %v", failures)
+		if failure != "false" {
+			t.Errorf("unexpected failures: %v", failure)
 		}
 	}
 }
@@ -342,12 +342,12 @@ func TestFindFitSomeError(t *testing.T) {
 		if node == pod.Name {
 			continue
 		}
-		failures, found := predicateMap[node]
+		failure, found := predicateMap[node]
 		if !found {
 			t.Errorf("failed to find node: %s in %v", node, predicateMap)
 		}
-		if len(failures) != 1 || !failures.Has("false") {
-			t.Errorf("unexpected failures: %v", failures)
+		if failure != "false" {
+			t.Errorf("unexpected failures: %v", failure)
 		}
 	}
 }
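
Note (reviewer aid, not part of the patch): the sketch below reproduces the fan-out/collect pattern that the new findNodesThatFit follows: the per-item checks run concurrently, while appends to the shared result slices happen under one mutex, mirroring checkNode and predicateResultLock. All names here are hypothetical example code, and it uses only the standard library (with a stripped-down local copy of the helper) so it can be run outside the Kubernetes tree.

package main

import (
	"fmt"
	"sync"
)

// checkAll fans the per-item checks out across a fixed pool of workers and
// collects passing items and errors under a single mutex.
func checkAll(items []string, check func(string) (bool, error)) ([]string, []error) {
	var (
		mu     sync.Mutex
		passed []string
		errs   []error
	)
	parallelize(4, len(items), func(i int) {
		ok, err := check(items[i])

		// Only the result bookkeeping is serialized; the checks themselves
		// run concurrently, which is where the speedup comes from.
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			errs = append(errs, err)
			return
		}
		if ok {
			passed = append(passed, items[i])
		}
	})
	return passed, errs
}

// parallelize is a dependency-free copy of the workqueue.Parallelize helper
// added by this patch, minus the crash handler.
func parallelize(workers, pieces int, do func(int)) {
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				do(piece)
			}
		}()
	}
	wg.Wait()
}

func main() {
	check := func(name string) (bool, error) {
		return name != "node-b", nil // pretend node-b fails a predicate
	}
	passed, errs := checkAll([]string{"node-a", "node-b", "node-c"}, check)
	fmt.Println(passed, errs) // order of passed items may vary between runs
}

Serializing only the bookkeeping keeps the critical section tiny, which is the same reason the patch takes the lock after podFitsOnNode returns rather than around the whole predicate check.
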