From 254b05761f21ab20bf9b84b3ccba477ea92fdbea Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 13 Apr 2016 15:44:06 +0200 Subject: [PATCH] Asynchronous bindings --- plugin/pkg/scheduler/scheduler.go | 47 ++++++++++++++------------ plugin/pkg/scheduler/scheduler_test.go | 21 ++++++------ 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index b28833817a7..b4f0f0c2e7f 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -90,39 +90,44 @@ func (s *Scheduler) scheduleOne() { dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister) if err != nil { glog.V(1).Infof("Failed to schedule: %+v", pod) - s.config.Recorder.Eventf(pod, api.EventTypeWarning, "FailedScheduling", "%v", err) s.config.Error(pod, err) + s.config.Recorder.Eventf(pod, api.EventTypeWarning, "FailedScheduling", "%v", err) return } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start)) - b := &api.Binding{ - ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}, - Target: api.ObjectReference{ - Kind: "Node", - Name: dest, - }, - } + // Optimistically assume that the binding will succeed and send it to apiserver + // in the background. + // The only risk in this approach is that if the binding fails because of some + // reason, scheduler will be assuming that it succeeded while scheduling next + // pods, until the assumption in the internal cache expire (expiration is + // defined as "didn't read the binding via watch within a given timeout", + // timeout is currently set to 30s). However, after this timeout, the situation + // will self-repair. + assumed := *pod + assumed.Spec.NodeName = dest + s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, func() bool { return true }) + + go func() { + defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) + + b := &api.Binding{ + ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}, + Target: api.ObjectReference{ + Kind: "Node", + Name: dest, + }, + } - bindAction := func() bool { bindingStart := time.Now() err := s.config.Binder.Bind(b) if err != nil { glog.V(1).Infof("Failed to bind pod: %+v", err) - s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err) s.config.Error(pod, err) - return false + s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err) + return } metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) s.config.Recorder.Eventf(pod, api.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest) - return true - } - - assumed := *pod - assumed.Spec.NodeName = dest - // We want to assume the pod if and only if the bind succeeds, - // but we don't want to race with any deletions, which happen asynchronously. - s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, bindAction) - - metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) + }() } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 6d176ad9e57..01843e16e5e 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -98,13 +98,14 @@ func TestScheduler(t *testing.T) { expectErrorPod: podWithID("foo", ""), eventReason: "FailedScheduling", }, { - sendPod: podWithID("foo", ""), - algo: mockScheduler{"machine1", nil}, - expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}}, - injectBindError: errB, - expectError: errB, - expectErrorPod: podWithID("foo", ""), - eventReason: "FailedScheduling", + sendPod: podWithID("foo", ""), + algo: mockScheduler{"machine1", nil}, + expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}}, + expectAssumedPod: podWithID("foo", "machine1"), + injectBindError: errB, + expectError: errB, + expectErrorPod: podWithID("foo", ""), + eventReason: "FailedScheduling", }, } @@ -145,6 +146,7 @@ func TestScheduler(t *testing.T) { close(called) }) s.scheduleOne() + <-called if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { t.Errorf("%v: assumed pod: wanted %v, got %v", i, e, a) } @@ -157,7 +159,6 @@ func TestScheduler(t *testing.T) { if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) { t.Errorf("%v: error: %s", i, diff.ObjectDiff(e, a)) } - <-called events.Stop() } } @@ -250,6 +251,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { // assumedPods: [] s.scheduleOne() + <-called // queuedPodStore: [] // scheduledPodStore: [foo:8080] // assumedPods: [foo:8080] @@ -271,7 +273,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { t.Errorf("Expected exact match on binding: %s", diff.ObjectDiff(ex, ac)) } - <-called events.Stop() scheduledPodStore.Delete(pod) @@ -312,6 +313,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { }) s.scheduleOne() + <-called expectBind = &api.Binding{ ObjectMeta: api.ObjectMeta{Name: "bar"}, @@ -320,6 +322,5 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) { t.Errorf("Expected exact match on binding: %s", diff.ObjectDiff(ex, ac)) } - <-called events.Stop() }