New etcd client modifications part 1 (context support)

This commit plumbs contexts which are needed for the new client.
This commit is contained in:
Timothy St. Clair 2015-10-09 09:49:01 -05:00
parent 41a7f579ea
commit 2a2a2d79ff
13 changed files with 182 additions and 109 deletions

View File

@ -18,14 +18,32 @@ package api
import ( import (
stderrs "errors" stderrs "errors"
"time"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/kubernetes/pkg/auth/user" "k8s.io/kubernetes/pkg/auth/user"
) )
// Context carries values across API boundaries. // Context carries values across API boundaries.
// This context matches the context.Context interface
// (https://blog.golang.org/context), for the purposes
// of passing the api.Context through to the storage tier.
// TODO: Determine the extent that this abstraction+interface
// is used by the api, and whether we can remove.
type Context interface { type Context interface {
// Value returns the value associated with key or nil if none.
Value(key interface{}) interface{} Value(key interface{}) interface{}
// Deadline returns the time when this Context will be canceled, if any.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that is closed when this Context is canceled
// or times out.
Done() <-chan struct{}
// Err indicates why this context was canceled, after the Done channel
// is closed.
Err() error
} }
// The key type is unexported to prevent collisions // The key type is unexported to prevent collisions

View File

@ -90,6 +90,7 @@ import (
"github.com/emicklei/go-restful/swagger" "github.com/emicklei/go-restful/swagger"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/registry/service/allocator" "k8s.io/kubernetes/pkg/registry/service/allocator"
"k8s.io/kubernetes/pkg/registry/service/portallocator" "k8s.io/kubernetes/pkg/registry/service/portallocator"
) )
@ -152,13 +153,13 @@ func (s *StorageDestinations) backends() []string {
backends := sets.String{} backends := sets.String{}
for _, group := range s.APIGroups { for _, group := range s.APIGroups {
if group.Default != nil { if group.Default != nil {
for _, backend := range group.Default.Backends() { for _, backend := range group.Default.Backends(context.TODO()) {
backends.Insert(backend) backends.Insert(backend)
} }
} }
if group.Overrides != nil { if group.Overrides != nil {
for _, storage := range group.Overrides { for _, storage := range group.Overrides {
for _, backend := range storage.Backends() { for _, backend := range storage.Backends(context.TODO()) {
backends.Insert(backend) backends.Insert(backend)
} }
} }

View File

@ -156,7 +156,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object
if name, ok := m.MatchesSingle(); ok { if name, ok := m.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil { if key, err := e.KeyFunc(ctx, name); err == nil {
trace.Step("About to read single object") trace.Step("About to read single object")
err := e.Storage.GetToList(key, filterFunc, list) err := e.Storage.GetToList(ctx, key, filterFunc, list)
trace.Step("Object extracted") trace.Step("Object extracted")
if err != nil { if err != nil {
return nil, err return nil, err
@ -167,7 +167,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object
} }
trace.Step("About to list directory") trace.Step("About to list directory")
err := e.Storage.List(e.KeyRootFunc(ctx), filterFunc, list) err := e.Storage.List(ctx, e.KeyRootFunc(ctx), filterFunc, list)
trace.Step("List extracted") trace.Step("List extracted")
if err != nil { if err != nil {
return nil, err return nil, err
@ -196,7 +196,7 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro
} }
trace.Step("About to create object") trace.Step("About to create object")
out := e.NewFunc() out := e.NewFunc()
if err := e.Storage.Create(key, obj, out, ttl); err != nil { if err := e.Storage.Create(ctx, key, obj, out, ttl); err != nil {
err = etcderr.InterpretCreateError(err, e.EndpointName, name) err = etcderr.InterpretCreateError(err, e.EndpointName, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj) err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
return nil, err return nil, err
@ -240,7 +240,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
// TODO: expose TTL // TODO: expose TTL
creating := false creating := false
out := e.NewFunc() out := e.NewFunc()
err = e.Storage.GuaranteedUpdate(key, out, true, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { err = e.Storage.GuaranteedUpdate(ctx, key, out, true, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
version, err := e.Storage.Versioner().ObjectResourceVersion(existing) version, err := e.Storage.Versioner().ObjectResourceVersion(existing)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -330,7 +330,7 @@ func (e *Etcd) Get(ctx api.Context, name string) (runtime.Object, error) {
return nil, err return nil, err
} }
trace.Step("About to read object") trace.Step("About to read object")
if err := e.Storage.Get(key, obj, false); err != nil { if err := e.Storage.Get(ctx, key, obj, false); err != nil {
return nil, etcderr.InterpretGetError(err, e.EndpointName, name) return nil, etcderr.InterpretGetError(err, e.EndpointName, name)
} }
trace.Step("Object read") trace.Step("Object read")
@ -358,7 +358,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions)
trace := util.NewTrace("Delete " + reflect.TypeOf(obj).String()) trace := util.NewTrace("Delete " + reflect.TypeOf(obj).String())
defer trace.LogIfLong(time.Second) defer trace.LogIfLong(time.Second)
trace.Step("About to read object") trace.Step("About to read object")
if err := e.Storage.Get(key, obj, false); err != nil { if err := e.Storage.Get(ctx, key, obj, false); err != nil {
return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name) return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name)
} }
@ -378,7 +378,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions)
out := e.NewFunc() out := e.NewFunc()
lastGraceful := int64(0) lastGraceful := int64(0)
err := e.Storage.GuaranteedUpdate( err := e.Storage.GuaranteedUpdate(
key, out, false, ctx, key, out, false,
storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) { storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options) graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options)
if err != nil { if err != nil {
@ -413,7 +413,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions)
// delete immediately, or no graceful deletion supported // delete immediately, or no graceful deletion supported
out := e.NewFunc() out := e.NewFunc()
trace.Step("About to delete object") trace.Step("About to delete object")
if err := e.Storage.Delete(key, out); err != nil { if err := e.Storage.Delete(ctx, key, out); err != nil {
return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name) return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name)
} }
return e.finalizeDelete(out, true) return e.finalizeDelete(out, true)
@ -457,12 +457,12 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio
if err != nil { if err != nil {
return nil, err return nil, err
} }
return e.Storage.Watch(key, version, filterFunc) return e.Storage.Watch(ctx, key, version, filterFunc)
} }
// if we cannot extract a key based on the current context, the optimization is skipped // if we cannot extract a key based on the current context, the optimization is skipped
} }
return e.Storage.WatchList(e.KeyRootFunc(ctx), version, filterFunc) return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), version, filterFunc)
} }
func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool { func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool {

View File

@ -160,7 +160,7 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = r.store.Storage.GuaranteedUpdate(podKey, &api.Pod{}, false, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod, ok := obj.(*api.Pod) pod, ok := obj.(*api.Pod)
if !ok { if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj) return nil, fmt.Errorf("unexpected object: %#v", obj)

View File

@ -29,6 +29,8 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"golang.org/x/net/context"
) )
var ( var (
@ -141,7 +143,7 @@ func (e *Etcd) Release(item int) error {
// tryUpdate performs a read-update to persist the latest snapshot state of allocation. // tryUpdate performs a read-update to persist the latest snapshot state of allocation.
func (e *Etcd) tryUpdate(fn func() error) error { func (e *Etcd) tryUpdate(fn func() error) error {
err := e.storage.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, err := e.storage.GuaranteedUpdate(context.TODO(), e.baseKey, &api.RangeAllocation{}, true,
storage.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { storage.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) {
existing := input.(*api.RangeAllocation) existing := input.(*api.RangeAllocation)
if len(existing.ResourceVersion) == 0 { if len(existing.ResourceVersion) == 0 {
@ -171,7 +173,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) {
defer e.lock.Unlock() defer e.lock.Unlock()
existing := &api.RangeAllocation{} existing := &api.RangeAllocation{}
if err := e.storage.Get(e.baseKey, existing, false); err != nil { if err := e.storage.Get(context.TODO(), e.baseKey, existing, false); err != nil {
if etcdstorage.IsEtcdNotFound(err) { if etcdstorage.IsEtcdNotFound(err) {
return nil, nil return nil, nil
} }
@ -185,7 +187,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) {
// etcd. If the key does not exist, the object will have an empty ResourceVersion. // etcd. If the key does not exist, the object will have an empty ResourceVersion.
func (e *Etcd) Get() (*api.RangeAllocation, error) { func (e *Etcd) Get() (*api.RangeAllocation, error) {
existing := &api.RangeAllocation{} existing := &api.RangeAllocation{}
if err := e.storage.Get(e.baseKey, existing, true); err != nil { if err := e.storage.Get(context.TODO(), e.baseKey, existing, true); err != nil {
return nil, etcderr.InterpretGetError(err, e.kind, "") return nil, etcderr.InterpretGetError(err, e.kind, "")
} }
return existing, nil return existing, nil
@ -198,7 +200,7 @@ func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error {
defer e.lock.Unlock() defer e.lock.Unlock()
last := "" last := ""
err := e.storage.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, err := e.storage.GuaranteedUpdate(context.TODO(), e.baseKey, &api.RangeAllocation{}, true,
storage.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { storage.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) {
existing := input.(*api.RangeAllocation) existing := input.(*api.RangeAllocation)
switch { switch {

View File

@ -27,6 +27,8 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/tools/etcdtest"
"golang.org/x/net/context"
) )
func newStorage(t *testing.T) (*Etcd, *tools.FakeEtcdClient, allocator.Interface) { func newStorage(t *testing.T) (*Etcd, *tools.FakeEtcdClient, allocator.Interface) {
@ -84,7 +86,7 @@ func TestStore(t *testing.T) {
other := allocator.NewAllocationMap(100, "rangeSpecValue") other := allocator.NewAllocationMap(100, "rangeSpecValue")
allocation := &api.RangeAllocation{} allocation := &api.RangeAllocation{}
if err := storage.storage.Get(key(), allocation, false); err != nil { if err := storage.storage.Get(context.TODO(), key(), allocation, false); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if allocation.ResourceVersion != "2" { if allocation.ResourceVersion != "2" {

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context"
) )
// CacherConfig contains the configuration for a given Cache. // CacherConfig contains the configuration for a given Cache.
@ -152,8 +153,8 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) Backends() []string { func (c *Cacher) Backends(ctx context.Context) []string {
return c.storage.Backends() return c.storage.Backends(ctx)
} }
// Implements storage.Interface. // Implements storage.Interface.
@ -162,22 +163,22 @@ func (c *Cacher) Versioner() Versioner {
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) Create(key string, obj, out runtime.Object, ttl uint64) error { func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(key, obj, out, ttl) return c.storage.Create(ctx, key, obj, out, ttl)
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) Set(key string, obj, out runtime.Object, ttl uint64) error { func (c *Cacher) Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Set(key, obj, out, ttl) return c.storage.Set(ctx, key, obj, out, ttl)
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) Delete(key string, out runtime.Object) error { func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object) error {
return c.storage.Delete(key, out) return c.storage.Delete(ctx, key, out)
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
// Do NOT allow Watch to start when the underlying structures are not propagated. // Do NOT allow Watch to start when the underlying structures are not propagated.
c.usable.RLock() c.usable.RLock()
defer c.usable.RUnlock() defer c.usable.RUnlock()
@ -203,23 +204,23 @@ func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (w
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
return c.Watch(key, resourceVersion, filter) return c.Watch(ctx, key, resourceVersion, filter)
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error { func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
return c.storage.Get(key, objPtr, ignoreNotFound) return c.storage.Get(ctx, key, objPtr, ignoreNotFound)
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) GetToList(key string, filter FilterFunc, listObj runtime.Object) error { func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error {
return c.storage.GetToList(key, filter, listObj) return c.storage.GetToList(ctx, key, filter, listObj)
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) List(key string, filter FilterFunc, listObj runtime.Object) error { func (c *Cacher) List(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error {
return c.storage.List(key, filter, listObj) return c.storage.List(ctx, key, filter, listObj)
} }
// ListFromMemory implements list operation (the same signature as List method) // ListFromMemory implements list operation (the same signature as List method)
@ -263,8 +264,8 @@ func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error {
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error { func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error {
return c.storage.GuaranteedUpdate(key, ptrToType, ignoreNotFound, tryUpdate) return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, tryUpdate)
} }
// Implements storage.Interface. // Implements storage.Interface.
@ -343,7 +344,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun
// Implements cache.ListerWatcher interface. // Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List() (runtime.Object, error) { func (lw *cacherListerWatcher) List() (runtime.Object, error) {
list := lw.newListFunc() list := lw.newListFunc()
if err := lw.storage.List(lw.resourcePrefix, Everything, list); err != nil { if err := lw.storage.List(context.TODO(), lw.resourcePrefix, Everything, list); err != nil {
return nil, err return nil, err
} }
return list, nil return list, nil
@ -355,7 +356,7 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
return lw.storage.WatchList(lw.resourcePrefix, version, Everything) return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, version, Everything)
} }
// cacherWatch implements watch.Interface // cacherWatch implements watch.Interface

View File

@ -39,6 +39,8 @@ import (
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"golang.org/x/net/context"
) )
func newTestCacher(client tools.EtcdClient) *storage.Cacher { func newTestCacher(client tools.EtcdClient) *storage.Cacher {
@ -250,7 +252,7 @@ func TestWatch(t *testing.T) {
} }
// Set up Watch for object "podFoo". // Set up Watch for object "podFoo".
watcher, err := cacher.Watch("pods/ns/foo", 2, storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 2, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -273,13 +275,13 @@ func TestWatch(t *testing.T) {
} }
// Check whether we get too-old error. // Check whether we get too-old error.
_, err = cacher.Watch("pods/ns/foo", 1, storage.Everything) _, err = cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
if err == nil { if err == nil {
t.Errorf("exepcted 'error too old' error") t.Errorf("exepcted 'error too old' error")
} }
// Now test watch with initial state. // Now test watch with initial state.
initialWatcher, err := cacher.Watch("pods/ns/foo", 2, storage.Everything) initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 2, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -300,7 +302,7 @@ func TestWatch(t *testing.T) {
} }
// Now test watch from "now". // Now test watch from "now".
nowWatcher, err := cacher.Watch("pods/ns/foo", 0, storage.Everything) nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -450,7 +452,7 @@ func TestFiltering(t *testing.T) {
} }
return selector.Matches(labels.Set(metadata.Labels())) return selector.Matches(labels.Set(metadata.Labels()))
} }
watcher, err := cacher.Watch("pods/ns/foo", 1, filter) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, filter)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -485,7 +487,7 @@ func TestStorageError(t *testing.T) {
podFoo := makeTestPod("foo") podFoo := makeTestPod("foo")
// Set up Watch for object "podFoo". // Set up Watch for object "podFoo".
watcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context"
) )
func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string) storage.Interface { func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string) storage.Interface {
@ -78,7 +79,10 @@ func (h *etcdHelper) Codec() runtime.Codec {
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) Backends() []string { func (h *etcdHelper) Backends(ctx context.Context) []string {
if ctx == nil {
glog.Errorf("Context is nil")
}
return h.client.GetCluster() return h.client.GetCluster()
} }
@ -88,7 +92,10 @@ func (h *etcdHelper) Versioner() storage.Versioner {
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) error { func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
data, err := h.codec.Encode(obj) data, err := h.codec.Encode(obj)
if err != nil { if err != nil {
@ -116,7 +123,10 @@ func (h *etcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) err
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) Set(key string, obj, out runtime.Object, ttl uint64) error { func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
var response *etcd.Response var response *etcd.Response
data, err := h.codec.Encode(obj) data, err := h.codec.Encode(obj)
if err != nil { if err != nil {
@ -157,7 +167,10 @@ func (h *etcdHelper) Set(key string, obj, out runtime.Object, ttl uint64) error
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) Delete(key string, out runtime.Object) error { func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil { if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer") panic("unable to convert output object to pointer")
@ -176,7 +189,10 @@ func (h *etcdHelper) Delete(key string, out runtime.Object) error {
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h) w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion) go w.etcdWatch(h.client, key, resourceVersion)
@ -184,7 +200,10 @@ func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter storage.Fi
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h) w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion) go w.etcdWatch(h.client, key, resourceVersion)
@ -192,15 +211,21 @@ func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter storag
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error { func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
_, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
return err return err
} }
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information // bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
// about the response, like the current etcd index and the ttl. // about the response, like the current etcd index and the ttl.
func (h *etcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
startTime := time.Now() startTime := time.Now()
response, err := h.client.Get(key, false, false) response, err := h.client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
@ -243,7 +268,10 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) GetToList(key string, filter storage.FilterFunc, listObj runtime.Object) error { func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
trace := util.NewTrace("GetToList " + getTypeName(listObj)) trace := util.NewTrace("GetToList " + getTypeName(listObj))
listPtr, err := runtime.GetItemsPtr(listObj) listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil { if err != nil {
@ -321,7 +349,10 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime.Object) error { func (h *etcdHelper) List(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
trace := util.NewTrace("List " + getTypeName(listObj)) trace := util.NewTrace("List " + getTypeName(listObj))
defer trace.LogIfLong(time.Second) defer trace.LogIfLong(time.Second)
listPtr, err := runtime.GetItemsPtr(listObj) listPtr, err := runtime.GetItemsPtr(listObj)
@ -331,7 +362,7 @@ func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
startTime := time.Now() startTime := time.Now()
trace.Step("About to list etcd node") trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(key) nodes, index, err := h.listEtcdNode(ctx, key)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
trace.Step("Etcd node listed") trace.Step("Etcd node listed")
if err != nil { if err != nil {
@ -349,7 +380,10 @@ func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime
return nil return nil
} }
func (h *etcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) { func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
result, err := h.client.Get(key, true, true) result, err := h.client.Get(key, true, true)
if err != nil { if err != nil {
index, ok := etcdErrorIndex(err) index, ok := etcdErrorIndex(err)
@ -367,7 +401,10 @@ func (h *etcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.UpdateFunc) error { func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.UpdateFunc) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
v, err := conversion.EnforcePtr(ptrToType) v, err := conversion.EnforcePtr(ptrToType)
if err != nil { if err != nil {
// Panic is appropriate, because this is a programming error. // Panic is appropriate, because this is a programming error.
@ -376,7 +413,7 @@ func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
for { for {
obj := reflect.New(v.Type()).Interface().(runtime.Object) obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound) origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
if err != nil { if err != nil {
return err return err
} }

View File

@ -32,6 +32,7 @@ import (
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing" apitesting "k8s.io/kubernetes/pkg/api/testing"
@ -144,7 +145,7 @@ func TestList(t *testing.T) {
} }
var got api.PodList var got api.PodList
err := helper.List("/some/key", storage.Everything, &got) err := helper.List(context.TODO(), "/some/key", storage.Everything, &got)
if err != nil { if err != nil {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
} }
@ -201,7 +202,7 @@ func TestListFiltered(t *testing.T) {
} }
var got api.PodList var got api.PodList
err := helper.List("/some/key", filter, &got) err := helper.List(context.TODO(), "/some/key", filter, &got)
if err != nil { if err != nil {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
} }
@ -276,7 +277,7 @@ func TestListAcrossDirectories(t *testing.T) {
} }
var got api.PodList var got api.PodList
err := helper.List("/some/key", storage.Everything, &got) err := helper.List(context.TODO(), "/some/key", storage.Everything, &got)
if err != nil { if err != nil {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
} }
@ -338,7 +339,7 @@ func TestListExcludesDirectories(t *testing.T) {
} }
var got api.PodList var got api.PodList
err := helper.List("/some/key", storage.Everything, &got) err := helper.List(context.TODO(), "/some/key", storage.Everything, &got)
if err != nil { if err != nil {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
} }
@ -357,7 +358,7 @@ func TestGet(t *testing.T) {
} }
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &expect), 0) fakeClient.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &expect), 0)
var got api.Pod var got api.Pod
err := helper.Get("/some/key", &got, false) err := helper.Get(context.TODO(), "/some/key", &got, false)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -394,11 +395,11 @@ func TestGetNotFoundErr(t *testing.T) {
} }
try := func(key string) { try := func(key string) {
var got api.Pod var got api.Pod
err := helper.Get(key, &got, false) err := helper.Get(context.TODO(), key, &got, false)
if err == nil { if err == nil {
t.Errorf("%s: wanted error but didn't get one", key) t.Errorf("%s: wanted error but didn't get one", key)
} }
err = helper.Get(key, &got, true) err = helper.Get(context.TODO(), key, &got, true)
if err != nil { if err != nil {
t.Errorf("%s: didn't want error but got %#v", key, err) t.Errorf("%s: didn't want error but got %#v", key, err)
} }
@ -414,7 +415,7 @@ func TestCreate(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{} returnedObj := &api.Pod{}
err := helper.Create("/some/key", obj, returnedObj, 5) err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -439,7 +440,7 @@ func TestCreateNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())
err := helper.Create("/some/key", obj, nil, 5) err := helper.Create(context.TODO(), "/some/key", obj, nil, 5)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -450,7 +451,7 @@ func TestSet(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{} returnedObj := &api.Pod{}
err := helper.Set("/some/key", obj, returnedObj, 5) err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 5)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -477,7 +478,7 @@ func TestSetFailCAS(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.CasErr = fakeClient.NewError(123) fakeClient.CasErr = fakeClient.NewError(123)
helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())
err := helper.Set("/some/key", obj, nil, 5) err := helper.Set(context.TODO(), "/some/key", obj, nil, 5)
if err == nil { if err == nil {
t.Errorf("Expecting error.") t.Errorf("Expecting error.")
} }
@ -499,7 +500,7 @@ func TestSetWithVersion(t *testing.T) {
} }
returnedObj := &api.Pod{} returnedObj := &api.Pod{}
err := helper.Set("/some/key", obj, returnedObj, 7) err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 7)
if err != nil { if err != nil {
t.Fatalf("Unexpected error %#v", err) t.Fatalf("Unexpected error %#v", err)
} }
@ -526,7 +527,7 @@ func TestSetWithoutResourceVersioner(t *testing.T) {
helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())
helper.versioner = nil helper.versioner = nil
returnedObj := &api.Pod{} returnedObj := &api.Pod{}
err := helper.Set("/some/key", obj, returnedObj, 3) err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 3)
key := etcdtest.AddPrefix("/some/key") key := etcdtest.AddPrefix("/some/key")
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
@ -553,7 +554,7 @@ func TestSetNilOutParam(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())
helper.versioner = nil helper.versioner = nil
err := helper.Set("/some/key", obj, nil, 3) err := helper.Set(context.TODO(), "/some/key", obj, nil, 3)
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
} }
@ -568,7 +569,7 @@ func TestGuaranteedUpdate(t *testing.T) {
// Create a new node. // Create a new node.
fakeClient.ExpectNotFoundGet(key) fakeClient.ExpectNotFoundGet(key)
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil return obj, nil
})) }))
if err != nil { if err != nil {
@ -587,7 +588,7 @@ func TestGuaranteedUpdate(t *testing.T) {
// Update an existing node. // Update an existing node.
callbackCalled := false callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true callbackCalled = true
if in.(*TestResource).Value != 1 { if in.(*TestResource).Value != 1 {
@ -623,7 +624,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) {
// Create a new node. // Create a new node.
fakeClient.ExpectNotFoundGet(key) fakeClient.ExpectNotFoundGet(key)
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 0 { if res.TTL != 0 {
t.Fatalf("unexpected response meta: %#v", res) t.Fatalf("unexpected response meta: %#v", res)
} }
@ -649,7 +650,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) {
// Update an existing node. // Update an existing node.
callbackCalled := false callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 10 { if res.TTL != 10 {
t.Fatalf("unexpected response meta: %#v", res) t.Fatalf("unexpected response meta: %#v", res)
} }
@ -681,7 +682,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) {
// Update an existing node and change ttl // Update an existing node and change ttl
callbackCalled = false callbackCalled = false
objUpdate = &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 3} objUpdate = &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 3}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 10 { if res.TTL != 10 {
t.Fatalf("unexpected response meta: %#v", res) t.Fatalf("unexpected response meta: %#v", res)
} }
@ -724,7 +725,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
// Create a new node. // Create a new node.
fakeClient.ExpectNotFoundGet(key) fakeClient.ExpectNotFoundGet(key)
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil return obj, nil
})) }))
if err != nil { if err != nil {
@ -734,7 +735,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
// Update an existing node with the same data // Update an existing node with the same data
callbackCalled := false callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
fakeClient.Err = errors.New("should not be called") fakeClient.Err = errors.New("should not be called")
callbackCalled = true callbackCalled = true
return objUpdate, nil return objUpdate, nil
@ -762,13 +763,13 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
}) })
ignoreNotFound := false ignoreNotFound := false
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f) err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, ignoreNotFound, f)
if err == nil { if err == nil {
t.Errorf("Expected error for key not found.") t.Errorf("Expected error for key not found.")
} }
ignoreNotFound = true ignoreNotFound = true
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f) err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, ignoreNotFound, f)
if err != nil { if err != nil {
t.Errorf("Unexpected error %v.", err) t.Errorf("Unexpected error %v.", err)
} }
@ -794,7 +795,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
defer wgDone.Done() defer wgDone.Done()
firstCall := true firstCall := true
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
defer func() { firstCall = false }() defer func() { firstCall = false }()
if firstCall { if firstCall {

View File

@ -31,6 +31,8 @@ import (
"k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"golang.org/x/net/context"
) )
var versioner = APIObjectVersioner{} var versioner = APIObjectVersioner{}
@ -223,7 +225,7 @@ func TestWatchEtcdError(t *testing.T) {
fakeClient.WatchImmediateError = fmt.Errorf("immediate error") fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch("/some/key", 4, storage.Everything) watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -253,7 +255,7 @@ func TestWatch(t *testing.T) {
fakeClient.ExpectNotFoundGet(prefixedKey) fakeClient.ExpectNotFoundGet(prefixedKey)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, storage.Everything) watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -428,7 +430,7 @@ func TestWatchEtcdState(t *testing.T) {
} }
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(baseKey, testCase.From, storage.Everything) watching, err := h.Watch(context.TODO(), baseKey, testCase.From, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -502,7 +504,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
fakeClient.Data[prefixedKey] = testCase.Response fakeClient.Data[prefixedKey] = testCase.Response
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, storage.Everything) watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -563,7 +565,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
} }
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(key, 0, storage.Everything) watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -603,7 +605,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(key, 1, storage.Everything) watching, err := h.WatchList(context.TODO(), key, 1, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -656,7 +658,7 @@ func TestWatchFromNotFound(t *testing.T) {
} }
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, storage.Everything) watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -683,7 +685,7 @@ func TestWatchFromOtherError(t *testing.T) {
} }
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, storage.Everything) watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -719,7 +721,7 @@ func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient.ExpectNotFoundGet(prefixedKey) fakeClient.ExpectNotFoundGet(prefixedKey)
// Test purposeful shutdown // Test purposeful shutdown
watching, err := h.Watch(key, 0, storage.Everything) watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }

View File

@ -19,6 +19,7 @@ package storage
import ( import (
"time" "time"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@ -74,7 +75,7 @@ type Interface interface {
// Returns list of servers addresses of the underyling database. // Returns list of servers addresses of the underyling database.
// TODO: This method is used only in a single place. Consider refactoring and getting rid // TODO: This method is used only in a single place. Consider refactoring and getting rid
// of this method from the interface. // of this method from the interface.
Backends() []string Backends(ctx context.Context) []string
// Returns Versioner associated with this interface. // Returns Versioner associated with this interface.
Versioner() Versioner Versioner() Versioner
@ -82,40 +83,40 @@ type Interface interface {
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live // Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be // in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database. // set to the read value from database.
Create(key string, obj, out runtime.Object, ttl uint64) error Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
// Set marshals obj via json and stores in database under key. Will do an atomic update // Set marshals obj via json and stores in database under key. Will do an atomic update
// if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever). // if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever).
// If no error is returned and out is not nil, out will be set to the read value from database. // If no error is returned and out is not nil, out will be set to the read value from database.
Set(key string, obj, out runtime.Object, ttl uint64) error Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
// Delete removes the specified key and returns the value that existed at that spot. // Delete removes the specified key and returns the value that existed at that spot.
Delete(key string, out runtime.Object) error Delete(ctx context.Context, key string, out runtime.Object) error
// Watch begins watching the specified key. Events are decoded into API objects, // Watch begins watching the specified key. Events are decoded into API objects,
// and any items passing 'filter' are sent down to returned watch.Interface. // and any items passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching // resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates). // (e.g. reconnecting without missing any updates).
Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API // WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item passing 'filter' are sent down to returned watch.Interface. // objects and any item passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching // resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates). // (e.g. reconnecting without missing any updates).
WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either // Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound. // return a zero object of the requested type, or an error, depending on ignoreNotFound.
// Treats empty responses and nil response nodes exactly like a not found error. // Treats empty responses and nil response nodes exactly like a not found error.
Get(key string, objPtr runtime.Object, ignoreNotFound bool) error Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error
// GetToList unmarshals json found at key and opaque it into *List api object // GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition). // (an object that satisfies the runtime.IsList definition).
GetToList(key string, filter FilterFunc, listObj runtime.Object) error GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them // List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition). // into *List api object (an object that satisfies runtime.IsList definition).
List(key string, filter FilterFunc, listObj runtime.Object) error List(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict. // retrying the update until success if there is index conflict.
@ -141,7 +142,7 @@ type Interface interface {
// return cur, nil, nil // return cur, nil, nil
// } // }
// }) // })
GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error
// Codec provides access to the underlying codec being used by the implementation. // Codec provides access to the underlying codec being used by the implementation.
Codec() runtime.Codec Codec() runtime.Codec

View File

@ -30,14 +30,17 @@ import (
"k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
"golang.org/x/net/context"
) )
func TestSet(t *testing.T) { func TestSet(t *testing.T) {
client := framework.NewEtcdClient() client := framework.NewEtcdClient()
etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "")
ctx := context.TODO()
framework.WithEtcdKey(func(key string) { framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
if err := etcdStorage.Set(key, &testObject, nil, 0); err != nil { if err := etcdStorage.Set(ctx, key, &testObject, nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
resp, err := client.Get(key, false, false) resp, err := client.Get(key, false, false)
@ -58,6 +61,7 @@ func TestSet(t *testing.T) {
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
client := framework.NewEtcdClient() client := framework.NewEtcdClient()
etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "")
ctx := context.TODO()
framework.WithEtcdKey(func(key string) { framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
coded, err := testapi.Default.Codec().Encode(&testObject) coded, err := testapi.Default.Codec().Encode(&testObject)
@ -69,7 +73,7 @@ func TestGet(t *testing.T) {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
result := api.ServiceAccount{} result := api.ServiceAccount{}
if err := etcdStorage.Get(key, &result, false); err != nil { if err := etcdStorage.Get(ctx, key, &result, false); err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
// Propagate ResourceVersion (it is set automatically). // Propagate ResourceVersion (it is set automatically).
@ -83,13 +87,14 @@ func TestGet(t *testing.T) {
func TestWriteTTL(t *testing.T) { func TestWriteTTL(t *testing.T) {
client := framework.NewEtcdClient() client := framework.NewEtcdClient()
etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "")
ctx := context.TODO()
framework.WithEtcdKey(func(key string) { framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
if err := etcdStorage.Set(key, &testObject, nil, 0); err != nil { if err := etcdStorage.Set(ctx, key, &testObject, nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
result := &api.ServiceAccount{} result := &api.ServiceAccount{}
err := etcdStorage.GuaranteedUpdate(key, result, false, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { err := etcdStorage.GuaranteedUpdate(ctx, key, result, false, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if in, ok := obj.(*api.ServiceAccount); !ok || in.Name != "foo" { if in, ok := obj.(*api.ServiceAccount); !ok || in.Name != "foo" {
t.Fatalf("unexpected existing object: %v", obj) t.Fatalf("unexpected existing object: %v", obj)
} }
@ -111,7 +116,7 @@ func TestWriteTTL(t *testing.T) {
} }
result = &api.ServiceAccount{} result = &api.ServiceAccount{}
err = etcdStorage.GuaranteedUpdate(key, result, false, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { err = etcdStorage.GuaranteedUpdate(ctx, key, result, false, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if in, ok := obj.(*api.ServiceAccount); !ok || in.Name != "out" { if in, ok := obj.(*api.ServiceAccount); !ok || in.Name != "out" {
t.Fatalf("unexpected existing object: %v", obj) t.Fatalf("unexpected existing object: %v", obj)
} }
@ -136,6 +141,7 @@ func TestWriteTTL(t *testing.T) {
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
client := framework.NewEtcdClient() client := framework.NewEtcdClient()
etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()) etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix())
ctx := context.TODO()
framework.WithEtcdKey(func(key string) { framework.WithEtcdKey(func(key string) {
key = etcdtest.AddPrefix(key) key = etcdtest.AddPrefix(key)
resp, err := client.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) resp, err := client.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
@ -145,7 +151,7 @@ func TestWatch(t *testing.T) {
expectedVersion := resp.Node.ModifiedIndex expectedVersion := resp.Node.ModifiedIndex
// watch should load the object at the current index // watch should load the object at the current index
w, err := etcdStorage.Watch(key, 0, storage.Everything) w, err := etcdStorage.Watch(ctx, key, 0, storage.Everything)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }