diff --git a/contrib/mesos/pkg/proc/proc.go b/contrib/mesos/pkg/proc/proc.go index 159e523961f..0108563adde 100644 --- a/contrib/mesos/pkg/proc/proc.go +++ b/contrib/mesos/pkg/proc/proc.go @@ -33,6 +33,10 @@ const ( // how many actions we can store in the backlog defaultActionQueueDepth = 1024 + + // wait this long before re-attempting to enqueue an action into + // the scheduling backlog + defaultMaxRescheduleWait = 5 * time.Millisecond ) type procImpl struct { @@ -56,15 +60,21 @@ type Config struct { // determines the size of the deferred action backlog actionQueueDepth uint32 + + // wait this long before re-attempting to enqueue an action into + // the scheduling backlog + maxRescheduleWait time.Duration } var ( + // default process configuration, used in the creation of all new processes defaultConfig = Config{ actionHandlerCrashDelay: defaultActionHandlerCrashDelay, actionQueueDepth: defaultActionQueueDepth, + maxRescheduleWait: defaultMaxRescheduleWait, } - pid uint32 - closedErrChan <-chan error + pid uint32 // global pid counter + closedErrChan <-chan error // singleton chan that's always closed ) func init() { @@ -110,6 +120,7 @@ func (self *procImpl) begin() runtime.Signal { } defer log.V(2).Infof("started process %d", self.pid) var entered runtime.Latch + // execute actions on the backlog chan return runtime.After(func() { runtime.Until(func() { @@ -154,6 +165,7 @@ func (self *procImpl) doLater(deferredAction Action) (err <-chan error) { self.writeLock.Lock() defer self.writeLock.Unlock() + var timer *time.Timer for err == nil && !scheduled { switch s := self.state.get(); s { case stateRunning: @@ -161,7 +173,13 @@ func (self *procImpl) doLater(deferredAction Action) (err <-chan error) { case self.backlog <- a: scheduled = true default: + if timer == nil { + timer = time.AfterFunc(self.maxRescheduleWait, self.changed.Broadcast) + } else { + timer.Reset(self.maxRescheduleWait) + } self.changed.Wait() + timer.Stop() } case stateTerminal: err = ErrorChan(errProcessTerminated) @@ -292,21 +310,18 @@ func (b *errorOnce) forward(errIn <-chan error) { } type processAdapter struct { - parent Process + Process delegate Doer } func (p *processAdapter) Do(a Action) <-chan error { - if p == nil || p.parent == nil || p.delegate == nil { - return ErrorChan(errIllegalState) - } errCh := NewErrorOnce(p.Done()) go func() { - errOuter := p.parent.Do(func() { + errOuter := p.Process.Do(func() { errInner := p.delegate.Do(a) errCh.forward(errInner) }) - // if the outer err is !nil then either the parent failed to schedule the + // if the outer err is !nil then either the parent Process failed to schedule the // the action, or else it backgrounded the scheduling task. if errOuter != nil { errCh.forward(errOuter) @@ -315,42 +330,17 @@ func (p *processAdapter) Do(a Action) <-chan error { return errCh.Err() } -func (p *processAdapter) End() <-chan struct{} { - if p != nil && p.parent != nil { - return p.parent.End() - } - return nil -} - -func (p *processAdapter) Done() <-chan struct{} { - if p != nil && p.parent != nil { - return p.parent.Done() - } - return nil -} - -func (p *processAdapter) Running() <-chan struct{} { - if p != nil && p.parent != nil { - return p.parent.Running() - } - return nil -} - -func (p *processAdapter) OnError(ch <-chan error, f func(error)) <-chan struct{} { - if p != nil && p.parent != nil { - return p.parent.OnError(ch, f) - } - return nil -} - -// returns a process that, within its execution context, delegates to the specified Doer. -// if the given Doer instance is nil, a valid Process is still returned though calls to its -// Do() implementation will always return errIllegalState. -// if the given Process instance is nil then in addition to the behavior in the prior sentence, -// calls to End() and Done() are effectively noops. +// DoWith returns a process that, within its execution context, delegates to the specified Doer. +// Expect a panic if either the given Process or Doer are nil. func DoWith(other Process, d Doer) Process { + if other == nil { + panic(fmt.Sprintf("cannot DoWith a nil process")) + } + if d == nil { + panic(fmt.Sprintf("cannot DoWith a nil doer")) + } return &processAdapter{ - parent: other, + Process: other, delegate: d, } } diff --git a/contrib/mesos/pkg/proc/proc_test.go b/contrib/mesos/pkg/proc/proc_test.go index 31c034465f6..c645c31cbaa 100644 --- a/contrib/mesos/pkg/proc/proc_test.go +++ b/contrib/mesos/pkg/proc/proc_test.go @@ -39,7 +39,8 @@ func errorAfter(errOnce ErrorOnce, done <-chan struct{}, d time.Duration, msg st select { case <-done: case <-time.After(d): - errOnce.Reportf(msg, args...) + //errOnce.Reportf(msg, args...) + panic(fmt.Sprintf(msg, args...)) } } @@ -277,7 +278,8 @@ func TestProc_doWithNestedErrorPropagation(t *testing.T) { fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") } -func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) { +func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce, timeout time.Duration) { + t.Logf("starting test case " + name + " at " + time.Now().String()) defer func() { t.Logf("runDelegationTest finished at " + time.Now().String()) }() @@ -293,11 +295,11 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) x := x nextp := DoWith(decorated, DoerFunc(func(a Action) <-chan error { if x == 1 { - t.Logf("delegate chain invoked for " + name) + t.Logf("delegate chain invoked for " + name + " at " + time.Now().String()) } y++ if y != x { - return ErrorChanf("out of order delegated execution") + return ErrorChanf("out of order delegated execution for " + name) } defer wg.Done() a() @@ -310,7 +312,7 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) errCh := decorated.Do(func() { defer close(executed) if y != DEPTH { - errOnce.Reportf("expected delegated execution") + errOnce.Reportf("expected delegated execution for " + name) } t.Logf("executing deferred action: " + name + " at " + time.Now().String()) errOnce.Send(nil) // we completed without error, let the listener know @@ -323,7 +325,7 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) // from errCh after this point errOnce.Send(errCh) - errorAfter(errOnce, executed, 5*time.Second, "timed out waiting deferred execution") + errorAfter(errOnce, executed, timeout, "timed out waiting deferred execution of "+name) t.Logf("runDelegationTest received executed signal at " + time.Now().String()) } @@ -331,14 +333,15 @@ func TestProc_doWithNestedX(t *testing.T) { t.Logf("starting test case at " + time.Now().String()) p := New() errOnce := NewErrorOnce(p.Done()) - runDelegationTest(t, p, "nested", errOnce) + timeout := 5 * time.Second + runDelegationTest(t, p, "nested", errOnce, timeout) <-p.End() select { case err := <-errOnce.Err(): if err != nil { t.Fatalf("unexpected error: %v", err) } - case <-time.After(5 * time.Second): + case <-time.After(2 * timeout): t.Fatalf("timed out waiting for doer result") } fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") @@ -346,28 +349,53 @@ func TestProc_doWithNestedX(t *testing.T) { // intended to be run with -race func TestProc_doWithNestedXConcurrent(t *testing.T) { - p := New() - errOnce := NewErrorOnce(p.Done()) + config := defaultConfig + config.actionQueueDepth = 0 + p := newConfigured(config) + var wg sync.WaitGroup const CONC = 20 wg.Add(CONC) + timeout := 3 * time.Second + for i := 0; i < CONC; i++ { i := i - runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce) }).Then(wg.Done) + errOnce := NewErrorOnce(p.Done()) + runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce, timeout) }).Then(wg.Done) + go func() { + select { + case err := <-errOnce.Err(): + if err != nil { + t.Fatalf("delegate %d: unexpected error: %v", i, err) + } + case <-time.After(2 * timeout): + t.Fatalf("delegate %d: timed out waiting for doer result", i) + } + }() } ch := runtime.After(wg.Wait) - fatalAfter(t, ch, 10*time.Second, "timed out waiting for concurrent delegates") + fatalAfter(t, ch, 2*timeout, "timed out waiting for concurrent delegates") <-p.End() + fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") +} +func TestProcWithExceededActionQueueDepth(t *testing.T) { + config := defaultConfig + config.actionQueueDepth = 0 + p := newConfigured(config) + + errOnce := NewErrorOnce(p.Done()) + timeout := 5 * time.Second + runDelegationTest(t, p, "nested", errOnce, timeout) + <-p.End() select { case err := <-errOnce.Err(): if err != nil { t.Fatalf("unexpected error: %v", err) } - case <-time.After(5 * time.Second): + case <-time.After(2 * timeout): t.Fatalf("timed out waiting for doer result") } - fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death") } diff --git a/contrib/mesos/pkg/proc/types.go b/contrib/mesos/pkg/proc/types.go index d2cae458b15..3ab769f2fae 100644 --- a/contrib/mesos/pkg/proc/types.go +++ b/contrib/mesos/pkg/proc/types.go @@ -52,20 +52,23 @@ type Process interface { Running() <-chan struct{} } -// this is an error promise. if we ever start building out support for other promise types it will probably +// ErrorOnce an error promise. If we ever start building out support for other promise types it will probably // make sense to group them in some sort of "promises" package. type ErrorOnce interface { - // return a chan that only ever sends one error, either obtained via Report() or Forward() + // Err returns a chan that only ever sends one error, either obtained via Report() or Forward(). Err() <-chan error - // reports the given error via Err(), but only if no other errors have been reported or forwarded + // Report reports the given error via Err(), but only if no other errors have been reported or forwarded. Report(error) + + // Report reports an error via Err(), but only if no other errors have been reported or forwarded, using + // fmt.Errorf to generate the error. Reportf(string, ...interface{}) - // waits for an error on the incoming chan, the result of which is later obtained via Err() (if no - // other errors have been reported or forwarded) + // forward waits for an error on the incoming chan, the result of which is later obtained via Err() (if no + // other errors have been reported or forwarded). forward(<-chan error) - // non-blocking, spins up a goroutine that reports an error (if any) that occurs on the error chan. + // Send is non-blocking; it spins up a goroutine that reports an error (if any) that occurs on the error chan. Send(<-chan error) ErrorOnce }