mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #37692 from wojtek-t/storage_resource_version_for_get
Automatic merge from submit-queue Handle RV in Get calls to storage interface. Ref #37473
This commit is contained in:
commit
16a9c0b49c
@ -174,7 +174,7 @@ func (e *Etcd) tryUpdate(fn func() error) error {
|
|||||||
// etcd. If the key does not exist, the object will have an empty ResourceVersion.
|
// etcd. If the key does not exist, the object will have an empty ResourceVersion.
|
||||||
func (e *Etcd) Get() (*api.RangeAllocation, error) {
|
func (e *Etcd) Get() (*api.RangeAllocation, error) {
|
||||||
existing := &api.RangeAllocation{}
|
existing := &api.RangeAllocation{}
|
||||||
if err := e.storage.Get(context.TODO(), e.baseKey, existing, true); err != nil {
|
if err := e.storage.Get(context.TODO(), e.baseKey, "", existing, true); err != nil {
|
||||||
return nil, storeerr.InterpretGetError(err, e.resource, "")
|
return nil, storeerr.InterpretGetError(err, e.resource, "")
|
||||||
}
|
}
|
||||||
return existing, nil
|
return existing, nil
|
||||||
|
@ -78,7 +78,7 @@ func TestStore(t *testing.T) {
|
|||||||
other := allocator.NewAllocationMap(100, "rangeSpecValue")
|
other := allocator.NewAllocationMap(100, "rangeSpecValue")
|
||||||
|
|
||||||
allocation := &api.RangeAllocation{}
|
allocation := &api.RangeAllocation{}
|
||||||
if err := storage.storage.Get(context.TODO(), key(), allocation, false); err != nil {
|
if err := storage.storage.Get(context.TODO(), key(), "", allocation, false); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if allocation.Range != "rangeSpecValue" {
|
if allocation.Range != "rangeSpecValue" {
|
||||||
|
@ -262,7 +262,7 @@ func (e *Store) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
|
|||||||
if !kubeerr.IsAlreadyExists(err) {
|
if !kubeerr.IsAlreadyExists(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if errGet := e.Storage.Get(ctx, key, out, false); errGet != nil {
|
if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
accessor, errGetAcc := meta.Accessor(out)
|
accessor, errGetAcc := meta.Accessor(out)
|
||||||
@ -483,7 +483,8 @@ func (e *Store) Get(ctx api.Context, name string) (runtime.Object, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := e.Storage.Get(ctx, key, obj, false); err != nil {
|
// TODO: Once we pass GetOptions to this method, pass the ResourceVersion from it.
|
||||||
|
if err := e.Storage.Get(ctx, key, "", obj, false); err != nil {
|
||||||
return nil, storeerr.InterpretGetError(err, e.QualifiedResource, name)
|
return nil, storeerr.InterpretGetError(err, e.QualifiedResource, name)
|
||||||
}
|
}
|
||||||
if e.Decorator != nil {
|
if e.Decorator != nil {
|
||||||
@ -695,7 +696,7 @@ func (e *Store) Delete(ctx api.Context, name string, options *api.DeleteOptions)
|
|||||||
}
|
}
|
||||||
|
|
||||||
obj := e.NewFunc()
|
obj := e.NewFunc()
|
||||||
if err := e.Storage.Get(ctx, key, obj, false); err != nil {
|
if err := e.Storage.Get(ctx, key, "", obj, false); err != nil {
|
||||||
return nil, storeerr.InterpretDeleteError(err, e.QualifiedResource, name)
|
return nil, storeerr.InterpretDeleteError(err, e.QualifiedResource, name)
|
||||||
}
|
}
|
||||||
// support older consumers of delete by treating "nil" as delete immediately
|
// support older consumers of delete by treating "nil" as delete immediately
|
||||||
|
@ -345,8 +345,48 @@ func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
|
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
|
||||||
return c.storage.Get(ctx, key, objPtr, ignoreNotFound)
|
if resourceVersion == "" {
|
||||||
|
// If resourceVersion is not specified, serve it from underlying
|
||||||
|
// storage (for backward compatibility).
|
||||||
|
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If resourceVersion is specified, serve it from cache.
|
||||||
|
// It's guaranteed that the returned value is at least that
|
||||||
|
// fresh as the given resourceVersion.
|
||||||
|
getRV, err := ParseListResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do not create a trace - it's not for free and there are tons
|
||||||
|
// of Get requests. We can add it if it will be really needed.
|
||||||
|
c.ready.wait()
|
||||||
|
|
||||||
|
objVal, err := conversion.EnforcePtr(objPtr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to wait for fresh get: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if exists {
|
||||||
|
elem, ok := obj.(*storeElement)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||||
|
}
|
||||||
|
objVal.Set(reflect.ValueOf(elem.Object).Elem())
|
||||||
|
} else {
|
||||||
|
objVal.Set(reflect.Zero(objVal.Type()))
|
||||||
|
if !ignoreNotFound {
|
||||||
|
return NewKeyNotFoundError(key, int64(readResourceVersion))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
@ -384,7 +424,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
|
|||||||
|
|
||||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to wait for fresh list: %v", err)
|
return fmt.Errorf("failed to wait for fresh get: %v", err)
|
||||||
}
|
}
|
||||||
trace.Step("Got from cache")
|
trace.Step("Got from cache")
|
||||||
|
|
||||||
|
@ -90,12 +90,43 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
|
|||||||
}
|
}
|
||||||
obj.ResourceVersion = ""
|
obj.ResourceVersion = ""
|
||||||
result := &api.Pod{}
|
result := &api.Pod{}
|
||||||
if err := s.Get(context.TODO(), key, result, false); err != nil {
|
if err := s.Get(context.TODO(), key, "", result, false); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGet(t *testing.T) {
|
||||||
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||||
|
defer server.Terminate(t)
|
||||||
|
cacher := newTestCacher(etcdStorage, 10)
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
podFoo := makeTestPod("foo")
|
||||||
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
||||||
|
|
||||||
|
// We pass the ResourceVersion from the above Create() operation.
|
||||||
|
result := &api.Pod{}
|
||||||
|
if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected: %#v, got: %#v", e, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
emptyPod := api.Pod{}
|
||||||
|
if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected: %#v, got: %#v", e, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, false); !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
|
@ -232,7 +232,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
|
func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
|
@ -240,7 +240,7 @@ func TestGet(t *testing.T) {
|
|||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
expect = got
|
expect = got
|
||||||
if err := helper.Get(context.TODO(), key, &got, false); err != nil {
|
if err := helper.Get(context.TODO(), key, "", &got, false); err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(got, expect) {
|
if !reflect.DeepEqual(got, expect) {
|
||||||
@ -256,7 +256,7 @@ func TestGetNotFoundErr(t *testing.T) {
|
|||||||
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), key)
|
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), key)
|
||||||
|
|
||||||
var got api.Pod
|
var got api.Pod
|
||||||
err := helper.Get(context.TODO(), boguskey, &got, false)
|
err := helper.Get(context.TODO(), boguskey, "", &got, false)
|
||||||
if !storage.IsNotFound(err) {
|
if !storage.IsNotFound(err) {
|
||||||
t.Errorf("Unexpected reponse on key=%v, err=%v", key, err)
|
t.Errorf("Unexpected reponse on key=%v, err=%v", key, err)
|
||||||
}
|
}
|
||||||
@ -276,7 +276,7 @@ func TestCreate(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
err = helper.Get(context.TODO(), "/some/key", returnedObj, false)
|
err = helper.Get(context.TODO(), "/some/key", "", returnedObj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -332,7 +332,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
objCheck := &storagetesting.TestResource{}
|
objCheck := &storagetesting.TestResource{}
|
||||||
err = helper.Get(context.TODO(), key, objCheck, false)
|
err = helper.Get(context.TODO(), key, "", objCheck, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -442,7 +442,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
|||||||
wgDone.Wait()
|
wgDone.Wait()
|
||||||
|
|
||||||
stored := &storagetesting.TestResource{}
|
stored := &storagetesting.TestResource{}
|
||||||
err := helper.Get(context.TODO(), key, stored, false)
|
err := helper.Get(context.TODO(), key, "", stored, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", stored)
|
t.Errorf("Unexpected error %#v", stored)
|
||||||
}
|
}
|
||||||
@ -562,7 +562,7 @@ func TestDeleteWithRetry(t *testing.T) {
|
|||||||
if fake.getCount != expectedRetries {
|
if fake.getCount != expectedRetries {
|
||||||
t.Errorf("Expect %d retries, got %d", expectedRetries, fake.getCount)
|
t.Errorf("Expect %d retries, got %d", expectedRetries, fake.getCount)
|
||||||
}
|
}
|
||||||
err = helper.Get(context.TODO(), "/some/key", obj, false)
|
err = helper.Get(context.TODO(), "/some/key", "", obj, false)
|
||||||
if !storage.IsNotFound(err) {
|
if !storage.IsNotFound(err) {
|
||||||
t.Errorf("Expect an NotFound error, got %v", err)
|
t.Errorf("Expect an NotFound error, got %v", err)
|
||||||
}
|
}
|
||||||
|
@ -96,7 +96,7 @@ func (s *store) Versioner() storage.Versioner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get implements storage.Interface.Get.
|
// Get implements storage.Interface.Get.
|
||||||
func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error {
|
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
|
||||||
key = keyWithPrefix(s.pathPrefix, key)
|
key = keyWithPrefix(s.pathPrefix, key)
|
||||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -133,7 +133,7 @@ func TestGet(t *testing.T) {
|
|||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
out := &api.Pod{}
|
out := &api.Pod{}
|
||||||
err := store.Get(ctx, tt.key, out, tt.ignoreNotFound)
|
err := store.Get(ctx, tt.key, "", out, tt.ignoreNotFound)
|
||||||
if tt.expectNotFoundErr {
|
if tt.expectNotFoundErr {
|
||||||
if err == nil || !storage.IsNotFound(err) {
|
if err == nil || !storage.IsNotFound(err) {
|
||||||
t.Errorf("#%d: expecting not found error, but get: %s", i, err)
|
t.Errorf("#%d: expecting not found error, but get: %s", i, err)
|
||||||
|
@ -124,7 +124,9 @@ type Interface interface {
|
|||||||
// Get unmarshals json found at key into objPtr. On a not found error, will either
|
// Get unmarshals json found at key into objPtr. On a not found error, will either
|
||||||
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
|
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
|
||||||
// Treats empty responses and nil response nodes exactly like a not found error.
|
// Treats empty responses and nil response nodes exactly like a not found error.
|
||||||
Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error
|
// The returned contents may be delayed, but it is guaranteed that they will
|
||||||
|
// be have at least 'resourceVersion'.
|
||||||
|
Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
|
||||||
|
|
||||||
// GetToList unmarshals json found at key and opaque it into *List api object
|
// GetToList unmarshals json found at key and opaque it into *List api object
|
||||||
// (an object that satisfies the runtime.IsList definition).
|
// (an object that satisfies the runtime.IsList definition).
|
||||||
|
@ -70,7 +70,7 @@ func TestIgnoreClusterName(t *testing.T) {
|
|||||||
assert.Empty(t, nsNew.ClusterName)
|
assert.Empty(t, nsNew.ClusterName)
|
||||||
|
|
||||||
nsEtcd := v1.Namespace{}
|
nsEtcd := v1.Namespace{}
|
||||||
err = etcdStorage.Get(ctx, ns.Name, &nsEtcd, false)
|
err = etcdStorage.Get(ctx, ns.Name, "", &nsEtcd, false)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, ns.Name, nsEtcd.Name)
|
assert.Equal(t, ns.Name, nsEtcd.Name)
|
||||||
assert.Empty(t, nsEtcd.ClusterName)
|
assert.Empty(t, nsEtcd.ClusterName)
|
||||||
@ -81,7 +81,7 @@ func TestIgnoreClusterName(t *testing.T) {
|
|||||||
assert.Empty(t, nsNew.ClusterName)
|
assert.Empty(t, nsNew.ClusterName)
|
||||||
|
|
||||||
nsEtcd = v1.Namespace{}
|
nsEtcd = v1.Namespace{}
|
||||||
err = etcdStorage.Get(ctx, ns.Name, &nsEtcd, false)
|
err = etcdStorage.Get(ctx, ns.Name, "", &nsEtcd, false)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, ns.Name, nsEtcd.Name)
|
assert.Equal(t, ns.Name, nsEtcd.Name)
|
||||||
assert.Empty(t, nsEtcd.ClusterName)
|
assert.Empty(t, nsEtcd.ClusterName)
|
||||||
|
Loading…
Reference in New Issue
Block a user