From 85dd375cc8eea20688a09b278d4991781c700d1f Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Thu, 21 Jul 2016 12:10:58 +0200 Subject: [PATCH] Delaying deliverer for Federated ReplicaSet --- .../util/delaying_deliverer.go | 149 ++++++++++++++++++ .../util/delaying_deliverer_test.go | 62 ++++++++ 2 files changed, 211 insertions(+) create mode 100644 federation/pkg/federation-controller/util/delaying_deliverer.go create mode 100644 federation/pkg/federation-controller/util/delaying_deliverer_test.go diff --git a/federation/pkg/federation-controller/util/delaying_deliverer.go b/federation/pkg/federation-controller/util/delaying_deliverer.go new file mode 100644 index 00000000000..40aa058e3e3 --- /dev/null +++ b/federation/pkg/federation-controller/util/delaying_deliverer.go @@ -0,0 +1,149 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// TODO: consider moving it to a more generic package. +package util + +import ( + "container/heap" + "time" +) + +const ( + // TODO: Investigate what capacity is right. + delayingDelivererUpdateChanCapacity = 1000 +) + +// DelayingDelivererItem is structure delivered by DelayingDeliverer to the +// target channel. +type DelayingDelivererItem struct { + // Key under which the value was added to deliverer. + Key string + // Value of the item. + Value interface{} + // When the item should be delivered. + DeliveryTime time.Time +} + +type delivererHeap struct { + keyPosition map[string]int + data []*DelayingDelivererItem +} + +// Functions required by container.Heap. + +func (dh *delivererHeap) Len() int { return len(dh.data) } +func (dh *delivererHeap) Less(i, j int) bool { + return dh.data[i].DeliveryTime.Before(dh.data[j].DeliveryTime) +} +func (dh *delivererHeap) Swap(i, j int) { + dh.keyPosition[dh.data[i].Key] = j + dh.keyPosition[dh.data[j].Key] = i + dh.data[i], dh.data[j] = dh.data[j], dh.data[i] +} + +func (dh *delivererHeap) Push(x interface{}) { + item := x.(*DelayingDelivererItem) + dh.data = append(dh.data, item) + dh.keyPosition[item.Key] = len(dh.data) - 1 +} + +func (dh *delivererHeap) Pop() interface{} { + n := len(dh.data) + item := dh.data[n-1] + dh.data = dh.data[:n-1] + delete(dh.keyPosition, item.Key) + return item +} + +// A structure that pushes the items to the target channel at a given time. +type DelayingDeliverer struct { + // Channel to deliver the data when their time comes. + targetChannel chan *DelayingDelivererItem + // Store for data + heap *delivererHeap + // Channel to feed the main goroutine with updates. + updateChannel chan *DelayingDelivererItem + // To stop the main goroutine. + stopChannel chan struct{} +} + +func NewDelayingDeliverer(targetChannel chan *DelayingDelivererItem) *DelayingDeliverer { + d := &DelayingDeliverer{ + targetChannel: targetChannel, + heap: &delivererHeap{ + keyPosition: make(map[string]int), + data: make([]*DelayingDelivererItem, 0), + }, + updateChannel: make(chan *DelayingDelivererItem, delayingDelivererUpdateChanCapacity), + stopChannel: make(chan struct{}), + } + go d.run() + return d +} + +// Deliver all items due before or equal to timestamp. +func (d *DelayingDeliverer) deliver(timestamp time.Time) { + for d.heap.Len() > 0 { + if timestamp.Before(d.heap.data[0].DeliveryTime) { + return + } + item := heap.Pop(d.heap).(*DelayingDelivererItem) + d.targetChannel <- item + } +} + +func (d *DelayingDeliverer) run() { + for { + now := time.Now() + d.deliver(now) + + nextWakeUp := now.Add(time.Hour) + if d.heap.Len() > 0 { + nextWakeUp = d.heap.data[0].DeliveryTime + } + sleepTime := nextWakeUp.Sub(now) + + select { + case <-time.After(sleepTime): + break // just wake up and process the data + case item := <-d.updateChannel: + if position, found := d.heap.keyPosition[item.Key]; found { + if item.DeliveryTime.Before(d.heap.data[position].DeliveryTime) { + d.heap.data[position] = item + heap.Fix(d.heap, position) + } + // Ignore if later. + } else { + heap.Push(d.heap, item) + } + case <-d.stopChannel: + return + } + } +} + +func (d *DelayingDeliverer) Stop() { + close(d.stopChannel) +} + +func (d *DelayingDeliverer) DeliverAt(key string, value interface{}, deliveryTime time.Time) { + d.updateChannel <- &DelayingDelivererItem{ + Key: key, + Value: value, + DeliveryTime: deliveryTime, + } +} diff --git a/federation/pkg/federation-controller/util/delaying_deliverer_test.go b/federation/pkg/federation-controller/util/delaying_deliverer_test.go new file mode 100644 index 00000000000..82c6e952763 --- /dev/null +++ b/federation/pkg/federation-controller/util/delaying_deliverer_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDelayingDeliverer(t *testing.T) { + targetChannel := make(chan *DelayingDelivererItem) + now := time.Now() + d := NewDelayingDeliverer(targetChannel) + defer d.Stop() + startupDelay := time.Second + d.DeliverAt("a", "aaa", now.Add(startupDelay+2*time.Millisecond)) + d.DeliverAt("b", "bbb", now.Add(startupDelay+3*time.Millisecond)) + d.DeliverAt("c", "ccc", now.Add(startupDelay+1*time.Millisecond)) + d.DeliverAt("e", "eee", now.Add(time.Hour)) + d.DeliverAt("e", "eee", now) + + d.DeliverAt("d", "ddd", now.Add(time.Hour)) + + i0 := <-targetChannel + assert.Equal(t, "e", i0.Key) + assert.Equal(t, "eee", i0.Value.(string)) + assert.Equal(t, now, i0.DeliveryTime) + + i1 := <-targetChannel + received1 := time.Now() + assert.True(t, received1.Sub(now).Nanoseconds() > startupDelay.Nanoseconds()) + assert.Equal(t, "c", i1.Key) + + i2 := <-targetChannel + assert.Equal(t, "a", i2.Key) + + i3 := <-targetChannel + assert.Equal(t, "b", i3.Key) + + select { + case <-targetChannel: + t.Fatalf("Nothing should be received") + case <-time.After(time.Second): + // Ok. Expected + } +}