Fix flaky mesos executor test

The TestExecutorFrameworkMessage test sends a "task-lost:foo" message to the
executor in order to mark a pod as lost. For that the pod must be running first.
Otherwise, the executor code will send "TASK_FAILED" status updates, not "TASK_LOST".

Before this patch there was no synchronization between the pod startup and the
test case. Moreover, in order to startup a task a working apiserver URL must be
passed to the executor which was not the case either.

Fixes mesosphere/kubernetes-mesos#351
This commit is contained in:
Dr. Stefan Schimanski 2015-06-15 09:33:07 +02:00
parent a8269e38c9
commit 7abe12d6f4

View File

@ -552,16 +552,38 @@ func TestExecutorStaticPods(t *testing.T) {
// its state. When a Kamikaze message is received, the executor should // its state. When a Kamikaze message is received, the executor should
// attempt suicide. // attempt suicide.
func TestExecutorFrameworkMessage(t *testing.T) { func TestExecutorFrameworkMessage(t *testing.T) {
t.SkipNow() // TODO(jdef) see comment below re: TASK_FAILED // create fake apiserver
podListWatch := NewMockPodsListWatch(api.PodList{})
testApiServer := NewTestServer(t, api.NamespaceDefault, &podListWatch.list)
defer testApiServer.server.Close()
// create and start executor
mockDriver := &MockExecutorDriver{} mockDriver := &MockExecutorDriver{}
kubeletFinished := make(chan struct{}) kubeletFinished := make(chan struct{})
config := Config{ config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"), Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1024), Updates: make(chan interface{}, 1024),
APIClient: client.NewOrDie(&client.Config{ APIClient: client.NewOrDie(&client.Config{
Host: "fakehost", Host: testApiServer.server.URL,
Version: testapi.Version(), Version: testapi.Version(),
}), }),
Kubelet: &fakeKubelet{
Kubelet: &kubelet.Kubelet{},
hostIP: net.IPv4(127, 0, 0, 1),
},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "foo",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
},
},
Phase: api.PodRunning,
}, nil
},
ShutdownAlert: func() { ShutdownAlert: func() {
close(kubeletFinished) close(kubeletFinished)
}, },
@ -583,10 +605,30 @@ func TestExecutorFrameworkMessage(t *testing.T) {
data, _ := testapi.Codec().Encode(pod) data, _ := testapi.Codec().Encode(pod)
taskInfo.Data = data taskInfo.Data = data
mockDriver.On(
"SendStatusUpdate",
mesosproto.TaskState_TASK_STARTING,
).Return(mesosproto.Status_DRIVER_RUNNING, nil).Once()
called := make(chan struct{})
mockDriver.On(
"SendStatusUpdate",
mesosproto.TaskState_TASK_RUNNING,
).Return(mesosproto.Status_DRIVER_RUNNING, nil).Run(func(_ mock.Arguments) { close(called) }).Once()
executor.LaunchTask(mockDriver, taskInfo) executor.LaunchTask(mockDriver, taskInfo)
// waiting until the pod is really running b/c otherwise a TASK_FAILED could be
// triggered by the asynchronously running _launchTask, __launchTask methods
// when removing the task from k.tasks through the "task-lost:foo" message below.
select {
case <-called:
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for SendStatusUpdate for the running task")
}
// send task-lost message for it // send task-lost message for it
called := make(chan struct{}) called = make(chan struct{})
mockDriver.On( mockDriver.On(
"SendStatusUpdate", "SendStatusUpdate",
mesosproto.TaskState_TASK_LOST, mesosproto.TaskState_TASK_LOST,
@ -599,7 +641,6 @@ func TestExecutorFrameworkMessage(t *testing.T) {
return len(executor.tasks) == 0 && len(executor.pods) == 0 return len(executor.tasks) == 0 && len(executor.pods) == 0
}, "executor must be able to kill a created task and pod") }, "executor must be able to kill a created task and pod")
//TODO(jdef) still sometimes seeing TASK_FAILED here instead of TASK_LOST
select { select {
case <-called: case <-called:
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):