Add timeout to cacher

This commit is contained in:
Daniel Smith 2016-02-01 10:50:22 -08:00
parent 4a7d70aef1
commit 26683fda29
3 changed files with 82 additions and 10 deletions

View File

@ -313,7 +313,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
}
filterFunc := filterFunction(key, c.keyFunc, filter)
objs, readResourceVersion := c.watchCache.WaitUntilFreshAndList(listRV)
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
if err != nil {
return fmt.Errorf("failed to wait for fresh list: %v", err)
}
for _, obj := range objs {
object, ok := obj.(runtime.Object)
if !ok {

View File

@ -21,14 +21,22 @@ import (
"sort"
"strconv"
"sync"
"time"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)
const (
// MaximumListWait determines how long we're willing to wait for a
// list if a client specified a resource version in the future.
// After this duration WaitUntilFreshAndList gives up and returns an error
// instead of blocking indefinitely.
MaximumListWait = 60 * time.Second
)
// watchCacheEvent is a single "watch event" that is send to users of
// watchCache. Additionally to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the
@ -85,6 +93,9 @@ type watchCache struct {
// This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object.
onEvent func(watchCacheEvent)
// for testing timeouts.
clock util.Clock
}
func newWatchCache(capacity int) *watchCache {
@ -95,6 +106,7 @@ func newWatchCache(capacity int) *watchCache {
endIndex: 0,
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
resourceVersion: 0,
clock: util.RealClock{},
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
@ -193,13 +205,29 @@ func (w *watchCache) List() []interface{} {
return w.store.List()
}
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64) {
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
startTime := w.clock.Now()
go func() {
// Wake us up when the time limit has expired. The docs
// promise that time.After (well, NewTimer, which it calls)
// will wait *at least* the duration given. Since this go
// routine starts sometime after we record the start time, and
// it will wake up the loop below sometime after the broadcast,
// we don't need to worry about waking it up before the time
// has expired accidentally.
<-w.clock.After(MaximumListWait)
w.cond.Broadcast()
}()
w.RLock()
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= MaximumListWait {
return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
}
w.cond.Wait()
}
defer w.RUnlock()
return w.store.List(), w.resourceVersion
return w.store.List(), w.resourceVersion, nil
}
func (w *watchCache) ListKeys() []string {

View File

@ -19,6 +19,7 @@ package storage
import (
"strconv"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
@ -40,8 +41,15 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod {
}
}
// newTestWatchCache builds a watchCache for tests, swapping the real clock
// for a fake one so tests can advance time deterministically.
func newTestWatchCache(capacity int) *watchCache {
	cache := newWatchCache(capacity)
	cache.clock = util.NewFakeClock(time.Now())
	return cache
}
func TestWatchCacheBasic(t *testing.T) {
store := newWatchCache(2)
store := newTestWatchCache(2)
// Test Add/Update/Delete.
pod1 := makeTestPod("pod", 1)
@ -111,7 +119,7 @@ func TestWatchCacheBasic(t *testing.T) {
}
func TestEvents(t *testing.T) {
store := newWatchCache(5)
store := newTestWatchCache(5)
store.Add(makeTestPod("pod", 2))
@ -231,7 +239,7 @@ func TestEvents(t *testing.T) {
}
func TestWaitUntilFreshAndList(t *testing.T) {
store := newWatchCache(3)
store := newTestWatchCache(3)
// In background, update the store.
go func() {
@ -239,7 +247,10 @@ func TestWaitUntilFreshAndList(t *testing.T) {
store.Add(makeTestPod("bar", 5))
}()
list, resourceVersion := store.WaitUntilFreshAndList(4)
list, resourceVersion, err := store.WaitUntilFreshAndList(5)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
@ -248,6 +259,30 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}
}
// TestWaitUntilFreshAndListTimeout verifies that WaitUntilFreshAndList
// returns an error once the fake clock is advanced past MaximumListWait.
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
	store := newTestWatchCache(3)
	clock := store.clock.(*util.FakeClock)

	go func() {
		// Wait until the call below has armed its timer on the fake
		// clock, then jump past the deadline to fire the timeout.
		for !clock.HasWaiters() {
			time.Sleep(time.Millisecond)
		}
		clock.Step(MaximumListWait)

		// Safety net: if the timeout never fires, eventually satisfy
		// the wait so the test fails instead of hanging forever.
		time.Sleep(30 * time.Second)
		store.Add(makeTestPod("bar", 5))
	}()

	_, _, err := store.WaitUntilFreshAndList(5)
	if err == nil {
		t.Fatalf("unexpected lack of timeout error")
	}
}
type testLW struct {
ListFunc func(options api.ListOptions) (runtime.Object, error)
WatchFunc func(options api.ListOptions) (watch.Interface, error)
@ -261,10 +296,13 @@ func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
}
func TestReflectorForWatchCache(t *testing.T) {
store := newWatchCache(5)
store := newTestWatchCache(5)
{
_, version := store.WaitUntilFreshAndList(0)
_, version, err := store.WaitUntilFreshAndList(0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 0 {
t.Errorf("unexpected resource version: %d", version)
}
@ -284,7 +322,10 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(util.NeverStop)
{
_, version := store.WaitUntilFreshAndList(10)
_, version, err := store.WaitUntilFreshAndList(10)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 10 {
t.Errorf("unexpected resource version: %d", version)
}