diff --git a/federation/pkg/federation-controller/util/delaying_deliverer.go b/federation/pkg/federation-controller/util/delaying_deliverer.go index eea84cc1fb5..e63a620c038 100644 --- a/federation/pkg/federation-controller/util/delaying_deliverer.go +++ b/federation/pkg/federation-controller/util/delaying_deliverer.go @@ -81,8 +81,12 @@ type DelayingDeliverer struct { stopChannel chan struct{} } -func NewDelayingDeliverer(targetChannel chan *DelayingDelivererItem) *DelayingDeliverer { - d := &DelayingDeliverer{ +func NewDelayingDeliverer() *DelayingDeliverer { + return NewDelayingDelivererWithChannel(make(chan *DelayingDelivererItem, 100)) +} + +func NewDelayingDelivererWithChannel(targetChannel chan *DelayingDelivererItem) *DelayingDeliverer { + return &DelayingDeliverer{ targetChannel: targetChannel, heap: &delivererHeap{ keyPosition: make(map[string]int), @@ -91,8 +95,6 @@ func NewDelayingDeliverer(targetChannel chan *DelayingDelivererItem) *DelayingDe updateChannel: make(chan *DelayingDelivererItem, delayingDelivererUpdateChanCapacity), stopChannel: make(chan struct{}), } - go d.run() - return d } // Deliver all items due before or equal to timestamp. @@ -136,6 +138,12 @@ func (d *DelayingDeliverer) run() { } } +// Starts the DelayingDeliverer. +func (d *DelayingDeliverer) Start() { + go d.run() +} + +// Stops the DelayingDeliverer. Undelivered items are discarded. func (d *DelayingDeliverer) Stop() { close(d.stopChannel) } @@ -158,3 +166,18 @@ func (d *DelayingDeliverer) DeliverAfter(key string, value interface{}, delay ti func (d *DelayingDeliverer) GetTargetChannel() chan *DelayingDelivererItem { return d.targetChannel } + +// Starts Delaying deliverer with a handler listening on the target channel. +func (d *DelayingDeliverer) StartWithHandler(handler func(*DelayingDelivererItem)) { + go func() { + for { + select { + case item := <-d.targetChannel: + handler(item) + case <-d.stopChannel: + return + } + } + }() + d.Start() +} diff --git a/federation/pkg/federation-controller/util/delaying_deliverer_test.go b/federation/pkg/federation-controller/util/delaying_deliverer_test.go index 82c6e952763..006bab4c455 100644 --- a/federation/pkg/federation-controller/util/delaying_deliverer_test.go +++ b/federation/pkg/federation-controller/util/delaying_deliverer_test.go @@ -26,7 +26,8 @@ import ( func TestDelayingDeliverer(t *testing.T) { targetChannel := make(chan *DelayingDelivererItem) now := time.Now() - d := NewDelayingDeliverer(targetChannel) + d := NewDelayingDelivererWithChannel(targetChannel) + d.Start() defer d.Stop() startupDelay := time.Second d.DeliverAt("a", "aaa", now.Add(startupDelay+2*time.Millisecond))