diff --git a/contrib/mesos/pkg/queue/historical.go b/contrib/mesos/pkg/queue/historical.go index a9021c14b4d..9ef42eaf80b 100644 --- a/contrib/mesos/pkg/queue/historical.go +++ b/contrib/mesos/pkg/queue/historical.go @@ -222,7 +222,7 @@ func (f *HistoricalFIFO) Poll(id string, t EventType) bool { func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} { cancel := make(chan struct{}) ch := make(chan interface{}, 1) - go func() { ch <- q.pop(cancel) }() + go func() { ch <- q.CancelablePop(cancel) }() select { case <-time.After(timeout): close(cancel) @@ -232,10 +232,10 @@ func (q *HistoricalFIFO) Await(timeout time.Duration) interface{} { } } func (f *HistoricalFIFO) Pop() interface{} { - return f.pop(nil) + return f.CancelablePop(nil) } -func (f *HistoricalFIFO) pop(cancel chan struct{}) interface{} { +func (f *HistoricalFIFO) CancelablePop(cancel <-chan struct{}) interface{} { popEvent := (Entry)(nil) defer func() { f.carrier(popEvent) @@ -383,7 +383,7 @@ func (f *HistoricalFIFO) merge(id string, obj UniqueCopyable) (notifications []E // NewHistorical returns a Store which can be used to queue up items to // process. If a non-nil Mux is provided, then modifications to the // the FIFO are delivered on a channel specific to this fifo. -func NewHistorical(ch chan<- Entry) FIFO { +func NewHistorical(ch chan<- Entry) *HistoricalFIFO { carrier := dead if ch != nil { carrier = func(msg Entry) {