Add context object to interfaces

This commit is contained in:
derekwaynecarr 2014-09-25 14:34:01 -04:00
parent 377a9ac3d7
commit 3e685674e7
15 changed files with 115 additions and 63 deletions

26
pkg/api/context.go Normal file
View File

@ -0,0 +1,26 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"code.google.com/p/go.net/context"
)
// NewContext instantiates a base context object for request flows
func NewContext() context.Context {
return context.Background()
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package apiserver package apiserver
import ( import (
"code.google.com/p/go.net/context"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -30,20 +31,20 @@ type RESTStorage interface {
New() runtime.Object New() runtime.Object
// List selects resources in the storage which match to the selector. // List selects resources in the storage which match to the selector.
List(label, field labels.Selector) (runtime.Object, error) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error)
// Get finds a resource in the storage by id and returns it. // Get finds a resource in the storage by id and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the // Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found. // returned error value err when the specified resource is not found.
Get(id string) (runtime.Object, error) Get(ctx context.Context, id string) (runtime.Object, error)
// Delete finds a resource in the storage and deletes it. // Delete finds a resource in the storage and deletes it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the // Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found. // returned error value err when the specified resource is not found.
Delete(id string) (<-chan runtime.Object, error) Delete(ctx context.Context, id string) (<-chan runtime.Object, error)
Create(runtime.Object) (<-chan runtime.Object, error) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error)
Update(runtime.Object) (<-chan runtime.Object, error) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error)
} }
// ResourceWatcher should be implemented by all RESTStorage objects that // ResourceWatcher should be implemented by all RESTStorage objects that
@ -53,11 +54,11 @@ type ResourceWatcher interface {
// are supported; an error should be returned if 'field' tries to select on a field that // are supported; an error should be returned if 'field' tries to select on a field that
// isn't supported. 'resourceVersion' allows for continuing/starting a watch at a // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
// particular version. // particular version.
Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
} }
// Redirector know how to return a remote resource's location. // Redirector know how to return a remote resource's location.
type Redirector interface { type Redirector interface {
// ResourceLocation should return the remote location of the given resource, or an error. // ResourceLocation should return the remote location of the given resource, or an error.
ResourceLocation(id string) (remoteLocation string, err error) ResourceLocation(ctx context.Context, id string) (remoteLocation string, err error)
} }

View File

@ -26,6 +26,7 @@ import (
"path" "path"
"strings" "strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -76,6 +77,7 @@ type ProxyHandler struct {
} }
func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := api.NewContext()
parts := strings.SplitN(req.URL.Path, "/", 3) parts := strings.SplitN(req.URL.Path, "/", 3)
if len(parts) < 2 { if len(parts) < 2 {
notFound(w, req) notFound(w, req)
@ -101,7 +103,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
location, err := redirector.ResourceLocation(id) location, err := redirector.ResourceLocation(ctx, id)
if err != nil { if err != nil {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)

View File

@ -19,6 +19,7 @@ package apiserver
import ( import (
"net/http" "net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
) )
@ -29,6 +30,7 @@ type RedirectHandler struct {
} }
func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := api.NewContext()
parts := splitPath(req.URL.Path) parts := splitPath(req.URL.Path)
if len(parts) != 2 || req.Method != "GET" { if len(parts) != 2 || req.Method != "GET" {
notFound(w, req) notFound(w, req)
@ -50,7 +52,7 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
location, err := redirector.ResourceLocation(id) location, err := redirector.ResourceLocation(ctx, id)
if err != nil { if err != nil {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)

View File

@ -64,6 +64,7 @@ func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// timeout=<duration> Timeout for synchronous requests, only applies if sync=true // timeout=<duration> Timeout for synchronous requests, only applies if sync=true
// labels=<label-selector> Used for filtering list operations // labels=<label-selector> Used for filtering list operations
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) { func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
ctx := api.NewContext()
sync := req.URL.Query().Get("sync") == "true" sync := req.URL.Query().Get("sync") == "true"
timeout := parseTimeout(req.URL.Query().Get("timeout")) timeout := parseTimeout(req.URL.Query().Get("timeout"))
switch req.Method { switch req.Method {
@ -80,14 +81,14 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return
} }
list, err := storage.List(label, field) list, err := storage.List(ctx, label, field)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return
} }
writeJSON(http.StatusOK, h.codec, list, w) writeJSON(http.StatusOK, h.codec, list, w)
case 2: case 2:
item, err := storage.Get(parts[1]) item, err := storage.Get(ctx, parts[1])
if err != nil { if err != nil {
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return
@ -113,7 +114,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return
} }
out, err := storage.Create(obj) out, err := storage.Create(ctx, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return
@ -126,7 +127,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
notFound(w, req) notFound(w, req)
return return
} }
out, err := storage.Delete(parts[1]) out, err := storage.Delete(ctx, parts[1])
if err != nil { if err != nil {
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return
@ -150,7 +151,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return
} }
out, err := storage.Update(obj) out, err := storage.Update(ctx, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return

View File

@ -24,6 +24,7 @@ import (
"strings" "strings"
"code.google.com/p/go.net/websocket" "code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@ -61,6 +62,7 @@ func isWebsocketRequest(req *http.Request) bool {
// ServeHTTP processes watch requests. // ServeHTTP processes watch requests.
func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := api.NewContext()
parts := splitPath(req.URL.Path) parts := splitPath(req.URL.Path)
if len(parts) < 1 || req.Method != "GET" { if len(parts) < 1 || req.Method != "GET" {
notFound(w, req) notFound(w, req)
@ -73,7 +75,7 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
if watcher, ok := storage.(ResourceWatcher); ok { if watcher, ok := storage.(ResourceWatcher); ok {
label, field, resourceVersion := getWatchParams(req.URL.Query()) label, field, resourceVersion := getWatchParams(req.URL.Query())
watching, err := watcher.Watch(label, field, resourceVersion) watching, err := watcher.Watch(ctx, label, field, resourceVersion)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) errorJSON(err, h.codec, w)
return return

View File

@ -19,6 +19,8 @@ package master
import ( import (
"sync" "sync"
"code.google.com/p/go.net/context"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -72,7 +74,8 @@ func (p *PodCache) updatePodInfo(host, id string) error {
// UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off. // UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off.
func (p *PodCache) UpdateAllContainers() { func (p *PodCache) UpdateAllContainers() {
pods, err := p.pods.ListPods(labels.Everything()) var ctx context.Context
pods, err := p.pods.ListPods(ctx, labels.Everything())
if err != nil { if err != nil {
glog.Errorf("Error synchronizing container list: %v", err) glog.Errorf("Error synchronizing container list: %v", err)
return return

View File

@ -19,6 +19,8 @@ package binding
import ( import (
"fmt" "fmt"
"code.google.com/p/go.net/context"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
@ -41,17 +43,17 @@ func NewREST(bindingRegistry Registry) *REST {
} }
// List returns an error because bindings are write-only objects. // List returns an error because bindings are write-only objects.
func (*REST) List(label, field labels.Selector) (runtime.Object, error) { func (*REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) {
return nil, errors.NewNotFound("binding", "list") return nil, errors.NewNotFound("binding", "list")
} }
// Get returns an error because bindings are write-only objects. // Get returns an error because bindings are write-only objects.
func (*REST) Get(id string) (runtime.Object, error) { func (*REST) Get(ctx context.Context, id string) (runtime.Object, error) {
return nil, errors.NewNotFound("binding", id) return nil, errors.NewNotFound("binding", id)
} }
// Delete returns an error because bindings are write-only objects. // Delete returns an error because bindings are write-only objects.
func (*REST) Delete(id string) (<-chan runtime.Object, error) { func (*REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) {
return nil, errors.NewNotFound("binding", id) return nil, errors.NewNotFound("binding", id)
} }
@ -61,7 +63,7 @@ func (*REST) New() runtime.Object {
} }
// Create attempts to make the assignment indicated by the binding it recieves. // Create attempts to make the assignment indicated by the binding it recieves.
func (b *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { func (b *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
binding, ok := obj.(*api.Binding) binding, ok := obj.(*api.Binding)
if !ok { if !ok {
return nil, fmt.Errorf("incorrect type: %#v", obj) return nil, fmt.Errorf("incorrect type: %#v", obj)
@ -75,6 +77,6 @@ func (b *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) {
} }
// Update returns an error-- this object may not be updated. // Update returns an error-- this object may not be updated.
func (b *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { func (b *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
return nil, fmt.Errorf("Bindings may not be changed.") return nil, fmt.Errorf("Bindings may not be changed.")
} }

View File

@ -30,11 +30,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"code.google.com/p/go-uuid/uuid" "code.google.com/p/go-uuid/uuid"
"code.google.com/p/go.net/context"
) )
// PodLister is anything that knows how to list pods. // PodLister is anything that knows how to list pods.
type PodLister interface { type PodLister interface {
ListPods(labels.Selector) (*api.PodList, error) ListPods(ctx context.Context, labels labels.Selector) (*api.PodList, error)
} }
// REST implements apiserver.RESTStorage for the replication controller service. // REST implements apiserver.RESTStorage for the replication controller service.
@ -54,7 +55,7 @@ func NewREST(registry Registry, podLister PodLister) *REST {
} }
// Create registers the given ReplicationController. // Create registers the given ReplicationController.
func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
controller, ok := obj.(*api.ReplicationController) controller, ok := obj.(*api.ReplicationController)
if !ok { if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj) return nil, fmt.Errorf("not a replication controller: %#v", obj)
@ -80,24 +81,24 @@ func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) {
} }
// Delete asynchronously deletes the ReplicationController specified by its id. // Delete asynchronously deletes the ReplicationController specified by its id.
func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) {
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id)
}), nil }), nil
} }
// Get obtains the ReplicationController specified by its id. // Get obtains the ReplicationController specified by its id.
func (rs *REST) Get(id string) (runtime.Object, error) { func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) {
controller, err := rs.registry.GetController(id) controller, err := rs.registry.GetController(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rs.fillCurrentState(controller) rs.fillCurrentState(ctx, controller)
return controller, err return controller, err
} }
// List obtains a list of ReplicationControllers that match selector. // List obtains a list of ReplicationControllers that match selector.
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) {
if !field.Empty() { if !field.Empty() {
return nil, fmt.Errorf("field selector not supported yet") return nil, fmt.Errorf("field selector not supported yet")
} }
@ -108,7 +109,7 @@ func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
filtered := []api.ReplicationController{} filtered := []api.ReplicationController{}
for _, controller := range controllers.Items { for _, controller := range controllers.Items {
if label.Matches(labels.Set(controller.Labels)) { if label.Matches(labels.Set(controller.Labels)) {
rs.fillCurrentState(&controller) rs.fillCurrentState(ctx, &controller)
filtered = append(filtered, controller) filtered = append(filtered, controller)
} }
} }
@ -123,7 +124,7 @@ func (*REST) New() runtime.Object {
// Update replaces a given ReplicationController instance with an existing // Update replaces a given ReplicationController instance with an existing
// instance in storage.registry. // instance in storage.registry.
func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
controller, ok := obj.(*api.ReplicationController) controller, ok := obj.(*api.ReplicationController)
if !ok { if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj) return nil, fmt.Errorf("not a replication controller: %#v", obj)
@ -142,7 +143,7 @@ func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) {
// Watch returns ReplicationController events via a watch.Interface. // Watch returns ReplicationController events via a watch.Interface.
// It implements apiserver.ResourceWatcher. // It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (rs *REST) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !field.Empty() { if !field.Empty() {
return nil, fmt.Errorf("no field selector implemented for controllers") return nil, fmt.Errorf("no field selector implemented for controllers")
} }
@ -160,15 +161,15 @@ func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (wat
} }
match := label.Matches(labels.Set(repController.Labels)) match := label.Matches(labels.Set(repController.Labels))
if match { if match {
rs.fillCurrentState(repController) rs.fillCurrentState(ctx, repController)
} }
return e, match return e, match
}), nil }), nil
} }
func (rs *REST) waitForController(ctrl *api.ReplicationController) (runtime.Object, error) { func (rs *REST) waitForController(ctx context.Context, ctrl *api.ReplicationController) (runtime.Object, error) {
for { for {
pods, err := rs.podLister.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) pods, err := rs.podLister.ListPods(ctx, labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector())
if err != nil { if err != nil {
return ctrl, err return ctrl, err
} }
@ -180,11 +181,11 @@ func (rs *REST) waitForController(ctrl *api.ReplicationController) (runtime.Obje
return ctrl, nil return ctrl, nil
} }
func (rs *REST) fillCurrentState(ctrl *api.ReplicationController) error { func (rs *REST) fillCurrentState(ctx context.Context, ctrl *api.ReplicationController) error {
if rs.podLister == nil { if rs.podLister == nil {
return nil return nil
} }
list, err := rs.podLister.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) list, err := rs.podLister.ListPods(ctx, labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector())
if err != nil { if err != nil {
return err return err
} }

View File

@ -19,6 +19,8 @@ package endpoint
import ( import (
"errors" "errors"
"code.google.com/p/go.net/context"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@ -38,12 +40,12 @@ func NewREST(registry Registry) *REST {
} }
// Get satisfies the RESTStorage interface. // Get satisfies the RESTStorage interface.
func (rs *REST) Get(id string) (runtime.Object, error) { func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) {
return rs.registry.GetEndpoints(id) return rs.registry.GetEndpoints(id)
} }
// List satisfies the RESTStorage interface. // List satisfies the RESTStorage interface.
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) {
if !label.Empty() || !field.Empty() { if !label.Empty() || !field.Empty() {
return nil, errors.New("label/field selectors are not supported on endpoints") return nil, errors.New("label/field selectors are not supported on endpoints")
} }
@ -52,22 +54,22 @@ func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
// Watch returns Endpoint events via a watch.Interface. // Watch returns Endpoint events via a watch.Interface.
// It implements apiserver.ResourceWatcher. // It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (rs *REST) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchEndpoints(label, field, resourceVersion) return rs.registry.WatchEndpoints(label, field, resourceVersion)
} }
// Create satisfies the RESTStorage interface but is unimplemented. // Create satisfies the RESTStorage interface but is unimplemented.
func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
return nil, errors.New("unimplemented") return nil, errors.New("unimplemented")
} }
// Update satisfies the RESTStorage interface but is unimplemented. // Update satisfies the RESTStorage interface but is unimplemented.
func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
return nil, errors.New("unimplemented") return nil, errors.New("unimplemented")
} }
// Delete satisfies the RESTStorage interface but is unimplemented. // Delete satisfies the RESTStorage interface but is unimplemented.
func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) {
return nil, errors.New("unimplemented") return nil, errors.New("unimplemented")
} }

View File

@ -29,6 +29,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
"code.google.com/p/go.net/context"
) )
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
@ -55,7 +57,7 @@ func makePodKey(podID string) string {
} }
// ListPods obtains a list of pods with labels that match selector. // ListPods obtains a list of pods with labels that match selector.
func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) { func (r *Registry) ListPods(ctx context.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(func(pod *api.Pod) bool { return r.ListPodsPredicate(func(pod *api.Pod) bool {
return selector.Matches(labels.Set(pod.Labels)) return selector.Matches(labels.Set(pod.Labels))
}) })

View File

@ -19,6 +19,8 @@ package minion
import ( import (
"fmt" "fmt"
"code.google.com/p/go.net/context"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -38,7 +40,7 @@ func NewREST(m Registry) *REST {
} }
} }
func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
minion, ok := obj.(*api.Minion) minion, ok := obj.(*api.Minion)
if !ok { if !ok {
return nil, fmt.Errorf("not a minion: %#v", obj) return nil, fmt.Errorf("not a minion: %#v", obj)
@ -65,7 +67,7 @@ func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) {
}), nil }), nil
} }
func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) {
exists, err := rs.registry.Contains(id) exists, err := rs.registry.Contains(id)
if !exists { if !exists {
return nil, ErrDoesNotExist return nil, ErrDoesNotExist
@ -78,7 +80,7 @@ func (rs *REST) Delete(id string) (<-chan runtime.Object, error) {
}), nil }), nil
} }
func (rs *REST) Get(id string) (runtime.Object, error) { func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) {
exists, err := rs.registry.Contains(id) exists, err := rs.registry.Contains(id)
if !exists { if !exists {
return nil, ErrDoesNotExist return nil, ErrDoesNotExist
@ -86,7 +88,7 @@ func (rs *REST) Get(id string) (runtime.Object, error) {
return rs.toApiMinion(id), err return rs.toApiMinion(id), err
} }
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) {
nameList, err := rs.registry.List() nameList, err := rs.registry.List()
if err != nil { if err != nil {
return nil, err return nil, err
@ -102,7 +104,7 @@ func (*REST) New() runtime.Object {
return &api.Minion{} return &api.Minion{}
} }
func (rs *REST) Update(minion runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Update(ctx context.Context, minion runtime.Object) (<-chan runtime.Object, error) {
return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.") return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.")
} }

View File

@ -17,6 +17,8 @@ limitations under the License.
package pod package pod
import ( import (
"code.google.com/p/go.net/context"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -25,7 +27,7 @@ import (
// Registry is an interface implemented by things that know how to store Pod objects. // Registry is an interface implemented by things that know how to store Pod objects.
type Registry interface { type Registry interface {
// ListPods obtains a list of pods having labels which match selector. // ListPods obtains a list of pods having labels which match selector.
ListPods(selector labels.Selector) (*api.PodList, error) ListPods(ctx context.Context, selector labels.Selector) (*api.PodList, error)
// ListPodsPredicate obtains a list of pods for which filter returns true. // ListPodsPredicate obtains a list of pods for which filter returns true.
ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error)
// Watch for new/changed/deleted pods // Watch for new/changed/deleted pods

View File

@ -32,6 +32,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"code.google.com/p/go.net/context"
"code.google.com/p/go-uuid/uuid" "code.google.com/p/go-uuid/uuid"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -67,7 +69,7 @@ func NewREST(config *RESTConfig) *REST {
} }
} }
func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
pod := obj.(*api.Pod) pod := obj.(*api.Pod)
pod.DesiredState.Manifest.UUID = uuid.NewUUID().String() pod.DesiredState.Manifest.UUID = uuid.NewUUID().String()
if len(pod.ID) == 0 { if len(pod.ID) == 0 {
@ -88,13 +90,13 @@ func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) {
}), nil }), nil
} }
func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) {
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id)
}), nil }), nil
} }
func (rs *REST) Get(id string) (runtime.Object, error) { func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) {
pod, err := rs.registry.GetPod(id) pod, err := rs.registry.GetPod(id)
if err != nil { if err != nil {
return pod, err return pod, err
@ -131,7 +133,7 @@ func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool {
} }
} }
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) {
pods, err := rs.registry.ListPodsPredicate(rs.filterFunc(label, field)) pods, err := rs.registry.ListPodsPredicate(rs.filterFunc(label, field))
if err == nil { if err == nil {
for i := range pods.Items { for i := range pods.Items {
@ -149,7 +151,7 @@ func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
} }
// Watch begins watching for new, changed, or deleted pods. // Watch begins watching for new, changed, or deleted pods.
func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (rs *REST) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchPods(resourceVersion, rs.filterFunc(label, field)) return rs.registry.WatchPods(resourceVersion, rs.filterFunc(label, field))
} }
@ -157,7 +159,7 @@ func (*REST) New() runtime.Object {
return &api.Pod{} return &api.Pod{}
} }
func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
pod := obj.(*api.Pod) pod := obj.(*api.Pod)
if errs := validation.ValidatePod(pod); len(errs) > 0 { if errs := validation.ValidatePod(pod); len(errs) > 0 {
return nil, errors.NewInvalid("pod", pod.ID, errs) return nil, errors.NewInvalid("pod", pod.ID, errs)
@ -277,9 +279,9 @@ func getPodStatus(pod *api.Pod, minions client.MinionInterface) (api.PodStatus,
} }
} }
func (rs *REST) waitForPodRunning(pod *api.Pod) (runtime.Object, error) { func (rs *REST) waitForPodRunning(ctx context.Context, pod *api.Pod) (runtime.Object, error) {
for { for {
podObj, err := rs.Get(pod.ID) podObj, err := rs.Get(ctx, pod.ID)
if err != nil || podObj == nil { if err != nil || podObj == nil {
return nil, err return nil, err
} }

View File

@ -32,6 +32,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"code.google.com/p/go.net/context"
) )
// REST adapts a service registry into apiserver's RESTStorage model. // REST adapts a service registry into apiserver's RESTStorage model.
@ -50,7 +52,7 @@ func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.R
} }
} }
func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Create(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
srv := obj.(*api.Service) srv := obj.(*api.Service)
if errs := validation.ValidateService(srv); len(errs) > 0 { if errs := validation.ValidateService(srv); len(errs) > 0 {
return nil, errors.NewInvalid("service", srv.ID, errs) return nil, errors.NewInvalid("service", srv.ID, errs)
@ -94,7 +96,7 @@ func (rs *REST) Create(obj runtime.Object) (<-chan runtime.Object, error) {
}), nil }), nil
} }
func (rs *REST) Delete(id string) (<-chan runtime.Object, error) { func (rs *REST) Delete(ctx context.Context, id string) (<-chan runtime.Object, error) {
service, err := rs.registry.GetService(id) service, err := rs.registry.GetService(id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -105,7 +107,7 @@ func (rs *REST) Delete(id string) (<-chan runtime.Object, error) {
}), nil }), nil
} }
func (rs *REST) Get(id string) (runtime.Object, error) { func (rs *REST) Get(ctx context.Context, id string) (runtime.Object, error) {
s, err := rs.registry.GetService(id) s, err := rs.registry.GetService(id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -114,7 +116,7 @@ func (rs *REST) Get(id string) (runtime.Object, error) {
} }
// TODO: implement field selector? // TODO: implement field selector?
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx context.Context, label, field labels.Selector) (runtime.Object, error) {
list, err := rs.registry.ListServices() list, err := rs.registry.ListServices()
if err != nil { if err != nil {
return nil, err return nil, err
@ -131,7 +133,7 @@ func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
// Watch returns Services events via a watch.Interface. // Watch returns Services events via a watch.Interface.
// It implements apiserver.ResourceWatcher. // It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (rs *REST) Watch(ctx context.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchServices(label, field, resourceVersion) return rs.registry.WatchServices(label, field, resourceVersion)
} }
@ -163,7 +165,7 @@ func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.En
return result, nil return result, nil
} }
func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) { func (rs *REST) Update(ctx context.Context, obj runtime.Object) (<-chan runtime.Object, error) {
srv := obj.(*api.Service) srv := obj.(*api.Service)
if errs := validation.ValidateService(srv); len(errs) > 0 { if errs := validation.ValidateService(srv); len(errs) > 0 {
return nil, errors.NewInvalid("service", srv.ID, errs) return nil, errors.NewInvalid("service", srv.ID, errs)
@ -179,7 +181,7 @@ func (rs *REST) Update(obj runtime.Object) (<-chan runtime.Object, error) {
} }
// ResourceLocation returns a URL to which one can send traffic for the specified service. // ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *REST) ResourceLocation(id string) (string, error) { func (rs *REST) ResourceLocation(ctx context.Context, id string) (string, error) {
e, err := rs.registry.GetEndpoints(id) e, err := rs.registry.GetEndpoints(id)
if err != nil { if err != nil {
return "", err return "", err