diff --git a/pkg/registry/core/service/allocator/etcd/etcd.go b/pkg/registry/core/service/allocator/etcd/etcd.go index febb65004fc..509b116d696 100644 --- a/pkg/registry/core/service/allocator/etcd/etcd.go +++ b/pkg/registry/core/service/allocator/etcd/etcd.go @@ -174,7 +174,7 @@ func (e *Etcd) tryUpdate(fn func() error) error { // etcd. If the key does not exist, the object will have an empty ResourceVersion. func (e *Etcd) Get() (*api.RangeAllocation, error) { existing := &api.RangeAllocation{} - if err := e.storage.Get(context.TODO(), e.baseKey, existing, true); err != nil { + if err := e.storage.Get(context.TODO(), e.baseKey, "", existing, true); err != nil { return nil, storeerr.InterpretGetError(err, e.resource, "") } return existing, nil diff --git a/pkg/registry/core/service/allocator/etcd/etcd_test.go b/pkg/registry/core/service/allocator/etcd/etcd_test.go index 847247352cd..d471e4d4854 100644 --- a/pkg/registry/core/service/allocator/etcd/etcd_test.go +++ b/pkg/registry/core/service/allocator/etcd/etcd_test.go @@ -78,7 +78,7 @@ func TestStore(t *testing.T) { other := allocator.NewAllocationMap(100, "rangeSpecValue") allocation := &api.RangeAllocation{} - if err := storage.storage.Get(context.TODO(), key(), allocation, false); err != nil { + if err := storage.storage.Get(context.TODO(), key(), "", allocation, false); err != nil { t.Fatal(err) } if allocation.Range != "rangeSpecValue" { diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index 85d1f2733a0..10e5712d3aa 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -262,7 +262,7 @@ func (e *Store) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err if !kubeerr.IsAlreadyExists(err) { return nil, err } - if errGet := e.Storage.Get(ctx, key, out, false); errGet != nil { + if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil { return nil, err } accessor, errGetAcc := meta.Accessor(out) @@ -483,7 +483,8 @@ func (e *Store) Get(ctx api.Context, name string) (runtime.Object, error) { if err != nil { return nil, err } - if err := e.Storage.Get(ctx, key, obj, false); err != nil { + // TODO: Once we pass GetOptions to this method, pass the ResourceVersion from it. + if err := e.Storage.Get(ctx, key, "", obj, false); err != nil { return nil, storeerr.InterpretGetError(err, e.QualifiedResource, name) } if e.Decorator != nil { @@ -695,7 +696,7 @@ func (e *Store) Delete(ctx api.Context, name string, options *api.DeleteOptions) } obj := e.NewFunc() - if err := e.Storage.Get(ctx, key, obj, false); err != nil { + if err := e.Storage.Get(ctx, key, "", obj, false); err != nil { return nil, storeerr.InterpretDeleteError(err, e.QualifiedResource, name) } // support older consumers of delete by treating "nil" as delete immediately diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 1283eb45ca3..59c634748fb 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -345,8 +345,48 @@ func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion stri } // Implements storage.Interface. -func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error { - return c.storage.Get(ctx, key, objPtr, ignoreNotFound) +func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error { + if resourceVersion == "" { + // If resourceVersion is not specified, serve it from underlying + // storage (for backward compatibility). + return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) + } + + // If resourceVersion is specified, serve it from cache. + // It's guaranteed that the returned value is at least that + // fresh as the given resourceVersion. + getRV, err := ParseListResourceVersion(resourceVersion) + if err != nil { + return err + } + + // Do not create a trace - it's not for free and there are tons + // of Get requests. We can add it if it will be really needed. + c.ready.wait() + + objVal, err := conversion.EnforcePtr(objPtr) + if err != nil { + return err + } + + obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil) + if err != nil { + return fmt.Errorf("failed to wait for fresh get: %v", err) + } + + if exists { + elem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("non *storeElement returned from storage: %v", obj) + } + objVal.Set(reflect.ValueOf(elem.Object).Elem()) + } else { + objVal.Set(reflect.Zero(objVal.Type())) + if !ignoreNotFound { + return NewKeyNotFoundError(key, int64(readResourceVersion)) + } + } + return nil } // Implements storage.Interface. @@ -384,7 +424,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) if err != nil { - return fmt.Errorf("failed to wait for fresh list: %v", err) + return fmt.Errorf("failed to wait for fresh get: %v", err) } trace.Step("Got from cache") diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 9ef9eb2281e..5fa6c15a099 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -90,12 +90,43 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod { } obj.ResourceVersion = "" result := &api.Pod{} - if err := s.Get(context.TODO(), key, result, false); err != nil { + if err := s.Get(context.TODO(), key, "", result, false); err != nil { t.Errorf("unexpected error: %v", err) } return result } +func TestGet(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage, 10) + defer cacher.Stop() + + podFoo := makeTestPod("foo") + fooCreated := updatePod(t, etcdStorage, podFoo, nil) + + // We pass the ResourceVersion from the above Create() operation. + result := &api.Pod{} + if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil { + t.Errorf("Unexpected error: %v", err) + } + if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) { + t.Errorf("Expected: %#v, got: %#v", e, a) + } + + if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil { + t.Errorf("Unexpected error: %v", err) + } + emptyPod := api.Pod{} + if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) { + t.Errorf("Expected: %#v, got: %#v", e, a) + } + + if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, false); !storage.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } +} + func TestList(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) defer server.Terminate(t) diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index a7245905c03..d7ddabb65c6 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -232,7 +232,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion } // Implements storage.Interface. -func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error { +func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error { if ctx == nil { glog.Errorf("Context is nil") } diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 57960616385..65d6a557f9a 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -240,7 +240,7 @@ func TestGet(t *testing.T) { t.Errorf("Unexpected error %#v", err) } expect = got - if err := helper.Get(context.TODO(), key, &got, false); err != nil { + if err := helper.Get(context.TODO(), key, "", &got, false); err != nil { t.Errorf("Unexpected error %#v", err) } if !reflect.DeepEqual(got, expect) { @@ -256,7 +256,7 @@ func TestGetNotFoundErr(t *testing.T) { helper := newEtcdHelper(server.Client, testapi.Default.Codec(), key) var got api.Pod - err := helper.Get(context.TODO(), boguskey, &got, false) + err := helper.Get(context.TODO(), boguskey, "", &got, false) if !storage.IsNotFound(err) { t.Errorf("Unexpected reponse on key=%v, err=%v", key, err) } @@ -276,7 +276,7 @@ func TestCreate(t *testing.T) { if err != nil { t.Errorf("Unexpected error %#v", err) } - err = helper.Get(context.TODO(), "/some/key", returnedObj, false) + err = helper.Get(context.TODO(), "/some/key", "", returnedObj, false) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -332,7 +332,7 @@ func TestGuaranteedUpdate(t *testing.T) { } objCheck := &storagetesting.TestResource{} - err = helper.Get(context.TODO(), key, objCheck, false) + err = helper.Get(context.TODO(), key, "", objCheck, false) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -442,7 +442,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) { wgDone.Wait() stored := &storagetesting.TestResource{} - err := helper.Get(context.TODO(), key, stored, false) + err := helper.Get(context.TODO(), key, "", stored, false) if err != nil { t.Errorf("Unexpected error %#v", stored) } @@ -562,7 +562,7 @@ func TestDeleteWithRetry(t *testing.T) { if fake.getCount != expectedRetries { t.Errorf("Expect %d retries, got %d", expectedRetries, fake.getCount) } - err = helper.Get(context.TODO(), "/some/key", obj, false) + err = helper.Get(context.TODO(), "/some/key", "", obj, false) if !storage.IsNotFound(err) { t.Errorf("Expect an NotFound error, got %v", err) } diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 0e080f52853..f240be8b3df 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -96,7 +96,7 @@ func (s *store) Versioner() storage.Versioner { } // Get implements storage.Interface.Get. -func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error { +func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error { key = keyWithPrefix(s.pathPrefix, key) getResp, err := s.client.KV.Get(ctx, key, s.getOps...) if err != nil { diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index b5e101490f8..cc9d51ca8cc 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -133,7 +133,7 @@ func TestGet(t *testing.T) { for i, tt := range tests { out := &api.Pod{} - err := store.Get(ctx, tt.key, out, tt.ignoreNotFound) + err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound) if tt.expectNotFoundErr { if err == nil || !storage.IsNotFound(err) { t.Errorf("#%d: expecting not found error, but get: %s", i, err) diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 612d3741b7b..3ce105301aa 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -124,7 +124,9 @@ type Interface interface { // Get unmarshals json found at key into objPtr. On a not found error, will either // return a zero object of the requested type, or an error, depending on ignoreNotFound. // Treats empty responses and nil response nodes exactly like a not found error. - Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error + // The returned contents may be delayed, but it is guaranteed that they will + // be have at least 'resourceVersion'. + Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error // GetToList unmarshals json found at key and opaque it into *List api object // (an object that satisfies the runtime.IsList definition). diff --git a/test/integration/objectmeta/objectmeta_test.go b/test/integration/objectmeta/objectmeta_test.go index e7f315f3837..40d8c0497a7 100644 --- a/test/integration/objectmeta/objectmeta_test.go +++ b/test/integration/objectmeta/objectmeta_test.go @@ -70,7 +70,7 @@ func TestIgnoreClusterName(t *testing.T) { assert.Empty(t, nsNew.ClusterName) nsEtcd := v1.Namespace{} - err = etcdStorage.Get(ctx, ns.Name, &nsEtcd, false) + err = etcdStorage.Get(ctx, ns.Name, "", &nsEtcd, false) assert.Nil(t, err) assert.Equal(t, ns.Name, nsEtcd.Name) assert.Empty(t, nsEtcd.ClusterName) @@ -81,7 +81,7 @@ func TestIgnoreClusterName(t *testing.T) { assert.Empty(t, nsNew.ClusterName) nsEtcd = v1.Namespace{} - err = etcdStorage.Get(ctx, ns.Name, &nsEtcd, false) + err = etcdStorage.Get(ctx, ns.Name, "", &nsEtcd, false) assert.Nil(t, err) assert.Equal(t, ns.Name, nsEtcd.Name) assert.Empty(t, nsEtcd.ClusterName)