mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Do not persist SelfLink into etcd storage
This behavior regressed in an earlier release. Clearing the self link ensures that a new version is always written and reduces the size of the stored object by a small amount. Add tests to verify that Create and Update result in no SelfLink stored in etcd.
This commit is contained in:
parent
fdee1d5488
commit
461c3701f0
@ -389,7 +389,7 @@ func TestEtcdStorage(t *testing.T) {
|
||||
Metadata: Metadata{
|
||||
Name: "noxus.mygroup.example.com",
|
||||
Namespace: "",
|
||||
SelfLink: "/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/noxus.mygroup.example.com",
|
||||
SelfLink: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -414,7 +414,7 @@ func TestEtcdStorage(t *testing.T) {
|
||||
Metadata: Metadata{
|
||||
Name: "curlets.mygroup.example.com",
|
||||
Namespace: "",
|
||||
SelfLink: "/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/curlets.mygroup.example.com",
|
||||
SelfLink: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -56,6 +56,17 @@ func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint6
|
||||
return nil
|
||||
}
|
||||
|
||||
// PrepareObjectForStorage clears resource version and self link prior to writing to etcd.
|
||||
func (a APIObjectVersioner) PrepareObjectForStorage(obj runtime.Object) error {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accessor.SetResourceVersion("")
|
||||
accessor.SetSelfLink("")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ObjectResourceVersion implements Versioner
|
||||
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
|
@ -128,6 +128,9 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
|
||||
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion may not be set on objects to be created")
|
||||
}
|
||||
if err := h.versioner.PrepareObjectForStorage(obj); err != nil {
|
||||
return fmt.Errorf("PrepareObjectForStorage returned an error: %v", err)
|
||||
}
|
||||
trace.Step("Version checked")
|
||||
|
||||
startTime := time.Now()
|
||||
@ -530,7 +533,7 @@ func (h *etcdHelper) GuaranteedUpdate(
|
||||
}
|
||||
|
||||
// Since update object may have a resourceVersion set, we need to clear it here.
|
||||
if err := h.versioner.UpdateObject(ret, 0); err != nil {
|
||||
if err := h.versioner.PrepareObjectForStorage(ret); err != nil {
|
||||
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||
}
|
||||
|
||||
|
@ -147,6 +147,9 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
||||
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion should not be set on objects to be created")
|
||||
}
|
||||
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
|
||||
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
||||
}
|
||||
data, err := runtime.Encode(s.codec, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -486,8 +489,8 @@ func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
|
||||
|
||||
// Compute the serialized form - for that we need to temporarily clean
|
||||
// its resource version field (those are not stored in etcd).
|
||||
if err := s.versioner.UpdateObject(obj, 0); err != nil {
|
||||
return nil, errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
|
||||
return nil, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
||||
}
|
||||
state.data, err = runtime.Encode(s.codec, obj)
|
||||
if err != nil {
|
||||
@ -503,15 +506,8 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
version, err := s.versioner.ObjectResourceVersion(ret)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if version != 0 {
|
||||
// We cannot store object with resourceVersion in etcd. We need to reset it.
|
||||
if err := s.versioner.UpdateObject(ret, 0); err != nil {
|
||||
return nil, 0, fmt.Errorf("UpdateObject failed: %v", err)
|
||||
}
|
||||
if err := s.versioner.PrepareObjectForStorage(ret); err != nil {
|
||||
return nil, 0, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
||||
}
|
||||
var ttl uint64
|
||||
if ttlPtr != nil {
|
||||
|
@ -38,6 +38,7 @@ import (
|
||||
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
@ -45,6 +46,8 @@ import (
|
||||
var scheme = runtime.NewScheme()
|
||||
var codecs = serializer.NewCodecFactory(scheme)
|
||||
|
||||
const defaultTestPrefix = "test!"
|
||||
|
||||
func init() {
|
||||
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
||||
example.AddToScheme(scheme)
|
||||
@ -84,7 +87,7 @@ func TestCreate(t *testing.T) {
|
||||
|
||||
key := "/testkey"
|
||||
out := &example.Pod{}
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}}
|
||||
|
||||
// verify that kv pair is empty before set
|
||||
getResp, err := etcdClient.KV.Get(ctx, key)
|
||||
@ -106,15 +109,32 @@ func TestCreate(t *testing.T) {
|
||||
if out.ResourceVersion == "" {
|
||||
t.Errorf("output should have non-empty resource version")
|
||||
}
|
||||
if out.SelfLink != "" {
|
||||
t.Errorf("output should have empty self link")
|
||||
}
|
||||
|
||||
// verify that kv pair is not empty after set
|
||||
getResp, err = etcdClient.KV.Get(ctx, key)
|
||||
checkStorageInvariants(ctx, t, etcdClient, store, key)
|
||||
}
|
||||
|
||||
func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clientv3.Client, store *store, key string) {
|
||||
getResp, err := etcdClient.KV.Get(ctx, key)
|
||||
if err != nil {
|
||||
t.Fatalf("etcdClient.KV.Get failed: %v", err)
|
||||
}
|
||||
if len(getResp.Kvs) == 0 {
|
||||
t.Fatalf("expecting non empty result on key: %s", key)
|
||||
}
|
||||
decoded, err := runtime.Decode(store.codec, getResp.Kvs[0].Value[len(defaultTestPrefix):])
|
||||
if err != nil {
|
||||
t.Fatalf("expecting successful decode of object from %v\n%v", err, string(getResp.Kvs[0].Value))
|
||||
}
|
||||
obj := decoded.(*example.Pod)
|
||||
if obj.ResourceVersion != "" {
|
||||
t.Errorf("stored object should have empty resource version")
|
||||
}
|
||||
if obj.SelfLink != "" {
|
||||
t.Errorf("stored output should have empty self link")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateWithTTL(t *testing.T) {
|
||||
@ -316,7 +336,8 @@ func TestGetToList(t *testing.T) {
|
||||
func TestGuaranteedUpdate(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
etcdClient := cluster.RandClient()
|
||||
key := "/testkey"
|
||||
|
||||
tests := []struct {
|
||||
key string
|
||||
@ -326,6 +347,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
expectInvalidObjErr bool
|
||||
expectNoUpdate bool
|
||||
transformStale bool
|
||||
hasSelfLink bool
|
||||
}{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: false,
|
||||
@ -354,6 +376,14 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
}, { // GuaranteedUpdate with same data AND a self link
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
hasSelfLink: true,
|
||||
}, { // GuaranteedUpdate with same data but stale
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
@ -379,6 +409,8 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
|
||||
out := &example.Pod{}
|
||||
name := fmt.Sprintf("foo-%d", i)
|
||||
if tt.expectNoUpdate {
|
||||
@ -399,6 +431,9 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
pod := *storeObj
|
||||
if tt.hasSelfLink {
|
||||
pod.SelfLink = "testlink"
|
||||
}
|
||||
pod.Name = name
|
||||
return &pod, nil
|
||||
}))
|
||||
@ -422,6 +457,13 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
if out.ObjectMeta.Name != name {
|
||||
t.Errorf("#%d: pod name want=%s, get=%s", i, name, out.ObjectMeta.Name)
|
||||
}
|
||||
if out.SelfLink != "" {
|
||||
t.Errorf("#%d: selflink should not be set", i)
|
||||
}
|
||||
|
||||
// verify that kv pair is not empty after set and that the underlying data matches expectations
|
||||
checkStorageInvariants(ctx, t, etcdClient, store, key)
|
||||
|
||||
switch tt.expectNoUpdate {
|
||||
case true:
|
||||
if version != out.ResourceVersion {
|
||||
@ -432,7 +474,6 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
t.Errorf("#%d: expect version change, but get the same version=%s", i, version)
|
||||
}
|
||||
}
|
||||
storeObj = out
|
||||
}
|
||||
}
|
||||
|
||||
@ -546,7 +587,7 @@ func TestTransformationFailure(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
preset := []struct {
|
||||
@ -626,7 +667,7 @@ func TestList(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
@ -714,7 +755,7 @@ func TestList(t *testing.T) {
|
||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
return ctx, store, cluster
|
||||
}
|
||||
@ -724,9 +765,12 @@ func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) {
|
||||
// Setup store with a key and grab the output for returning.
|
||||
key := "/testkey"
|
||||
err := store.unconditionalDelete(ctx, key, &example.Pod{})
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
t.Fatal("Cleanup failed: %v", err)
|
||||
}
|
||||
setOutput := &example.Pod{}
|
||||
err := store.Create(ctx, key, obj, setOutput, 0)
|
||||
if err != nil {
|
||||
if err := store.Create(ctx, key, obj, setOutput, 0); err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
return key, setOutput
|
||||
@ -736,7 +780,7 @@ func TestPrefix(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
transformer := prefixTransformer{prefix: []byte("test!")}
|
||||
transformer := prefixTransformer{prefix: []byte(defaultTestPrefix)}
|
||||
testcases := map[string]string{
|
||||
"custom/prefix": "/custom/prefix",
|
||||
"/custom//prefix//": "/custom/prefix",
|
||||
|
@ -26,7 +26,9 @@ import (
|
||||
)
|
||||
|
||||
// Versioner abstracts setting and retrieving metadata fields from database response
|
||||
// onto the object ot list.
|
||||
// onto the object ot list. It is required to maintain storage invariants - updating an
|
||||
// object twice with the same data except for the ResourceVersion and SelfLink must be
|
||||
// a no-op.
|
||||
type Versioner interface {
|
||||
// UpdateObject sets storage metadata into an API object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
@ -36,6 +38,9 @@ type Versioner interface {
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from database.
|
||||
UpdateList(obj runtime.Object, resourceVersion uint64) error
|
||||
// PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should
|
||||
// return an error if the specified object cannot be updated.
|
||||
PrepareObjectForStorage(obj runtime.Object) error
|
||||
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
|
||||
// Should return an error if the specified object does not have a persistable version.
|
||||
ObjectResourceVersion(obj runtime.Object) (uint64, error)
|
||||
|
Loading…
Reference in New Issue
Block a user