diff --git a/pkg/api/context.go b/pkg/api/context.go new file mode 100644 index 00000000000..f7101ae7fe7 --- /dev/null +++ b/pkg/api/context.go @@ -0,0 +1,26 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "code.google.com/p/go.net/context" +) + +// NewContext instantiates a base context object for request flows +func NewContext() context.Context { + return context.Background() +} diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 0c1fb2cce6e..7f7e6d4d45a 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "code.google.com/p/go.net/context" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -30,20 +31,20 @@ type RESTStorage interface { New() runtime.Object // List selects resources in the storage which match to the selector. - List(label, field labels.Selector) (runtime.Object, error) + List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) // Get finds a resource in the storage by id and returns it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the // returned error value err when the specified resource is not found. - Get(id string) (runtime.Object, error) + Get(ctx context.Context, id string) (runtime.Object, error) // Delete finds a resource in the storage and deletes it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the // returned error value err when the specified resource is not found. - Delete(id string) (<-chan runtime.Object, error) + Delete(ctx context.Context, id string) (<-chan runtime.Object, error) - Create(runtime.Object) (<-chan runtime.Object, error) - Update(runtime.Object) (<-chan runtime.Object, error) + Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) + Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) } // ResourceWatcher should be implemented by all RESTStorage objects that @@ -53,11 +54,11 @@ type ResourceWatcher interface { // are supported; an error should be returned if 'field' tries to select on a field that // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a // particular version. - Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } // Redirector know how to return a remote resource's location. type Redirector interface { // ResourceLocation should return the remote location of the given resource, or an error. - ResourceLocation(id string) (remoteLocation string, err error) + ResourceLocation(ctx context.Context, id string) (remoteLocation string, err error) } diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index caeeb0eaec5..51e3a518662 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -26,6 +26,7 @@ import ( "path" "strings" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -76,6 +77,7 @@ type ProxyHandler struct { } func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + ctx := api.NewContext() parts := strings.SplitN(req.URL.Path, "/", 3) if len(parts) < 2 { notFound(w, req) @@ -101,7 +103,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - location, err := redirector.ResourceLocation(id) + location, err := redirector.ResourceLocation(ctx, id) if err != nil { status := errToAPIStatus(err) writeJSON(status.Code, r.codec, status, w) diff --git a/pkg/apiserver/redirect.go b/pkg/apiserver/redirect.go index d5175865404..f8b499d4e73 100644 --- a/pkg/apiserver/redirect.go +++ b/pkg/apiserver/redirect.go @@ -19,6 +19,7 @@ package apiserver import ( "net/http" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) @@ -29,6 +30,7 @@ type RedirectHandler struct { } func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + ctx := api.NewContext() parts := splitPath(req.URL.Path) if len(parts) != 2 || req.Method != "GET" { notFound(w, req) @@ -50,7 +52,7 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - location, err := redirector.ResourceLocation(id) + location, err := redirector.ResourceLocation(ctx, id) if err != nil { status := errToAPIStatus(err) writeJSON(status.Code, r.codec, status, w) diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 03208acb143..7f4dc1d6742 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -64,6 +64,7 @@ func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // timeout= Timeout for synchronous requests, only applies if sync=true // labels= Used for filtering list operations func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) { + ctx := api.NewContext() sync := req.URL.Query().Get("sync") == "true" timeout := parseTimeout(req.URL.Query().Get("timeout")) switch req.Method { @@ -80,14 +81,14 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt errorJSON(err, h.codec, w) return } - list, err := storage.List(label, field) + list, err := storage.List(ctx, label, field) if err != nil { errorJSON(err, h.codec, w) return } writeJSON(http.StatusOK, h.codec, list, w) case 2: - item, err := storage.Get(parts[1]) + item, err := storage.Get(ctx, parts[1]) if err != nil { errorJSON(err, h.codec, w) return @@ -113,7 +114,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt errorJSON(err, h.codec, w) return } - out, err := storage.Create(obj) + out, err := storage.Create(ctx, obj) if err != nil { errorJSON(err, h.codec, w) return @@ -126,7 +127,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt notFound(w, req) return } - out, err := storage.Delete(parts[1]) + out, err := storage.Delete(ctx, parts[1]) if err != nil { errorJSON(err, h.codec, w) return @@ -150,7 +151,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt errorJSON(err, h.codec, w) return } - out, err := storage.Update(obj) + out, err := storage.Update(ctx, obj) if err != nil { errorJSON(err, h.codec, w) return diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index d110300b6d5..13023971588 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -24,6 +24,7 @@ import ( "strings" "code.google.com/p/go.net/websocket" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -61,6 +62,7 @@ func isWebsocketRequest(req *http.Request) bool { // ServeHTTP processes watch requests. func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + ctx := api.NewContext() parts := splitPath(req.URL.Path) if len(parts) < 1 || req.Method != "GET" { notFound(w, req) @@ -73,7 +75,7 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } if watcher, ok := storage.(ResourceWatcher); ok { label, field, resourceVersion := getWatchParams(req.URL.Query()) - watching, err := watcher.Watch(label, field, resourceVersion) + watching, err := watcher.Watch(ctx, label, field, resourceVersion) if err != nil { errorJSON(err, h.codec, w) return diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index 9e68947279a..c9058cd38fa 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -19,6 +19,8 @@ package master import ( "sync" + "code.google.com/p/go.net/context" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -72,7 +74,8 @@ func (p *PodCache) updatePodInfo(host, id string) error { // UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off. func (p *PodCache) UpdateAllContainers() { - pods, err := p.pods.ListPods(labels.Everything()) + var ctx context.Context + pods, err := p.pods.ListPods(ctx, labels.Everything()) if err != nil { glog.Errorf("Error synchronizing container list: %v", err) return diff --git a/pkg/registry/binding/rest.go b/pkg/registry/binding/rest.go index bfc7b6e150f..276e8cfaf21 100644 --- a/pkg/registry/binding/rest.go +++ b/pkg/registry/binding/rest.go @@ -19,6 +19,8 @@ package binding import ( "fmt" + "code.google.com/p/go.net/context" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" @@ -41,17 +43,17 @@ func NewREST(bindingRegistry Registry) *REST { } // List returns an error because bindings are write-only objects. -func (*REST) List(label, field labels.Selector) (runtime.Object, error) { +func (*REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) { return nil, errors.NewNotFound("binding", "list") } // Get returns an error because bindings are write-only objects. -func (*REST) Get(id string) (runtime.Object, error) { +func (*REST) Get(ctx context.Context, id string) (runtime.Object, error) { return nil, errors.NewNotFound("binding", id) } // Delete returns an error because bindings are write-only objects. -func (*REST) Delete(id string) (<-chan runtime.Object, error) { +func (*REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) { return nil, errors.NewNotFound("binding", id) } @@ -61,7 +63,7 @@ func (*REST) New() runtime.Object { } // Create attempts to make the assignment indicated by the binding it recieves. -func (b *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { +func (b *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { binding, ok := obj.(*api.Binding) if !ok { return nil, fmt.Errorf("incorrect type: %#v", obj) @@ -75,6 +77,6 @@ func (b *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { } // Update returns an error-- this object may not be updated. -func (b *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { +func (b *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { return nil, fmt.Errorf("Bindings may not be changed.") } diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index bc3fb05dc5c..42c5ce43fbe 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -30,11 +30,12 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "code.google.com/p/go-uuid/uuid" + "code.google.com/p/go.net/context" ) // PodLister is anything that knows how to list pods. type PodLister interface { - ListPods(labels.Selector) (*api.PodList, error) + ListPods(ctx context.Context, labels labels.Selector) (*api.PodList, error) } // REST implements apiserver.RESTStorage for the replication controller service. @@ -54,7 +55,7 @@ func NewREST(registry Registry, podLister PodLister) *REST { } // Create registers the given ReplicationController. -func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { controller, ok := obj.(*api.ReplicationController) if !ok { return nil, fmt.Errorf("not a replication controller: %#v", obj) @@ -80,24 +81,24 @@ func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { } // Delete asynchronously deletes the ReplicationController specified by its id. -func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { +func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) { return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id) }), nil } // Get obtains the ReplicationController specified by its id. -func (rs *REST) Get(id string) (runtime.Object, error) { +func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) { controller, err := rs.registry.GetController(id) if err != nil { return nil, err } - rs.fillCurrentState(controller) + rs.fillCurrentState(ctx, controller) return controller, err } // List obtains a list of ReplicationControllers that match selector. -func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { +func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) { if !field.Empty() { return nil, fmt.Errorf("field selector not supported yet") } @@ -108,7 +109,7 @@ func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { filtered := []api.ReplicationController{} for _, controller := range controllers.Items { if label.Matches(labels.Set(controller.Labels)) { - rs.fillCurrentState(&controller) + rs.fillCurrentState(ctx, &controller) filtered = append(filtered, controller) } } @@ -123,7 +124,7 @@ func (*REST) New() runtime.Object { // Update replaces a given ReplicationController instance with an existing // instance in storage.registry. -func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { controller, ok := obj.(*api.ReplicationController) if !ok { return nil, fmt.Errorf("not a replication controller: %#v", obj) @@ -142,7 +143,7 @@ func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { // Watch returns ReplicationController events via a watch.Interface. // It implements apiserver.ResourceWatcher. -func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { if !field.Empty() { return nil, fmt.Errorf("no field selector implemented for controllers") } @@ -160,15 +161,15 @@ func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (wat } match := label.Matches(labels.Set(repController.Labels)) if match { - rs.fillCurrentState(repController) + rs.fillCurrentState(ctx, repController) } return e, match }), nil } -func (rs *REST) waitForController(ctrl *api.ReplicationController) (runtime.Object, error) { +func (rs *REST) waitForController(ctx context.Context, ctrl *api.ReplicationController) (runtime.Object, error) { for { - pods, err := rs.podLister.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) + pods, err := rs.podLister.ListPods(ctx, labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) if err != nil { return ctrl, err } @@ -180,11 +181,11 @@ func (rs *REST) waitForController(ctrl *api.ReplicationController) (runtime.Obje return ctrl, nil } -func (rs *REST) fillCurrentState(ctrl *api.ReplicationController) error { +func (rs *REST) fillCurrentState(ctx context.Context, ctrl *api.ReplicationController) error { if rs.podLister == nil { return nil } - list, err := rs.podLister.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) + list, err := rs.podLister.ListPods(ctx, labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) if err != nil { return err } diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index 7e620dfcde0..e3a69077d62 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -19,6 +19,8 @@ package endpoint import ( "errors" + "code.google.com/p/go.net/context" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -38,12 +40,12 @@ func NewREST(registry Registry) *REST { } // Get satisfies the RESTStorage interface. -func (rs *REST) Get(id string) (runtime.Object, error) { +func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) { return rs.registry.GetEndpoints(id) } // List satisfies the RESTStorage interface. -func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { +func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) { if !label.Empty() || !field.Empty() { return nil, errors.New("label/field selectors are not supported on endpoints") } @@ -52,22 +54,22 @@ func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { // Watch returns Endpoint events via a watch.Interface. // It implements apiserver.ResourceWatcher. -func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return rs.registry.WatchEndpoints(label, field, resourceVersion) } // Create satisfies the RESTStorage interface but is unimplemented. -func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { return nil, errors.New("unimplemented") } // Update satisfies the RESTStorage interface but is unimplemented. -func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { return nil, errors.New("unimplemented") } // Delete satisfies the RESTStorage interface but is unimplemented. -func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { +func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) { return nil, errors.New("unimplemented") } diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 409121f01cc..b039864ef28 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -29,6 +29,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" + + "code.google.com/p/go.net/context" ) // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into @@ -55,7 +57,7 @@ func makePodKey(podID string) string { } // ListPods obtains a list of pods with labels that match selector. -func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) { +func (r *Registry) ListPods(ctx context.Context, selector labels.Selector) (*api.PodList, error) { return r.ListPodsPredicate(func(pod *api.Pod) bool { return selector.Matches(labels.Set(pod.Labels)) }) diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index 7b512f68d6e..dca9e631fa5 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -19,6 +19,8 @@ package minion import ( "fmt" + "code.google.com/p/go.net/context" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -38,7 +40,7 @@ func NewREST(m Registry) *REST { } } -func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { minion, ok := obj.(*api.Minion) if !ok { return nil, fmt.Errorf("not a minion: %#v", obj) @@ -65,7 +67,7 @@ func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { }), nil } -func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { +func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) { exists, err := rs.registry.Contains(id) if !exists { return nil, ErrDoesNotExist @@ -78,7 +80,7 @@ func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { }), nil } -func (rs *REST) Get(id string) (runtime.Object, error) { +func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) { exists, err := rs.registry.Contains(id) if !exists { return nil, ErrDoesNotExist @@ -86,7 +88,7 @@ func (rs *REST) Get(id string) (runtime.Object, error) { return rs.toApiMinion(id), err } -func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { +func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) { nameList, err := rs.registry.List() if err != nil { return nil, err @@ -102,7 +104,7 @@ func (*REST) New() runtime.Object { return &api.Minion{} } -func (rs *REST) Update(minion runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Update(ctx context.Context, minion runtime.Object) (<-chan runtime.Object, error) { return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.") } diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 876e9118208..ba2fec56f92 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -17,6 +17,8 @@ limitations under the License. package pod import ( + "code.google.com/p/go.net/context" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -25,7 +27,7 @@ import ( // Registry is an interface implemented by things that know how to store Pod objects. type Registry interface { // ListPods obtains a list of pods having labels which match selector. - ListPods(selector labels.Selector) (*api.PodList, error) + ListPods(ctx context.Context, selector labels.Selector) (*api.PodList, error) // ListPodsPredicate obtains a list of pods for which filter returns true. ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) // Watch for new/changed/deleted pods diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 5bd46414f13..81f9c9f0e21 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -32,6 +32,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "code.google.com/p/go.net/context" + "code.google.com/p/go-uuid/uuid" "github.com/golang/glog" ) @@ -67,7 +69,7 @@ func NewREST(config *RESTConfig) *REST { } } -func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { pod := obj.(*api.Pod) pod.DesiredState.Manifest.UUID = uuid.NewUUID().String() if len(pod.ID) == 0 { @@ -88,13 +90,13 @@ func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { }), nil } -func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { +func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) { return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id) }), nil } -func (rs *REST) Get(id string) (runtime.Object, error) { +func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) { pod, err := rs.registry.GetPod(id) if err != nil { return pod, err @@ -131,7 +133,7 @@ func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool { } } -func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { +func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) { pods, err := rs.registry.ListPodsPredicate(rs.filterFunc(label, field)) if err == nil { for i := range pods.Items { @@ -149,7 +151,7 @@ func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { } // Watch begins watching for new, changed, or deleted pods. -func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return rs.registry.WatchPods(resourceVersion, rs.filterFunc(label, field)) } @@ -157,7 +159,7 @@ func (*REST) New() runtime.Object { return &api.Pod{} } -func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { pod := obj.(*api.Pod) if errs := validation.ValidatePod(pod); len(errs) > 0 { return nil, errors.NewInvalid("pod", pod.ID, errs) @@ -277,9 +279,9 @@ func getPodStatus(pod *api.Pod, minions client.MinionInterface) (api.PodStatus, } } -func (rs *REST) waitForPodRunning(pod *api.Pod) (runtime.Object, error) { +func (rs *REST) waitForPodRunning(ctx context.Context, pod *api.Pod) (runtime.Object, error) { for { - podObj, err := rs.Get(pod.ID) + podObj, err := rs.Get(ctx, pod.ID) if err != nil || podObj == nil { return nil, err } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 7226b0a7250..f193256bd97 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -32,6 +32,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "code.google.com/p/go.net/context" ) // REST adapts a service registry into apiserver's RESTStorage model. @@ -50,7 +52,7 @@ func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.R } } -func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { srv := obj.(*api.Service) if errs := validation.ValidateService(srv); len(errs) > 0 { return nil, errors.NewInvalid("service", srv.ID, errs) @@ -94,7 +96,7 @@ func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { }), nil } -func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { +func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) { service, err := rs.registry.GetService(id) if err != nil { return nil, err @@ -105,7 +107,7 @@ func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { }), nil } -func (rs *REST) Get(id string) (runtime.Object, error) { +func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) { s, err := rs.registry.GetService(id) if err != nil { return nil, err @@ -114,7 +116,7 @@ func (rs *REST) Get(id string) (runtime.Object, error) { } // TODO: implement field selector? -func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { +func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) { list, err := rs.registry.ListServices() if err != nil { return nil, err @@ -131,7 +133,7 @@ func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { // Watch returns Services events via a watch.Interface. // It implements apiserver.ResourceWatcher. -func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return rs.registry.WatchServices(label, field, resourceVersion) } @@ -163,7 +165,7 @@ func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.En return result, nil } -func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { +func (rs *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) { srv := obj.(*api.Service) if errs := validation.ValidateService(srv); len(errs) > 0 { return nil, errors.NewInvalid("service", srv.ID, errs) @@ -179,7 +181,7 @@ func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { } // ResourceLocation returns a URL to which one can send traffic for the specified service. -func (rs *REST) ResourceLocation(id string) (string, error) { +func (rs *REST) ResourceLocation(ctx context.Context, id string) (string, error) { e, err := rs.registry.GetEndpoints(id) if err != nil { return "", err