feat: use channel instead of mutex in scheduling predicates

This commit is contained in:
draveness 2019-07-04 16:26:42 +08:00
parent 9ef075f359
commit fab1948a1b
4 changed files with 127 additions and 24 deletions

View File

@ -372,9 +372,8 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
allNodeNames = append(allNodeNames, name)
}
errCh := schedutil.NewErrorChannel()
var lock sync.Mutex
var firstError error
topologyMaps := newTopologyPairsMaps()
appendTopologyPairsMaps := func(toAppend *topologyPairsMaps) {
@ -382,13 +381,6 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
defer lock.Unlock()
topologyMaps.appendMaps(toAppend)
}
catchError := func(err error) {
lock.Lock()
defer lock.Unlock()
if firstError == nil {
firstError = err
}
}
ctx, cancel := context.WithCancel(context.Background())
@ -402,15 +394,19 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
for _, existingPod := range nodeInfo.PodsWithAffinity() {
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node)
if err != nil {
catchError(err)
cancel()
errCh.SendErrorWithCancel(err, cancel)
return
}
appendTopologyPairsMaps(existingPodTopologyMaps)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
return topologyMaps, firstError
if err := errCh.ReceiveError(); err != nil {
return nil, err
}
return topologyMaps, nil
}
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
@ -428,8 +424,9 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
allNodeNames = append(allNodeNames, name)
}
errCh := schedutil.NewErrorChannel()
var lock sync.Mutex
var firstError error
topologyPairsAffinityPodsMaps = newTopologyPairsMaps()
topologyPairsAntiAffinityPodsMaps = newTopologyPairsMaps()
appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps *topologyPairsMaps) {
@ -443,19 +440,12 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
}
}
catchError := func(err error) {
lock.Lock()
defer lock.Unlock()
if firstError == nil {
firstError = err
}
}
affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
affinityProperties, err := getAffinityTermProperties(pod, affinityTerms)
if err != nil {
return nil, nil, err
}
antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
ctx, cancel := context.WithCancel(context.Background())
@ -484,8 +474,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
catchError(err)
cancel()
errCh.SendErrorWithCancel(err, cancel)
return
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, namespaces, selector) {
@ -496,12 +485,18 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
}
}
}
if len(nodeTopologyPairsAffinityPodsMaps.topologyPairToPods) > 0 || len(nodeTopologyPairsAntiAffinityPodsMaps.topologyPairToPods) > 0 {
appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, firstError
if err := errCh.ReceiveError(); err != nil {
return nil, nil, err
}
return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, nil
}
// targetPodMatchesAffinityOfPod returns true if "targetPod" matches ALL affinity terms of

View File

@ -9,6 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
"error_channel_test.go",
"heap_test.go",
"utils_test.go",
],
@ -24,6 +25,7 @@ go_library(
name = "go_default_library",
srcs = [
"clock.go",
"error_channel.go",
"heap.go",
"utils.go",
],

View File

@ -0,0 +1,59 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import "context"
// ErrorChannel supports non-blocking send and receive operation to capture error.
// A maximum of one error is kept in the channel and the rest of the errors sent
// are ignored, unless the existing error is received and the channel becomes empty
// again.
type ErrorChannel struct {
errCh chan error
}
// SendError sends an error without blocking the sender.
func (e *ErrorChannel) SendError(err error) {
select {
case e.errCh <- err:
default:
}
}
// SendErrorWithCancel sends an error without blocking the sender and calls
// cancel function.
func (e *ErrorChannel) SendErrorWithCancel(err error, cancel context.CancelFunc) {
e.SendError(err)
cancel()
}
// ReceiveError receives an error from channel without blocking on the receiver.
func (e *ErrorChannel) ReceiveError() error {
select {
case err := <-e.errCh:
return err
default:
return nil
}
}
// NewErrorChannel returns a new ErrorChannel.
func NewErrorChannel() *ErrorChannel {
return &ErrorChannel{
errCh: make(chan error, 1),
}
}

View File

@ -0,0 +1,47 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"errors"
"testing"
)
func TestErrorChannel(t *testing.T) {
errCh := NewErrorChannel()
if actualErr := errCh.ReceiveError(); actualErr != nil {
t.Errorf("expect nil from err channel, but got %v", actualErr)
}
err := errors.New("unknown error")
errCh.SendError(err)
if actualErr := errCh.ReceiveError(); actualErr != err {
t.Errorf("expect %v from err channel, but got %v", err, actualErr)
}
ctx, cancel := context.WithCancel(context.Background())
errCh.SendErrorWithCancel(err, cancel)
if actualErr := errCh.ReceiveError(); actualErr != err {
t.Errorf("expect %v from err channel, but got %v", err, actualErr)
}
if ctxErr := ctx.Err(); ctxErr != context.Canceled {
t.Errorf("expect context canceled, but got %v", ctxErr)
}
}