From ec2605a7586018f8cea8eadf6e1c6ead667d32b6 Mon Sep 17 00:00:00 2001
From: James DeFelice
Date: Thu, 24 Sep 2015 12:39:29 +0000
Subject: [PATCH] wait for some grace period for kill proc to die, upgrading from SIGTERM to SIGKILL if needed

- refactor kill signal escalation into separate func for easier unit testing
- added unit test for signal escalation
---
 contrib/mesos/pkg/minion/tasks/task.go      | 108 +++++++++++++-------
 contrib/mesos/pkg/minion/tasks/task_test.go |  77 +++++++++++++-
 2 files changed, 145 insertions(+), 40 deletions(-)

diff --git a/contrib/mesos/pkg/minion/tasks/task.go b/contrib/mesos/pkg/minion/tasks/task.go
index f6ddb52a9e6..1203bdcba90 100644
--- a/contrib/mesos/pkg/minion/tasks/task.go
+++ b/contrib/mesos/pkg/minion/tasks/task.go
@@ -29,7 +29,18 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
 )
 
-const defaultTaskRestartDelay = 5 * time.Second
+const (
+	defaultTaskRestartDelay = 5 * time.Second
+
+	// TODO(jdef) there's no easy way for us to discover the grace period that we actually
+	// have, from mesos: it's simply a missing core feature. there's a MESOS-xyz ticket for
+	// this somewhere. if it was discoverable then we could come up with a better strategy.
+	// there are some comments in the executor regarding this as well (because there we're
+	// concerned about cleaning up pods within the grace period). we could pick some
+	// higher (arbitrary) value but without knowing when the slave will forcibly kill us
+	// it seems a somewhat futile exercise.
+	defaultKillGracePeriod = 5 * time.Second
+)
 
 // Completion represents the termination of a Task process. Each process execution should
 // yield (barring drops because of an abort signal) exactly one Completion.
@@ -45,7 +56,7 @@ type systemProcess interface {
 	Wait() error
 
 	// Kill returns the pid of the process that was killed
-	Kill() (int, error)
+	Kill(force bool) (int, error)
 }
 
 type cmdProcess struct {
@@ -56,7 +67,7 @@ func (cp *cmdProcess) Wait() error {
 	return cp.delegate.Wait()
 }
-func (cp *cmdProcess) Kill() (int, error) {
+func (cp *cmdProcess) Kill(force bool) (int, error) {
 	// kill the entire process group, not just the one process
 	pid := cp.delegate.Process.Pid
 	processGroup := 0 - pid
@@ -64,7 +75,11 @@ func (cp *cmdProcess) Kill() (int, error) {
 	// we send a SIGTERM here for a graceful stop. users of this package should
 	// wait for tasks to complete normally. as a fallback/safeguard, child procs
 	// are spawned in notStartedTask to receive a SIGKILL when this process dies.
-	rc := syscall.Kill(processGroup, syscall.SIGTERM)
+	sig := syscall.SIGTERM
+	if force {
+		sig = syscall.SIGKILL
+	}
+	rc := syscall.Kill(processGroup, sig)
 	return pid, rc
 }
 
@@ -86,12 +101,13 @@ type Task struct {
 	done         chan struct{} // done closes when all processes related to the task have terminated
 	initialState taskStateFn   // prepare and start a new live process, defaults to notStartedTask; should be set by run()
 	runLatch     int32         // guard against multiple Task.run calls
+	killFunc     func(bool) (int, error)
 }
 
 // New builds a newly initialized task object but does not start any processes for it. callers
 // are expected to invoke task.run(...) on their own.
 func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task {
-	return &Task{
+	t := &Task{
 		name:         name,
 		bin:          bin,
 		args:         args,
@@ -103,6 +119,8 @@ func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task {
 		RestartDelay: defaultTaskRestartDelay,
 		Finished:     func(restarting bool) bool { return restarting },
 	}
+	t.killFunc = func(force bool) (int, error) { return t.cmd.Kill(force) }
+	return t
 }
 
 // Start spawns a goroutine to execute the Task. Panics if invoked more than once.
@@ -241,63 +259,79 @@ type exitError interface {
 }
 
 func taskRunning(t *Task) taskStateFn {
-	waiter := t.cmd.Wait
-	var sendOnce sync.Once
-	trySend := func(wr *Completion) {
-		// guarded with once because we're only allowed to send a single "result" for each
-		// process termination. for example, if Kill() results in an error because Wait()
-		// already completed we only want to return a single result for the process.
-		sendOnce.Do(func() {
-			t.tryComplete(wr)
-		})
-	}
 	// listen for normal process completion in a goroutine; don't block because we need to listen for shouldQuit
 	waitCh := make(chan *Completion, 1)
 	go func() {
-		wr := &Completion{}
+		wr := &Completion{name: t.name}
 		defer func() {
 			waitCh <- wr
 			close(waitCh)
 		}()
 
-		if err := waiter(); err != nil {
+		if err := t.cmd.Wait(); err != nil {
 			if exitError, ok := err.(exitError); ok {
 				if waitStatus, ok := exitError.Sys().(syscall.WaitStatus); ok {
-					wr.name = t.name
 					wr.code = waitStatus.ExitStatus()
 					return
 				}
 			}
 			wr.err = fmt.Errorf("task wait ended strangely for %q: %v", t.bin, err)
-		} else {
-			wr.name = t.name
 		}
 	}()
 
-	// wait for the process to complete, or else for a "quit" signal on the task (at which point we'll attempt to kill manually)
 	select {
 	case <-t.shouldQuit:
-		// check for tie
-		select {
-		case wr := <-waitCh:
-			// we got a signal to quit, but we're already finished; attempt best effort delvery
-			trySend(wr)
-		default:
-			// Wait() has not exited yet, kill the process
-			log.Infof("killing %s : %s", t.name, t.bin)
-			pid, err := t.cmd.Kill()
-			if err != nil {
-				trySend(&Completion{err: fmt.Errorf("failed to kill process: %q pid %d: %v", t.bin, pid, err)})
-			}
-			// else, Wait() should complete and send a completion event
-		}
+		t.tryComplete(t.awaitDeath(defaultKillGracePeriod, waitCh))
 	case wr := <-waitCh:
-		// task has completed before we were told to quit, pass along completion and error information
-		trySend(wr)
+		t.tryComplete(wr)
 	}
 	return taskShouldRestart
 }
 
+// awaitDeath waits for the process to complete, or else for a "quit" signal on the task,
+// at which point we'll attempt to kill manually.
+func (t *Task) awaitDeath(gracePeriod time.Duration, waitCh <-chan *Completion) *Completion {
+	select {
+	case wr := <-waitCh:
+		// got a signal to quit, but we're already finished
+		return wr
+	default:
+	}
+
+	forceKill := false
+	wr := &Completion{name: t.name, err: fmt.Errorf("failed to kill process: %q", t.bin)}
+
+	// the loop is here in case we receive a shouldQuit signal; we need to kill the task.
+	// in this case, first send a SIGTERM (force=false) to the task and then wait for it
+	// to die (within the gracePeriod). if it doesn't die, then we loop around, only this
+	// time we'll send a SIGKILL (force=true) and wait for a reduced gracePeriod. There
+	// does exist a slim chance that the underlying wait4() syscall won't complete before
+	// this process dies, in which case a zombie will rise. Starting the mesos slave with
+	// pid namespace isolation should mitigate this.
+waitLoop:
+	for i := 0; i < 2; i++ {
+		log.Infof("killing %s (force=%t) : %s", t.name, forceKill, t.bin)
+		pid, err := t.killFunc(forceKill)
+		if err != nil {
+			log.Warningf("failed to kill process: %q pid %d: %v", t.bin, pid, err)
+			break waitLoop
+		}
+
+		// Wait for the kill to be processed, and child proc resources cleaned up; try to avoid zombies!
+		select {
+		case wr = <-waitCh:
+			break waitLoop
+		case <-time.After(gracePeriod):
+			// want a timeout, but a shorter one than we used initially.
+			// using /= 2 is deterministic and yields the desirable effect.
+			gracePeriod /= 2
+			forceKill = true
+			continue waitLoop
+		}
+	}
+	return wr
+}
+
 // forwardUntil forwards task process completion status and errors to the given output
 // chans until either the task terminates or abort is closed.
 func (t *Task) forwardUntil(tch chan<- *Completion, abort <-chan struct{}) {
diff --git a/contrib/mesos/pkg/minion/tasks/task_test.go b/contrib/mesos/pkg/minion/tasks/task_test.go
index 255e63489b6..a9d1803c334 100644
--- a/contrib/mesos/pkg/minion/tasks/task_test.go
+++ b/contrib/mesos/pkg/minion/tasks/task_test.go
@@ -26,6 +26,7 @@ import (
 	"testing"
 
 	log "github.com/golang/glog"
+	"github.com/stretchr/testify/assert"
 )
 
 type badWriteCloser struct {
@@ -57,7 +58,7 @@ func (f *fakeProcess) Wait() error {
 	<-f.done
 	return f.err
 }
-func (f *fakeProcess) Kill() (int, error) {
+func (f *fakeProcess) Kill(_ bool) (int, error) {
 	close(f.done)
 	return f.pid, f.err
 }
@@ -77,8 +78,8 @@ func TestBadLogger(t *testing.T) {
 	fp := newFakeProcess()
 	tt := New("foo", "bar", nil, nil, func() io.WriteCloser {
 		defer func() {
-			fp.pid = 123 // sanity check
-			fp.Kill()    // this causes Wait() to return
+			fp.pid = 123   // sanity check
+			fp.Kill(false) // this causes Wait() to return
 		}()
 		return &badWriteCloser{err}
 	})
@@ -220,3 +221,73 @@ func TestMergeOutput(t *testing.T) {
 	log.Infoln("waiting for merge to complete")
 	<-te.Done() // wait for the merge to complete
 }
+
+func TestAfterDeath(t *testing.T) {
+	// test kill escalation since that's not covered by other unit tests
+	t1 := New("foo", "", nil, nil, devNull)
+	kills := 0
+	waitCh := make(chan *Completion, 1)
+	t1.killFunc = func(force bool) (int, error) {
+		// > 0 is intentional, multiple calls to close() should panic
+		if kills > 0 {
+			assert.True(t, force)
+			waitCh <- &Completion{name: t1.name, code: 123}
+			close(waitCh)
+		} else {
+			assert.False(t, force)
+		}
+		kills++
+		return 0, nil
+	}
+	wr := t1.awaitDeath(0, waitCh)
+	assert.Equal(t, "foo", wr.name)
+	assert.Equal(t, 123, wr.code)
+	assert.NoError(t, wr.err)
+
+	// test tie between shouldQuit and waitCh
+	waitCh = make(chan *Completion, 1)
+	waitCh <- &Completion{name: t1.name, code: 456}
+	close(waitCh)
+	t1.killFunc = func(force bool) (int, error) {
+		t.Fatalf("should not attempt to kill a task that has already reported completion")
+		return 0, nil
+	}
+	wr = t1.awaitDeath(0, waitCh)
+	assert.Equal(t, 456, wr.code)
+	assert.NoError(t, wr.err)
+
+	// test delayed killFunc failure
+	kills = 0
+	killFailed := errors.New("for some reason kill failed")
+	t1.killFunc = func(force bool) (int, error) {
+		// kills > 0 means this is the second, escalated (force=true) kill attempt
+		if kills > 0 {
+			assert.True(t, force)
+			return -1, killFailed
+		} else {
+			assert.False(t, force)
+		}
+		kills++
+		return 0, nil
+	}
+	wr = t1.awaitDeath(0, nil)
+	assert.Equal(t, "foo", wr.name)
+	assert.Error(t, wr.err)
+
+	// test initial killFunc failure
+	kills = 0
+	t1.killFunc = func(force bool) (int, error) {
+		// kills > 0 would mean killFunc was invoked again after the initial failure
+		if kills > 0 {
+			assert.True(t, force)
+			t.Fatalf("killFunc should only be invoked once, not again after it has already failed")
+		} else {
+			assert.False(t, force)
+		}
+		kills++
+		return 0, killFailed
+	}
+	wr = t1.awaitDeath(0, nil)
+	assert.Equal(t, "foo", wr.name)
+	assert.Error(t, wr.err)
+}
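
The sketch below is not part of the patch; it is a minimal, self-contained illustration of the SIGTERM-then-SIGKILL escalation that awaitDeath implements, with hypothetical names (escalatingKill, grace, and the stubbed kill callback are made up for the example). The kill call is injected as a function for the same reason this patch introduces Task.killFunc: the escalation can then be unit tested without signaling a real process.

package main

import (
	"fmt"
	"syscall"
	"time"
)

// escalatingKill sends SIGTERM to pid and waits up to grace for done to close.
// If the process is still alive it escalates to SIGKILL and waits again, for
// half the original grace period, mirroring the gracePeriod /= 2 step above.
func escalatingKill(pid int, grace time.Duration, done <-chan struct{}, kill func(pid int, sig syscall.Signal) error) error {
	sig := syscall.SIGTERM
	for i := 0; i < 2; i++ {
		if err := kill(pid, sig); err != nil {
			return fmt.Errorf("kill pid %d with %v failed: %v", pid, sig, err)
		}
		select {
		case <-done:
			return nil // process exited within the grace period
		case <-time.After(grace):
			sig = syscall.SIGKILL // escalate and retry with a shorter wait
			grace /= 2
		}
	}
	return fmt.Errorf("pid %d did not exit after %v", pid, syscall.SIGKILL)
}

func main() {
	done := make(chan struct{})
	// stub kill: pretend the process ignores SIGTERM and only dies on SIGKILL
	err := escalatingKill(1234, 50*time.Millisecond, done, func(pid int, sig syscall.Signal) error {
		if sig == syscall.SIGKILL {
			close(done)
		}
		return nil
	})
	fmt.Println("result:", err) // prints "result: <nil>"
}

Because the stub ignores SIGTERM and only closes done on SIGKILL, the sketch exercises the same escalation path that TestAfterDeath drives through killFunc.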