From fab1948a1be09f86f1d31d424e37a11d2a0c20a4 Mon Sep 17 00:00:00 2001 From: draveness Date: Thu, 4 Jul 2019 16:26:42 +0800 Subject: [PATCH] feat: use channel instead of mutex in scheduling predicates --- .../algorithm/predicates/metadata.go | 43 ++++++-------- pkg/scheduler/util/BUILD | 2 + pkg/scheduler/util/error_channel.go | 59 +++++++++++++++++++ pkg/scheduler/util/error_channel_test.go | 47 +++++++++++++++ 4 files changed, 127 insertions(+), 24 deletions(-) create mode 100644 pkg/scheduler/util/error_channel.go create mode 100644 pkg/scheduler/util/error_channel_test.go diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index e25f86695c0..095d80f29d5 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -372,9 +372,8 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s allNodeNames = append(allNodeNames, name) } + errCh := schedutil.NewErrorChannel() var lock sync.Mutex - var firstError error - topologyMaps := newTopologyPairsMaps() appendTopologyPairsMaps := func(toAppend *topologyPairsMaps) { @@ -382,13 +381,6 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s defer lock.Unlock() topologyMaps.appendMaps(toAppend) } - catchError := func(err error) { - lock.Lock() - defer lock.Unlock() - if firstError == nil { - firstError = err - } - } ctx, cancel := context.WithCancel(context.Background()) @@ -402,15 +394,19 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s for _, existingPod := range nodeInfo.PodsWithAffinity() { existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node) if err != nil { - catchError(err) - cancel() + errCh.SendErrorWithCancel(err, cancel) return } appendTopologyPairsMaps(existingPodTopologyMaps) } } workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode) - return topologyMaps, firstError + + if err := errCh.ReceiveError(); err != nil { + return nil, err + } + + return topologyMaps, nil } // getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod". @@ -428,8 +424,9 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s allNodeNames = append(allNodeNames, name) } + errCh := schedutil.NewErrorChannel() + var lock sync.Mutex - var firstError error topologyPairsAffinityPodsMaps = newTopologyPairsMaps() topologyPairsAntiAffinityPodsMaps = newTopologyPairsMaps() appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps *topologyPairsMaps) { @@ -443,19 +440,12 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s } } - catchError := func(err error) { - lock.Lock() - defer lock.Unlock() - if firstError == nil { - firstError = err - } - } - affinityTerms := GetPodAffinityTerms(affinity.PodAffinity) affinityProperties, err := getAffinityTermProperties(pod, affinityTerms) if err != nil { return nil, nil, err } + antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity) ctx, cancel := context.WithCancel(context.Background()) @@ -484,8 +474,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) if err != nil { - catchError(err) - cancel() + errCh.SendErrorWithCancel(err, cancel) return } if priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, namespaces, selector) { @@ -496,12 +485,18 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s } } } + if len(nodeTopologyPairsAffinityPodsMaps.topologyPairToPods) > 0 || len(nodeTopologyPairsAntiAffinityPodsMaps.topologyPairToPods) > 0 { appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps) } } workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode) - return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, firstError + + if err := errCh.ReceiveError(); err != nil { + return nil, nil, err + } + + return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, nil } // targetPodMatchesAffinityOfPod returns true if "targetPod" matches ALL affinity terms of diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index a4007661cd2..3d1a71aa14b 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -9,6 +9,7 @@ load( go_test( name = "go_default_test", srcs = [ + "error_channel_test.go", "heap_test.go", "utils_test.go", ], @@ -24,6 +25,7 @@ go_library( name = "go_default_library", srcs = [ "clock.go", + "error_channel.go", "heap.go", "utils.go", ], diff --git a/pkg/scheduler/util/error_channel.go b/pkg/scheduler/util/error_channel.go new file mode 100644 index 00000000000..ef300a79d90 --- /dev/null +++ b/pkg/scheduler/util/error_channel.go @@ -0,0 +1,59 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import "context" + +// ErrorChannel supports non-blocking send and receive operation to capture error. +// A maximum of one error is kept in the channel and the rest of the errors sent +// are ignored, unless the existing error is received and the channel becomes empty +// again. +type ErrorChannel struct { + errCh chan error +} + +// SendError sends an error without blocking the sender. +func (e *ErrorChannel) SendError(err error) { + select { + case e.errCh <- err: + default: + } +} + +// SendErrorWithCancel sends an error without blocking the sender and calls +// cancel function. +func (e *ErrorChannel) SendErrorWithCancel(err error, cancel context.CancelFunc) { + e.SendError(err) + cancel() +} + +// ReceiveError receives an error from channel without blocking on the receiver. +func (e *ErrorChannel) ReceiveError() error { + select { + case err := <-e.errCh: + return err + default: + return nil + } +} + +// NewErrorChannel returns a new ErrorChannel. +func NewErrorChannel() *ErrorChannel { + return &ErrorChannel{ + errCh: make(chan error, 1), + } +} diff --git a/pkg/scheduler/util/error_channel_test.go b/pkg/scheduler/util/error_channel_test.go new file mode 100644 index 00000000000..7a1ba0e3c1b --- /dev/null +++ b/pkg/scheduler/util/error_channel_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "context" + "errors" + "testing" +) + +func TestErrorChannel(t *testing.T) { + errCh := NewErrorChannel() + + if actualErr := errCh.ReceiveError(); actualErr != nil { + t.Errorf("expect nil from err channel, but got %v", actualErr) + } + + err := errors.New("unknown error") + errCh.SendError(err) + if actualErr := errCh.ReceiveError(); actualErr != err { + t.Errorf("expect %v from err channel, but got %v", err, actualErr) + } + + ctx, cancel := context.WithCancel(context.Background()) + errCh.SendErrorWithCancel(err, cancel) + if actualErr := errCh.ReceiveError(); actualErr != err { + t.Errorf("expect %v from err channel, but got %v", err, actualErr) + } + + if ctxErr := ctx.Err(); ctxErr != context.Canceled { + t.Errorf("expect context canceled, but got %v", ctxErr) + } +}