Federation - common libs - small changes in delaying deliverer

This commit is contained in:
Marcin Wielgus 2016-08-10 16:10:58 +02:00
parent d611e611d9
commit 9f01136df4
2 changed files with 29 additions and 5 deletions

View File

@ -81,8 +81,12 @@ type DelayingDeliverer struct {
stopChannel chan struct{} stopChannel chan struct{}
} }
func NewDelayingDeliverer(targetChannel chan *DelayingDelivererItem) *DelayingDeliverer { func NewDelayingDeliverer() *DelayingDeliverer {
d := &DelayingDeliverer{ return NewDelayingDelivererWithChannel(make(chan *DelayingDelivererItem, 100))
}
func NewDelayingDelivererWithChannel(targetChannel chan *DelayingDelivererItem) *DelayingDeliverer {
return &DelayingDeliverer{
targetChannel: targetChannel, targetChannel: targetChannel,
heap: &delivererHeap{ heap: &delivererHeap{
keyPosition: make(map[string]int), keyPosition: make(map[string]int),
@ -91,8 +95,6 @@ func NewDelayingDeliverer(targetChannel chan *DelayingDelivererItem) *DelayingDe
updateChannel: make(chan *DelayingDelivererItem, delayingDelivererUpdateChanCapacity), updateChannel: make(chan *DelayingDelivererItem, delayingDelivererUpdateChanCapacity),
stopChannel: make(chan struct{}), stopChannel: make(chan struct{}),
} }
go d.run()
return d
} }
// Deliver all items due before or equal to timestamp. // Deliver all items due before or equal to timestamp.
@ -136,6 +138,12 @@ func (d *DelayingDeliverer) run() {
} }
} }
// Starts the DelayingDeliverer.
func (d *DelayingDeliverer) Start() {
go d.run()
}
// Stops the DelayingDeliverer. Undelivered items are discarded.
func (d *DelayingDeliverer) Stop() { func (d *DelayingDeliverer) Stop() {
close(d.stopChannel) close(d.stopChannel)
} }
@ -158,3 +166,18 @@ func (d *DelayingDeliverer) DeliverAfter(key string, value interface{}, delay ti
func (d *DelayingDeliverer) GetTargetChannel() chan *DelayingDelivererItem { func (d *DelayingDeliverer) GetTargetChannel() chan *DelayingDelivererItem {
return d.targetChannel return d.targetChannel
} }
// Starts Delaying deliverer with a handler listening on the target channel.
func (d *DelayingDeliverer) StartWithHandler(handler func(*DelayingDelivererItem)) {
go func() {
for {
select {
case item := <-d.targetChannel:
handler(item)
case <-d.stopChannel:
return
}
}
}()
d.Start()
}

View File

@ -26,7 +26,8 @@ import (
func TestDelayingDeliverer(t *testing.T) { func TestDelayingDeliverer(t *testing.T) {
targetChannel := make(chan *DelayingDelivererItem) targetChannel := make(chan *DelayingDelivererItem)
now := time.Now() now := time.Now()
d := NewDelayingDeliverer(targetChannel) d := NewDelayingDelivererWithChannel(targetChannel)
d.Start()
defer d.Stop() defer d.Stop()
startupDelay := time.Second startupDelay := time.Second
d.DeliverAt("a", "aaa", now.Add(startupDelay+2*time.Millisecond)) d.DeliverAt("a", "aaa", now.Add(startupDelay+2*time.Millisecond))