Merge pull request #12849 from mesosphere/task-error

WIP/MESOS: Handle Mesos TASK_ERROR status
Marek Grabowski 2015-08-25 09:19:26 +02:00
commit 4d10def14f
3 changed files with 32 additions and 6 deletions


@@ -188,6 +188,8 @@ func (k *inMemoryRegistry) UpdateStatus(status *mesos.TaskStatus) (*T, StateType
 		k.handleTaskFinished(task, state, status)
 	case mesos.TaskState_TASK_FAILED:
 		k.handleTaskFailed(task, state, status)
+	case mesos.TaskState_TASK_ERROR:
+		k.handleTaskError(task, state, status)
 	case mesos.TaskState_TASK_KILLED:
 		k.handleTaskKilled(task, state, status)
 	case mesos.TaskState_TASK_LOST:
@@ -300,10 +302,15 @@ func (k *inMemoryRegistry) recordFinishedTask(taskId string) *ring.Ring {
 func (k *inMemoryRegistry) handleTaskFailed(task *T, state StateType, status *mesos.TaskStatus) {
 	switch state {
-	case StatePending:
+	case StatePending, StateRunning:
 		delete(k.taskRegistry, task.ID)
 		delete(k.podToTask, task.podKey)
-	case StateRunning:
 	}
 }
+func (k *inMemoryRegistry) handleTaskError(task *T, state StateType, status *mesos.TaskStatus) {
+	switch state {
+	case StatePending, StateRunning:
+		delete(k.taskRegistry, task.ID)
+		delete(k.podToTask, task.podKey)
+	}

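With this change a TASK_ERROR update is treated the same way as TASK_FAILED: UpdateStatus dispatches it to handleTaskError, which drops a pending or running task from both taskRegistry and podToTask. A minimal sketch of the kind of status object that now takes this path (import paths and the example values are assumptions, not taken from this diff):

package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	mesos "github.com/mesos/mesos-go/mesosproto"
)

func main() {
	// Sketch only: a terminal TASK_ERROR status as the registry would receive it.
	status := &mesos.TaskStatus{
		TaskId:  &mesos.TaskID{Value: proto.String("pod.deadbeef")}, // hypothetical task id
		State:   mesos.TaskState_TASK_ERROR.Enum(),
		Message: proto.String("task validation failed"), // hypothetical message from Mesos
	}
	// Feeding this to inMemoryRegistry.UpdateStatus now hits the new
	// TaskState_TASK_ERROR case instead of falling through unhandled.
	fmt.Println(status.GetState(), status.GetMessage())
}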

@@ -297,6 +297,7 @@ func TestInMemoryRegistry_TaskLifeCycle(t *testing.T) {
 func TestInMemoryRegistry_NotFinished(t *testing.T) {
 	// all these behave the same
 	notFinishedStates := []mesos.TaskState{
+		mesos.TaskState_TASK_ERROR,
 		mesos.TaskState_TASK_FAILED,
 		mesos.TaskState_TASK_KILLED,
 		mesos.TaskState_TASK_LOST,

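The "// all these behave the same" comment suggests the body of TestInMemoryRegistry_NotFinished applies one shared assertion to every state in the slice, and TASK_ERROR now joins that set. A rough table-driven shape for such a check (the setup, status-builder, and assertion helpers below are hypothetical stand-ins, not the test's real helpers):

for _, state := range notFinishedStates {
	registry, task := setupRunningTask(t)         // hypothetical: registry with one running task
	registry.UpdateStatus(statusFor(task, state)) // hypothetical *mesos.TaskStatus builder

	assertTaskForgotten(t, registry, task, state) // hypothetical: task gone from both maps
}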

@@ -400,14 +400,21 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
 	taskState := taskStatus.GetState()
 	metrics.StatusUpdates.WithLabelValues(source, reason, taskState.String()).Inc()
+	message := "none"
+	if taskStatus.Message != nil {
+		message = *taskStatus.Message
+	}
 	log.Infof(
-		"task status update %q from %q for task %q on slave %q executor %q for reason %q",
+		"task status update %q from %q for task %q on slave %q executor %q for reason %q with message %q",
 		taskState.String(),
 		source,
 		taskStatus.TaskId.GetValue(),
 		taskStatus.SlaveId.GetValue(),
 		taskStatus.ExecutorId.GetValue(),
-		reason)
+		reason,
+		message,
+	)
 	switch taskState {
 	case mesos.TaskState_TASK_RUNNING, mesos.TaskState_TASK_FINISHED, mesos.TaskState_TASK_STARTING, mesos.TaskState_TASK_STAGING:
@@ -429,7 +436,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
 			log.Errorf("Ignore status %+v because the slave does not exist", taskStatus)
 			return
 		}
-	case mesos.TaskState_TASK_FAILED:
+	case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR:
 		if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil {
 			if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
 				go k.plugin.reconcileTask(task)
@@ -443,13 +450,24 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
 		fallthrough
 	case mesos.TaskState_TASK_LOST, mesos.TaskState_TASK_KILLED:
 		k.reconcileTerminalTask(driver, taskStatus)
+	default:
+		log.Errorf(
+			"unknown task status %q from %q for task %q on slave %q executor %q for reason %q with message %q",
+			taskState.String(),
+			source,
+			taskStatus.TaskId.GetValue(),
+			taskStatus.SlaveId.GetValue(),
+			taskStatus.ExecutorId.GetValue(),
+			reason,
+			message,
+		)
 	}
 }
 func (k *KubernetesScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
 	task, state := k.taskRegistry.UpdateStatus(taskStatus)
-	if (state == podtask.StateRunning || state == podtask.StatePending) && taskStatus.SlaveId != nil &&
+	if (state == podtask.StateRunning || state == podtask.StatePending) &&
 		((taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER && taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION) ||
 		(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED) ||
 		(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED)) {
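Two notes on the scheduler side. The message extraction guards the optional protobuf field by hand so that a missing message logs as "none"; the generated getter performs the same nil check but yields an empty string, so a roughly equivalent form (a sketch assuming the standard protobuf-generated GetMessage accessor on *mesos.TaskStatus; it differs only when Mesos sends an explicitly empty message) would be:

message := "none"
if m := taskStatus.GetMessage(); m != "" {
	message = m
}

The new default branch addresses the same gap as the rest of the commit: before this change, a state the switch did not list, such as TASK_ERROR, fell out of it silently, whereas now any unrecognized state is logged at error level with the same fields as the regular status line.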