From 56fc0f5900d20a93436b18ba2e46cd823234ce06 Mon Sep 17 00:00:00 2001
From: Sergiusz Urbaniak
Date: Mon, 21 Sep 2015 15:26:44 +0200
Subject: [PATCH] scheduler: reenable TestPlugin_LifeCycle, increase timeouts

---
 contrib/mesos/pkg/scheduler/plugin.go      |  11 +-
 contrib/mesos/pkg/scheduler/plugin_test.go | 480 ++++++++++++++-------
 2 files changed, 327 insertions(+), 164 deletions(-)

diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go
index 9a7d4bedd83..1c8d01eede7 100644
--- a/contrib/mesos/pkg/scheduler/plugin.go
+++ b/contrib/mesos/pkg/scheduler/plugin.go
@@ -52,6 +52,11 @@ const (
 	pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling
 )
 
+const (
+	FailedScheduling = "FailedScheduling"
+	Scheduled        = "Scheduled"
+)
+
 // scheduler abstraction to allow for easier unit testing
 type schedulerInterface interface {
 	sync.Locker // synchronize scheduler plugin operations
@@ -757,7 +762,7 @@ func (s *schedulingPlugin) scheduleOne() {
 	dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister) // call kubeScheduler.Schedule
 	if err != nil {
 		log.V(1).Infof("Failed to schedule: %+v", pod)
-		s.config.Recorder.Eventf(pod, "FailedScheduling", "Error scheduling: %v", err)
+		s.config.Recorder.Eventf(pod, FailedScheduling, "Error scheduling: %v", err)
 		s.config.Error(pod, err)
 		return
 	}
@@ -770,11 +775,11 @@ func (s *schedulingPlugin) scheduleOne() {
 	}
 	if err := s.config.Binder.Bind(b); err != nil {
 		log.V(1).Infof("Failed to bind pod: %+v", err)
-		s.config.Recorder.Eventf(pod, "FailedScheduling", "Binding rejected: %v", err)
+		s.config.Recorder.Eventf(pod, FailedScheduling, "Binding rejected: %v", err)
 		s.config.Error(pod, err)
 		return
 	}
-	s.config.Recorder.Eventf(pod, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)
+	s.config.Recorder.Eventf(pod, Scheduled, "Successfully assigned %v to %v", pod.Name, dest)
 }
 
 // this pod may be out of sync with respect to the API server registry:
diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go
index 5223f364c2c..22fc5c87fc0 100644
--- a/contrib/mesos/pkg/scheduler/plugin_test.go
+++ b/contrib/mesos/pkg/scheduler/plugin_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package scheduler
 
 import (
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -34,7 +35,7 @@ import (
 	log "github.com/golang/glog"
 	mesos "github.com/mesos/mesos-go/mesosproto"
-	util "github.com/mesos/mesos-go/mesosutil"
+	"github.com/mesos/mesos-go/mesosutil"
 	bindings "github.com/mesos/mesos-go/scheduler"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -46,26 +47,48 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
 	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
+	"k8s.io/kubernetes/pkg/util"
 )
 
 // A apiserver mock which partially mocks the pods API
 type TestServer struct {
+	stats map[string]uint
+	nodes map[string]*api.Node
+	lock  sync.Mutex // guards above fields
+
 	server *httptest.Server
-	stats  map[string]uint
-	lock   sync.Mutex
+	t      *testing.T
+}
+
+func (srv *TestServer) LookupNode(name string) *api.Node {
+	srv.lock.Lock()
+	defer srv.lock.Unlock()
+
+	node, _ := api.Scheme.DeepCopy(srv.nodes[name])
+	return node.(*api.Node)
+}
+
+func (srv *TestServer) WaitForNode(name string) {
+	assertext.EventuallyTrue(srv.t, util.ForeverTestTimeout, func() bool {
+		return srv.LookupNode(name) != nil
+	})
 }
 
 func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsListWatch) *TestServer {
 	ts := TestServer{
 		stats: map[string]uint{},
+		nodes: map[string]*api.Node{},
+		t:     t,
 	}
 
 	mux := http.NewServeMux()
-	mux.HandleFunc(testapi.Default.ResourcePath("pods", namespace, ""), func(w http.ResponseWriter, r *http.Request) {
+
+	podListHandler := func(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusOK)
 		pods := mockPodListWatch.Pods()
 		w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), &pods)))
-	})
+	}
+	mux.HandleFunc(testapi.Default.ResourcePath("pods", namespace, ""), podListHandler)
+	mux.HandleFunc(testapi.Default.ResourcePath("pods", "", ""), podListHandler)
 
 	podsPrefix := testapi.Default.ResourcePath("pods", namespace, "") + "/"
 	mux.HandleFunc(podsPrefix, func(w http.ResponseWriter, r *http.Request) {
@@ -76,7 +99,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
 		defer ts.lock.Unlock()
 		ts.stats[name] = ts.stats[name] + 1
 
-		p := mockPodListWatch.GetPod(name)
+		p := mockPodListWatch.Pod(name)
 		if p != nil {
 			w.WriteHeader(http.StatusOK)
 			w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), p)))
@@ -85,9 +108,33 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
 		w.WriteHeader(http.StatusNotFound)
 	})
 
-	mux.HandleFunc(testapi.Default.ResourcePath("events", namespace, ""), func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(http.StatusOK)
-	})
+	mux.HandleFunc(
+		testapi.Default.ResourcePath("events", namespace, ""),
+		func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(http.StatusOK)
+		},
+	)
+
+	mux.HandleFunc(
+		testapi.Default.ResourcePath("nodes", "", ""),
+		func(w http.ResponseWriter, r *http.Request) {
+			var node api.Node
+			if err := json.NewDecoder(r.Body).Decode(&node); err != nil {
+				w.WriteHeader(http.StatusInternalServerError)
+				return
+			}
+
+			ts.lock.Lock()
+			defer ts.lock.Unlock()
+			ts.nodes[node.Name] = &node
+
+			if err := json.NewEncoder(w).Encode(node); err != nil {
+				w.WriteHeader(http.StatusInternalServerError)
+				return
+			}
+			w.WriteHeader(http.StatusOK)
+		},
+	)
 
 	mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
 		t.Errorf("unexpected request: %v", req.RequestURI)
@@ -97,6 +144,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
 	ts.server = httptest.NewServer(mux)
 	return &ts
 }
+
 func (ts *TestServer) Stats(name string) uint {
 	ts.lock.Lock()
 	defer ts.lock.Unlock()
@@ -131,13 +179,15 @@ func NewMockPodsListWatch(initialPodList api.PodList) *MockPodsListWatch {
 	}
 	return &lw
 }
+
 func (lw *MockPodsListWatch) Pods() api.PodList {
 	lw.lock.Lock()
 	defer lw.lock.Unlock()
 
 	return lw.list
 }
-func (lw *MockPodsListWatch) GetPod(name string) *api.Pod {
+
+func (lw *MockPodsListWatch) Pod(name string) *api.Pod {
 	lw.lock.Lock()
 	defer lw.lock.Unlock()
 
@@ -173,6 +223,7 @@ func (lw *MockPodsListWatch) Modify(pod *api.Pod, notify bool) {
 	}
 	log.Fatalf("Cannot find pod %v to modify in MockPodsListWatch", pod.Name)
 }
+
 func (lw *MockPodsListWatch) Delete(pod *api.Pod, notify bool) {
 	lw.lock.Lock()
 	defer lw.lock.Unlock()
@@ -229,16 +280,16 @@ func NewTestPod() (*api.Pod, int) {
 // Offering some cpus and memory and the 8000-9000 port range
 func NewTestOffer(id string) *mesos.Offer {
 	hostname := "some_hostname"
-	cpus := util.NewScalarResource("cpus", 3.75)
-	mem := util.NewScalarResource("mem", 940)
+	cpus := mesosutil.NewScalarResource("cpus", 3.75)
+	mem := mesosutil.NewScalarResource("mem", 940)
 	var port8000 uint64 = 8000
 	var port9000 uint64 = 9000
 	ports8000to9000 := mesos.Value_Range{Begin: &port8000, End: &port9000}
-	ports := util.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000})
+	ports := mesosutil.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000})
 	return &mesos.Offer{
-		Id:        util.NewOfferID(id),
+		Id:        mesosutil.NewOfferID(id),
 		Hostname:  &hostname,
-		SlaveId:   util.NewSlaveID(hostname),
+		SlaveId:   mesosutil.NewSlaveID(hostname),
 		Resources: []*mesos.Resource{cpus, mem, ports},
 	}
 }
@@ -266,9 +317,11 @@ func NewEventObserver() *EventObserver {
 		fifo: make(chan Event, 1000),
 	}
 }
+
 func (o *EventObserver) Event(object runtime.Object, reason, message string) {
 	o.fifo <- Event{Object: object, Reason: reason, Message: message}
 }
+
 func (o *EventObserver) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
 	o.fifo <- Event{Object: object, Reason: reason, Message: fmt.Sprintf(messageFmt, args...)}
 }
@@ -278,7 +331,7 @@ func (o *EventObserver) PastEventf(object runtime.Object, timestamp unversioned.
 
 func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, msgAndArgs ...interface{}) bool {
 	// parse msgAndArgs: first possibly a duration, otherwise a format string with further args
-	timeout := time.Second * 2
+	timeout := util.ForeverTestTimeout
 	msg := "event not received"
 	msgArgStart := 0
 	if len(msgAndArgs) > 0 {
@@ -326,6 +379,7 @@ func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, ms
 		return a.Fail(msg)
 	}
 }
+
 func (a *EventAssertions) EventWithReason(observer *EventObserver, reason string, msgAndArgs ...interface{}) bool {
 	return a.Event(observer, func(e Event) bool {
 		return e.Reason == reason
@@ -362,6 +416,175 @@ func newTaskStatusForTask(task *mesos.TaskInfo, state mesos.TaskState) *mesos.Ta
 	}
 }
 
+type LaunchedTask struct {
+	offerId  mesos.OfferID
+	taskInfo *mesos.TaskInfo
+}
+
+type lifecycleTest struct {
+	apiServer     *TestServer
+	driver        *joinableDriver
+	eventObs      *EventObserver
+	plugin        *schedulingPlugin
+	podsListWatch *MockPodsListWatch
+	scheduler     *KubernetesScheduler
+	schedulerProc *ha.SchedulerProcess
+	t             *testing.T
+}
+
+func newLifecycleTest(t *testing.T) lifecycleTest {
+	assert := &EventAssertions{*assert.New(t)}
+
+	// create a fake pod watch. We use that below to submit new pods to the scheduler
+	podsListWatch := NewMockPodsListWatch(api.PodList{})
+
+	// create fake apiserver
+	apiServer := NewTestServer(t, api.NamespaceDefault, podsListWatch)
+
+	// create executor with some data for static pods if set
+	executor := mesosutil.NewExecutorInfo(
+		mesosutil.NewExecutorID("executor-id"),
+		mesosutil.NewCommandInfo("executor-cmd"),
+	)
+	executor.Data = []byte{0, 1, 2}
+
+	// create scheduler
+	strategy := NewAllocationStrategy(
+		podtask.DefaultPredicate,
+		podtask.NewDefaultProcurement(
+			mresource.DefaultDefaultContainerCPULimit,
+			mresource.DefaultDefaultContainerMemLimit,
+		),
+	)
+
+	scheduler := New(Config{
+		Executor: executor,
+		Client: client.NewOrDie(&client.Config{
+			Host:    apiServer.server.URL,
+			Version: testapi.Default.Version(),
+		}),
+		Scheduler:  NewFCFSPodScheduler(strategy, apiServer.LookupNode),
+		Schedcfg:   *schedcfg.CreateDefaultConfig(),
+		LookupNode: apiServer.LookupNode,
+	})
+
+	assert.NotNil(scheduler.client, "client is nil")
+	assert.NotNil(scheduler.executor, "executor is nil")
+	assert.NotNil(scheduler.offers, "offer registry is nil")
+
+	// create scheduler process
+	schedulerProc := ha.New(scheduler)
+
+	// get plugin config from it
+	config := scheduler.NewPluginConfig(
+		schedulerProc.Terminal(),
+		http.DefaultServeMux,
+		&podsListWatch.ListWatch,
+	)
+	assert.NotNil(config)
+
+	// make events observable
+	eventObs := NewEventObserver()
+	config.Recorder = eventObs
+
+	// create plugin
+	plugin := NewPlugin(config).(*schedulingPlugin)
+	assert.NotNil(plugin)
+
+	// create mock mesos scheduler driver
+	driver := &joinableDriver{}
+
+	return lifecycleTest{
+		apiServer:     apiServer,
+		driver:        driver,
+		eventObs:      eventObs,
+		plugin:        plugin,
+		podsListWatch: podsListWatch,
+		scheduler:     scheduler,
+		schedulerProc: schedulerProc,
+		t:             t,
+	}
+}
+
+func (lt lifecycleTest) Start() <-chan LaunchedTask {
+	assert := &EventAssertions{*assert.New(lt.t)}
+	lt.plugin.Run(lt.schedulerProc.Terminal())
+
+	// init scheduler
+	err := lt.scheduler.Init(
+		lt.schedulerProc.Master(),
+		lt.plugin,
+		http.DefaultServeMux,
+	)
+	assert.NoError(err)
+
+	lt.driver.On("Start").Return(mesos.Status_DRIVER_RUNNING, nil).Once()
+	started := lt.driver.Upon()
+
+	lt.driver.On("ReconcileTasks",
+		mock.AnythingOfType("[]*mesosproto.TaskStatus"),
+	).Return(mesos.Status_DRIVER_RUNNING, nil)
+
+	lt.driver.On("SendFrameworkMessage",
+		mock.AnythingOfType("*mesosproto.ExecutorID"),
+		mock.AnythingOfType("*mesosproto.SlaveID"),
+		mock.AnythingOfType("string"),
+	).Return(mesos.Status_DRIVER_RUNNING, nil)
+
+	launchedTasks := make(chan LaunchedTask, 1)
+	launchTasksFunc := func(args mock.Arguments) {
+		offerIDs := args.Get(0).([]*mesos.OfferID)
+		taskInfos := args.Get(1).([]*mesos.TaskInfo)
+		assert.Equal(1, len(offerIDs))
+		assert.Equal(1, len(taskInfos))
+
+		launchedTasks <- LaunchedTask{
+			offerId:  *offerIDs[0],
+			taskInfo: taskInfos[0],
+		}
+	}
+
+	lt.driver.On("LaunchTasks",
+		mock.AnythingOfType("[]*mesosproto.OfferID"),
+		mock.AnythingOfType("[]*mesosproto.TaskInfo"),
+		mock.AnythingOfType("*mesosproto.Filters"),
+	).Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksFunc)
+
+	lt.driver.On("DeclineOffer",
+		mock.AnythingOfType("*mesosproto.OfferID"),
+		mock.AnythingOfType("*mesosproto.Filters"),
+	).Return(mesos.Status_DRIVER_RUNNING, nil)
+
+	// elect master with mock driver
+	driverFactory := ha.DriverFactory(func() (bindings.SchedulerDriver, error) {
+		return lt.driver, nil
+	})
+	lt.schedulerProc.Elect(driverFactory)
+	elected := lt.schedulerProc.Elected()
+
+	// driver will be started
+	<-started
+
+	// tell scheduler to be registered
+	lt.scheduler.Registered(
+		lt.driver,
+		mesosutil.NewFrameworkID("kubernetes-id"),
+		mesosutil.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050),
+	)
+
+	// wait for being elected
+	<-elected
+	return launchedTasks
+}
+
+func (lt lifecycleTest) Close() {
+	lt.apiServer.server.Close()
+}
+
+func (lt lifecycleTest) End() <-chan struct{} {
+	return lt.schedulerProc.End()
+}
+
 // Test to create the scheduler plugin with an empty plugin config
 func TestPlugin_New(t *testing.T) {
 	assert := assert.New(t)
@@ -371,167 +594,89 @@ func TestPlugin_New(t *testing.T) {
 	assert.NotNil(p)
 }
 
-// Test to create the scheduler plugin with the config returned by the scheduler,
-// and play through the whole life cycle of the plugin while creating pods, deleting
+// TestPlugin_LifeCycle creates a scheduler plugin with the config returned by the scheduler,
+// and plays through the whole life cycle of the plugin while creating pods, deleting
 // and failing them.
 func TestPlugin_LifeCycle(t *testing.T) {
-	t.Skip("This test is flaky, see #11901")
 	assert := &EventAssertions{*assert.New(t)}
-
-	// create a fake pod watch. We use that below to submit new pods to the scheduler
-	podListWatch := NewMockPodsListWatch(api.PodList{})
-
-	// create fake apiserver
-	testApiServer := NewTestServer(t, api.NamespaceDefault, podListWatch)
-	defer testApiServer.server.Close()
-
-	// create executor with some data for static pods if set
-	executor := util.NewExecutorInfo(
-		util.NewExecutorID("executor-id"),
-		util.NewCommandInfo("executor-cmd"),
-	)
-	executor.Data = []byte{0, 1, 2}
-
-	// create scheduler
-	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
-	as := NewAllocationStrategy(
-		podtask.DefaultPredicate,
-		podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit))
-	testScheduler := New(Config{
-		Executor: executor,
-		Client:   client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}),
-		Scheduler: NewFCFSPodScheduler(as, func(node string) *api.Node {
-			obj, _, _ := nodeStore.GetByKey(node)
-			if obj == nil {
-				return nil
-			}
-			return obj.(*api.Node)
-		}),
-		Schedcfg: *schedcfg.CreateDefaultConfig(),
-	})
-
-	assert.NotNil(testScheduler.client, "client is nil")
-	assert.NotNil(testScheduler.executor, "executor is nil")
-	assert.NotNil(testScheduler.offers, "offer registry is nil")
-
-	// create scheduler process
-	schedulerProcess := ha.New(testScheduler)
-
-	// get plugin config from it
-	c := testScheduler.NewPluginConfig(schedulerProcess.Terminal(), http.DefaultServeMux, &podListWatch.ListWatch)
-	assert.NotNil(c)
-
-	// make events observable
-	eventObserver := NewEventObserver()
-	c.Recorder = eventObserver
-
-	// create plugin
-	p := NewPlugin(c).(*schedulingPlugin)
-	assert.NotNil(p)
+	lt := newLifecycleTest(t)
+	defer lt.Close()
 
 	// run plugin
-	p.Run(schedulerProcess.Terminal())
-	defer schedulerProcess.End()
-
-	// init scheduler
-	err := testScheduler.Init(schedulerProcess.Master(), p, http.DefaultServeMux)
-	assert.NoError(err)
-
-	// create mock mesos scheduler driver
-	mockDriver := &joinableDriver{}
-	mockDriver.On("Start").Return(mesos.Status_DRIVER_RUNNING, nil).Once()
-	started := mockDriver.Upon()
-
-	mAny := mock.AnythingOfType
-	mockDriver.On("ReconcileTasks", mAny("[]*mesosproto.TaskStatus")).Return(mesos.Status_DRIVER_RUNNING, nil)
-	mockDriver.On("SendFrameworkMessage", mAny("*mesosproto.ExecutorID"), mAny("*mesosproto.SlaveID"), mAny("string")).
-		Return(mesos.Status_DRIVER_RUNNING, nil)
-
-	type LaunchedTask struct {
-		offerId  mesos.OfferID
-		taskInfo *mesos.TaskInfo
-	}
-	launchedTasks := make(chan LaunchedTask, 1)
-	launchTasksCalledFunc := func(args mock.Arguments) {
-		offerIDs := args.Get(0).([]*mesos.OfferID)
-		taskInfos := args.Get(1).([]*mesos.TaskInfo)
-		assert.Equal(1, len(offerIDs))
-		assert.Equal(1, len(taskInfos))
-		launchedTasks <- LaunchedTask{
-			offerId:  *offerIDs[0],
-			taskInfo: taskInfos[0],
-		}
-	}
-	mockDriver.On("LaunchTasks", mAny("[]*mesosproto.OfferID"), mAny("[]*mesosproto.TaskInfo"), mAny("*mesosproto.Filters")).
-		Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksCalledFunc)
-	mockDriver.On("DeclineOffer", mAny("*mesosproto.OfferID"), mAny("*mesosproto.Filters")).
-		Return(mesos.Status_DRIVER_RUNNING, nil)
-
-	// elect master with mock driver
-	driverFactory := ha.DriverFactory(func() (bindings.SchedulerDriver, error) {
-		return mockDriver, nil
-	})
-	schedulerProcess.Elect(driverFactory)
-	elected := schedulerProcess.Elected()
-
-	// driver will be started
-	<-started
-
-	// tell scheduler to be registered
-	testScheduler.Registered(
-		mockDriver,
-		util.NewFrameworkID("kubernetes-id"),
-		util.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050),
-	)
-
-	// wait for being elected
-	<-elected
-
-	//TODO(jdef) refactor things above here into a test suite setup of some sort
+	launchedTasks := lt.Start()
+	defer lt.End()
 
 	// fake new, unscheduled pod
 	pod, i := NewTestPod()
-	podListWatch.Add(pod, true) // notify watchers
+	lt.podsListWatch.Add(pod, true) // notify watchers
 
 	// wait for failedScheduling event because there is no offer
-	assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
+	assert.EventWithReason(lt.eventObs, FailedScheduling, "failedScheduling event not received")
 
 	// add some matching offer
 	offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
-	testScheduler.ResourceOffers(nil, offers)
+	lt.scheduler.ResourceOffers(nil, offers)
+
+	// first offer is declined because node is not available yet
+	lt.apiServer.WaitForNode("some_hostname")
+
+	// add one more offer
+	lt.scheduler.ResourceOffers(nil, offers)
 
 	// and wait for scheduled pod
-	assert.EventWithReason(eventObserver, "scheduled")
+	assert.EventWithReason(lt.eventObs, Scheduled)
 	select {
 	case launchedTask := <-launchedTasks:
 		// report back that the task has been staged, and then started by mesos
-		testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
-		testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
+		lt.scheduler.StatusUpdate(
+			lt.driver,
+			newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING),
+		)
+
+		lt.scheduler.StatusUpdate(
+			lt.driver,
+			newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING),
+		)
 
 		// check that ExecutorInfo.data has the static pod data
 		assert.Len(launchedTask.taskInfo.Executor.Data, 3)
 
 		// report back that the task has been lost
-		mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0)
-		testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST))
+		lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0)
+
+		lt.scheduler.StatusUpdate(
+			lt.driver,
+			newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST),
+		)
 
 		// and wait that framework message is sent to executor
-		mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1)
+		lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1)
 
-	case <-time.After(5 * time.Second):
+	case <-time.After(util.ForeverTestTimeout):
 		t.Fatalf("timed out waiting for launchTasks call")
 	}
 
+	offeredNodes := make(map[string]struct{})
+
 	// Launch a pod and wait until the scheduler driver is called
 	schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
 		// wait for failedScheduling event because there is no offer
-		assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
+		assert.EventWithReason(lt.eventObs, FailedScheduling, "failedScheduling event not received")
 
 		// supply a matching offer
-		testScheduler.ResourceOffers(mockDriver, offers)
+		lt.scheduler.ResourceOffers(lt.driver, offers)
+		for _, offer := range offers {
+			if _, ok := offeredNodes[offer.GetHostname()]; !ok {
+				offeredNodes[offer.GetHostname()] = struct{}{}
+				lt.apiServer.WaitForNode(offer.GetHostname())

+				// reoffer since it must have been declined above
+				lt.scheduler.ResourceOffers(lt.driver, []*mesos.Offer{offer})
+			}
+		}
 
 		// and wait to get scheduled
-		assert.EventWithReason(eventObserver, "scheduled")
+		assert.EventWithReason(lt.eventObs, Scheduled)
 
 		// wait for driver.launchTasks call
 		select {
@@ -543,14 +688,15 @@ func TestPlugin_LifeCycle(t *testing.T) {
 			}
 			t.Fatalf("unknown offer used to start a pod")
 			return nil, nil, nil
-		case <-time.After(5 * time.Second):
+		case <-time.After(util.ForeverTestTimeout):
 			t.Fatal("timed out waiting for launchTasks")
 			return nil, nil, nil
 		}
 	}
+
 	// Launch a pod and wait until the scheduler driver is called
 	launchPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
-		podListWatch.Add(pod, true)
+		lt.podsListWatch.Add(pod, true)
 		return schedulePodWithOffers(pod, offers)
 	}
 
@@ -560,8 +706,15 @@ func TestPlugin_LifeCycle(t *testing.T) {
 		pod, launchedTask, offer := launchPodWithOffers(pod, offers)
 		if pod != nil {
 			// report back status
-			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
-			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
+			lt.scheduler.StatusUpdate(
+				lt.driver,
+				newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING),
+			)
+			lt.scheduler.StatusUpdate(
+				lt.driver,
+				newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING),
+			)
+
 			return pod, launchedTask, offer
 		}
 
@@ -577,23 +730,28 @@ func TestPlugin_LifeCycle(t *testing.T) {
 	// start another pod
 	pod, launchedTask, _ := startTestPod()
 
-	// mock drvier.KillTask, should be invoked when a pod is deleted
-	mockDriver.On("KillTask", mAny("*mesosproto.TaskID")).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) {
+	// mock driver.KillTask, should be invoked when a pod is deleted
+	lt.driver.On("KillTask",
+		mock.AnythingOfType("*mesosproto.TaskID"),
+	).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) {
 		killedTaskId := *(args.Get(0).(*mesos.TaskID))
 		assert.Equal(*launchedTask.taskInfo.TaskId, killedTaskId, "expected same TaskID as during launch")
 	})
-	killTaskCalled := mockDriver.Upon()
+	killTaskCalled := lt.driver.Upon()
 
 	// stop it again via the apiserver mock
-	podListWatch.Delete(pod, true) // notify watchers
+	lt.podsListWatch.Delete(pod, true) // notify watchers
 
 	// and wait for the driver killTask call with the correct TaskId
 	select {
 	case <-killTaskCalled:
 		// report back that the task is finished
-		testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED))
+		lt.scheduler.StatusUpdate(
+			lt.driver,
+			newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED),
+		)
 
-	case <-time.After(5 * time.Second):
+	case <-time.After(util.ForeverTestTimeout):
 		t.Fatal("timed out waiting for KillTask")
 	}
 
@@ -613,8 +771,8 @@ func TestPlugin_LifeCycle(t *testing.T) {
 	assert.Equal(offers[1].Id.GetValue(), usedOffer.Id.GetValue())
 	assert.Equal(pod.Spec.NodeName, *usedOffer.Hostname)
 
-	testScheduler.OfferRescinded(mockDriver, offers[0].Id)
-	testScheduler.OfferRescinded(mockDriver, offers[2].Id)
+	lt.scheduler.OfferRescinded(lt.driver, offers[0].Id)
+	lt.scheduler.OfferRescinded(lt.driver, offers[2].Id)
 
 	// start pods:
 	// - which are failing while binding,
@@ -622,15 +780,15 @@ func TestPlugin_LifeCycle(t *testing.T) {
 	// - with different states on the apiserver
 
 	failPodFromExecutor := func(task *mesos.TaskInfo) {
-		beforePodLookups := testApiServer.Stats(pod.Name)
+		beforePodLookups := lt.apiServer.Stats(pod.Name)
 		status := newTaskStatusForTask(task, mesos.TaskState_TASK_FAILED)
 		message := messages.CreateBindingFailure
 		status.Message = &message
-		testScheduler.StatusUpdate(mockDriver, status)
+		lt.scheduler.StatusUpdate(lt.driver, status)
 
 		// wait until pod is looked up at the apiserver
-		assertext.EventuallyTrue(t, time.Second, func() bool {
-			return testApiServer.Stats(pod.Name) == beforePodLookups+1
+		assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
+			return lt.apiServer.Stats(pod.Name) == beforePodLookups+1
 		}, "expect that reconcileTask will access apiserver for pod %v", pod.Name)
 	}
 
@@ -643,12 +801,12 @@ func TestPlugin_LifeCycle(t *testing.T) {
 	// 1. with pod deleted from the apiserver
 	//    expected: pod is removed from internal task registry
 	pod, launchedTask, _ = launchTestPod()
-	podListWatch.Delete(pod, false) // not notifying the watchers
+	lt.podsListWatch.Delete(pod, false) // not notifying the watchers
 	failPodFromExecutor(launchedTask.taskInfo)
 
 	podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
-	assertext.EventuallyTrue(t, time.Second, func() bool {
-		t, _ := p.api.tasks().ForPod(podKey)
+	assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
+		t, _ := lt.plugin.api.tasks().ForPod(podKey)
 		return t == nil
 	})
 
@@ -667,7 +825,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
 		meta.BindingHostKey: *usedOffer.Hostname,
 	}
 	pod.Spec.NodeName = *usedOffer.Hostname
-	podListWatch.Modify(pod, true) // notifying the watchers
+	lt.podsListWatch.Modify(pod, true) // notifying the watchers
 	time.Sleep(time.Second / 2)
 	failPodFromExecutor(launchedTask.taskInfo)
 }