From f9635d57788a198cd78362e7210c3521aabeb649 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 18 Aug 2015 10:31:50 +0200 Subject: [PATCH] [mesos] Handle TASK_ERROR - The TASK_ERROR task status was introduced with Mesos 0.21 and is actually used since 0.22. It was not handled at all before this patch, leaving errored task in the registry in phase "Pending". This will lead to task status updates from the Mesos Master on reconciliation with empty slaveId fields, leading to scheduler crashes eventually. - Handle terminal task with empty slaveId. The slave id can be empty for TASK_ERROR. The modified code path does not use the slaveId. --- contrib/mesos/pkg/scheduler/podtask/registry.go | 11 +++++++++-- contrib/mesos/pkg/scheduler/podtask/registry_test.go | 1 + contrib/mesos/pkg/scheduler/scheduler.go | 4 ++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/podtask/registry.go b/contrib/mesos/pkg/scheduler/podtask/registry.go index 7de33959030..dedd359eaa5 100644 --- a/contrib/mesos/pkg/scheduler/podtask/registry.go +++ b/contrib/mesos/pkg/scheduler/podtask/registry.go @@ -188,6 +188,8 @@ func (k *inMemoryRegistry) UpdateStatus(status *mesos.TaskStatus) (*T, StateType k.handleTaskFinished(task, state, status) case mesos.TaskState_TASK_FAILED: k.handleTaskFailed(task, state, status) + case mesos.TaskState_TASK_ERROR: + k.handleTaskError(task, state, status) case mesos.TaskState_TASK_KILLED: k.handleTaskKilled(task, state, status) case mesos.TaskState_TASK_LOST: @@ -300,10 +302,15 @@ func (k *inMemoryRegistry) recordFinishedTask(taskId string) *ring.Ring { func (k *inMemoryRegistry) handleTaskFailed(task *T, state StateType, status *mesos.TaskStatus) { switch state { - case StatePending: + case StatePending, StateRunning: delete(k.taskRegistry, task.ID) delete(k.podToTask, task.podKey) - case StateRunning: + } +} + +func (k *inMemoryRegistry) handleTaskError(task *T, state StateType, status *mesos.TaskStatus) { + switch state { + case StatePending, StateRunning: delete(k.taskRegistry, task.ID) delete(k.podToTask, task.podKey) } diff --git a/contrib/mesos/pkg/scheduler/podtask/registry_test.go b/contrib/mesos/pkg/scheduler/podtask/registry_test.go index bdc2a0be10a..c3a2ef6082b 100644 --- a/contrib/mesos/pkg/scheduler/podtask/registry_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/registry_test.go @@ -297,6 +297,7 @@ func TestInMemoryRegistry_TaskLifeCycle(t *testing.T) { func TestInMemoryRegistry_NotFinished(t *testing.T) { // all these behave the same notFinishedStates := []mesos.TaskState{ + mesos.TaskState_TASK_ERROR, mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_KILLED, mesos.TaskState_TASK_LOST, diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 85e5f03804d..58e24e144bb 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -429,7 +429,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task log.Errorf("Ignore status %+v because the slave does not exist", taskStatus) return } - case mesos.TaskState_TASK_FAILED: + case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR: if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil { if task.Has(podtask.Launched) && !task.Has(podtask.Bound) { go k.plugin.reconcileTask(task) @@ -449,7 +449,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task func (k *KubernetesScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { task, state := k.taskRegistry.UpdateStatus(taskStatus) - if (state == podtask.StateRunning || state == podtask.StatePending) && taskStatus.SlaveId != nil && + if (state == podtask.StateRunning || state == podtask.StatePending) && ((taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER && taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION) || (taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED) || (taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED)) {