From 09365fed8d887561825fcebd56de7828aa41df2b Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Fri, 26 Sep 2014 15:18:42 -0400 Subject: [PATCH 1/2] Add ctx to registry interfaces --- pkg/registry/binding/registry.go | 2 +- pkg/registry/binding/rest.go | 2 +- pkg/registry/controller/registry.go | 12 +++---- pkg/registry/controller/rest.go | 16 +++++----- pkg/registry/endpoint/registry.go | 8 ++--- pkg/registry/endpoint/rest.go | 6 ++-- pkg/registry/etcd/etcd.go | 48 ++++++++++++++-------------- pkg/registry/pod/manifest_factory.go | 2 +- pkg/registry/pod/registry.go | 12 +++---- pkg/registry/pod/rest.go | 16 +++++----- pkg/registry/service/registry.go | 12 +++---- pkg/registry/service/rest.go | 24 +++++++------- pkg/service/endpoints_controller.go | 2 +- 13 files changed, 81 insertions(+), 81 deletions(-) diff --git a/pkg/registry/binding/registry.go b/pkg/registry/binding/registry.go index 3cf6b9ec99e..8a61559bc93 100644 --- a/pkg/registry/binding/registry.go +++ b/pkg/registry/binding/registry.go @@ -24,5 +24,5 @@ import ( type Registry interface { // ApplyBinding should apply the binding. That is, it should actually // assign or place pod binding.PodID on machine binding.Host. - ApplyBinding(binding *api.Binding) error + ApplyBinding(ctx api.Context, binding *api.Binding) error } diff --git a/pkg/registry/binding/rest.go b/pkg/registry/binding/rest.go index 412a4da2584..8c02a920e6f 100644 --- a/pkg/registry/binding/rest.go +++ b/pkg/registry/binding/rest.go @@ -67,7 +67,7 @@ func (b *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Objec return nil, fmt.Errorf("incorrect type: %#v", obj) } return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := b.registry.ApplyBinding(binding); err != nil { + if err := b.registry.ApplyBinding(ctx, binding); err != nil { return nil, err } return &api.Status{Status: api.StatusSuccess}, nil diff --git a/pkg/registry/controller/registry.go b/pkg/registry/controller/registry.go index 621577fd539..8afa3ee1563 100644 --- a/pkg/registry/controller/registry.go +++ b/pkg/registry/controller/registry.go @@ -23,10 +23,10 @@ import ( // Registry is an interface for things that know how to store ReplicationControllers. type Registry interface { - ListControllers() (*api.ReplicationControllerList, error) - WatchControllers(resourceVersion uint64) (watch.Interface, error) - GetController(controllerID string) (*api.ReplicationController, error) - CreateController(controller *api.ReplicationController) error - UpdateController(controller *api.ReplicationController) error - DeleteController(controllerID string) error + ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) + WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) + GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) + CreateController(ctx api.Context, controller *api.ReplicationController) error + UpdateController(ctx api.Context, controller *api.ReplicationController) error + DeleteController(ctx api.Context, controllerID string) error } diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index d9c2e3b0f97..2665be976d6 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -71,24 +71,24 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje controller.CreationTimestamp = util.Now() return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.CreateController(controller) + err := rs.registry.CreateController(ctx, controller) if err != nil { return nil, err } - return rs.registry.GetController(controller.ID) + return rs.registry.GetController(ctx, controller.ID) }), nil } // Delete asynchronously deletes the ReplicationController specified by its id. func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id) + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(ctx, id) }), nil } // Get obtains the ReplicationController specified by its id. func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - controller, err := rs.registry.GetController(id) + controller, err := rs.registry.GetController(ctx, id) if err != nil { return nil, err } @@ -101,7 +101,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj if !field.Empty() { return nil, fmt.Errorf("field selector not supported yet") } - controllers, err := rs.registry.ListControllers() + controllers, err := rs.registry.ListControllers(ctx) if err != nil { return nil, err } @@ -132,11 +132,11 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje return nil, errors.NewInvalid("replicationController", controller.ID, errs) } return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateController(controller) + err := rs.registry.UpdateController(ctx, controller) if err != nil { return nil, err } - return rs.registry.GetController(controller.ID) + return rs.registry.GetController(ctx, controller.ID) }), nil } @@ -146,7 +146,7 @@ func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVer if !field.Empty() { return nil, fmt.Errorf("no field selector implemented for controllers") } - incoming, err := rs.registry.WatchControllers(resourceVersion) + incoming, err := rs.registry.WatchControllers(ctx, resourceVersion) if err != nil { return nil, err } diff --git a/pkg/registry/endpoint/registry.go b/pkg/registry/endpoint/registry.go index ed03f0b1298..cb88fd3fe9e 100644 --- a/pkg/registry/endpoint/registry.go +++ b/pkg/registry/endpoint/registry.go @@ -24,8 +24,8 @@ import ( // Registry is an interface for things that know how to store endpoints. type Registry interface { - ListEndpoints() (*api.EndpointsList, error) - GetEndpoints(name string) (*api.Endpoints, error) - WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) - UpdateEndpoints(e *api.Endpoints) error + ListEndpoints(ctx api.Context) (*api.EndpointsList, error) + GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) + WatchEndpoints(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) + UpdateEndpoints(ctx api.Context, e *api.Endpoints) error } diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index a11c43ce4d4..2ea28e540c9 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -39,7 +39,7 @@ func NewREST(registry Registry) *REST { // Get satisfies the RESTStorage interface. func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - return rs.registry.GetEndpoints(id) + return rs.registry.GetEndpoints(ctx, id) } // List satisfies the RESTStorage interface. @@ -47,13 +47,13 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj if !label.Empty() || !field.Empty() { return nil, errors.New("label/field selectors are not supported on endpoints") } - return rs.registry.ListEndpoints() + return rs.registry.ListEndpoints(ctx) } // Watch returns Endpoint events via a watch.Interface. // It implements apiserver.ResourceWatcher. func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return rs.registry.WatchEndpoints(label, field, resourceVersion) + return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion) } // Create satisfies the RESTStorage interface but is unimplemented. diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 16339d19c94..68701c0a95f 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -56,13 +56,13 @@ func makePodKey(podID string) string { // ListPods obtains a list of pods with labels that match selector. func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { - return r.ListPodsPredicate(func(pod *api.Pod) bool { + return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool { return selector.Matches(labels.Set(pod.Labels)) }) } // ListPodsPredicate obtains a list of pods that match filter. -func (r *Registry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) { +func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) { allPods := api.PodList{} err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion) if err != nil { @@ -83,7 +83,7 @@ func (r *Registry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, } // WatchPods begins watching for new, changed, or deleted pods. -func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { +func (r *Registry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool { switch t := obj.(type) { case *api.Pod: @@ -96,7 +96,7 @@ func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) } // GetPod gets a specific pod specified by its ID. -func (r *Registry) GetPod(podID string) (*api.Pod, error) { +func (r *Registry) GetPod(ctx api.Context, podID string) (*api.Pod, error) { var pod api.Pod if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil { return nil, etcderr.InterpretGetError(err, "pod", podID) @@ -113,7 +113,7 @@ func makeContainerKey(machine string) string { } // CreatePod creates a pod based on a specification. -func (r *Registry) CreatePod(pod *api.Pod) error { +func (r *Registry) CreatePod(ctx api.Context, pod *api.Pod) error { // Set current status to "Waiting". pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Host = "" @@ -125,7 +125,7 @@ func (r *Registry) CreatePod(pod *api.Pod) error { } // ApplyBinding implements binding's registry -func (r *Registry) ApplyBinding(binding *api.Binding) error { +func (r *Registry) ApplyBinding(ctx api.Context, binding *api.Binding) error { return etcderr.InterpretCreateError(r.assignPod(binding.PodID, binding.Host), "binding", "") } @@ -178,12 +178,12 @@ func (r *Registry) assignPod(podID string, machine string) error { return err } -func (r *Registry) UpdatePod(pod *api.Pod) error { +func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error { return fmt.Errorf("unimplemented!") } // DeletePod deletes an existing pod specified by its ID. -func (r *Registry) DeletePod(podID string) error { +func (r *Registry) DeletePod(ctx api.Context, podID string) error { var pod api.Pod podKey := makePodKey(podID) err := r.ExtractObj(podKey, &pod, false) @@ -226,14 +226,14 @@ func (r *Registry) DeletePod(podID string) error { } // ListControllers obtains a list of ReplicationControllers. -func (r *Registry) ListControllers() (*api.ReplicationControllerList, error) { +func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) { controllers := &api.ReplicationControllerList{} err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion) return controllers, err } // WatchControllers begins watching for new, changed, or deleted controllers. -func (r *Registry) WatchControllers(resourceVersion uint64) (watch.Interface, error) { +func (r *Registry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) { return r.WatchList("/registry/controllers", resourceVersion, tools.Everything) } @@ -242,7 +242,7 @@ func makeControllerKey(id string) string { } // GetController gets a specific ReplicationController specified by its ID. -func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) { +func (r *Registry) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) err := r.ExtractObj(key, &controller, false) @@ -253,19 +253,19 @@ func (r *Registry) GetController(controllerID string) (*api.ReplicationControlle } // CreateController creates a new ReplicationController. -func (r *Registry) CreateController(controller *api.ReplicationController) error { +func (r *Registry) CreateController(ctx api.Context, controller *api.ReplicationController) error { err := r.CreateObj(makeControllerKey(controller.ID), controller, 0) return etcderr.InterpretCreateError(err, "replicationController", controller.ID) } // UpdateController replaces an existing ReplicationController. -func (r *Registry) UpdateController(controller *api.ReplicationController) error { +func (r *Registry) UpdateController(ctx api.Context, controller *api.ReplicationController) error { err := r.SetObj(makeControllerKey(controller.ID), controller) return etcderr.InterpretUpdateError(err, "replicationController", controller.ID) } // DeleteController deletes a ReplicationController specified by its ID. -func (r *Registry) DeleteController(controllerID string) error { +func (r *Registry) DeleteController(ctx api.Context, controllerID string) error { key := makeControllerKey(controllerID) err := r.Delete(key, false) return etcderr.InterpretDeleteError(err, "replicationController", controllerID) @@ -276,20 +276,20 @@ func makeServiceKey(name string) string { } // ListServices obtains a list of Services. -func (r *Registry) ListServices() (*api.ServiceList, error) { +func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) { list := &api.ServiceList{} err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion) return list, err } // CreateService creates a new Service. -func (r *Registry) CreateService(svc *api.Service) error { +func (r *Registry) CreateService(ctx api.Context, svc *api.Service) error { err := r.CreateObj(makeServiceKey(svc.ID), svc, 0) return etcderr.InterpretCreateError(err, "service", svc.ID) } // GetService obtains a Service specified by its name. -func (r *Registry) GetService(name string) (*api.Service, error) { +func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service err := r.ExtractObj(key, &svc, false) @@ -300,7 +300,7 @@ func (r *Registry) GetService(name string) (*api.Service, error) { } // GetEndpoints obtains the endpoints for the service identified by 'name'. -func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) { +func (r *Registry) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) { key := makeServiceEndpointsKey(name) var endpoints api.Endpoints err := r.ExtractObj(key, &endpoints, false) @@ -315,7 +315,7 @@ func makeServiceEndpointsKey(name string) string { } // DeleteService deletes a Service specified by its name. -func (r *Registry) DeleteService(name string) error { +func (r *Registry) DeleteService(ctx api.Context, name string) error { key := makeServiceKey(name) err := r.Delete(key, true) if err != nil { @@ -332,13 +332,13 @@ func (r *Registry) DeleteService(name string) error { } // UpdateService replaces an existing Service. -func (r *Registry) UpdateService(svc *api.Service) error { +func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error { err := r.SetObj(makeServiceKey(svc.ID), svc) return etcderr.InterpretUpdateError(err, "service", svc.ID) } // WatchServices begins watching for new, changed, or deleted service configurations. -func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { if !label.Empty() { return nil, fmt.Errorf("label selectors are not supported on services") } @@ -352,14 +352,14 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u } // ListEndpoints obtains a list of Services. -func (r *Registry) ListEndpoints() (*api.EndpointsList, error) { +func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { list := &api.EndpointsList{} err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion) return list, err } // UpdateEndpoints update Endpoints of a Service. -func (r *Registry) UpdateEndpoints(e *api.Endpoints) error { +func (r *Registry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error { // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. err := r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, func(input runtime.Object) (runtime.Object, error) { @@ -370,7 +370,7 @@ func (r *Registry) UpdateEndpoints(e *api.Endpoints) error { } // WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. -func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { if !label.Empty() { return nil, fmt.Errorf("label selectors are not supported on endpoints") } diff --git a/pkg/registry/pod/manifest_factory.go b/pkg/registry/pod/manifest_factory.go index 247ee9931be..db34efa43e0 100644 --- a/pkg/registry/pod/manifest_factory.go +++ b/pkg/registry/pod/manifest_factory.go @@ -32,7 +32,7 @@ type BasicManifestFactory struct { } func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) { - envVars, err := service.GetServiceEnvironmentVariables(b.ServiceRegistry, machine) + envVars, err := service.GetServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine) if err != nil { return api.ContainerManifest{}, err } diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 97cb00c03a9..9b535ad82b2 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -27,15 +27,15 @@ type Registry interface { // ListPods obtains a list of pods having labels which match selector. ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) // ListPodsPredicate obtains a list of pods for which filter returns true. - ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) + ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) // Watch for new/changed/deleted pods - WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) + WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) // Get a specific pod - GetPod(podID string) (*api.Pod, error) + GetPod(ctx api.Context, podID string) (*api.Pod, error) // Create a pod based on a specification. - CreatePod(pod *api.Pod) error + CreatePod(ctx api.Context, pod *api.Pod) error // Update an existing pod - UpdatePod(pod *api.Pod) error + UpdatePod(ctx api.Context, pod *api.Pod) error // Delete an existing pod - DeletePod(podID string) error + DeletePod(ctx api.Context, podID string) error } diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 22e60f8b2f9..8f3919aaf4c 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -81,21 +81,21 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje pod.CreationTimestamp = util.Now() return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.CreatePod(pod); err != nil { + if err := rs.registry.CreatePod(ctx, pod); err != nil { return nil, err } - return rs.registry.GetPod(pod.ID) + return rs.registry.GetPod(ctx, pod.ID) }), nil } func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id) + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id) }), nil } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - pod, err := rs.registry.GetPod(id) + pod, err := rs.registry.GetPod(ctx, id) if err != nil { return pod, err } @@ -132,7 +132,7 @@ func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool { } func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { - pods, err := rs.registry.ListPodsPredicate(rs.filterFunc(label, field)) + pods, err := rs.registry.ListPodsPredicate(ctx, rs.filterFunc(label, field)) if err == nil { for i := range pods.Items { pod := &pods.Items[i] @@ -150,7 +150,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj // Watch begins watching for new, changed, or deleted pods. func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return rs.registry.WatchPods(resourceVersion, rs.filterFunc(label, field)) + return rs.registry.WatchPods(ctx, resourceVersion, rs.filterFunc(label, field)) } func (*REST) New() runtime.Object { @@ -163,10 +163,10 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje return nil, errors.NewInvalid("pod", pod.ID, errs) } return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.UpdatePod(pod); err != nil { + if err := rs.registry.UpdatePod(ctx, pod); err != nil { return nil, err } - return rs.registry.GetPod(pod.ID) + return rs.registry.GetPod(ctx, pod.ID) }), nil } diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index 1176725452b..ce4512213ee 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -25,12 +25,12 @@ import ( // Registry is an interface for things that know how to store services. type Registry interface { - ListServices() (*api.ServiceList, error) - CreateService(svc *api.Service) error - GetService(name string) (*api.Service, error) - DeleteService(name string) error - UpdateService(svc *api.Service) error - WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) + ListServices(ctx api.Context) (*api.ServiceList, error) + CreateService(ctx api.Context, svc *api.Service) error + GetService(ctx api.Context, name string) (*api.Service, error) + DeleteService(ctx api.Context, name string) error + UpdateService(ctx api.Context, svc *api.Service) error + WatchServices(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) // TODO: endpoints and their implementation should be separated, setting endpoints should be // supported via the API, and the endpoints-controller should use the API to update endpoints. diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index b9afebf36c9..635533ff66e 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -86,27 +86,27 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje return nil, err } } - err := rs.registry.CreateService(srv) + err := rs.registry.CreateService(ctx, srv) if err != nil { return nil, err } - return rs.registry.GetService(srv.ID) + return rs.registry.GetService(ctx, srv.ID) }), nil } func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { - service, err := rs.registry.GetService(id) + service, err := rs.registry.GetService(ctx, id) if err != nil { return nil, err } return apiserver.MakeAsync(func() (runtime.Object, error) { rs.deleteExternalLoadBalancer(service) - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id) + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) }), nil } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - s, err := rs.registry.GetService(id) + s, err := rs.registry.GetService(ctx, id) if err != nil { return nil, err } @@ -115,7 +115,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { // TODO: implement field selector? func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { - list, err := rs.registry.ListServices() + list, err := rs.registry.ListServices(ctx) if err != nil { return nil, err } @@ -132,7 +132,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj // Watch returns Services events via a watch.Interface. // It implements apiserver.ResourceWatcher. func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return rs.registry.WatchServices(label, field, resourceVersion) + return rs.registry.WatchServices(ctx, label, field, resourceVersion) } func (*REST) New() runtime.Object { @@ -141,9 +141,9 @@ func (*REST) New() runtime.Object { // GetServiceEnvironmentVariables populates a list of environment variables that are use // in the container environment to get access to services. -func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.EnvVar, error) { +func GetServiceEnvironmentVariables(ctx api.Context, registry Registry, machine string) ([]api.EnvVar, error) { var result []api.EnvVar - services, err := registry.ListServices() + services, err := registry.ListServices(ctx) if err != nil { return result, err } @@ -170,17 +170,17 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje } return apiserver.MakeAsync(func() (runtime.Object, error) { // TODO: check to see if external load balancer status changed - err := rs.registry.UpdateService(srv) + err := rs.registry.UpdateService(ctx, srv) if err != nil { return nil, err } - return rs.registry.GetService(srv.ID) + return rs.registry.GetService(ctx, srv.ID) }), nil } // ResourceLocation returns a URL to which one can send traffic for the specified service. func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { - e, err := rs.registry.GetEndpoints(id) + e, err := rs.registry.GetEndpoints(ctx, id) if err != nil { return "", err } diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 940c6d42a0e..40af7c0ad4e 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -73,7 +73,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) } // TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop. - err = e.serviceRegistry.UpdateEndpoints(&api.Endpoints{ + err = e.serviceRegistry.UpdateEndpoints(api.NewContext(), &api.Endpoints{ JSONBase: api.JSONBase{ID: service.ID}, Endpoints: endpoints, }) From b7fcc7d3ec286955447a18a0dd1874759d5eba83 Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Fri, 26 Sep 2014 15:19:22 -0400 Subject: [PATCH 2/2] Add ctx to registry test cases --- pkg/registry/binding/mock.go | 2 +- pkg/registry/etcd/etcd_test.go | 109 ++++++++++++++++-------- pkg/registry/registrytest/controller.go | 12 +-- pkg/registry/registrytest/pod.go | 14 +-- pkg/registry/registrytest/service.go | 20 ++--- pkg/registry/service/rest_test.go | 39 ++++----- 6 files changed, 117 insertions(+), 79 deletions(-) diff --git a/pkg/registry/binding/mock.go b/pkg/registry/binding/mock.go index 92af87429c1..f49429cd166 100644 --- a/pkg/registry/binding/mock.go +++ b/pkg/registry/binding/mock.go @@ -25,6 +25,6 @@ type MockRegistry struct { OnApplyBinding func(binding *api.Binding) error } -func (mr MockRegistry) ApplyBinding(binding *api.Binding) error { +func (mr MockRegistry) ApplyBinding(ctx api.Context, binding *api.Binding) error { return mr.OnApplyBinding(binding) } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index d280af2a378..cd902ef0498 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -41,10 +41,11 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { } func TestEtcdGetPod(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Set("/registry/pods/foo", runtime.EncodeOrDie(latest.Codec, &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) - pod, err := registry.GetPod("foo") + pod, err := registry.GetPod(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -55,6 +56,7 @@ func TestEtcdGetPod(t *testing.T) { } func TestEtcdGetPodNotFound(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ @@ -63,13 +65,14 @@ func TestEtcdGetPodNotFound(t *testing.T) { E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient) - _, err := registry.GetPod("foo") + _, err := registry.GetPod(ctx, "foo") if !errors.IsNotFound(err) { t.Errorf("Unexpected error returned: %#v", err) } } func TestEtcdCreatePod(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ @@ -80,7 +83,7 @@ func TestEtcdCreatePod(t *testing.T) { } fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{}), 0) registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(&api.Pod{ + err := registry.CreatePod(ctx, &api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -99,7 +102,7 @@ func TestEtcdCreatePod(t *testing.T) { } // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -130,6 +133,7 @@ func TestEtcdCreatePod(t *testing.T) { } func TestEtcdCreatePodAlreadyExisting(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ @@ -140,7 +144,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { E: nil, } registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(&api.Pod{ + err := registry.CreatePod(ctx, &api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -151,6 +155,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { } func TestEtcdCreatePodWithContainersError(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ @@ -166,7 +171,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { E: tools.EtcdErrorNodeExist, // validate that ApplyBinding is translating Create errors } registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(&api.Pod{ + err := registry.CreatePod(ctx, &api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -176,12 +181,12 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { } // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"}) if !errors.IsAlreadyExists(err) { t.Fatalf("Unexpected error returned: %#v", err) } - existingPod, err := registry.GetPod("foo") + existingPod, err := registry.GetPod(ctx, "foo") if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -191,6 +196,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { } func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ @@ -206,7 +212,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(&api.Pod{ + err := registry.CreatePod(ctx, &api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -226,7 +232,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { } // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -257,6 +263,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { } func TestEtcdCreatePodWithExistingContainers(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ @@ -271,7 +278,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, }), 0) registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(&api.Pod{ + err := registry.CreatePod(ctx, &api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -291,7 +298,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { } // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -322,6 +329,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { } func TestEtcdDeletePod(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true @@ -336,7 +344,7 @@ func TestEtcdDeletePod(t *testing.T) { }, }), 0) registry := NewTestEtcdRegistry(fakeClient) - err := registry.DeletePod("foo") + err := registry.DeletePod(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -358,6 +366,7 @@ func TestEtcdDeletePod(t *testing.T) { } func TestEtcdDeletePodMultipleContainers(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true @@ -373,7 +382,7 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { }, }), 0) registry := NewTestEtcdRegistry(fakeClient) - err := registry.DeletePod("foo") + err := registry.DeletePod(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -481,6 +490,7 @@ func TestEtcdListPods(t *testing.T) { } func TestEtcdListControllersNotFound(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) key := "/registry/controllers" fakeClient.Data[key] = tools.EtcdResponseWithError{ @@ -488,7 +498,7 @@ func TestEtcdListControllersNotFound(t *testing.T) { E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient) - controllers, err := registry.ListControllers() + controllers, err := registry.ListControllers(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -499,6 +509,7 @@ func TestEtcdListControllersNotFound(t *testing.T) { } func TestEtcdListServicesNotFound(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) key := "/registry/services/specs" fakeClient.Data[key] = tools.EtcdResponseWithError{ @@ -506,7 +517,7 @@ func TestEtcdListServicesNotFound(t *testing.T) { E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient) - services, err := registry.ListServices() + services, err := registry.ListServices(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -517,6 +528,7 @@ func TestEtcdListServicesNotFound(t *testing.T) { } func TestEtcdListControllers(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) key := "/registry/controllers" fakeClient.Data[key] = tools.EtcdResponseWithError{ @@ -535,7 +547,7 @@ func TestEtcdListControllers(t *testing.T) { E: nil, } registry := NewTestEtcdRegistry(fakeClient) - controllers, err := registry.ListControllers() + controllers, err := registry.ListControllers(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -546,10 +558,11 @@ func TestEtcdListControllers(t *testing.T) { } func TestEtcdGetController(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) - ctrl, err := registry.GetController("foo") + ctrl, err := registry.GetController(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -560,6 +573,7 @@ func TestEtcdGetController(t *testing.T) { } func TestEtcdGetControllerNotFound(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Data["/registry/controllers/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ @@ -568,7 +582,7 @@ func TestEtcdGetControllerNotFound(t *testing.T) { E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient) - ctrl, err := registry.GetController("foo") + ctrl, err := registry.GetController(ctx, "foo") if ctrl != nil { t.Errorf("Unexpected non-nil controller: %#v", ctrl) } @@ -578,9 +592,10 @@ func TestEtcdGetControllerNotFound(t *testing.T) { } func TestEtcdDeleteController(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) - err := registry.DeleteController("foo") + err := registry.DeleteController(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -595,9 +610,10 @@ func TestEtcdDeleteController(t *testing.T) { } func TestEtcdCreateController(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreateController(&api.ReplicationController{ + err := registry.CreateController(ctx, &api.ReplicationController{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -622,11 +638,12 @@ func TestEtcdCreateController(t *testing.T) { } func TestEtcdCreateControllerAlreadyExisting(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreateController(&api.ReplicationController{ + err := registry.CreateController(ctx, &api.ReplicationController{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -637,12 +654,13 @@ func TestEtcdCreateControllerAlreadyExisting(t *testing.T) { } func TestEtcdUpdateController(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true resp, _ := fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) - err := registry.UpdateController(&api.ReplicationController{ + err := registry.UpdateController(ctx, &api.ReplicationController{ JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex}, DesiredState: api.ReplicationControllerState{ Replicas: 2, @@ -652,13 +670,14 @@ func TestEtcdUpdateController(t *testing.T) { t.Errorf("unexpected error: %v", err) } - ctrl, err := registry.GetController("foo") + ctrl, err := registry.GetController(ctx, "foo") if ctrl.DesiredState.Replicas != 2 { t.Errorf("Unexpected controller: %#v", ctrl) } } func TestEtcdListServices(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) key := "/registry/services/specs" fakeClient.Data[key] = tools.EtcdResponseWithError{ @@ -677,7 +696,7 @@ func TestEtcdListServices(t *testing.T) { E: nil, } registry := NewTestEtcdRegistry(fakeClient) - services, err := registry.ListServices() + services, err := registry.ListServices(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -688,9 +707,10 @@ func TestEtcdListServices(t *testing.T) { } func TestEtcdCreateService(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreateService(&api.Service{ + err := registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, }) if err != nil { @@ -714,10 +734,11 @@ func TestEtcdCreateService(t *testing.T) { } func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreateService(&api.Service{ + err := registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, }) if !errors.IsAlreadyExists(err) { @@ -726,10 +747,11 @@ func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { } func TestEtcdGetService(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) - service, err := registry.GetService("foo") + service, err := registry.GetService(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -740,6 +762,7 @@ func TestEtcdGetService(t *testing.T) { } func TestEtcdGetServiceNotFound(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ @@ -748,16 +771,17 @@ func TestEtcdGetServiceNotFound(t *testing.T) { E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient) - _, err := registry.GetService("foo") + _, err := registry.GetService(ctx, "foo") if !errors.IsNotFound(err) { t.Errorf("Unexpected error returned: %#v", err) } } func TestEtcdDeleteService(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) - err := registry.DeleteService("foo") + err := registry.DeleteService(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -776,6 +800,7 @@ func TestEtcdDeleteService(t *testing.T) { } func TestEtcdUpdateService(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true @@ -790,12 +815,12 @@ func TestEtcdUpdateService(t *testing.T) { "baz": "bar", }, } - err := registry.UpdateService(&testService) + err := registry.UpdateService(ctx, &testService) if err != nil { t.Errorf("unexpected error: %v", err) } - svc, err := registry.GetService("foo") + svc, err := registry.GetService(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -809,6 +834,7 @@ func TestEtcdUpdateService(t *testing.T) { } func TestEtcdListEndpoints(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) key := "/registry/services/endpoints" fakeClient.Data[key] = tools.EtcdResponseWithError{ @@ -827,7 +853,7 @@ func TestEtcdListEndpoints(t *testing.T) { E: nil, } registry := NewTestEtcdRegistry(fakeClient) - services, err := registry.ListEndpoints() + services, err := registry.ListEndpoints(ctx) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -838,6 +864,7 @@ func TestEtcdListEndpoints(t *testing.T) { } func TestEtcdGetEndpoints(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) endpoints := &api.Endpoints{ @@ -847,7 +874,7 @@ func TestEtcdGetEndpoints(t *testing.T) { fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, endpoints), 0) - got, err := registry.GetEndpoints("foo") + got, err := registry.GetEndpoints(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -858,6 +885,7 @@ func TestEtcdGetEndpoints(t *testing.T) { } func TestEtcdUpdateEndpoints(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true registry := NewTestEtcdRegistry(fakeClient) @@ -868,7 +896,7 @@ func TestEtcdUpdateEndpoints(t *testing.T) { fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0) - err := registry.UpdateEndpoints(&endpoints) + err := registry.UpdateEndpoints(ctx, &endpoints) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -885,9 +913,10 @@ func TestEtcdUpdateEndpoints(t *testing.T) { } func TestEtcdWatchServices(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchServices( + watching, err := registry.WatchServices(ctx, labels.Everything(), labels.SelectorFromSet(labels.Set{"ID": "foo"}), 1, @@ -912,9 +941,11 @@ func TestEtcdWatchServices(t *testing.T) { } func TestEtcdWatchServicesBadSelector(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) _, err := registry.WatchServices( + ctx, labels.Everything(), labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), 0, @@ -924,6 +955,7 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) { } _, err = registry.WatchServices( + ctx, labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), labels.Everything(), 0, @@ -934,9 +966,11 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) { } func TestEtcdWatchEndpoints(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) watching, err := registry.WatchEndpoints( + ctx, labels.Everything(), labels.SelectorFromSet(labels.Set{"ID": "foo"}), 1, @@ -961,9 +995,11 @@ func TestEtcdWatchEndpoints(t *testing.T) { } func TestEtcdWatchEndpointsBadSelector(t *testing.T) { + ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) _, err := registry.WatchEndpoints( + ctx, labels.Everything(), labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), 0, @@ -973,6 +1009,7 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) { } _, err = registry.WatchEndpoints( + ctx, labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), labels.Everything(), 0, diff --git a/pkg/registry/registrytest/controller.go b/pkg/registry/registrytest/controller.go index bfec704eb7b..e0a3912f518 100644 --- a/pkg/registry/registrytest/controller.go +++ b/pkg/registry/registrytest/controller.go @@ -27,26 +27,26 @@ type ControllerRegistry struct { Controllers *api.ReplicationControllerList } -func (r *ControllerRegistry) ListControllers() (*api.ReplicationControllerList, error) { +func (r *ControllerRegistry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) { return r.Controllers, r.Err } -func (r *ControllerRegistry) GetController(ID string) (*api.ReplicationController, error) { +func (r *ControllerRegistry) GetController(ctx api.Context, ID string) (*api.ReplicationController, error) { return &api.ReplicationController{}, r.Err } -func (r *ControllerRegistry) CreateController(controller *api.ReplicationController) error { +func (r *ControllerRegistry) CreateController(ctx api.Context, controller *api.ReplicationController) error { return r.Err } -func (r *ControllerRegistry) UpdateController(controller *api.ReplicationController) error { +func (r *ControllerRegistry) UpdateController(ctx api.Context, controller *api.ReplicationController) error { return r.Err } -func (r *ControllerRegistry) DeleteController(ID string) error { +func (r *ControllerRegistry) DeleteController(ctx api.Context, ID string) error { return r.Err } -func (r *ControllerRegistry) WatchControllers(resourceVersion uint64) (watch.Interface, error) { +func (r *ControllerRegistry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) { return nil, r.Err } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 763a1500dd3..f77014d7ade 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -40,7 +40,7 @@ func NewPodRegistry(pods *api.PodList) *PodRegistry { } } -func (r *PodRegistry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) { +func (r *PodRegistry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) { r.Lock() defer r.Unlock() if r.Err != nil { @@ -58,23 +58,23 @@ func (r *PodRegistry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodLis } func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { - return r.ListPodsPredicate(func(pod *api.Pod) bool { + return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool { return selector.Matches(labels.Set(pod.Labels)) }) } -func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { +func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { // TODO: wire filter down into the mux; it needs access to current and previous state :( return r.mux.Watch(), nil } -func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { +func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) { r.Lock() defer r.Unlock() return r.Pod, r.Err } -func (r *PodRegistry) CreatePod(pod *api.Pod) error { +func (r *PodRegistry) CreatePod(ctx api.Context, pod *api.Pod) error { r.Lock() defer r.Unlock() r.Pod = pod @@ -82,7 +82,7 @@ func (r *PodRegistry) CreatePod(pod *api.Pod) error { return r.Err } -func (r *PodRegistry) UpdatePod(pod *api.Pod) error { +func (r *PodRegistry) UpdatePod(ctx api.Context, pod *api.Pod) error { r.Lock() defer r.Unlock() r.Pod = pod @@ -90,7 +90,7 @@ func (r *PodRegistry) UpdatePod(pod *api.Pod) error { return r.Err } -func (r *PodRegistry) DeletePod(podId string) error { +func (r *PodRegistry) DeletePod(ctx api.Context, podId string) error { r.Lock() defer r.Unlock() r.mux.Action(watch.Deleted, r.Pod) diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index a61ab385f3f..5d0b3b71379 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -38,49 +38,49 @@ type ServiceRegistry struct { UpdatedID string } -func (r *ServiceRegistry) ListServices() (*api.ServiceList, error) { +func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error) { return &r.List, r.Err } -func (r *ServiceRegistry) CreateService(svc *api.Service) error { +func (r *ServiceRegistry) CreateService(ctx api.Context, svc *api.Service) error { r.Service = svc r.List.Items = append(r.List.Items, *svc) return r.Err } -func (r *ServiceRegistry) GetService(id string) (*api.Service, error) { +func (r *ServiceRegistry) GetService(ctx api.Context, id string) (*api.Service, error) { r.GottenID = id return r.Service, r.Err } -func (r *ServiceRegistry) DeleteService(id string) error { +func (r *ServiceRegistry) DeleteService(ctx api.Context, id string) error { r.DeletedID = id return r.Err } -func (r *ServiceRegistry) UpdateService(svc *api.Service) error { +func (r *ServiceRegistry) UpdateService(ctx api.Context, svc *api.Service) error { r.UpdatedID = svc.ID return r.Err } -func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return nil, r.Err } -func (r *ServiceRegistry) ListEndpoints() (*api.EndpointsList, error) { +func (r *ServiceRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { return &r.EndpointsList, r.Err } -func (r *ServiceRegistry) GetEndpoints(id string) (*api.Endpoints, error) { +func (r *ServiceRegistry) GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) { r.GottenID = id return &r.Endpoints, r.Err } -func (r *ServiceRegistry) UpdateEndpoints(e *api.Endpoints) error { +func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error { r.Endpoints = *e return r.Err } -func (r *ServiceRegistry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return nil, r.Err } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 9293002e212..ddadda9850f 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -53,7 +53,7 @@ func TestServiceRegistryCreate(t *testing.T) { if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := registry.GetService(svc.ID) + srv, err := registry.GetService(ctx, svc.ID) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -90,14 +90,14 @@ func TestServiceStorageValidatesCreate(t *testing.T) { } func TestServiceRegistryUpdate(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() - registry.CreateService(&api.Service{ + registry.CreateService(ctx, &api.Service{ Port: 6502, JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz1"}, }) storage := NewREST(registry, nil, nil) - ctx := api.NewContext() c, err := storage.Update(ctx, &api.Service{ Port: 6502, JSONBase: api.JSONBase{ID: "foo"}, @@ -120,8 +120,9 @@ func TestServiceRegistryUpdate(t *testing.T) { } func TestServiceStorageValidatesUpdate(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() - registry.CreateService(&api.Service{ + registry.CreateService(ctx, &api.Service{ Port: 6502, JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -139,7 +140,6 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { Selector: map[string]string{}, }, } - ctx := api.NewContext() for _, failureCase := range failureCases { c, err := storage.Update(ctx, &failureCase) if c != nil { @@ -152,6 +152,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { } func TestServiceRegistryExternalService(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} @@ -162,13 +163,12 @@ func TestServiceRegistryExternalService(t *testing.T) { Selector: map[string]string{"bar": "baz"}, CreateExternalLoadBalancer: true, } - ctx := api.NewContext() c, _ := storage.Create(ctx, svc) <-c if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := registry.GetService(svc.ID) + srv, err := registry.GetService(ctx, svc.ID) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -202,6 +202,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { } func TestServiceRegistryDelete(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} @@ -210,8 +211,7 @@ func TestServiceRegistryDelete(t *testing.T) { JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, } - ctx := api.NewContext() - registry.CreateService(svc) + registry.CreateService(ctx, svc) c, _ := storage.Delete(ctx, svc.ID) <-c if len(fakeCloud.Calls) != 0 { @@ -223,6 +223,7 @@ func TestServiceRegistryDelete(t *testing.T) { } func TestServiceRegistryDeleteExternal(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} @@ -232,8 +233,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { Selector: map[string]string{"bar": "baz"}, CreateExternalLoadBalancer: true, } - ctx := api.NewContext() - registry.CreateService(svc) + registry.CreateService(ctx, svc) c, _ := storage.Delete(ctx, svc.ID) <-c if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" { @@ -245,6 +245,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { } func TestServiceRegistryMakeLinkVariables(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() registry.List = api.ServiceList{ Items: []api.Service{ @@ -269,7 +270,7 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) { }, } machine := "machine" - vars, err := GetServiceEnvironmentVariables(registry, machine) + vars, err := GetServiceEnvironmentVariables(ctx, registry, machine) if err != nil { t.Errorf("Unexpected err: %v", err) } @@ -309,15 +310,15 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) { } func TestServiceRegistryGet(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) - registry.CreateService(&api.Service{ + registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, }) - ctx := api.NewContext() storage.Get(ctx, "foo") if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -328,13 +329,13 @@ func TestServiceRegistryGet(t *testing.T) { } func TestServiceRegistryResourceLocation(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}} fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) - ctx := api.NewContext() - registry.CreateService(&api.Service{ + registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, }) @@ -358,20 +359,20 @@ func TestServiceRegistryResourceLocation(t *testing.T) { } func TestServiceRegistryList(t *testing.T) { + ctx := api.NewContext() registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) - registry.CreateService(&api.Service{ + registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, }) - registry.CreateService(&api.Service{ + registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo2"}, Selector: map[string]string{"bar2": "baz2"}, }) registry.List.ResourceVersion = 1 - ctx := api.NewContext() s, _ := storage.List(ctx, labels.Everything(), labels.Everything()) sl := s.(*api.ServiceList) if len(fakeCloud.Calls) != 0 {