Fix incorrectly set resource version in List

This commit is contained in:
wojtekt 2018-05-22 15:32:25 +02:00
parent a0036fcae1
commit a3578c864e
5 changed files with 137 additions and 15 deletions

View File

@ -347,7 +347,10 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
if err != nil {
if etcdutil.IsEtcdNotFound(err) {
return nil
if etcdErr, ok := err.(etcd.Error); ok {
return h.versioner.UpdateList(listObj, etcdErr.Index, "")
}
return fmt.Errorf("unexpected error from storage: %#v", err)
}
return toStorageErr(err, key, 0)
}

View File

@ -329,6 +329,66 @@ func TestGet(t *testing.T) {
}
}
func TestGetToList(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
storedObj := &example.Pod{}
if err := helper.Create(context.TODO(), key, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, storedObj, 0); err != nil {
t.Errorf("Unexpected error %#v", err)
}
tests := []struct {
key string
pred storage.SelectionPredicate
expectedOut []*example.Pod
}{{ // test GetToList on existing key
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
}, { // test GetToList on non-existing key
key: "/non-existing",
pred: storage.Everything,
expectedOut: nil,
}, { // test GetToList with matching pod name
key: "/non-existing",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: nil,
}}
for i, tt := range tests {
out := &example.PodList{}
err := helper.GetToList(context.TODO(), tt.key, "", tt.pred, out)
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}
if len(out.ResourceVersion) == 0 {
t.Errorf("#%d: unset resourceVersion", i)
}
if len(out.Items) != len(tt.expectedOut) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
continue
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
}
}
}
}
func TestGetNotFoundErr(t *testing.T) {
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"})

View File

@ -388,26 +388,26 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if err != nil {
return err
}
key = path.Join(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
if len(getResp.Kvs) == 0 {
return nil
}
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil {
key = path.Join(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
if len(getResp.Kvs) > 0 {
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil {
return err
}
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "")
}

View File

@ -326,6 +326,9 @@ func TestGetToList(t *testing.T) {
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}
if len(out.ResourceVersion) == 0 {
t.Errorf("#%d: unset resourceVersion", i)
}
if len(out.Items) != len(tt.expectedOut) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
continue

View File

@ -166,6 +166,62 @@ func TestGet(t *testing.T) {
}
}
func TestGetToList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
storedObj := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
key := "pods/" + storedObj.Namespace + "/" + storedObj.Name
tests := []struct {
key string
pred storage.SelectionPredicate
expectedOut []*example.Pod
}{{ // test GetToList on existing key
key: key,
pred: storage.Everything,
expectedOut: []*example.Pod{storedObj},
}, { // test GetToList on non-existing key
key: "/non-existing",
pred: storage.Everything,
expectedOut: nil,
}, { // test GetToList with matching pod name
key: "/non-existing",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: nil,
}}
for i, tt := range tests {
out := &example.PodList{}
err := cacher.GetToList(context.TODO(), tt.key, "", tt.pred, out)
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}
if len(out.ResourceVersion) == 0 {
t.Errorf("#%d: unset resourceVersion", i)
}
if len(out.Items) != len(tt.expectedOut) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
continue
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
}
}
}
}
func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)