diff --git a/pkg/api/context.go b/pkg/api/context.go index a065e059ead..7e86395712a 100644 --- a/pkg/api/context.go +++ b/pkg/api/context.go @@ -18,14 +18,32 @@ package api import ( stderrs "errors" + "time" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/auth/user" ) // Context carries values across API boundaries. +// This context matches the context.Context interface +// (https://blog.golang.org/context), for the purposes +// of passing the api.Context through to the storage tier. +// TODO: Determine the extent that this abstraction+interface +// is used by the api, and whether we can remove. type Context interface { + // Value returns the value associated with key or nil if none. Value(key interface{}) interface{} + + // Deadline returns the time when this Context will be canceled, if any. + Deadline() (deadline time.Time, ok bool) + + // Done returns a channel that is closed when this Context is canceled + // or times out. + Done() <-chan struct{} + + // Err indicates why this context was canceled, after the Done channel + // is closed. + Err() error } // The key type is unexported to prevent collisions diff --git a/pkg/master/master.go b/pkg/master/master.go index c435a4fa5e1..9dbc09ccb94 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -87,6 +87,7 @@ import ( "github.com/emicklei/go-restful/swagger" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/net/context" "k8s.io/kubernetes/pkg/registry/service/allocator" "k8s.io/kubernetes/pkg/registry/service/portallocator" ) @@ -149,13 +150,13 @@ func (s *StorageDestinations) backends() []string { backends := sets.String{} for _, group := range s.APIGroups { if group.Default != nil { - for _, backend := range group.Default.Backends() { + for _, backend := range group.Default.Backends(context.TODO()) { backends.Insert(backend) } } if group.Overrides != nil { for _, storage := range group.Overrides { - for _, backend := range storage.Backends() { + for _, backend := range storage.Backends(context.TODO()) { backends.Insert(backend) } } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index e3fed4d76c6..a620806e52a 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -156,7 +156,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object if name, ok := m.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { trace.Step("About to read single object") - err := e.Storage.GetToList(key, filterFunc, list) + err := e.Storage.GetToList(ctx, key, filterFunc, list) trace.Step("Object extracted") if err != nil { return nil, err @@ -167,7 +167,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object } trace.Step("About to list directory") - err := e.Storage.List(e.KeyRootFunc(ctx), filterFunc, list) + err := e.Storage.List(ctx, e.KeyRootFunc(ctx), filterFunc, list) trace.Step("List extracted") if err != nil { return nil, err @@ -196,7 +196,7 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro } trace.Step("About to create object") out := e.NewFunc() - if err := e.Storage.Create(key, obj, out, ttl); err != nil { + if err := e.Storage.Create(ctx, key, obj, out, ttl); err != nil { err = etcderr.InterpretCreateError(err, e.EndpointName, name) err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj) return nil, err @@ -240,7 +240,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool // TODO: expose TTL creating := false out := e.NewFunc() - err = e.Storage.GuaranteedUpdate(key, out, true, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { + err = e.Storage.GuaranteedUpdate(ctx, key, out, true, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { version, err := e.Storage.Versioner().ObjectResourceVersion(existing) if err != nil { return nil, nil, err @@ -330,7 +330,7 @@ func (e *Etcd) Get(ctx api.Context, name string) (runtime.Object, error) { return nil, err } trace.Step("About to read object") - if err := e.Storage.Get(key, obj, false); err != nil { + if err := e.Storage.Get(ctx, key, obj, false); err != nil { return nil, etcderr.InterpretGetError(err, e.EndpointName, name) } trace.Step("Object read") @@ -358,7 +358,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions) trace := util.NewTrace("Delete " + reflect.TypeOf(obj).String()) defer trace.LogIfLong(time.Second) trace.Step("About to read object") - if err := e.Storage.Get(key, obj, false); err != nil { + if err := e.Storage.Get(ctx, key, obj, false); err != nil { return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name) } @@ -378,7 +378,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions) out := e.NewFunc() lastGraceful := int64(0) err := e.Storage.GuaranteedUpdate( - key, out, false, + ctx, key, out, false, storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) { graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options) if err != nil { @@ -413,7 +413,7 @@ func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions) // delete immediately, or no graceful deletion supported out := e.NewFunc() trace.Step("About to delete object") - if err := e.Storage.Delete(key, out); err != nil { + if err := e.Storage.Delete(ctx, key, out); err != nil { return nil, etcderr.InterpretDeleteError(err, e.EndpointName, name) } return e.finalizeDelete(out, true) @@ -457,12 +457,12 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio if err != nil { return nil, err } - return e.Storage.Watch(key, version, filterFunc) + return e.Storage.Watch(ctx, key, version, filterFunc) } // if we cannot extract a key based on the current context, the optimization is skipped } - return e.Storage.WatchList(e.KeyRootFunc(ctx), version, filterFunc) + return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), version, filterFunc) } func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool { diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 7ac728a0999..df2496746e5 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -161,7 +161,7 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin if err != nil { return nil, err } - err = r.store.Storage.GuaranteedUpdate(podKey, &api.Pod{}, false, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) if !ok { return nil, fmt.Errorf("unexpected object: %#v", obj) diff --git a/pkg/registry/service/allocator/etcd/etcd.go b/pkg/registry/service/allocator/etcd/etcd.go index 2a5b3e8aa7c..35831a66db1 100644 --- a/pkg/registry/service/allocator/etcd/etcd.go +++ b/pkg/registry/service/allocator/etcd/etcd.go @@ -29,6 +29,8 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + + "golang.org/x/net/context" ) var ( @@ -141,7 +143,7 @@ func (e *Etcd) Release(item int) error { // tryUpdate performs a read-update to persist the latest snapshot state of allocation. func (e *Etcd) tryUpdate(fn func() error) error { - err := e.storage.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, + err := e.storage.GuaranteedUpdate(context.TODO(), e.baseKey, &api.RangeAllocation{}, true, storage.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { existing := input.(*api.RangeAllocation) if len(existing.ResourceVersion) == 0 { @@ -171,7 +173,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) { defer e.lock.Unlock() existing := &api.RangeAllocation{} - if err := e.storage.Get(e.baseKey, existing, false); err != nil { + if err := e.storage.Get(context.TODO(), e.baseKey, existing, false); err != nil { if etcdstorage.IsEtcdNotFound(err) { return nil, nil } @@ -185,7 +187,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) { // etcd. If the key does not exist, the object will have an empty ResourceVersion. func (e *Etcd) Get() (*api.RangeAllocation, error) { existing := &api.RangeAllocation{} - if err := e.storage.Get(e.baseKey, existing, true); err != nil { + if err := e.storage.Get(context.TODO(), e.baseKey, existing, true); err != nil { return nil, etcderr.InterpretGetError(err, e.kind, "") } return existing, nil @@ -198,7 +200,7 @@ func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error { defer e.lock.Unlock() last := "" - err := e.storage.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, + err := e.storage.GuaranteedUpdate(context.TODO(), e.baseKey, &api.RangeAllocation{}, true, storage.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { existing := input.(*api.RangeAllocation) switch { diff --git a/pkg/registry/service/allocator/etcd/etcd_test.go b/pkg/registry/service/allocator/etcd/etcd_test.go index 1c7d89dc926..ddcac35e8a2 100644 --- a/pkg/registry/service/allocator/etcd/etcd_test.go +++ b/pkg/registry/service/allocator/etcd/etcd_test.go @@ -27,6 +27,8 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools/etcdtest" + + "golang.org/x/net/context" ) func newStorage(t *testing.T) (*Etcd, *tools.FakeEtcdClient, allocator.Interface) { @@ -84,7 +86,7 @@ func TestStore(t *testing.T) { other := allocator.NewAllocationMap(100, "rangeSpecValue") allocation := &api.RangeAllocation{} - if err := storage.storage.Get(key(), allocation, false); err != nil { + if err := storage.storage.Get(context.TODO(), key(), allocation, false); err != nil { t.Fatal(err) } if allocation.ResourceVersion != "2" { diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 0c9ba22c5e3..7fd1c68c071 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" + "golang.org/x/net/context" ) // CacherConfig contains the configuration for a given Cache. @@ -152,8 +153,8 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { } // Implements storage.Interface. -func (c *Cacher) Backends() []string { - return c.storage.Backends() +func (c *Cacher) Backends(ctx context.Context) []string { + return c.storage.Backends(ctx) } // Implements storage.Interface. @@ -162,22 +163,22 @@ func (c *Cacher) Versioner() Versioner { } // Implements storage.Interface. -func (c *Cacher) Create(key string, obj, out runtime.Object, ttl uint64) error { - return c.storage.Create(key, obj, out, ttl) +func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return c.storage.Create(ctx, key, obj, out, ttl) } // Implements storage.Interface. -func (c *Cacher) Set(key string, obj, out runtime.Object, ttl uint64) error { - return c.storage.Set(key, obj, out, ttl) +func (c *Cacher) Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return c.storage.Set(ctx, key, obj, out, ttl) } // Implements storage.Interface. -func (c *Cacher) Delete(key string, out runtime.Object) error { - return c.storage.Delete(key, out) +func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object) error { + return c.storage.Delete(ctx, key, out) } // Implements storage.Interface. -func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { +func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { // Do NOT allow Watch to start when the underlying structures are not propagated. c.usable.RLock() defer c.usable.RUnlock() @@ -203,23 +204,23 @@ func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (w } // Implements storage.Interface. -func (c *Cacher) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - return c.Watch(key, resourceVersion, filter) +func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { + return c.Watch(ctx, key, resourceVersion, filter) } // Implements storage.Interface. -func (c *Cacher) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error { - return c.storage.Get(key, objPtr, ignoreNotFound) +func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error { + return c.storage.Get(ctx, key, objPtr, ignoreNotFound) } // Implements storage.Interface. -func (c *Cacher) GetToList(key string, filter FilterFunc, listObj runtime.Object) error { - return c.storage.GetToList(key, filter, listObj) +func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error { + return c.storage.GetToList(ctx, key, filter, listObj) } // Implements storage.Interface. -func (c *Cacher) List(key string, filter FilterFunc, listObj runtime.Object) error { - return c.storage.List(key, filter, listObj) +func (c *Cacher) List(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error { + return c.storage.List(ctx, key, filter, listObj) } // ListFromMemory implements list operation (the same signature as List method) @@ -263,8 +264,8 @@ func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error { } // Implements storage.Interface. -func (c *Cacher) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error { - return c.storage.GuaranteedUpdate(key, ptrToType, ignoreNotFound, tryUpdate) +func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error { + return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, tryUpdate) } // Implements storage.Interface. @@ -343,7 +344,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) List() (runtime.Object, error) { list := lw.newListFunc() - if err := lw.storage.List(lw.resourcePrefix, Everything, list); err != nil { + if err := lw.storage.List(context.TODO(), lw.resourcePrefix, Everything, list); err != nil { return nil, err } return list, nil @@ -355,7 +356,7 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e if err != nil { return nil, err } - return lw.storage.WatchList(lw.resourcePrefix, version, Everything) + return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, version, Everything) } // cacherWatch implements watch.Interface diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index e24c28c9ab7..6e8bdc82ad8 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -39,6 +39,8 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" + + "golang.org/x/net/context" ) func newTestCacher(client tools.EtcdClient) *storage.Cacher { @@ -250,7 +252,7 @@ func TestWatch(t *testing.T) { } // Set up Watch for object "podFoo". - watcher, err := cacher.Watch("pods/ns/foo", 2, storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 2, storage.Everything) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -273,13 +275,13 @@ func TestWatch(t *testing.T) { } // Check whether we get too-old error. - _, err = cacher.Watch("pods/ns/foo", 1, storage.Everything) + _, err = cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) if err == nil { t.Errorf("exepcted 'error too old' error") } // Now test watch with initial state. - initialWatcher, err := cacher.Watch("pods/ns/foo", 2, storage.Everything) + initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 2, storage.Everything) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -300,7 +302,7 @@ func TestWatch(t *testing.T) { } // Now test watch from "now". - nowWatcher, err := cacher.Watch("pods/ns/foo", 0, storage.Everything) + nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 0, storage.Everything) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -450,7 +452,7 @@ func TestFiltering(t *testing.T) { } return selector.Matches(labels.Set(metadata.Labels())) } - watcher, err := cacher.Watch("pods/ns/foo", 1, filter) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, filter) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -485,7 +487,7 @@ func TestStorageError(t *testing.T) { podFoo := makeTestPod("foo") // Set up Watch for object "podFoo". - watcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 254846ea310..ab771250e56 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" + "golang.org/x/net/context" ) func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string) storage.Interface { @@ -78,7 +79,10 @@ func (h *etcdHelper) Codec() runtime.Codec { } // Implements storage.Interface. -func (h *etcdHelper) Backends() []string { +func (h *etcdHelper) Backends(ctx context.Context) []string { + if ctx == nil { + glog.Errorf("Context is nil") + } return h.client.GetCluster() } @@ -88,7 +92,10 @@ func (h *etcdHelper) Versioner() storage.Versioner { } // Implements storage.Interface. -func (h *etcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) error { +func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + if ctx == nil { + glog.Errorf("Context is nil") + } key = h.prefixEtcdKey(key) data, err := h.codec.Encode(obj) if err != nil { @@ -116,7 +123,10 @@ func (h *etcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) err } // Implements storage.Interface. -func (h *etcdHelper) Set(key string, obj, out runtime.Object, ttl uint64) error { +func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + if ctx == nil { + glog.Errorf("Context is nil") + } var response *etcd.Response data, err := h.codec.Encode(obj) if err != nil { @@ -157,7 +167,10 @@ func (h *etcdHelper) Set(key string, obj, out runtime.Object, ttl uint64) error } // Implements storage.Interface. -func (h *etcdHelper) Delete(key string, out runtime.Object) error { +func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) error { + if ctx == nil { + glog.Errorf("Context is nil") + } key = h.prefixEtcdKey(key) if _, err := conversion.EnforcePtr(out); err != nil { panic("unable to convert output object to pointer") @@ -176,7 +189,10 @@ func (h *etcdHelper) Delete(key string, out runtime.Object) error { } // Implements storage.Interface. -func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { +func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { + if ctx == nil { + glog.Errorf("Context is nil") + } key = h.prefixEtcdKey(key) w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h) go w.etcdWatch(h.client, key, resourceVersion) @@ -184,7 +200,10 @@ func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter storage.Fi } // Implements storage.Interface. -func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { +func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { + if ctx == nil { + glog.Errorf("Context is nil") + } key = h.prefixEtcdKey(key) w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h) go w.etcdWatch(h.client, key, resourceVersion) @@ -192,15 +211,21 @@ func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter storag } // Implements storage.Interface. -func (h *etcdHelper) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error { +func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error { + if ctx == nil { + glog.Errorf("Context is nil") + } key = h.prefixEtcdKey(key) - _, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) + _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound) return err } // bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information // about the response, like the current etcd index and the ttl. -func (h *etcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { +func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { + if ctx == nil { + glog.Errorf("Context is nil") + } startTime := time.Now() response, err := h.client.Get(key, false, false) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) @@ -243,7 +268,10 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run } // Implements storage.Interface. -func (h *etcdHelper) GetToList(key string, filter storage.FilterFunc, listObj runtime.Object) error { +func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error { + if ctx == nil { + glog.Errorf("Context is nil") + } trace := util.NewTrace("GetToList " + getTypeName(listObj)) listPtr, err := runtime.GetItemsPtr(listObj) if err != nil { @@ -321,7 +349,10 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun } // Implements storage.Interface. -func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime.Object) error { +func (h *etcdHelper) List(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error { + if ctx == nil { + glog.Errorf("Context is nil") + } trace := util.NewTrace("List " + getTypeName(listObj)) defer trace.LogIfLong(time.Second) listPtr, err := runtime.GetItemsPtr(listObj) @@ -331,7 +362,7 @@ func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime key = h.prefixEtcdKey(key) startTime := time.Now() trace.Step("About to list etcd node") - nodes, index, err := h.listEtcdNode(key) + nodes, index, err := h.listEtcdNode(ctx, key) metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) trace.Step("Etcd node listed") if err != nil { @@ -349,7 +380,10 @@ func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime return nil } -func (h *etcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) { +func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) { + if ctx == nil { + glog.Errorf("Context is nil") + } result, err := h.client.Get(key, true, true) if err != nil { index, ok := etcdErrorIndex(err) @@ -367,7 +401,10 @@ func (h *etcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) { } // Implements storage.Interface. -func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.UpdateFunc) error { +func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.UpdateFunc) error { + if ctx == nil { + glog.Errorf("Context is nil") + } v, err := conversion.EnforcePtr(ptrToType) if err != nil { // Panic is appropriate, because this is a programming error. @@ -376,7 +413,7 @@ func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno key = h.prefixEtcdKey(key) for { obj := reflect.New(v.Type()).Interface().(runtime.Object) - origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound) + origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound) if err != nil { return err } diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 43bc56d3951..8e4c2a1ba40 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -32,6 +32,7 @@ import ( "github.com/coreos/go-etcd/etcd" "github.com/stretchr/testify/assert" + "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" @@ -144,7 +145,7 @@ func TestList(t *testing.T) { } var got api.PodList - err := helper.List("/some/key", storage.Everything, &got) + err := helper.List(context.TODO(), "/some/key", storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -201,7 +202,7 @@ func TestListFiltered(t *testing.T) { } var got api.PodList - err := helper.List("/some/key", filter, &got) + err := helper.List(context.TODO(), "/some/key", filter, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -276,7 +277,7 @@ func TestListAcrossDirectories(t *testing.T) { } var got api.PodList - err := helper.List("/some/key", storage.Everything, &got) + err := helper.List(context.TODO(), "/some/key", storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -338,7 +339,7 @@ func TestListExcludesDirectories(t *testing.T) { } var got api.PodList - err := helper.List("/some/key", storage.Everything, &got) + err := helper.List(context.TODO(), "/some/key", storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -357,7 +358,7 @@ func TestGet(t *testing.T) { } fakeClient.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &expect), 0) var got api.Pod - err := helper.Get("/some/key", &got, false) + err := helper.Get(context.TODO(), "/some/key", &got, false) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -394,11 +395,11 @@ func TestGetNotFoundErr(t *testing.T) { } try := func(key string) { var got api.Pod - err := helper.Get(key, &got, false) + err := helper.Get(context.TODO(), key, &got, false) if err == nil { t.Errorf("%s: wanted error but didn't get one", key) } - err = helper.Get(key, &got, true) + err = helper.Get(context.TODO(), key, &got, true) if err != nil { t.Errorf("%s: didn't want error but got %#v", key, err) } @@ -414,7 +415,7 @@ func TestCreate(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) returnedObj := &api.Pod{} - err := helper.Create("/some/key", obj, returnedObj, 5) + err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -439,7 +440,7 @@ func TestCreateNilOutParam(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) - err := helper.Create("/some/key", obj, nil, 5) + err := helper.Create(context.TODO(), "/some/key", obj, nil, 5) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -450,7 +451,7 @@ func TestSet(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) returnedObj := &api.Pod{} - err := helper.Set("/some/key", obj, returnedObj, 5) + err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 5) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -477,7 +478,7 @@ func TestSetFailCAS(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) fakeClient.CasErr = fakeClient.NewError(123) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) - err := helper.Set("/some/key", obj, nil, 5) + err := helper.Set(context.TODO(), "/some/key", obj, nil, 5) if err == nil { t.Errorf("Expecting error.") } @@ -499,7 +500,7 @@ func TestSetWithVersion(t *testing.T) { } returnedObj := &api.Pod{} - err := helper.Set("/some/key", obj, returnedObj, 7) + err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 7) if err != nil { t.Fatalf("Unexpected error %#v", err) } @@ -526,7 +527,7 @@ func TestSetWithoutResourceVersioner(t *testing.T) { helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) helper.versioner = nil returnedObj := &api.Pod{} - err := helper.Set("/some/key", obj, returnedObj, 3) + err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 3) key := etcdtest.AddPrefix("/some/key") if err != nil { t.Errorf("Unexpected error %#v", err) @@ -553,7 +554,7 @@ func TestSetNilOutParam(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) helper.versioner = nil - err := helper.Set("/some/key", obj, nil, 3) + err := helper.Set(context.TODO(), "/some/key", obj, nil, 3) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -568,7 +569,7 @@ func TestGuaranteedUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { return obj, nil })) if err != nil { @@ -587,7 +588,7 @@ func TestGuaranteedUpdate(t *testing.T) { // Update an existing node. callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { callbackCalled = true if in.(*TestResource).Value != 1 { @@ -623,7 +624,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { + err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { if res.TTL != 0 { t.Fatalf("unexpected response meta: %#v", res) } @@ -649,7 +650,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) { // Update an existing node. callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { + err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { if res.TTL != 10 { t.Fatalf("unexpected response meta: %#v", res) } @@ -681,7 +682,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) { // Update an existing node and change ttl callbackCalled = false objUpdate = &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 3} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { + err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { if res.TTL != 10 { t.Fatalf("unexpected response meta: %#v", res) } @@ -724,7 +725,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { return obj, nil })) if err != nil { @@ -734,7 +735,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { // Update an existing node with the same data callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { fakeClient.Err = errors.New("should not be called") callbackCalled = true return objUpdate, nil @@ -762,13 +763,13 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) { }) ignoreNotFound := false - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f) + err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, ignoreNotFound, f) if err == nil { t.Errorf("Expected error for key not found.") } ignoreNotFound = true - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f) + err = helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, ignoreNotFound, f) if err != nil { t.Errorf("Unexpected error %v.", err) } @@ -794,7 +795,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) { defer wgDone.Done() firstCall := true - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err := helper.GuaranteedUpdate(context.TODO(), "/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { defer func() { firstCall = false }() if firstCall { diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index b79f04544c3..1d197f6983b 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -31,6 +31,8 @@ import ( "k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" + + "golang.org/x/net/context" ) var versioner = APIObjectVersioner{} @@ -223,7 +225,7 @@ func TestWatchEtcdError(t *testing.T) { fakeClient.WatchImmediateError = fmt.Errorf("immediate error") h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch("/some/key", 4, storage.Everything) + watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -253,7 +255,7 @@ func TestWatch(t *testing.T) { fakeClient.ExpectNotFoundGet(prefixedKey) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -428,7 +430,7 @@ func TestWatchEtcdState(t *testing.T) { } h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(baseKey, testCase.From, storage.Everything) + watching, err := h.Watch(context.TODO(), baseKey, testCase.From, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -502,7 +504,7 @@ func TestWatchFromZeroIndex(t *testing.T) { fakeClient.Data[prefixedKey] = testCase.Response h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -563,7 +565,7 @@ func TestWatchListFromZeroIndex(t *testing.T) { } h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.WatchList(key, 0, storage.Everything) + watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -603,7 +605,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.WatchList(key, 1, storage.Everything) + watching, err := h.WatchList(context.TODO(), key, 1, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -656,7 +658,7 @@ func TestWatchFromNotFound(t *testing.T) { } h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -683,7 +685,7 @@ func TestWatchFromOtherError(t *testing.T) { } h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -719,7 +721,7 @@ func TestWatchPurposefulShutdown(t *testing.T) { fakeClient.ExpectNotFoundGet(prefixedKey) // Test purposeful shutdown - watching, err := h.Watch(key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 4b106e75d03..c55f2687fbe 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -19,6 +19,7 @@ package storage import ( "time" + "golang.org/x/net/context" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/watch" ) @@ -74,7 +75,7 @@ type Interface interface { // Returns list of servers addresses of the underyling database. // TODO: This method is used only in a single place. Consider refactoring and getting rid // of this method from the interface. - Backends() []string + Backends(ctx context.Context) []string // Returns Versioner associated with this interface. Versioner() Versioner @@ -82,40 +83,40 @@ type Interface interface { // Create adds a new object at a key unless it already exists. 'ttl' is time-to-live // in seconds (0 means forever). If no error is returned and out is not nil, out will be // set to the read value from database. - Create(key string, obj, out runtime.Object, ttl uint64) error + Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error // Set marshals obj via json and stores in database under key. Will do an atomic update // if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever). // If no error is returned and out is not nil, out will be set to the read value from database. - Set(key string, obj, out runtime.Object, ttl uint64) error + Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error // Delete removes the specified key and returns the value that existed at that spot. - Delete(key string, out runtime.Object) error + Delete(ctx context.Context, key string, out runtime.Object) error // Watch begins watching the specified key. Events are decoded into API objects, // and any items passing 'filter' are sent down to returned watch.Interface. // resourceVersion may be used to specify what version to begin watching // (e.g. reconnecting without missing any updates). - Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) + Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) // WatchList begins watching the specified key's items. Items are decoded into API // objects and any item passing 'filter' are sent down to returned watch.Interface. // resourceVersion may be used to specify what version to begin watching // (e.g. reconnecting without missing any updates). - WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) + WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) // Get unmarshals json found at key into objPtr. On a not found error, will either // return a zero object of the requested type, or an error, depending on ignoreNotFound. // Treats empty responses and nil response nodes exactly like a not found error. - Get(key string, objPtr runtime.Object, ignoreNotFound bool) error + Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error // GetToList unmarshals json found at key and opaque it into *List api object // (an object that satisfies the runtime.IsList definition). - GetToList(key string, filter FilterFunc, listObj runtime.Object) error + GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error // List unmarshalls jsons found at directory defined by key and opaque them // into *List api object (an object that satisfies runtime.IsList definition). - List(key string, filter FilterFunc, listObj runtime.Object) error + List(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // retrying the update until success if there is index conflict. @@ -141,7 +142,7 @@ type Interface interface { // return cur, nil, nil // } // }) - GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error + GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error // Codec provides access to the underlying codec being used by the implementation. Codec() runtime.Codec diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 9eb6b9c58b6..4ef52147bbb 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -30,14 +30,17 @@ import ( "k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/test/integration/framework" + + "golang.org/x/net/context" ) func TestSet(t *testing.T) { client := framework.NewEtcdClient() etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") + ctx := context.TODO() framework.WithEtcdKey(func(key string) { testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} - if err := etcdStorage.Set(key, &testObject, nil, 0); err != nil { + if err := etcdStorage.Set(ctx, key, &testObject, nil, 0); err != nil { t.Fatalf("unexpected error: %v", err) } resp, err := client.Get(key, false, false) @@ -58,6 +61,7 @@ func TestSet(t *testing.T) { func TestGet(t *testing.T) { client := framework.NewEtcdClient() etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") + ctx := context.TODO() framework.WithEtcdKey(func(key string) { testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} coded, err := testapi.Default.Codec().Encode(&testObject) @@ -69,7 +73,7 @@ func TestGet(t *testing.T) { t.Fatalf("unexpected error: %v", err) } result := api.ServiceAccount{} - if err := etcdStorage.Get(key, &result, false); err != nil { + if err := etcdStorage.Get(ctx, key, &result, false); err != nil { t.Fatalf("unexpected error: %v", err) } // Propagate ResourceVersion (it is set automatically). @@ -83,13 +87,14 @@ func TestGet(t *testing.T) { func TestWriteTTL(t *testing.T) { client := framework.NewEtcdClient() etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), "") + ctx := context.TODO() framework.WithEtcdKey(func(key string) { testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} - if err := etcdStorage.Set(key, &testObject, nil, 0); err != nil { + if err := etcdStorage.Set(ctx, key, &testObject, nil, 0); err != nil { t.Fatalf("unexpected error: %v", err) } result := &api.ServiceAccount{} - err := etcdStorage.GuaranteedUpdate(key, result, false, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { + err := etcdStorage.GuaranteedUpdate(ctx, key, result, false, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { if in, ok := obj.(*api.ServiceAccount); !ok || in.Name != "foo" { t.Fatalf("unexpected existing object: %v", obj) } @@ -111,7 +116,7 @@ func TestWriteTTL(t *testing.T) { } result = &api.ServiceAccount{} - err = etcdStorage.GuaranteedUpdate(key, result, false, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { + err = etcdStorage.GuaranteedUpdate(ctx, key, result, false, func(obj runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { if in, ok := obj.(*api.ServiceAccount); !ok || in.Name != "out" { t.Fatalf("unexpected existing object: %v", obj) } @@ -136,6 +141,7 @@ func TestWriteTTL(t *testing.T) { func TestWatch(t *testing.T) { client := framework.NewEtcdClient() etcdStorage := etcd.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()) + ctx := context.TODO() framework.WithEtcdKey(func(key string) { key = etcdtest.AddPrefix(key) resp, err := client.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) @@ -145,7 +151,7 @@ func TestWatch(t *testing.T) { expectedVersion := resp.Node.ModifiedIndex // watch should load the object at the current index - w, err := etcdStorage.Watch(key, 0, storage.Everything) + w, err := etcdStorage.Watch(ctx, key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) }