From a3578c864ed7db5a73daa88839eed11d86af209b Mon Sep 17 00:00:00 2001 From: wojtekt Date: Tue, 22 May 2018 15:32:25 +0200 Subject: [PATCH] Fix incorrectly set resource version in List --- .../apiserver/pkg/storage/etcd/etcd_helper.go | 5 +- .../pkg/storage/etcd/etcd_helper_test.go | 60 +++++++++++++++++++ .../apiserver/pkg/storage/etcd3/store.go | 28 ++++----- .../apiserver/pkg/storage/etcd3/store_test.go | 3 + .../pkg/storage/tests/cacher_test.go | 56 +++++++++++++++++ 5 files changed, 137 insertions(+), 15 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go index 3a34841f919..2fe2bbada2f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go @@ -347,7 +347,10 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) if err != nil { if etcdutil.IsEtcdNotFound(err) { - return nil + if etcdErr, ok := err.(etcd.Error); ok { + return h.versioner.UpdateList(listObj, etcdErr.Index, "") + } + return fmt.Errorf("unexpected error from storage: %#v", err) } return toStorageErr(err, key, 0) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go index b0c8fa6c19e..277ad9ac7b2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go @@ -329,6 +329,66 @@ func TestGet(t *testing.T) { } } +func TestGetToList(t *testing.T) { + _, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + server := etcdtesting.NewEtcdTestClientServer(t) + defer server.Terminate(t) + key := "/some/key" + helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + + storedObj := &example.Pod{} + if err := helper.Create(context.TODO(), key, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, storedObj, 0); err != nil { + t.Errorf("Unexpected error %#v", err) + } + + tests := []struct { + key string + pred storage.SelectionPredicate + expectedOut []*example.Pod + }{{ // test GetToList on existing key + key: key, + pred: storage.Everything, + expectedOut: []*example.Pod{storedObj}, + }, { // test GetToList on non-existing key + key: "/non-existing", + pred: storage.Everything, + expectedOut: nil, + }, { // test GetToList with matching pod name + key: "/non-existing", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + expectedOut: nil, + }} + + for i, tt := range tests { + out := &example.PodList{} + err := helper.GetToList(context.TODO(), tt.key, "", tt.pred, out) + if err != nil { + t.Fatalf("GetToList failed: %v", err) + } + if len(out.ResourceVersion) == 0 { + t.Errorf("#%d: unset resourceVersion", i) + } + if len(out.Items) != len(tt.expectedOut) { + t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) + continue + } + for j, wantPod := range tt.expectedOut { + getPod := &out.Items[j] + if !reflect.DeepEqual(wantPod, getPod) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + } + } + } +} + func TestGetNotFoundErr(t *testing.T) { _, codecs := testScheme(t) codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"}) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 17384346a0b..fabb083e0ce 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -388,26 +388,26 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if err != nil { return err } - key = path.Join(s.pathPrefix, key) - - getResp, err := s.client.KV.Get(ctx, key, s.getOps...) - if err != nil { - return err - } - if len(getResp.Kvs) == 0 { - return nil - } - data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key)) - if err != nil { - return storage.NewInternalError(err.Error()) - } v, err := conversion.EnforcePtr(listPtr) if err != nil || v.Kind() != reflect.Slice { panic("need ptr to slice") } - if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil { + + key = path.Join(s.pathPrefix, key) + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + if err != nil { return err } + + if len(getResp.Kvs) > 0 { + data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key)) + if err != nil { + return storage.NewInternalError(err.Error()) + } + if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil { + return err + } + } // update version with cluster level revision return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "") } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 10496fb7a0c..b3e34be3989 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -326,6 +326,9 @@ func TestGetToList(t *testing.T) { if err != nil { t.Fatalf("GetToList failed: %v", err) } + if len(out.ResourceVersion) == 0 { + t.Errorf("#%d: unset resourceVersion", i) + } if len(out.Items) != len(tt.expectedOut) { t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) continue diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index bd3034f08aa..e6e7eec6492 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -166,6 +166,62 @@ func TestGet(t *testing.T) { } } +func TestGetToList(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher, _ := newTestCacher(etcdStorage, 10) + defer cacher.Stop() + + storedObj := updatePod(t, etcdStorage, makeTestPod("foo"), nil) + key := "pods/" + storedObj.Namespace + "/" + storedObj.Name + + tests := []struct { + key string + pred storage.SelectionPredicate + expectedOut []*example.Pod + }{{ // test GetToList on existing key + key: key, + pred: storage.Everything, + expectedOut: []*example.Pod{storedObj}, + }, { // test GetToList on non-existing key + key: "/non-existing", + pred: storage.Everything, + expectedOut: nil, + }, { // test GetToList with matching pod name + key: "/non-existing", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + expectedOut: nil, + }} + + for i, tt := range tests { + out := &example.PodList{} + err := cacher.GetToList(context.TODO(), tt.key, "", tt.pred, out) + if err != nil { + t.Fatalf("GetToList failed: %v", err) + } + if len(out.ResourceVersion) == 0 { + t.Errorf("#%d: unset resourceVersion", i) + } + if len(out.Items) != len(tt.expectedOut) { + t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) + continue + } + for j, wantPod := range tt.expectedOut { + getPod := &out.Items[j] + if !reflect.DeepEqual(wantPod, getPod) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + } + } + } +} + func TestList(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t)