diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go index caaee1b17df..4d3a55d7169 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go @@ -76,9 +76,10 @@ type ListMeta struct { // continue may be set if the user set a limit on the number of items returned, and indicates that // the server has more data available. The value is opaque and may be used to issue another request // to the endpoint that served this list to retrieve the next set of available objects. Continuing a - // list may not be possible if the server configuration has changed or more than a few minutes have - // passed. The resourceVersion field returned when using this continue value will be identical to - // the value in the first response. + // consistent list may not be possible if the server configuration has changed or more than a few + // minutes have passed. The resourceVersion field returned when using this continue value will be + // identical to the value in the first response, unless you have received this token from an error + // message. Continue string `json:"continue,omitempty" protobuf:"bytes,3,opt,name=continue"` } @@ -363,14 +364,20 @@ type ListOptions struct { // updated during a chunked list the version of the object that was present at the time the first list // result was calculated is returned. Limit int64 `json:"limit,omitempty" protobuf:"varint,7,opt,name=limit"` - // The continue option should be set when retrieving more results from the server. Since this value - // is server defined, clients may only use the continue value from a previous query result with - // identical query parameters (except for the value of continue) and the server may reject a continue - // value it does not recognize. 
If the specified continue value is no longer valid whether due to - // expiration (generally five to fifteen minutes) or a configuration change on the server the server - // will respond with a 410 ResourceExpired error indicating the client must restart their list without - // the continue field. This field is not supported when watch is true. Clients may start a watch from - // the last resourceVersion value returned by the server and not miss any modifications. + // The continue option should be set when retrieving more results from the server. Since this value is + // server defined, clients may only use the continue value from a previous query result with identical + // query parameters (except for the value of continue) and the server may reject a continue value it + // does not recognize. If the specified continue value is no longer valid whether due to expiration + // (generally five to fifteen minutes) or a configuration change on the server, the server will + // respond with a 410 ResourceExpired error together with a continue token. If the client needs a + // consistent list, it must restart its list without the continue field. Otherwise, the client may + // send another list request with the token received with the 410 error; the server will respond with + // a list starting from the next key, but from the latest snapshot, which is inconsistent with the + // previous list results - objects that are created, modified, or deleted after the first list request + // will be included in the response, as long as their keys are after the "next key". + // + // This field is not supported when watch is true. Clients may start a watch from the last + // resourceVersion value returned by the server and not miss any modifications. 
Continue string `json:"continue,omitempty" protobuf:"bytes,8,opt,name=continue"` } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go index 5aac30a146d..136570a6fc3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go @@ -20,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) func interpretWatchError(err error) error { @@ -30,13 +31,41 @@ func interpretWatchError(err error) error { return err } -func interpretListError(err error, paging bool) error { +const ( + expired string = "The resourceVersion for the provided list is too old." + continueExpired string = "The provided continue parameter is too old " + + "to display a consistent list result. You can start a new list without " + + "the continue parameter." + inconsistentContinue string = "The provided continue parameter is too old " + + "to display a consistent list result. You can start a new list without " + + "the continue parameter, or use the continue token in this response to " + + "retrieve the remainder of the results. Continuing with the provided " + + "token results in an inconsistent list - objects that were created, " + + "modified, or deleted between the time the first chunk was returned " + + "and now may show up in the list." +) + +func interpretListError(err error, paging bool, continueKey, keyPrefix string) error { switch { case err == etcdrpc.ErrCompacted: if paging { - return errors.NewResourceExpired("The provided from parameter is too old to display a consistent list result. 
You must start a new list without the from.") + return handleCompactedErrorForPaging(continueKey, keyPrefix) } - return errors.NewResourceExpired("The resourceVersion for the provided list is too old.") + return errors.NewResourceExpired(expired) } return err } + +func handleCompactedErrorForPaging(continueKey, keyPrefix string) error { + // continueToken.ResourceVersion=-1 means that the apiserver can + // continue the list at the latest resource version. We don't use rv=0 + // for this purpose to distinguish from a bad token that has empty rv. + newToken, err := encodeContinue(continueKey, keyPrefix, -1) + if err != nil { + utilruntime.HandleError(err) + return errors.NewResourceExpired(continueExpired) + } + statusError := errors.NewResourceExpired(inconsistentContinue) + statusError.ErrStatus.ListMeta.Continue = newToken + return statusError +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 83354478d8e..d8aa8b2fd37 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -508,10 +508,11 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor options = append(options, clientv3.WithLimit(pred.Limit)) } - var returnedRV int64 + var returnedRV, continueRV int64 + var continueKey string switch { case s.pagingEnabled && len(pred.Continue) > 0: - continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix) + continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix) if err != nil { return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) } @@ -524,9 +525,13 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor options = append(options, clientv3.WithRange(rangeEnd)) key = continueKey - options = append(options, clientv3.WithRev(continueRV)) - returnedRV = continueRV - + // If continueRV > 0, the LIST 
request needs a specific resource version. + // continueRV==0 is invalid. + // If continueRV < 0, the request is for the latest resource version. + if continueRV > 0 { + options = append(options, clientv3.WithRev(continueRV)) + returnedRV = continueRV + } case s.pagingEnabled && pred.Limit > 0: if len(resourceVersion) > 0 { fromRV, err := s.versioner.ParseResourceVersion(resourceVersion) @@ -563,7 +568,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor for { getResp, err := s.client.KV.Get(ctx, key, options...) if err != nil { - return interpretListError(err, len(pred.Continue) > 0) + return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) } hasMore = getResp.More diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index b73639117ea..ff811a0e2af 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -25,6 +25,7 @@ import ( "fmt" "reflect" "strconv" + "strings" "sync" "testing" @@ -44,6 +45,7 @@ import ( "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd" storagetests "k8s.io/apiserver/pkg/storage/tests" "k8s.io/apiserver/pkg/storage/value" ) @@ -1180,6 +1182,153 @@ func TestListContinuation(t *testing.T) { if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) { t.Fatalf("Unexpected third page: %#v", out.Items) } + +} + +func TestListInconsistentContinuation(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + ctx := context.Background() + + // Setup 
storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.Everything(), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + } + } + + out := &example.PodList{} + if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) { + t.Fatalf("Unexpected first page: %#v", out.Items) + } + + continueFromSecondItem := out.Continue + + // update /two-level/2/test/bar + oldName := preset[2].obj.Name + newPod := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: oldName, + Labels: map[string]string{ + "state": "new", + }, + }, + } + if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return newPod, nil, nil + }, 
newPod); err != nil { + t.Fatalf("update failed: %v", err) + } + + // compact to latest revision. + versioner := etcd.APIObjectVersioner{} + lastRVString := preset[2].storedObj.ResourceVersion + lastRV, err := versioner.ParseResourceVersion(lastRVString) + if err != nil { + t.Fatal(err) + } + if _, err := cluster.Client(0).KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil { + t.Fatalf("Unable to compact, %v", err) + } + + // The old continue token should have expired + err = store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out) + if err == nil { + t.Fatalf("unexpected no error") + } + if !strings.Contains(err.Error(), inconsistentContinue) { + t.Fatalf("unexpected error message %v", err) + } + status, ok := err.(apierrors.APIStatus) + if !ok { + t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err)) + } + inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue + if len(inconsistentContinueFromSecondItem) == 0 { + t.Fatalf("expect non-empty continue token") + } + + out = &example.PodList{} + if err := store.List(ctx, "/", "0", pred(1, inconsistentContinueFromSecondItem), out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) { + t.Fatalf("Unexpected second page: %#v", out.Items) + } + if out.ResourceVersion != lastRVString { + t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion) + } + continueFromThirdItem := out.Continue + out = &example.PodList{} + if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) { + 
t.Fatalf("Unexpected third page: %#v", out.Items) + } + if out.ResourceVersion != lastRVString { + t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion) + } } func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { diff --git a/test/e2e/apimachinery/chunking.go b/test/e2e/apimachinery/chunking.go index c8b705bac0a..d3efb271467 100644 --- a/test/e2e/apimachinery/chunking.go +++ b/test/e2e/apimachinery/chunking.go @@ -19,12 +19,17 @@ package apimachinery import ( "fmt" "math/rand" + "reflect" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/test/e2e/framework" ) @@ -34,11 +39,10 @@ const numberOfTotalResources = 400 var _ = SIGDescribe("Servers with support for API chunking", func() { f := framework.NewDefaultFramework("chunking") - It("should return chunks of results for list calls", func() { + BeforeEach(func() { ns := f.Namespace.Name c := f.ClientSet client := c.CoreV1().PodTemplates(ns) - By("creating a large number of resources") workqueue.Parallelize(20, numberOfTotalResources, func(i int) { for tries := 3; tries >= 0; tries-- { @@ -61,7 +65,12 @@ var _ = SIGDescribe("Servers with support for API chunking", func() { } Fail("Unable to create template %d, exiting", i) }) + }) + It("should return chunks of results for list calls", func() { + ns := f.Namespace.Name + c := f.ClientSet + client := c.CoreV1().PodTemplates(ns) By("retrieving those results in paged fashion several times") for i := 0; i < 3; i++ { opts := metav1.ListOptions{} @@ -81,9 +90,7 @@ var _ = SIGDescribe("Servers with support for API chunking", func() { if len(lastRV) == 0 { lastRV = list.ResourceVersion } - if lastRV != list.ResourceVersion { - 
Expect(list.ResourceVersion).To(Equal(lastRV)) - } + Expect(list.ResourceVersion).To(Equal(lastRV)) for _, item := range list.Items { Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found))) found++ @@ -101,4 +108,81 @@ var _ = SIGDescribe("Servers with support for API chunking", func() { Expect(err).ToNot(HaveOccurred()) Expect(list.Items).To(HaveLen(numberOfTotalResources)) }) + + It("should support continue listing from the last key if the original version has been compacted away, though the list is inconsistent", func() { + ns := f.Namespace.Name + c := f.ClientSet + client := c.CoreV1().PodTemplates(ns) + + By("retrieving the first page") + oneTenth := int64(numberOfTotalResources / 10) + opts := metav1.ListOptions{} + opts.Limit = oneTenth + list, err := client.List(opts) + // TODO: kops PR job is still using etcd2, which prevents this feature from working. Remove this check when kops is upgraded to etcd3 + if len(list.Items) > int(opts.Limit) { + framework.Skipf("ERROR: This cluster does not support chunking, which means it is running etcd2 and not supported.") + } + Expect(err).ToNot(HaveOccurred()) + firstToken := list.Continue + firstRV := list.ResourceVersion + framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, firstToken) + + By("retrieving the second page until the token expires") + opts.Continue = firstToken + var inconsistentToken string + wait.Poll(20*time.Second, 2*storagebackend.DefaultCompactInterval, func() (bool, error) { + _, err := client.List(opts) + if err == nil { + framework.Logf("Token %s has not expired yet", firstToken) + return false, nil + } + if err != nil && !errors.IsResourceExpired(err) { + return false, err + } + framework.Logf("got error %s", err) + status, ok := err.(errors.APIStatus) + if !ok { + return false, fmt.Errorf("expect error to implement the APIStatus interface, got %v", reflect.TypeOf(err)) + } + inconsistentToken = 
status.Status().ListMeta.Continue + if len(inconsistentToken) == 0 { + return false, fmt.Errorf("expect non empty continue token") + } + framework.Logf("Retrieved inconsistent continue %s", inconsistentToken) + return true, nil + }) + + By("retrieving the second page again with the token received with the error message") + opts.Continue = inconsistentToken + list, err = client.List(opts) + Expect(err).ToNot(HaveOccurred()) + Expect(list.ResourceVersion).ToNot(Equal(firstRV)) + Expect(len(list.Items)).To(BeNumerically("==", opts.Limit)) + found := oneTenth + for _, item := range list.Items { + Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found))) + found++ + } + + By("retrieving all remaining pages") + opts.Continue = list.Continue + lastRV := list.ResourceVersion + for { + list, err := client.List(opts) + Expect(err).ToNot(HaveOccurred()) + framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, list.Continue) + Expect(len(list.Items)).To(BeNumerically("<=", opts.Limit)) + Expect(list.ResourceVersion).To(Equal(lastRV)) + for _, item := range list.Items { + Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found))) + found++ + } + if len(list.Continue) == 0 { + break + } + opts.Continue = list.Continue + } + Expect(found).To(BeNumerically("==", numberOfTotalResources)) + }) })