From bddef32193ddd84f0ef1e585ab6b1a8694cd3e0d Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 5 Mar 2015 00:17:29 -0500 Subject: [PATCH] Prepare EtcdHelper to extract more data from Node In order to support graceful deletion, the resource object will need access to the TTL value in etcd. Also, in the future we may want to get the creation index (distinct from modifiedindex) and expose it to clients. Change EtcdResourceVersioner to be more type specific (objects vs lists) and provide a default implementation that relies on the internal API convention. Also, rename etcd_tools.go to etcd_helper.go and split a few things up. --- pkg/api/latest/latest.go | 5 - pkg/api/latest/latest_test.go | 4 +- pkg/api/meta.go | 19 ++- pkg/master/master.go | 2 +- pkg/registry/etcd/etcd_test.go | 4 +- pkg/registry/event/registry_test.go | 2 +- pkg/registry/generic/etcd/etcd.go | 4 +- pkg/registry/generic/etcd/etcd_test.go | 6 +- pkg/registry/limitrange/registry_test.go | 2 +- pkg/registry/namespace/etcd/etcd_test.go | 2 +- pkg/registry/pod/etcd/etcd_test.go | 2 +- pkg/registry/resourcequota/etcd/etcd_test.go | 2 +- pkg/registry/secret/registry_test.go | 2 +- pkg/tools/doc.go | 3 +- pkg/tools/{etcd_tools.go => etcd_helper.go} | 129 ++++-------------- ...etcd_tools_test.go => etcd_helper_test.go} | 30 ++-- ...cd_tools_watch.go => etcd_helper_watch.go} | 37 +++-- ...atch_test.go => etcd_helper_watch_test.go} | 2 + pkg/tools/etcd_object.go | 74 ++++++++++ pkg/tools/fake_etcd_client.go | 4 + pkg/tools/interfaces.go | 77 +++++++++++ test/integration/etcd_tools_test.go | 2 +- 22 files changed, 251 insertions(+), 163 deletions(-) rename pkg/tools/{etcd_tools.go => etcd_helper.go} (72%) rename pkg/tools/{etcd_tools_test.go => etcd_helper_test.go} (95%) rename pkg/tools/{etcd_tools_watch.go => etcd_helper_watch.go} (91%) rename pkg/tools/{etcd_tools_watch_test.go => etcd_helper_watch_test.go} (99%) create mode 100644 pkg/tools/etcd_object.go create mode 100644 pkg/tools/interfaces.go diff --git a/pkg/api/latest/latest.go b/pkg/api/latest/latest.go index 012ee85b765..f46d153a4d9 100644 --- a/pkg/api/latest/latest.go +++ b/pkg/api/latest/latest.go @@ -50,11 +50,6 @@ var Codec = v1beta1.Codec // accessor is the shared static metadata accessor for the API. var accessor = meta.NewAccessor() -// ResourceVersioner describes a default versioner that can handle all types -// of versioning. -// TODO: when versioning changes, make this part of each API definition. -var ResourceVersioner = runtime.ResourceVersioner(accessor) - // SelfLinker can set or get the SelfLink field of all API types. // TODO: when versioning changes, make this part of each API definition. // TODO(lavalamp): Combine SelfLinker & ResourceVersioner interfaces, force all uses diff --git a/pkg/api/latest/latest_test.go b/pkg/api/latest/latest_test.go index 253eb7cf82a..4faa05f05ea 100644 --- a/pkg/api/latest/latest_test.go +++ b/pkg/api/latest/latest_test.go @@ -27,7 +27,7 @@ import ( func TestResourceVersioner(t *testing.T) { pod := internal.Pod{ObjectMeta: internal.ObjectMeta{ResourceVersion: "10"}} - version, err := ResourceVersioner.ResourceVersion(&pod) + version, err := accessor.ResourceVersion(&pod) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -36,7 +36,7 @@ func TestResourceVersioner(t *testing.T) { } podList := internal.PodList{ListMeta: internal.ListMeta{ResourceVersion: "10"}} - version, err = ResourceVersioner.ResourceVersion(&podList) + version, err = accessor.ResourceVersion(&podList) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/api/meta.go b/pkg/api/meta.go index 5782be6558f..2a0b518fd1c 100644 --- a/pkg/api/meta.go +++ b/pkg/api/meta.go @@ -35,16 +35,27 @@ func HasObjectMetaSystemFieldValues(meta *ObjectMeta) bool { len(meta.UID) != 0 } -// GetObjectMetaPtr returns a pointer to a provided object's ObjectMeta. +// ObjectMetaFor returns a pointer to a provided object's ObjectMeta. // TODO: allow runtime.Unknown to extract this object func ObjectMetaFor(obj runtime.Object) (*ObjectMeta, error) { v, err := conversion.EnforcePtr(obj) if err != nil { return nil, err } - var objectMeta *ObjectMeta - if err := runtime.FieldPtr(v, "ObjectMeta", &objectMeta); err != nil { + var meta *ObjectMeta + err = runtime.FieldPtr(v, "ObjectMeta", &meta) + return meta, err +} + +// ListMetaFor returns a pointer to a provided object's ListMeta, +// or an error if the object does not have that pointer. +// TODO: allow runtime.Unknown to extract this object +func ListMetaFor(obj runtime.Object) (*ListMeta, error) { + v, err := conversion.EnforcePtr(obj) + if err != nil { return nil, err } - return objectMeta, nil + var meta *ListMeta + err = runtime.FieldPtr(v, "ListMeta", &meta) + return meta, err } diff --git a/pkg/master/master.go b/pkg/master/master.go index 68c2ff77290..0d6660672b7 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -178,7 +178,7 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe if err != nil { return helper, err } - return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil + return tools.NewEtcdHelper(client, versionInterfaces.Codec), nil } // setDefaults fills in any fields not set that are required to have valid data. diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index c690fe70890..587fc2c5a34 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -36,12 +36,12 @@ import ( ) func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { - registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, nil) + registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil) return registry } func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { - helper := tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(client, latest.Codec) podStorage, _, _ := podetcd.NewREST(helper) registry := NewRegistry(helper, pod.NewRegistry(podStorage)) return registry diff --git a/pkg/registry/event/registry_test.go b/pkg/registry/event/registry_test.go index 038a6b4c7bd..03a375b8678 100644 --- a/pkg/registry/event/registry_test.go +++ b/pkg/registry/event/registry_test.go @@ -37,7 +37,7 @@ var testTTL uint64 = 60 func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + h := tools.NewEtcdHelper(f, testapi.Codec()) return f, NewEtcdRegistry(h, testTTL) } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 2d9a1deba8c..8a585162039 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -252,7 +252,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool creating := false out := e.NewFunc() err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) { - version, err := e.Helper.ResourceVersioner.ResourceVersion(existing) + version, err := e.Helper.Versioner.ObjectResourceVersion(existing) if err != nil { return nil, 0, err } @@ -275,7 +275,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool } creating = false - newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj) + newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj) if err != nil { return nil, 0, err } diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 6f06c1d66ce..339280b288d 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -64,7 +64,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool { func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + h := tools.NewEtcdHelper(f, testapi.Codec()) strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false} return f, &Etcd{ NewFunc: func() runtime.Object { return &api.Pod{} }, @@ -631,9 +631,9 @@ func TestEtcdDelete(t *testing.T) { for name, item := range table { fakeClient, registry := NewTestGenericEtcdRegistry(t) fakeClient.Data[path] = item.existing - _, err := registry.Delete(api.NewContext(), key) + obj, err := registry.Delete(api.NewContext(), key) if !item.errOK(err) { - t.Errorf("%v: unexpected error: %v", name, err) + t.Errorf("%v: unexpected error: %v (%#v)", name, err, obj) } if item.expect.E != nil { diff --git a/pkg/registry/limitrange/registry_test.go b/pkg/registry/limitrange/registry_test.go index 58479aac4b4..a3502ba91f2 100644 --- a/pkg/registry/limitrange/registry_test.go +++ b/pkg/registry/limitrange/registry_test.go @@ -36,7 +36,7 @@ import ( func NewTestLimitRangeEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + h := tools.NewEtcdHelper(f, testapi.Codec()) return f, NewEtcdRegistry(h) } diff --git a/pkg/registry/namespace/etcd/etcd_test.go b/pkg/registry/namespace/etcd/etcd_test.go index 1e71dead83b..c029c2a6a9f 100644 --- a/pkg/registry/namespace/etcd/etcd_test.go +++ b/pkg/registry/namespace/etcd/etcd_test.go @@ -35,7 +35,7 @@ import ( func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec) return fakeEtcdClient, helper } diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 49052425b59..9546857467f 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -63,7 +63,7 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) { func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec) return fakeEtcdClient, helper } diff --git a/pkg/registry/resourcequota/etcd/etcd_test.go b/pkg/registry/resourcequota/etcd/etcd_test.go index 770e1ceb62a..a8a4002c050 100644 --- a/pkg/registry/resourcequota/etcd/etcd_test.go +++ b/pkg/registry/resourcequota/etcd/etcd_test.go @@ -40,7 +40,7 @@ import ( func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec) return fakeEtcdClient, helper } diff --git a/pkg/registry/secret/registry_test.go b/pkg/registry/secret/registry_test.go index 79a60130c55..a8a7af977ee 100644 --- a/pkg/registry/secret/registry_test.go +++ b/pkg/registry/secret/registry_test.go @@ -35,7 +35,7 @@ import ( func NewTestSecretEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + h := tools.NewEtcdHelper(f, testapi.Codec()) return f, NewEtcdRegistry(h) } diff --git a/pkg/tools/doc.go b/pkg/tools/doc.go index d1d8014cda2..2724e565b25 100644 --- a/pkg/tools/doc.go +++ b/pkg/tools/doc.go @@ -14,5 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package tools implements general tools which depend on the api package. +// Package tools implements types which help work with etcd which depend on the api package. +// TODO: move this package to an etcd specific utility package. package tools diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_helper.go similarity index 72% rename from pkg/tools/etcd_tools.go rename to pkg/tools/etcd_helper.go index c92f2e505a8..124e5dbaa4a 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_helper.go @@ -24,7 +24,6 @@ import ( "net/http" "os/exec" "reflect" - "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -33,82 +32,22 @@ import ( "github.com/golang/glog" ) -const ( - EtcdErrorCodeNotFound = 100 - EtcdErrorCodeTestFailed = 101 - EtcdErrorCodeNodeExist = 105 - EtcdErrorCodeValueRequired = 200 -) - -var ( - EtcdErrorNotFound = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNotFound} - EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed} - EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist} - EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} -) - -// EtcdClient is an injectable interface for testing. -type EtcdClient interface { - GetCluster() []string - AddChild(key, data string, ttl uint64) (*etcd.Response, error) - Get(key string, sort, recursive bool) (*etcd.Response, error) - Set(key, value string, ttl uint64) (*etcd.Response, error) - Create(key, value string, ttl uint64) (*etcd.Response, error) - CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) - Delete(key string, recursive bool) (*etcd.Response, error) - // I'd like to use directional channels here (e.g. <-chan) but this interface mimics - // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. - Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) -} - -// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper. -type EtcdGetSet interface { - GetCluster() []string - Get(key string, sort, recursive bool) (*etcd.Response, error) - Set(key, value string, ttl uint64) (*etcd.Response, error) - Create(key, value string, ttl uint64) (*etcd.Response, error) - Delete(key string, recursive bool) (*etcd.Response, error) - CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) - Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) -} - -type EtcdResourceVersioner interface { - SetResourceVersion(obj runtime.Object, version uint64) error - ResourceVersion(obj runtime.Object) (uint64, error) -} - -// RuntimeVersionAdapter converts a string based versioner to EtcdResourceVersioner -type RuntimeVersionAdapter struct { - Versioner runtime.ResourceVersioner -} - -// SetResourceVersion implements EtcdResourceVersioner -func (a RuntimeVersionAdapter) SetResourceVersion(obj runtime.Object, version uint64) error { - if version == 0 { - return a.Versioner.SetResourceVersion(obj, "") - } - s := strconv.FormatUint(version, 10) - return a.Versioner.SetResourceVersion(obj, s) -} - -// SetResourceVersion implements EtcdResourceVersioner -func (a RuntimeVersionAdapter) ResourceVersion(obj runtime.Object) (uint64, error) { - version, err := a.Versioner.ResourceVersion(obj) - if err != nil { - return 0, err - } - if version == "" { - return 0, nil - } - return strconv.ParseUint(version, 10, 64) -} - // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. type EtcdHelper struct { Client EtcdGetSet Codec runtime.Codec // optional, no atomic operations can be performed without this interface - ResourceVersioner EtcdResourceVersioner + Versioner EtcdVersioner +} + +// NewEtcdHelper creates a helper that works against objects that use the internal +// Kubernetes API objects. +func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec) EtcdHelper { + return EtcdHelper{ + Client: client, + Codec: codec, + Versioner: APIObjectVersioner{}, + } } // IsEtcdNotFound returns true iff err is an etcd not found error. @@ -163,19 +102,6 @@ func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) { return result.Node.Nodes, result.EtcdIndex, nil } -// ExtractList extracts a go object per etcd node into a slice with the resource version. -// DEPRECATED: Use ExtractToList instead, it's more convenient. -func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersion *uint64) error { - nodes, index, err := h.listEtcdNode(key) - if resourceVersion != nil { - *resourceVersion = index - } - if err != nil { - return err - } - return h.decodeNodeList(nodes, slicePtr) -} - // decodeNodeList walks the tree of each node in the list and decodes into the specified object func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error { v, err := conversion.EnforcePtr(slicePtr) @@ -194,28 +120,31 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil { return err } - if h.ResourceVersioner != nil { - _ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex) + if h.Versioner != nil { // being unable to set the version does not prevent the object from being extracted + _ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node) } v.Set(reflect.Append(v, obj.Elem())) } return nil } -// ExtractToList is just like ExtractList, but it works on a ThingyList api object. -// extracts a go object per etcd node into a slice with the resource version. +// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList +// definition) and extracts a go object per etcd node into a slice with the resource version. func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error { - var resourceVersion uint64 listPtr, err := runtime.GetItemsPtr(listObj) if err != nil { return err } - if err := h.ExtractList(key, listPtr, &resourceVersion); err != nil { + nodes, index, err := h.listEtcdNode(key) + if err != nil { return err } - if h.ResourceVersioner != nil { - if err := h.ResourceVersioner.SetResourceVersion(listObj, resourceVersion); err != nil { + if err := h.decodeNodeList(nodes, listPtr); err != nil { + return err + } + if h.Versioner != nil { + if err := h.Versioner.UpdateList(listObj, index); err != nil { return err } } @@ -263,8 +192,8 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run } body = node.Value err = h.Codec.DecodeInto([]byte(body), objPtr) - if h.ResourceVersioner != nil { - _ = h.ResourceVersioner.SetResourceVersion(objPtr, node.ModifiedIndex) + if h.Versioner != nil { + _ = h.Versioner.UpdateObject(objPtr, node) // being unable to set the version does not prevent the object from being extracted } return body, node.ModifiedIndex, err @@ -278,8 +207,8 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) if err != nil { return err } - if h.ResourceVersioner != nil { - if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 { + if h.Versioner != nil { + if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion may not be set on objects to be created") } } @@ -319,7 +248,7 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error { // SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion // field is set. 'ttl' is time-to-live in seconds, and 0 means forever. If no error is returned and out is -//not nil, out will be set to the read value from etcd. +// not nil, out will be set to the read value from etcd. func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error { var response *etcd.Response data, err := h.Codec.Encode(obj) @@ -328,8 +257,8 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err } create := true - if h.ResourceVersioner != nil { - if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 { + if h.Versioner != nil { + if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 { create = false response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version) if err != nil { diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_helper_test.go similarity index 95% rename from pkg/tools/etcd_tools_test.go rename to pkg/tools/etcd_helper_test.go index 20ebfb1a87a..c48ec9ba4f9 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_helper_test.go @@ -26,7 +26,6 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -44,7 +43,6 @@ func (*TestResource) IsAnAPIObject() {} var scheme *runtime.Scheme var codec runtime.Codec -var versioner = RuntimeVersionAdapter{meta.NewAccessor()} func init() { scheme = runtime.NewScheme() @@ -129,7 +127,7 @@ func TestExtractToList(t *testing.T) { } var got api.PodList - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.ExtractToList("/some/key", &got) if err != nil { t.Errorf("Unexpected error %v", err) @@ -212,7 +210,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) { } var got api.PodList - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.ExtractToList("/some/key", &got) if err != nil { t.Errorf("Unexpected error %v", err) @@ -282,7 +280,7 @@ func TestExtractToListExcludesDirectories(t *testing.T) { } var got api.PodList - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.ExtractToList("/some/key", &got) if err != nil { t.Errorf("Unexpected error %v", err) @@ -302,7 +300,7 @@ func TestExtractObj(t *testing.T) { }, } fakeClient.Set("/some/key", runtime.EncodeOrDie(testapi.Codec(), &expect), 0) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) var got api.Pod err := helper.ExtractObj("/some/key", &got, false) if err != nil { @@ -335,7 +333,7 @@ func TestExtractObjNotFoundErr(t *testing.T) { }, }, } - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) try := func(key string) { var got api.Pod err := helper.ExtractObj(key, &got, false) @@ -356,7 +354,7 @@ func TestExtractObjNotFoundErr(t *testing.T) { func TestCreateObj(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := NewFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) returnedObj := &api.Pod{} err := helper.CreateObj("/some/key", obj, returnedObj, 5) if err != nil { @@ -381,7 +379,7 @@ func TestCreateObj(t *testing.T) { func TestCreateObjNilOutParam(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := NewFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.CreateObj("/some/key", obj, nil, 5) if err != nil { t.Errorf("Unexpected error %#v", err) @@ -391,7 +389,7 @@ func TestCreateObjNilOutParam(t *testing.T) { func TestSetObj(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := NewFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) returnedObj := &api.Pod{} err := helper.SetObj("/some/key", obj, returnedObj, 5) if err != nil { @@ -418,7 +416,7 @@ func TestSetObjFailCAS(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}} fakeClient := NewFakeEtcdClient(t) fakeClient.CasErr = fakeClient.NewError(123) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.SetObj("/some/key", obj, nil, 5) if err == nil { t.Errorf("Expecting error.") @@ -438,7 +436,7 @@ func TestSetObjWithVersion(t *testing.T) { }, } - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) returnedObj := &api.Pod{} err := helper.SetObj("/some/key", obj, returnedObj, 7) if err != nil { @@ -500,7 +498,7 @@ func TestSetObjNilOutParam(t *testing.T) { func TestAtomicUpdate(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -554,7 +552,7 @@ func TestAtomicUpdate(t *testing.T) { func TestAtomicUpdateNoChange(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -585,7 +583,7 @@ func TestAtomicUpdateNoChange(t *testing.T) { func TestAtomicUpdateKeyNotFound(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -611,7 +609,7 @@ func TestAtomicUpdateKeyNotFound(t *testing.T) { func TestAtomicUpdate_CreateCollision(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) fakeClient.ExpectNotFoundGet("/some/key") diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_helper_watch.go similarity index 91% rename from pkg/tools/etcd_tools_watch.go rename to pkg/tools/etcd_helper_watch.go index 6e5027acbb8..7aad160317b 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -61,7 +61,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { // watch.Interface. resourceVersion may be used to specify what version to begin // watching (e.g., for reconnecting without missing any updates). func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.ResourceVersioner, nil) + w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -90,7 +90,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface { // // Errors will be sent down the channel. func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { - w := newEtcdWatcher(false, nil, Everything, h.Codec, h.ResourceVersioner, transform) + w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform) go w.etcdWatch(h.Client, key, resourceVersion) return w } @@ -111,7 +111,7 @@ func exceptKey(except string) includeFunc { // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { encoding runtime.Codec - versioner EtcdResourceVersioner + versioner EtcdVersioner transform TransformFunc list bool // If we're doing a recursive watch, should be true. @@ -137,7 +137,7 @@ const watchWaitDuration = 100 * time.Millisecond // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher { +func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, @@ -240,16 +240,16 @@ func (w *etcdWatcher) translate() { } } -func (w *etcdWatcher) decodeObject(data []byte, index uint64) (runtime.Object, error) { - obj, err := w.encoding.Decode(data) +func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { + obj, err := w.encoding.Decode([]byte(node.Value)) if err != nil { return nil, err } // ensure resource version is set on the object we load from etcd if w.versioner != nil { - if err := w.versioner.SetResourceVersion(obj, index); err != nil { - glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err) + if err := w.versioner.UpdateObject(obj, node); err != nil { + glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err) } } @@ -273,10 +273,9 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) { if w.include != nil && !w.include(res.Node.Key) { return } - data := []byte(res.Node.Value) - obj, err := w.decodeObject(data, res.Node.ModifiedIndex) + obj, err := w.decodeObject(res.Node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node) + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. @@ -303,10 +302,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { if w.include != nil && !w.include(res.Node.Key) { return } - curData := []byte(res.Node.Value) - curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex) + curObj, err := w.decodeObject(res.Node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(curData), res, res.Node) + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. @@ -317,7 +315,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { var oldObj runtime.Object if res.PrevNode != nil && res.PrevNode.Value != "" { // Ignore problems reading the old object. - if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil { + if oldObj, err = w.decodeObject(res.PrevNode); err == nil { oldObjPasses = w.filter(oldObj) } } @@ -352,17 +350,16 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { if w.include != nil && !w.include(res.PrevNode.Key) { return } - data := []byte(res.PrevNode.Value) - index := res.PrevNode.ModifiedIndex + node := *res.PrevNode if res.Node != nil { // Note that this sends the *old* object with the etcd index for the time at // which it gets deleted. This will allow users to restart the watch at the right // index. - index = res.Node.ModifiedIndex + node.ModifiedIndex = res.Node.ModifiedIndex } - obj, err := w.decodeObject(data, index) + obj, err := w.decodeObject(&node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode) + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.PrevNode.Value), res, res.PrevNode) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_helper_watch_test.go similarity index 99% rename from pkg/tools/etcd_tools_watch_test.go rename to pkg/tools/etcd_helper_watch_test.go index 9685898d6bb..c468122bb01 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_helper_watch_test.go @@ -29,6 +29,8 @@ import ( "github.com/coreos/go-etcd/etcd" ) +var versioner = APIObjectVersioner{} + func TestWatchInterpretations(t *testing.T) { codec := latest.Codec // Declare some pods to make the test cases compact. diff --git a/pkg/tools/etcd_object.go b/pkg/tools/etcd_object.go new file mode 100644 index 00000000000..a50adb290b6 --- /dev/null +++ b/pkg/tools/etcd_object.go @@ -0,0 +1,74 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "strconv" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/coreos/go-etcd/etcd" +) + +// APIObjectVersioner implements versioning and extracting etcd node information +// for objects that have an embedded ObjectMeta or ListMeta field. +type APIObjectVersioner struct{} + +// UpdateObject implements EtcdVersioner +func (a APIObjectVersioner) UpdateObject(obj runtime.Object, node *etcd.Node) error { + objectMeta, err := api.ObjectMetaFor(obj) + if err != nil { + return err + } + version := node.ModifiedIndex + versionString := "" + if version != 0 { + versionString = strconv.FormatUint(version, 10) + } + objectMeta.ResourceVersion = versionString + return nil +} + +// UpdateList implements EtcdVersioner +func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error { + listMeta, err := api.ListMetaFor(obj) + if err != nil || listMeta == nil { + return err + } + versionString := "" + if resourceVersion != 0 { + versionString = strconv.FormatUint(resourceVersion, 10) + } + listMeta.ResourceVersion = versionString + return nil +} + +// ObjectResourceVersion implements EtcdVersioner +func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { + meta, err := api.ObjectMetaFor(obj) + if err != nil { + return 0, err + } + version := meta.ResourceVersion + if len(version) == 0 { + return 0, nil + } + return strconv.ParseUint(version, 10, 64) +} + +// APIObjectVersioner implements EtcdVersioner +var _ EtcdVersioner = APIObjectVersioner{} diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index b66edab46ba..5ee00cc23b3 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -290,6 +290,10 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err Index: f.ChangeIndex, } } + if IsEtcdNotFound(existing.E) { + f.DeletedKeys = append(f.DeletedKeys, key) + return existing.R, existing.E + } index := f.generateIndex() f.Data[key] = EtcdResponseWithError{ R: &etcd.Response{}, diff --git a/pkg/tools/interfaces.go b/pkg/tools/interfaces.go new file mode 100644 index 00000000000..609039ee420 --- /dev/null +++ b/pkg/tools/interfaces.go @@ -0,0 +1,77 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/coreos/go-etcd/etcd" +) + +const ( + EtcdErrorCodeNotFound = 100 + EtcdErrorCodeTestFailed = 101 + EtcdErrorCodeNodeExist = 105 + EtcdErrorCodeValueRequired = 200 +) + +var ( + EtcdErrorNotFound = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNotFound} + EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed} + EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist} + EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} +) + +// EtcdClient is an injectable interface for testing. +type EtcdClient interface { + GetCluster() []string + AddChild(key, data string, ttl uint64) (*etcd.Response, error) + Get(key string, sort, recursive bool) (*etcd.Response, error) + Set(key, value string, ttl uint64) (*etcd.Response, error) + Create(key, value string, ttl uint64) (*etcd.Response, error) + CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + // I'd like to use directional channels here (e.g. <-chan) but this interface mimics + // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) +} + +// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper. +type EtcdGetSet interface { + GetCluster() []string + Get(key string, sort, recursive bool) (*etcd.Response, error) + Set(key, value string, ttl uint64) (*etcd.Response, error) + Create(key, value string, ttl uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) +} + +// EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object +// or list. +type EtcdVersioner interface { + // UpdateObject sets etcd storage metadata into an API object. Returns an error if the object + // cannot be updated correctly. May return nil if the requested object does not need metadata + // from etcd. + UpdateObject(obj runtime.Object, node *etcd.Node) error + // UpdateList sets the resource version into an API list object. Returns an error if the object + // cannot be updated correctly. May return nil if the requested object does not need metadata + // from etcd. + UpdateList(obj runtime.Object, resourceVersion uint64) error + // ObjectResourceVersion returns the resource version (for persistence) of the specified object. + // Should return an error if the specified object does not have a persistable version. + ObjectResourceVersion(obj runtime.Object) (uint64, error) +} diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 1d26b6cc7ee..1bb576ddd8e 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -93,7 +93,7 @@ func TestExtractObj(t *testing.T) { func TestWatch(t *testing.T) { client := newEtcdClient() - helper := tools.EtcdHelper{Client: client, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(client, latest.Codec) withEtcdKey(func(key string) { resp, err := client.Set(key, runtime.EncodeOrDie(v1beta1.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) if err != nil {