rewrite Process implementation from scratch, unskip previously failing test:

- fix races in unit tests
- fix flaky singleActionEndsProcess test; further simplify Process impl
- fix flakey error handling in processAdapter
- eliminate time-based test assertions
This commit is contained in:
James DeFelice 2015-06-27 23:25:16 +00:00
parent a078eba5b3
commit a5fa5673e2
6 changed files with 468 additions and 411 deletions

View File

@ -0,0 +1,89 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"fmt"
)
type processAdapter struct {
Process
delegate Doer
}
// reportAnyError waits for an error to arrive from source, or for the process to end;
// errors are reported through errOnce. returns true if an error is reported through
// errOnce, otherwise false.
func (p *processAdapter) reportAnyError(source <-chan error, errOnce ErrorOnce) bool {
select {
case err, ok := <-source:
if ok && err != nil {
// failed to schedule/execute the action
errOnce.Report(err)
return true
}
// action was scheduled/executed just fine.
case <-p.Done():
// double-check that there's no errror waiting for us in source
select {
case err, ok := <-source:
if ok {
// parent failed to schedule/execute the action
errOnce.Report(err)
return true
}
default:
}
errOnce.Report(errProcessTerminated)
return true
}
return false
}
func (p *processAdapter) Do(a Action) <-chan error {
errCh := NewErrorOnce(p.Done())
go func() {
ch := NewErrorOnce(p.Done())
errOuter := p.Process.Do(func() {
errInner := p.delegate.Do(a)
ch.forward(errInner)
})
// order is important here: check errOuter before ch
if p.reportAnyError(errOuter, errCh) {
return
}
if !p.reportAnyError(ch.Err(), errCh) {
errCh.Report(nil)
}
}()
return errCh.Err()
}
// DoWith returns a process that, within its execution context, delegates to the specified Doer.
// Expect a panic if either the given Process or Doer are nil.
func DoWith(other Process, d Doer) Process {
if other == nil {
panic(fmt.Sprintf("cannot DoWith a nil process"))
}
if d == nil {
panic(fmt.Sprintf("cannot DoWith a nil doer"))
}
return &processAdapter{
Process: other,
delegate: d,
}
}

View File

@ -18,13 +18,24 @@ package proc
import (
"errors"
"fmt"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
)
var (
errProcessTerminated = errors.New("cannot execute action because process has terminated")
errIllegalState = errors.New("illegal state, cannot execute action")
closedErrChan <-chan error // singleton chan that's always closed
)
func init() {
ch := make(chan error)
close(ch)
closedErrChan = ch
}
func IsProcessTerminated(err error) bool {
return err == errProcessTerminated
}
@ -32,3 +43,43 @@ func IsProcessTerminated(err error) bool {
func IsIllegalState(err error) bool {
return err == errIllegalState
}
// OnError spawns a goroutine that waits for an error. if a non-nil error is read from
// the channel then the handler func is invoked, otherwise (nil error or closed chan)
// the handler is skipped. if a nil handler is specified then it's not invoked.
// the signal chan that's returned closes once the error process logic (and handler,
// if any) has completed.
func OnError(ch <-chan error, f func(error), abort <-chan struct{}) <-chan struct{} {
return runtime.After(func() {
if ch == nil {
return
}
select {
case err, ok := <-ch:
if ok && err != nil && f != nil {
f(err)
}
case <-abort:
if f != nil {
f(errProcessTerminated)
}
}
})
}
// ErrorChanf is a convenience func that returns a chan that yields an error
// generated from the given msg format and args.
func ErrorChanf(msg string, args ...interface{}) <-chan error {
return ErrorChan(fmt.Errorf(msg, args...))
}
// ErrorChan is a convenience func that returns a chan that yields the given error.
// If err is nil then a closed chan is returned.
func ErrorChan(err error) <-chan error {
if err == nil {
return closedErrChan
}
ch := make(chan error, 1)
ch <- err
return ch
}

View File

@ -0,0 +1,91 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"fmt"
"sync"
"time"
)
type errorOnce struct {
once sync.Once
err chan error
abort <-chan struct{}
}
// NewErrorOnce creates an ErrorOnce that aborts blocking func calls once
// the given abort chan has closed.
func NewErrorOnce(abort <-chan struct{}) ErrorOnce {
return &errorOnce{
err: make(chan error, 1),
abort: abort,
}
}
func (b *errorOnce) Err() <-chan error {
return b.err
}
func (b *errorOnce) Reportf(msg string, args ...interface{}) {
b.Report(fmt.Errorf(msg, args...))
}
func (b *errorOnce) Report(err error) {
b.once.Do(func() {
select {
case b.err <- err:
default:
}
})
}
func (b *errorOnce) Send(errIn <-chan error) ErrorOnce {
go b.forward(errIn)
return b
}
func (b *errorOnce) forward(errIn <-chan error) {
if errIn == nil {
b.Report(nil)
return
}
select {
case err, _ := <-errIn:
b.Report(err)
case <-b.abort:
// double-check that errIn was blocked: don't falsely return
// errProcessTerminated if errIn was really ready
select {
case err, _ := <-errIn:
b.Report(err)
default:
b.Report(errProcessTerminated)
}
}
}
func (b *errorOnce) WaitFor(d time.Duration) (error, bool) {
t := time.NewTimer(d)
select {
case err, _ := <-b.err:
t.Stop()
return err, true
case <-t.C:
return nil, false
}
}

View File

@ -17,351 +17,174 @@ limitations under the License.
package proc
import (
"fmt"
"sync"
"sync/atomic"
"time"
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
)
const (
// if the action processor crashes (if some Action panics) then we
// wait this long before spinning up the action processor again.
defaultActionHandlerCrashDelay = 100 * time.Millisecond
// how many actions we can store in the backlog
defaultActionQueueDepth = 1024
// wait this long before re-attempting to enqueue an action into
// the scheduling backlog
defaultMaxRescheduleWait = 5 * time.Millisecond
)
type procImpl struct {
Config
backlog chan Action // action queue
terminate chan struct{} // signaled via close()
wg sync.WaitGroup // End() terminates when the wait is over
done runtime.Signal
state *stateType
pid uint32
writeLock sync.Mutex // avoid data race between write and close of backlog
changed *sync.Cond // wait/signal for backlog changes
engine DoerFunc // isolated this for easier unit testing later on
running chan struct{} // closes once event loop processing starts
dead chan struct{} // closes upon completion of process termination
}
type Config struct {
// cooldown period in between deferred action crashes
actionHandlerCrashDelay time.Duration
// determines the size of the deferred action backlog
actionQueueDepth uint32
// wait this long before re-attempting to enqueue an action into
// the scheduling backlog
maxRescheduleWait time.Duration
}
var (
// default process configuration, used in the creation of all new processes
defaultConfig = Config{
actionHandlerCrashDelay: defaultActionHandlerCrashDelay,
actionQueueDepth: defaultActionQueueDepth,
maxRescheduleWait: defaultMaxRescheduleWait,
actionQueueDepth: defaultActionQueueDepth,
}
pid uint32 // global pid counter
closedErrChan <-chan error // singleton chan that's always closed
)
func init() {
ch := make(chan error)
close(ch)
closedErrChan = ch
type scheduledAction struct {
action Action
errCh chan error
}
type processState struct {
actions chan *scheduledAction // scheduled action backlog
running chan struct{} // closes upon start of action backlog processing
terminated chan struct{} // closes upon termination of run()
doer Doer // delegate that schedules actions
guardDoer sync.RWMutex // protect doer
end chan struct{} // closes upon invocation of End()
closeEnd sync.Once // guard: only close end chan once
nextAction func() (*scheduledAction, bool) // return false if actions queue is closed
}
func New() Process {
return newConfigured(defaultConfig)
}
func newConfigured(config Config) Process {
state := stateNew
pi := &procImpl{
Config: config,
backlog: make(chan Action, config.actionQueueDepth),
terminate: make(chan struct{}),
state: &state,
pid: atomic.AddUint32(&pid, 1),
running: make(chan struct{}),
dead: make(chan struct{}),
func newConfigured(c Config) Process {
ps := &processState{
actions: make(chan *scheduledAction, c.actionQueueDepth),
running: make(chan struct{}),
terminated: make(chan struct{}),
end: make(chan struct{}),
}
pi.engine = DoerFunc(pi.doLater)
pi.changed = sync.NewCond(&pi.writeLock)
pi.wg.Add(1) // symmetrical to wg.Done() in End()
pi.done = pi.begin()
return pi
ps.doer = DoerFunc(ps.defaultDoer)
go ps.run()
return ps
}
// returns a chan that closes upon termination of the action processing loop
func (self *procImpl) Done() <-chan struct{} {
return self.done
}
type stateFn func(*processState, *scheduledAction) stateFn
func (self *procImpl) Running() <-chan struct{} {
return self.running
}
func (self *procImpl) begin() runtime.Signal {
if !self.state.transition(stateNew, stateRunning) {
panic(fmt.Errorf("failed to transition from New to Idle state"))
func stateRun(ps *processState, a *scheduledAction) stateFn {
// it's only possible to ever receive this once because we transition
// to state "shutdown", permanently
if a == nil {
ps.shutdown()
return stateShutdown
}
defer log.V(2).Infof("started process %d", self.pid)
var entered runtime.Latch
// execute actions on the backlog chan
return runtime.After(func() {
runtime.Until(func() {
if entered.Acquire() {
close(self.running)
self.wg.Add(1)
}
for action := range self.backlog {
select {
case <-self.terminate:
return
default:
// signal to indicate there's room in the backlog now
self.changed.Broadcast()
// rely on Until to handle action panics
action()
}
}
}, self.actionHandlerCrashDelay, self.terminate)
}).Then(func() {
log.V(2).Infof("finished processing action backlog for process %d", self.pid)
if !entered.Acquire() {
self.wg.Done()
}
close(a.errCh) // signal that action was scheduled
func() {
// we don't trust clients of this package
defer util.HandleCrash()
a.action()
}()
return stateRun
}
func (ps *processState) shutdown() {
// all future attemps to schedule actions must fail immediately
ps.guardDoer.Lock()
ps.doer = DoerFunc(func(_ Action) <-chan error {
return ErrorChan(errProcessTerminated)
})
}
ps.guardDoer.Unlock()
// execute some action in the context of the current process. Actions
// executed via this func are to be executed in a concurrency-safe manner:
// no two actions should execute at the same time. invocations of this func
// should not block for very long, unless the action backlog is full or the
// process is terminating.
// returns errProcessTerminated if the process already ended.
func (self *procImpl) doLater(deferredAction Action) (err <-chan error) {
a := Action(func() {
self.wg.Add(1)
defer self.wg.Done()
deferredAction()
})
// no more actions may be scheduled
close(ps.actions)
scheduled := false
self.writeLock.Lock()
defer self.writeLock.Unlock()
var timer *time.Timer
for err == nil && !scheduled {
switch s := self.state.get(); s {
case stateRunning:
select {
case self.backlog <- a:
scheduled = true
default:
if timer == nil {
timer = time.AfterFunc(self.maxRescheduleWait, self.changed.Broadcast)
} else {
timer.Reset(self.maxRescheduleWait)
}
self.changed.Wait()
timer.Stop()
}
case stateTerminal:
err = ErrorChan(errProcessTerminated)
default:
err = ErrorChan(errIllegalState)
}
}
return
}
// implementation of Doer interface, schedules some action to be executed via
// the current execution engine
func (self *procImpl) Do(a Action) <-chan error {
return self.engine(a)
}
// spawn a goroutine that waits for an error. if a non-nil error is read from the
// channel then the handler func is invoked, otherwise (nil error or closed chan)
// the handler is skipped. if a nil handler is specified then it's not invoked.
// the signal chan that's returned closes once the error process logic (and handler,
// if any) has completed.
func OnError(ch <-chan error, f func(error), abort <-chan struct{}) <-chan struct{} {
return runtime.After(func() {
if ch == nil {
return
}
select {
case err, ok := <-ch:
if ok && err != nil && f != nil {
f(err)
}
case <-abort:
if f != nil {
f(errProcessTerminated)
}
}
})
}
func (self *procImpl) OnError(ch <-chan error, f func(error)) <-chan struct{} {
return OnError(ch, f, self.Done())
}
func (self *procImpl) flush() {
log.V(2).Infof("flushing action backlog for process %d", self.pid)
i := 0
//TODO: replace with `for range self.backlog` once Go 1.3 support is dropped
for {
_, open := <-self.backlog
if !open {
break
}
i++
}
log.V(2).Infof("flushed %d backlog actions for process %d", i, self.pid)
}
func (self *procImpl) End() <-chan struct{} {
if self.state.transitionTo(stateTerminal, stateTerminal) {
go func() {
defer close(self.dead)
self.writeLock.Lock()
defer self.writeLock.Unlock()
log.V(2).Infof("terminating process %d", self.pid)
close(self.backlog)
close(self.terminate)
self.wg.Done()
self.changed.Broadcast()
log.V(2).Infof("waiting for deferred actions to complete")
// wait for all pending actions to complete, then flush the backlog
self.wg.Wait()
self.flush()
}()
}
return self.dead
}
type errorOnce struct {
once sync.Once
err chan error
abort <-chan struct{}
}
func NewErrorOnce(abort <-chan struct{}) ErrorOnce {
return &errorOnce{
err: make(chan error, 1),
abort: abort,
}
}
func (b *errorOnce) Err() <-chan error {
return b.err
}
func (b *errorOnce) Reportf(msg string, args ...interface{}) {
b.Report(fmt.Errorf(msg, args...))
}
func (b *errorOnce) Report(err error) {
b.once.Do(func() {
select {
case b.err <- err:
default:
}
})
}
func (b *errorOnce) Send(errIn <-chan error) ErrorOnce {
go b.forward(errIn)
return b
}
func (b *errorOnce) forward(errIn <-chan error) {
if errIn == nil {
b.Report(nil)
// no need to check ps.end anymore
ps.nextAction = func() (a *scheduledAction, ok bool) {
a, ok = <-ps.actions
return
}
select {
case err, _ := <-errIn:
b.Report(err)
case <-b.abort:
b.Report(errProcessTerminated)
}
// stateShutdown doesn't run any actions because the process is shutting down.
// instead it clears the action backlog. newly scheduled actions are rejected.
func stateShutdown(ps *processState, a *scheduledAction) stateFn {
if a != nil {
a.errCh <- errProcessTerminated
}
return stateShutdown
}
type processAdapter struct {
Process
delegate Doer
}
func (ps *processState) run() {
defer close(ps.terminated)
close(ps.running)
func (p *processAdapter) Do(a Action) <-chan error {
errCh := NewErrorOnce(p.Done())
go func() {
errOuter := p.Process.Do(func() {
errInner := p.delegate.Do(a)
errCh.forward(errInner)
})
// if the outer err is !nil then either the parent Process failed to schedule the
// the action, or else it backgrounded the scheduling task.
if errOuter != nil {
errCh.forward(errOuter)
// main state machine loop: process actions as they come,
// updating the state func along the way.
f := stateRun
ps.nextAction = func() (a *scheduledAction, ok bool) {
// if we successfully read from ps.end, we don't know if the
// actions queue is closed. assume it's not: the state machine
// shouldn't terminate yet.
// also, give preference to ps.end: we want to avoid processing
// actions if both ps.actions and ps.end are ready
select {
case <-ps.end:
ok = true
default:
select {
case <-ps.end:
ok = true
case a, ok = <-ps.actions:
}
}
}()
return errCh.Err()
}
// DoWith returns a process that, within its execution context, delegates to the specified Doer.
// Expect a panic if either the given Process or Doer are nil.
func DoWith(other Process, d Doer) Process {
if other == nil {
panic(fmt.Sprintf("cannot DoWith a nil process"))
return
}
if d == nil {
panic(fmt.Sprintf("cannot DoWith a nil doer"))
}
return &processAdapter{
Process: other,
delegate: d,
for {
a, ok := ps.nextAction()
if !ok {
return
}
g := f(ps, a)
if g == nil {
panic("state machine stateFn is not allowed to be nil")
}
f = g
}
}
func ErrorChanf(msg string, args ...interface{}) <-chan error {
return ErrorChan(fmt.Errorf(msg, args...))
func (ps *processState) Running() <-chan struct{} {
return ps.running
}
func ErrorChan(err error) <-chan error {
if err == nil {
return closedErrChan
}
func (ps *processState) Done() <-chan struct{} {
return ps.terminated
}
func (ps *processState) End() <-chan struct{} {
ps.closeEnd.Do(func() {
close(ps.end)
})
return ps.terminated
}
func (ps *processState) Do(a Action) <-chan error {
ps.guardDoer.RLock()
defer ps.guardDoer.RUnlock()
return ps.doer.Do(a)
}
func (ps *processState) defaultDoer(a Action) <-chan error {
ch := make(chan error, 1)
ch <- err
ps.actions <- &scheduledAction{
action: a,
errCh: ch,
}
return ch
}
// invoke the f on action a. returns an illegal state error if f is nil.
func (f DoerFunc) Do(a Action) <-chan error {
if f != nil {
return f(a)
}
return ErrorChan(errIllegalState)
func (ps *processState) OnError(ch <-chan error, f func(error)) <-chan struct{} {
return OnError(ch, f, ps.terminated)
}

View File

@ -26,30 +26,10 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
)
// logs a testing.Fatalf if the elapsed time d passes before signal chan done is closed
func fatalAfter(t *testing.T, done <-chan struct{}, d time.Duration, msg string, args ...interface{}) {
select {
case <-done:
case <-time.After(d):
t.Fatalf(msg, args...)
}
}
func errorAfter(errOnce ErrorOnce, done <-chan struct{}, d time.Duration, msg string, args ...interface{}) {
select {
case <-done:
case <-time.After(d):
//errOnce.Reportf(msg, args...)
panic(fmt.Sprintf(msg, args...))
}
}
// logs a testing.Fatalf if the signal chan closes before the elapsed time d passes
func fatalOn(t *testing.T, done <-chan struct{}, d time.Duration, msg string, args ...interface{}) {
select {
case <-done:
t.Fatalf(msg, args...)
case <-time.After(d):
func failOnError(t *testing.T, errOnce ErrorOnce) {
err, _ := <-errOnce.Err()
if err != nil {
t.Errorf("unexpected action scheduling error: %v", err)
}
}
@ -61,8 +41,8 @@ func TestProc_manyEndings(t *testing.T) {
for i := 0; i < COUNT; i++ {
runtime.On(p.End(), wg.Done)
}
fatalAfter(t, runtime.After(wg.Wait), 5*time.Second, "timed out waiting for loose End()s")
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
wg.Wait()
<-p.Done()
}
func TestProc_singleAction(t *testing.T) {
@ -70,61 +50,84 @@ func TestProc_singleAction(t *testing.T) {
scheduled := make(chan struct{})
called := make(chan struct{})
errOnce := NewErrorOnce(p.Done())
go func() {
log.Infof("do'ing deferred action")
defer close(scheduled)
err := p.Do(func() {
errCh := p.Do(func() {
defer close(called)
log.Infof("deferred action invoked")
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
errOnce.Send(errCh)
}()
fatalAfter(t, scheduled, 5*time.Second, "timed out waiting for deferred action to be scheduled")
fatalAfter(t, called, 5*time.Second, "timed out waiting for deferred action to be invoked")
failOnError(t, errOnce)
<-scheduled
<-called
p.End()
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}
func TestProc_singleActionEnd(t *testing.T) {
func TestProc_singleActionThatPanics(t *testing.T) {
p := New()
scheduled := make(chan struct{})
called := make(chan struct{})
errOnce := NewErrorOnce(p.Done())
go func() {
log.Infof("do'ing deferred action")
defer close(scheduled)
err := p.Do(func() {
errCh := p.Do(func() {
defer close(called)
panic("panicing here")
})
errOnce.Send(errCh)
}()
failOnError(t, errOnce)
<-scheduled
<-called
p.End()
<-p.Done()
}
func TestProc_singleActionEndsProcess(t *testing.T) {
p := New()
called := make(chan struct{})
errOnce := NewErrorOnce(p.Done())
go func() {
log.Infof("do'ing deferred action")
errCh := p.Do(func() {
defer close(called)
log.Infof("deferred action invoked")
p.End()
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
errOnce.Send(errCh)
}()
fatalAfter(t, scheduled, 5*time.Second, "timed out waiting for deferred action to be scheduled")
fatalAfter(t, called, 5*time.Second, "timed out waiting for deferred action to be invoked")
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-called
failOnError(t, errOnce)
<-p.Done()
}
func TestProc_multiAction(t *testing.T) {
p := New()
const COUNT = 10
var called sync.WaitGroup
called.Add(COUNT)
called.Add(COUNT * 2)
// test FIFO property
next := 0
for i := 0; i < COUNT; i++ {
log.Infof("do'ing deferred action %d", i)
idx := i
err := p.Do(func() {
errOnce := NewErrorOnce(p.Done())
errCh := p.Do(func() {
defer called.Done()
log.Infof("deferred action invoked")
if next != idx {
@ -132,28 +135,28 @@ func TestProc_multiAction(t *testing.T) {
}
next++
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
errOnce.Send(errCh)
go func() {
defer called.Done()
failOnError(t, errOnce)
}()
}
fatalAfter(t, runtime.After(called.Wait), 2*time.Second, "timed out waiting for deferred actions to be invoked")
called.Wait()
p.End()
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}
func TestProc_goodLifecycle(t *testing.T) {
p := New()
p.End()
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}
func TestProc_doWithDeadProc(t *testing.T) {
p := New()
p.End()
time.Sleep(100 * time.Millisecond)
<-p.Done()
errUnexpected := fmt.Errorf("unexpected execution of delegated action")
decorated := DoWith(p, DoerFunc(func(_ Action) <-chan error {
@ -161,7 +164,7 @@ func TestProc_doWithDeadProc(t *testing.T) {
}))
decorated.Do(func() {})
fatalAfter(t, decorated.Done(), 5*time.Second, "timed out waiting for process death")
<-decorated.Done()
}
func TestProc_doWith(t *testing.T) {
@ -185,13 +188,12 @@ func TestProc_doWith(t *testing.T) {
t.Fatalf("expected !nil error chan")
}
fatalAfter(t, executed, 5*time.Second, "timed out waiting deferred execution")
fatalAfter(t, decorated.OnError(err, func(e error) {
t.Fatalf("unexpected error: %v", err)
}), 1*time.Second, "timed out waiting for doer result")
<-executed
<-decorated.OnError(err, func(e error) {
t.Errorf("unexpected error: %v", err)
})
decorated.End()
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}
func TestProc_doWithNestedTwice(t *testing.T) {
@ -220,13 +222,14 @@ func TestProc_doWithNestedTwice(t *testing.T) {
t.Fatalf("expected !nil error chan")
}
fatalAfter(t, executed, 5*time.Second, "timed out waiting deferred execution")
fatalAfter(t, decorated2.OnError(err, func(e error) {
t.Fatalf("unexpected error: %v", err)
}), 1*time.Second, "timed out waiting for doer result")
<-executed
<-decorated2.OnError(err, func(e error) {
t.Errorf("unexpected error: %v", err)
})
decorated2.End()
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}
func TestProc_doWithNestedErrorPropagation(t *testing.T) {
@ -248,37 +251,40 @@ func TestProc_doWithNestedErrorPropagation(t *testing.T) {
}))
executed := make(chan struct{})
err := decorated2.Do(func() {
errCh := decorated2.Do(func() {
defer close(executed)
if !delegated {
t.Fatalf("expected delegated execution")
}
errOnce.Report(expectedErr)
})
if err == nil {
if errCh == nil {
t.Fatalf("expected !nil error chan")
}
errOnce.Send(err)
errOnce.Send(errCh)
foundError := false
fatalAfter(t, executed, 1*time.Second, "timed out waiting deferred execution")
fatalAfter(t, decorated2.OnError(errOnce.Err(), func(e error) {
<-executed
<-decorated2.OnError(errOnce.Err(), func(e error) {
if e != expectedErr {
t.Fatalf("unexpected error: %v", err)
t.Errorf("unexpected error: %v", e)
} else {
foundError = true
}
}), 1*time.Second, "timed out waiting for doer result")
})
// this has been flaky in the past. recent changes to error handling in
// processAdapter.Do should have fixed it.
if !foundError {
t.Fatalf("expected a propagated error")
}
decorated2.End()
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}
func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce, timeout time.Duration) {
func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) {
t.Logf("starting test case " + name + " at " + time.Now().String())
defer func() {
t.Logf("runDelegationTest finished at " + time.Now().String())
@ -325,7 +331,7 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce,
// from errCh after this point
errOnce.Send(errCh)
errorAfter(errOnce, executed, timeout, "timed out waiting deferred execution of "+name)
<-executed
t.Logf("runDelegationTest received executed signal at " + time.Now().String())
}
@ -333,56 +339,41 @@ func TestProc_doWithNestedX(t *testing.T) {
t.Logf("starting test case at " + time.Now().String())
p := New()
errOnce := NewErrorOnce(p.Done())
timeout := 5 * time.Second
runDelegationTest(t, p, "nested", errOnce, timeout)
runDelegationTest(t, p, "nested", errOnce)
<-p.End()
select {
case err := <-errOnce.Err():
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
case <-time.After(2 * timeout):
t.Fatalf("timed out waiting for doer result")
err, _ := <-errOnce.Err()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}
// TODO(jdef): find a way to test this without killing CI builds.
// intended to be run with -race
func TestProc_doWithNestedXConcurrent(t *testing.T) {
t.Skip("disabled for causing CI timeouts.")
config := defaultConfig
config.actionQueueDepth = 0
config.actionQueueDepth = 4000
p := newConfigured(config)
var wg sync.WaitGroup
const CONC = 20
wg.Add(CONC)
// this test spins up TONS of goroutines that can take a little while to execute on a busy
// CI server. drawing the line at 10s because I've never seen it take anywhere near that long.
timeout := 10 * time.Second
for i := 0; i < CONC; i++ {
i := i
errOnce := NewErrorOnce(p.Done())
runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce, timeout) }).Then(wg.Done)
runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce) }).Then(wg.Done)
go func() {
select {
case err := <-errOnce.Err():
if err != nil {
t.Fatalf("delegate %d: unexpected error: %v", i, err)
}
case <-time.After(2 * timeout):
t.Fatalf("delegate %d: timed out waiting for doer result", i)
err, _ := <-errOnce.Err()
if err != nil {
t.Fatalf("delegate %d: unexpected error: %v", i, err)
}
}()
}
ch := runtime.After(wg.Wait)
fatalAfter(t, ch, 2*timeout, "timed out waiting for concurrent delegates")
wg.Wait()
<-p.End()
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}
func TestProcWithExceededActionQueueDepth(t *testing.T) {
@ -391,16 +382,13 @@ func TestProcWithExceededActionQueueDepth(t *testing.T) {
p := newConfigured(config)
errOnce := NewErrorOnce(p.Done())
timeout := 5 * time.Second
runDelegationTest(t, p, "nested", errOnce, timeout)
runDelegationTest(t, p, "nested", errOnce)
<-p.End()
select {
case err := <-errOnce.Err():
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
case <-time.After(2 * timeout):
t.Fatalf("timed out waiting for doer result")
err, _ := <-errOnce.Err()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
<-p.Done()
}

View File

@ -16,7 +16,11 @@ limitations under the License.
package proc
// something that executes in the context of a process
import (
"time"
)
// Action is something that executes in the context of a process
type Action func()
type Context interface {
@ -36,9 +40,17 @@ type Doer interface {
Do(Action) <-chan error
}
// adapter func for Doer interface
// DoerFunc is an adapter func for Doer interface
type DoerFunc func(Action) <-chan error
// invoke the f on action a. returns an illegal state error if f is nil.
func (f DoerFunc) Do(a Action) <-chan error {
if f != nil {
return f(a)
}
return ErrorChan(errIllegalState)
}
type Process interface {
Context
Doer
@ -71,4 +83,7 @@ type ErrorOnce interface {
// Send is non-blocking; it spins up a goroutine that reports an error (if any) that occurs on the error chan.
Send(<-chan error) ErrorOnce
// WaitFor returns true if an error is received within the specified duration, otherwise false
WaitFor(time.Duration) (error, bool)
}