From a5fa5673e217a80c7d0948314e21da8215df7022 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sat, 27 Jun 2015 23:25:16 +0000 Subject: [PATCH] rewrite Process implementation from scratch, unskip previously failing test: - fix races in unit tests - fix flaky singleActionEndsProcess test; further simplify Process impl - fix flakey error handling in processAdapter - eliminate time-based test assertions --- contrib/mesos/pkg/proc/adapter.go | 89 ++++++ contrib/mesos/pkg/proc/errors.go | 51 ++++ contrib/mesos/pkg/proc/once.go | 91 ++++++ contrib/mesos/pkg/proc/proc.go | 413 ++++++++-------------------- contrib/mesos/pkg/proc/proc_test.go | 216 +++++++-------- contrib/mesos/pkg/proc/types.go | 19 +- 6 files changed, 468 insertions(+), 411 deletions(-) create mode 100644 contrib/mesos/pkg/proc/adapter.go create mode 100644 contrib/mesos/pkg/proc/once.go diff --git a/contrib/mesos/pkg/proc/adapter.go b/contrib/mesos/pkg/proc/adapter.go new file mode 100644 index 00000000000..e08853ed310 --- /dev/null +++ b/contrib/mesos/pkg/proc/adapter.go @@ -0,0 +1,89 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proc + +import ( + "fmt" +) + +type processAdapter struct { + Process + delegate Doer +} + +// reportAnyError waits for an error to arrive from source, or for the process to end; +// errors are reported through errOnce. returns true if an error is reported through +// errOnce, otherwise false. +func (p *processAdapter) reportAnyError(source <-chan error, errOnce ErrorOnce) bool { + select { + case err, ok := <-source: + if ok && err != nil { + // failed to schedule/execute the action + errOnce.Report(err) + return true + } + // action was scheduled/executed just fine. + case <-p.Done(): + // double-check that there's no errror waiting for us in source + select { + case err, ok := <-source: + if ok { + // parent failed to schedule/execute the action + errOnce.Report(err) + return true + } + default: + } + errOnce.Report(errProcessTerminated) + return true + } + return false +} + +func (p *processAdapter) Do(a Action) <-chan error { + errCh := NewErrorOnce(p.Done()) + go func() { + ch := NewErrorOnce(p.Done()) + errOuter := p.Process.Do(func() { + errInner := p.delegate.Do(a) + ch.forward(errInner) + }) + // order is important here: check errOuter before ch + if p.reportAnyError(errOuter, errCh) { + return + } + if !p.reportAnyError(ch.Err(), errCh) { + errCh.Report(nil) + } + }() + return errCh.Err() +} + +// DoWith returns a process that, within its execution context, delegates to the specified Doer. +// Expect a panic if either the given Process or Doer are nil. +func DoWith(other Process, d Doer) Process { + if other == nil { + panic(fmt.Sprintf("cannot DoWith a nil process")) + } + if d == nil { + panic(fmt.Sprintf("cannot DoWith a nil doer")) + } + return &processAdapter{ + Process: other, + delegate: d, + } +} diff --git a/contrib/mesos/pkg/proc/errors.go b/contrib/mesos/pkg/proc/errors.go index c7fe0f442e6..229b71b5017 100644 --- a/contrib/mesos/pkg/proc/errors.go +++ b/contrib/mesos/pkg/proc/errors.go @@ -18,13 +18,24 @@ package proc import ( "errors" + "fmt" + + "k8s.io/kubernetes/contrib/mesos/pkg/runtime" ) var ( errProcessTerminated = errors.New("cannot execute action because process has terminated") errIllegalState = errors.New("illegal state, cannot execute action") + + closedErrChan <-chan error // singleton chan that's always closed ) +func init() { + ch := make(chan error) + close(ch) + closedErrChan = ch +} + func IsProcessTerminated(err error) bool { return err == errProcessTerminated } @@ -32,3 +43,43 @@ func IsProcessTerminated(err error) bool { func IsIllegalState(err error) bool { return err == errIllegalState } + +// OnError spawns a goroutine that waits for an error. if a non-nil error is read from +// the channel then the handler func is invoked, otherwise (nil error or closed chan) +// the handler is skipped. if a nil handler is specified then it's not invoked. +// the signal chan that's returned closes once the error process logic (and handler, +// if any) has completed. +func OnError(ch <-chan error, f func(error), abort <-chan struct{}) <-chan struct{} { + return runtime.After(func() { + if ch == nil { + return + } + select { + case err, ok := <-ch: + if ok && err != nil && f != nil { + f(err) + } + case <-abort: + if f != nil { + f(errProcessTerminated) + } + } + }) +} + +// ErrorChanf is a convenience func that returns a chan that yields an error +// generated from the given msg format and args. +func ErrorChanf(msg string, args ...interface{}) <-chan error { + return ErrorChan(fmt.Errorf(msg, args...)) +} + +// ErrorChan is a convenience func that returns a chan that yields the given error. +// If err is nil then a closed chan is returned. +func ErrorChan(err error) <-chan error { + if err == nil { + return closedErrChan + } + ch := make(chan error, 1) + ch <- err + return ch +} diff --git a/contrib/mesos/pkg/proc/once.go b/contrib/mesos/pkg/proc/once.go new file mode 100644 index 00000000000..09993659f73 --- /dev/null +++ b/contrib/mesos/pkg/proc/once.go @@ -0,0 +1,91 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proc + +import ( + "fmt" + "sync" + "time" +) + +type errorOnce struct { + once sync.Once + err chan error + abort <-chan struct{} +} + +// NewErrorOnce creates an ErrorOnce that aborts blocking func calls once +// the given abort chan has closed. +func NewErrorOnce(abort <-chan struct{}) ErrorOnce { + return &errorOnce{ + err: make(chan error, 1), + abort: abort, + } +} + +func (b *errorOnce) Err() <-chan error { + return b.err +} + +func (b *errorOnce) Reportf(msg string, args ...interface{}) { + b.Report(fmt.Errorf(msg, args...)) +} + +func (b *errorOnce) Report(err error) { + b.once.Do(func() { + select { + case b.err <- err: + default: + } + }) +} + +func (b *errorOnce) Send(errIn <-chan error) ErrorOnce { + go b.forward(errIn) + return b +} + +func (b *errorOnce) forward(errIn <-chan error) { + if errIn == nil { + b.Report(nil) + return + } + select { + case err, _ := <-errIn: + b.Report(err) + case <-b.abort: + // double-check that errIn was blocked: don't falsely return + // errProcessTerminated if errIn was really ready + select { + case err, _ := <-errIn: + b.Report(err) + default: + b.Report(errProcessTerminated) + } + } +} + +func (b *errorOnce) WaitFor(d time.Duration) (error, bool) { + t := time.NewTimer(d) + select { + case err, _ := <-b.err: + t.Stop() + return err, true + case <-t.C: + return nil, false + } +} diff --git a/contrib/mesos/pkg/proc/proc.go b/contrib/mesos/pkg/proc/proc.go index 8a47948e6ca..47c85922b46 100644 --- a/contrib/mesos/pkg/proc/proc.go +++ b/contrib/mesos/pkg/proc/proc.go @@ -17,351 +17,174 @@ limitations under the License. package proc import ( - "fmt" "sync" - "sync/atomic" - "time" - log "github.com/golang/glog" - "k8s.io/kubernetes/contrib/mesos/pkg/runtime" + "k8s.io/kubernetes/pkg/util" ) const ( - // if the action processor crashes (if some Action panics) then we - // wait this long before spinning up the action processor again. - defaultActionHandlerCrashDelay = 100 * time.Millisecond - // how many actions we can store in the backlog defaultActionQueueDepth = 1024 - - // wait this long before re-attempting to enqueue an action into - // the scheduling backlog - defaultMaxRescheduleWait = 5 * time.Millisecond ) -type procImpl struct { - Config - backlog chan Action // action queue - terminate chan struct{} // signaled via close() - wg sync.WaitGroup // End() terminates when the wait is over - done runtime.Signal - state *stateType - pid uint32 - writeLock sync.Mutex // avoid data race between write and close of backlog - changed *sync.Cond // wait/signal for backlog changes - engine DoerFunc // isolated this for easier unit testing later on - running chan struct{} // closes once event loop processing starts - dead chan struct{} // closes upon completion of process termination -} - type Config struct { - // cooldown period in between deferred action crashes - actionHandlerCrashDelay time.Duration - // determines the size of the deferred action backlog actionQueueDepth uint32 - - // wait this long before re-attempting to enqueue an action into - // the scheduling backlog - maxRescheduleWait time.Duration } var ( // default process configuration, used in the creation of all new processes defaultConfig = Config{ - actionHandlerCrashDelay: defaultActionHandlerCrashDelay, - actionQueueDepth: defaultActionQueueDepth, - maxRescheduleWait: defaultMaxRescheduleWait, + actionQueueDepth: defaultActionQueueDepth, } - pid uint32 // global pid counter - closedErrChan <-chan error // singleton chan that's always closed ) -func init() { - ch := make(chan error) - close(ch) - closedErrChan = ch +type scheduledAction struct { + action Action + errCh chan error +} + +type processState struct { + actions chan *scheduledAction // scheduled action backlog + running chan struct{} // closes upon start of action backlog processing + terminated chan struct{} // closes upon termination of run() + doer Doer // delegate that schedules actions + guardDoer sync.RWMutex // protect doer + end chan struct{} // closes upon invocation of End() + closeEnd sync.Once // guard: only close end chan once + nextAction func() (*scheduledAction, bool) // return false if actions queue is closed } func New() Process { return newConfigured(defaultConfig) } -func newConfigured(config Config) Process { - state := stateNew - pi := &procImpl{ - Config: config, - backlog: make(chan Action, config.actionQueueDepth), - terminate: make(chan struct{}), - state: &state, - pid: atomic.AddUint32(&pid, 1), - running: make(chan struct{}), - dead: make(chan struct{}), +func newConfigured(c Config) Process { + ps := &processState{ + actions: make(chan *scheduledAction, c.actionQueueDepth), + running: make(chan struct{}), + terminated: make(chan struct{}), + end: make(chan struct{}), } - pi.engine = DoerFunc(pi.doLater) - pi.changed = sync.NewCond(&pi.writeLock) - pi.wg.Add(1) // symmetrical to wg.Done() in End() - pi.done = pi.begin() - return pi + ps.doer = DoerFunc(ps.defaultDoer) + go ps.run() + return ps } -// returns a chan that closes upon termination of the action processing loop -func (self *procImpl) Done() <-chan struct{} { - return self.done -} +type stateFn func(*processState, *scheduledAction) stateFn -func (self *procImpl) Running() <-chan struct{} { - return self.running -} - -func (self *procImpl) begin() runtime.Signal { - if !self.state.transition(stateNew, stateRunning) { - panic(fmt.Errorf("failed to transition from New to Idle state")) +func stateRun(ps *processState, a *scheduledAction) stateFn { + // it's only possible to ever receive this once because we transition + // to state "shutdown", permanently + if a == nil { + ps.shutdown() + return stateShutdown } - defer log.V(2).Infof("started process %d", self.pid) - var entered runtime.Latch - // execute actions on the backlog chan - return runtime.After(func() { - runtime.Until(func() { - if entered.Acquire() { - close(self.running) - self.wg.Add(1) - } - for action := range self.backlog { - select { - case <-self.terminate: - return - default: - // signal to indicate there's room in the backlog now - self.changed.Broadcast() - // rely on Until to handle action panics - action() - } - } - }, self.actionHandlerCrashDelay, self.terminate) - }).Then(func() { - log.V(2).Infof("finished processing action backlog for process %d", self.pid) - if !entered.Acquire() { - self.wg.Done() - } + close(a.errCh) // signal that action was scheduled + func() { + // we don't trust clients of this package + defer util.HandleCrash() + a.action() + }() + return stateRun +} + +func (ps *processState) shutdown() { + // all future attemps to schedule actions must fail immediately + ps.guardDoer.Lock() + ps.doer = DoerFunc(func(_ Action) <-chan error { + return ErrorChan(errProcessTerminated) }) -} + ps.guardDoer.Unlock() -// execute some action in the context of the current process. Actions -// executed via this func are to be executed in a concurrency-safe manner: -// no two actions should execute at the same time. invocations of this func -// should not block for very long, unless the action backlog is full or the -// process is terminating. -// returns errProcessTerminated if the process already ended. -func (self *procImpl) doLater(deferredAction Action) (err <-chan error) { - a := Action(func() { - self.wg.Add(1) - defer self.wg.Done() - deferredAction() - }) + // no more actions may be scheduled + close(ps.actions) - scheduled := false - self.writeLock.Lock() - defer self.writeLock.Unlock() - - var timer *time.Timer - for err == nil && !scheduled { - switch s := self.state.get(); s { - case stateRunning: - select { - case self.backlog <- a: - scheduled = true - default: - if timer == nil { - timer = time.AfterFunc(self.maxRescheduleWait, self.changed.Broadcast) - } else { - timer.Reset(self.maxRescheduleWait) - } - self.changed.Wait() - timer.Stop() - } - case stateTerminal: - err = ErrorChan(errProcessTerminated) - default: - err = ErrorChan(errIllegalState) - } - } - return -} - -// implementation of Doer interface, schedules some action to be executed via -// the current execution engine -func (self *procImpl) Do(a Action) <-chan error { - return self.engine(a) -} - -// spawn a goroutine that waits for an error. if a non-nil error is read from the -// channel then the handler func is invoked, otherwise (nil error or closed chan) -// the handler is skipped. if a nil handler is specified then it's not invoked. -// the signal chan that's returned closes once the error process logic (and handler, -// if any) has completed. -func OnError(ch <-chan error, f func(error), abort <-chan struct{}) <-chan struct{} { - return runtime.After(func() { - if ch == nil { - return - } - select { - case err, ok := <-ch: - if ok && err != nil && f != nil { - f(err) - } - case <-abort: - if f != nil { - f(errProcessTerminated) - } - } - }) -} - -func (self *procImpl) OnError(ch <-chan error, f func(error)) <-chan struct{} { - return OnError(ch, f, self.Done()) -} - -func (self *procImpl) flush() { - log.V(2).Infof("flushing action backlog for process %d", self.pid) - i := 0 - //TODO: replace with `for range self.backlog` once Go 1.3 support is dropped - for { - _, open := <-self.backlog - if !open { - break - } - i++ - } - log.V(2).Infof("flushed %d backlog actions for process %d", i, self.pid) -} - -func (self *procImpl) End() <-chan struct{} { - if self.state.transitionTo(stateTerminal, stateTerminal) { - go func() { - defer close(self.dead) - self.writeLock.Lock() - defer self.writeLock.Unlock() - - log.V(2).Infof("terminating process %d", self.pid) - - close(self.backlog) - close(self.terminate) - self.wg.Done() - self.changed.Broadcast() - - log.V(2).Infof("waiting for deferred actions to complete") - - // wait for all pending actions to complete, then flush the backlog - self.wg.Wait() - self.flush() - }() - } - return self.dead -} - -type errorOnce struct { - once sync.Once - err chan error - abort <-chan struct{} -} - -func NewErrorOnce(abort <-chan struct{}) ErrorOnce { - return &errorOnce{ - err: make(chan error, 1), - abort: abort, - } -} - -func (b *errorOnce) Err() <-chan error { - return b.err -} - -func (b *errorOnce) Reportf(msg string, args ...interface{}) { - b.Report(fmt.Errorf(msg, args...)) -} - -func (b *errorOnce) Report(err error) { - b.once.Do(func() { - select { - case b.err <- err: - default: - } - }) -} - -func (b *errorOnce) Send(errIn <-chan error) ErrorOnce { - go b.forward(errIn) - return b -} - -func (b *errorOnce) forward(errIn <-chan error) { - if errIn == nil { - b.Report(nil) + // no need to check ps.end anymore + ps.nextAction = func() (a *scheduledAction, ok bool) { + a, ok = <-ps.actions return } - select { - case err, _ := <-errIn: - b.Report(err) - case <-b.abort: - b.Report(errProcessTerminated) +} + +// stateShutdown doesn't run any actions because the process is shutting down. +// instead it clears the action backlog. newly scheduled actions are rejected. +func stateShutdown(ps *processState, a *scheduledAction) stateFn { + if a != nil { + a.errCh <- errProcessTerminated } + return stateShutdown } -type processAdapter struct { - Process - delegate Doer -} +func (ps *processState) run() { + defer close(ps.terminated) + close(ps.running) -func (p *processAdapter) Do(a Action) <-chan error { - errCh := NewErrorOnce(p.Done()) - go func() { - errOuter := p.Process.Do(func() { - errInner := p.delegate.Do(a) - errCh.forward(errInner) - }) - // if the outer err is !nil then either the parent Process failed to schedule the - // the action, or else it backgrounded the scheduling task. - if errOuter != nil { - errCh.forward(errOuter) + // main state machine loop: process actions as they come, + // updating the state func along the way. + f := stateRun + ps.nextAction = func() (a *scheduledAction, ok bool) { + // if we successfully read from ps.end, we don't know if the + // actions queue is closed. assume it's not: the state machine + // shouldn't terminate yet. + // also, give preference to ps.end: we want to avoid processing + // actions if both ps.actions and ps.end are ready + select { + case <-ps.end: + ok = true + default: + select { + case <-ps.end: + ok = true + case a, ok = <-ps.actions: + } } - }() - return errCh.Err() -} - -// DoWith returns a process that, within its execution context, delegates to the specified Doer. -// Expect a panic if either the given Process or Doer are nil. -func DoWith(other Process, d Doer) Process { - if other == nil { - panic(fmt.Sprintf("cannot DoWith a nil process")) + return } - if d == nil { - panic(fmt.Sprintf("cannot DoWith a nil doer")) - } - return &processAdapter{ - Process: other, - delegate: d, + for { + a, ok := ps.nextAction() + if !ok { + return + } + g := f(ps, a) + if g == nil { + panic("state machine stateFn is not allowed to be nil") + } + f = g } } -func ErrorChanf(msg string, args ...interface{}) <-chan error { - return ErrorChan(fmt.Errorf(msg, args...)) +func (ps *processState) Running() <-chan struct{} { + return ps.running } -func ErrorChan(err error) <-chan error { - if err == nil { - return closedErrChan - } +func (ps *processState) Done() <-chan struct{} { + return ps.terminated +} + +func (ps *processState) End() <-chan struct{} { + ps.closeEnd.Do(func() { + close(ps.end) + }) + return ps.terminated +} + +func (ps *processState) Do(a Action) <-chan error { + ps.guardDoer.RLock() + defer ps.guardDoer.RUnlock() + return ps.doer.Do(a) +} + +func (ps *processState) defaultDoer(a Action) <-chan error { ch := make(chan error, 1) - ch <- err + ps.actions <- &scheduledAction{ + action: a, + errCh: ch, + } return ch } -// invoke the f on action a. returns an illegal state error if f is nil. -func (f DoerFunc) Do(a Action) <-chan error { - if f != nil { - return f(a) - } - return ErrorChan(errIllegalState) +func (ps *processState) OnError(ch <-chan error, f func(error)) <-chan struct{} { + return OnError(ch, f, ps.terminated) } diff --git a/contrib/mesos/pkg/proc/proc_test.go b/contrib/mesos/pkg/proc/proc_test.go index 33d536fe162..19dc8bf465a 100644 --- a/contrib/mesos/pkg/proc/proc_test.go +++ b/contrib/mesos/pkg/proc/proc_test.go @@ -26,30 +26,10 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/runtime" ) -// logs a testing.Fatalf if the elapsed time d passes before signal chan done is closed -func fatalAfter(t *testing.T, done <-chan struct{}, d time.Duration, msg string, args ...interface{}) { - select { - case <-done: - case <-time.After(d): - t.Fatalf(msg, args...) - } -} - -func errorAfter(errOnce ErrorOnce, done <-chan struct{}, d time.Duration, msg string, args ...interface{}) { - select { - case <-done: - case <-time.After(d): - //errOnce.Reportf(msg, args...) - panic(fmt.Sprintf(msg, args...)) - } -} - -// logs a testing.Fatalf if the signal chan closes before the elapsed time d passes -func fatalOn(t *testing.T, done <-chan struct{}, d time.Duration, msg string, args ...interface{}) { - select { - case <-done: - t.Fatalf(msg, args...) - case <-time.After(d): +func failOnError(t *testing.T, errOnce ErrorOnce) { + err, _ := <-errOnce.Err() + if err != nil { + t.Errorf("unexpected action scheduling error: %v", err) } } @@ -61,8 +41,8 @@ func TestProc_manyEndings(t *testing.T) { for i := 0; i < COUNT; i++ { runtime.On(p.End(), wg.Done) } - fatalAfter(t, runtime.After(wg.Wait), 5*time.Second, "timed out waiting for loose End()s") - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + wg.Wait() + <-p.Done() } func TestProc_singleAction(t *testing.T) { @@ -70,61 +50,84 @@ func TestProc_singleAction(t *testing.T) { scheduled := make(chan struct{}) called := make(chan struct{}) + errOnce := NewErrorOnce(p.Done()) go func() { log.Infof("do'ing deferred action") defer close(scheduled) - err := p.Do(func() { + errCh := p.Do(func() { defer close(called) log.Infof("deferred action invoked") }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + errOnce.Send(errCh) }() - fatalAfter(t, scheduled, 5*time.Second, "timed out waiting for deferred action to be scheduled") - fatalAfter(t, called, 5*time.Second, "timed out waiting for deferred action to be invoked") + failOnError(t, errOnce) + <-scheduled + <-called p.End() - - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + <-p.Done() } -func TestProc_singleActionEnd(t *testing.T) { +func TestProc_singleActionThatPanics(t *testing.T) { p := New() scheduled := make(chan struct{}) called := make(chan struct{}) + errOnce := NewErrorOnce(p.Done()) go func() { log.Infof("do'ing deferred action") defer close(scheduled) - err := p.Do(func() { + errCh := p.Do(func() { + defer close(called) + panic("panicing here") + }) + errOnce.Send(errCh) + }() + + failOnError(t, errOnce) + + <-scheduled + <-called + p.End() + <-p.Done() +} + +func TestProc_singleActionEndsProcess(t *testing.T) { + p := New() + called := make(chan struct{}) + + errOnce := NewErrorOnce(p.Done()) + go func() { + log.Infof("do'ing deferred action") + errCh := p.Do(func() { defer close(called) log.Infof("deferred action invoked") p.End() }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + errOnce.Send(errCh) }() - fatalAfter(t, scheduled, 5*time.Second, "timed out waiting for deferred action to be scheduled") - fatalAfter(t, called, 5*time.Second, "timed out waiting for deferred action to be invoked") - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + <-called + + failOnError(t, errOnce) + + <-p.Done() } func TestProc_multiAction(t *testing.T) { p := New() const COUNT = 10 var called sync.WaitGroup - called.Add(COUNT) + called.Add(COUNT * 2) // test FIFO property next := 0 for i := 0; i < COUNT; i++ { log.Infof("do'ing deferred action %d", i) idx := i - err := p.Do(func() { + errOnce := NewErrorOnce(p.Done()) + errCh := p.Do(func() { defer called.Done() log.Infof("deferred action invoked") if next != idx { @@ -132,28 +135,28 @@ func TestProc_multiAction(t *testing.T) { } next++ }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + errOnce.Send(errCh) + go func() { + defer called.Done() + failOnError(t, errOnce) + }() } - fatalAfter(t, runtime.After(called.Wait), 2*time.Second, "timed out waiting for deferred actions to be invoked") - + called.Wait() p.End() - - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + <-p.Done() } func TestProc_goodLifecycle(t *testing.T) { p := New() p.End() - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + <-p.Done() } func TestProc_doWithDeadProc(t *testing.T) { p := New() p.End() - time.Sleep(100 * time.Millisecond) + <-p.Done() errUnexpected := fmt.Errorf("unexpected execution of delegated action") decorated := DoWith(p, DoerFunc(func(_ Action) <-chan error { @@ -161,7 +164,7 @@ func TestProc_doWithDeadProc(t *testing.T) { })) decorated.Do(func() {}) - fatalAfter(t, decorated.Done(), 5*time.Second, "timed out waiting for process death") + <-decorated.Done() } func TestProc_doWith(t *testing.T) { @@ -185,13 +188,12 @@ func TestProc_doWith(t *testing.T) { t.Fatalf("expected !nil error chan") } - fatalAfter(t, executed, 5*time.Second, "timed out waiting deferred execution") - fatalAfter(t, decorated.OnError(err, func(e error) { - t.Fatalf("unexpected error: %v", err) - }), 1*time.Second, "timed out waiting for doer result") - + <-executed + <-decorated.OnError(err, func(e error) { + t.Errorf("unexpected error: %v", err) + }) decorated.End() - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + <-p.Done() } func TestProc_doWithNestedTwice(t *testing.T) { @@ -220,13 +222,14 @@ func TestProc_doWithNestedTwice(t *testing.T) { t.Fatalf("expected !nil error chan") } - fatalAfter(t, executed, 5*time.Second, "timed out waiting deferred execution") - fatalAfter(t, decorated2.OnError(err, func(e error) { - t.Fatalf("unexpected error: %v", err) - }), 1*time.Second, "timed out waiting for doer result") + <-executed + + <-decorated2.OnError(err, func(e error) { + t.Errorf("unexpected error: %v", err) + }) decorated2.End() - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + <-p.Done() } func TestProc_doWithNestedErrorPropagation(t *testing.T) { @@ -248,37 +251,40 @@ func TestProc_doWithNestedErrorPropagation(t *testing.T) { })) executed := make(chan struct{}) - err := decorated2.Do(func() { + errCh := decorated2.Do(func() { defer close(executed) if !delegated { t.Fatalf("expected delegated execution") } errOnce.Report(expectedErr) }) - if err == nil { + if errCh == nil { t.Fatalf("expected !nil error chan") } - errOnce.Send(err) + errOnce.Send(errCh) foundError := false - fatalAfter(t, executed, 1*time.Second, "timed out waiting deferred execution") - fatalAfter(t, decorated2.OnError(errOnce.Err(), func(e error) { + <-executed + + <-decorated2.OnError(errOnce.Err(), func(e error) { if e != expectedErr { - t.Fatalf("unexpected error: %v", err) + t.Errorf("unexpected error: %v", e) } else { foundError = true } - }), 1*time.Second, "timed out waiting for doer result") + }) + // this has been flaky in the past. recent changes to error handling in + // processAdapter.Do should have fixed it. if !foundError { t.Fatalf("expected a propagated error") } decorated2.End() - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + <-p.Done() } -func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce, timeout time.Duration) { +func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) { t.Logf("starting test case " + name + " at " + time.Now().String()) defer func() { t.Logf("runDelegationTest finished at " + time.Now().String()) @@ -325,7 +331,7 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce, // from errCh after this point errOnce.Send(errCh) - errorAfter(errOnce, executed, timeout, "timed out waiting deferred execution of "+name) + <-executed t.Logf("runDelegationTest received executed signal at " + time.Now().String()) } @@ -333,56 +339,41 @@ func TestProc_doWithNestedX(t *testing.T) { t.Logf("starting test case at " + time.Now().String()) p := New() errOnce := NewErrorOnce(p.Done()) - timeout := 5 * time.Second - runDelegationTest(t, p, "nested", errOnce, timeout) + runDelegationTest(t, p, "nested", errOnce) <-p.End() - select { - case err := <-errOnce.Err(): - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - case <-time.After(2 * timeout): - t.Fatalf("timed out waiting for doer result") + + err, _ := <-errOnce.Err() + if err != nil { + t.Fatalf("unexpected error: %v", err) } - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + + <-p.Done() } -// TODO(jdef): find a way to test this without killing CI builds. // intended to be run with -race func TestProc_doWithNestedXConcurrent(t *testing.T) { - t.Skip("disabled for causing CI timeouts.") config := defaultConfig - config.actionQueueDepth = 0 + config.actionQueueDepth = 4000 p := newConfigured(config) var wg sync.WaitGroup const CONC = 20 wg.Add(CONC) - // this test spins up TONS of goroutines that can take a little while to execute on a busy - // CI server. drawing the line at 10s because I've never seen it take anywhere near that long. - timeout := 10 * time.Second - for i := 0; i < CONC; i++ { i := i errOnce := NewErrorOnce(p.Done()) - runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce, timeout) }).Then(wg.Done) + runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce) }).Then(wg.Done) go func() { - select { - case err := <-errOnce.Err(): - if err != nil { - t.Fatalf("delegate %d: unexpected error: %v", i, err) - } - case <-time.After(2 * timeout): - t.Fatalf("delegate %d: timed out waiting for doer result", i) + err, _ := <-errOnce.Err() + if err != nil { + t.Fatalf("delegate %d: unexpected error: %v", i, err) } }() } - ch := runtime.After(wg.Wait) - fatalAfter(t, ch, 2*timeout, "timed out waiting for concurrent delegates") - + wg.Wait() <-p.End() - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + <-p.Done() } func TestProcWithExceededActionQueueDepth(t *testing.T) { @@ -391,16 +382,13 @@ func TestProcWithExceededActionQueueDepth(t *testing.T) { p := newConfigured(config) errOnce := NewErrorOnce(p.Done()) - timeout := 5 * time.Second - runDelegationTest(t, p, "nested", errOnce, timeout) + runDelegationTest(t, p, "nested", errOnce) <-p.End() - select { - case err := <-errOnce.Err(): - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - case <-time.After(2 * timeout): - t.Fatalf("timed out waiting for doer result") + + err, _ := <-errOnce.Err() + if err != nil { + t.Fatalf("unexpected error: %v", err) } - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") + + <-p.Done() } diff --git a/contrib/mesos/pkg/proc/types.go b/contrib/mesos/pkg/proc/types.go index 3ab769f2fae..27492f1c8ad 100644 --- a/contrib/mesos/pkg/proc/types.go +++ b/contrib/mesos/pkg/proc/types.go @@ -16,7 +16,11 @@ limitations under the License. package proc -// something that executes in the context of a process +import ( + "time" +) + +// Action is something that executes in the context of a process type Action func() type Context interface { @@ -36,9 +40,17 @@ type Doer interface { Do(Action) <-chan error } -// adapter func for Doer interface +// DoerFunc is an adapter func for Doer interface type DoerFunc func(Action) <-chan error +// invoke the f on action a. returns an illegal state error if f is nil. +func (f DoerFunc) Do(a Action) <-chan error { + if f != nil { + return f(a) + } + return ErrorChan(errIllegalState) +} + type Process interface { Context Doer @@ -71,4 +83,7 @@ type ErrorOnce interface { // Send is non-blocking; it spins up a goroutine that reports an error (if any) that occurs on the error chan. Send(<-chan error) ErrorOnce + + // WaitFor returns true if an error is received within the specified duration, otherwise false + WaitFor(time.Duration) (error, bool) }