mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #47703 from smarterclayton/clean_self_link
Automatic merge from submit-queue (batch tested with PRs 48864, 48651, 47703) Do not persist SelfLink into etcd storage This behavior regressed in an earlier release. Clearing the self link reduces the size of the stored object by a small amount and keeps etcd marginally cleaner. Add tests to verify that Create and Update result in no SelfLink stored in etcd.
This commit is contained in:
commit
578063b7a9
@ -389,7 +389,7 @@ func TestEtcdStorage(t *testing.T) {
|
||||
Metadata: Metadata{
|
||||
Name: "noxus.mygroup.example.com",
|
||||
Namespace: "",
|
||||
SelfLink: "/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/noxus.mygroup.example.com",
|
||||
SelfLink: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -414,7 +414,7 @@ func TestEtcdStorage(t *testing.T) {
|
||||
Metadata: Metadata{
|
||||
Name: "curlets.mygroup.example.com",
|
||||
Namespace: "",
|
||||
SelfLink: "/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/curlets.mygroup.example.com",
|
||||
SelfLink: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -56,6 +56,17 @@ func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint6
|
||||
return nil
|
||||
}
|
||||
|
||||
// PrepareObjectForStorage clears resource version and self link prior to writing to etcd.
|
||||
func (a APIObjectVersioner) PrepareObjectForStorage(obj runtime.Object) error {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accessor.SetResourceVersion("")
|
||||
accessor.SetSelfLink("")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ObjectResourceVersion implements Versioner
|
||||
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
|
@ -128,6 +128,9 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
|
||||
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion may not be set on objects to be created")
|
||||
}
|
||||
if err := h.versioner.PrepareObjectForStorage(obj); err != nil {
|
||||
return fmt.Errorf("PrepareObjectForStorage returned an error: %v", err)
|
||||
}
|
||||
trace.Step("Version checked")
|
||||
|
||||
startTime := time.Now()
|
||||
@ -530,7 +533,7 @@ func (h *etcdHelper) GuaranteedUpdate(
|
||||
}
|
||||
|
||||
// Since update object may have a resourceVersion set, we need to clear it here.
|
||||
if err := h.versioner.UpdateObject(ret, 0); err != nil {
|
||||
if err := h.versioner.PrepareObjectForStorage(ret); err != nil {
|
||||
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||
}
|
||||
|
||||
|
@ -147,6 +147,9 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
||||
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion should not be set on objects to be created")
|
||||
}
|
||||
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
|
||||
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
||||
}
|
||||
data, err := runtime.Encode(s.codec, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -486,8 +489,8 @@ func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
|
||||
|
||||
// Compute the serialized form - for that we need to temporarily clean
|
||||
// its resource version field (those are not stored in etcd).
|
||||
if err := s.versioner.UpdateObject(obj, 0); err != nil {
|
||||
return nil, errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
|
||||
return nil, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
||||
}
|
||||
state.data, err = runtime.Encode(s.codec, obj)
|
||||
if err != nil {
|
||||
@ -503,15 +506,8 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
version, err := s.versioner.ObjectResourceVersion(ret)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if version != 0 {
|
||||
// We cannot store object with resourceVersion in etcd. We need to reset it.
|
||||
if err := s.versioner.UpdateObject(ret, 0); err != nil {
|
||||
return nil, 0, fmt.Errorf("UpdateObject failed: %v", err)
|
||||
}
|
||||
if err := s.versioner.PrepareObjectForStorage(ret); err != nil {
|
||||
return nil, 0, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
||||
}
|
||||
var ttl uint64
|
||||
if ttlPtr != nil {
|
||||
|
@ -38,6 +38,7 @@ import (
|
||||
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
@ -45,6 +46,8 @@ import (
|
||||
var scheme = runtime.NewScheme()
|
||||
var codecs = serializer.NewCodecFactory(scheme)
|
||||
|
||||
const defaultTestPrefix = "test!"
|
||||
|
||||
func init() {
|
||||
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
||||
example.AddToScheme(scheme)
|
||||
@ -84,7 +87,7 @@ func TestCreate(t *testing.T) {
|
||||
|
||||
key := "/testkey"
|
||||
out := &example.Pod{}
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}}
|
||||
|
||||
// verify that kv pair is empty before set
|
||||
getResp, err := etcdClient.KV.Get(ctx, key)
|
||||
@ -106,15 +109,32 @@ func TestCreate(t *testing.T) {
|
||||
if out.ResourceVersion == "" {
|
||||
t.Errorf("output should have non-empty resource version")
|
||||
}
|
||||
if out.SelfLink != "" {
|
||||
t.Errorf("output should have empty self link")
|
||||
}
|
||||
|
||||
// verify that kv pair is not empty after set
|
||||
getResp, err = etcdClient.KV.Get(ctx, key)
|
||||
checkStorageInvariants(ctx, t, etcdClient, store, key)
|
||||
}
|
||||
|
||||
func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clientv3.Client, store *store, key string) {
|
||||
getResp, err := etcdClient.KV.Get(ctx, key)
|
||||
if err != nil {
|
||||
t.Fatalf("etcdClient.KV.Get failed: %v", err)
|
||||
}
|
||||
if len(getResp.Kvs) == 0 {
|
||||
t.Fatalf("expecting non empty result on key: %s", key)
|
||||
}
|
||||
decoded, err := runtime.Decode(store.codec, getResp.Kvs[0].Value[len(defaultTestPrefix):])
|
||||
if err != nil {
|
||||
t.Fatalf("expecting successful decode of object from %v\n%v", err, string(getResp.Kvs[0].Value))
|
||||
}
|
||||
obj := decoded.(*example.Pod)
|
||||
if obj.ResourceVersion != "" {
|
||||
t.Errorf("stored object should have empty resource version")
|
||||
}
|
||||
if obj.SelfLink != "" {
|
||||
t.Errorf("stored output should have empty self link")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateWithTTL(t *testing.T) {
|
||||
@ -316,7 +336,8 @@ func TestGetToList(t *testing.T) {
|
||||
func TestGuaranteedUpdate(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
etcdClient := cluster.RandClient()
|
||||
key := "/testkey"
|
||||
|
||||
tests := []struct {
|
||||
key string
|
||||
@ -326,6 +347,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
expectInvalidObjErr bool
|
||||
expectNoUpdate bool
|
||||
transformStale bool
|
||||
hasSelfLink bool
|
||||
}{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false
|
||||
key: "/non-existing",
|
||||
ignoreNotFound: false,
|
||||
@ -354,6 +376,14 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
}, { // GuaranteedUpdate with same data AND a self link
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
precondition: nil,
|
||||
expectNotFoundErr: false,
|
||||
expectInvalidObjErr: false,
|
||||
expectNoUpdate: true,
|
||||
hasSelfLink: true,
|
||||
}, { // GuaranteedUpdate with same data but stale
|
||||
key: key,
|
||||
ignoreNotFound: false,
|
||||
@ -379,6 +409,8 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
}}
|
||||
|
||||
for i, tt := range tests {
|
||||
key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
|
||||
out := &example.Pod{}
|
||||
name := fmt.Sprintf("foo-%d", i)
|
||||
if tt.expectNoUpdate {
|
||||
@ -399,6 +431,9 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
pod := *storeObj
|
||||
if tt.hasSelfLink {
|
||||
pod.SelfLink = "testlink"
|
||||
}
|
||||
pod.Name = name
|
||||
return &pod, nil
|
||||
}))
|
||||
@ -422,6 +457,13 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
if out.ObjectMeta.Name != name {
|
||||
t.Errorf("#%d: pod name want=%s, get=%s", i, name, out.ObjectMeta.Name)
|
||||
}
|
||||
if out.SelfLink != "" {
|
||||
t.Errorf("#%d: selflink should not be set", i)
|
||||
}
|
||||
|
||||
// verify that kv pair is not empty after set and that the underlying data matches expectations
|
||||
checkStorageInvariants(ctx, t, etcdClient, store, key)
|
||||
|
||||
switch tt.expectNoUpdate {
|
||||
case true:
|
||||
if version != out.ResourceVersion {
|
||||
@ -432,7 +474,6 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
t.Errorf("#%d: expect version change, but get the same version=%s", i, version)
|
||||
}
|
||||
}
|
||||
storeObj = out
|
||||
}
|
||||
}
|
||||
|
||||
@ -546,7 +587,7 @@ func TestTransformationFailure(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
preset := []struct {
|
||||
@ -626,7 +667,7 @@ func TestList(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
@ -714,7 +755,7 @@ func TestList(t *testing.T) {
|
||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
return ctx, store, cluster
|
||||
}
|
||||
@ -724,9 +765,12 @@ func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) {
|
||||
// Setup store with a key and grab the output for returning.
|
||||
key := "/testkey"
|
||||
err := store.unconditionalDelete(ctx, key, &example.Pod{})
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
t.Fatal("Cleanup failed: %v", err)
|
||||
}
|
||||
setOutput := &example.Pod{}
|
||||
err := store.Create(ctx, key, obj, setOutput, 0)
|
||||
if err != nil {
|
||||
if err := store.Create(ctx, key, obj, setOutput, 0); err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
return key, setOutput
|
||||
@ -736,7 +780,7 @@ func TestPrefix(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
transformer := prefixTransformer{prefix: []byte("test!")}
|
||||
transformer := prefixTransformer{prefix: []byte(defaultTestPrefix)}
|
||||
testcases := map[string]string{
|
||||
"custom/prefix": "/custom/prefix",
|
||||
"/custom//prefix//": "/custom/prefix",
|
||||
|
@ -26,7 +26,9 @@ import (
|
||||
)
|
||||
|
||||
// Versioner abstracts setting and retrieving metadata fields from database response
|
||||
// onto the object ot list.
|
||||
// onto the object ot list. It is required to maintain storage invariants - updating an
|
||||
// object twice with the same data except for the ResourceVersion and SelfLink must be
|
||||
// a no-op.
|
||||
type Versioner interface {
|
||||
// UpdateObject sets storage metadata into an API object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
@ -36,6 +38,9 @@ type Versioner interface {
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from database.
|
||||
UpdateList(obj runtime.Object, resourceVersion uint64) error
|
||||
// PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should
|
||||
// return an error if the specified object cannot be updated.
|
||||
PrepareObjectForStorage(obj runtime.Object) error
|
||||
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
|
||||
// Should return an error if the specified object does not have a persistable version.
|
||||
ObjectResourceVersion(obj runtime.Object) (uint64, error)
|
||||
|
Loading…
Reference in New Issue
Block a user