Merge pull request #20433 from lavalamp/fix-bad-rv

Add timeout, fix potential startup hang
This commit is contained in:
Jeff Lowdermilk 2016-02-02 17:27:23 -08:00
commit caa9433234
18 changed files with 269 additions and 52 deletions

View File

@ -113,7 +113,7 @@ func TestTTLPolicy(t *testing.T) {
exactlyOnTTL := fakeTime.Add(-ttl) exactlyOnTTL := fakeTime.Add(-ttl)
expiredTime := fakeTime.Add(-(ttl + 1)) expiredTime := fakeTime.Add(-(ttl + 1))
policy := TTLPolicy{ttl, &util.FakeClock{Time: fakeTime}} policy := TTLPolicy{ttl, util.NewFakeClock(fakeTime)}
fakeTimestampedEntry := &timestampedEntry{obj: struct{}{}, timestamp: exactlyOnTTL} fakeTimestampedEntry := &timestampedEntry{obj: struct{}{}, timestamp: exactlyOnTTL}
if policy.IsExpired(fakeTimestampedEntry) { if policy.IsExpired(fakeTimestampedEntry) {
t.Errorf("TTL cache should not expire entries exactly on ttl") t.Errorf("TTL cache should not expire entries exactly on ttl")

View File

@ -348,7 +348,7 @@ func TestEventf(t *testing.T) {
eventBroadcaster := NewBroadcaster() eventBroadcaster := NewBroadcaster()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := &util.FakeClock{time.Now()} clock := util.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for index, item := range table { for index, item := range table {
clock.Step(1 * time.Second) clock.Step(1 * time.Second)
@ -559,7 +559,7 @@ func TestEventfNoNamespace(t *testing.T) {
eventBroadcaster := NewBroadcaster() eventBroadcaster := NewBroadcaster()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
clock := &util.FakeClock{time.Now()} clock := util.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for index, item := range table { for index, item := range table {
@ -846,7 +846,7 @@ func TestMultiSinkCache(t *testing.T) {
} }
eventBroadcaster := NewBroadcaster() eventBroadcaster := NewBroadcaster()
clock := &util.FakeClock{time.Now()} clock := util.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)

View File

@ -43,7 +43,7 @@ import (
// NewFakeControllerExpectationsLookup creates a fake store for PodExpectations. // NewFakeControllerExpectationsLookup creates a fake store for PodExpectations.
func NewFakeControllerExpectationsLookup(ttl time.Duration) (*ControllerExpectations, *util.FakeClock) { func NewFakeControllerExpectationsLookup(ttl time.Duration) (*ControllerExpectations, *util.FakeClock) {
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
fakeClock := &util.FakeClock{Time: fakeTime} fakeClock := util.NewFakeClock(fakeTime)
ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock} ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
ttlStore := cache.NewFakeExpirationStore( ttlStore := cache.NewFakeExpirationStore(
ExpKeyFunc, nil, ttlPolicy, fakeClock) ExpKeyFunc, nil, ttlPolicy, fakeClock)
@ -177,7 +177,7 @@ func TestControllerExpectations(t *testing.T) {
} }
// Expectations have expired because of ttl // Expectations have expired because of ttl
fakeClock.Time = fakeClock.Time.Add(ttl + 1) fakeClock.Step(ttl + 1)
if !e.SatisfiedExpectations(rcKey) { if !e.SatisfiedExpectations(rcKey) {
t.Errorf("Expectations should have expired but didn't") t.Errorf("Expectations should have expired but didn't")
} }

View File

@ -98,7 +98,7 @@ func TestPuller(t *testing.T) {
} }
backOff := util.NewBackOff(time.Second, time.Minute) backOff := util.NewBackOff(time.Second, time.Minute)
fakeClock := &util.FakeClock{Time: time.Now()} fakeClock := util.NewFakeClock(time.Now())
backOff.Clock = fakeClock backOff.Clock = fakeClock
fakeRuntime := &FakeRuntime{} fakeRuntime := &FakeRuntime{}

View File

@ -98,7 +98,7 @@ func TestSerializedPuller(t *testing.T) {
} }
backOff := util.NewBackOff(time.Second, time.Minute) backOff := util.NewBackOff(time.Second, time.Minute)
fakeClock := &util.FakeClock{Time: time.Now()} fakeClock := util.NewFakeClock(time.Now())
backOff.Clock = fakeClock backOff.Clock = fakeClock
fakeRuntime := &FakeRuntime{} fakeRuntime := &FakeRuntime{}

View File

@ -1160,7 +1160,7 @@ func TestGetAPIPodStatusWithLastTermination(t *testing.T) {
} }
func TestSyncPodBackoff(t *testing.T) { func TestSyncPodBackoff(t *testing.T) {
var fakeClock = &util.FakeClock{Time: time.Now()} var fakeClock = util.NewFakeClock(time.Now())
startTime := fakeClock.Now() startTime := fakeClock.Now()
dm, fakeDocker := newTestDockerManager() dm, fakeDocker := newTestDockerManager()
@ -1232,7 +1232,7 @@ func TestSyncPodBackoff(t *testing.T) {
backOff.Clock = fakeClock backOff.Clock = fakeClock
for _, c := range tests { for _, c := range tests {
fakeDocker.SetFakeContainers(dockerContainers) fakeDocker.SetFakeContainers(dockerContainers)
fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second) fakeClock.SetTime(startTime.Add(time.Duration(c.tick) * time.Second))
runSyncPod(t, dm, fakeDocker, pod, backOff, c.expectErr) runSyncPod(t, dm, fakeDocker, pod, backOff, c.expectErr)
verifyCalls(t, fakeDocker, c.result) verifyCalls(t, fakeDocker, c.result)

View File

@ -431,8 +431,8 @@ func TestGarbageCollectImageNotOldEnough(t *testing.T) {
}, },
} }
fakeClock := util.FakeClock{Time: time.Now()} fakeClock := util.NewFakeClock(time.Now())
fmt.Println(fakeClock.Now()) t.Log(fakeClock.Now())
require.NoError(t, manager.detectImages(fakeClock.Now())) require.NoError(t, manager.detectImages(fakeClock.Now()))
require.Equal(t, manager.imageRecordsLen(), 2) require.Equal(t, manager.imageRecordsLen(), 2)
// no space freed since one image is in used, and another one is not old enough // no space freed since one image is in used, and another one is not old enough

View File

@ -173,7 +173,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
LowThresholdPercent: 80, LowThresholdPercent: 80,
} }
kubelet.imageManager, err = newImageManager(fakeRuntime, mockCadvisor, fakeRecorder, fakeNodeRef, fakeImageGCPolicy) kubelet.imageManager, err = newImageManager(fakeRuntime, mockCadvisor, fakeRecorder, fakeNodeRef, fakeImageGCPolicy)
fakeClock := &util.FakeClock{Time: time.Now()} fakeClock := util.NewFakeClock(time.Now())
kubelet.backOff = util.NewBackOff(time.Second, time.Minute) kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20) kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)

View File

@ -26,7 +26,7 @@ import (
) )
func newTestBasicWorkQueue() (*basicWorkQueue, *util.FakeClock) { func newTestBasicWorkQueue() (*basicWorkQueue, *util.FakeClock) {
fakeClock := &util.FakeClock{Time: time.Now()} fakeClock := util.NewFakeClock(time.Now())
wq := &basicWorkQueue{ wq := &basicWorkQueue{
clock: fakeClock, clock: fakeClock,
queue: make(map[types.UID]time.Time), queue: make(map[types.UID]time.Time),

View File

@ -37,32 +37,32 @@ func TestSecondsSinceSync(t *testing.T) {
tunneler.lastSync = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix() tunneler.lastSync = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix()
// Nano Second. No difference. // Nano Second. No difference.
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC))
assert.Equal(int64(0), tunneler.SecondsSinceSync()) assert.Equal(int64(0), tunneler.SecondsSinceSync())
// Second // Second
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC))
assert.Equal(int64(1), tunneler.SecondsSinceSync()) assert.Equal(int64(1), tunneler.SecondsSinceSync())
// Minute // Minute
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC))
assert.Equal(int64(60), tunneler.SecondsSinceSync()) assert.Equal(int64(60), tunneler.SecondsSinceSync())
// Hour // Hour
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC))
assert.Equal(int64(3600), tunneler.SecondsSinceSync()) assert.Equal(int64(3600), tunneler.SecondsSinceSync())
// Day // Day
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC))
assert.Equal(int64(86400), tunneler.SecondsSinceSync()) assert.Equal(int64(86400), tunneler.SecondsSinceSync())
// Month // Month
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC))
assert.Equal(int64(2678400), tunneler.SecondsSinceSync()) assert.Equal(int64(2678400), tunneler.SecondsSinceSync())
// Future Month. Should be -Month. // Future Month. Should be -Month.
tunneler.lastSync = time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC).Unix() tunneler.lastSync = time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC).Unix()
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC))
assert.Equal(int64(-2678400), tunneler.SecondsSinceSync()) assert.Equal(int64(-2678400), tunneler.SecondsSinceSync())
} }
@ -89,12 +89,12 @@ func TestIsTunnelSyncHealthy(t *testing.T) {
// Pass case: 540 second lag // Pass case: 540 second lag
tunneler.lastSync = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix() tunneler.lastSync = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix()
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 9, 1, 1, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 9, 1, 1, time.UTC))
err := master.IsTunnelSyncHealthy(nil) err := master.IsTunnelSyncHealthy(nil)
assert.NoError(err, "IsTunnelSyncHealthy() should not have returned an error.") assert.NoError(err, "IsTunnelSyncHealthy() should not have returned an error.")
// Fail case: 720 second lag // Fail case: 720 second lag
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 12, 1, 1, time.UTC)} tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 12, 1, 1, time.UTC))
err = master.IsTunnelSyncHealthy(nil) err = master.IsTunnelSyncHealthy(nil)
assert.Error(err, "IsTunnelSyncHealthy() should have returned an error.") assert.Error(err, "IsTunnelSyncHealthy() should have returned an error.")
} }

View File

@ -100,8 +100,12 @@ func (c *Repair) runOnce() error {
} }
ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll) ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
options := &api.ListOptions{ResourceVersion: latest.ObjectMeta.ResourceVersion} // We explicitly send no resource version, since the resource version
list, err := c.registry.ListServices(ctx, options) // of 'latest' is from a different collection, it's not comparable to
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
list, err := c.registry.ListServices(ctx, nil)
if err != nil { if err != nil {
return fmt.Errorf("unable to refresh the service IP block: %v", err) return fmt.Errorf("unable to refresh the service IP block: %v", err)
} }

View File

@ -313,7 +313,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
} }
filterFunc := filterFunction(key, c.keyFunc, filter) filterFunc := filterFunction(key, c.keyFunc, filter)
objs, readResourceVersion := c.watchCache.WaitUntilFreshAndList(listRV) objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
if err != nil {
return fmt.Errorf("failed to wait for fresh list: %v", err)
}
for _, obj := range objs { for _, obj := range objs {
object, ok := obj.(runtime.Object) object, ok := obj.(runtime.Object)
if !ok { if !ok {

View File

@ -21,14 +21,22 @@ import (
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
"time"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
const (
// MaximumListWait determines how long we're willing to wait for a
// list if a client specified a resource version in the future.
MaximumListWait = 60 * time.Second
)
// watchCacheEvent is a single "watch event" that is send to users of // watchCacheEvent is a single "watch event" that is send to users of
// watchCache. Additionally to a typical "watch.Event" it contains // watchCache. Additionally to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the // the previous value of the object to enable proper filtering in the
@ -85,6 +93,9 @@ type watchCache struct {
// This handler is run at the end of every Add/Update/Delete method // This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object. // and additionally gets the previous value of the object.
onEvent func(watchCacheEvent) onEvent func(watchCacheEvent)
// for testing timeouts.
clock util.Clock
} }
func newWatchCache(capacity int) *watchCache { func newWatchCache(capacity int) *watchCache {
@ -95,6 +106,7 @@ func newWatchCache(capacity int) *watchCache {
endIndex: 0, endIndex: 0,
store: cache.NewStore(cache.MetaNamespaceKeyFunc), store: cache.NewStore(cache.MetaNamespaceKeyFunc),
resourceVersion: 0, resourceVersion: 0,
clock: util.RealClock{},
} }
wc.cond = sync.NewCond(wc.RLocker()) wc.cond = sync.NewCond(wc.RLocker())
return wc return wc
@ -193,13 +205,29 @@ func (w *watchCache) List() []interface{} {
return w.store.List() return w.store.List()
} }
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64) { func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
startTime := w.clock.Now()
go func() {
// Wake us up when the time limit has expired. The docs
// promise that time.After (well, NewTimer, which it calls)
// will wait *at least* the duration given. Since this go
// routine starts sometime after we record the start time, and
// it will wake up the loop below sometime after the broadcast,
// we don't need to worry about waking it up before the time
// has expired accidentally.
<-w.clock.After(MaximumListWait)
w.cond.Broadcast()
}()
w.RLock() w.RLock()
for w.resourceVersion < resourceVersion { for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= MaximumListWait {
return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
}
w.cond.Wait() w.cond.Wait()
} }
defer w.RUnlock() defer w.RUnlock()
return w.store.List(), w.resourceVersion return w.store.List(), w.resourceVersion, nil
} }
func (w *watchCache) ListKeys() []string { func (w *watchCache) ListKeys() []string {

View File

@ -19,6 +19,7 @@ package storage
import ( import (
"strconv" "strconv"
"testing" "testing"
"time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
@ -40,8 +41,15 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod {
} }
} }
// newTestWatchCache just adds a fake clock.
func newTestWatchCache(capacity int) *watchCache {
wc := newWatchCache(capacity)
wc.clock = util.NewFakeClock(time.Now())
return wc
}
func TestWatchCacheBasic(t *testing.T) { func TestWatchCacheBasic(t *testing.T) {
store := newWatchCache(2) store := newTestWatchCache(2)
// Test Add/Update/Delete. // Test Add/Update/Delete.
pod1 := makeTestPod("pod", 1) pod1 := makeTestPod("pod", 1)
@ -111,7 +119,7 @@ func TestWatchCacheBasic(t *testing.T) {
} }
func TestEvents(t *testing.T) { func TestEvents(t *testing.T) {
store := newWatchCache(5) store := newTestWatchCache(5)
store.Add(makeTestPod("pod", 2)) store.Add(makeTestPod("pod", 2))
@ -231,7 +239,7 @@ func TestEvents(t *testing.T) {
} }
func TestWaitUntilFreshAndList(t *testing.T) { func TestWaitUntilFreshAndList(t *testing.T) {
store := newWatchCache(3) store := newTestWatchCache(3)
// In background, update the store. // In background, update the store.
go func() { go func() {
@ -239,7 +247,10 @@ func TestWaitUntilFreshAndList(t *testing.T) {
store.Add(makeTestPod("bar", 5)) store.Add(makeTestPod("bar", 5))
}() }()
list, resourceVersion := store.WaitUntilFreshAndList(4) list, resourceVersion, err := store.WaitUntilFreshAndList(5)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 { if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
} }
@ -248,6 +259,30 @@ func TestWaitUntilFreshAndList(t *testing.T) {
} }
} }
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store := newTestWatchCache(3)
fc := store.clock.(*util.FakeClock)
// In background, step clock after the below call starts the timer.
go func() {
for !fc.HasWaiters() {
time.Sleep(time.Millisecond)
}
fc.Step(MaximumListWait)
// Add an object to make sure the test would
// eventually fail instead of just waiting
// forever.
time.Sleep(30 * time.Second)
store.Add(makeTestPod("bar", 5))
}()
_, _, err := store.WaitUntilFreshAndList(5)
if err == nil {
t.Fatalf("unexpected lack of timeout error")
}
}
type testLW struct { type testLW struct {
ListFunc func(options api.ListOptions) (runtime.Object, error) ListFunc func(options api.ListOptions) (runtime.Object, error)
WatchFunc func(options api.ListOptions) (watch.Interface, error) WatchFunc func(options api.ListOptions) (watch.Interface, error)
@ -261,10 +296,13 @@ func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
} }
func TestReflectorForWatchCache(t *testing.T) { func TestReflectorForWatchCache(t *testing.T) {
store := newWatchCache(5) store := newTestWatchCache(5)
{ {
_, version := store.WaitUntilFreshAndList(0) _, version, err := store.WaitUntilFreshAndList(0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 0 { if version != 0 {
t.Errorf("unexpected resource version: %d", version) t.Errorf("unexpected resource version: %d", version)
} }
@ -284,7 +322,10 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(util.NeverStop) r.ListAndWatch(util.NeverStop)
{ {
_, version := store.WaitUntilFreshAndList(10) _, version, err := store.WaitUntilFreshAndList(10)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 10 { if version != 10 {
t.Errorf("unexpected resource version: %d", version) t.Errorf("unexpected resource version: %d", version)
} }

View File

@ -32,7 +32,7 @@ func NewFakeBackOff(initial, max time.Duration, tc *FakeClock) *Backoff {
func TestSlowBackoff(t *testing.T) { func TestSlowBackoff(t *testing.T) {
id := "_idSlow" id := "_idSlow"
tc := &FakeClock{Time: time.Now()} tc := NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := 50 * step maxDuration := 50 * step
@ -58,7 +58,7 @@ func TestSlowBackoff(t *testing.T) {
func TestBackoffReset(t *testing.T) { func TestBackoffReset(t *testing.T) {
id := "_idReset" id := "_idReset"
tc := &FakeClock{Time: time.Now()} tc := NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := step * 5 maxDuration := step * 5
b := NewFakeBackOff(step, maxDuration, tc) b := NewFakeBackOff(step, maxDuration, tc)
@ -84,7 +84,7 @@ func TestBackoffReset(t *testing.T) {
func TestBackoffHightWaterMark(t *testing.T) { func TestBackoffHightWaterMark(t *testing.T) {
id := "_idHiWaterMark" id := "_idHiWaterMark"
tc := &FakeClock{Time: time.Now()} tc := NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := 5 * step maxDuration := 5 * step
b := NewFakeBackOff(step, maxDuration, tc) b := NewFakeBackOff(step, maxDuration, tc)
@ -106,7 +106,7 @@ func TestBackoffHightWaterMark(t *testing.T) {
func TestBackoffGC(t *testing.T) { func TestBackoffGC(t *testing.T) {
id := "_idGC" id := "_idGC"
tc := &FakeClock{Time: time.Now()} tc := NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := 5 * step maxDuration := 5 * step
@ -134,7 +134,7 @@ func TestBackoffGC(t *testing.T) {
func TestIsInBackOffSinceUpdate(t *testing.T) { func TestIsInBackOffSinceUpdate(t *testing.T) {
id := "_idIsInBackOffSinceUpdate" id := "_idIsInBackOffSinceUpdate"
tc := &FakeClock{Time: time.Now()} tc := NewFakeClock(time.Now())
step := time.Second step := time.Second
maxDuration := 10 * step maxDuration := 10 * step
b := NewFakeBackOff(step, maxDuration, tc) b := NewFakeBackOff(step, maxDuration, tc)
@ -186,7 +186,7 @@ func TestIsInBackOffSinceUpdate(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
tc.Time = startTime.Add(c.tick * step) tc.SetTime(startTime.Add(c.tick * step))
if c.inBackOff != b.IsInBackOffSinceUpdate(id, tc.Now()) { if c.inBackOff != b.IsInBackOffSinceUpdate(id, tc.Now()) {
t.Errorf("expected IsInBackOffSinceUpdate %v got %v at tick %s", c.inBackOff, b.IsInBackOffSinceUpdate(id, tc.Now()), c.tick*step) t.Errorf("expected IsInBackOffSinceUpdate %v got %v at tick %s", c.inBackOff, b.IsInBackOffSinceUpdate(id, tc.Now()), c.tick*step)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package util package util
import ( import (
"sync"
"time" "time"
) )
@ -25,39 +26,115 @@ import (
type Clock interface { type Clock interface {
Now() time.Time Now() time.Time
Since(time.Time) time.Duration Since(time.Time) time.Duration
After(d time.Duration) <-chan time.Time
} }
var (
_ = Clock(RealClock{})
_ = Clock(&FakeClock{})
_ = Clock(&IntervalClock{})
)
// RealClock really calls time.Now() // RealClock really calls time.Now()
type RealClock struct{} type RealClock struct{}
// Now returns the current time. // Now returns the current time.
func (r RealClock) Now() time.Time { func (RealClock) Now() time.Time {
return time.Now() return time.Now()
} }
// Since returns time since the specified timestamp. // Since returns time since the specified timestamp.
func (r RealClock) Since(ts time.Time) time.Duration { func (RealClock) Since(ts time.Time) time.Duration {
return time.Since(ts) return time.Since(ts)
} }
// Same as time.After(d).
func (RealClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
// FakeClock implements Clock, but returns an arbitrary time. // FakeClock implements Clock, but returns an arbitrary time.
type FakeClock struct { type FakeClock struct {
Time time.Time lock sync.RWMutex
time time.Time
// waiters are waiting for the fake time to pass their specified time
waiters []fakeClockWaiter
}
type fakeClockWaiter struct {
targetTime time.Time
destChan chan<- time.Time
}
func NewFakeClock(t time.Time) *FakeClock {
return &FakeClock{
time: t,
}
} }
// Now returns f's time. // Now returns f's time.
func (f *FakeClock) Now() time.Time { func (f *FakeClock) Now() time.Time {
return f.Time f.lock.RLock()
defer f.lock.RUnlock()
return f.time
} }
// Since returns time since the time in f. // Since returns time since the time in f.
func (f *FakeClock) Since(ts time.Time) time.Duration { func (f *FakeClock) Since(ts time.Time) time.Duration {
return f.Time.Sub(ts) f.lock.RLock()
defer f.lock.RUnlock()
return f.time.Sub(ts)
} }
// Move clock by Duration // Fake version of time.After(d).
func (f *FakeClock) After(d time.Duration) <-chan time.Time {
f.lock.Lock()
defer f.lock.Unlock()
stopTime := f.time.Add(d)
ch := make(chan time.Time, 1) // Don't block!
f.waiters = append(f.waiters, fakeClockWaiter{
targetTime: stopTime,
destChan: ch,
})
return ch
}
// Move clock by Duration, notify anyone that's called After
func (f *FakeClock) Step(d time.Duration) { func (f *FakeClock) Step(d time.Duration) {
f.Time = f.Time.Add(d) f.lock.Lock()
defer f.lock.Unlock()
f.setTimeLocked(f.time.Add(d))
}
// Sets the time.
func (f *FakeClock) SetTime(t time.Time) {
f.lock.Lock()
defer f.lock.Unlock()
f.setTimeLocked(t)
}
// Actually changes the time and checks any waiters. f must be write-locked.
func (f *FakeClock) setTimeLocked(t time.Time) {
f.time = t
newWaiters := make([]fakeClockWaiter, 0, len(f.waiters))
for i := range f.waiters {
w := &f.waiters[i]
if !w.targetTime.After(t) {
w.destChan <- t
} else {
newWaiters = append(newWaiters, f.waiters[i])
}
}
f.waiters = newWaiters
}
// Returns true if After has been called on f but not yet satisfied (so you can
// write race-free tests).
func (f *FakeClock) HasWaiters() bool {
f.lock.RLock()
defer f.lock.RUnlock()
return len(f.waiters) > 0
} }
// IntervalClock implements Clock, but each invocation of Now steps the clock forward the specified duration // IntervalClock implements Clock, but each invocation of Now steps the clock forward the specified duration
@ -76,3 +153,9 @@ func (i *IntervalClock) Now() time.Time {
func (i *IntervalClock) Since(ts time.Time) time.Duration { func (i *IntervalClock) Since(ts time.Time) time.Duration {
return i.Time.Sub(ts) return i.Time.Sub(ts)
} }
// Unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) After(d time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement After")
}

View File

@ -23,7 +23,7 @@ import (
func TestFakeClock(t *testing.T) { func TestFakeClock(t *testing.T) {
startTime := time.Now() startTime := time.Now()
tc := &FakeClock{Time: startTime} tc := NewFakeClock(startTime)
tc.Step(time.Second) tc.Step(time.Second)
now := tc.Now() now := tc.Now()
if now.Sub(startTime) != time.Second { if now.Sub(startTime) != time.Second {
@ -31,8 +31,66 @@ func TestFakeClock(t *testing.T) {
} }
tt := tc.Now() tt := tc.Now()
tc.Time = tt.Add(time.Hour) tc.SetTime(tt.Add(time.Hour))
if tc.Now().Sub(tt) != time.Hour { if tc.Now().Sub(tt) != time.Hour {
t.Errorf("input: %s now=%s gap=%s expected=%s", tt, tc.Now(), tc.Now().Sub(tt), time.Hour) t.Errorf("input: %s now=%s gap=%s expected=%s", tt, tc.Now(), tc.Now().Sub(tt), time.Hour)
} }
} }
func TestFakeAfter(t *testing.T) {
tc := NewFakeClock(time.Now())
if tc.HasWaiters() {
t.Errorf("unexpected waiter?")
}
oneSec := tc.After(time.Second)
if !tc.HasWaiters() {
t.Errorf("unexpected lack of waiter?")
}
oneOhOneSec := tc.After(time.Second + time.Millisecond)
twoSec := tc.After(2 * time.Second)
select {
case <-oneSec:
t.Errorf("unexpected channel read")
case <-oneOhOneSec:
t.Errorf("unexpected channel read")
case <-twoSec:
t.Errorf("unexpected channel read")
default:
}
tc.Step(999 * time.Millisecond)
select {
case <-oneSec:
t.Errorf("unexpected channel read")
case <-oneOhOneSec:
t.Errorf("unexpected channel read")
case <-twoSec:
t.Errorf("unexpected channel read")
default:
}
tc.Step(time.Millisecond)
select {
case <-oneSec:
// Expected!
case <-oneOhOneSec:
t.Errorf("unexpected channel read")
case <-twoSec:
t.Errorf("unexpected channel read")
default:
t.Errorf("unexpected non-channel read")
}
tc.Step(time.Millisecond)
select {
case <-oneSec:
// should not double-trigger!
t.Errorf("unexpected channel read")
case <-oneOhOneSec:
// Expected!
case <-twoSec:
t.Errorf("unexpected channel read")
default:
t.Errorf("unexpected non-channel read")
}
}

View File

@ -176,7 +176,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
// all entries inserted with fakeTime will expire. // all entries inserted with fakeTime will expire.
ttl := 30 * time.Second ttl := 30 * time.Second
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
fakeClock := &util.FakeClock{Time: fakeTime} fakeClock := util.NewFakeClock(fakeTime)
ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock} ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
assumedPodsStore := cache.NewFakeExpirationStore( assumedPodsStore := cache.NewFakeExpirationStore(
cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock) cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
@ -274,7 +274,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
// Second scheduling pass will fail to schedule if the store hasn't expired // Second scheduling pass will fail to schedule if the store hasn't expired
// the deleted pod. This would normally happen with a timeout. // the deleted pod. This would normally happen with a timeout.
//expirationPolicy.NeverExpire = util.NewStringSet() //expirationPolicy.NeverExpire = util.NewStringSet()
fakeClock.Time = fakeClock.Time.Add(ttl + 1) fakeClock.Step(ttl + 1)
called = make(chan struct{}) called = make(chan struct{})
events = eventBroadcaster.StartEventWatcher(func(e *api.Event) { events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {