diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 4f6f8f09e3c..dbd99a3e3b1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" "sync" + "time" grpccodes "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" @@ -48,6 +49,9 @@ const ( outgoingBufSize = 100 ) +// defaultWatcherMaxLimit is used to facilitate construction tests +var defaultWatcherMaxLimit int64 = maxLimit + // fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error var fatalOnDecodeError = false @@ -211,17 +215,58 @@ func (wc *watchChan) RequestWatchProgress() error { func (wc *watchChan) sync() error { opts := []clientv3.OpOption{} if wc.recursive { - opts = append(opts, clientv3.WithPrefix()) + opts = append(opts, clientv3.WithLimit(defaultWatcherMaxLimit)) + rangeEnd := clientv3.GetPrefixRangeEnd(wc.key) + opts = append(opts, clientv3.WithRange(rangeEnd)) } - getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...) - if err != nil { - return err + + var err error + var lastKey []byte + var withRev int64 + var getResp *clientv3.GetResponse + + metricsOp := "get" + if wc.recursive { + metricsOp = "list" } - wc.initialRev = getResp.Header.Revision - for _, kv := range getResp.Kvs { - wc.sendEvent(parseKV(kv)) + + preparedKey := wc.key + + for { + startTime := time.Now() + getResp, err = wc.watcher.client.KV.Get(wc.ctx, preparedKey, opts...) + metrics.RecordEtcdRequest(metricsOp, wc.watcher.groupResource.String(), err, startTime) + if err != nil { + return interpretListError(err, true, preparedKey, wc.key) + } + + if len(getResp.Kvs) == 0 && getResp.More { + return fmt.Errorf("no results were found, but etcd indicated there were more values remaining") + } + + // send items from the response until no more results + for i, kv := range getResp.Kvs { + lastKey = kv.Key + wc.sendEvent(parseKV(kv)) + // free kv early. Long lists can take O(seconds) to decode. + getResp.Kvs[i] = nil + } + + if withRev == 0 { + wc.initialRev = getResp.Header.Revision + } + + // no more results remain + if !getResp.More { + return nil + } + + preparedKey = string(lastKey) + "\x00" + if withRev == 0 { + withRev = getResp.Header.Revision + opts = append(opts, clientv3.WithRev(withRev)) + } } - return nil } func logWatchChannelErr(err error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 35c3a3245d2..7fa579aae3f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -20,13 +20,15 @@ import ( "context" "errors" "fmt" - "sync" "testing" "time" + clientv3 "go.etcd.io/etcd/client/v3" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/testserver" storagetesting "k8s.io/apiserver/pkg/storage/testing" @@ -158,3 +160,156 @@ func TestWatchErrorWhenNoNewFunc(t *testing.T) { t.Fatalf("unexpected err = %v, expected = %v", err, expectedError) } } + +func TestWatchChanSync(t *testing.T) { + testCases := []struct { + name string + watchKey string + watcherMaxLimit int64 + expectEventCount int + expectGetCount int + }{ + { + name: "None of the current objects match watchKey: sync with empty page", + watchKey: "/pods/test/", + watcherMaxLimit: 1, + expectGetCount: 1, + }, + { + name: "The number of current objects is less than defaultWatcherMaxLimit: sync with one page", + watchKey: "/pods/", + watcherMaxLimit: 3, + expectEventCount: 2, + expectGetCount: 1, + }, + { + name: "a new item added to etcd before returning a second page is not returned: sync with two page", + watchKey: "/pods/", + watcherMaxLimit: 1, + expectEventCount: 2, + expectGetCount: 2, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + defaultWatcherMaxLimit = testCase.watcherMaxLimit + + origCtx, store, _ := testSetup(t) + initList, err := initStoreData(origCtx, store) + if err != nil { + t.Fatal(err) + } + + kvWrapper := newEtcdClientKVWrapper(store.client.KV) + kvWrapper.getReactors = append(kvWrapper.getReactors, func() { + barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}} + podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name) + storedObj := &example.Pod{} + + err := store.Create(context.Background(), podKey, barThird, storedObj, 0) + if err != nil { + t.Errorf("failed to create object: %v", err) + } + }) + + store.client.KV = kvWrapper + + w := store.watcher.createWatchChan( + origCtx, + testCase.watchKey, + 0, + true, + false, + storage.Everything) + + err = w.sync() + if err != nil { + t.Fatal(err) + } + + // close incomingEventChan so we can read incomingEventChan non-blocking + close(w.incomingEventChan) + + eventsReceived := 0 + for event := range w.incomingEventChan { + eventsReceived++ + storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) + } + + if eventsReceived != testCase.expectEventCount { + t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount) + } + + if kvWrapper.getCallCounter != testCase.expectGetCount { + t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount) + } + }) + } +} + +// NOTE: it's not thread-safe +type etcdClientKVWrapper struct { + clientv3.KV + // keeps track of the number of times Get method is called + getCallCounter int + // getReactors is called after the etcd KV's get function is executed. + getReactors []func() +} + +func newEtcdClientKVWrapper(kv clientv3.KV) *etcdClientKVWrapper { + return &etcdClientKVWrapper{ + KV: kv, + getCallCounter: 0, + } +} + +func (ecw *etcdClientKVWrapper) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + resp, err := ecw.KV.Get(ctx, key, opts...) + ecw.getCallCounter++ + if err != nil { + return nil, err + } + + if len(ecw.getReactors) > 0 { + reactor := ecw.getReactors[0] + ecw.getReactors = ecw.getReactors[1:] + reactor() + } + + return resp, nil +} + +func initStoreData(ctx context.Context, store storage.Interface) ([]interface{}, error) { + barFirst := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "first", Name: "bar"}} + barSecond := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "second", Name: "bar"}} + + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: fmt.Sprintf("/pods/%s/%s", barFirst.Namespace, barFirst.Name), + obj: barFirst, + }, + { + key: fmt.Sprintf("/pods/%s/%s", barSecond.Namespace, barSecond.Name), + obj: barSecond, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + return nil, fmt.Errorf("failed to create object: %w", err) + } + } + + var created []interface{} + for _, item := range preset { + created = append(created, item.key) + } + return created, nil +}