mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Merge pull request #25369 from liggitt/cached-watch
Automatic merge from submit-queue Return 'too old' errors from watch cache via watch stream Fixes #25151 This PR updates the API server to produce the same results when a watch is attempted with a resourceVersion that is too old, regardless of whether the etcd watch cache is enabled. The expected result is a `200` http status, with a single watch event of type `ERROR`. Previously, the watch cache would deliver a `410` http response. This is the uncached watch impl: ``` // Implements storage.Interface. func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { if ctx == nil { glog.Errorf("Context is nil") } watchRV, err := storage.ParseWatchResourceVersion(resourceVersion) if err != nil { return nil, err } key = h.prefixEtcdKey(key) w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } ``` once the resourceVersion parses, there is no path that returns a direct error, so all errors would have to be returned as an `ERROR` event via the ResultChan().
This commit is contained in:
commit
aa8fddb7d9
@ -18,6 +18,7 @@ package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -25,8 +26,10 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
@ -259,7 +262,10 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
||||
defer c.watchCache.RUnlock()
|
||||
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// To match the uncached watch implementation, once we have passed authn/authz/admission,
|
||||
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
|
||||
// rather than a directly returned error.
|
||||
return newErrWatcher(err), nil
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
@ -455,6 +461,46 @@ func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface,
|
||||
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
|
||||
}
|
||||
|
||||
// cacherWatch implements watch.Interface to return a single error
|
||||
type errWatcher struct {
|
||||
result chan watch.Event
|
||||
}
|
||||
|
||||
func newErrWatcher(err error) *errWatcher {
|
||||
// Create an error event
|
||||
errEvent := watch.Event{Type: watch.Error}
|
||||
switch err := err.(type) {
|
||||
case runtime.Object:
|
||||
errEvent.Object = err
|
||||
case *errors.StatusError:
|
||||
errEvent.Object = &err.ErrStatus
|
||||
default:
|
||||
errEvent.Object = &unversioned.Status{
|
||||
Status: unversioned.StatusFailure,
|
||||
Message: err.Error(),
|
||||
Reason: unversioned.StatusReasonInternalError,
|
||||
Code: http.StatusInternalServerError,
|
||||
}
|
||||
}
|
||||
|
||||
// Create a watcher with room for a single event, populate it, and close the channel
|
||||
watcher := &errWatcher{result: make(chan watch.Event, 1)}
|
||||
watcher.result <- errEvent
|
||||
close(watcher.result)
|
||||
|
||||
return watcher
|
||||
}
|
||||
|
||||
// Implements watch.Interface.
|
||||
func (c *errWatcher) ResultChan() <-chan watch.Event {
|
||||
return c.result
|
||||
}
|
||||
|
||||
// Implements watch.Interface.
|
||||
func (c *errWatcher) Stop() {
|
||||
// no-op
|
||||
}
|
||||
|
||||
// cacherWatch implements watch.Interface
|
||||
type cacheWatcher struct {
|
||||
sync.Mutex
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
apitesting "k8s.io/kubernetes/pkg/api/testing"
|
||||
@ -231,11 +232,15 @@ func TestWatch(t *testing.T) {
|
||||
verifyWatchEvent(t, watcher, watch.Added, podFoo)
|
||||
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
||||
|
||||
// Check whether we get too-old error.
|
||||
_, err = cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
|
||||
if err == nil {
|
||||
t.Errorf("Expected 'error too old' error")
|
||||
// Check whether we get too-old error via the watch channel
|
||||
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no direct error, got %v", err)
|
||||
}
|
||||
defer tooOldWatcher.Stop()
|
||||
// Ensure we get a "Gone" error
|
||||
expectedGoneError := errors.NewGone("").(*errors.StatusError).ErrStatus
|
||||
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)
|
||||
|
||||
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user