Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-24 12:15:52 +00:00)
Merge pull request #24248 from wojtek-t/parallel_predicates

Automatic merge from submit-queue

Parallelize computing predicates in scheduler

@davidopp

This commit is contained in: commit 7664509c73
pkg/util/workqueue/parallelizer.go (new file, 48 lines added)
@@ -0,0 +1,48 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+    "sync"
+
+    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+)
+
+type DoWorkPieceFunc func(piece int)
+
+// Parallelize is a very simple framework that allow for parallelizing
+// N independent pieces of work.
+func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
+    toProcess := make(chan int, pieces)
+    for i := 0; i < pieces; i++ {
+        toProcess <- i
+    }
+    close(toProcess)
+
+    wg := sync.WaitGroup{}
+    wg.Add(workers)
+    for i := 0; i < workers; i++ {
+        go func() {
+            defer utilruntime.HandleCrash()
+            defer wg.Done()
+            for piece := range toProcess {
+                doWorkPiece(piece)
+            }
+        }()
+    }
+    wg.Wait()
+}
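For readers unfamiliar with the new helper, here is a minimal usage sketch. It assumes it is compiled inside the Kubernetes tree so the workqueue import path resolves; the squaring work and the results slice are invented purely for illustration. Parallelize hands each piece index to one of the worker goroutines and returns only after every piece has been processed, so callers either write to disjoint data per index (as here) or guard shared state themselves.

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
    pieces := 100
    results := make([]int, pieces) // each piece writes only its own index, so no locking is needed

    workqueue.Parallelize(8, pieces, func(i int) {
        results[i] = i * i // hypothetical per-piece work
    })

    // Parallelize only returns once every piece has been processed.
    fmt.Println(results[:5]) // [0 1 4 9 16]
}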
@@ -21,20 +21,19 @@ import (
     "fmt"
     "math/rand"
     "sort"
-    "strings"
     "sync"

     "github.com/golang/glog"
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/util/errors"
-    "k8s.io/kubernetes/pkg/util/sets"
+    "k8s.io/kubernetes/pkg/util/workqueue"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
     schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

-type FailedPredicateMap map[string]sets.String
+type FailedPredicateMap map[string]string

 type FitError struct {
     Pod *api.Pod
@@ -47,8 +46,8 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
 func (f *FitError) Error() string {
     var buf bytes.Buffer
     buf.WriteString(fmt.Sprintf("pod (%s) failed to fit in any node\n", f.Pod.Name))
-    for node, predicateList := range f.FailedPredicates {
-        reason := fmt.Sprintf("fit failure on node (%s): %s\n", node, strings.Join(predicateList.List(), ","))
+    for node, predicate := range f.FailedPredicates {
+        reason := fmt.Sprintf("fit failure on node (%s): %s\n", node, predicate)
         buf.WriteString(reason)
     }
     return buf.String()
@@ -126,51 +125,32 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
 func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) {
+    predicateResultLock := sync.Mutex{}
     filtered := []api.Node{}
     failedPredicateMap := FailedPredicateMap{}
+    errs := []error{}

-    for _, node := range nodes.Items {
-        fits := true
-        for _, predicate := range predicateFuncs {
-            fit, err := predicate(pod, node.Name, nodeNameToInfo[node.Name])
-            if err != nil {
-                switch e := err.(type) {
-                case *predicates.InsufficientResourceError:
-                    if fit {
-                        err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e)
-                        return api.NodeList{}, FailedPredicateMap{}, err
-                    }
-                case *predicates.PredicateFailureError:
-                    if fit {
-                        err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e)
-                        return api.NodeList{}, FailedPredicateMap{}, err
-                    }
-                default:
-                    return api.NodeList{}, FailedPredicateMap{}, err
-                }
-            }
-            if !fit {
-                fits = false
-                if _, found := failedPredicateMap[node.Name]; !found {
-                    failedPredicateMap[node.Name] = sets.String{}
-                }
-                if re, ok := err.(*predicates.InsufficientResourceError); ok {
-                    failedPredicateMap[node.Name].Insert(fmt.Sprintf("Insufficient %s", re.ResourceName))
-                    break
-                }
-                if re, ok := err.(*predicates.PredicateFailureError); ok {
-                    failedPredicateMap[node.Name].Insert(re.PredicateName)
-                    break
-                } else {
-                    err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
-                    return api.NodeList{}, FailedPredicateMap{}, err
-                }
-            }
-        }
+    checkNode := func(i int) {
+        nodeName := nodes.Items[i].Name
+        fits, failedPredicate, err := podFitsOnNode(pod, nodeName, nodeNameToInfo[nodeName], predicateFuncs)
+
+        predicateResultLock.Lock()
+        defer predicateResultLock.Unlock()
+        if err != nil {
+            errs = append(errs, err)
+            return
+        }
         if fits {
-            filtered = append(filtered, node)
+            filtered = append(filtered, nodes.Items[i])
+        } else {
+            failedPredicateMap[nodeName] = failedPredicate
         }
     }
+    workqueue.Parallelize(16, len(nodes.Items), checkNode)
+    if len(errs) > 0 {
+        return api.NodeList{}, FailedPredicateMap{}, errors.NewAggregate(errs)
+    }

     if len(filtered) > 0 && len(extenders) != 0 {
         for _, extender := range extenders {
             filteredList, err := extender.Filter(pod, &api.NodeList{Items: filtered})
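The new checkNode closure follows a common fan-out shape: each piece does its work outside the lock, then briefly takes a single mutex to record the result. Below is a self-contained sketch of that shape using the same helper; the node names and the even-length "predicate" are made up for illustration, and the import again assumes the Kubernetes tree.

package main

import (
    "fmt"
    "sync"

    "k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
    // Hypothetical node names; the real code walks nodes.Items.
    nodes := []string{"node-1", "node-22", "node-333", "node-4444"}

    var mu sync.Mutex
    fitting := []string{}
    failed := map[string]string{}

    checkNode := func(i int) {
        name := nodes[i]
        fits := len(name)%2 == 0 // stand-in for running the real predicate functions

        // The results are shared between workers, so every writer serializes on one mutex.
        mu.Lock()
        defer mu.Unlock()
        if fits {
            fitting = append(fitting, name)
        } else {
            failed[name] = "OddNameLength" // stand-in for the failing predicate's name
        }
    }
    workqueue.Parallelize(16, len(nodes), checkNode)

    fmt.Println("fit:", fitting, "failed:", failed)
}

Capping the pool at 16 workers bounds the number of goroutines regardless of how many nodes the cluster has, while the buffered channel inside Parallelize keeps the remaining pieces queued.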
@@ -186,6 +166,41 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
     return api.NodeList{Items: filtered}, failedPredicateMap, nil
 }

+// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
+func podFitsOnNode(pod *api.Pod, nodeName string, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, string, error) {
+    for _, predicate := range predicateFuncs {
+        fit, err := predicate(pod, nodeName, info)
+        if err != nil {
+            switch e := err.(type) {
+            case *predicates.InsufficientResourceError:
+                if fit {
+                    err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e)
+                    return false, "", err
+                }
+            case *predicates.PredicateFailureError:
+                if fit {
+                    err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e)
+                    return false, "", err
+                }
+            default:
+                return false, "", err
+            }
+        }
+        if !fit {
+            if re, ok := err.(*predicates.InsufficientResourceError); ok {
+                return false, fmt.Sprintf("Insufficient %s", re.ResourceName), nil
+            }
+            if re, ok := err.(*predicates.PredicateFailureError); ok {
+                return false, re.PredicateName, nil
+            } else {
+                err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
+                return false, "", err
+            }
+        }
+    }
+    return true, "", nil
+}
+
 // Prioritizes the nodes by running the individual priority functions in parallel.
 // Each priority function is expected to set a score of 0-10
 // 0 is the lowest priority score (least preferred node) and 10 is the highest
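podFitsOnNode folds the two expected error types into a plain failure-reason string and treats anything else as a hard error that aborts scheduling. A stripped-down sketch of that classification, with stand-in error types in place of the real predicates package:

package main

import (
    "errors"
    "fmt"
)

// Stand-ins for predicates.InsufficientResourceError and predicates.PredicateFailureError.
type insufficientResourceError struct{ ResourceName string }

func (e *insufficientResourceError) Error() string { return "insufficient " + e.ResourceName }

type predicateFailureError struct{ PredicateName string }

func (e *predicateFailureError) Error() string { return e.PredicateName + " failed" }

// failureReason mirrors the shape of podFitsOnNode's reporting: an expected
// predicate failure becomes a plain reason string with a nil error, while
// anything unrecognized is surfaced as a hard error.
func failureReason(err error) (string, error) {
    switch e := err.(type) {
    case *insufficientResourceError:
        return fmt.Sprintf("Insufficient %s", e.ResourceName), nil
    case *predicateFailureError:
        return e.PredicateName, nil
    default:
        return "", fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected", err)
    }
}

func main() {
    fmt.Println(failureReason(&insufficientResourceError{ResourceName: "cpu"}))
    fmt.Println(failureReason(&predicateFailureError{PredicateName: "PodFitsHostPorts"}))
    fmt.Println(failureReason(errors.New("etcd timeout")))
}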
@@ -226,18 +241,17 @@ func PrioritizeNodes(
             weight := config.Weight
             priorityFunc := config.Function
             prioritizedList, err := priorityFunc(pod, nodeNameToInfo, nodeLister)
+
+            mu.Lock()
+            defer mu.Unlock()
             if err != nil {
-                mu.Lock()
                 errs = append(errs, err)
-                mu.Unlock()
                 return
             }
-            mu.Lock()
             for i := range prioritizedList {
                 host, score := prioritizedList[i].Host, prioritizedList[i].Score
                 combinedScores[host] += score * weight
             }
-            mu.Unlock()
         }(priorityConfig)
     }
     if len(errs) != 0 {
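The PrioritizeNodes change replaces several paired Lock/Unlock calls with one Lock and a deferred Unlock taken only after the priority function has returned, so the expensive work stays outside the critical section and the early error return can no longer skip the unlock. A small self-contained sketch of that shape; the inputs slice and the nil-means-failure convention are invented for illustration.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var (
        mu       sync.Mutex
        combined = map[string]int{} // stand-in for combinedScores
        errs     []error
        wg       sync.WaitGroup
    )
    // Hypothetical per-priority-function results; nil marks a failed function.
    inputs := []map[string]int{{"node-a": 3, "node-b": 7}, {"node-a": 5}, nil}

    for _, in := range inputs {
        wg.Add(1)
        go func(in map[string]int) {
            defer wg.Done()
            // Expensive work (the priority function itself) happens before the lock ...

            // ... then the lock is taken once; the deferred Unlock covers the early
            // error return as well as the normal path.
            mu.Lock()
            defer mu.Unlock()
            if in == nil {
                errs = append(errs, fmt.Errorf("priority function failed"))
                return
            }
            for host, score := range in {
                combined[host] += score
            }
        }(in)
    }
    wg.Wait()
    fmt.Println(combined, len(errs)) // map[node-a:8 node-b:7] 1
}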
@@ -309,12 +309,12 @@ func TestFindFitAllError(t *testing.T) {
     }

     for _, node := range nodes {
-        failures, found := predicateMap[node]
+        failure, found := predicateMap[node]
         if !found {
             t.Errorf("failed to find node: %s in %v", node, predicateMap)
         }
-        if len(failures) != 1 || !failures.Has("false") {
-            t.Errorf("unexpected failures: %v", failures)
+        if failure != "false" {
+            t.Errorf("unexpected failures: %v", failure)
         }
     }
 }
@@ -342,12 +342,12 @@ func TestFindFitSomeError(t *testing.T) {
         if node == pod.Name {
             continue
         }
-        failures, found := predicateMap[node]
+        failure, found := predicateMap[node]
         if !found {
             t.Errorf("failed to find node: %s in %v", node, predicateMap)
         }
-        if len(failures) != 1 || !failures.Has("false") {
-            t.Errorf("unexpected failures: %v", failures)
+        if failure != "false" {
+            t.Errorf("unexpected failures: %v", failure)
         }
     }
 }