Merge pull request #79777 from draveness/feature/refactor-scheduling-predicates

feat: use channel instead of mutex in scheduling predicates
This commit is contained in:
Kubernetes Prow Robot 2019-07-15 20:27:35 -07:00 committed by GitHub
commit 5a32bea904
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 24 deletions

View File

@ -372,9 +372,8 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
allNodeNames = append(allNodeNames, name)
}
errCh := schedutil.NewErrorChannel()
var lock sync.Mutex
var firstError error
topologyMaps := newTopologyPairsMaps()
appendTopologyPairsMaps := func(toAppend *topologyPairsMaps) {
@ -382,13 +381,6 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
defer lock.Unlock()
topologyMaps.appendMaps(toAppend)
}
catchError := func(err error) {
lock.Lock()
defer lock.Unlock()
if firstError == nil {
firstError = err
}
}
ctx, cancel := context.WithCancel(context.Background())
@ -402,15 +394,19 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
for _, existingPod := range nodeInfo.PodsWithAffinity() {
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node)
if err != nil {
catchError(err)
cancel()
errCh.SendErrorWithCancel(err, cancel)
return
}
appendTopologyPairsMaps(existingPodTopologyMaps)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
return topologyMaps, firstError
if err := errCh.ReceiveError(); err != nil {
return nil, err
}
return topologyMaps, nil
}
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
@ -428,8 +424,9 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
allNodeNames = append(allNodeNames, name)
}
errCh := schedutil.NewErrorChannel()
var lock sync.Mutex
var firstError error
topologyPairsAffinityPodsMaps = newTopologyPairsMaps()
topologyPairsAntiAffinityPodsMaps = newTopologyPairsMaps()
appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps *topologyPairsMaps) {
@ -443,19 +440,12 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
}
}
catchError := func(err error) {
lock.Lock()
defer lock.Unlock()
if firstError == nil {
firstError = err
}
}
affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
affinityProperties, err := getAffinityTermProperties(pod, affinityTerms)
if err != nil {
return nil, nil, err
}
antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
ctx, cancel := context.WithCancel(context.Background())
@ -484,8 +474,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
catchError(err)
cancel()
errCh.SendErrorWithCancel(err, cancel)
return
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, namespaces, selector) {
@ -496,12 +485,18 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
}
}
}
if len(nodeTopologyPairsAffinityPodsMaps.topologyPairToPods) > 0 || len(nodeTopologyPairsAntiAffinityPodsMaps.topologyPairToPods) > 0 {
appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, firstError
if err := errCh.ReceiveError(); err != nil {
return nil, nil, err
}
return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, nil
}
// targetPodMatchesAffinityOfPod returns true if "targetPod" matches ALL affinity terms of

View File

@ -9,6 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
"error_channel_test.go",
"heap_test.go",
"utils_test.go",
],
@ -24,6 +25,7 @@ go_library(
name = "go_default_library",
srcs = [
"clock.go",
"error_channel.go",
"heap.go",
"utils.go",
],

View File

@ -0,0 +1,59 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import "context"
// ErrorChannel supports non-blocking send and receive operation to capture error.
// A maximum of one error is kept in the channel and the rest of the errors sent
// are ignored, unless the existing error is received and the channel becomes empty
// again.
type ErrorChannel struct {
errCh chan error
}
// SendError sends an error without blocking the sender.
func (e *ErrorChannel) SendError(err error) {
select {
case e.errCh <- err:
default:
}
}
// SendErrorWithCancel sends an error without blocking the sender and calls
// cancel function.
func (e *ErrorChannel) SendErrorWithCancel(err error, cancel context.CancelFunc) {
e.SendError(err)
cancel()
}
// ReceiveError receives an error from channel without blocking on the receiver.
func (e *ErrorChannel) ReceiveError() error {
select {
case err := <-e.errCh:
return err
default:
return nil
}
}
// NewErrorChannel returns a new ErrorChannel.
func NewErrorChannel() *ErrorChannel {
return &ErrorChannel{
errCh: make(chan error, 1),
}
}

View File

@ -0,0 +1,47 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"errors"
"testing"
)
func TestErrorChannel(t *testing.T) {
errCh := NewErrorChannel()
if actualErr := errCh.ReceiveError(); actualErr != nil {
t.Errorf("expect nil from err channel, but got %v", actualErr)
}
err := errors.New("unknown error")
errCh.SendError(err)
if actualErr := errCh.ReceiveError(); actualErr != err {
t.Errorf("expect %v from err channel, but got %v", err, actualErr)
}
ctx, cancel := context.WithCancel(context.Background())
errCh.SendErrorWithCancel(err, cancel)
if actualErr := errCh.ReceiveError(); actualErr != err {
t.Errorf("expect %v from err channel, but got %v", err, actualErr)
}
if ctxErr := ctx.Err(); ctxErr != context.Canceled {
t.Errorf("expect context canceled, but got %v", ctxErr)
}
}