storage/etcd: the watcher supports the API streaming

This commit is contained in:
Lukasz Szaszkiewicz 2023-08-22 14:02:39 +02:00
parent 7a941176a7
commit ca562fd280
4 changed files with 191 additions and 32 deletions

View File

@ -30,6 +30,17 @@ type event struct {
isDeleted bool isDeleted bool
isCreated bool isCreated bool
isProgressNotify bool isProgressNotify bool
// isInitialEventsEndBookmark helps us keep track
// of whether we have sent an annotated bookmark event.
//
// when this variable is set to true,
// a special annotation will be added
// to the bookmark event.
//
// note that we decided to extend the event
// struct with this field to eliminate contention
// between startWatching and processEvent
isInitialEventsEndBookmark bool
} }
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event. // parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.

View File

@ -36,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
@ -112,12 +111,11 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
pathPrefix += "/" pathPrefix += "/"
} }
// TODO(p0lyn0mial): pass newListFunc and resourcePrefix to the watcher
w := &watcher{ w := &watcher{
client: c, client: c,
codec: codec, codec: codec,
groupResource: groupResource,
newFunc: newFunc, newFunc: newFunc,
groupResource: groupResource,
versioner: versioner, versioner: versioner,
transformer: transformer, transformer: transformer,
} }
@ -126,7 +124,6 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
} else { } else {
w.objectType = reflect.TypeOf(newFunc()).String() w.objectType = reflect.TypeOf(newFunc()).String()
} }
s := &store{ s := &store{
client: c, client: c,
codec: codec, codec: codec,
@ -139,6 +136,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
watcher: w, watcher: w,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
} }
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
}
return s return s
} }
@ -855,18 +856,7 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
} }
// Watch implements storage.Interface.Watch. // Watch implements storage.Interface.Watch.
// TODO(#115478): In order to graduate the WatchList feature to beta, the etcd3 implementation must/should also support it.
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
// it is safe to skip SendInitialEvents if the request is backward compatible
// see https://github.com/kubernetes/kubernetes/blob/267eb25e60955fe8e438c6311412e7cf7d028acb/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go#L260
compatibility := opts.Predicate.AllowWatchBookmarks == false && (opts.ResourceVersion == "" || opts.ResourceVersion == "0")
if opts.SendInitialEvents != nil && !compatibility {
return nil, apierrors.NewInvalid(
schema.GroupKind{Group: s.groupResource.Group, Kind: s.groupResource.Resource},
"",
field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is unsupported by an etcd cluster")},
)
}
preparedKey, err := s.prepareKey(key) preparedKey, err := s.prepareKey(key)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -26,20 +26,21 @@ import (
"sync" "sync"
"time" "time"
clientv3 "go.etcd.io/etcd/client/v3"
grpccodes "google.golang.org/grpc/codes" grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status" grpcstatus "google.golang.org/grpc/status"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -67,13 +68,14 @@ func TestOnlySetFatalOnDecodeError(b bool) {
} }
type watcher struct { type watcher struct {
client *clientv3.Client client *clientv3.Client
codec runtime.Codec codec runtime.Codec
newFunc func() runtime.Object newFunc func() runtime.Object
objectType string objectType string
groupResource schema.GroupResource groupResource schema.GroupResource
versioner storage.Versioner versioner storage.Versioner
transformer value.Transformer transformer value.Transformer
getCurrentStorageRV func(context.Context) (uint64, error)
} }
// watchChan implements watch.Interface. // watchChan implements watch.Interface.
@ -105,8 +107,12 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage
if opts.ProgressNotify && w.newFunc == nil { if opts.ProgressNotify && w.newFunc == nil {
return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided")) return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
} }
wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate) startWatchRV, err := w.getStartWatchResourceVersion(ctx, rev, opts)
go wc.run() if err != nil {
return nil, err
}
wc := w.createWatchChan(ctx, key, startWatchRV, opts.Recursive, opts.ProgressNotify, opts.Predicate)
go wc.run(isInitialEventsEndBookmarkRequired(opts), areInitialEventsRequired(rev, opts))
// For etcd watch we don't have an easy way to answer whether the watch // For etcd watch we don't have an easy way to answer whether the watch
// has already caught up. So in the initial version (given that watchcache // has already caught up. So in the initial version (given that watchcache
@ -138,6 +144,62 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
return wc return wc
} }
// getStartWatchResourceVersion picks the revision the watch will be started from.
//
// Depending on the input, the returned value means:
//   - start at Exact: a positive resourceVersion is returned unchanged
//   - start from scratch (0): the WatchList feature gate is disabled, or
//     opts.SendInitialEvents is unset or true
//   - start at Most Recent: the revision reported by w.getCurrentStorageRV
//
// An error is returned only when w.getCurrentStorageRV fails.
func (w *watcher) getStartWatchResourceVersion(ctx context.Context, resourceVersion int64, opts storage.ListOptions) (int64, error) {
	switch {
	case resourceVersion > 0:
		// the caller asked for an exact revision
		return resourceVersion, nil
	case !utilfeature.DefaultFeatureGate.Enabled(features.WatchList):
		return 0, nil
	case opts.SendInitialEvents == nil || *opts.SendInitialEvents:
		// note that when opts.SendInitialEvents=true
		// we will be issuing a consistent LIST request
		// against etcd followed by the special bookmark event
		return 0, nil
	}

	// at this point the client is interested
	// only in getting a stream of events
	// starting at the MostRecent point in time (RV)
	latestRV, err := w.getCurrentStorageRV(ctx)
	if err != nil {
		return 0, err
	}

	// latestRV is taken from resp.Header.Revision (int64)
	// and cast to uint64, so converting it back is safe.
	// at some point we should unify the interface but that
	// would require changing Versioner.UpdateList
	return int64(latestRV), nil
}
// isInitialEventsEndBookmarkRequired reports whether a bookmark event marking
// the end of the initial events must be sent. That is the case only when the
// WatchList feature gate is enabled, opts.SendInitialEvents is explicitly true
// and the predicate allows watch bookmarks.
//
// Since there is no way to directly set opts.ProgressNotify from the API and
// the etcd3 impl doesn't support notification for external clients, the
// bookmark is only sent after the initial list call.
//
// see: https://github.com/kubernetes/kubernetes/issues/120348
func isInitialEventsEndBookmarkRequired(opts storage.ListOptions) bool {
	if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
		return false
	}
	if opts.SendInitialEvents == nil || !*opts.SendInitialEvents {
		return false
	}
	return opts.Predicate.AllowWatchBookmarks
}
// areInitialEventsRequired reports whether all events from etcd should be
// returned before the watch starts streaming changes.
//
// When opts.SendInitialEvents is unset, only the legacy case
// (resourceVersion == 0) asks for them. When it is set, its value is honoured
// only if the WatchList feature gate is enabled.
func areInitialEventsRequired(resourceVersion int64, opts storage.ListOptions) bool {
	requested := opts.SendInitialEvents
	if requested == nil {
		// legacy case: an unset option with RV=0 still yields the initial state
		return resourceVersion == 0
	}
	return utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && *requested
}
type etcdError interface { type etcdError interface {
Code() grpccodes.Code Code() grpccodes.Code
Error() string Error() string
@ -163,9 +225,9 @@ func isCancelError(err error) bool {
return false return false
} }
func (wc *watchChan) run() { func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) {
watchClosedCh := make(chan struct{}) watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh) go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
var resultChanWG sync.WaitGroup var resultChanWG sync.WaitGroup
resultChanWG.Add(1) resultChanWG.Add(1)
@ -284,14 +346,44 @@ func logWatchChannelErr(err error) {
// startWatching does: // startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev // - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process. // - watch on given key and send events to process.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { //
if wc.initialRev == 0 { // initialEventsEndBookmarkSent helps us keep track
// of whether we have sent an annotated bookmark event.
//
// it's important to note that we don't
// need to track the actual RV because
// we only send the bookmark event
// after the initial list call.
//
// when this variable is set to false,
// it means we don't have any specific
// preferences for delivering bookmark events.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEndBookmarkRequired, forceInitialEvents bool) {
if wc.initialRev > 0 && forceInitialEvents {
currentStorageRV, err := wc.watcher.getCurrentStorageRV(wc.ctx)
if err != nil {
wc.sendError(err)
return
}
if uint64(wc.initialRev) > currentStorageRV {
wc.sendError(storage.NewTooLargeResourceVersionError(uint64(wc.initialRev), currentStorageRV, int(wait.Jitter(1*time.Second, 3).Seconds())))
return
}
}
if forceInitialEvents {
if err := wc.sync(); err != nil { if err := wc.sync(); err != nil {
klog.Errorf("failed to sync with latest state: %v", err) klog.Errorf("failed to sync with latest state: %v", err)
wc.sendError(err) wc.sendError(err)
return return
} }
} }
if initialEventsEndBookmarkRequired {
wc.sendEvent(func() *event {
e := progressNotifyEvent(wc.initialRev)
e.isInitialEventsEndBookmark = true
return e
}())
}
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()} opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
if wc.recursive { if wc.recursive {
opts = append(opts, clientv3.WithPrefix()) opts = append(opts, clientv3.WithPrefix())
@ -388,6 +480,12 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
klog.Errorf("failed to propagate object version: %v", err) klog.Errorf("failed to propagate object version: %v", err)
return nil return nil
} }
if e.isInitialEventsEndBookmark {
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
return nil
}
}
res = &watch.Event{ res = &watch.Event{
Type: watch.Bookmark, Type: watch.Bookmark,
Object: object, Object: object,

View File

@ -24,9 +24,13 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
@ -35,6 +39,7 @@ import (
storagetesting "k8s.io/apiserver/pkg/storage/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/ptr"
) )
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
@ -123,6 +128,16 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
} }
// TestEtcdWatchSemantics runs the shared storagetesting.RunWatchSemantics
// suite against the store produced by testSetup.
func TestEtcdWatchSemantics(t *testing.T) {
	testCtx, etcdStore, _ := testSetup(t)
	storagetesting.RunWatchSemantics(testCtx, t, etcdStore)
}
// TestEtcdWatchSemanticInitialEventsExtended runs the shared
// storagetesting.RunWatchSemanticInitialEventsExtended suite against the
// store produced by testSetup.
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
	testCtx, etcdStore, _ := testSetup(t)
	storagetesting.RunWatchSemanticInitialEventsExtended(testCtx, t, etcdStore)
}
// ======================================================================= // =======================================================================
// Implementation-specific tests are following. // Implementation-specific tests are following.
// The following tests are exercising the details of the implementation // The following tests are exercising the details of the implementation
@ -145,7 +160,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
w.run() w.run(false, true)
wg.Done() wg.Done()
}() }()
w.errChan <- fmt.Errorf("some error") w.errChan <- fmt.Errorf("some error")
@ -194,6 +209,51 @@ func TestWatchErrorIncorrectConfiguration(t *testing.T) {
} }
} }
// TestTooLargeResourceVersionErrorForWatchList verifies that, with the
// WatchList feature gate enabled, a watch that requests initial events at a
// resource version (102) expected to be ahead of the freshly set up storage
// delivers a watch.Error event as its first event. The event must carry the
// "too large resource version" status with a positive RetryAfterSeconds.
func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
	origCtx, store, _ := testSetup(t)
	ctx, cancel := context.WithCancel(origCtx)
	defer cancel()
	requestOpts := storage.ListOptions{
		SendInitialEvents: ptr.To(true),
		Recursive:         true,
		Predicate: storage.SelectionPredicate{
			Field:               fields.Everything(),
			Label:               labels.Everything(),
			AllowWatchBookmarks: true,
		},
	}

	// build the reference status; its Details are overwritten below because
	// the retry delay of the actual error is not a fixed value
	var expectedErr *apierrors.StatusError
	if !errors.As(storage.NewTooLargeResourceVersionError(uint64(102), 1, 0), &expectedErr) {
		t.Fatalf("Unable to convert NewTooLargeResourceVersionError to apierrors.StatusError")
	}

	w, err := store.watcher.Watch(ctx, "/abc", int64(102), requestOpts)
	if err != nil {
		t.Fatal(err)
	}
	defer w.Stop()

	actualEvent := <-w.ResultChan()
	if actualEvent.Type != watch.Error {
		t.Fatalf("Unexpected type of the event: %v, expected: %v", actualEvent.Type, watch.Error)
	}
	actualErr, ok := actualEvent.Object.(*metav1.Status)
	if !ok {
		t.Fatalf("Expected *metav1.Status, got: %#v", actualEvent.Object)
	}

	// Details is a pointer; fail cleanly instead of panicking when it is missing
	if actualErr.Details == nil {
		t.Fatalf("Expected the status to carry Details, got: %#v", actualErr)
	}
	if actualErr.Details.RetryAfterSeconds <= 0 {
		t.Fatalf("RetryAfterSeconds must be > 0, actual value: %v", actualErr.Details.RetryAfterSeconds)
	}

	// rewrite the Details as it contains retry seconds
	// and validate the whole struct
	expectedErr.ErrStatus.Details = actualErr.Details
	if diff := cmp.Diff(*actualErr, expectedErr.ErrStatus); diff != "" {
		t.Fatalf("Unexpected error returned, diff: %v", diff)
	}
}
func TestWatchChanSync(t *testing.T) { func TestWatchChanSync(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string