diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 9e2a5e4a732..7dce3761a13 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -19,7 +19,6 @@ package v1alpha1 import ( "context" "fmt" - "sync" "time" "k8s.io/api/core/v1" @@ -29,6 +28,7 @@ import ( "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/internal/cache" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) // framework is the component responsible for initializing and running scheduler @@ -273,35 +273,26 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1. pluginToNodeScoreMap[pl.Name()] = make(NodeScoreList, len(nodes)) } ctx, cancel := context.WithCancel(context.Background()) - var firstErr *Status - var mu = sync.Mutex{} - catchError := func(err *Status) { - mu.Lock() - defer mu.Unlock() - if firstErr == nil { - firstErr = err - } - cancel() - } - + errCh := schedutil.NewErrorChannel() workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) { for _, pl := range f.scorePlugins { weight, weightExists := f.pluginNameToWeightMap[pl.Name()] if !weightExists { - errMsg := fmt.Sprintf("weight does not exist for plugin %v", pl.Name()) - catchError(NewStatus(Error, errMsg)) + err := fmt.Errorf("weight does not exist for plugin %v", pl.Name()) + errCh.SendErrorWithCancel(err, cancel) return } score, status := pl.Score(pc, pod, nodes[index].Name) if !status.IsSuccess() { - catchError(status) + errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel) return } pluginToNodeScoreMap[pl.Name()][index] = score * weight } }) - if firstErr != nil { - msg := fmt.Sprintf("error while running score plugin for pod %v: %v", pod.Name, firstErr.Message()) + + if err := errCh.ReceiveError(); err != nil { + msg := fmt.Sprintf("error while running score plugin for pod %v: %v", pod.Name, err) klog.Error(msg) return nil, NewStatus(Error, msg) }