diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go
index d38b9936bec..2678665616c 100644
--- a/contrib/mesos/pkg/executor/executor_test.go
+++ b/contrib/mesos/pkg/executor/executor_test.go
@@ -552,16 +552,38 @@ func TestExecutorStaticPods(t *testing.T) {
 // its state. When a Kamikaze message is received, the executor should
 // attempt suicide.
 func TestExecutorFrameworkMessage(t *testing.T) {
-	t.SkipNow() // TODO(jdef) see comment below re: TASK_FAILED
+	// create fake apiserver
+	podListWatch := NewMockPodsListWatch(api.PodList{})
+	testApiServer := NewTestServer(t, api.NamespaceDefault, &podListWatch.list)
+	defer testApiServer.server.Close()
+
+	// create and start executor
 	mockDriver := &MockExecutorDriver{}
 	kubeletFinished := make(chan struct{})
 	config := Config{
 		Docker:    dockertools.ConnectToDockerOrDie("fake://"),
 		Updates:   make(chan interface{}, 1024),
 		APIClient: client.NewOrDie(&client.Config{
-			Host:    "fakehost",
+			Host:    testApiServer.server.URL,
 			Version: testapi.Version(),
 		}),
+		Kubelet: &fakeKubelet{
+			Kubelet: &kubelet.Kubelet{},
+			hostIP:  net.IPv4(127, 0, 0, 1),
+		},
+		PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
+			return &api.PodStatus{
+				ContainerStatuses: []api.ContainerStatus{
+					{
+						Name: "foo",
+						State: api.ContainerState{
+							Running: &api.ContainerStateRunning{},
+						},
+					},
+				},
+				Phase: api.PodRunning,
+			}, nil
+		},
 		ShutdownAlert: func() {
 			close(kubeletFinished)
 		},
@@ -583,10 +605,30 @@ func TestExecutorFrameworkMessage(t *testing.T) {
 	data, _ := testapi.Codec().Encode(pod)
 	taskInfo.Data = data
 
+	mockDriver.On(
+		"SendStatusUpdate",
+		mesosproto.TaskState_TASK_STARTING,
+	).Return(mesosproto.Status_DRIVER_RUNNING, nil).Once()
+
+	called := make(chan struct{})
+	mockDriver.On(
+		"SendStatusUpdate",
+		mesosproto.TaskState_TASK_RUNNING,
+	).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(func(_ mock.Arguments) { close(called) }).Once()
+
 	executor.LaunchTask(mockDriver, taskInfo)
 
+	// waiting until the pod is really running b/c otherwise a TASK_FAILED could be
+	// triggered by the asynchronously running _launchTask, __launchTask methods
+	// when removing the task from k.tasks through the "task-lost:foo" message below.
+	select {
+	case <-called:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("timed out waiting for SendStatusUpdate for the running task")
+	}
+
 	// send task-lost message for it
-	called := make(chan struct{})
+	called = make(chan struct{})
 	mockDriver.On(
 		"SendStatusUpdate",
 		mesosproto.TaskState_TASK_LOST,
@@ -599,7 +641,6 @@ func TestExecutorFrameworkMessage(t *testing.T) {
 		return len(executor.tasks) == 0 && len(executor.pods) == 0
 	}, "executor must be able to kill a created task and pod")
 
-	//TODO(jdef) still sometimes seeing TASK_FAILED here instead of TASK_LOST
 	select {
 	case <-called:
 	case <-time.After(5 * time.Second):
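
For context, the core of this change is a testify/mock synchronization pattern: register an expectation whose Run callback closes a channel, then select on that channel with a timeout so the test only proceeds once the asynchronous launch path has actually reported TASK_RUNNING. The following is a minimal, self-contained sketch of that pattern under stated assumptions; the StatusSender and MockSender names are hypothetical and not part of the executor package, and only the testify/mock usage mirrors the diff above.

	package example

	import (
		"testing"
		"time"

		"github.com/stretchr/testify/mock"
	)

	// StatusSender is a hypothetical stand-in for the driver interface being mocked.
	type StatusSender interface {
		SendStatusUpdate(state string) error
	}

	// MockSender records expectations via testify/mock, analogous to MockExecutorDriver.
	type MockSender struct {
		mock.Mock
	}

	func (m *MockSender) SendStatusUpdate(state string) error {
		args := m.Called(state)
		return args.Error(0)
	}

	func TestWaitForRunningUpdate(t *testing.T) {
		sender := &MockSender{}

		// Close the channel when the "running" update is actually sent, so the
		// test can wait for the asynchronous work to reach that point.
		running := make(chan struct{})
		sender.On("SendStatusUpdate", "TASK_RUNNING").
			Return(nil).
			Run(func(_ mock.Arguments) { close(running) }).
			Once()

		// Simulate the asynchronous launch path that eventually reports TASK_RUNNING.
		go func() {
			time.Sleep(10 * time.Millisecond)
			sender.SendStatusUpdate("TASK_RUNNING")
		}()

		// Block until the update arrives, or fail after a timeout, like the
		// select in TestExecutorFrameworkMessage.
		select {
		case <-running:
		case <-time.After(5 * time.Second):
			t.Fatalf("timed out waiting for SendStatusUpdate")
		}

		sender.AssertExpectations(t)
	}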