[mesos] Handle TASK_ERROR

- The TASK_ERROR task status was introduced with Mesos 0.21 and is actually used since 0.22.
  It was not handled at all before this patch, leaving errored tasks in the registry in phase
  "Pending". This will lead to task status updates from the Mesos Master on reconciliation with empty
  slaveId fields, eventually leading to scheduler crashes.
- Handle terminal tasks with an empty slaveId.
  The slave id can be empty for TASK_ERROR.
  The modified code path does not use the slaveId.
This commit is contained in:
Dr. Stefan Schimanski 2015-08-18 10:31:50 +02:00 committed by Karl Isenberg
parent 9eb0970bdb
commit f9635d5778
3 changed files with 12 additions and 4 deletions

View File

@ -188,6 +188,8 @@ func (k *inMemoryRegistry) UpdateStatus(status *mesos.TaskStatus) (*T, StateType
k.handleTaskFinished(task, state, status)
case mesos.TaskState_TASK_FAILED:
k.handleTaskFailed(task, state, status)
case mesos.TaskState_TASK_ERROR:
k.handleTaskError(task, state, status)
case mesos.TaskState_TASK_KILLED:
k.handleTaskKilled(task, state, status)
case mesos.TaskState_TASK_LOST:
@ -300,10 +302,15 @@ func (k *inMemoryRegistry) recordFinishedTask(taskId string) *ring.Ring {
// handleTaskFailed reacts to a TASK_FAILED status update for a task that is
// in the given registry state. For a pending or running task it deletes the
// task from taskRegistry (keyed by task.ID) and from podToTask (keyed by
// task.podKey). There is no default case, so any other state is a no-op.
// The status argument is not used in this body.
//
// NOTE(review): this listing appears to interleave the removed and added
// lines of a diff hunk; the bare `case StatePending:` and `case StateRunning:`
// lines look like pre-patch leftovers. Confirm against the actual file.
func (k *inMemoryRegistry) handleTaskFailed(task *T, state StateType, status *mesos.TaskStatus) {
switch state {
case StatePending:
case StatePending, StateRunning:
delete(k.taskRegistry, task.ID)
delete(k.podToTask, task.podKey)
case StateRunning:
}
}
func (k *inMemoryRegistry) handleTaskError(task *T, state StateType, status *mesos.TaskStatus) {
switch state {
case StatePending, StateRunning:
delete(k.taskRegistry, task.ID)
delete(k.podToTask, task.podKey)
}

View File

@ -297,6 +297,7 @@ func TestInMemoryRegistry_TaskLifeCycle(t *testing.T) {
func TestInMemoryRegistry_NotFinished(t *testing.T) {
// all these behave the same
notFinishedStates := []mesos.TaskState{
mesos.TaskState_TASK_ERROR,
mesos.TaskState_TASK_FAILED,
mesos.TaskState_TASK_KILLED,
mesos.TaskState_TASK_LOST,

View File

@ -429,7 +429,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
log.Errorf("Ignore status %+v because the slave does not exist", taskStatus)
return
}
case mesos.TaskState_TASK_FAILED:
case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR:
if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil {
if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
go k.plugin.reconcileTask(task)
@ -449,7 +449,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
func (k *KubernetesScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
task, state := k.taskRegistry.UpdateStatus(taskStatus)
if (state == podtask.StateRunning || state == podtask.StatePending) && taskStatus.SlaveId != nil &&
if (state == podtask.StateRunning || state == podtask.StatePending) &&
((taskStatus.GetSource() == mesos.TaskStatus_SOURCE_MASTER && taskStatus.GetReason() == mesos.TaskStatus_REASON_RECONCILIATION) ||
(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED) ||
(taskStatus.GetSource() == mesos.TaskStatus_SOURCE_SLAVE && taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED)) {