Merge pull request #24008 from liggitt/watch_cache

Automatic merge from submit-queue

Make watch cache treat resourceVersion consistent with uncached watch

Fixes #24004

This makes the watch cache handle resourceVersion consistent with an uncached watch API call, and the documented behavior. Watching from resourceVersion=X delivers watch events *after* version X (X is not included):

> // When specified with a watch call, shows changes that occur after that particular version of a resource.
> // Defaults to changes from the beginning of history.
> ResourceVersion string
This commit is contained in:
k8s-merge-robot 2016-04-12 10:23:33 -07:00
commit f4a421d4bb
4 changed files with 23 additions and 18 deletions

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@ -542,6 +543,8 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
} }
func (c *cacheWatcher) process(initEvents []watchCacheEvent) { func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
defer utilruntime.HandleCrash()
for _, event := range initEvents { for _, event := range initEvents {
c.sendWatchCacheEvent(event) c.sendWatchCacheEvent(event)
} }

View File

@ -19,6 +19,7 @@ package storage_test
import ( import (
"fmt" "fmt"
"reflect" "reflect"
goruntime "runtime"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -159,15 +160,19 @@ func TestList(t *testing.T) {
} }
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
_, _, line, _ := goruntime.Caller(1)
select { select {
case event := <-w.ResultChan(): case event := <-w.ResultChan():
if e, a := eventType, event.Type; e != a { if e, a := eventType, event.Type; e != a {
t.Logf("(called from line %d)", line)
t.Errorf("Expected: %s, got: %s", eventType, event.Type) t.Errorf("Expected: %s, got: %s", eventType, event.Type)
} }
if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) { if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Logf("(called from line %d)", line)
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
} }
case <-time.After(wait.ForeverTestTimeout): case <-time.After(wait.ForeverTestTimeout):
t.Logf("(called from line %d)", line)
t.Errorf("Timed out waiting for an event") t.Errorf("Timed out waiting for an event")
} }
} }
@ -236,7 +241,6 @@ func TestWatch(t *testing.T) {
} }
defer initialWatcher.Stop() defer initialWatcher.Stop()
verifyWatchEvent(t, initialWatcher, watch.Added, podFoo)
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
// Now test watch from "now". // Now test watch from "now".
@ -335,7 +339,6 @@ func TestFiltering(t *testing.T) {
} }
defer watcher.Stop() defer watcher.Stop()
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered) verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
verifyWatchEvent(t, watcher, watch.Added, podFoo) verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)

View File

@ -302,14 +302,13 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
} }
return result, nil return result, nil
} }
if resourceVersion < oldest { if resourceVersion < oldest-1 {
return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest)) return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
} }
// Binary search the smallest index at which resourceVersion is not smaller than // Binary search the smallest index at which resourceVersion is greater than the given one.
// the given one.
f := func(i int) bool { f := func(i int) bool {
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
} }
first := sort.Search(size, f) first := sort.Search(size, f)
result := make([]watchCacheEvent, size-first) result := make([]watchCacheEvent, size-first)

View File

@ -122,7 +122,7 @@ func TestWatchCacheBasic(t *testing.T) {
func TestEvents(t *testing.T) { func TestEvents(t *testing.T) {
store := newTestWatchCache(5) store := newTestWatchCache(5)
store.Add(makeTestPod("pod", 2)) store.Add(makeTestPod("pod", 3))
// Test for Added event. // Test for Added event.
{ {
@ -145,7 +145,7 @@ func TestEvents(t *testing.T) {
if result[0].Type != watch.Added { if result[0].Type != watch.Added {
t.Errorf("unexpected event type: %v", result[0].Type) t.Errorf("unexpected event type: %v", result[0].Type)
} }
pod := makeTestPod("pod", uint64(2)) pod := makeTestPod("pod", uint64(3))
if !api.Semantic.DeepEqual(pod, result[0].Object) { if !api.Semantic.DeepEqual(pod, result[0].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
} }
@ -154,8 +154,8 @@ func TestEvents(t *testing.T) {
} }
} }
store.Update(makeTestPod("pod", 3))
store.Update(makeTestPod("pod", 4)) store.Update(makeTestPod("pod", 4))
store.Update(makeTestPod("pod", 5))
// Test with not full cache. // Test with not full cache.
{ {
@ -176,22 +176,22 @@ func TestEvents(t *testing.T) {
if result[i].Type != watch.Modified { if result[i].Type != watch.Modified {
t.Errorf("unexpected event type: %v", result[i].Type) t.Errorf("unexpected event type: %v", result[i].Type)
} }
pod := makeTestPod("pod", uint64(i+3)) pod := makeTestPod("pod", uint64(i+4))
if !api.Semantic.DeepEqual(pod, result[i].Object) { if !api.Semantic.DeepEqual(pod, result[i].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
} }
prevPod := makeTestPod("pod", uint64(i+2)) prevPod := makeTestPod("pod", uint64(i+3))
if !api.Semantic.DeepEqual(prevPod, result[i].PrevObject) { if !api.Semantic.DeepEqual(prevPod, result[i].PrevObject) {
t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod) t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod)
} }
} }
} }
for i := 5; i < 9; i++ { for i := 6; i < 10; i++ {
store.Update(makeTestPod("pod", uint64(i))) store.Update(makeTestPod("pod", uint64(i)))
} }
// Test with full cache - there should be elements from 4 to 8. // Test with full cache - there should be elements from 5 to 9.
{ {
_, err := store.GetAllEventsSince(3) _, err := store.GetAllEventsSince(3)
if err == nil { if err == nil {
@ -207,7 +207,7 @@ func TestEvents(t *testing.T) {
t.Fatalf("unexpected events: %v", result) t.Fatalf("unexpected events: %v", result)
} }
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
pod := makeTestPod("pod", uint64(i+4)) pod := makeTestPod("pod", uint64(i+5))
if !api.Semantic.DeepEqual(pod, result[i].Object) { if !api.Semantic.DeepEqual(pod, result[i].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod) t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
} }
@ -215,7 +215,7 @@ func TestEvents(t *testing.T) {
} }
// Test for delete event. // Test for delete event.
store.Delete(makeTestPod("pod", uint64(9))) store.Delete(makeTestPod("pod", uint64(10)))
{ {
result, err := store.GetAllEventsSince(9) result, err := store.GetAllEventsSince(9)
@ -228,11 +228,11 @@ func TestEvents(t *testing.T) {
if result[0].Type != watch.Deleted { if result[0].Type != watch.Deleted {
t.Errorf("unexpected event type: %v", result[0].Type) t.Errorf("unexpected event type: %v", result[0].Type)
} }
pod := makeTestPod("pod", uint64(9)) pod := makeTestPod("pod", uint64(10))
if !api.Semantic.DeepEqual(pod, result[0].Object) { if !api.Semantic.DeepEqual(pod, result[0].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod) t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
} }
prevPod := makeTestPod("pod", uint64(8)) prevPod := makeTestPod("pod", uint64(9))
if !api.Semantic.DeepEqual(prevPod, result[0].PrevObject) { if !api.Semantic.DeepEqual(prevPod, result[0].PrevObject) {
t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod) t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod)
} }