diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index cbea5893e92..d140b1904fd 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" @@ -70,7 +71,7 @@ func (factory *ConfigFactory) Create() *scheduler.Config { NextPod: func() *api.Pod { return podQueue.Pop().(*api.Pod) }, - Error: factory.defaultErrorFunc, + Error: factory.makeDefaultErrorFunc(podQueue), } } @@ -118,8 +119,27 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { return &minionEnumerator{list}, nil } -func (factory *ConfigFactory) defaultErrorFunc(pod *api.Pod, err error) { - glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) +func (factory *ConfigFactory) makeDefaultErrorFunc(podQueue *cache.FIFO) func(pod *api.Pod, err error) { + return func(pod *api.Pod, err error) { + glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) + + // Retry asynchronously. + // Note that this is extremely rudimentary and we need a more real error handling path. + go func() { + defer util.HandleCrash() + podID := pod.ID + // Get the pod again; it may have changed/been scheduled already. + pod = &api.Pod{} + err := factory.Client.Get().Path("pods").Path(podID).Do().Into(pod) + if err != nil { + glog.Errorf("Error getting pod %v for retry: %v; abandoning", podID, err) + return + } + if pod.DesiredState.Host == "" { + podQueue.Add(pod.ID, pod) + } + }() + } } // storeToMinionLister turns a store into a minion lister. The store must contain (only) minions. diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index c542423526f..fb268d7bef2 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -20,6 +20,7 @@ import ( "net/http/httptest" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -128,6 +129,36 @@ func TestPollMinions(t *testing.T) { } } +func TestDefaultErrorFunc(t *testing.T) { + testPod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + handler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: api.EncodeOrDie(testPod), + T: t, + } + server := httptest.NewServer(&handler) + factory := ConfigFactory{client.New(server.URL, nil)} + queue := cache.NewFIFO() + errFunc := factory.makeDefaultErrorFunc(queue) + + errFunc(testPod, nil) + for { + // This is a terrible way to do this but I plan on replacing this + // whole error handling system in the future. The test will time + // out if something doesn't work. + time.Sleep(10 * time.Millisecond) + got, exists := queue.Get("foo") + if !exists { + continue + } + handler.ValidateRequest(t, "/api/v1beta1/pods/foo", "GET", nil) + if e, a := testPod, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + break + } +} + func TestStoreToMinionLister(t *testing.T) { store := cache.NewStore() ids := util.NewStringSet("foo", "bar", "baz")