Merge pull request #1463 from derekwaynecarr/introduce_context_obj

Add ctx object to registry interface
This commit is contained in:
Tim Hockin 2014-09-26 13:04:18 -07:00
commit de3799d605
19 changed files with 198 additions and 160 deletions

View File

@ -25,6 +25,6 @@ type MockRegistry struct {
OnApplyBinding func(binding *api.Binding) error OnApplyBinding func(binding *api.Binding) error
} }
func (mr MockRegistry) ApplyBinding(binding *api.Binding) error { func (mr MockRegistry) ApplyBinding(ctx api.Context, binding *api.Binding) error {
return mr.OnApplyBinding(binding) return mr.OnApplyBinding(binding)
} }

View File

@ -24,5 +24,5 @@ import (
type Registry interface { type Registry interface {
// ApplyBinding should apply the binding. That is, it should actually // ApplyBinding should apply the binding. That is, it should actually
// assign or place pod binding.PodID on machine binding.Host. // assign or place pod binding.PodID on machine binding.Host.
ApplyBinding(binding *api.Binding) error ApplyBinding(ctx api.Context, binding *api.Binding) error
} }

View File

@ -67,7 +67,7 @@ func (b *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Objec
return nil, fmt.Errorf("incorrect type: %#v", obj) return nil, fmt.Errorf("incorrect type: %#v", obj)
} }
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
if err := b.registry.ApplyBinding(binding); err != nil { if err := b.registry.ApplyBinding(ctx, binding); err != nil {
return nil, err return nil, err
} }
return &api.Status{Status: api.StatusSuccess}, nil return &api.Status{Status: api.StatusSuccess}, nil

View File

@ -23,10 +23,10 @@ import (
// Registry is an interface for things that know how to store ReplicationControllers. // Registry is an interface for things that know how to store ReplicationControllers.
type Registry interface { type Registry interface {
ListControllers() (*api.ReplicationControllerList, error) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error)
WatchControllers(resourceVersion uint64) (watch.Interface, error) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error)
GetController(controllerID string) (*api.ReplicationController, error) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error)
CreateController(controller *api.ReplicationController) error CreateController(ctx api.Context, controller *api.ReplicationController) error
UpdateController(controller *api.ReplicationController) error UpdateController(ctx api.Context, controller *api.ReplicationController) error
DeleteController(controllerID string) error DeleteController(ctx api.Context, controllerID string) error
} }

View File

@ -71,24 +71,24 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
controller.CreationTimestamp = util.Now() controller.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.CreateController(controller) err := rs.registry.CreateController(ctx, controller)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return rs.registry.GetController(controller.ID) return rs.registry.GetController(ctx, controller.ID)
}), nil }), nil
} }
// Delete asynchronously deletes the ReplicationController specified by its id. // Delete asynchronously deletes the ReplicationController specified by its id.
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(ctx, id)
}), nil }), nil
} }
// Get obtains the ReplicationController specified by its id. // Get obtains the ReplicationController specified by its id.
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
controller, err := rs.registry.GetController(id) controller, err := rs.registry.GetController(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -101,7 +101,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
if !field.Empty() { if !field.Empty() {
return nil, fmt.Errorf("field selector not supported yet") return nil, fmt.Errorf("field selector not supported yet")
} }
controllers, err := rs.registry.ListControllers() controllers, err := rs.registry.ListControllers(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -132,11 +132,11 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
return nil, errors.NewInvalid("replicationController", controller.ID, errs) return nil, errors.NewInvalid("replicationController", controller.ID, errs)
} }
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.UpdateController(controller) err := rs.registry.UpdateController(ctx, controller)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return rs.registry.GetController(controller.ID) return rs.registry.GetController(ctx, controller.ID)
}), nil }), nil
} }
@ -146,7 +146,7 @@ func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVer
if !field.Empty() { if !field.Empty() {
return nil, fmt.Errorf("no field selector implemented for controllers") return nil, fmt.Errorf("no field selector implemented for controllers")
} }
incoming, err := rs.registry.WatchControllers(resourceVersion) incoming, err := rs.registry.WatchControllers(ctx, resourceVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -24,8 +24,8 @@ import (
// Registry is an interface for things that know how to store endpoints. // Registry is an interface for things that know how to store endpoints.
type Registry interface { type Registry interface {
ListEndpoints() (*api.EndpointsList, error) ListEndpoints(ctx api.Context) (*api.EndpointsList, error)
GetEndpoints(name string) (*api.Endpoints, error) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error)
WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchEndpoints(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
UpdateEndpoints(e *api.Endpoints) error UpdateEndpoints(ctx api.Context, e *api.Endpoints) error
} }

View File

@ -39,7 +39,7 @@ func NewREST(registry Registry) *REST {
// Get satisfies the RESTStorage interface. // Get satisfies the RESTStorage interface.
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
return rs.registry.GetEndpoints(id) return rs.registry.GetEndpoints(ctx, id)
} }
// List satisfies the RESTStorage interface. // List satisfies the RESTStorage interface.
@ -47,13 +47,13 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
if !label.Empty() || !field.Empty() { if !label.Empty() || !field.Empty() {
return nil, errors.New("label/field selectors are not supported on endpoints") return nil, errors.New("label/field selectors are not supported on endpoints")
} }
return rs.registry.ListEndpoints() return rs.registry.ListEndpoints(ctx)
} }
// Watch returns Endpoint events via a watch.Interface. // Watch returns Endpoint events via a watch.Interface.
// It implements apiserver.ResourceWatcher. // It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchEndpoints(label, field, resourceVersion) return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion)
} }
// Create satisfies the RESTStorage interface but is unimplemented. // Create satisfies the RESTStorage interface but is unimplemented.

View File

@ -56,13 +56,13 @@ func makePodKey(podID string) string {
// ListPods obtains a list of pods with labels that match selector. // ListPods obtains a list of pods with labels that match selector.
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(func(pod *api.Pod) bool { return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
return selector.Matches(labels.Set(pod.Labels)) return selector.Matches(labels.Set(pod.Labels))
}) })
} }
// ListPodsPredicate obtains a list of pods that match filter. // ListPodsPredicate obtains a list of pods that match filter.
func (r *Registry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) { func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) {
allPods := api.PodList{} allPods := api.PodList{}
err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion) err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion)
if err != nil { if err != nil {
@ -83,7 +83,7 @@ func (r *Registry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList,
} }
// WatchPods begins watching for new, changed, or deleted pods. // WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { func (r *Registry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool { return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool {
switch t := obj.(type) { switch t := obj.(type) {
case *api.Pod: case *api.Pod:
@ -96,7 +96,7 @@ func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool)
} }
// GetPod gets a specific pod specified by its ID. // GetPod gets a specific pod specified by its ID.
func (r *Registry) GetPod(podID string) (*api.Pod, error) { func (r *Registry) GetPod(ctx api.Context, podID string) (*api.Pod, error) {
var pod api.Pod var pod api.Pod
if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil { if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil {
return nil, etcderr.InterpretGetError(err, "pod", podID) return nil, etcderr.InterpretGetError(err, "pod", podID)
@ -113,7 +113,7 @@ func makeContainerKey(machine string) string {
} }
// CreatePod creates a pod based on a specification. // CreatePod creates a pod based on a specification.
func (r *Registry) CreatePod(pod *api.Pod) error { func (r *Registry) CreatePod(ctx api.Context, pod *api.Pod) error {
// Set current status to "Waiting". // Set current status to "Waiting".
pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Status = api.PodWaiting
pod.CurrentState.Host = "" pod.CurrentState.Host = ""
@ -125,7 +125,7 @@ func (r *Registry) CreatePod(pod *api.Pod) error {
} }
// ApplyBinding implements binding's registry // ApplyBinding implements binding's registry
func (r *Registry) ApplyBinding(binding *api.Binding) error { func (r *Registry) ApplyBinding(ctx api.Context, binding *api.Binding) error {
return etcderr.InterpretCreateError(r.assignPod(binding.PodID, binding.Host), "binding", "") return etcderr.InterpretCreateError(r.assignPod(binding.PodID, binding.Host), "binding", "")
} }
@ -178,12 +178,12 @@ func (r *Registry) assignPod(podID string, machine string) error {
return err return err
} }
func (r *Registry) UpdatePod(pod *api.Pod) error { func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
return fmt.Errorf("unimplemented!") return fmt.Errorf("unimplemented!")
} }
// DeletePod deletes an existing pod specified by its ID. // DeletePod deletes an existing pod specified by its ID.
func (r *Registry) DeletePod(podID string) error { func (r *Registry) DeletePod(ctx api.Context, podID string) error {
var pod api.Pod var pod api.Pod
podKey := makePodKey(podID) podKey := makePodKey(podID)
err := r.ExtractObj(podKey, &pod, false) err := r.ExtractObj(podKey, &pod, false)
@ -226,14 +226,14 @@ func (r *Registry) DeletePod(podID string) error {
} }
// ListControllers obtains a list of ReplicationControllers. // ListControllers obtains a list of ReplicationControllers.
func (r *Registry) ListControllers() (*api.ReplicationControllerList, error) { func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) {
controllers := &api.ReplicationControllerList{} controllers := &api.ReplicationControllerList{}
err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion) err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion)
return controllers, err return controllers, err
} }
// WatchControllers begins watching for new, changed, or deleted controllers. // WatchControllers begins watching for new, changed, or deleted controllers.
func (r *Registry) WatchControllers(resourceVersion uint64) (watch.Interface, error) { func (r *Registry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) {
return r.WatchList("/registry/controllers", resourceVersion, tools.Everything) return r.WatchList("/registry/controllers", resourceVersion, tools.Everything)
} }
@ -242,7 +242,7 @@ func makeControllerKey(id string) string {
} }
// GetController gets a specific ReplicationController specified by its ID. // GetController gets a specific ReplicationController specified by its ID.
func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) { func (r *Registry) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) {
var controller api.ReplicationController var controller api.ReplicationController
key := makeControllerKey(controllerID) key := makeControllerKey(controllerID)
err := r.ExtractObj(key, &controller, false) err := r.ExtractObj(key, &controller, false)
@ -253,19 +253,19 @@ func (r *Registry) GetController(controllerID string) (*api.ReplicationControlle
} }
// CreateController creates a new ReplicationController. // CreateController creates a new ReplicationController.
func (r *Registry) CreateController(controller *api.ReplicationController) error { func (r *Registry) CreateController(ctx api.Context, controller *api.ReplicationController) error {
err := r.CreateObj(makeControllerKey(controller.ID), controller, 0) err := r.CreateObj(makeControllerKey(controller.ID), controller, 0)
return etcderr.InterpretCreateError(err, "replicationController", controller.ID) return etcderr.InterpretCreateError(err, "replicationController", controller.ID)
} }
// UpdateController replaces an existing ReplicationController. // UpdateController replaces an existing ReplicationController.
func (r *Registry) UpdateController(controller *api.ReplicationController) error { func (r *Registry) UpdateController(ctx api.Context, controller *api.ReplicationController) error {
err := r.SetObj(makeControllerKey(controller.ID), controller) err := r.SetObj(makeControllerKey(controller.ID), controller)
return etcderr.InterpretUpdateError(err, "replicationController", controller.ID) return etcderr.InterpretUpdateError(err, "replicationController", controller.ID)
} }
// DeleteController deletes a ReplicationController specified by its ID. // DeleteController deletes a ReplicationController specified by its ID.
func (r *Registry) DeleteController(controllerID string) error { func (r *Registry) DeleteController(ctx api.Context, controllerID string) error {
key := makeControllerKey(controllerID) key := makeControllerKey(controllerID)
err := r.Delete(key, false) err := r.Delete(key, false)
return etcderr.InterpretDeleteError(err, "replicationController", controllerID) return etcderr.InterpretDeleteError(err, "replicationController", controllerID)
@ -276,20 +276,20 @@ func makeServiceKey(name string) string {
} }
// ListServices obtains a list of Services. // ListServices obtains a list of Services.
func (r *Registry) ListServices() (*api.ServiceList, error) { func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) {
list := &api.ServiceList{} list := &api.ServiceList{}
err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion) err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion)
return list, err return list, err
} }
// CreateService creates a new Service. // CreateService creates a new Service.
func (r *Registry) CreateService(svc *api.Service) error { func (r *Registry) CreateService(ctx api.Context, svc *api.Service) error {
err := r.CreateObj(makeServiceKey(svc.ID), svc, 0) err := r.CreateObj(makeServiceKey(svc.ID), svc, 0)
return etcderr.InterpretCreateError(err, "service", svc.ID) return etcderr.InterpretCreateError(err, "service", svc.ID)
} }
// GetService obtains a Service specified by its name. // GetService obtains a Service specified by its name.
func (r *Registry) GetService(name string) (*api.Service, error) { func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error) {
key := makeServiceKey(name) key := makeServiceKey(name)
var svc api.Service var svc api.Service
err := r.ExtractObj(key, &svc, false) err := r.ExtractObj(key, &svc, false)
@ -300,7 +300,7 @@ func (r *Registry) GetService(name string) (*api.Service, error) {
} }
// GetEndpoints obtains the endpoints for the service identified by 'name'. // GetEndpoints obtains the endpoints for the service identified by 'name'.
func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) { func (r *Registry) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) {
key := makeServiceEndpointsKey(name) key := makeServiceEndpointsKey(name)
var endpoints api.Endpoints var endpoints api.Endpoints
err := r.ExtractObj(key, &endpoints, false) err := r.ExtractObj(key, &endpoints, false)
@ -315,7 +315,7 @@ func makeServiceEndpointsKey(name string) string {
} }
// DeleteService deletes a Service specified by its name. // DeleteService deletes a Service specified by its name.
func (r *Registry) DeleteService(name string) error { func (r *Registry) DeleteService(ctx api.Context, name string) error {
key := makeServiceKey(name) key := makeServiceKey(name)
err := r.Delete(key, true) err := r.Delete(key, true)
if err != nil { if err != nil {
@ -332,13 +332,13 @@ func (r *Registry) DeleteService(name string) error {
} }
// UpdateService replaces an existing Service. // UpdateService replaces an existing Service.
func (r *Registry) UpdateService(svc *api.Service) error { func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {
err := r.SetObj(makeServiceKey(svc.ID), svc) err := r.SetObj(makeServiceKey(svc.ID), svc)
return etcderr.InterpretUpdateError(err, "service", svc.ID) return etcderr.InterpretUpdateError(err, "service", svc.ID)
} }
// WatchServices begins watching for new, changed, or deleted service configurations. // WatchServices begins watching for new, changed, or deleted service configurations.
func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !label.Empty() { if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on services") return nil, fmt.Errorf("label selectors are not supported on services")
} }
@ -352,14 +352,14 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u
} }
// ListEndpoints obtains a list of Services. // ListEndpoints obtains a list of Services.
func (r *Registry) ListEndpoints() (*api.EndpointsList, error) { func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
list := &api.EndpointsList{} list := &api.EndpointsList{}
err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion) err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion)
return list, err return list, err
} }
// UpdateEndpoints update Endpoints of a Service. // UpdateEndpoints update Endpoints of a Service.
func (r *Registry) UpdateEndpoints(e *api.Endpoints) error { func (r *Registry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error {
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
err := r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, err := r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
func(input runtime.Object) (runtime.Object, error) { func(input runtime.Object) (runtime.Object, error) {
@ -370,7 +370,7 @@ func (r *Registry) UpdateEndpoints(e *api.Endpoints) error {
} }
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. // WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !label.Empty() { if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on endpoints") return nil, fmt.Errorf("label selectors are not supported on endpoints")
} }

View File

@ -41,10 +41,11 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
} }
func TestEtcdGetPod(t *testing.T) { func TestEtcdGetPod(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/pods/foo", runtime.EncodeOrDie(latest.Codec, &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/pods/foo", runtime.EncodeOrDie(latest.Codec, &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
pod, err := registry.GetPod("foo") pod, err := registry.GetPod(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -55,6 +56,7 @@ func TestEtcdGetPod(t *testing.T) {
} }
func TestEtcdGetPodNotFound(t *testing.T) { func TestEtcdGetPodNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
@ -63,13 +65,14 @@ func TestEtcdGetPodNotFound(t *testing.T) {
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
_, err := registry.GetPod("foo") _, err := registry.GetPod(ctx, "foo")
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
t.Errorf("Unexpected error returned: %#v", err) t.Errorf("Unexpected error returned: %#v", err)
} }
} }
func TestEtcdCreatePod(t *testing.T) { func TestEtcdCreatePod(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
@ -80,7 +83,7 @@ func TestEtcdCreatePod(t *testing.T) {
} }
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{}), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(&api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -99,7 +102,7 @@ func TestEtcdCreatePod(t *testing.T) {
} }
// Suddenly, a wild scheduler appears: // Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -130,6 +133,7 @@ func TestEtcdCreatePod(t *testing.T) {
} }
func TestEtcdCreatePodAlreadyExisting(t *testing.T) { func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
@ -140,7 +144,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
E: nil, E: nil,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(&api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -151,6 +155,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
} }
func TestEtcdCreatePodWithContainersError(t *testing.T) { func TestEtcdCreatePodWithContainersError(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
@ -166,7 +171,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
E: tools.EtcdErrorNodeExist, // validate that ApplyBinding is translating Create errors E: tools.EtcdErrorNodeExist, // validate that ApplyBinding is translating Create errors
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(&api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -176,12 +181,12 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
} }
// Suddenly, a wild scheduler appears: // Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if !errors.IsAlreadyExists(err) { if !errors.IsAlreadyExists(err) {
t.Fatalf("Unexpected error returned: %#v", err) t.Fatalf("Unexpected error returned: %#v", err)
} }
existingPod, err := registry.GetPod("foo") existingPod, err := registry.GetPod(ctx, "foo")
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -191,6 +196,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
} }
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
@ -206,7 +212,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(&api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -226,7 +232,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
} }
// Suddenly, a wild scheduler appears: // Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -257,6 +263,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
} }
func TestEtcdCreatePodWithExistingContainers(t *testing.T) { func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
@ -271,7 +278,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
}, },
}), 0) }), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(&api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -291,7 +298,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
} }
// Suddenly, a wild scheduler appears: // Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -322,6 +329,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
} }
func TestEtcdDeletePod(t *testing.T) { func TestEtcdDeletePod(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
@ -336,7 +344,7 @@ func TestEtcdDeletePod(t *testing.T) {
}, },
}), 0) }), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeletePod("foo") err := registry.DeletePod(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -358,6 +366,7 @@ func TestEtcdDeletePod(t *testing.T) {
} }
func TestEtcdDeletePodMultipleContainers(t *testing.T) { func TestEtcdDeletePodMultipleContainers(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
@ -373,7 +382,7 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
}, },
}), 0) }), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeletePod("foo") err := registry.DeletePod(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -481,6 +490,7 @@ func TestEtcdListPods(t *testing.T) {
} }
func TestEtcdListControllersNotFound(t *testing.T) { func TestEtcdListControllersNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/controllers" key := "/registry/controllers"
fakeClient.Data[key] = tools.EtcdResponseWithError{ fakeClient.Data[key] = tools.EtcdResponseWithError{
@ -488,7 +498,7 @@ func TestEtcdListControllersNotFound(t *testing.T) {
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
controllers, err := registry.ListControllers() controllers, err := registry.ListControllers(ctx)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -499,6 +509,7 @@ func TestEtcdListControllersNotFound(t *testing.T) {
} }
func TestEtcdListServicesNotFound(t *testing.T) { func TestEtcdListServicesNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/services/specs" key := "/registry/services/specs"
fakeClient.Data[key] = tools.EtcdResponseWithError{ fakeClient.Data[key] = tools.EtcdResponseWithError{
@ -506,7 +517,7 @@ func TestEtcdListServicesNotFound(t *testing.T) {
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
services, err := registry.ListServices() services, err := registry.ListServices(ctx)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -517,6 +528,7 @@ func TestEtcdListServicesNotFound(t *testing.T) {
} }
func TestEtcdListControllers(t *testing.T) { func TestEtcdListControllers(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/controllers" key := "/registry/controllers"
fakeClient.Data[key] = tools.EtcdResponseWithError{ fakeClient.Data[key] = tools.EtcdResponseWithError{
@ -535,7 +547,7 @@ func TestEtcdListControllers(t *testing.T) {
E: nil, E: nil,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
controllers, err := registry.ListControllers() controllers, err := registry.ListControllers(ctx)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -546,10 +558,11 @@ func TestEtcdListControllers(t *testing.T) {
} }
func TestEtcdGetController(t *testing.T) { func TestEtcdGetController(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
ctrl, err := registry.GetController("foo") ctrl, err := registry.GetController(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -560,6 +573,7 @@ func TestEtcdGetController(t *testing.T) {
} }
func TestEtcdGetControllerNotFound(t *testing.T) { func TestEtcdGetControllerNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data["/registry/controllers/foo"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/controllers/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
@ -568,7 +582,7 @@ func TestEtcdGetControllerNotFound(t *testing.T) {
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
ctrl, err := registry.GetController("foo") ctrl, err := registry.GetController(ctx, "foo")
if ctrl != nil { if ctrl != nil {
t.Errorf("Unexpected non-nil controller: %#v", ctrl) t.Errorf("Unexpected non-nil controller: %#v", ctrl)
} }
@ -578,9 +592,10 @@ func TestEtcdGetControllerNotFound(t *testing.T) {
} }
func TestEtcdDeleteController(t *testing.T) { func TestEtcdDeleteController(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeleteController("foo") err := registry.DeleteController(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -595,9 +610,10 @@ func TestEtcdDeleteController(t *testing.T) {
} }
func TestEtcdCreateController(t *testing.T) { func TestEtcdCreateController(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreateController(&api.ReplicationController{ err := registry.CreateController(ctx, &api.ReplicationController{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -622,11 +638,12 @@ func TestEtcdCreateController(t *testing.T) {
} }
func TestEtcdCreateControllerAlreadyExisting(t *testing.T) { func TestEtcdCreateControllerAlreadyExisting(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreateController(&api.ReplicationController{ err := registry.CreateController(ctx, &api.ReplicationController{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -637,12 +654,13 @@ func TestEtcdCreateControllerAlreadyExisting(t *testing.T) {
} }
func TestEtcdUpdateController(t *testing.T) { func TestEtcdUpdateController(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
resp, _ := fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) resp, _ := fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.UpdateController(&api.ReplicationController{ err := registry.UpdateController(ctx, &api.ReplicationController{
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex}, JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
DesiredState: api.ReplicationControllerState{ DesiredState: api.ReplicationControllerState{
Replicas: 2, Replicas: 2,
@ -652,13 +670,14 @@ func TestEtcdUpdateController(t *testing.T) {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
ctrl, err := registry.GetController("foo") ctrl, err := registry.GetController(ctx, "foo")
if ctrl.DesiredState.Replicas != 2 { if ctrl.DesiredState.Replicas != 2 {
t.Errorf("Unexpected controller: %#v", ctrl) t.Errorf("Unexpected controller: %#v", ctrl)
} }
} }
func TestEtcdListServices(t *testing.T) { func TestEtcdListServices(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/services/specs" key := "/registry/services/specs"
fakeClient.Data[key] = tools.EtcdResponseWithError{ fakeClient.Data[key] = tools.EtcdResponseWithError{
@ -677,7 +696,7 @@ func TestEtcdListServices(t *testing.T) {
E: nil, E: nil,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
services, err := registry.ListServices() services, err := registry.ListServices(ctx)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -688,9 +707,10 @@ func TestEtcdListServices(t *testing.T) {
} }
func TestEtcdCreateService(t *testing.T) { func TestEtcdCreateService(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreateService(&api.Service{ err := registry.CreateService(ctx, &api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
}) })
if err != nil { if err != nil {
@ -714,10 +734,11 @@ func TestEtcdCreateService(t *testing.T) {
} }
func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreateService(&api.Service{ err := registry.CreateService(ctx, &api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
}) })
if !errors.IsAlreadyExists(err) { if !errors.IsAlreadyExists(err) {
@ -726,10 +747,11 @@ func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
} }
func TestEtcdGetService(t *testing.T) { func TestEtcdGetService(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
service, err := registry.GetService("foo") service, err := registry.GetService(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -740,6 +762,7 @@ func TestEtcdGetService(t *testing.T) {
} }
func TestEtcdGetServiceNotFound(t *testing.T) { func TestEtcdGetServiceNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{ fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
@ -748,16 +771,17 @@ func TestEtcdGetServiceNotFound(t *testing.T) {
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
_, err := registry.GetService("foo") _, err := registry.GetService(ctx, "foo")
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
t.Errorf("Unexpected error returned: %#v", err) t.Errorf("Unexpected error returned: %#v", err)
} }
} }
func TestEtcdDeleteService(t *testing.T) { func TestEtcdDeleteService(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeleteService("foo") err := registry.DeleteService(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -776,6 +800,7 @@ func TestEtcdDeleteService(t *testing.T) {
} }
func TestEtcdUpdateService(t *testing.T) { func TestEtcdUpdateService(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
@ -790,12 +815,12 @@ func TestEtcdUpdateService(t *testing.T) {
"baz": "bar", "baz": "bar",
}, },
} }
err := registry.UpdateService(&testService) err := registry.UpdateService(ctx, &testService)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
svc, err := registry.GetService("foo") svc, err := registry.GetService(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -809,6 +834,7 @@ func TestEtcdUpdateService(t *testing.T) {
} }
func TestEtcdListEndpoints(t *testing.T) { func TestEtcdListEndpoints(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/services/endpoints" key := "/registry/services/endpoints"
fakeClient.Data[key] = tools.EtcdResponseWithError{ fakeClient.Data[key] = tools.EtcdResponseWithError{
@ -827,7 +853,7 @@ func TestEtcdListEndpoints(t *testing.T) {
E: nil, E: nil,
} }
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
services, err := registry.ListEndpoints() services, err := registry.ListEndpoints(ctx)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -838,6 +864,7 @@ func TestEtcdListEndpoints(t *testing.T) {
} }
func TestEtcdGetEndpoints(t *testing.T) { func TestEtcdGetEndpoints(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
endpoints := &api.Endpoints{ endpoints := &api.Endpoints{
@ -847,7 +874,7 @@ func TestEtcdGetEndpoints(t *testing.T) {
fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, endpoints), 0) fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, endpoints), 0)
got, err := registry.GetEndpoints("foo") got, err := registry.GetEndpoints(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -858,6 +885,7 @@ func TestEtcdGetEndpoints(t *testing.T) {
} }
func TestEtcdUpdateEndpoints(t *testing.T) { func TestEtcdUpdateEndpoints(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
@ -868,7 +896,7 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0) fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0)
err := registry.UpdateEndpoints(&endpoints) err := registry.UpdateEndpoints(ctx, &endpoints)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -885,9 +913,10 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
} }
func TestEtcdWatchServices(t *testing.T) { func TestEtcdWatchServices(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchServices( watching, err := registry.WatchServices(ctx,
labels.Everything(), labels.Everything(),
labels.SelectorFromSet(labels.Set{"ID": "foo"}), labels.SelectorFromSet(labels.Set{"ID": "foo"}),
1, 1,
@ -912,9 +941,11 @@ func TestEtcdWatchServices(t *testing.T) {
} }
func TestEtcdWatchServicesBadSelector(t *testing.T) { func TestEtcdWatchServicesBadSelector(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
_, err := registry.WatchServices( _, err := registry.WatchServices(
ctx,
labels.Everything(), labels.Everything(),
labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}),
0, 0,
@ -924,6 +955,7 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) {
} }
_, err = registry.WatchServices( _, err = registry.WatchServices(
ctx,
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
labels.Everything(), labels.Everything(),
0, 0,
@ -934,9 +966,11 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) {
} }
func TestEtcdWatchEndpoints(t *testing.T) { func TestEtcdWatchEndpoints(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchEndpoints( watching, err := registry.WatchEndpoints(
ctx,
labels.Everything(), labels.Everything(),
labels.SelectorFromSet(labels.Set{"ID": "foo"}), labels.SelectorFromSet(labels.Set{"ID": "foo"}),
1, 1,
@ -961,9 +995,11 @@ func TestEtcdWatchEndpoints(t *testing.T) {
} }
func TestEtcdWatchEndpointsBadSelector(t *testing.T) { func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
_, err := registry.WatchEndpoints( _, err := registry.WatchEndpoints(
ctx,
labels.Everything(), labels.Everything(),
labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}),
0, 0,
@ -973,6 +1009,7 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
} }
_, err = registry.WatchEndpoints( _, err = registry.WatchEndpoints(
ctx,
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
labels.Everything(), labels.Everything(),
0, 0,

View File

@ -32,7 +32,7 @@ type BasicManifestFactory struct {
} }
func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) { func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) {
envVars, err := service.GetServiceEnvironmentVariables(b.ServiceRegistry, machine) envVars, err := service.GetServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine)
if err != nil { if err != nil {
return api.ContainerManifest{}, err return api.ContainerManifest{}, err
} }

View File

@ -27,15 +27,15 @@ type Registry interface {
// ListPods obtains a list of pods having labels which match selector. // ListPods obtains a list of pods having labels which match selector.
ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error)
// ListPodsPredicate obtains a list of pods for which filter returns true. // ListPodsPredicate obtains a list of pods for which filter returns true.
ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error)
// Watch for new/changed/deleted pods // Watch for new/changed/deleted pods
WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
// Get a specific pod // Get a specific pod
GetPod(podID string) (*api.Pod, error) GetPod(ctx api.Context, podID string) (*api.Pod, error)
// Create a pod based on a specification. // Create a pod based on a specification.
CreatePod(pod *api.Pod) error CreatePod(ctx api.Context, pod *api.Pod) error
// Update an existing pod // Update an existing pod
UpdatePod(pod *api.Pod) error UpdatePod(ctx api.Context, pod *api.Pod) error
// Delete an existing pod // Delete an existing pod
DeletePod(podID string) error DeletePod(ctx api.Context, podID string) error
} }

View File

@ -81,21 +81,21 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
pod.CreationTimestamp = util.Now() pod.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
if err := rs.registry.CreatePod(pod); err != nil { if err := rs.registry.CreatePod(ctx, pod); err != nil {
return nil, err return nil, err
} }
return rs.registry.GetPod(pod.ID) return rs.registry.GetPod(ctx, pod.ID)
}), nil }), nil
} }
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id)
}), nil }), nil
} }
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
pod, err := rs.registry.GetPod(id) pod, err := rs.registry.GetPod(ctx, id)
if err != nil { if err != nil {
return pod, err return pod, err
} }
@ -132,7 +132,7 @@ func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool {
} }
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
pods, err := rs.registry.ListPodsPredicate(rs.filterFunc(label, field)) pods, err := rs.registry.ListPodsPredicate(ctx, rs.filterFunc(label, field))
if err == nil { if err == nil {
for i := range pods.Items { for i := range pods.Items {
pod := &pods.Items[i] pod := &pods.Items[i]
@ -150,7 +150,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
// Watch begins watching for new, changed, or deleted pods. // Watch begins watching for new, changed, or deleted pods.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchPods(resourceVersion, rs.filterFunc(label, field)) return rs.registry.WatchPods(ctx, resourceVersion, rs.filterFunc(label, field))
} }
func (*REST) New() runtime.Object { func (*REST) New() runtime.Object {
@ -163,10 +163,10 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
return nil, errors.NewInvalid("pod", pod.ID, errs) return nil, errors.NewInvalid("pod", pod.ID, errs)
} }
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
if err := rs.registry.UpdatePod(pod); err != nil { if err := rs.registry.UpdatePod(ctx, pod); err != nil {
return nil, err return nil, err
} }
return rs.registry.GetPod(pod.ID) return rs.registry.GetPod(ctx, pod.ID)
}), nil }), nil
} }

View File

@ -27,26 +27,26 @@ type ControllerRegistry struct {
Controllers *api.ReplicationControllerList Controllers *api.ReplicationControllerList
} }
func (r *ControllerRegistry) ListControllers() (*api.ReplicationControllerList, error) { func (r *ControllerRegistry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) {
return r.Controllers, r.Err return r.Controllers, r.Err
} }
func (r *ControllerRegistry) GetController(ID string) (*api.ReplicationController, error) { func (r *ControllerRegistry) GetController(ctx api.Context, ID string) (*api.ReplicationController, error) {
return &api.ReplicationController{}, r.Err return &api.ReplicationController{}, r.Err
} }
func (r *ControllerRegistry) CreateController(controller *api.ReplicationController) error { func (r *ControllerRegistry) CreateController(ctx api.Context, controller *api.ReplicationController) error {
return r.Err return r.Err
} }
func (r *ControllerRegistry) UpdateController(controller *api.ReplicationController) error { func (r *ControllerRegistry) UpdateController(ctx api.Context, controller *api.ReplicationController) error {
return r.Err return r.Err
} }
func (r *ControllerRegistry) DeleteController(ID string) error { func (r *ControllerRegistry) DeleteController(ctx api.Context, ID string) error {
return r.Err return r.Err
} }
func (r *ControllerRegistry) WatchControllers(resourceVersion uint64) (watch.Interface, error) { func (r *ControllerRegistry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) {
return nil, r.Err return nil, r.Err
} }

View File

@ -40,7 +40,7 @@ func NewPodRegistry(pods *api.PodList) *PodRegistry {
} }
} }
func (r *PodRegistry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) { func (r *PodRegistry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if r.Err != nil { if r.Err != nil {
@ -58,23 +58,23 @@ func (r *PodRegistry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodLis
} }
func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(func(pod *api.Pod) bool { return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
return selector.Matches(labels.Set(pod.Labels)) return selector.Matches(labels.Set(pod.Labels))
}) })
} }
func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :( // TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.mux.Watch(), nil return r.mux.Watch(), nil
} }
func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
return r.Pod, r.Err return r.Pod, r.Err
} }
func (r *PodRegistry) CreatePod(pod *api.Pod) error { func (r *PodRegistry) CreatePod(ctx api.Context, pod *api.Pod) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.Pod = pod r.Pod = pod
@ -82,7 +82,7 @@ func (r *PodRegistry) CreatePod(pod *api.Pod) error {
return r.Err return r.Err
} }
func (r *PodRegistry) UpdatePod(pod *api.Pod) error { func (r *PodRegistry) UpdatePod(ctx api.Context, pod *api.Pod) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.Pod = pod r.Pod = pod
@ -90,7 +90,7 @@ func (r *PodRegistry) UpdatePod(pod *api.Pod) error {
return r.Err return r.Err
} }
func (r *PodRegistry) DeletePod(podId string) error { func (r *PodRegistry) DeletePod(ctx api.Context, podId string) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.mux.Action(watch.Deleted, r.Pod) r.mux.Action(watch.Deleted, r.Pod)

View File

@ -38,49 +38,49 @@ type ServiceRegistry struct {
UpdatedID string UpdatedID string
} }
func (r *ServiceRegistry) ListServices() (*api.ServiceList, error) { func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error) {
return &r.List, r.Err return &r.List, r.Err
} }
func (r *ServiceRegistry) CreateService(svc *api.Service) error { func (r *ServiceRegistry) CreateService(ctx api.Context, svc *api.Service) error {
r.Service = svc r.Service = svc
r.List.Items = append(r.List.Items, *svc) r.List.Items = append(r.List.Items, *svc)
return r.Err return r.Err
} }
func (r *ServiceRegistry) GetService(id string) (*api.Service, error) { func (r *ServiceRegistry) GetService(ctx api.Context, id string) (*api.Service, error) {
r.GottenID = id r.GottenID = id
return r.Service, r.Err return r.Service, r.Err
} }
func (r *ServiceRegistry) DeleteService(id string) error { func (r *ServiceRegistry) DeleteService(ctx api.Context, id string) error {
r.DeletedID = id r.DeletedID = id
return r.Err return r.Err
} }
func (r *ServiceRegistry) UpdateService(svc *api.Service) error { func (r *ServiceRegistry) UpdateService(ctx api.Context, svc *api.Service) error {
r.UpdatedID = svc.ID r.UpdatedID = svc.ID
return r.Err return r.Err
} }
func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, r.Err return nil, r.Err
} }
func (r *ServiceRegistry) ListEndpoints() (*api.EndpointsList, error) { func (r *ServiceRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
return &r.EndpointsList, r.Err return &r.EndpointsList, r.Err
} }
func (r *ServiceRegistry) GetEndpoints(id string) (*api.Endpoints, error) { func (r *ServiceRegistry) GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) {
r.GottenID = id r.GottenID = id
return &r.Endpoints, r.Err return &r.Endpoints, r.Err
} }
func (r *ServiceRegistry) UpdateEndpoints(e *api.Endpoints) error { func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error {
r.Endpoints = *e r.Endpoints = *e
return r.Err return r.Err
} }
func (r *ServiceRegistry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, r.Err return nil, r.Err
} }

View File

@ -25,12 +25,12 @@ import (
// Registry is an interface for things that know how to store services. // Registry is an interface for things that know how to store services.
type Registry interface { type Registry interface {
ListServices() (*api.ServiceList, error) ListServices(ctx api.Context) (*api.ServiceList, error)
CreateService(svc *api.Service) error CreateService(ctx api.Context, svc *api.Service) error
GetService(name string) (*api.Service, error) GetService(ctx api.Context, name string) (*api.Service, error)
DeleteService(name string) error DeleteService(ctx api.Context, name string) error
UpdateService(svc *api.Service) error UpdateService(ctx api.Context, svc *api.Service) error
WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchServices(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
// TODO: endpoints and their implementation should be separated, setting endpoints should be // TODO: endpoints and their implementation should be separated, setting endpoints should be
// supported via the API, and the endpoints-controller should use the API to update endpoints. // supported via the API, and the endpoints-controller should use the API to update endpoints.

View File

@ -86,27 +86,27 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
return nil, err return nil, err
} }
} }
err := rs.registry.CreateService(srv) err := rs.registry.CreateService(ctx, srv)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return rs.registry.GetService(srv.ID) return rs.registry.GetService(ctx, srv.ID)
}), nil }), nil
} }
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
service, err := rs.registry.GetService(id) service, err := rs.registry.GetService(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
rs.deleteExternalLoadBalancer(service) rs.deleteExternalLoadBalancer(service)
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id)
}), nil }), nil
} }
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
s, err := rs.registry.GetService(id) s, err := rs.registry.GetService(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -115,7 +115,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
// TODO: implement field selector? // TODO: implement field selector?
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
list, err := rs.registry.ListServices() list, err := rs.registry.ListServices(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -132,7 +132,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
// Watch returns Services events via a watch.Interface. // Watch returns Services events via a watch.Interface.
// It implements apiserver.ResourceWatcher. // It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchServices(label, field, resourceVersion) return rs.registry.WatchServices(ctx, label, field, resourceVersion)
} }
func (*REST) New() runtime.Object { func (*REST) New() runtime.Object {
@ -141,9 +141,9 @@ func (*REST) New() runtime.Object {
// GetServiceEnvironmentVariables populates a list of environment variables that are use // GetServiceEnvironmentVariables populates a list of environment variables that are use
// in the container environment to get access to services. // in the container environment to get access to services.
func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.EnvVar, error) { func GetServiceEnvironmentVariables(ctx api.Context, registry Registry, machine string) ([]api.EnvVar, error) {
var result []api.EnvVar var result []api.EnvVar
services, err := registry.ListServices() services, err := registry.ListServices(ctx)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -170,17 +170,17 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
} }
return apiserver.MakeAsync(func() (runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) {
// TODO: check to see if external load balancer status changed // TODO: check to see if external load balancer status changed
err := rs.registry.UpdateService(srv) err := rs.registry.UpdateService(ctx, srv)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return rs.registry.GetService(srv.ID) return rs.registry.GetService(ctx, srv.ID)
}), nil }), nil
} }
// ResourceLocation returns a URL to which one can send traffic for the specified service. // ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
e, err := rs.registry.GetEndpoints(id) e, err := rs.registry.GetEndpoints(ctx, id)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -53,7 +53,7 @@ func TestServiceRegistryCreate(t *testing.T) {
if len(fakeCloud.Calls) != 0 { if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
} }
srv, err := registry.GetService(svc.ID) srv, err := registry.GetService(ctx, svc.ID)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -90,14 +90,14 @@ func TestServiceStorageValidatesCreate(t *testing.T) {
} }
func TestServiceRegistryUpdate(t *testing.T) { func TestServiceRegistryUpdate(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
registry.CreateService(&api.Service{ registry.CreateService(ctx, &api.Service{
Port: 6502, Port: 6502,
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz1"}, Selector: map[string]string{"bar": "baz1"},
}) })
storage := NewREST(registry, nil, nil) storage := NewREST(registry, nil, nil)
ctx := api.NewContext()
c, err := storage.Update(ctx, &api.Service{ c, err := storage.Update(ctx, &api.Service{
Port: 6502, Port: 6502,
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
@ -120,8 +120,9 @@ func TestServiceRegistryUpdate(t *testing.T) {
} }
func TestServiceStorageValidatesUpdate(t *testing.T) { func TestServiceStorageValidatesUpdate(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
registry.CreateService(&api.Service{ registry.CreateService(ctx, &api.Service{
Port: 6502, Port: 6502,
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
@ -139,7 +140,6 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
Selector: map[string]string{}, Selector: map[string]string{},
}, },
} }
ctx := api.NewContext()
for _, failureCase := range failureCases { for _, failureCase := range failureCases {
c, err := storage.Update(ctx, &failureCase) c, err := storage.Update(ctx, &failureCase)
if c != nil { if c != nil {
@ -152,6 +152,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
} }
func TestServiceRegistryExternalService(t *testing.T) { func TestServiceRegistryExternalService(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{} fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
@ -162,13 +163,12 @@ func TestServiceRegistryExternalService(t *testing.T) {
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
CreateExternalLoadBalancer: true, CreateExternalLoadBalancer: true,
} }
ctx := api.NewContext()
c, _ := storage.Create(ctx, svc) c, _ := storage.Create(ctx, svc)
<-c <-c
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
} }
srv, err := registry.GetService(svc.ID) srv, err := registry.GetService(ctx, svc.ID)
if err != nil { if err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
@ -202,6 +202,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
} }
func TestServiceRegistryDelete(t *testing.T) { func TestServiceRegistryDelete(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{} fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
@ -210,8 +211,7 @@ func TestServiceRegistryDelete(t *testing.T) {
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
} }
ctx := api.NewContext() registry.CreateService(ctx, svc)
registry.CreateService(svc)
c, _ := storage.Delete(ctx, svc.ID) c, _ := storage.Delete(ctx, svc.ID)
<-c <-c
if len(fakeCloud.Calls) != 0 { if len(fakeCloud.Calls) != 0 {
@ -223,6 +223,7 @@ func TestServiceRegistryDelete(t *testing.T) {
} }
func TestServiceRegistryDeleteExternal(t *testing.T) { func TestServiceRegistryDeleteExternal(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{} fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
@ -232,8 +233,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
CreateExternalLoadBalancer: true, CreateExternalLoadBalancer: true,
} }
ctx := api.NewContext() registry.CreateService(ctx, svc)
registry.CreateService(svc)
c, _ := storage.Delete(ctx, svc.ID) c, _ := storage.Delete(ctx, svc.ID)
<-c <-c
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" { if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" {
@ -245,6 +245,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
} }
func TestServiceRegistryMakeLinkVariables(t *testing.T) { func TestServiceRegistryMakeLinkVariables(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
registry.List = api.ServiceList{ registry.List = api.ServiceList{
Items: []api.Service{ Items: []api.Service{
@ -269,7 +270,7 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) {
}, },
} }
machine := "machine" machine := "machine"
vars, err := GetServiceEnvironmentVariables(registry, machine) vars, err := GetServiceEnvironmentVariables(ctx, registry, machine)
if err != nil { if err != nil {
t.Errorf("Unexpected err: %v", err) t.Errorf("Unexpected err: %v", err)
} }
@ -309,15 +310,15 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) {
} }
func TestServiceRegistryGet(t *testing.T) { func TestServiceRegistryGet(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{} fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
registry.CreateService(&api.Service{ registry.CreateService(ctx, &api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
}) })
ctx := api.NewContext()
storage.Get(ctx, "foo") storage.Get(ctx, "foo")
if len(fakeCloud.Calls) != 0 { if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -328,13 +329,13 @@ func TestServiceRegistryGet(t *testing.T) {
} }
func TestServiceRegistryResourceLocation(t *testing.T) { func TestServiceRegistryResourceLocation(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}} registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}}
fakeCloud := &cloud.FakeCloud{} fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
ctx := api.NewContext() registry.CreateService(ctx, &api.Service{
registry.CreateService(&api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
}) })
@ -358,20 +359,20 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
} }
func TestServiceRegistryList(t *testing.T) { func TestServiceRegistryList(t *testing.T) {
ctx := api.NewContext()
registry := registrytest.NewServiceRegistry() registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{} fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
registry.CreateService(&api.Service{ registry.CreateService(ctx, &api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
}) })
registry.CreateService(&api.Service{ registry.CreateService(ctx, &api.Service{
JSONBase: api.JSONBase{ID: "foo2"}, JSONBase: api.JSONBase{ID: "foo2"},
Selector: map[string]string{"bar2": "baz2"}, Selector: map[string]string{"bar2": "baz2"},
}) })
registry.List.ResourceVersion = 1 registry.List.ResourceVersion = 1
ctx := api.NewContext()
s, _ := storage.List(ctx, labels.Everything(), labels.Everything()) s, _ := storage.List(ctx, labels.Everything(), labels.Everything())
sl := s.(*api.ServiceList) sl := s.(*api.ServiceList)
if len(fakeCloud.Calls) != 0 { if len(fakeCloud.Calls) != 0 {

View File

@ -73,7 +73,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port))
} }
// TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop. // TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop.
err = e.serviceRegistry.UpdateEndpoints(&api.Endpoints{ err = e.serviceRegistry.UpdateEndpoints(api.NewContext(), &api.Endpoints{
JSONBase: api.JSONBase{ID: service.ID}, JSONBase: api.JSONBase{ID: service.ID},
Endpoints: endpoints, Endpoints: endpoints,
}) })