Make watch cache behave like uncached watch

This commit is contained in:
Jordan Liggitt 2016-04-07 19:53:41 -04:00
parent acf9492cb1
commit ada60236f7
4 changed files with 23 additions and 18 deletions

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@ -542,6 +543,8 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
} }
func (c *cacheWatcher) process(initEvents []watchCacheEvent) { func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
defer utilruntime.HandleCrash()
for _, event := range initEvents { for _, event := range initEvents {
c.sendWatchCacheEvent(event) c.sendWatchCacheEvent(event)
} }

View File

@ -19,6 +19,7 @@ package storage_test
import ( import (
"fmt" "fmt"
"reflect" "reflect"
goruntime "runtime"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -159,15 +160,19 @@ func TestList(t *testing.T) {
} }
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
_, _, line, _ := goruntime.Caller(1)
select { select {
case event := <-w.ResultChan(): case event := <-w.ResultChan():
if e, a := eventType, event.Type; e != a { if e, a := eventType, event.Type; e != a {
t.Logf("(called from line %d)", line)
t.Errorf("Expected: %s, got: %s", eventType, event.Type) t.Errorf("Expected: %s, got: %s", eventType, event.Type)
} }
if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) { if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Logf("(called from line %d)", line)
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
} }
case <-time.After(wait.ForeverTestTimeout): case <-time.After(wait.ForeverTestTimeout):
t.Logf("(called from line %d)", line)
t.Errorf("Timed out waiting for an event") t.Errorf("Timed out waiting for an event")
} }
} }
@ -236,7 +241,6 @@ func TestWatch(t *testing.T) {
} }
defer initialWatcher.Stop() defer initialWatcher.Stop()
verifyWatchEvent(t, initialWatcher, watch.Added, podFoo)
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
// Now test watch from "now". // Now test watch from "now".
@ -335,7 +339,6 @@ func TestFiltering(t *testing.T) {
} }
defer watcher.Stop() defer watcher.Stop()
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered) verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
verifyWatchEvent(t, watcher, watch.Added, podFoo) verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)

View File

@ -302,14 +302,13 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
} }
return result, nil return result, nil
} }
if resourceVersion < oldest { if resourceVersion < oldest-1 {
return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest)) return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
} }
// Binary search the smallest index at which resourceVersion is not smaller than // Binary search the smallest index at which resourceVersion is greater than the given one.
// the given one.
f := func(i int) bool { f := func(i int) bool {
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
} }
first := sort.Search(size, f) first := sort.Search(size, f)
result := make([]watchCacheEvent, size-first) result := make([]watchCacheEvent, size-first)

View File

@ -122,7 +122,7 @@ func TestWatchCacheBasic(t *testing.T) {
func TestEvents(t *testing.T) { func TestEvents(t *testing.T) {
store := newTestWatchCache(5) store := newTestWatchCache(5)
store.Add(makeTestPod("pod", 2)) store.Add(makeTestPod("pod", 3))
// Test for Added event. // Test for Added event.
{ {
@ -145,7 +145,7 @@ func TestEvents(t *testing.T) {
if result[0].Type != watch.Added { if result[0].Type != watch.Added {
t.Errorf("unexpected event type: %v", result[0].Type) t.Errorf("unexpected event type: %v", result[0].Type)
} }
pod := makeTestPod("pod", uint64(2)) pod := makeTestPod("pod", uint64(3))
if !api.Semantic.DeepEqual(pod, result[0].Object) { if !api.Semantic.DeepEqual(pod, result[0].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
} }
@ -154,8 +154,8 @@ func TestEvents(t *testing.T) {
} }
} }
store.Update(makeTestPod("pod", 3))
store.Update(makeTestPod("pod", 4)) store.Update(makeTestPod("pod", 4))
store.Update(makeTestPod("pod", 5))
// Test with not full cache. // Test with not full cache.
{ {
@ -176,22 +176,22 @@ func TestEvents(t *testing.T) {
if result[i].Type != watch.Modified { if result[i].Type != watch.Modified {
t.Errorf("unexpected event type: %v", result[i].Type) t.Errorf("unexpected event type: %v", result[i].Type)
} }
pod := makeTestPod("pod", uint64(i+3)) pod := makeTestPod("pod", uint64(i+4))
if !api.Semantic.DeepEqual(pod, result[i].Object) { if !api.Semantic.DeepEqual(pod, result[i].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
} }
prevPod := makeTestPod("pod", uint64(i+2)) prevPod := makeTestPod("pod", uint64(i+3))
if !api.Semantic.DeepEqual(prevPod, result[i].PrevObject) { if !api.Semantic.DeepEqual(prevPod, result[i].PrevObject) {
t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod) t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod)
} }
} }
} }
for i := 5; i < 9; i++ { for i := 6; i < 10; i++ {
store.Update(makeTestPod("pod", uint64(i))) store.Update(makeTestPod("pod", uint64(i)))
} }
// Test with full cache - there should be elements from 4 to 8. // Test with full cache - there should be elements from 5 to 9.
{ {
_, err := store.GetAllEventsSince(3) _, err := store.GetAllEventsSince(3)
if err == nil { if err == nil {
@ -207,7 +207,7 @@ func TestEvents(t *testing.T) {
t.Fatalf("unexpected events: %v", result) t.Fatalf("unexpected events: %v", result)
} }
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
pod := makeTestPod("pod", uint64(i+4)) pod := makeTestPod("pod", uint64(i+5))
if !api.Semantic.DeepEqual(pod, result[i].Object) { if !api.Semantic.DeepEqual(pod, result[i].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
} }
@ -215,7 +215,7 @@ func TestEvents(t *testing.T) {
} }
// Test for delete event. // Test for delete event.
store.Delete(makeTestPod("pod", uint64(9))) store.Delete(makeTestPod("pod", uint64(10)))
{ {
result, err := store.GetAllEventsSince(9) result, err := store.GetAllEventsSince(9)
@ -228,11 +228,11 @@ func TestEvents(t *testing.T) {
if result[0].Type != watch.Deleted { if result[0].Type != watch.Deleted {
t.Errorf("unexpected event type: %v", result[0].Type) t.Errorf("unexpected event type: %v", result[0].Type)
} }
pod := makeTestPod("pod", uint64(9)) pod := makeTestPod("pod", uint64(10))
if !api.Semantic.DeepEqual(pod, result[0].Object) { if !api.Semantic.DeepEqual(pod, result[0].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
} }
prevPod := makeTestPod("pod", uint64(8)) prevPod := makeTestPod("pod", uint64(9))
if !api.Semantic.DeepEqual(prevPod, result[0].PrevObject) { if !api.Semantic.DeepEqual(prevPod, result[0].PrevObject) {
t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod) t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod)
} }