Merge pull request #14489 from mesosphere/jdef-fix505-zombies-after-sigterm

MESOS: try not to leave dangling zombie procs when minion controller exits
This commit is contained in:
Brian Grant 2015-09-25 13:29:55 -07:00
commit e122b4199a
2 changed files with 145 additions and 40 deletions

View File

@ -29,7 +29,18 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
) )
const defaultTaskRestartDelay = 5 * time.Second const (
defaultTaskRestartDelay = 5 * time.Second
// TODO(jdef) there's no easy way for us to discover the grace period that we actually
// have, from mesos: it's simply a missing core feature. there's a MESOS-xyz ticket for
// this somewhere. if it was discoverable then we could come up with a better strategy.
// there are some comments in the executor regarding this as well (because there we're
// concerned about cleaning up pods within the grace period). we could pick a some
// higher (arbitrary) value but without knowing when the slave will forcibly kill us
// it seems a somewhat futile exercise.
defaultKillGracePeriod = 5 * time.Second
)
// Completion represents the termination of a Task process. Each process execution should // Completion represents the termination of a Task process. Each process execution should
// yield (barring drops because of an abort signal) exactly one Completion. // yield (barring drops because of an abort signal) exactly one Completion.
@ -45,7 +56,7 @@ type systemProcess interface {
Wait() error Wait() error
// Kill returns the pid of the process that was killed // Kill returns the pid of the process that was killed
Kill() (int, error) Kill(force bool) (int, error)
} }
type cmdProcess struct { type cmdProcess struct {
@ -56,7 +67,7 @@ func (cp *cmdProcess) Wait() error {
return cp.delegate.Wait() return cp.delegate.Wait()
} }
func (cp *cmdProcess) Kill() (int, error) { func (cp *cmdProcess) Kill(force bool) (int, error) {
// kill the entire process group, not just the one process // kill the entire process group, not just the one process
pid := cp.delegate.Process.Pid pid := cp.delegate.Process.Pid
processGroup := 0 - pid processGroup := 0 - pid
@ -64,7 +75,11 @@ func (cp *cmdProcess) Kill() (int, error) {
// we send a SIGTERM here for a graceful stop. users of this package should // we send a SIGTERM here for a graceful stop. users of this package should
// wait for tasks to complete normally. as a fallback/safeguard, child procs // wait for tasks to complete normally. as a fallback/safeguard, child procs
// are spawned in notStartedTask to receive a SIGKILL when this process dies. // are spawned in notStartedTask to receive a SIGKILL when this process dies.
rc := syscall.Kill(processGroup, syscall.SIGTERM) sig := syscall.SIGTERM
if force {
sig = syscall.SIGKILL
}
rc := syscall.Kill(processGroup, sig)
return pid, rc return pid, rc
} }
@ -86,12 +101,13 @@ type Task struct {
done chan struct{} // done closes when all processes related to the task have terminated done chan struct{} // done closes when all processes related to the task have terminated
initialState taskStateFn // prepare and start a new live process, defaults to notStartedTask; should be set by run() initialState taskStateFn // prepare and start a new live process, defaults to notStartedTask; should be set by run()
runLatch int32 // guard against multiple Task.run calls runLatch int32 // guard against multiple Task.run calls
killFunc func(bool) (int, error)
} }
// New builds a newly initialized task object but does not start any processes for it. callers // New builds a newly initialized task object but does not start any processes for it. callers
// are expected to invoke task.run(...) on their own. // are expected to invoke task.run(...) on their own.
func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task { func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task {
return &Task{ t := &Task{
name: name, name: name,
bin: bin, bin: bin,
args: args, args: args,
@ -103,6 +119,8 @@ func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task {
RestartDelay: defaultTaskRestartDelay, RestartDelay: defaultTaskRestartDelay,
Finished: func(restarting bool) bool { return restarting }, Finished: func(restarting bool) bool { return restarting },
} }
t.killFunc = func(force bool) (int, error) { return t.cmd.Kill(force) }
return t
} }
// Start spawns a goroutine to execute the Task. Panics if invoked more than once. // Start spawns a goroutine to execute the Task. Panics if invoked more than once.
@ -241,63 +259,79 @@ type exitError interface {
} }
func taskRunning(t *Task) taskStateFn { func taskRunning(t *Task) taskStateFn {
waiter := t.cmd.Wait
var sendOnce sync.Once
trySend := func(wr *Completion) {
// guarded with once because we're only allowed to send a single "result" for each
// process termination. for example, if Kill() results in an error because Wait()
// already completed we only want to return a single result for the process.
sendOnce.Do(func() {
t.tryComplete(wr)
})
}
// listen for normal process completion in a goroutine; don't block because we need to listen for shouldQuit // listen for normal process completion in a goroutine; don't block because we need to listen for shouldQuit
waitCh := make(chan *Completion, 1) waitCh := make(chan *Completion, 1)
go func() { go func() {
wr := &Completion{} wr := &Completion{name: t.name}
defer func() { defer func() {
waitCh <- wr waitCh <- wr
close(waitCh) close(waitCh)
}() }()
if err := waiter(); err != nil { if err := t.cmd.Wait(); err != nil {
if exitError, ok := err.(exitError); ok { if exitError, ok := err.(exitError); ok {
if waitStatus, ok := exitError.Sys().(syscall.WaitStatus); ok { if waitStatus, ok := exitError.Sys().(syscall.WaitStatus); ok {
wr.name = t.name
wr.code = waitStatus.ExitStatus() wr.code = waitStatus.ExitStatus()
return return
} }
} }
wr.err = fmt.Errorf("task wait ended strangely for %q: %v", t.bin, err) wr.err = fmt.Errorf("task wait ended strangely for %q: %v", t.bin, err)
} else {
wr.name = t.name
} }
}() }()
// wait for the process to complete, or else for a "quit" signal on the task (at which point we'll attempt to kill manually)
select { select {
case <-t.shouldQuit: case <-t.shouldQuit:
// check for tie t.tryComplete(t.awaitDeath(defaultKillGracePeriod, waitCh))
select {
case wr := <-waitCh: case wr := <-waitCh:
// we got a signal to quit, but we're already finished; attempt best effort delvery t.tryComplete(wr)
trySend(wr)
default:
// Wait() has not exited yet, kill the process
log.Infof("killing %s : %s", t.name, t.bin)
pid, err := t.cmd.Kill()
if err != nil {
trySend(&Completion{err: fmt.Errorf("failed to kill process: %q pid %d: %v", t.bin, pid, err)})
}
// else, Wait() should complete and send a completion event
}
case wr := <-waitCh:
// task has completed before we were told to quit, pass along completion and error information
trySend(wr)
} }
return taskShouldRestart return taskShouldRestart
} }
// awaitDeath waits for the process to complete, or else for a "quit" signal on the task-
// at which point we'll attempt to kill manually.
func (t *Task) awaitDeath(gracePeriod time.Duration, waitCh <-chan *Completion) *Completion {
select {
case wr := <-waitCh:
// got a signal to quit, but we're already finished
return wr
default:
}
forceKill := false
wr := &Completion{name: t.name, err: fmt.Errorf("failed to kill process: %q", t.bin)}
// the loop is here in case we receive a shouldQuit signal; we need to kill the task.
// in this case, first send a SIGTERM (force=false) to the task and then wait for it
// to die (within the gracePeriod). if it doesn't die, then we loop around only this
// time we'll send a SIGKILL (force=true) and wait for a reduced gracePeriod. There
// does exist a slim chance that the underlying wait4() syscall won't complete before
// this process dies, in which case a zombie will rise. Starting the mesos slave with
// pid namespace isolation should mitigate this.
waitLoop:
for i := 0; i < 2; i++ {
log.Infof("killing %s (force=%t) : %s", t.name, forceKill, t.bin)
pid, err := t.killFunc(forceKill)
if err != nil {
log.Warningf("failed to kill process: %q pid %d: %v", t.bin, pid, err)
break waitLoop
}
// Wait for the kill to be processed, and child proc resources cleaned up; try to avoid zombies!
select {
case wr = <-waitCh:
break waitLoop
case <-time.After(gracePeriod):
// want a timeout, but a shorter one than we used initially.
// using /= 2 is deterministic and yields the desirable effect.
gracePeriod /= 2
forceKill = true
continue waitLoop
}
}
return wr
}
// forwardUntil forwards task process completion status and errors to the given output // forwardUntil forwards task process completion status and errors to the given output
// chans until either the task terminates or abort is closed. // chans until either the task terminates or abort is closed.
func (t *Task) forwardUntil(tch chan<- *Completion, abort <-chan struct{}) { func (t *Task) forwardUntil(tch chan<- *Completion, abort <-chan struct{}) {

View File

@ -26,6 +26,7 @@ import (
"testing" "testing"
log "github.com/golang/glog" log "github.com/golang/glog"
"github.com/stretchr/testify/assert"
) )
type badWriteCloser struct { type badWriteCloser struct {
@ -57,7 +58,7 @@ func (f *fakeProcess) Wait() error {
<-f.done <-f.done
return f.err return f.err
} }
func (f *fakeProcess) Kill() (int, error) { func (f *fakeProcess) Kill(_ bool) (int, error) {
close(f.done) close(f.done)
return f.pid, f.err return f.pid, f.err
} }
@ -78,7 +79,7 @@ func TestBadLogger(t *testing.T) {
tt := New("foo", "bar", nil, nil, func() io.WriteCloser { tt := New("foo", "bar", nil, nil, func() io.WriteCloser {
defer func() { defer func() {
fp.pid = 123 // sanity check fp.pid = 123 // sanity check
fp.Kill() // this causes Wait() to return fp.Kill(false) // this causes Wait() to return
}() }()
return &badWriteCloser{err} return &badWriteCloser{err}
}) })
@ -220,3 +221,73 @@ func TestMergeOutput(t *testing.T) {
log.Infoln("waiting for merge to complete") log.Infoln("waiting for merge to complete")
<-te.Done() // wait for the merge to complete <-te.Done() // wait for the merge to complete
} }
func TestAfterDeath(t *testing.T) {
// test kill escalation since that's not covered by other unit tests
t1 := New("foo", "", nil, nil, devNull)
kills := 0
waitCh := make(chan *Completion, 1)
t1.killFunc = func(force bool) (int, error) {
// > 0 is intentional, multiple calls to close() should panic
if kills > 0 {
assert.True(t, force)
waitCh <- &Completion{name: t1.name, code: 123}
close(waitCh)
} else {
assert.False(t, force)
}
kills++
return 0, nil
}
wr := t1.awaitDeath(0, waitCh)
assert.Equal(t, "foo", wr.name)
assert.Equal(t, 123, wr.code)
assert.NoError(t, wr.err)
// test tie between shouldQuit and waitCh
waitCh = make(chan *Completion, 1)
waitCh <- &Completion{name: t1.name, code: 456}
close(waitCh)
t1.killFunc = func(force bool) (int, error) {
t.Fatalf("should not attempt to kill a task that has already reported completion")
return 0, nil
}
wr = t1.awaitDeath(0, waitCh)
assert.Equal(t, 456, wr.code)
assert.NoError(t, wr.err)
// test delayed killFunc failure
kills = 0
killFailed := errors.New("for some reason kill failed")
t1.killFunc = func(force bool) (int, error) {
// > 0 is intentional, multiple calls to close() should panic
if kills > 0 {
assert.True(t, force)
return -1, killFailed
} else {
assert.False(t, force)
}
kills++
return 0, nil
}
wr = t1.awaitDeath(0, nil)
assert.Equal(t, "foo", wr.name)
assert.Error(t, wr.err)
// test initial killFunc failure
kills = 0
t1.killFunc = func(force bool) (int, error) {
// > 0 is intentional, multiple calls to close() should panic
if kills > 0 {
assert.True(t, force)
t.Fatalf("killFunc should only be invoked once, not again after is has already failed")
} else {
assert.False(t, force)
}
kills++
return 0, killFailed
}
wr = t1.awaitDeath(0, nil)
assert.Equal(t, "foo", wr.name)
assert.Error(t, wr.err)
}