From f80b59ba8766c57fe8b9947a5316e2e6d1b8a1d4 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Mon, 9 May 2016 17:01:20 -0400 Subject: [PATCH] Return 'too old' errors from watch cache via watch stream --- pkg/storage/cacher.go | 48 +++++++++++++++++++++++++++++++++++++- pkg/storage/cacher_test.go | 13 +++++++---- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 20acacc9a87..e7b4d63e2f0 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -18,6 +18,7 @@ package storage import ( "fmt" + "net/http" "reflect" "strconv" "strings" @@ -25,8 +26,10 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/rest" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" @@ -259,7 +262,10 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, defer c.watchCache.RUnlock() initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) if err != nil { - return nil, err + // To match the uncached watch implementation, once we have passed authn/authz/admission, + // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, + // rather than a directly returned error. + return newErrWatcher(err), nil } c.Lock() @@ -455,6 +461,46 @@ func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface, return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything) } +// cacherWatch implements watch.Interface to return a single error +type errWatcher struct { + result chan watch.Event +} + +func newErrWatcher(err error) *errWatcher { + // Create an error event + errEvent := watch.Event{Type: watch.Error} + switch err := err.(type) { + case runtime.Object: + errEvent.Object = err + case *errors.StatusError: + errEvent.Object = &err.ErrStatus + default: + errEvent.Object = &unversioned.Status{ + Status: unversioned.StatusFailure, + Message: err.Error(), + Reason: unversioned.StatusReasonInternalError, + Code: http.StatusInternalServerError, + } + } + + // Create a watcher with room for a single event, populate it, and close the channel + watcher := &errWatcher{result: make(chan watch.Event, 1)} + watcher.result <- errEvent + close(watcher.result) + + return watcher +} + +// Implements watch.Interface. +func (c *errWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +// Implements watch.Interface. +func (c *errWatcher) Stop() { + // no-op +} + // cacherWatch implements watch.Interface type cacheWatcher struct { sync.Mutex diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 67d8b4e0559..21cf571f744 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -25,6 +25,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" @@ -231,11 +232,15 @@ func TestWatch(t *testing.T) { verifyWatchEvent(t, watcher, watch.Added, podFoo) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) - // Check whether we get too-old error. - _, err = cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything) - if err == nil { - t.Errorf("Expected 'error too old' error") + // Check whether we get too-old error via the watch channel + tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything) + if err != nil { + t.Fatalf("Expected no direct error, got %v", err) } + defer tooOldWatcher.Stop() + // Ensure we get a "Gone" error + expectedGoneError := errors.NewGone("").(*errors.StatusError).ErrStatus + verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError) initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything) if err != nil {