diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index d1331e5ab6d..aef1193751d 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -20,6 +20,7 @@ package factory
 
 import (
 	"math/rand"
+	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -69,6 +70,11 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
 		algorithm.EqualPriority,
 		&storeToPodLister{podCache}, r)
 
+	podBackoff := podBackoff{
+		perPodBackoff: map[string]*backoffEntry{},
+		clock:         realClock{},
+	}
+
 	return &scheduler.Config{
 		MinionLister: &storeToMinionLister{minionCache},
 		Algorithm:    algo,
@@ -81,7 +87,7 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
 				pod.ID, minionCache.Contains(), podCache.Contains())
 			return pod
 		},
-		Error: factory.makeDefaultErrorFunc(podQueue),
+		Error: factory.makeDefaultErrorFunc(&podBackoff, podQueue),
 	}
 }
 
@@ -157,15 +163,16 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
 	return &minionEnumerator{list}, nil
 }
 
-func (factory *ConfigFactory) makeDefaultErrorFunc(podQueue *cache.FIFO) func(pod *api.Pod, err error) {
+func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
 	return func(pod *api.Pod, err error) {
 		glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err)
-
+		backoff.gc()
 		// Retry asynchronously.
 		// Note that this is extremely rudimentary and we need a more real error handling path.
 		go func() {
 			defer util.HandleCrash()
 			podID := pod.ID
+			backoff.wait(podID)
 			// Get the pod again; it may have changed/been scheduled already.
 			pod = &api.Pod{}
 			err := factory.Client.Get().Path("pods").Path(podID).Do().Into(pod)
@@ -234,3 +241,86 @@ func (b *binder) Bind(binding *api.Binding) error {
 	glog.V(2).Infof("Attempting to bind %v to %v", binding.PodID, binding.Host)
 	return b.Post().Path("bindings").Body(binding).Do().Error()
 }
+
+// clock abstracts the time source so that tests can substitute a fake one.
+type clock interface {
+	Now() time.Time
+}
+
+// realClock is the clock backed by time.Now.
+type realClock struct{}
+
+// Now returns the current time as reported by time.Now.
+func (realClock) Now() time.Time {
+	return time.Now()
+}
+
+// backoffEntry is the retry state of one pod: backoff is the delay the next
+// getBackoff call hands out, lastUpdate is when the entry was last looked up.
+type backoffEntry struct {
+	backoff    time.Duration
+	lastUpdate time.Time
+}
+
+// podBackoff keeps an exponential backoff per pod ID. lock guards the map and
+// the fields of every entry stored in it.
+type podBackoff struct {
+	perPodBackoff map[string]*backoffEntry
+	lock          sync.Mutex
+	clock         clock
+}
+
+// getEntryLocked returns the entry for podID, creating one with a one second
+// backoff if there is none, and stamps it with the current time. The caller
+// must hold p.lock.
+func (p *podBackoff) getEntryLocked(podID string) *backoffEntry {
+	entry, ok := p.perPodBackoff[podID]
+	if !ok {
+		entry = &backoffEntry{backoff: 1 * time.Second}
+		p.perPodBackoff[podID] = entry
+	}
+	entry.lastUpdate = p.clock.Now()
+	return entry
+}
+
+// getEntry is getEntryLocked for callers that do not hold p.lock. The returned
+// entry is shared, so its fields must not be touched without taking p.lock.
+func (p *podBackoff) getEntry(podID string) *backoffEntry {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	return p.getEntryLocked(podID)
+}
+
+// getBackoff returns how long to wait before retrying podID and doubles the
+// delay kept for the next call, capped at 60 seconds.
+func (p *podBackoff) getBackoff(podID string) time.Duration {
+	// The error func starts one goroutine per failure, so several of them can
+	// be here for the same pod; keep the read-modify-write under the lock.
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	entry := p.getEntryLocked(podID)
+	duration := entry.backoff
+	entry.backoff *= 2
+	if entry.backoff > 60*time.Second {
+		entry.backoff = 60 * time.Second
+	}
+	glog.V(4).Infof("Backing off %s for pod %s", duration.String(), podID)
+	return duration
+}
+
+// wait sleeps for the backoff currently due for podID.
+func (p *podBackoff) wait(podID string) {
+	time.Sleep(p.getBackoff(podID))
+}
+
+// gc removes every entry that has not been looked up for more than 60 seconds.
+func (p *podBackoff) gc() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	now := p.clock.Now()
+	for podID, entry := range p.perPodBackoff {
+		if now.Sub(entry.lastUpdate) > 60*time.Second {
+			delete(p.perPodBackoff, podID)
+		}
+	}
+}
diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go
index 2f834a3aabb..e3bb34a7419 100644
--- a/plugin/pkg/scheduler/factory/factory_test.go
+++ b/plugin/pkg/scheduler/factory/factory_test.go
@@ -187,7 +187,11 @@ func TestDefaultErrorFunc(t *testing.T) {
 	server := httptest.NewServer(mux)
 	factory := ConfigFactory{client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})}
 	queue := cache.NewFIFO()
-	errFunc := factory.makeDefaultErrorFunc(queue)
+	podBackoff := podBackoff{
+		perPodBackoff: map[string]*backoffEntry{},
+		clock:         &fakeClock{},
+	}
+	errFunc := factory.makeDefaultErrorFunc(&podBackoff, queue)
 
 	errFunc(testPod, nil)
 	for {
@@ -276,6 +280,16 @@ func TestMinionEnumerator(t *testing.T) {
 	}
 }
 
+// fakeClock is a clock whose time only changes when a test sets t.
+type fakeClock struct {
+	t time.Time
+}
+
+// Now returns the time currently stored in the fake.
+func (f *fakeClock) Now() time.Time {
+	return f.t
+}
+
 func TestBind(t *testing.T) {
 	table := []struct {
 		binding *api.Binding
@@ -301,3 +315,57 @@ func TestBind(t *testing.T) {
 		handler.ValidateRequest(t, "/api/"+testapi.Version()+"/bindings", "POST", &expectedBody)
 	}
 }
+
+// TestBackoff checks that the delay doubles per call for the same pod, that gc
+// resets a pod left idle for over 60 seconds, and that a stored 60 second delay is returned as is.
+func TestBackoff(t *testing.T) {
+	clock := fakeClock{}
+	backoff := podBackoff{
+		perPodBackoff: map[string]*backoffEntry{},
+		clock:         &clock,
+	}
+
+	tests := []struct {
+		podID            string
+		expectedDuration time.Duration
+		advanceClock     time.Duration
+	}{
+		{
+			podID:            "foo",
+			expectedDuration: 1 * time.Second,
+		},
+		{
+			podID:            "foo",
+			expectedDuration: 2 * time.Second,
+		},
+		{
+			podID:            "foo",
+			expectedDuration: 4 * time.Second,
+		},
+		{
+			podID:            "bar",
+			expectedDuration: 1 * time.Second,
+			advanceClock:     120 * time.Second,
+		},
+		// 'foo' should have been gc'd here.
+		{
+			podID:            "foo",
+			expectedDuration: 1 * time.Second,
+		},
+	}
+
+	for _, test := range tests {
+		duration := backoff.getBackoff(test.podID)
+		if duration != test.expectedDuration {
+			t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID)
+		}
+		clock.t = clock.t.Add(test.advanceClock)
+		backoff.gc()
+	}
+
+	backoff.perPodBackoff["foo"].backoff = 60 * time.Second
+	duration := backoff.getBackoff("foo")
+	if duration != 60*time.Second {
+		t.Errorf("expected: 60, got %s", duration.String())
+	}
+}