Merge pull request #9775 from mesosphere/fix-flakey-proc-test

Fix flakey nested concurrent unit test in contrib/mesos/pkg/proc
This commit is contained in:
David Oppenheimer 2015-06-13 14:07:02 -07:00
commit a01090933c
3 changed files with 83 additions and 62 deletions

View File

@ -33,6 +33,10 @@ const (
// how many actions we can store in the backlog // how many actions we can store in the backlog
defaultActionQueueDepth = 1024 defaultActionQueueDepth = 1024
// wait this long before re-attempting to enqueue an action into
// the scheduling backlog
defaultMaxRescheduleWait = 5 * time.Millisecond
) )
type procImpl struct { type procImpl struct {
@ -56,15 +60,21 @@ type Config struct {
// determines the size of the deferred action backlog // determines the size of the deferred action backlog
actionQueueDepth uint32 actionQueueDepth uint32
// wait this long before re-attempting to enqueue an action into
// the scheduling backlog
maxRescheduleWait time.Duration
} }
var ( var (
// default process configuration, used in the creation of all new processes
defaultConfig = Config{ defaultConfig = Config{
actionHandlerCrashDelay: defaultActionHandlerCrashDelay, actionHandlerCrashDelay: defaultActionHandlerCrashDelay,
actionQueueDepth: defaultActionQueueDepth, actionQueueDepth: defaultActionQueueDepth,
maxRescheduleWait: defaultMaxRescheduleWait,
} }
pid uint32 pid uint32 // global pid counter
closedErrChan <-chan error closedErrChan <-chan error // singleton chan that's always closed
) )
func init() { func init() {
@ -110,6 +120,7 @@ func (self *procImpl) begin() runtime.Signal {
} }
defer log.V(2).Infof("started process %d", self.pid) defer log.V(2).Infof("started process %d", self.pid)
var entered runtime.Latch var entered runtime.Latch
// execute actions on the backlog chan // execute actions on the backlog chan
return runtime.After(func() { return runtime.After(func() {
runtime.Until(func() { runtime.Until(func() {
@ -154,6 +165,7 @@ func (self *procImpl) doLater(deferredAction Action) (err <-chan error) {
self.writeLock.Lock() self.writeLock.Lock()
defer self.writeLock.Unlock() defer self.writeLock.Unlock()
var timer *time.Timer
for err == nil && !scheduled { for err == nil && !scheduled {
switch s := self.state.get(); s { switch s := self.state.get(); s {
case stateRunning: case stateRunning:
@ -161,7 +173,13 @@ func (self *procImpl) doLater(deferredAction Action) (err <-chan error) {
case self.backlog <- a: case self.backlog <- a:
scheduled = true scheduled = true
default: default:
if timer == nil {
timer = time.AfterFunc(self.maxRescheduleWait, self.changed.Broadcast)
} else {
timer.Reset(self.maxRescheduleWait)
}
self.changed.Wait() self.changed.Wait()
timer.Stop()
} }
case stateTerminal: case stateTerminal:
err = ErrorChan(errProcessTerminated) err = ErrorChan(errProcessTerminated)
@ -292,21 +310,18 @@ func (b *errorOnce) forward(errIn <-chan error) {
} }
type processAdapter struct { type processAdapter struct {
parent Process Process
delegate Doer delegate Doer
} }
func (p *processAdapter) Do(a Action) <-chan error { func (p *processAdapter) Do(a Action) <-chan error {
if p == nil || p.parent == nil || p.delegate == nil {
return ErrorChan(errIllegalState)
}
errCh := NewErrorOnce(p.Done()) errCh := NewErrorOnce(p.Done())
go func() { go func() {
errOuter := p.parent.Do(func() { errOuter := p.Process.Do(func() {
errInner := p.delegate.Do(a) errInner := p.delegate.Do(a)
errCh.forward(errInner) errCh.forward(errInner)
}) })
// if the outer err is !nil then either the parent failed to schedule the // if the outer err is !nil then either the parent Process failed to schedule the
// the action, or else it backgrounded the scheduling task. // the action, or else it backgrounded the scheduling task.
if errOuter != nil { if errOuter != nil {
errCh.forward(errOuter) errCh.forward(errOuter)
@ -315,42 +330,17 @@ func (p *processAdapter) Do(a Action) <-chan error {
return errCh.Err() return errCh.Err()
} }
func (p *processAdapter) End() <-chan struct{} { // DoWith returns a process that, within its execution context, delegates to the specified Doer.
if p != nil && p.parent != nil { // Expect a panic if either the given Process or Doer are nil.
return p.parent.End()
}
return nil
}
func (p *processAdapter) Done() <-chan struct{} {
if p != nil && p.parent != nil {
return p.parent.Done()
}
return nil
}
func (p *processAdapter) Running() <-chan struct{} {
if p != nil && p.parent != nil {
return p.parent.Running()
}
return nil
}
func (p *processAdapter) OnError(ch <-chan error, f func(error)) <-chan struct{} {
if p != nil && p.parent != nil {
return p.parent.OnError(ch, f)
}
return nil
}
// returns a process that, within its execution context, delegates to the specified Doer.
// if the given Doer instance is nil, a valid Process is still returned though calls to its
// Do() implementation will always return errIllegalState.
// if the given Process instance is nil then in addition to the behavior in the prior sentence,
// calls to End() and Done() are effectively noops.
func DoWith(other Process, d Doer) Process { func DoWith(other Process, d Doer) Process {
if other == nil {
panic(fmt.Sprintf("cannot DoWith a nil process"))
}
if d == nil {
panic(fmt.Sprintf("cannot DoWith a nil doer"))
}
return &processAdapter{ return &processAdapter{
parent: other, Process: other,
delegate: d, delegate: d,
} }
} }

View File

@ -39,7 +39,8 @@ func errorAfter(errOnce ErrorOnce, done <-chan struct{}, d time.Duration, msg st
select { select {
case <-done: case <-done:
case <-time.After(d): case <-time.After(d):
errOnce.Reportf(msg, args...) //errOnce.Reportf(msg, args...)
panic(fmt.Sprintf(msg, args...))
} }
} }
@ -277,7 +278,8 @@ func TestProc_doWithNestedErrorPropagation(t *testing.T) {
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
} }
func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) { func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce, timeout time.Duration) {
t.Logf("starting test case " + name + " at " + time.Now().String())
defer func() { defer func() {
t.Logf("runDelegationTest finished at " + time.Now().String()) t.Logf("runDelegationTest finished at " + time.Now().String())
}() }()
@ -293,11 +295,11 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce)
x := x x := x
nextp := DoWith(decorated, DoerFunc(func(a Action) <-chan error { nextp := DoWith(decorated, DoerFunc(func(a Action) <-chan error {
if x == 1 { if x == 1 {
t.Logf("delegate chain invoked for " + name) t.Logf("delegate chain invoked for " + name + " at " + time.Now().String())
} }
y++ y++
if y != x { if y != x {
return ErrorChanf("out of order delegated execution") return ErrorChanf("out of order delegated execution for " + name)
} }
defer wg.Done() defer wg.Done()
a() a()
@ -310,7 +312,7 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce)
errCh := decorated.Do(func() { errCh := decorated.Do(func() {
defer close(executed) defer close(executed)
if y != DEPTH { if y != DEPTH {
errOnce.Reportf("expected delegated execution") errOnce.Reportf("expected delegated execution for " + name)
} }
t.Logf("executing deferred action: " + name + " at " + time.Now().String()) t.Logf("executing deferred action: " + name + " at " + time.Now().String())
errOnce.Send(nil) // we completed without error, let the listener know errOnce.Send(nil) // we completed without error, let the listener know
@ -323,7 +325,7 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce)
// from errCh after this point // from errCh after this point
errOnce.Send(errCh) errOnce.Send(errCh)
errorAfter(errOnce, executed, 5*time.Second, "timed out waiting deferred execution") errorAfter(errOnce, executed, timeout, "timed out waiting deferred execution of "+name)
t.Logf("runDelegationTest received executed signal at " + time.Now().String()) t.Logf("runDelegationTest received executed signal at " + time.Now().String())
} }
@ -331,14 +333,15 @@ func TestProc_doWithNestedX(t *testing.T) {
t.Logf("starting test case at " + time.Now().String()) t.Logf("starting test case at " + time.Now().String())
p := New() p := New()
errOnce := NewErrorOnce(p.Done()) errOnce := NewErrorOnce(p.Done())
runDelegationTest(t, p, "nested", errOnce) timeout := 5 * time.Second
runDelegationTest(t, p, "nested", errOnce, timeout)
<-p.End() <-p.End()
select { select {
case err := <-errOnce.Err(): case err := <-errOnce.Err():
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
case <-time.After(5 * time.Second): case <-time.After(2 * timeout):
t.Fatalf("timed out waiting for doer result") t.Fatalf("timed out waiting for doer result")
} }
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
@ -346,28 +349,53 @@ func TestProc_doWithNestedX(t *testing.T) {
// intended to be run with -race // intended to be run with -race
func TestProc_doWithNestedXConcurrent(t *testing.T) { func TestProc_doWithNestedXConcurrent(t *testing.T) {
p := New() config := defaultConfig
errOnce := NewErrorOnce(p.Done()) config.actionQueueDepth = 0
p := newConfigured(config)
var wg sync.WaitGroup var wg sync.WaitGroup
const CONC = 20 const CONC = 20
wg.Add(CONC) wg.Add(CONC)
timeout := 3 * time.Second
for i := 0; i < CONC; i++ { for i := 0; i < CONC; i++ {
i := i i := i
runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce) }).Then(wg.Done) errOnce := NewErrorOnce(p.Done())
runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce, timeout) }).Then(wg.Done)
go func() {
select {
case err := <-errOnce.Err():
if err != nil {
t.Fatalf("delegate %d: unexpected error: %v", i, err)
}
case <-time.After(2 * timeout):
t.Fatalf("delegate %d: timed out waiting for doer result", i)
}
}()
} }
ch := runtime.After(wg.Wait) ch := runtime.After(wg.Wait)
fatalAfter(t, ch, 10*time.Second, "timed out waiting for concurrent delegates") fatalAfter(t, ch, 2*timeout, "timed out waiting for concurrent delegates")
<-p.End() <-p.End()
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
}
func TestProcWithExceededActionQueueDepth(t *testing.T) {
config := defaultConfig
config.actionQueueDepth = 0
p := newConfigured(config)
errOnce := NewErrorOnce(p.Done())
timeout := 5 * time.Second
runDelegationTest(t, p, "nested", errOnce, timeout)
<-p.End()
select { select {
case err := <-errOnce.Err(): case err := <-errOnce.Err():
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
case <-time.After(5 * time.Second): case <-time.After(2 * timeout):
t.Fatalf("timed out waiting for doer result") t.Fatalf("timed out waiting for doer result")
} }
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
} }

View File

@ -52,20 +52,23 @@ type Process interface {
Running() <-chan struct{} Running() <-chan struct{}
} }
// this is an error promise. if we ever start building out support for other promise types it will probably // ErrorOnce an error promise. If we ever start building out support for other promise types it will probably
// make sense to group them in some sort of "promises" package. // make sense to group them in some sort of "promises" package.
type ErrorOnce interface { type ErrorOnce interface {
// return a chan that only ever sends one error, either obtained via Report() or Forward() // Err returns a chan that only ever sends one error, either obtained via Report() or Forward().
Err() <-chan error Err() <-chan error
// reports the given error via Err(), but only if no other errors have been reported or forwarded // Report reports the given error via Err(), but only if no other errors have been reported or forwarded.
Report(error) Report(error)
// Report reports an error via Err(), but only if no other errors have been reported or forwarded, using
// fmt.Errorf to generate the error.
Reportf(string, ...interface{}) Reportf(string, ...interface{})
// waits for an error on the incoming chan, the result of which is later obtained via Err() (if no // forward waits for an error on the incoming chan, the result of which is later obtained via Err() (if no
// other errors have been reported or forwarded) // other errors have been reported or forwarded).
forward(<-chan error) forward(<-chan error)
// non-blocking, spins up a goroutine that reports an error (if any) that occurs on the error chan. // Send is non-blocking; it spins up a goroutine that reports an error (if any) that occurs on the error chan.
Send(<-chan error) ErrorOnce Send(<-chan error) ErrorOnce
} }