Merge pull request #11878 from wojtek-t/refactor_etcd_watcher

Factor out etcdWatcher and rename methods in StorageInterface
This commit is contained in:
Mike Danese 2015-07-29 09:44:02 -07:00
commit e5c31da2ff
16 changed files with 97 additions and 114 deletions

View File

@ -163,7 +163,7 @@ resource type. However, this watch can potentially expire at any time and
reconnecting can return "too old resource version". In that case relisting is
necessary. In such case, to avoid LIST requests coming from all watchers at
the same time, we can introduce an additional etcd event type:
[EtcdResync](../../pkg/tools/etcd_helper_watch.go#L36)
[EtcdResync](../../pkg/tools/etcd_watcher.go#L36)
Whenever reslisting will be done to refresh the internal watch to etcd,
EtcdResync event will be send to all the watchers. It will contain the

View File

@ -96,7 +96,7 @@ func makeServiceKey(ctx api.Context, name string) (string, error) {
// ListServices obtains a list of Services.
func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) {
list := &api.ServiceList{}
err := r.ExtractToList(makeServiceListKey(ctx), list)
err := r.List(makeServiceListKey(ctx), list)
return list, err
}
@ -107,7 +107,7 @@ func (r *Registry) CreateService(ctx api.Context, svc *api.Service) (*api.Servic
return nil, err
}
out := &api.Service{}
err = r.CreateObj(key, svc, out, 0)
err = r.Create(key, svc, out, 0)
return out, etcderr.InterpretCreateError(err, "service", svc.Name)
}
@ -118,7 +118,7 @@ func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error
return nil, err
}
var svc api.Service
err = r.ExtractObj(key, &svc, false)
err = r.Get(key, &svc, false)
if err != nil {
return nil, etcderr.InterpretGetError(err, "service", name)
}
@ -131,7 +131,7 @@ func (r *Registry) DeleteService(ctx api.Context, name string) error {
if err != nil {
return err
}
err = r.Delete(key, true)
err = r.RecursiveDelete(key, true)
if err != nil {
return etcderr.InterpretDeleteError(err, "service", name)
}
@ -152,7 +152,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) (*api.Servic
return nil, err
}
out := &api.Service{}
err = r.SetObj(key, svc, out, 0)
err = r.Set(key, svc, out, 0)
return out, etcderr.InterpretUpdateError(err, "service", svc.Name)
}

View File

@ -157,14 +157,14 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object
if err != nil {
return nil, err
}
err = e.Storage.ExtractObjToList(key, list)
err = e.Storage.GetToList(key, list)
trace.Step("Object extracted")
if err != nil {
return nil, err
}
} else {
trace.Step("About to list directory")
err := e.Storage.ExtractToList(e.KeyRootFunc(ctx), list)
err := e.Storage.List(e.KeyRootFunc(ctx), list)
trace.Step("List extracted")
if err != nil {
return nil, err
@ -190,7 +190,7 @@ func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object)
if err != nil {
return err
}
err = e.Storage.CreateObj(key, obj, nil, ttl)
err = e.Storage.Create(key, obj, nil, ttl)
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
if err == nil && e.Decorator != nil {
err = e.Decorator(obj)
@ -219,7 +219,7 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro
}
trace.Step("About to create object")
out := e.NewFunc()
if err := e.Storage.CreateObj(key, obj, out, ttl); err != nil {
if err := e.Storage.Create(key, obj, out, ttl); err != nil {
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
return nil, err
@ -249,7 +249,7 @@ func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object)
if err != nil {
return err
}
err = e.Storage.SetObj(key, obj, nil, ttl)
err = e.Storage.Set(key, obj, nil, ttl)
err = etcderr.InterpretUpdateError(err, e.EndpointName, name)
if err == nil && e.Decorator != nil {
err = e.Decorator(obj)
@ -372,7 +372,7 @@ func (e *Etcd) Get(ctx api.Context, name string) (runtime.Object, error) {
return nil, err
}
trace.Step("About to read object")
if err := e.Storage.ExtractObj(key, obj, false); err != nil {
if err := e.Storage.Get(key, obj, false); err != nil {
return nil, etcderr.InterpretGetError(err, e.EndpointName, name)
}
trace.Step("Object read")
@ -395,7 +395,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions)
trace := util.NewTrace("Delete " + reflect.TypeOf(obj).String())
defer trace.LogIfLong(time.Second)
trace.Step("About to read object")
if err := e.Storage.ExtractObj(key, obj, false); err != nil {
if err := e.Storage.Get(key, obj, false); err != nil {
return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name)
}
@ -413,7 +413,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions)
if graceful && *options.GracePeriodSeconds != 0 {
trace.Step("Graceful deletion")
out := e.NewFunc()
if err := e.Storage.SetObj(key, obj, out, uint64(*options.GracePeriodSeconds)); err != nil {
if err := e.Storage.Set(key, obj, out, uint64(*options.GracePeriodSeconds)); err != nil {
return nil, etcderr.InterpretUpdateError(err, e.EndpointName, name)
}
return e.finalizeDelete(out, true)
@ -422,7 +422,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions)
// delete immediately, or no graceful deletion supported
out := e.NewFunc()
trace.Step("About to delete object")
if err := e.Storage.DeleteObj(key, out); err != nil {
if err := e.Storage.Delete(key, out); err != nil {
return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name)
}
return e.finalizeDelete(out, true)

View File

@ -108,7 +108,7 @@ func TestCreateSetsFields(t *testing.T) {
if err != nil {
t.Fatalf("unexpected key error: %v", err)
}
if err := etcdStorage.ExtractObj(key, actual, false); err != nil {
if err := etcdStorage.Get(key, actual, false); err != nil {
t.Fatalf("unexpected extraction error: %v", err)
}
if actual.Name != namespace.Name {

View File

@ -350,7 +350,7 @@ func TestEtcdUpdateStatus(t *testing.T) {
}
var pvOut api.PersistentVolume
key, _ = storage.KeyFunc(ctx, "foo")
if err := etcdStorage.ExtractObj(key, &pvOut, false); err != nil {
if err := etcdStorage.Get(key, &pvOut, false); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !api.Semantic.DeepEqual(expected, pvOut) {

View File

@ -357,7 +357,7 @@ func TestEtcdUpdateStatus(t *testing.T) {
}
var pvcOut api.PersistentVolumeClaim
key, _ = storage.KeyFunc(ctx, "foo")
if err := etcdStorage.ExtractObj(key, &pvcOut, false); err != nil {
if err := etcdStorage.Get(key, &pvcOut, false); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !api.Semantic.DeepEqual(expected, pvcOut) {

View File

@ -169,7 +169,7 @@ func TestCreateSetsFields(t *testing.T) {
ctx := api.NewDefaultContext()
key, _ := storage.Etcd.KeyFunc(ctx, "foo")
actual := &api.Pod{}
if err := etcdStorage.ExtractObj(key, actual, false); err != nil {
if err := etcdStorage.Get(key, actual, false); err != nil {
t.Fatalf("unexpected extraction error: %v", err)
}
if actual.Name != pod.Name {
@ -448,7 +448,7 @@ func TestCreatePod(t *testing.T) {
t.Fatalf("unexpected object: %#v", obj)
}
actual := &api.Pod{}
if err := etcdStorage.ExtractObj(key, actual, false); err != nil {
if err := etcdStorage.Get(key, actual, false); err != nil {
t.Fatalf("unexpected extraction error: %v", err)
}
if !api.HasObjectMetaSystemFieldValues(&actual.ObjectMeta) {
@ -1222,7 +1222,7 @@ func TestEtcdUpdateStatus(t *testing.T) {
}
var podOut api.Pod
key, _ = registry.KeyFunc(ctx, "foo")
if err := etcdStorage.ExtractObj(key, &podOut, false); err != nil {
if err := etcdStorage.Get(key, &podOut, false); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !api.Semantic.DeepEqual(expected, podOut) {

View File

@ -133,7 +133,7 @@ func TestCreateSetsFields(t *testing.T) {
actual := &api.ResourceQuota{}
key, _ := storage.Etcd.KeyFunc(ctx, "foo")
if err := etcdStorage.ExtractObj(key, actual, false); err != nil {
if err := etcdStorage.Get(key, actual, false); err != nil {
t.Fatalf("unexpected extraction error: %v", err)
}
if actual.Name != resourcequota.Name {
@ -517,7 +517,7 @@ func TestEtcdUpdateStatus(t *testing.T) {
}
var resourcequotaOut api.ResourceQuota
key, _ = registry.KeyFunc(ctx, "foo")
if err := etcdStorage.ExtractObj(key, &resourcequotaOut, false); err != nil {
if err := etcdStorage.Get(key, &resourcequotaOut, false); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !api.Semantic.DeepEqual(expected, resourcequotaOut) {

View File

@ -170,7 +170,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) {
defer e.lock.Unlock()
existing := &api.RangeAllocation{}
if err := e.storage.ExtractObj(e.baseKey, existing, false); err != nil {
if err := e.storage.Get(e.baseKey, existing, false); err != nil {
if tools.IsEtcdNotFound(err) {
return nil, nil
}
@ -184,7 +184,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) {
// etcd. If the key does not exist, the object will have an empty ResourceVersion.
func (e *Etcd) Get() (*api.RangeAllocation, error) {
existing := &api.RangeAllocation{}
if err := e.storage.ExtractObj(e.baseKey, existing, true); err != nil {
if err := e.storage.Get(e.baseKey, existing, true); err != nil {
return nil, etcderr.InterpretGetError(err, e.kind, "")
}
return existing, nil

View File

@ -101,7 +101,7 @@ func TestStore(t *testing.T) {
other := allocator.NewAllocationMap(100, "rangeSpecValue")
allocation := &api.RangeAllocation{}
if err := storage.storage.ExtractObj(key(), allocation, false); err != nil {
if err := storage.storage.Get(key(), allocation, false); err != nil {
t.Fatal(err)
}
if allocation.ResourceVersion != "1" {

View File

@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
@ -80,7 +81,7 @@ func (h *etcdHelper) Versioner() StorageVersioner {
}
// Implements StorageInterface.
func (h *etcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error {
func (h *etcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) error {
key = h.prefixEtcdKey(key)
data, err := h.codec.Encode(obj)
if err != nil {
@ -108,7 +109,7 @@ func (h *etcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
}
// Implements StorageInterface.
func (h *etcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
func (h *etcdHelper) Set(key string, obj, out runtime.Object, ttl uint64) error {
var response *etcd.Response
data, err := h.codec.Encode(obj)
if err != nil {
@ -149,7 +150,7 @@ func (h *etcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
}
// Implements StorageInterface.
func (h *etcdHelper) DeleteObj(key string, out runtime.Object) error {
func (h *etcdHelper) Delete(key string, out runtime.Object) error {
key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
@ -168,7 +169,7 @@ func (h *etcdHelper) DeleteObj(key string, out runtime.Object) error {
}
// Implements StorageInterface.
func (h *etcdHelper) Delete(key string, recursive bool) error {
func (h *etcdHelper) RecursiveDelete(key string, recursive bool) error {
key = h.prefixEtcdKey(key)
startTime := time.Now()
_, err := h.client.Delete(key, recursive)
@ -177,7 +178,23 @@ func (h *etcdHelper) Delete(key string, recursive bool) error {
}
// Implements StorageInterface.
func (h *etcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
return w, nil
}
// Implements StorageInterface.
func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
return w, nil
}
// Implements StorageInterface.
func (h *etcdHelper) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error {
key = h.prefixEtcdKey(key)
_, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return err
@ -228,8 +245,8 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
}
// Implements StorageInterface.
func (h *etcdHelper) ExtractObjToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractObjToList " + getTypeName(listObj))
func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("GetToList " + getTypeName(listObj))
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
@ -302,8 +319,8 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
}
// Implements StorageInterface.
func (h *etcdHelper) ExtractToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractToList " + getTypeName(listObj))
func (h *etcdHelper) List(key string, listObj runtime.Object) error {
trace := util.NewTrace("List " + getTypeName(listObj))
defer trace.LogIfLong(time.Second)
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {

View File

@ -89,7 +89,7 @@ func getEncodedPod(name string) string {
return string(pod)
}
func TestExtractToList(t *testing.T) {
func TestList(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
@ -149,7 +149,7 @@ func TestExtractToList(t *testing.T) {
}
var got api.PodList
err := helper.ExtractToList("/some/key", &got)
err := helper.List("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
@ -158,8 +158,8 @@ func TestExtractToList(t *testing.T) {
}
}
// TestExtractToListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
func TestExtractToListAcrossDirectories(t *testing.T) {
// TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
func TestListAcrossDirectories(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
@ -233,7 +233,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
}
var got api.PodList
err := helper.ExtractToList("/some/key", &got)
err := helper.List("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
@ -242,7 +242,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
}
}
func TestExtractToListExcludesDirectories(t *testing.T) {
func TestListExcludesDirectories(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
@ -304,7 +304,7 @@ func TestExtractToListExcludesDirectories(t *testing.T) {
}
var got api.PodList
err := helper.ExtractToList("/some/key", &got)
err := helper.List("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
@ -313,7 +313,7 @@ func TestExtractToListExcludesDirectories(t *testing.T) {
}
}
func TestExtractObj(t *testing.T) {
func TestGet(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
@ -326,7 +326,7 @@ func TestExtractObj(t *testing.T) {
}
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), &expect), 0)
var got api.Pod
err := helper.ExtractObj("/some/key", &got, false)
err := helper.Get("/some/key", &got, false)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@ -335,7 +335,7 @@ func TestExtractObj(t *testing.T) {
}
}
func TestExtractObjNotFoundErr(t *testing.T) {
func TestGetNotFoundErr(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key1 := etcdtest.AddPrefix("/some/key")
@ -363,11 +363,11 @@ func TestExtractObjNotFoundErr(t *testing.T) {
}
try := func(key string) {
var got api.Pod
err := helper.ExtractObj(key, &got, false)
err := helper.Get(key, &got, false)
if err == nil {
t.Errorf("%s: wanted error but didn't get one", key)
}
err = helper.ExtractObj(key, &got, true)
err = helper.Get(key, &got, true)
if err != nil {
t.Errorf("%s: didn't want error but got %#v", key, err)
}
@ -378,12 +378,12 @@ func TestExtractObjNotFoundErr(t *testing.T) {
try("/some/key3")
}
func TestCreateObj(t *testing.T) {
func TestCreate(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{}
err := helper.CreateObj("/some/key", obj, returnedObj, 5)
err := helper.Create("/some/key", obj, returnedObj, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@ -404,22 +404,22 @@ func TestCreateObj(t *testing.T) {
}
}
func TestCreateObjNilOutParam(t *testing.T) {
func TestCreateNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
err := helper.CreateObj("/some/key", obj, nil, 5)
err := helper.Create("/some/key", obj, nil, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
}
func TestSetObj(t *testing.T) {
func TestSet(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{}
err := helper.SetObj("/some/key", obj, returnedObj, 5)
err := helper.Set("/some/key", obj, returnedObj, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@ -441,18 +441,18 @@ func TestSetObj(t *testing.T) {
}
}
func TestSetObjFailCAS(t *testing.T) {
func TestSetFailCAS(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
fakeClient := NewFakeEtcdClient(t)
fakeClient.CasErr = fakeClient.NewError(123)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
err := helper.SetObj("/some/key", obj, nil, 5)
err := helper.Set("/some/key", obj, nil, 5)
if err == nil {
t.Errorf("Expecting error.")
}
}
func TestSetObjWithVersion(t *testing.T) {
func TestSetWithVersion(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
@ -468,7 +468,7 @@ func TestSetObjWithVersion(t *testing.T) {
}
returnedObj := &api.Pod{}
err := helper.SetObj("/some/key", obj, returnedObj, 7)
err := helper.Set("/some/key", obj, returnedObj, 7)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
@ -489,13 +489,13 @@ func TestSetObjWithVersion(t *testing.T) {
}
}
func TestSetObjWithoutResourceVersioner(t *testing.T) {
func TestSetWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper.versioner = nil
returnedObj := &api.Pod{}
err := helper.SetObj("/some/key", obj, returnedObj, 3)
err := helper.Set("/some/key", obj, returnedObj, 3)
key := etcdtest.AddPrefix("/some/key")
if err != nil {
t.Errorf("Unexpected error %#v", err)
@ -517,12 +517,12 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
}
}
func TestSetObjNilOutParam(t *testing.T) {
func TestSetNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper.versioner = nil
err := helper.SetObj("/some/key", obj, nil, 3)
err := helper.Set("/some/key", obj, nil, 3)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}

View File

@ -66,27 +66,6 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
return version + 1, nil
}
// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updates).
func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
return w, nil
}
// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
// Errors will be sent down the channel.
func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
return w, nil
}
// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)

View File

@ -99,29 +99,22 @@ type StorageInterface interface {
// Returns StorageVersioner associated with this interface.
Versioner() StorageVersioner
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from etcd.
//
// TODO(wojtekt): Rename to Create().
CreateObj(key string, obj, out runtime.Object, ttl uint64) error
Create(key string, obj, out runtime.Object, ttl uint64) error
// SetObj marshals obj via json and stores in etcd under key. Will do an atomic update
// Set marshals obj via json and stores in etcd under key. Will do an atomic update
// if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever).
// If no error is returned and out is not nil, out will be set to the read value from etcd.
//
// TODO(wojtekt): Rename to Set() (or Update?).
SetObj(key string, obj, out runtime.Object, ttl uint64) error
Set(key string, obj, out runtime.Object, ttl uint64) error
// DeleteObj removes the specified key and returns the value that existed at that spot.
//
// TODO(wojtekt): Rename to Delete().
DeleteObj(key string, out runtime.Object) error
// Delete removes the specified key and returns the value that existed at that spot.
Delete(key string, out runtime.Object) error
// Delete removes the specified key.
//
// TODO(wojtekt): Unify it with DeleteObj().
Delete(key string, recursive bool) error
// RecursiveDelete removes the specified key.
// TODO: Get rid of this method and use Delete() instead.
RecursiveDelete(key string, recursive bool) error
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items passing 'filter' are sent down to returned watch.Interface.
@ -135,24 +128,18 @@ type StorageInterface interface {
// (e.g. reconnecting without missing any updates).
WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either
// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
// Treats empty responses and nil response nodes exactly like a not found error.
//
// TODO(wojtekt): Rename to Get().
ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error
Get(key string, objPtr runtime.Object, ignoreNotFound bool) error
// ExtractObjToList unmarshals json found at key and opaque it into *List api object
// GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
//
// TODO(wojtekt): Rename to GetToList().
ExtractObjToList(key string, listObj runtime.Object) error
GetToList(key string, listObj runtime.Object) error
// ExtractToList unmarshalls jsons found at directory defined by key and opaque them
// List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
//
// TODO(wojtekt): Rename to List().
ExtractToList(key string, listObj runtime.Object) error
List(key string, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is etcd index conflict.

View File

@ -31,12 +31,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/test/integration/framework"
)
func TestSetObj(t *testing.T) {
func TestSet(t *testing.T) {
client := framework.NewEtcdClient()
etcdStorage := tools.NewEtcdStorage(client, testapi.Codec(), "")
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
if err := etcdStorage.SetObj(key, &testObject, nil, 0); err != nil {
if err := etcdStorage.Set(key, &testObject, nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := client.Get(key, false, false)
@ -54,7 +54,7 @@ func TestSetObj(t *testing.T) {
})
}
func TestExtractObj(t *testing.T) {
func TestGet(t *testing.T) {
client := framework.NewEtcdClient()
etcdStorage := tools.NewEtcdStorage(client, testapi.Codec(), "")
framework.WithEtcdKey(func(key string) {
@ -68,7 +68,7 @@ func TestExtractObj(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
result := api.ServiceAccount{}
if err := etcdStorage.ExtractObj(key, &result, false); err != nil {
if err := etcdStorage.Get(key, &result, false); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Propagate ResourceVersion (it is set automatically).