diff --git a/pkg/api/latest/latest.go b/pkg/api/latest/latest.go index 012ee85b765..f46d153a4d9 100644 --- a/pkg/api/latest/latest.go +++ b/pkg/api/latest/latest.go @@ -50,11 +50,6 @@ var Codec = v1beta1.Codec // accessor is the shared static metadata accessor for the API. var accessor = meta.NewAccessor() -// ResourceVersioner describes a default versioner that can handle all types -// of versioning. -// TODO: when versioning changes, make this part of each API definition. -var ResourceVersioner = runtime.ResourceVersioner(accessor) - // SelfLinker can set or get the SelfLink field of all API types. // TODO: when versioning changes, make this part of each API definition. // TODO(lavalamp): Combine SelfLinker & ResourceVersioner interfaces, force all uses diff --git a/pkg/api/latest/latest_test.go b/pkg/api/latest/latest_test.go index 253eb7cf82a..4faa05f05ea 100644 --- a/pkg/api/latest/latest_test.go +++ b/pkg/api/latest/latest_test.go @@ -27,7 +27,7 @@ import ( func TestResourceVersioner(t *testing.T) { pod := internal.Pod{ObjectMeta: internal.ObjectMeta{ResourceVersion: "10"}} - version, err := ResourceVersioner.ResourceVersion(&pod) + version, err := accessor.ResourceVersion(&pod) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -36,7 +36,7 @@ func TestResourceVersioner(t *testing.T) { } podList := internal.PodList{ListMeta: internal.ListMeta{ResourceVersion: "10"}} - version, err = ResourceVersioner.ResourceVersion(&podList) + version, err = accessor.ResourceVersion(&podList) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/api/meta.go b/pkg/api/meta.go index 5782be6558f..2a0b518fd1c 100644 --- a/pkg/api/meta.go +++ b/pkg/api/meta.go @@ -35,16 +35,27 @@ func HasObjectMetaSystemFieldValues(meta *ObjectMeta) bool { len(meta.UID) != 0 } -// GetObjectMetaPtr returns a pointer to a provided object's ObjectMeta. +// ObjectMetaFor returns a pointer to a provided object's ObjectMeta. // TODO: allow runtime.Unknown to extract this object func ObjectMetaFor(obj runtime.Object) (*ObjectMeta, error) { v, err := conversion.EnforcePtr(obj) if err != nil { return nil, err } - var objectMeta *ObjectMeta - if err := runtime.FieldPtr(v, "ObjectMeta", &objectMeta); err != nil { + var meta *ObjectMeta + err = runtime.FieldPtr(v, "ObjectMeta", &meta) + return meta, err +} + +// ListMetaFor returns a pointer to a provided object's ListMeta, +// or an error if the object does not have that pointer. +// TODO: allow runtime.Unknown to extract this object +func ListMetaFor(obj runtime.Object) (*ListMeta, error) { + v, err := conversion.EnforcePtr(obj) + if err != nil { return nil, err } - return objectMeta, nil + var meta *ListMeta + err = runtime.FieldPtr(v, "ListMeta", &meta) + return meta, err } diff --git a/pkg/master/master.go b/pkg/master/master.go index 68c2ff77290..0d6660672b7 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -178,7 +178,7 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe if err != nil { return helper, err } - return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil + return tools.NewEtcdHelper(client, versionInterfaces.Codec), nil } // setDefaults fills in any fields not set that are required to have valid data. diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index c690fe70890..587fc2c5a34 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -36,12 +36,12 @@ import ( ) func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { - registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, nil) + registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil) return registry } func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { - helper := tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(client, latest.Codec) podStorage, _, _ := podetcd.NewREST(helper) registry := NewRegistry(helper, pod.NewRegistry(podStorage)) return registry diff --git a/pkg/registry/event/registry_test.go b/pkg/registry/event/registry_test.go index 038a6b4c7bd..03a375b8678 100644 --- a/pkg/registry/event/registry_test.go +++ b/pkg/registry/event/registry_test.go @@ -37,7 +37,7 @@ var testTTL uint64 = 60 func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + h := tools.NewEtcdHelper(f, testapi.Codec()) return f, NewEtcdRegistry(h, testTTL) } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 2d9a1deba8c..8a585162039 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -252,7 +252,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool creating := false out := e.NewFunc() err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) { - version, err := e.Helper.ResourceVersioner.ResourceVersion(existing) + version, err := e.Helper.Versioner.ObjectResourceVersion(existing) if err != nil { return nil, 0, err } @@ -275,7 +275,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool } creating = false - newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj) + newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj) if err != nil { return nil, 0, err } diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 6f06c1d66ce..339280b288d 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -64,7 +64,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool { func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + h := tools.NewEtcdHelper(f, testapi.Codec()) strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false} return f, &Etcd{ NewFunc: func() runtime.Object { return &api.Pod{} }, @@ -631,9 +631,9 @@ func TestEtcdDelete(t *testing.T) { for name, item := range table { fakeClient, registry := NewTestGenericEtcdRegistry(t) fakeClient.Data[path] = item.existing - _, err := registry.Delete(api.NewContext(), key) + obj, err := registry.Delete(api.NewContext(), key) if !item.errOK(err) { - t.Errorf("%v: unexpected error: %v", name, err) + t.Errorf("%v: unexpected error: %v (%#v)", name, err, obj) } if item.expect.E != nil { diff --git a/pkg/registry/limitrange/registry_test.go b/pkg/registry/limitrange/registry_test.go index 58479aac4b4..a3502ba91f2 100644 --- a/pkg/registry/limitrange/registry_test.go +++ b/pkg/registry/limitrange/registry_test.go @@ -36,7 +36,7 @@ import ( func NewTestLimitRangeEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + h := tools.NewEtcdHelper(f, testapi.Codec()) return f, NewEtcdRegistry(h) } diff --git a/pkg/registry/namespace/etcd/etcd_test.go b/pkg/registry/namespace/etcd/etcd_test.go index 1e71dead83b..c029c2a6a9f 100644 --- a/pkg/registry/namespace/etcd/etcd_test.go +++ b/pkg/registry/namespace/etcd/etcd_test.go @@ -35,7 +35,7 @@ import ( func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec) return fakeEtcdClient, helper } diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 49052425b59..9546857467f 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -63,7 +63,7 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) { func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec) return fakeEtcdClient, helper } diff --git a/pkg/registry/resourcequota/etcd/etcd_test.go b/pkg/registry/resourcequota/etcd/etcd_test.go index 770e1ceb62a..a8a4002c050 100644 --- a/pkg/registry/resourcequota/etcd/etcd_test.go +++ b/pkg/registry/resourcequota/etcd/etcd_test.go @@ -40,7 +40,7 @@ import ( func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec) return fakeEtcdClient, helper } diff --git a/pkg/registry/secret/registry_test.go b/pkg/registry/secret/registry_test.go index 79a60130c55..a8a7af977ee 100644 --- a/pkg/registry/secret/registry_test.go +++ b/pkg/registry/secret/registry_test.go @@ -35,7 +35,7 @@ import ( func NewTestSecretEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + h := tools.NewEtcdHelper(f, testapi.Codec()) return f, NewEtcdRegistry(h) } diff --git a/pkg/tools/doc.go b/pkg/tools/doc.go index d1d8014cda2..2724e565b25 100644 --- a/pkg/tools/doc.go +++ b/pkg/tools/doc.go @@ -14,5 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package tools implements general tools which depend on the api package. +// Package tools implements types which help work with etcd which depend on the api package. +// TODO: move this package to an etcd specific utility package. package tools diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_helper.go similarity index 72% rename from pkg/tools/etcd_tools.go rename to pkg/tools/etcd_helper.go index c92f2e505a8..124e5dbaa4a 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_helper.go @@ -24,7 +24,6 @@ import ( "net/http" "os/exec" "reflect" - "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -33,82 +32,22 @@ import ( "github.com/golang/glog" ) -const ( - EtcdErrorCodeNotFound = 100 - EtcdErrorCodeTestFailed = 101 - EtcdErrorCodeNodeExist = 105 - EtcdErrorCodeValueRequired = 200 -) - -var ( - EtcdErrorNotFound = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNotFound} - EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed} - EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist} - EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} -) - -// EtcdClient is an injectable interface for testing. -type EtcdClient interface { - GetCluster() []string - AddChild(key, data string, ttl uint64) (*etcd.Response, error) - Get(key string, sort, recursive bool) (*etcd.Response, error) - Set(key, value string, ttl uint64) (*etcd.Response, error) - Create(key, value string, ttl uint64) (*etcd.Response, error) - CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) - Delete(key string, recursive bool) (*etcd.Response, error) - // I'd like to use directional channels here (e.g. <-chan) but this interface mimics - // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. - Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) -} - -// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper. -type EtcdGetSet interface { - GetCluster() []string - Get(key string, sort, recursive bool) (*etcd.Response, error) - Set(key, value string, ttl uint64) (*etcd.Response, error) - Create(key, value string, ttl uint64) (*etcd.Response, error) - Delete(key string, recursive bool) (*etcd.Response, error) - CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) - Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) -} - -type EtcdResourceVersioner interface { - SetResourceVersion(obj runtime.Object, version uint64) error - ResourceVersion(obj runtime.Object) (uint64, error) -} - -// RuntimeVersionAdapter converts a string based versioner to EtcdResourceVersioner -type RuntimeVersionAdapter struct { - Versioner runtime.ResourceVersioner -} - -// SetResourceVersion implements EtcdResourceVersioner -func (a RuntimeVersionAdapter) SetResourceVersion(obj runtime.Object, version uint64) error { - if version == 0 { - return a.Versioner.SetResourceVersion(obj, "") - } - s := strconv.FormatUint(version, 10) - return a.Versioner.SetResourceVersion(obj, s) -} - -// SetResourceVersion implements EtcdResourceVersioner -func (a RuntimeVersionAdapter) ResourceVersion(obj runtime.Object) (uint64, error) { - version, err := a.Versioner.ResourceVersion(obj) - if err != nil { - return 0, err - } - if version == "" { - return 0, nil - } - return strconv.ParseUint(version, 10, 64) -} - // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. type EtcdHelper struct { Client EtcdGetSet Codec runtime.Codec // optional, no atomic operations can be performed without this interface - ResourceVersioner EtcdResourceVersioner + Versioner EtcdVersioner +} + +// NewEtcdHelper creates a helper that works against objects that use the internal +// Kubernetes API objects. +func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec) EtcdHelper { + return EtcdHelper{ + Client: client, + Codec: codec, + Versioner: APIObjectVersioner{}, + } } // IsEtcdNotFound returns true iff err is an etcd not found error. @@ -163,19 +102,6 @@ func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) { return result.Node.Nodes, result.EtcdIndex, nil } -// ExtractList extracts a go object per etcd node into a slice with the resource version. -// DEPRECATED: Use ExtractToList instead, it's more convenient. -func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersion *uint64) error { - nodes, index, err := h.listEtcdNode(key) - if resourceVersion != nil { - *resourceVersion = index - } - if err != nil { - return err - } - return h.decodeNodeList(nodes, slicePtr) -} - // decodeNodeList walks the tree of each node in the list and decodes into the specified object func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error { v, err := conversion.EnforcePtr(slicePtr) @@ -194,28 +120,31 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil { return err } - if h.ResourceVersioner != nil { - _ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex) + if h.Versioner != nil { // being unable to set the version does not prevent the object from being extracted + _ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node) } v.Set(reflect.Append(v, obj.Elem())) } return nil } -// ExtractToList is just like ExtractList, but it works on a ThingyList api object. -// extracts a go object per etcd node into a slice with the resource version. +// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList +// definition) and extracts a go object per etcd node into a slice with the resource version. func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error { - var resourceVersion uint64 listPtr, err := runtime.GetItemsPtr(listObj) if err != nil { return err } - if err := h.ExtractList(key, listPtr, &resourceVersion); err != nil { + nodes, index, err := h.listEtcdNode(key) + if err != nil { return err } - if h.ResourceVersioner != nil { - if err := h.ResourceVersioner.SetResourceVersion(listObj, resourceVersion); err != nil { + if err := h.decodeNodeList(nodes, listPtr); err != nil { + return err + } + if h.Versioner != nil { + if err := h.Versioner.UpdateList(listObj, index); err != nil { return err } } @@ -263,8 +192,8 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run } body = node.Value err = h.Codec.DecodeInto([]byte(body), objPtr) - if h.ResourceVersioner != nil { - _ = h.ResourceVersioner.SetResourceVersion(objPtr, node.ModifiedIndex) + if h.Versioner != nil { + _ = h.Versioner.UpdateObject(objPtr, node) // being unable to set the version does not prevent the object from being extracted } return body, node.ModifiedIndex, err @@ -278,8 +207,8 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) if err != nil { return err } - if h.ResourceVersioner != nil { - if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 { + if h.Versioner != nil { + if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion may not be set on objects to be created") } } @@ -319,7 +248,7 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error { // SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion // field is set. 'ttl' is time-to-live in seconds, and 0 means forever. If no error is returned and out is -//not nil, out will be set to the read value from etcd. +// not nil, out will be set to the read value from etcd. func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error { var response *etcd.Response data, err := h.Codec.Encode(obj) @@ -328,8 +257,8 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err } create := true - if h.ResourceVersioner != nil { - if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 { + if h.Versioner != nil { + if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 { create = false response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version) if err != nil { diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_helper_test.go similarity index 95% rename from pkg/tools/etcd_tools_test.go rename to pkg/tools/etcd_helper_test.go index 20ebfb1a87a..c48ec9ba4f9 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_helper_test.go @@ -26,7 +26,6 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -44,7 +43,6 @@ func (*TestResource) IsAnAPIObject() {} var scheme *runtime.Scheme var codec runtime.Codec -var versioner = RuntimeVersionAdapter{meta.NewAccessor()} func init() { scheme = runtime.NewScheme() @@ -129,7 +127,7 @@ func TestExtractToList(t *testing.T) { } var got api.PodList - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.ExtractToList("/some/key", &got) if err != nil { t.Errorf("Unexpected error %v", err) @@ -212,7 +210,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) { } var got api.PodList - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.ExtractToList("/some/key", &got) if err != nil { t.Errorf("Unexpected error %v", err) @@ -282,7 +280,7 @@ func TestExtractToListExcludesDirectories(t *testing.T) { } var got api.PodList - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.ExtractToList("/some/key", &got) if err != nil { t.Errorf("Unexpected error %v", err) @@ -302,7 +300,7 @@ func TestExtractObj(t *testing.T) { }, } fakeClient.Set("/some/key", runtime.EncodeOrDie(testapi.Codec(), &expect), 0) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) var got api.Pod err := helper.ExtractObj("/some/key", &got, false) if err != nil { @@ -335,7 +333,7 @@ func TestExtractObjNotFoundErr(t *testing.T) { }, }, } - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) try := func(key string) { var got api.Pod err := helper.ExtractObj(key, &got, false) @@ -356,7 +354,7 @@ func TestExtractObjNotFoundErr(t *testing.T) { func TestCreateObj(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := NewFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) returnedObj := &api.Pod{} err := helper.CreateObj("/some/key", obj, returnedObj, 5) if err != nil { @@ -381,7 +379,7 @@ func TestCreateObj(t *testing.T) { func TestCreateObjNilOutParam(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := NewFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.CreateObj("/some/key", obj, nil, 5) if err != nil { t.Errorf("Unexpected error %#v", err) @@ -391,7 +389,7 @@ func TestCreateObjNilOutParam(t *testing.T) { func TestSetObj(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := NewFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) returnedObj := &api.Pod{} err := helper.SetObj("/some/key", obj, returnedObj, 5) if err != nil { @@ -418,7 +416,7 @@ func TestSetObjFailCAS(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}} fakeClient := NewFakeEtcdClient(t) fakeClient.CasErr = fakeClient.NewError(123) - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) err := helper.SetObj("/some/key", obj, nil, 5) if err == nil { t.Errorf("Expecting error.") @@ -438,7 +436,7 @@ func TestSetObjWithVersion(t *testing.T) { }, } - helper := EtcdHelper{fakeClient, testapi.Codec(), versioner} + helper := NewEtcdHelper(fakeClient, testapi.Codec()) returnedObj := &api.Pod{} err := helper.SetObj("/some/key", obj, returnedObj, 7) if err != nil { @@ -500,7 +498,7 @@ func TestSetObjNilOutParam(t *testing.T) { func TestAtomicUpdate(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -554,7 +552,7 @@ func TestAtomicUpdate(t *testing.T) { func TestAtomicUpdateNoChange(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -585,7 +583,7 @@ func TestAtomicUpdateNoChange(t *testing.T) { func TestAtomicUpdateKeyNotFound(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -611,7 +609,7 @@ func TestAtomicUpdateKeyNotFound(t *testing.T) { func TestAtomicUpdate_CreateCollision(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true - helper := EtcdHelper{fakeClient, codec, versioner} + helper := NewEtcdHelper(fakeClient, codec) fakeClient.ExpectNotFoundGet("/some/key") diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_helper_watch.go similarity index 91% rename from pkg/tools/etcd_tools_watch.go rename to pkg/tools/etcd_helper_watch.go index 6e5027acbb8..7aad160317b 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -61,7 +61,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { // watch.Interface. resourceVersion may be used to specify what version to begin // watching (e.g., for reconnecting without missing any updates). func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.ResourceVersioner, nil) + w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -90,7 +90,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface { // // Errors will be sent down the channel. func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { - w := newEtcdWatcher(false, nil, Everything, h.Codec, h.ResourceVersioner, transform) + w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform) go w.etcdWatch(h.Client, key, resourceVersion) return w } @@ -111,7 +111,7 @@ func exceptKey(except string) includeFunc { // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { encoding runtime.Codec - versioner EtcdResourceVersioner + versioner EtcdVersioner transform TransformFunc list bool // If we're doing a recursive watch, should be true. @@ -137,7 +137,7 @@ const watchWaitDuration = 100 * time.Millisecond // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher { +func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, @@ -240,16 +240,16 @@ func (w *etcdWatcher) translate() { } } -func (w *etcdWatcher) decodeObject(data []byte, index uint64) (runtime.Object, error) { - obj, err := w.encoding.Decode(data) +func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { + obj, err := w.encoding.Decode([]byte(node.Value)) if err != nil { return nil, err } // ensure resource version is set on the object we load from etcd if w.versioner != nil { - if err := w.versioner.SetResourceVersion(obj, index); err != nil { - glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err) + if err := w.versioner.UpdateObject(obj, node); err != nil { + glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err) } } @@ -273,10 +273,9 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) { if w.include != nil && !w.include(res.Node.Key) { return } - data := []byte(res.Node.Value) - obj, err := w.decodeObject(data, res.Node.ModifiedIndex) + obj, err := w.decodeObject(res.Node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node) + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. @@ -303,10 +302,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { if w.include != nil && !w.include(res.Node.Key) { return } - curData := []byte(res.Node.Value) - curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex) + curObj, err := w.decodeObject(res.Node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(curData), res, res.Node) + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. @@ -317,7 +315,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { var oldObj runtime.Object if res.PrevNode != nil && res.PrevNode.Value != "" { // Ignore problems reading the old object. - if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil { + if oldObj, err = w.decodeObject(res.PrevNode); err == nil { oldObjPasses = w.filter(oldObj) } } @@ -352,17 +350,16 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { if w.include != nil && !w.include(res.PrevNode.Key) { return } - data := []byte(res.PrevNode.Value) - index := res.PrevNode.ModifiedIndex + node := *res.PrevNode if res.Node != nil { // Note that this sends the *old* object with the etcd index for the time at // which it gets deleted. This will allow users to restart the watch at the right // index. - index = res.Node.ModifiedIndex + node.ModifiedIndex = res.Node.ModifiedIndex } - obj, err := w.decodeObject(data, index) + obj, err := w.decodeObject(&node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode) + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.PrevNode.Value), res, res.PrevNode) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_helper_watch_test.go similarity index 99% rename from pkg/tools/etcd_tools_watch_test.go rename to pkg/tools/etcd_helper_watch_test.go index 9685898d6bb..c468122bb01 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_helper_watch_test.go @@ -29,6 +29,8 @@ import ( "github.com/coreos/go-etcd/etcd" ) +var versioner = APIObjectVersioner{} + func TestWatchInterpretations(t *testing.T) { codec := latest.Codec // Declare some pods to make the test cases compact. diff --git a/pkg/tools/etcd_object.go b/pkg/tools/etcd_object.go new file mode 100644 index 00000000000..a50adb290b6 --- /dev/null +++ b/pkg/tools/etcd_object.go @@ -0,0 +1,74 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "strconv" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/coreos/go-etcd/etcd" +) + +// APIObjectVersioner implements versioning and extracting etcd node information +// for objects that have an embedded ObjectMeta or ListMeta field. +type APIObjectVersioner struct{} + +// UpdateObject implements EtcdVersioner +func (a APIObjectVersioner) UpdateObject(obj runtime.Object, node *etcd.Node) error { + objectMeta, err := api.ObjectMetaFor(obj) + if err != nil { + return err + } + version := node.ModifiedIndex + versionString := "" + if version != 0 { + versionString = strconv.FormatUint(version, 10) + } + objectMeta.ResourceVersion = versionString + return nil +} + +// UpdateList implements EtcdVersioner +func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error { + listMeta, err := api.ListMetaFor(obj) + if err != nil || listMeta == nil { + return err + } + versionString := "" + if resourceVersion != 0 { + versionString = strconv.FormatUint(resourceVersion, 10) + } + listMeta.ResourceVersion = versionString + return nil +} + +// ObjectResourceVersion implements EtcdVersioner +func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { + meta, err := api.ObjectMetaFor(obj) + if err != nil { + return 0, err + } + version := meta.ResourceVersion + if len(version) == 0 { + return 0, nil + } + return strconv.ParseUint(version, 10, 64) +} + +// APIObjectVersioner implements EtcdVersioner +var _ EtcdVersioner = APIObjectVersioner{} diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index b66edab46ba..5ee00cc23b3 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -290,6 +290,10 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err Index: f.ChangeIndex, } } + if IsEtcdNotFound(existing.E) { + f.DeletedKeys = append(f.DeletedKeys, key) + return existing.R, existing.E + } index := f.generateIndex() f.Data[key] = EtcdResponseWithError{ R: &etcd.Response{}, diff --git a/pkg/tools/interfaces.go b/pkg/tools/interfaces.go new file mode 100644 index 00000000000..609039ee420 --- /dev/null +++ b/pkg/tools/interfaces.go @@ -0,0 +1,77 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/coreos/go-etcd/etcd" +) + +const ( + EtcdErrorCodeNotFound = 100 + EtcdErrorCodeTestFailed = 101 + EtcdErrorCodeNodeExist = 105 + EtcdErrorCodeValueRequired = 200 +) + +var ( + EtcdErrorNotFound = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNotFound} + EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed} + EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist} + EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} +) + +// EtcdClient is an injectable interface for testing. +type EtcdClient interface { + GetCluster() []string + AddChild(key, data string, ttl uint64) (*etcd.Response, error) + Get(key string, sort, recursive bool) (*etcd.Response, error) + Set(key, value string, ttl uint64) (*etcd.Response, error) + Create(key, value string, ttl uint64) (*etcd.Response, error) + CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + // I'd like to use directional channels here (e.g. <-chan) but this interface mimics + // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) +} + +// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper. +type EtcdGetSet interface { + GetCluster() []string + Get(key string, sort, recursive bool) (*etcd.Response, error) + Set(key, value string, ttl uint64) (*etcd.Response, error) + Create(key, value string, ttl uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) +} + +// EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object +// or list. +type EtcdVersioner interface { + // UpdateObject sets etcd storage metadata into an API object. Returns an error if the object + // cannot be updated correctly. May return nil if the requested object does not need metadata + // from etcd. + UpdateObject(obj runtime.Object, node *etcd.Node) error + // UpdateList sets the resource version into an API list object. Returns an error if the object + // cannot be updated correctly. May return nil if the requested object does not need metadata + // from etcd. + UpdateList(obj runtime.Object, resourceVersion uint64) error + // ObjectResourceVersion returns the resource version (for persistence) of the specified object. + // Should return an error if the specified object does not have a persistable version. + ObjectResourceVersion(obj runtime.Object) (uint64, error) +} diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 1d26b6cc7ee..1bb576ddd8e 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -93,7 +93,7 @@ func TestExtractObj(t *testing.T) { func TestWatch(t *testing.T) { client := newEtcdClient() - helper := tools.EtcdHelper{Client: client, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + helper := tools.NewEtcdHelper(client, latest.Codec) withEtcdKey(func(key string) { resp, err := client.Set(key, runtime.EncodeOrDie(v1beta1.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) if err != nil {