mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #29368 from mwielgus/delaying_deliverer
Automatic merge from submit-queue Delaying deliverer for Federated ReplicaSet A helper struct to push data to a channel after a given delay. It runs on a single gouroutine and allows updates. An update cancels previous delivery if it was about to happen later than the new one. Otherwise the new update is discarded. All data require a string key that is used to identify the data (for updates and de-duplication). cc: @quinton-hoole @wojtek-t
This commit is contained in:
commit
f7409f3be2
149
federation/pkg/federation-controller/util/delaying_deliverer.go
Normal file
149
federation/pkg/federation-controller/util/delaying_deliverer.go
Normal file
@ -0,0 +1,149 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// TODO: consider moving it to a more generic package.
|
||||
package util
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// TODO: Investigate what capacity is right.
|
||||
delayingDelivererUpdateChanCapacity = 1000
|
||||
)
|
||||
|
||||
// DelayingDelivererItem is structure delivered by DelayingDeliverer to the
|
||||
// target channel.
|
||||
type DelayingDelivererItem struct {
|
||||
// Key under which the value was added to deliverer.
|
||||
Key string
|
||||
// Value of the item.
|
||||
Value interface{}
|
||||
// When the item should be delivered.
|
||||
DeliveryTime time.Time
|
||||
}
|
||||
|
||||
type delivererHeap struct {
|
||||
keyPosition map[string]int
|
||||
data []*DelayingDelivererItem
|
||||
}
|
||||
|
||||
// Functions required by container.Heap.
|
||||
|
||||
func (dh *delivererHeap) Len() int { return len(dh.data) }
|
||||
func (dh *delivererHeap) Less(i, j int) bool {
|
||||
return dh.data[i].DeliveryTime.Before(dh.data[j].DeliveryTime)
|
||||
}
|
||||
func (dh *delivererHeap) Swap(i, j int) {
|
||||
dh.keyPosition[dh.data[i].Key] = j
|
||||
dh.keyPosition[dh.data[j].Key] = i
|
||||
dh.data[i], dh.data[j] = dh.data[j], dh.data[i]
|
||||
}
|
||||
|
||||
func (dh *delivererHeap) Push(x interface{}) {
|
||||
item := x.(*DelayingDelivererItem)
|
||||
dh.data = append(dh.data, item)
|
||||
dh.keyPosition[item.Key] = len(dh.data) - 1
|
||||
}
|
||||
|
||||
func (dh *delivererHeap) Pop() interface{} {
|
||||
n := len(dh.data)
|
||||
item := dh.data[n-1]
|
||||
dh.data = dh.data[:n-1]
|
||||
delete(dh.keyPosition, item.Key)
|
||||
return item
|
||||
}
|
||||
|
||||
// A structure that pushes the items to the target channel at a given time.
|
||||
type DelayingDeliverer struct {
|
||||
// Channel to deliver the data when their time comes.
|
||||
targetChannel chan *DelayingDelivererItem
|
||||
// Store for data
|
||||
heap *delivererHeap
|
||||
// Channel to feed the main goroutine with updates.
|
||||
updateChannel chan *DelayingDelivererItem
|
||||
// To stop the main goroutine.
|
||||
stopChannel chan struct{}
|
||||
}
|
||||
|
||||
func NewDelayingDeliverer(targetChannel chan *DelayingDelivererItem) *DelayingDeliverer {
|
||||
d := &DelayingDeliverer{
|
||||
targetChannel: targetChannel,
|
||||
heap: &delivererHeap{
|
||||
keyPosition: make(map[string]int),
|
||||
data: make([]*DelayingDelivererItem, 0),
|
||||
},
|
||||
updateChannel: make(chan *DelayingDelivererItem, delayingDelivererUpdateChanCapacity),
|
||||
stopChannel: make(chan struct{}),
|
||||
}
|
||||
go d.run()
|
||||
return d
|
||||
}
|
||||
|
||||
// Deliver all items due before or equal to timestamp.
|
||||
func (d *DelayingDeliverer) deliver(timestamp time.Time) {
|
||||
for d.heap.Len() > 0 {
|
||||
if timestamp.Before(d.heap.data[0].DeliveryTime) {
|
||||
return
|
||||
}
|
||||
item := heap.Pop(d.heap).(*DelayingDelivererItem)
|
||||
d.targetChannel <- item
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DelayingDeliverer) run() {
|
||||
for {
|
||||
now := time.Now()
|
||||
d.deliver(now)
|
||||
|
||||
nextWakeUp := now.Add(time.Hour)
|
||||
if d.heap.Len() > 0 {
|
||||
nextWakeUp = d.heap.data[0].DeliveryTime
|
||||
}
|
||||
sleepTime := nextWakeUp.Sub(now)
|
||||
|
||||
select {
|
||||
case <-time.After(sleepTime):
|
||||
break // just wake up and process the data
|
||||
case item := <-d.updateChannel:
|
||||
if position, found := d.heap.keyPosition[item.Key]; found {
|
||||
if item.DeliveryTime.Before(d.heap.data[position].DeliveryTime) {
|
||||
d.heap.data[position] = item
|
||||
heap.Fix(d.heap, position)
|
||||
}
|
||||
// Ignore if later.
|
||||
} else {
|
||||
heap.Push(d.heap, item)
|
||||
}
|
||||
case <-d.stopChannel:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DelayingDeliverer) Stop() {
|
||||
close(d.stopChannel)
|
||||
}
|
||||
|
||||
func (d *DelayingDeliverer) DeliverAt(key string, value interface{}, deliveryTime time.Time) {
|
||||
d.updateChannel <- &DelayingDelivererItem{
|
||||
Key: key,
|
||||
Value: value,
|
||||
DeliveryTime: deliveryTime,
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestDelayingDeliverer(t *testing.T) {
|
||||
targetChannel := make(chan *DelayingDelivererItem)
|
||||
now := time.Now()
|
||||
d := NewDelayingDeliverer(targetChannel)
|
||||
defer d.Stop()
|
||||
startupDelay := time.Second
|
||||
d.DeliverAt("a", "aaa", now.Add(startupDelay+2*time.Millisecond))
|
||||
d.DeliverAt("b", "bbb", now.Add(startupDelay+3*time.Millisecond))
|
||||
d.DeliverAt("c", "ccc", now.Add(startupDelay+1*time.Millisecond))
|
||||
d.DeliverAt("e", "eee", now.Add(time.Hour))
|
||||
d.DeliverAt("e", "eee", now)
|
||||
|
||||
d.DeliverAt("d", "ddd", now.Add(time.Hour))
|
||||
|
||||
i0 := <-targetChannel
|
||||
assert.Equal(t, "e", i0.Key)
|
||||
assert.Equal(t, "eee", i0.Value.(string))
|
||||
assert.Equal(t, now, i0.DeliveryTime)
|
||||
|
||||
i1 := <-targetChannel
|
||||
received1 := time.Now()
|
||||
assert.True(t, received1.Sub(now).Nanoseconds() > startupDelay.Nanoseconds())
|
||||
assert.Equal(t, "c", i1.Key)
|
||||
|
||||
i2 := <-targetChannel
|
||||
assert.Equal(t, "a", i2.Key)
|
||||
|
||||
i3 := <-targetChannel
|
||||
assert.Equal(t, "b", i3.Key)
|
||||
|
||||
select {
|
||||
case <-targetChannel:
|
||||
t.Fatalf("Nothing should be received")
|
||||
case <-time.After(time.Second):
|
||||
// Ok. Expected
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user