diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 812cfc69c6c..14643dad913 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -572,7 +572,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, fromRV = &parsedRV } - var returnedRV, continueRV int64 + var returnedRV, continueRV, withRev int64 var continueKey string switch { case s.pagingEnabled && len(pred.Continue) > 0: @@ -593,7 +593,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, // continueRV==0 is invalid. // If continueRV < 0, the request is for the latest resource version. if continueRV > 0 { - options = append(options, clientv3.WithRev(continueRV)) + withRev = continueRV returnedRV = continueRV } case s.pagingEnabled && pred.Limit > 0: @@ -604,11 +604,11 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, // and returnedRV is then set to the revision we get from the etcd response. case metav1.ResourceVersionMatchExact: returnedRV = int64(*fromRV) - options = append(options, clientv3.WithRev(returnedRV)) + withRev = returnedRV case "": // legacy case if *fromRV > 0 { returnedRV = int64(*fromRV) - options = append(options, clientv3.WithRev(returnedRV)) + withRev = returnedRV } default: return fmt.Errorf("unknown ResourceVersionMatch value: %v", match) @@ -625,7 +625,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, // and returnedRV is then set to the revision we get from the etcd response. case metav1.ResourceVersionMatchExact: returnedRV = int64(*fromRV) - options = append(options, clientv3.WithRev(returnedRV)) + withRev = returnedRV case "": // legacy case default: return fmt.Errorf("unknown ResourceVersionMatch value: %v", match) @@ -634,6 +634,9 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, options = append(options, clientv3.WithPrefix()) } + if withRev != 0 { + options = append(options, clientv3.WithRev(withRev)) + } // loop until we have filled the requested limit from etcd or there are no more results var lastKey []byte @@ -695,6 +698,10 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, break } key = string(lastKey) + "\x00" + if withRev == 0 { + withRev = returnedRV + options = append(options, clientv3.WithRev(withRev)) + } } // instruct the client to begin querying from immediately after the last key we returned diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 3f6fad0083c..db60282d45c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -1963,3 +1963,115 @@ func Test_growSlice(t *testing.T) { }) } } + +// fancyTransformer creates next object on each call to +// TransformFromStorage call. +type fancyTransformer struct { + transformer value.Transformer + store *store + + lock sync.Mutex + index int +} + +func (t *fancyTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) { + if err := t.createObject(); err != nil { + return nil, false, err + } + return t.transformer.TransformFromStorage(b, ctx) +} + +func (t *fancyTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) { + return t.transformer.TransformToStorage(b, ctx) +} + +func (t *fancyTransformer) createObject() error { + t.lock.Lock() + defer t.lock.Unlock() + + t.index++ + key := fmt.Sprintf("pod-%d", t.index) + obj := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: key, + Labels: map[string]string{ + "even": strconv.FormatBool(t.index%2 == 0), + }, + }, + } + out := &example.Pod{} + return t.store.Create(context.TODO(), key, obj, out, 0) +} + +func TestConsistentList(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + transformer := &fancyTransformer{ + transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, + } + store := newStore(cluster.RandClient(), true, codec, "", transformer) + transformer.store = store + + for i := 0; i < 5; i++ { + if err := transformer.createObject(); err != nil { + t.Fatalf("failed to create object: %v", err) + } + } + + getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod, ok := obj.(*example.Pod) + if !ok { + return nil, nil, fmt.Errorf("invalid object") + } + return labels.Set(pod.Labels), nil, nil + } + predicate := storage.SelectionPredicate{ + Label: labels.Set{"even": "true"}.AsSelector(), + GetAttrs: getAttrs, + Limit: 4, + } + + result1 := example.PodList{} + if err := store.List(context.TODO(), "/", storage.ListOptions{Predicate: predicate}, &result1); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + // List objects from the returned resource version. + options := storage.ListOptions{ + Predicate: predicate, + ResourceVersion: result1.ResourceVersion, + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + } + + result2 := example.PodList{} + if err := store.List(context.TODO(), "/", options, &result2); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + if !reflect.DeepEqual(result1, result2) { + t.Errorf("inconsistent lists: %#v, %#v", result1, result2) + } + + // Now also verify the ResourceVersionMatchNotOlderThan. + options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan + + result3 := example.PodList{} + if err := store.List(context.TODO(), "/", options, &result3); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + options.ResourceVersion = result3.ResourceVersion + options.ResourceVersionMatch = metav1.ResourceVersionMatchExact + + result4 := example.PodList{} + if err := store.List(context.TODO(), "/", options, &result4); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + if !reflect.DeepEqual(result3, result4) { + t.Errorf("inconsistent lists: %#v, %#v", result3, result4) + } + +}