Use error channel to capture first error.

This commit is contained in:
Abdullah Gharaibeh 2019-07-16 10:00:42 -04:00
parent c54c4d1962
commit 73704f1b8e

View File

@ -19,7 +19,6 @@ package v1alpha1
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
@ -29,6 +28,7 @@ import (
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/cache"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
) )
// framework is the component responsible for initializing and running scheduler // framework is the component responsible for initializing and running scheduler
@ -273,35 +273,26 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
pluginToNodeScoreMap[pl.Name()] = make(NodeScoreList, len(nodes)) pluginToNodeScoreMap[pl.Name()] = make(NodeScoreList, len(nodes))
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
var firstErr *Status errCh := schedutil.NewErrorChannel()
var mu = sync.Mutex{}
catchError := func(err *Status) {
mu.Lock()
defer mu.Unlock()
if firstErr == nil {
firstErr = err
}
cancel()
}
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) { workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins { for _, pl := range f.scorePlugins {
weight, weightExists := f.pluginNameToWeightMap[pl.Name()] weight, weightExists := f.pluginNameToWeightMap[pl.Name()]
if !weightExists { if !weightExists {
errMsg := fmt.Sprintf("weight does not exist for plugin %v", pl.Name()) err := fmt.Errorf("weight does not exist for plugin %v", pl.Name())
catchError(NewStatus(Error, errMsg)) errCh.SendErrorWithCancel(err, cancel)
return return
} }
score, status := pl.Score(pc, pod, nodes[index].Name) score, status := pl.Score(pc, pod, nodes[index].Name)
if !status.IsSuccess() { if !status.IsSuccess() {
catchError(status) errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
return return
} }
pluginToNodeScoreMap[pl.Name()][index] = score * weight pluginToNodeScoreMap[pl.Name()][index] = score * weight
} }
}) })
if firstErr != nil {
msg := fmt.Sprintf("error while running score plugin for pod %v: %v", pod.Name, firstErr.Message()) if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running score plugin for pod %v: %v", pod.Name, err)
klog.Error(msg) klog.Error(msg)
return nil, NewStatus(Error, msg) return nil, NewStatus(Error, msg)
} }