Merge pull request #119649 from wackxu/Paginatelist

paginate initial list inside the storage watcher

Commit: 1144c85a92
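For orientation, here is a minimal, self-contained sketch of the pagination pattern this change introduces into the etcd3 watcher's initial list, written against go.etcd.io/etcd/client/v3. The package, function and parameter names (etcdpaging, listPrefix, visit, limit) are illustrative only and not part of the PR; the real implementation is watchChan.sync() in the diff below.

package etcdpaging

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// listPrefix pages through every key under prefix, limit keys at a time,
// and calls visit for each key-value pair. Pages after the first are read
// at the revision of the first response, so the walk is one consistent
// snapshot; that revision is returned so a watch can be started just after it.
func listPrefix(ctx context.Context, kv clientv3.KV, prefix string, limit int64, visit func(*mvccpb.KeyValue)) (int64, error) {
	opts := []clientv3.OpOption{
		clientv3.WithLimit(limit),
		// An explicit range end replaces WithPrefix(): the start key advances
		// past the last key of each page, but the end of the range must stay
		// fixed at the end of the original prefix.
		clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
	}

	key := prefix
	var rev int64
	for {
		resp, err := kv.Get(ctx, key, opts...)
		if err != nil {
			return 0, err
		}
		if len(resp.Kvs) == 0 && resp.More {
			return 0, fmt.Errorf("etcd returned an empty page but reported more results")
		}
		for _, item := range resp.Kvs {
			visit(item)
		}
		if rev == 0 {
			// Pin every later page to the revision of the first page.
			rev = resp.Header.Revision
			opts = append(opts, clientv3.WithRev(rev))
		}
		if !resp.More {
			return rev, nil
		}
		// "\x00" is the smallest possible suffix, so this is the first key
		// strictly greater than the last key received.
		key = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x00"
	}
}

Recording the first page's revision is what lets the watcher later stream changes from exactly that point; the hunks below show how the PR wires this into watchChan.sync().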
@@ -24,6 +24,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	grpccodes "google.golang.org/grpc/codes"
 	grpcstatus "google.golang.org/grpc/status"
@@ -48,6 +49,9 @@ const (
 	outgoingBufSize = 100
 )
 
+// defaultWatcherMaxLimit is used to facilitate construction tests
+var defaultWatcherMaxLimit int64 = maxLimit
+
 // fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
 var fatalOnDecodeError = false
 
@@ -211,17 +215,58 @@ func (wc *watchChan) RequestWatchProgress() error {
 func (wc *watchChan) sync() error {
 	opts := []clientv3.OpOption{}
 	if wc.recursive {
-		opts = append(opts, clientv3.WithPrefix())
+		opts = append(opts, clientv3.WithLimit(defaultWatcherMaxLimit))
+		rangeEnd := clientv3.GetPrefixRangeEnd(wc.key)
+		opts = append(opts, clientv3.WithRange(rangeEnd))
 	}
-	getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
-	if err != nil {
-		return err
-	}
-	wc.initialRev = getResp.Header.Revision
-	for _, kv := range getResp.Kvs {
-		wc.sendEvent(parseKV(kv))
+
+	var err error
+	var lastKey []byte
+	var withRev int64
+	var getResp *clientv3.GetResponse
+
+	metricsOp := "get"
+	if wc.recursive {
+		metricsOp = "list"
 	}
-	return nil
+
+	preparedKey := wc.key
+
+	for {
+		startTime := time.Now()
+		getResp, err = wc.watcher.client.KV.Get(wc.ctx, preparedKey, opts...)
+		metrics.RecordEtcdRequest(metricsOp, wc.watcher.groupResource.String(), err, startTime)
+		if err != nil {
+			return interpretListError(err, true, preparedKey, wc.key)
+		}
+
+		if len(getResp.Kvs) == 0 && getResp.More {
+			return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
+		}
+
+		// send items from the response until no more results
+		for i, kv := range getResp.Kvs {
+			lastKey = kv.Key
+			wc.sendEvent(parseKV(kv))
+			// free kv early. Long lists can take O(seconds) to decode.
+			getResp.Kvs[i] = nil
+		}
+
+		if withRev == 0 {
+			wc.initialRev = getResp.Header.Revision
+		}
+
+		// no more results remain
+		if !getResp.More {
+			return nil
+		}
+
+		preparedKey = string(lastKey) + "\x00"
+		if withRev == 0 {
+			withRev = getResp.Header.Revision
+			opts = append(opts, clientv3.WithRev(withRev))
+		}
+	}
 }
 
 func logWatchChannelErr(err error) {
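Two details of the hunk above are worth spelling out. First, WithPrefix() is replaced by an explicit WithRange(GetPrefixRangeEnd(wc.key)) because WithPrefix derives the range end from the Get key, which is correct only on the first page; once the start key has advanced to lastKey+"\x00", the range end must still be the end of the original prefix. Second, pinning later pages with WithRev(withRev) makes the whole paginated list a single consistent snapshot at the first page's revision, so the watch that follows (in watcher code not shown in this diff) can resume just after wc.initialRev without missing or duplicating events. The helper below is a hypothetical sketch of that hand-off, not code from this PR; the names watchOptionsAfterSync and recursive are assumptions.

package etcdpaging

import clientv3 "go.etcd.io/etcd/client/v3"

// watchOptionsAfterSync illustrates how a revision recorded by the paginated
// list can seed the follow-up watch: events are requested starting one
// revision after the snapshot that was just sent.
func watchOptionsAfterSync(initialRev int64, recursive bool) []clientv3.OpOption {
	opts := []clientv3.OpOption{
		clientv3.WithRev(initialRev + 1),
		clientv3.WithPrevKV(),
	}
	if recursive {
		opts = append(opts, clientv3.WithPrefix())
	}
	return opts
}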
@@ -20,13 +20,15 @@ import (
 	"context"
 	"errors"
 	"fmt"
-
 	"sync"
 	"testing"
 	"time"
 
+	clientv3 "go.etcd.io/etcd/client/v3"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apiserver/pkg/apis/example"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/etcd3/testserver"
 	storagetesting "k8s.io/apiserver/pkg/storage/testing"
@@ -158,3 +160,156 @@ func TestWatchErrorWhenNoNewFunc(t *testing.T) {
 		t.Fatalf("unexpected err = %v, expected = %v", err, expectedError)
 	}
 }
+
+func TestWatchChanSync(t *testing.T) {
+	testCases := []struct {
+		name             string
+		watchKey         string
+		watcherMaxLimit  int64
+		expectEventCount int
+		expectGetCount   int
+	}{
+		{
+			name:            "None of the current objects match watchKey: sync with empty page",
+			watchKey:        "/pods/test/",
+			watcherMaxLimit: 1,
+			expectGetCount:  1,
+		},
+		{
+			name:             "The number of current objects is less than defaultWatcherMaxLimit: sync with one page",
+			watchKey:         "/pods/",
+			watcherMaxLimit:  3,
+			expectEventCount: 2,
+			expectGetCount:   1,
+		},
+		{
+			name:             "a new item added to etcd before returning a second page is not returned: sync with two page",
+			watchKey:         "/pods/",
+			watcherMaxLimit:  1,
+			expectEventCount: 2,
+			expectGetCount:   2,
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			defaultWatcherMaxLimit = testCase.watcherMaxLimit
+
+			origCtx, store, _ := testSetup(t)
+			initList, err := initStoreData(origCtx, store)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			kvWrapper := newEtcdClientKVWrapper(store.client.KV)
+			kvWrapper.getReactors = append(kvWrapper.getReactors, func() {
+				barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}}
+				podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name)
+				storedObj := &example.Pod{}
+
+				err := store.Create(context.Background(), podKey, barThird, storedObj, 0)
+				if err != nil {
+					t.Errorf("failed to create object: %v", err)
+				}
+			})
+
+			store.client.KV = kvWrapper
+
+			w := store.watcher.createWatchChan(
+				origCtx,
+				testCase.watchKey,
+				0,
+				true,
+				false,
+				storage.Everything)
+
+			err = w.sync()
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// close incomingEventChan so we can read incomingEventChan non-blocking
+			close(w.incomingEventChan)
+
+			eventsReceived := 0
+			for event := range w.incomingEventChan {
+				eventsReceived++
+				storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key)
+			}
+
+			if eventsReceived != testCase.expectEventCount {
+				t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount)
+			}
+
+			if kvWrapper.getCallCounter != testCase.expectGetCount {
+				t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount)
+			}
+		})
+	}
+}
+
+// NOTE: it's not thread-safe
+type etcdClientKVWrapper struct {
+	clientv3.KV
+	// keeps track of the number of times Get method is called
+	getCallCounter int
+	// getReactors is called after the etcd KV's get function is executed.
+	getReactors []func()
+}
+
+func newEtcdClientKVWrapper(kv clientv3.KV) *etcdClientKVWrapper {
+	return &etcdClientKVWrapper{
+		KV:             kv,
+		getCallCounter: 0,
+	}
+}
+
+func (ecw *etcdClientKVWrapper) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
+	resp, err := ecw.KV.Get(ctx, key, opts...)
+	ecw.getCallCounter++
+	if err != nil {
+		return nil, err
+	}
+
+	if len(ecw.getReactors) > 0 {
+		reactor := ecw.getReactors[0]
+		ecw.getReactors = ecw.getReactors[1:]
+		reactor()
+	}
+
+	return resp, nil
+}
+
+func initStoreData(ctx context.Context, store storage.Interface) ([]interface{}, error) {
+	barFirst := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "first", Name: "bar"}}
+	barSecond := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "second", Name: "bar"}}
+
+	preset := []struct {
+		key       string
+		obj       *example.Pod
+		storedObj *example.Pod
+	}{
+		{
+			key: fmt.Sprintf("/pods/%s/%s", barFirst.Namespace, barFirst.Name),
+			obj: barFirst,
+		},
+		{
+			key: fmt.Sprintf("/pods/%s/%s", barSecond.Namespace, barSecond.Name),
+			obj: barSecond,
+		},
+	}
+
+	for i, ps := range preset {
+		preset[i].storedObj = &example.Pod{}
+		err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create object: %w", err)
+		}
+	}
+
+	var created []interface{}
+	for _, item := range preset {
+		created = append(created, item.key)
+	}
+	return created, nil
+}