mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 23:15:14 +00:00
Prepare EtcdHelper to extract more data from Node
In order to support graceful deletion, the resource object will need access to the TTL value in etcd. Also, in the future we may want to get the creation index (distinct from modifiedindex) and expose it to clients. Change EtcdResourceVersioner to be more type specific (objects vs lists) and provide a default implementation that relies on the internal API convention. Also, rename etcd_tools.go to etcd_helper.go and split a few things up.
This commit is contained in:
parent
4ca90e0343
commit
bddef32193
@ -50,11 +50,6 @@ var Codec = v1beta1.Codec
|
||||
// accessor is the shared static metadata accessor for the API.
|
||||
var accessor = meta.NewAccessor()
|
||||
|
||||
// ResourceVersioner describes a default versioner that can handle all types
|
||||
// of versioning.
|
||||
// TODO: when versioning changes, make this part of each API definition.
|
||||
var ResourceVersioner = runtime.ResourceVersioner(accessor)
|
||||
|
||||
// SelfLinker can set or get the SelfLink field of all API types.
|
||||
// TODO: when versioning changes, make this part of each API definition.
|
||||
// TODO(lavalamp): Combine SelfLinker & ResourceVersioner interfaces, force all uses
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
|
||||
func TestResourceVersioner(t *testing.T) {
|
||||
pod := internal.Pod{ObjectMeta: internal.ObjectMeta{ResourceVersion: "10"}}
|
||||
version, err := ResourceVersioner.ResourceVersion(&pod)
|
||||
version, err := accessor.ResourceVersion(&pod)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@ -36,7 +36,7 @@ func TestResourceVersioner(t *testing.T) {
|
||||
}
|
||||
|
||||
podList := internal.PodList{ListMeta: internal.ListMeta{ResourceVersion: "10"}}
|
||||
version, err = ResourceVersioner.ResourceVersion(&podList)
|
||||
version, err = accessor.ResourceVersion(&podList)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -35,16 +35,27 @@ func HasObjectMetaSystemFieldValues(meta *ObjectMeta) bool {
|
||||
len(meta.UID) != 0
|
||||
}
|
||||
|
||||
// GetObjectMetaPtr returns a pointer to a provided object's ObjectMeta.
|
||||
// ObjectMetaFor returns a pointer to a provided object's ObjectMeta.
|
||||
// TODO: allow runtime.Unknown to extract this object
|
||||
func ObjectMetaFor(obj runtime.Object) (*ObjectMeta, error) {
|
||||
v, err := conversion.EnforcePtr(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var objectMeta *ObjectMeta
|
||||
if err := runtime.FieldPtr(v, "ObjectMeta", &objectMeta); err != nil {
|
||||
var meta *ObjectMeta
|
||||
err = runtime.FieldPtr(v, "ObjectMeta", &meta)
|
||||
return meta, err
|
||||
}
|
||||
|
||||
// ListMetaFor returns a pointer to a provided object's ListMeta,
|
||||
// or an error if the object does not have that pointer.
|
||||
// TODO: allow runtime.Unknown to extract this object
|
||||
func ListMetaFor(obj runtime.Object) (*ListMeta, error) {
|
||||
v, err := conversion.EnforcePtr(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objectMeta, nil
|
||||
var meta *ListMeta
|
||||
err = runtime.FieldPtr(v, "ListMeta", &meta)
|
||||
return meta, err
|
||||
}
|
||||
|
@ -178,7 +178,7 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
|
||||
if err != nil {
|
||||
return helper, err
|
||||
}
|
||||
return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil
|
||||
return tools.NewEtcdHelper(client, versionInterfaces.Codec), nil
|
||||
}
|
||||
|
||||
// setDefaults fills in any fields not set that are required to have valid data.
|
||||
|
@ -36,12 +36,12 @@ import (
|
||||
)
|
||||
|
||||
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
|
||||
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, nil)
|
||||
registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil)
|
||||
return registry
|
||||
}
|
||||
|
||||
func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
|
||||
helper := tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(client, latest.Codec)
|
||||
podStorage, _, _ := podetcd.NewREST(helper)
|
||||
registry := NewRegistry(helper, pod.NewRegistry(podStorage))
|
||||
return registry
|
||||
|
@ -37,7 +37,7 @@ var testTTL uint64 = 60
|
||||
func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
|
||||
f := tools.NewFakeEtcdClient(t)
|
||||
f.TestIndex = true
|
||||
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
|
||||
h := tools.NewEtcdHelper(f, testapi.Codec())
|
||||
return f, NewEtcdRegistry(h, testTTL)
|
||||
}
|
||||
|
||||
|
@ -252,7 +252,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
|
||||
creating := false
|
||||
out := e.NewFunc()
|
||||
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) {
|
||||
version, err := e.Helper.ResourceVersioner.ResourceVersion(existing)
|
||||
version, err := e.Helper.Versioner.ObjectResourceVersion(existing)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
@ -275,7 +275,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
|
||||
}
|
||||
|
||||
creating = false
|
||||
newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj)
|
||||
newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
|
||||
func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) {
|
||||
f := tools.NewFakeEtcdClient(t)
|
||||
f.TestIndex = true
|
||||
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
|
||||
h := tools.NewEtcdHelper(f, testapi.Codec())
|
||||
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false}
|
||||
return f, &Etcd{
|
||||
NewFunc: func() runtime.Object { return &api.Pod{} },
|
||||
@ -631,9 +631,9 @@ func TestEtcdDelete(t *testing.T) {
|
||||
for name, item := range table {
|
||||
fakeClient, registry := NewTestGenericEtcdRegistry(t)
|
||||
fakeClient.Data[path] = item.existing
|
||||
_, err := registry.Delete(api.NewContext(), key)
|
||||
obj, err := registry.Delete(api.NewContext(), key)
|
||||
if !item.errOK(err) {
|
||||
t.Errorf("%v: unexpected error: %v", name, err)
|
||||
t.Errorf("%v: unexpected error: %v (%#v)", name, err, obj)
|
||||
}
|
||||
|
||||
if item.expect.E != nil {
|
||||
|
@ -36,7 +36,7 @@ import (
|
||||
func NewTestLimitRangeEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
|
||||
f := tools.NewFakeEtcdClient(t)
|
||||
f.TestIndex = true
|
||||
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
|
||||
h := tools.NewEtcdHelper(f, testapi.Codec())
|
||||
return f, NewEtcdRegistry(h)
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ import (
|
||||
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
|
||||
fakeEtcdClient := tools.NewFakeEtcdClient(t)
|
||||
fakeEtcdClient.TestIndex = true
|
||||
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
|
||||
return fakeEtcdClient, helper
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,7 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) {
|
||||
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
|
||||
fakeEtcdClient := tools.NewFakeEtcdClient(t)
|
||||
fakeEtcdClient.TestIndex = true
|
||||
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
|
||||
return fakeEtcdClient, helper
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,7 @@ import (
|
||||
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
|
||||
fakeEtcdClient := tools.NewFakeEtcdClient(t)
|
||||
fakeEtcdClient.TestIndex = true
|
||||
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
|
||||
return fakeEtcdClient, helper
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ import (
|
||||
func NewTestSecretEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
|
||||
f := tools.NewFakeEtcdClient(t)
|
||||
f.TestIndex = true
|
||||
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
|
||||
h := tools.NewEtcdHelper(f, testapi.Codec())
|
||||
return f, NewEtcdRegistry(h)
|
||||
}
|
||||
|
||||
|
@ -14,5 +14,6 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package tools implements general tools which depend on the api package.
|
||||
// Package tools implements types which help work with etcd which depend on the api package.
|
||||
// TODO: move this package to an etcd specific utility package.
|
||||
package tools
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"net/http"
|
||||
"os/exec"
|
||||
"reflect"
|
||||
"strconv"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
@ -33,82 +32,22 @@ import (
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
EtcdErrorCodeNotFound = 100
|
||||
EtcdErrorCodeTestFailed = 101
|
||||
EtcdErrorCodeNodeExist = 105
|
||||
EtcdErrorCodeValueRequired = 200
|
||||
)
|
||||
|
||||
var (
|
||||
EtcdErrorNotFound = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNotFound}
|
||||
EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed}
|
||||
EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist}
|
||||
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
|
||||
)
|
||||
|
||||
// EtcdClient is an injectable interface for testing.
|
||||
type EtcdClient interface {
|
||||
GetCluster() []string
|
||||
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
|
||||
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
|
||||
type EtcdGetSet interface {
|
||||
GetCluster() []string
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
type EtcdResourceVersioner interface {
|
||||
SetResourceVersion(obj runtime.Object, version uint64) error
|
||||
ResourceVersion(obj runtime.Object) (uint64, error)
|
||||
}
|
||||
|
||||
// RuntimeVersionAdapter converts a string based versioner to EtcdResourceVersioner
|
||||
type RuntimeVersionAdapter struct {
|
||||
Versioner runtime.ResourceVersioner
|
||||
}
|
||||
|
||||
// SetResourceVersion implements EtcdResourceVersioner
|
||||
func (a RuntimeVersionAdapter) SetResourceVersion(obj runtime.Object, version uint64) error {
|
||||
if version == 0 {
|
||||
return a.Versioner.SetResourceVersion(obj, "")
|
||||
}
|
||||
s := strconv.FormatUint(version, 10)
|
||||
return a.Versioner.SetResourceVersion(obj, s)
|
||||
}
|
||||
|
||||
// SetResourceVersion implements EtcdResourceVersioner
|
||||
func (a RuntimeVersionAdapter) ResourceVersion(obj runtime.Object) (uint64, error) {
|
||||
version, err := a.Versioner.ResourceVersion(obj)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if version == "" {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(version, 10, 64)
|
||||
}
|
||||
|
||||
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
||||
type EtcdHelper struct {
|
||||
Client EtcdGetSet
|
||||
Codec runtime.Codec
|
||||
// optional, no atomic operations can be performed without this interface
|
||||
ResourceVersioner EtcdResourceVersioner
|
||||
Versioner EtcdVersioner
|
||||
}
|
||||
|
||||
// NewEtcdHelper creates a helper that works against objects that use the internal
|
||||
// Kubernetes API objects.
|
||||
func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec) EtcdHelper {
|
||||
return EtcdHelper{
|
||||
Client: client,
|
||||
Codec: codec,
|
||||
Versioner: APIObjectVersioner{},
|
||||
}
|
||||
}
|
||||
|
||||
// IsEtcdNotFound returns true iff err is an etcd not found error.
|
||||
@ -163,19 +102,6 @@ func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
|
||||
return result.Node.Nodes, result.EtcdIndex, nil
|
||||
}
|
||||
|
||||
// ExtractList extracts a go object per etcd node into a slice with the resource version.
|
||||
// DEPRECATED: Use ExtractToList instead, it's more convenient.
|
||||
func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersion *uint64) error {
|
||||
nodes, index, err := h.listEtcdNode(key)
|
||||
if resourceVersion != nil {
|
||||
*resourceVersion = index
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return h.decodeNodeList(nodes, slicePtr)
|
||||
}
|
||||
|
||||
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
||||
func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
|
||||
v, err := conversion.EnforcePtr(slicePtr)
|
||||
@ -194,28 +120,31 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
||||
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
||||
return err
|
||||
}
|
||||
if h.ResourceVersioner != nil {
|
||||
_ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex)
|
||||
if h.Versioner != nil {
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
|
||||
}
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExtractToList is just like ExtractList, but it works on a ThingyList api object.
|
||||
// extracts a go object per etcd node into a slice with the resource version.
|
||||
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
|
||||
// definition) and extracts a go object per etcd node into a slice with the resource version.
|
||||
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
|
||||
var resourceVersion uint64
|
||||
listPtr, err := runtime.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.ExtractList(key, listPtr, &resourceVersion); err != nil {
|
||||
nodes, index, err := h.listEtcdNode(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if h.ResourceVersioner != nil {
|
||||
if err := h.ResourceVersioner.SetResourceVersion(listObj, resourceVersion); err != nil {
|
||||
if err := h.decodeNodeList(nodes, listPtr); err != nil {
|
||||
return err
|
||||
}
|
||||
if h.Versioner != nil {
|
||||
if err := h.Versioner.UpdateList(listObj, index); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -263,8 +192,8 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
||||
}
|
||||
body = node.Value
|
||||
err = h.Codec.DecodeInto([]byte(body), objPtr)
|
||||
if h.ResourceVersioner != nil {
|
||||
_ = h.ResourceVersioner.SetResourceVersion(objPtr, node.ModifiedIndex)
|
||||
if h.Versioner != nil {
|
||||
_ = h.Versioner.UpdateObject(objPtr, node)
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
}
|
||||
return body, node.ModifiedIndex, err
|
||||
@ -278,8 +207,8 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if h.ResourceVersioner != nil {
|
||||
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
|
||||
if h.Versioner != nil {
|
||||
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion may not be set on objects to be created")
|
||||
}
|
||||
}
|
||||
@ -319,7 +248,7 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
|
||||
|
||||
// SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion
|
||||
// field is set. 'ttl' is time-to-live in seconds, and 0 means forever. If no error is returned and out is
|
||||
//not nil, out will be set to the read value from etcd.
|
||||
// not nil, out will be set to the read value from etcd.
|
||||
func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
|
||||
var response *etcd.Response
|
||||
data, err := h.Codec.Encode(obj)
|
||||
@ -328,8 +257,8 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
|
||||
}
|
||||
|
||||
create := true
|
||||
if h.ResourceVersioner != nil {
|
||||
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
|
||||
if h.Versioner != nil {
|
||||
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
create = false
|
||||
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
|
||||
if err != nil {
|
@ -26,7 +26,6 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
@ -44,7 +43,6 @@ func (*TestResource) IsAnAPIObject() {}
|
||||
|
||||
var scheme *runtime.Scheme
|
||||
var codec runtime.Codec
|
||||
var versioner = RuntimeVersionAdapter{meta.NewAccessor()}
|
||||
|
||||
func init() {
|
||||
scheme = runtime.NewScheme()
|
||||
@ -129,7 +127,7 @@ func TestExtractToList(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
@ -212,7 +210,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
@ -282,7 +280,7 @@ func TestExtractToListExcludesDirectories(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
@ -302,7 +300,7 @@ func TestExtractObj(t *testing.T) {
|
||||
},
|
||||
}
|
||||
fakeClient.Set("/some/key", runtime.EncodeOrDie(testapi.Codec(), &expect), 0)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
var got api.Pod
|
||||
err := helper.ExtractObj("/some/key", &got, false)
|
||||
if err != nil {
|
||||
@ -335,7 +333,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
try := func(key string) {
|
||||
var got api.Pod
|
||||
err := helper.ExtractObj(key, &got, false)
|
||||
@ -356,7 +354,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
||||
func TestCreateObj(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.CreateObj("/some/key", obj, returnedObj, 5)
|
||||
if err != nil {
|
||||
@ -381,7 +379,7 @@ func TestCreateObj(t *testing.T) {
|
||||
func TestCreateObjNilOutParam(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.CreateObj("/some/key", obj, nil, 5)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
@ -391,7 +389,7 @@ func TestCreateObjNilOutParam(t *testing.T) {
|
||||
func TestSetObj(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.SetObj("/some/key", obj, returnedObj, 5)
|
||||
if err != nil {
|
||||
@ -418,7 +416,7 @@ func TestSetObjFailCAS(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.CasErr = fakeClient.NewError(123)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.SetObj("/some/key", obj, nil, 5)
|
||||
if err == nil {
|
||||
t.Errorf("Expecting error.")
|
||||
@ -438,7 +436,7 @@ func TestSetObjWithVersion(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.SetObj("/some/key", obj, returnedObj, 7)
|
||||
if err != nil {
|
||||
@ -500,7 +498,7 @@ func TestSetObjNilOutParam(t *testing.T) {
|
||||
func TestAtomicUpdate(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
@ -554,7 +552,7 @@ func TestAtomicUpdate(t *testing.T) {
|
||||
func TestAtomicUpdateNoChange(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
@ -585,7 +583,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
||||
func TestAtomicUpdateKeyNotFound(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
@ -611,7 +609,7 @@ func TestAtomicUpdateKeyNotFound(t *testing.T) {
|
||||
func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
|
@ -61,7 +61,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
||||
// watch.Interface. resourceVersion may be used to specify what version to begin
|
||||
// watching (e.g., for reconnecting without missing any updates).
|
||||
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.ResourceVersioner, nil)
|
||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w, nil
|
||||
}
|
||||
@ -90,7 +90,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface {
|
||||
//
|
||||
// Errors will be sent down the channel.
|
||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
||||
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.ResourceVersioner, transform)
|
||||
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w
|
||||
}
|
||||
@ -111,7 +111,7 @@ func exceptKey(except string) includeFunc {
|
||||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||
type etcdWatcher struct {
|
||||
encoding runtime.Codec
|
||||
versioner EtcdResourceVersioner
|
||||
versioner EtcdVersioner
|
||||
transform TransformFunc
|
||||
|
||||
list bool // If we're doing a recursive watch, should be true.
|
||||
@ -137,7 +137,7 @@ const watchWaitDuration = 100 * time.Millisecond
|
||||
|
||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
||||
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
|
||||
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc) *etcdWatcher {
|
||||
w := &etcdWatcher{
|
||||
encoding: encoding,
|
||||
versioner: versioner,
|
||||
@ -240,16 +240,16 @@ func (w *etcdWatcher) translate() {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *etcdWatcher) decodeObject(data []byte, index uint64) (runtime.Object, error) {
|
||||
obj, err := w.encoding.Decode(data)
|
||||
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
||||
obj, err := w.encoding.Decode([]byte(node.Value))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ensure resource version is set on the object we load from etcd
|
||||
if w.versioner != nil {
|
||||
if err := w.versioner.SetResourceVersion(obj, index); err != nil {
|
||||
glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err)
|
||||
if err := w.versioner.UpdateObject(obj, node); err != nil {
|
||||
glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -273,10 +273,9 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
|
||||
if w.include != nil && !w.include(res.Node.Key) {
|
||||
return
|
||||
}
|
||||
data := []byte(res.Node.Value)
|
||||
obj, err := w.decodeObject(data, res.Node.ModifiedIndex)
|
||||
obj, err := w.decodeObject(res.Node)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node)
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node)
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
||||
@ -303,10 +302,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
||||
if w.include != nil && !w.include(res.Node.Key) {
|
||||
return
|
||||
}
|
||||
curData := []byte(res.Node.Value)
|
||||
curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex)
|
||||
curObj, err := w.decodeObject(res.Node)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(curData), res, res.Node)
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node)
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
||||
@ -317,7 +315,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
||||
var oldObj runtime.Object
|
||||
if res.PrevNode != nil && res.PrevNode.Value != "" {
|
||||
// Ignore problems reading the old object.
|
||||
if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil {
|
||||
if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
|
||||
oldObjPasses = w.filter(oldObj)
|
||||
}
|
||||
}
|
||||
@ -352,17 +350,16 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
||||
if w.include != nil && !w.include(res.PrevNode.Key) {
|
||||
return
|
||||
}
|
||||
data := []byte(res.PrevNode.Value)
|
||||
index := res.PrevNode.ModifiedIndex
|
||||
node := *res.PrevNode
|
||||
if res.Node != nil {
|
||||
// Note that this sends the *old* object with the etcd index for the time at
|
||||
// which it gets deleted. This will allow users to restart the watch at the right
|
||||
// index.
|
||||
index = res.Node.ModifiedIndex
|
||||
node.ModifiedIndex = res.Node.ModifiedIndex
|
||||
}
|
||||
obj, err := w.decodeObject(data, index)
|
||||
obj, err := w.decodeObject(&node)
|
||||
if err != nil {
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode)
|
||||
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.PrevNode.Value), res, res.PrevNode)
|
||||
// TODO: expose an error through watch.Interface?
|
||||
// Ignore this value. If we stop the watch on a bad value, a client that uses
|
||||
// the resourceVersion to resume will never be able to get past a bad value.
|
@ -29,6 +29,8 @@ import (
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
var versioner = APIObjectVersioner{}
|
||||
|
||||
func TestWatchInterpretations(t *testing.T) {
|
||||
codec := latest.Codec
|
||||
// Declare some pods to make the test cases compact.
|
74
pkg/tools/etcd_object.go
Normal file
74
pkg/tools/etcd_object.go
Normal file
@ -0,0 +1,74 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tools
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
// APIObjectVersioner implements versioning and extracting etcd node information
|
||||
// for objects that have an embedded ObjectMeta or ListMeta field.
|
||||
type APIObjectVersioner struct{}
|
||||
|
||||
// UpdateObject implements EtcdVersioner
|
||||
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, node *etcd.Node) error {
|
||||
objectMeta, err := api.ObjectMetaFor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
version := node.ModifiedIndex
|
||||
versionString := ""
|
||||
if version != 0 {
|
||||
versionString = strconv.FormatUint(version, 10)
|
||||
}
|
||||
objectMeta.ResourceVersion = versionString
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateList implements EtcdVersioner
|
||||
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error {
|
||||
listMeta, err := api.ListMetaFor(obj)
|
||||
if err != nil || listMeta == nil {
|
||||
return err
|
||||
}
|
||||
versionString := ""
|
||||
if resourceVersion != 0 {
|
||||
versionString = strconv.FormatUint(resourceVersion, 10)
|
||||
}
|
||||
listMeta.ResourceVersion = versionString
|
||||
return nil
|
||||
}
|
||||
|
||||
// ObjectResourceVersion implements EtcdVersioner
|
||||
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
||||
meta, err := api.ObjectMetaFor(obj)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
version := meta.ResourceVersion
|
||||
if len(version) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(version, 10, 64)
|
||||
}
|
||||
|
||||
// APIObjectVersioner implements EtcdVersioner
|
||||
var _ EtcdVersioner = APIObjectVersioner{}
|
@ -290,6 +290,10 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
|
||||
Index: f.ChangeIndex,
|
||||
}
|
||||
}
|
||||
if IsEtcdNotFound(existing.E) {
|
||||
f.DeletedKeys = append(f.DeletedKeys, key)
|
||||
return existing.R, existing.E
|
||||
}
|
||||
index := f.generateIndex()
|
||||
f.Data[key] = EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
|
77
pkg/tools/interfaces.go
Normal file
77
pkg/tools/interfaces.go
Normal file
@ -0,0 +1,77 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tools
|
||||
|
||||
import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
const (
|
||||
EtcdErrorCodeNotFound = 100
|
||||
EtcdErrorCodeTestFailed = 101
|
||||
EtcdErrorCodeNodeExist = 105
|
||||
EtcdErrorCodeValueRequired = 200
|
||||
)
|
||||
|
||||
var (
|
||||
EtcdErrorNotFound = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNotFound}
|
||||
EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed}
|
||||
EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist}
|
||||
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
|
||||
)
|
||||
|
||||
// EtcdClient is an injectable interface for testing.
|
||||
type EtcdClient interface {
|
||||
GetCluster() []string
|
||||
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
|
||||
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
|
||||
type EtcdGetSet interface {
|
||||
GetCluster() []string
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
||||
|
||||
// EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object
|
||||
// or list.
|
||||
type EtcdVersioner interface {
|
||||
// UpdateObject sets etcd storage metadata into an API object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from etcd.
|
||||
UpdateObject(obj runtime.Object, node *etcd.Node) error
|
||||
// UpdateList sets the resource version into an API list object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from etcd.
|
||||
UpdateList(obj runtime.Object, resourceVersion uint64) error
|
||||
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
|
||||
// Should return an error if the specified object does not have a persistable version.
|
||||
ObjectResourceVersion(obj runtime.Object) (uint64, error)
|
||||
}
|
@ -93,7 +93,7 @@ func TestExtractObj(t *testing.T) {
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
client := newEtcdClient()
|
||||
helper := tools.EtcdHelper{Client: client, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
|
||||
helper := tools.NewEtcdHelper(client, latest.Codec)
|
||||
withEtcdKey(func(key string) {
|
||||
resp, err := client.Set(key, runtime.EncodeOrDie(v1beta1.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user