diff --git a/contrib/mesos/pkg/scheduler/podtask/registry.go b/contrib/mesos/pkg/scheduler/podtask/registry.go index 7de33959030..dedd359eaa5 100644 --- a/contrib/mesos/pkg/scheduler/podtask/registry.go +++ b/contrib/mesos/pkg/scheduler/podtask/registry.go @@ -188,6 +188,8 @@ func (k *inMemoryRegistry) UpdateStatus(status *mesos.TaskStatus) (*T, StateType k.handleTaskFinished(task, state, status) case mesos.TaskState_TASK_FAILED: k.handleTaskFailed(task, state, status) + case mesos.TaskState_TASK_ERROR: + k.handleTaskError(task, state, status) case mesos.TaskState_TASK_KILLED: k.handleTaskKilled(task, state, status) case mesos.TaskState_TASK_LOST: @@ -300,10 +302,15 @@ func (k *inMemoryRegistry) recordFinishedTask(taskId string) *ring.Ring { func (k *inMemoryRegistry) handleTaskFailed(task *T, state StateType, status *mesos.TaskStatus) { switch state { - case StatePending: + case StatePending, StateRunning: delete(k.taskRegistry, task.ID) delete(k.podToTask, task.podKey) - case StateRunning: + } +} + +func (k *inMemoryRegistry) handleTaskError(task *T, state StateType, status *mesos.TaskStatus) { + switch state { + case StatePending, StateRunning: delete(k.taskRegistry, task.ID) delete(k.podToTask, task.podKey) } diff --git a/contrib/mesos/pkg/scheduler/podtask/registry_test.go b/contrib/mesos/pkg/scheduler/podtask/registry_test.go index bdc2a0be10a..c3a2ef6082b 100644 --- a/contrib/mesos/pkg/scheduler/podtask/registry_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/registry_test.go @@ -297,6 +297,7 @@ func TestInMemoryRegistry_TaskLifeCycle(t *testing.T) { func TestInMemoryRegistry_NotFinished(t *testing.T) { // all these behave the same notFinishedStates := []mesos.TaskState{ + mesos.TaskState_TASK_ERROR, mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_KILLED, mesos.TaskState_TASK_LOST, diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 85e5f03804d..d15e19bf0fd 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -400,14 +400,21 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task taskState := taskStatus.GetState() metrics.StatusUpdates.WithLabelValues(source, reason, taskState.String()).Inc() + message := "none" + if taskStatus.Message != nil { + message = *taskStatus.Message + } + log.Infof( - "task status update %q from %q for task %q on slave %q executor %q for reason %q", + "task status update %q from %q for task %q on slave %q executor %q for reason %q with message %q", taskState.String(), source, taskStatus.TaskId.GetValue(), taskStatus.SlaveId.GetValue(), taskStatus.ExecutorId.GetValue(), - reason) + reason, + message, + ) switch taskState { case mesos.TaskState_TASK_RUNNING, mesos.TaskState_TASK_FINISHED, mesos.TaskState_TASK_STARTING, mesos.TaskState_TASK_STAGING: @@ -429,7 +436,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task log.Errorf("Ignore status %+v because the slave does not exist", taskStatus) return } - case mesos.TaskState_TASK_FAILED: + case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR: if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil { if task.Has(podtask.Launched) && !task.Has(podtask.Bound) { go k.plugin.reconcileTask(task) @@ -443,13 +450,24 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task fallthrough case mesos.TaskState_TASK_LOST, mesos.TaskState_TASK_KILLED: k.reconcileTerminalTask(driver, taskStatus) + default: + log.Errorf( + "unknown task status %q from %q for task %q on slave %q executor %q for reason %q with message %q", + taskState.String(), + source, + taskStatus.TaskId.GetValue(), + taskStatus.SlaveId.GetValue(), + taskStatus.ExecutorId.GetValue(), + reason, + message, + ) } } func (k *KubernetesScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { task, state := k.taskRegistry.UpdateStatus(taskStatus) - if (state == podtask.StateRunning || state == podtask.StatePending) && taskStatus.SlaveId != nil && + if (state == podtask.StateRunning || state == podtask.StatePending) && ((taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER && taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION) || (taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED) || (taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED)) {