Merge pull request #87297 from alculquicondor/cleanup/get_binder

Replace Scheduler.GetBinder with a method
Kubernetes Prow Robot 2020-01-20 21:25:36 -08:00 committed by GitHub
commit 48cb968825
6 changed files with 228 additions and 314 deletions
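
The net effect of the change: the injected Scheduler.GetBinder field goes away and binding becomes a method on Scheduler itself, with a fixed precedence of (1) extenders, (2) bind plugins, (3) the default API binding. A self-contained toy sketch of that precedence, condensed from the new bind/extendersBinding/defaultBinding methods in the diff below (toyExtender, bindPod and the plugin flag are illustrative names, not the scheduler's real API):

package main

import "fmt"

// toyExtender stands in for an algorithm.SchedulerExtender: it may or may not
// be a binder, and may or may not be interested in the pod being scheduled.
type toyExtender struct {
	isBinder   bool
	interested bool
}

// bindPod mirrors the precedence of the new Scheduler.bind:
// (1) extenders, (2) bind plugins, (3) default binding.
func bindPod(extenders []toyExtender, allPluginsSkip bool) string {
	// (1) The first extender that is a binder and interested in the pod wins.
	for i, ext := range extenders {
		if ext.isBinder && ext.interested {
			return fmt.Sprintf("extender #%d", i)
		}
	}
	// (2) Otherwise the framework's bind plugins run; if they do not all
	// return Skip, one of them handled the binding.
	if !allPluginsSkip {
		return "bind plugin"
	}
	// (3) If every bind plugin skipped, fall back to the default binding,
	// i.e. a POST to the pod's binding subresource.
	return "default binding"
}

func main() {
	fmt.Println(bindPod([]toyExtender{{isBinder: true, interested: true}}, true)) // extender #0
	fmt.Println(bindPod(nil, false))                                              // bind plugin
	fmt.Println(bindPod(nil, true))                                               // default binding
}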

View File

@@ -90,7 +90,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
@@ -102,6 +101,7 @@ go_test(
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
)

View File

@@ -157,7 +157,6 @@ func (c *Configurator) create(extenders []algorithm.SchedulerExtender) (*Schedul
return &Scheduler{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
GetBinder: getBinderFunc(c.client, extenders),
Framework: framework,
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
@@ -378,19 +377,6 @@ func getPredicateConfigs(keys sets.String, lr *frameworkplugins.LegacyRegistry,
return &plugins, pluginConfig, nil
}
// getBinderFunc returns a func which returns an extender that supports bind or a default binder based on the given pod.
func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
defaultBinder := &binder{client}
return func(pod *v1.Pod) Binder {
for _, extender := range extenders {
if extender.IsBinder() && extender.IsInterested(pod) {
return extender
}
}
return defaultBinder
}
}
type podInformer struct {
informer cache.SharedIndexInformer
}
@@ -482,19 +468,6 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch
}
}
type binder struct {
Client clientset.Interface
}
// Implement Binder interface
var _ Binder = &binder{}
// Bind just does a POST binding RPC.
func (b *binder) Bind(binding *v1.Binding) error {
klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
}
// GetPodDisruptionBudgetLister returns pdb lister from the given informer factory. Returns nil if PodDisruptionBudget feature is disabled.
func GetPodDisruptionBudgetLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {

View File

@@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"testing"
"time"
@@ -38,7 +37,6 @@ import (
"k8s.io/client-go/tools/cache"
apitesting "k8s.io/kubernetes/pkg/api/testing"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
@@ -373,49 +371,32 @@ func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string,
}
}
func TestBind(t *testing.T) {
table := []struct {
name string
binding *v1.Binding
}{
{
name: "binding can bind and validate request",
binding: &v1.Binding{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Target: v1.ObjectReference{
Name: "foohost.kubernetes.mydomain.com",
},
},
// TODO(#87157): Move to DefaultBinding Plugin tests when it is introduced.
func TestDefaultBinding(t *testing.T) {
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Target: v1.ObjectReference{
Name: "foohost.kubernetes.mydomain.com",
},
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
testBind(test.binding, t)
})
}
}
func testBind(binding *v1.Binding, t *testing.T) {
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: binding.GetName(), Namespace: metav1.NamespaceDefault},
Spec: apitesting.V1DeepEqualSafePodSpec(),
}
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
b := binder{client}
if err := b.Bind(binding); err != nil {
sched := Scheduler{client: client}
if err := sched.defaultBinding(binding); err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
pod := client.CoreV1().Pods(metav1.NamespaceDefault).(*fakeV1.FakePods)
actualBinding, err := pod.GetBinding(binding.GetName())
pods := client.CoreV1().Pods(metav1.NamespaceDefault).(*fakeV1.FakePods)
actualBinding, err := pods.GetBinding(binding.GetName())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
return
@@ -461,6 +442,7 @@ type fakeExtender struct {
isBinder bool
interestedPodName string
ignorable bool
gotBind bool
}
func (f *fakeExtender) Name() string {
@@ -496,6 +478,7 @@ func (f *fakeExtender) Prioritize(
func (f *fakeExtender) Bind(binding *v1.Binding) error {
if f.isBinder {
f.gotBind = true
return nil
}
return errors.New("not a binder")
@@ -509,65 +492,6 @@ func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
return pod != nil && pod.Name == f.interestedPodName
}
func TestGetBinderFunc(t *testing.T) {
table := []struct {
podName string
extenders []algorithm.SchedulerExtender
expectedBinderType string
name string
}{
{
name: "the extender is not a binder",
podName: "pod0",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
},
expectedBinderType: "*scheduler.binder",
},
{
name: "one of the extenders is a binder and interested in pod",
podName: "pod0",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
},
expectedBinderType: "*scheduler.fakeExtender",
},
{
name: "one of the extenders is a binder, but not interested in pod",
podName: "pod1",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod1"},
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
},
expectedBinderType: "*scheduler.binder",
},
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
testGetBinderFunc(test.expectedBinderType, test.podName, test.extenders, t)
})
}
}
func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm.SchedulerExtender, t *testing.T) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
}
f := &Configurator{}
binderFunc := getBinderFunc(f.client, extenders)
binder := binderFunc(pod)
binderType := fmt.Sprintf("%s", reflect.TypeOf(binder))
if binderType != expectedBinderType {
t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType)
}
}
type TestPlugin struct {
name string
}

View File

@@ -235,6 +235,7 @@ func TestAssumePodScheduled(t *testing.T) {
type testExpirePodStruct struct {
pod *v1.Pod
finishBind bool
assumedTime time.Time
}
@@ -254,6 +255,7 @@ func TestExpirePod(t *testing.T) {
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-3", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
}
now := time.Now()
ttl := 10 * time.Second
@@ -264,26 +266,28 @@ func TestExpirePod(t *testing.T) {
wNodeInfo *schedulernodeinfo.NodeInfo
}{{ // assumed pod would expire
pods: []*testExpirePodStruct{
{pod: testPods[0], assumedTime: now},
{pod: testPods[0], finishBind: true, assumedTime: now},
},
cleanupTime: now.Add(2 * ttl),
wNodeInfo: schedulernodeinfo.NewNodeInfo(),
}, { // first one would expire, second one would not.
}, { // first one would expire, second and third would not.
pods: []*testExpirePodStruct{
{pod: testPods[0], assumedTime: now},
{pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)},
{pod: testPods[0], finishBind: true, assumedTime: now},
{pod: testPods[1], finishBind: true, assumedTime: now.Add(3 * ttl / 2)},
{pod: testPods[2]},
},
cleanupTime: now.Add(2 * ttl),
wNodeInfo: newNodeInfo(
&schedulernodeinfo.Resource{
MilliCPU: 200,
Memory: 1024,
MilliCPU: 400,
Memory: 2048,
},
&schedulernodeinfo.Resource{
MilliCPU: 200,
Memory: 1024,
MilliCPU: 400,
Memory: 2048,
},
[]*v1.Pod{testPods[1]},
// Order gets altered when removing pods.
[]*v1.Pod{testPods[2], testPods[1]},
newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
make(map[string]*schedulernodeinfo.ImageStateSummary),
),
@@ -294,11 +298,18 @@ func TestExpirePod(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, pod := range tt.pods {
if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
t.Fatalf("assumePod failed: %v", err)
if err := cache.AssumePod(pod.pod); err != nil {
t.Fatal(err)
}
if !pod.finishBind {
continue
}
if err := cache.finishBinding(pod.pod, pod.assumedTime); err != nil {
t.Fatal(err)
}
}
// pods that have assumedTime + ttl < cleanupTime will get expired and removed
// pods that got bound and have assumedTime + ttl < cleanupTime will get
// expired and removed
cache.cleanupAssumedPods(tt.cleanupTime)
n := cache.nodes[nodeName]
if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {

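The cache_test.go change above encodes a behavioral detail worth spelling out: cleanupAssumedPods only expires assumed pods whose binding has finished, which is why the new finishBind field and the third, never-bound pod were added. A minimal sketch of that rule, written as an extra test that would sit next to TestExpirePod in the internal cache package (it reuses the package's makeBasePod helper and the unexported methods shown in the diff; the final GetPod checks are an assumption about the cache's accessor):

func TestOnlyBoundAssumedPodsExpire(t *testing.T) {
	ttl := 10 * time.Second
	now := time.Now()
	cache := newSchedulerCache(ttl, time.Second, nil)

	bound := makeBasePod(t, "node1", "bound", "100m", "500", "", nil)
	pending := makeBasePod(t, "node1", "pending", "100m", "500", "", nil)
	for _, p := range []*v1.Pod{bound, pending} {
		if err := cache.AssumePod(p); err != nil {
			t.Fatal(err)
		}
	}
	// Only the first pod finishes binding; the second stays assumed-but-unbound.
	if err := cache.finishBinding(bound, now); err != nil {
		t.Fatal(err)
	}

	// Well past assumedTime + ttl: the bound pod should be expired and removed,
	// while the pod that never finished binding should be kept.
	cache.cleanupAssumedPods(now.Add(2 * ttl))
	if _, err := cache.GetPod(bound); err == nil {
		t.Error("expected the bound assumed pod to be expired")
	}
	if _, err := cache.GetPod(pending); err != nil {
		t.Errorf("expected the not-yet-bound assumed pod to be kept: %v", err)
	}
}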
View File

@@ -82,7 +82,6 @@ type Scheduler struct {
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition is atomic.
@@ -119,6 +118,9 @@ type Scheduler struct {
SchedulingQueue internalqueue.SchedulingQueue
scheduledPodsHasSynced func() bool
// TODO(#87157): Remove this when the DefaultBinding Plugin is introduced.
client clientset.Interface
}
// Cache returns the cache in scheduler for test to check the data in scheduler.
@@ -333,6 +335,7 @@ func New(client clientset.Interface,
sched.podConditionUpdater = &podConditionUpdaterImpl{client}
sched.podPreemptor = &podPreemptorImpl{client}
sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
sched.client = client
AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
return sched, nil
@@ -505,28 +508,61 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
return nil
}
// bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we
// handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode string, state *framework.CycleState) error {
bindingStart := time.Now()
bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode)
var err error
if !bindStatus.IsSuccess() {
if bindStatus.Code() == framework.Skip {
// All bind plugins chose to skip binding of this pod, call original binding function.
// If binding succeeds then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host.
err = sched.GetBinder(assumed).Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: targetNode,
},
})
} else {
err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message())
}
// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders, (2) plugins and (3) default binding.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
start := time.Now()
defer func() {
sched.finishBinding(assumed, targetNode, start, err)
}()
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{
Namespace: assumed.Namespace,
Name: assumed.Name,
UID: assumed.UID,
},
Target: v1.ObjectReference{
Kind: "Node",
Name: targetNode,
},
}
bound, err := sched.extendersBinding(assumed, binding)
if bound {
return err
}
bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode)
if bindStatus.IsSuccess() {
return nil
}
if bindStatus.Code() != framework.Skip {
return fmt.Errorf("bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message())
}
// All bind plugins chose to skip binding of this pod, call original binding
// function. If binding succeeds then PodScheduled condition will be updated
// in apiserver so that it's atomic with setting host.
return sched.defaultBinding(binding)
}
// TODO(#87159): Move this to a Plugin.
func (sched *Scheduler) extendersBinding(assumed *v1.Pod, binding *v1.Binding) (bool, error) {
for _, extender := range sched.Algorithm.Extenders() {
if !extender.IsBinder() || !extender.IsInterested(assumed) {
continue
}
return true, extender.Bind(binding)
}
return false, nil
}
// TODO(#87157): Move this to a Plugin.
func (sched *Scheduler) defaultBinding(binding *v1.Binding) error {
klog.V(3).Infof("Attempting to bind %v/%v to %v", binding.Namespace, binding.Name, binding.Target.Name)
return sched.client.CoreV1().Pods(binding.Namespace).Bind(binding)
}
func (sched *Scheduler) finishBinding(assumed *v1.Pod, targetNode string, start time.Time, err error) {
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
}
@@ -535,13 +571,12 @@ func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode st
if err := sched.SchedulerCache.ForgetPod(assumed); err != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
}
return err
return
}
metrics.BindingLatency.Observe(metrics.SinceInSeconds(bindingStart))
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
metrics.BindingLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(start))
sched.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
return nil
}
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.

View File

@@ -27,6 +27,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -34,12 +35,12 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
clienttesting "k8s.io/client-go/testing"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
@@ -66,12 +67,6 @@ var (
emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, nil)
)
type fakeBinder struct {
b func(binding *v1.Binding) error
}
func (fb fakeBinder) Bind(binding *v1.Binding) error { return fb.b(binding) }
type fakePodConditionUpdater struct{}
func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondition) error {
@@ -258,6 +253,7 @@ func TestScheduler(t *testing.T) {
expectForgetPod: podWithID("foo", testNode.Name),
eventReason: "FailedScheduling",
}, {
name: "deleting pod",
sendPod: deletingPod("foo"),
algo: mockScheduler{core.ScheduleResult{}, nil},
eventReason: "FailedScheduling",
@@ -292,16 +288,18 @@ func TestScheduler(t *testing.T) {
return pod.UID == gotAssumedPod.UID
},
}
client := clientsetfake.NewSimpleClientset(item.sendPod)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() != "binding" {
return false, nil, nil
}
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
return true, gotBinding, item.injectBindError
})
s := &Scheduler{
SchedulerCache: sCache,
Algorithm: item.algo,
GetBinder: func(pod *v1.Pod) Binder {
return fakeBinder{func(b *v1.Binding) error {
gotBinding = b
return item.injectBindError
}}
},
SchedulerCache: sCache,
Algorithm: item.algo,
podConditionUpdater: fakePodConditionUpdater{},
Error: func(p *framework.PodInfo, err error) {
gotPod = p.Pod
@@ -313,12 +311,13 @@ func TestScheduler(t *testing.T) {
Framework: emptyFramework,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
client: client,
}
called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*v1beta1.Event)
if e, a := item.eventReason, e.Reason; e != a {
t.Errorf("expected %v, got %v", e, a)
if e.Reason != item.eventReason {
t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
}
close(called)
})
@@ -336,8 +335,8 @@ func TestScheduler(t *testing.T) {
if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) {
t.Errorf("error: wanted %v, got %v", e, a)
}
if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) {
t.Errorf("error: %s", diff.ObjectDiff(e, a))
if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
t.Errorf("got binding diff (-want, +got): %s", diff)
}
stopFunc()
})
@@ -481,78 +480,6 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
}
}
// Scheduler should preserve predicate constraint even if binding was longer
// than cache ttl
func TestSchedulerErrorWithLongBinding(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
firstPod := podWithPort("foo", "", 8080)
conflictPod := podWithPort("bar", "", 8080)
pods := map[string]*v1.Pod{firstPod.Name: firstPod, conflictPod.Name: conflictPod}
for _, test := range []struct {
name string
Expected map[string]bool
CacheTTL time.Duration
BindingDuration time.Duration
}{
{
name: "long cache ttl",
Expected: map[string]bool{firstPod.Name: true},
CacheTTL: 100 * time.Millisecond,
BindingDuration: 300 * time.Millisecond,
},
{
name: "short cache ttl",
Expected: map[string]bool{firstPod.Name: true},
CacheTTL: 10 * time.Second,
BindingDuration: 300 * time.Millisecond,
},
} {
t.Run(test.name, func(t *testing.T) {
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := internalcache.New(test.CacheTTL, stop)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
scache.AddNode(&node)
client := clientsetfake.NewSimpleClientset(&node)
informerFactory := informers.NewSharedInformerFactory(client, 0)
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"),
}
scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
queuedPodStore, scache, informerFactory, stop, test.BindingDuration, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
go scheduler.Run(context.Background())
queuedPodStore.Add(firstPod)
queuedPodStore.Add(conflictPod)
resultBindings := map[string]bool{}
waitChan := time.After(5 * time.Second)
for finished := false; !finished; {
select {
case b := <-bindingChan:
resultBindings[b.Name] = true
p := pods[b.Name]
p.Spec.NodeName = b.Target.Name
scache.AddPod(p)
case <-waitChan:
finished = true
}
}
if !reflect.DeepEqual(resultBindings, test.Expected) {
t.Errorf("Result binding are not equal to expected. %v != %v", resultBindings, test.Expected)
}
})
}
}
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
@@ -698,15 +625,19 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
client := clientsetfake.NewSimpleClientset()
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
var b *v1.Binding
if action.GetSubresource() == "binding" {
b := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
bindingChan <- b
}
return true, b, nil
})
sched := &Scheduler{
SchedulerCache: scache,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) Binder {
return fakeBinder{func(b *v1.Binding) error {
bindingChan <- b
return nil
}}
},
NextPod: func() *framework.PodInfo {
return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)}
},
@@ -718,6 +649,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
podPreemptor: fakePodPreemptor{},
Framework: fwk,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
client: client,
}
if recorder != nil {
@@ -727,66 +659,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
return sched, bindingChan, errChan
}
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, stop chan struct{}, bindingTime time.Duration, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding) {
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{},
PreFilter: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
for _, f := range fns {
f(&registry, plugins, pluginConfigs)
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
queue := internalqueue.NewSchedulingQueue(nil)
algo := core.NewGenericScheduler(
scache,
queue,
internalcache.NewEmptySnapshot(),
fwk,
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
schedulerapi.DefaultPercentageOfNodesToScore,
false,
)
bindingChan := make(chan *v1.Binding, 2)
sched := &Scheduler{
SchedulerCache: scache,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) Binder {
return fakeBinder{func(b *v1.Binding) error {
time.Sleep(bindingTime)
bindingChan <- b
return nil
}}
},
scheduledPodsHasSynced: func() bool {
return true
},
NextPod: func() *framework.PodInfo {
return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)}
},
Error: func(p *framework.PodInfo, err error) {
queuedPodStore.AddIfNotPresent(p)
},
Recorder: &events.FakeRecorder{},
podConditionUpdater: fakePodConditionUpdater{},
podPreemptor: fakePodPreemptor{},
StopEverything: stop,
Framework: fwk,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
SchedulingQueue: queue,
}
return sched, bindingChan
}
func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
@@ -1065,3 +937,102 @@ priorities:
}
}
}
func TestSchedulerBinding(t *testing.T) {
table := []struct {
podName string
extenders []algorithm.SchedulerExtender
wantBinderID int
name string
}{
{
name: "the extender is not a binder",
podName: "pod0",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
},
wantBinderID: -1, // default binding.
},
{
name: "one of the extenders is a binder and interested in pod",
podName: "pod0",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
},
wantBinderID: 1,
},
{
name: "one of the extenders is a binder, but not interested in pod",
podName: "pod1",
extenders: []algorithm.SchedulerExtender{
&fakeExtender{isBinder: false, interestedPodName: "pod1"},
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
},
wantBinderID: -1, // default binding.
},
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: test.podName,
},
}
defaultBound := false
client := clientsetfake.NewSimpleClientset(pod)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "binding" {
defaultBound = true
}
return false, nil, nil
})
fwk, err := framework.NewFramework(framework.Registry{}, nil, nil)
if err != nil {
t.Fatal(err)
}
stop := make(chan struct{})
defer close(stop)
scache := internalcache.New(100*time.Millisecond, stop)
algo := core.NewGenericScheduler(
scache,
nil,
nil,
fwk,
test.extenders,
nil,
nil,
nil,
false,
0,
false,
)
sched := Scheduler{
Algorithm: algo,
Framework: fwk,
Recorder: &events.FakeRecorder{},
SchedulerCache: scache,
client: client,
}
err = sched.bind(context.Background(), pod, "node", nil)
if err != nil {
t.Error(err)
}
// Checking default binding.
if wantBound := test.wantBinderID == -1; defaultBound != wantBound {
t.Errorf("got bound with default binding: %v, want %v", defaultBound, wantBound)
}
// Checking extenders binding.
for i, ext := range test.extenders {
wantBound := i == test.wantBinderID
if gotBound := ext.(*fakeExtender).gotBind; gotBound != wantBound {
t.Errorf("got bound with extender #%d: %v, want %v", i, gotBound, wantBound)
}
}
})
}
}
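
The deleted fakeBinder helper is no longer needed because a binding is just a create on the pods/binding subresource of the (now injected) client, so the tests above capture it with a reactor on a fake clientset. A standalone sketch of that pattern, using the pre-context Bind signature this PR targets (pod and node names are illustrative; the clientset and reactor calls mirror the ones in this diff):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	clienttesting "k8s.io/client-go/testing"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}}
	client := clientsetfake.NewSimpleClientset(pod)

	var gotBinding *v1.Binding
	client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() != "binding" {
			return false, nil, nil // let ordinary pod creations fall through
		}
		gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
		return true, gotBinding, nil
	})

	// Anything that ends up calling Pods(ns).Bind -- such as the new
	// Scheduler.defaultBinding -- is now observable through gotBinding.
	err := client.CoreV1().Pods("default").Bind(&v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"},
		Target:     v1.ObjectReference{Kind: "Node", Name: "node1"},
	})
	if err != nil {
		fmt.Println("bind error:", err)
		return
	}
	fmt.Printf("captured binding of %q to %q\n", gotBinding.Name, gotBinding.Target.Name)
}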