mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 03:57:41 +00:00
Add ctx to registry interfaces
This commit is contained in:
parent
f377d3eba8
commit
09365fed8d
@ -24,5 +24,5 @@ import (
|
|||||||
type Registry interface {
|
type Registry interface {
|
||||||
// ApplyBinding should apply the binding. That is, it should actually
|
// ApplyBinding should apply the binding. That is, it should actually
|
||||||
// assign or place pod binding.PodID on machine binding.Host.
|
// assign or place pod binding.PodID on machine binding.Host.
|
||||||
ApplyBinding(binding *api.Binding) error
|
ApplyBinding(ctx api.Context, binding *api.Binding) error
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,7 @@ func (b *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Objec
|
|||||||
return nil, fmt.Errorf("incorrect type: %#v", obj)
|
return nil, fmt.Errorf("incorrect type: %#v", obj)
|
||||||
}
|
}
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
if err := b.registry.ApplyBinding(binding); err != nil {
|
if err := b.registry.ApplyBinding(ctx, binding); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &api.Status{Status: api.StatusSuccess}, nil
|
return &api.Status{Status: api.StatusSuccess}, nil
|
||||||
|
@ -23,10 +23,10 @@ import (
|
|||||||
|
|
||||||
// Registry is an interface for things that know how to store ReplicationControllers.
|
// Registry is an interface for things that know how to store ReplicationControllers.
|
||||||
type Registry interface {
|
type Registry interface {
|
||||||
ListControllers() (*api.ReplicationControllerList, error)
|
ListControllers(ctx api.Context) (*api.ReplicationControllerList, error)
|
||||||
WatchControllers(resourceVersion uint64) (watch.Interface, error)
|
WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error)
|
||||||
GetController(controllerID string) (*api.ReplicationController, error)
|
GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error)
|
||||||
CreateController(controller *api.ReplicationController) error
|
CreateController(ctx api.Context, controller *api.ReplicationController) error
|
||||||
UpdateController(controller *api.ReplicationController) error
|
UpdateController(ctx api.Context, controller *api.ReplicationController) error
|
||||||
DeleteController(controllerID string) error
|
DeleteController(ctx api.Context, controllerID string) error
|
||||||
}
|
}
|
||||||
|
@ -71,24 +71,24 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
|
|||||||
controller.CreationTimestamp = util.Now()
|
controller.CreationTimestamp = util.Now()
|
||||||
|
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
err := rs.registry.CreateController(controller)
|
err := rs.registry.CreateController(ctx, controller)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return rs.registry.GetController(controller.ID)
|
return rs.registry.GetController(ctx, controller.ID)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete asynchronously deletes the ReplicationController specified by its id.
|
// Delete asynchronously deletes the ReplicationController specified by its id.
|
||||||
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
|
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id)
|
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(ctx, id)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get obtains the ReplicationController specified by its id.
|
// Get obtains the ReplicationController specified by its id.
|
||||||
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
||||||
controller, err := rs.registry.GetController(id)
|
controller, err := rs.registry.GetController(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -101,7 +101,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
|
|||||||
if !field.Empty() {
|
if !field.Empty() {
|
||||||
return nil, fmt.Errorf("field selector not supported yet")
|
return nil, fmt.Errorf("field selector not supported yet")
|
||||||
}
|
}
|
||||||
controllers, err := rs.registry.ListControllers()
|
controllers, err := rs.registry.ListControllers(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -132,11 +132,11 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
|
|||||||
return nil, errors.NewInvalid("replicationController", controller.ID, errs)
|
return nil, errors.NewInvalid("replicationController", controller.ID, errs)
|
||||||
}
|
}
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
err := rs.registry.UpdateController(controller)
|
err := rs.registry.UpdateController(ctx, controller)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return rs.registry.GetController(controller.ID)
|
return rs.registry.GetController(ctx, controller.ID)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,7 +146,7 @@ func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVer
|
|||||||
if !field.Empty() {
|
if !field.Empty() {
|
||||||
return nil, fmt.Errorf("no field selector implemented for controllers")
|
return nil, fmt.Errorf("no field selector implemented for controllers")
|
||||||
}
|
}
|
||||||
incoming, err := rs.registry.WatchControllers(resourceVersion)
|
incoming, err := rs.registry.WatchControllers(ctx, resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -24,8 +24,8 @@ import (
|
|||||||
|
|
||||||
// Registry is an interface for things that know how to store endpoints.
|
// Registry is an interface for things that know how to store endpoints.
|
||||||
type Registry interface {
|
type Registry interface {
|
||||||
ListEndpoints() (*api.EndpointsList, error)
|
ListEndpoints(ctx api.Context) (*api.EndpointsList, error)
|
||||||
GetEndpoints(name string) (*api.Endpoints, error)
|
GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error)
|
||||||
WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
|
WatchEndpoints(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
|
||||||
UpdateEndpoints(e *api.Endpoints) error
|
UpdateEndpoints(ctx api.Context, e *api.Endpoints) error
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ func NewREST(registry Registry) *REST {
|
|||||||
|
|
||||||
// Get satisfies the RESTStorage interface.
|
// Get satisfies the RESTStorage interface.
|
||||||
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
||||||
return rs.registry.GetEndpoints(id)
|
return rs.registry.GetEndpoints(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// List satisfies the RESTStorage interface.
|
// List satisfies the RESTStorage interface.
|
||||||
@ -47,13 +47,13 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
|
|||||||
if !label.Empty() || !field.Empty() {
|
if !label.Empty() || !field.Empty() {
|
||||||
return nil, errors.New("label/field selectors are not supported on endpoints")
|
return nil, errors.New("label/field selectors are not supported on endpoints")
|
||||||
}
|
}
|
||||||
return rs.registry.ListEndpoints()
|
return rs.registry.ListEndpoints(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch returns Endpoint events via a watch.Interface.
|
// Watch returns Endpoint events via a watch.Interface.
|
||||||
// It implements apiserver.ResourceWatcher.
|
// It implements apiserver.ResourceWatcher.
|
||||||
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
||||||
return rs.registry.WatchEndpoints(label, field, resourceVersion)
|
return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create satisfies the RESTStorage interface but is unimplemented.
|
// Create satisfies the RESTStorage interface but is unimplemented.
|
||||||
|
@ -56,13 +56,13 @@ func makePodKey(podID string) string {
|
|||||||
|
|
||||||
// ListPods obtains a list of pods with labels that match selector.
|
// ListPods obtains a list of pods with labels that match selector.
|
||||||
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
|
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
|
||||||
return r.ListPodsPredicate(func(pod *api.Pod) bool {
|
return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
|
||||||
return selector.Matches(labels.Set(pod.Labels))
|
return selector.Matches(labels.Set(pod.Labels))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListPodsPredicate obtains a list of pods that match filter.
|
// ListPodsPredicate obtains a list of pods that match filter.
|
||||||
func (r *Registry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) {
|
func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) {
|
||||||
allPods := api.PodList{}
|
allPods := api.PodList{}
|
||||||
err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion)
|
err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -83,7 +83,7 @@ func (r *Registry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WatchPods begins watching for new, changed, or deleted pods.
|
// WatchPods begins watching for new, changed, or deleted pods.
|
||||||
func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
|
func (r *Registry) WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
|
||||||
return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool {
|
return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool {
|
||||||
switch t := obj.(type) {
|
switch t := obj.(type) {
|
||||||
case *api.Pod:
|
case *api.Pod:
|
||||||
@ -96,7 +96,7 @@ func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetPod gets a specific pod specified by its ID.
|
// GetPod gets a specific pod specified by its ID.
|
||||||
func (r *Registry) GetPod(podID string) (*api.Pod, error) {
|
func (r *Registry) GetPod(ctx api.Context, podID string) (*api.Pod, error) {
|
||||||
var pod api.Pod
|
var pod api.Pod
|
||||||
if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil {
|
if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil {
|
||||||
return nil, etcderr.InterpretGetError(err, "pod", podID)
|
return nil, etcderr.InterpretGetError(err, "pod", podID)
|
||||||
@ -113,7 +113,7 @@ func makeContainerKey(machine string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CreatePod creates a pod based on a specification.
|
// CreatePod creates a pod based on a specification.
|
||||||
func (r *Registry) CreatePod(pod *api.Pod) error {
|
func (r *Registry) CreatePod(ctx api.Context, pod *api.Pod) error {
|
||||||
// Set current status to "Waiting".
|
// Set current status to "Waiting".
|
||||||
pod.CurrentState.Status = api.PodWaiting
|
pod.CurrentState.Status = api.PodWaiting
|
||||||
pod.CurrentState.Host = ""
|
pod.CurrentState.Host = ""
|
||||||
@ -125,7 +125,7 @@ func (r *Registry) CreatePod(pod *api.Pod) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ApplyBinding implements binding's registry
|
// ApplyBinding implements binding's registry
|
||||||
func (r *Registry) ApplyBinding(binding *api.Binding) error {
|
func (r *Registry) ApplyBinding(ctx api.Context, binding *api.Binding) error {
|
||||||
return etcderr.InterpretCreateError(r.assignPod(binding.PodID, binding.Host), "binding", "")
|
return etcderr.InterpretCreateError(r.assignPod(binding.PodID, binding.Host), "binding", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,12 +178,12 @@ func (r *Registry) assignPod(podID string, machine string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) UpdatePod(pod *api.Pod) error {
|
func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
|
||||||
return fmt.Errorf("unimplemented!")
|
return fmt.Errorf("unimplemented!")
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeletePod deletes an existing pod specified by its ID.
|
// DeletePod deletes an existing pod specified by its ID.
|
||||||
func (r *Registry) DeletePod(podID string) error {
|
func (r *Registry) DeletePod(ctx api.Context, podID string) error {
|
||||||
var pod api.Pod
|
var pod api.Pod
|
||||||
podKey := makePodKey(podID)
|
podKey := makePodKey(podID)
|
||||||
err := r.ExtractObj(podKey, &pod, false)
|
err := r.ExtractObj(podKey, &pod, false)
|
||||||
@ -226,14 +226,14 @@ func (r *Registry) DeletePod(podID string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListControllers obtains a list of ReplicationControllers.
|
// ListControllers obtains a list of ReplicationControllers.
|
||||||
func (r *Registry) ListControllers() (*api.ReplicationControllerList, error) {
|
func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) {
|
||||||
controllers := &api.ReplicationControllerList{}
|
controllers := &api.ReplicationControllerList{}
|
||||||
err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion)
|
err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion)
|
||||||
return controllers, err
|
return controllers, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchControllers begins watching for new, changed, or deleted controllers.
|
// WatchControllers begins watching for new, changed, or deleted controllers.
|
||||||
func (r *Registry) WatchControllers(resourceVersion uint64) (watch.Interface, error) {
|
func (r *Registry) WatchControllers(ctx api.Context, resourceVersion uint64) (watch.Interface, error) {
|
||||||
return r.WatchList("/registry/controllers", resourceVersion, tools.Everything)
|
return r.WatchList("/registry/controllers", resourceVersion, tools.Everything)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -242,7 +242,7 @@ func makeControllerKey(id string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetController gets a specific ReplicationController specified by its ID.
|
// GetController gets a specific ReplicationController specified by its ID.
|
||||||
func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) {
|
func (r *Registry) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) {
|
||||||
var controller api.ReplicationController
|
var controller api.ReplicationController
|
||||||
key := makeControllerKey(controllerID)
|
key := makeControllerKey(controllerID)
|
||||||
err := r.ExtractObj(key, &controller, false)
|
err := r.ExtractObj(key, &controller, false)
|
||||||
@ -253,19 +253,19 @@ func (r *Registry) GetController(controllerID string) (*api.ReplicationControlle
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CreateController creates a new ReplicationController.
|
// CreateController creates a new ReplicationController.
|
||||||
func (r *Registry) CreateController(controller *api.ReplicationController) error {
|
func (r *Registry) CreateController(ctx api.Context, controller *api.ReplicationController) error {
|
||||||
err := r.CreateObj(makeControllerKey(controller.ID), controller, 0)
|
err := r.CreateObj(makeControllerKey(controller.ID), controller, 0)
|
||||||
return etcderr.InterpretCreateError(err, "replicationController", controller.ID)
|
return etcderr.InterpretCreateError(err, "replicationController", controller.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateController replaces an existing ReplicationController.
|
// UpdateController replaces an existing ReplicationController.
|
||||||
func (r *Registry) UpdateController(controller *api.ReplicationController) error {
|
func (r *Registry) UpdateController(ctx api.Context, controller *api.ReplicationController) error {
|
||||||
err := r.SetObj(makeControllerKey(controller.ID), controller)
|
err := r.SetObj(makeControllerKey(controller.ID), controller)
|
||||||
return etcderr.InterpretUpdateError(err, "replicationController", controller.ID)
|
return etcderr.InterpretUpdateError(err, "replicationController", controller.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteController deletes a ReplicationController specified by its ID.
|
// DeleteController deletes a ReplicationController specified by its ID.
|
||||||
func (r *Registry) DeleteController(controllerID string) error {
|
func (r *Registry) DeleteController(ctx api.Context, controllerID string) error {
|
||||||
key := makeControllerKey(controllerID)
|
key := makeControllerKey(controllerID)
|
||||||
err := r.Delete(key, false)
|
err := r.Delete(key, false)
|
||||||
return etcderr.InterpretDeleteError(err, "replicationController", controllerID)
|
return etcderr.InterpretDeleteError(err, "replicationController", controllerID)
|
||||||
@ -276,20 +276,20 @@ func makeServiceKey(name string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListServices obtains a list of Services.
|
// ListServices obtains a list of Services.
|
||||||
func (r *Registry) ListServices() (*api.ServiceList, error) {
|
func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) {
|
||||||
list := &api.ServiceList{}
|
list := &api.ServiceList{}
|
||||||
err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion)
|
err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion)
|
||||||
return list, err
|
return list, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateService creates a new Service.
|
// CreateService creates a new Service.
|
||||||
func (r *Registry) CreateService(svc *api.Service) error {
|
func (r *Registry) CreateService(ctx api.Context, svc *api.Service) error {
|
||||||
err := r.CreateObj(makeServiceKey(svc.ID), svc, 0)
|
err := r.CreateObj(makeServiceKey(svc.ID), svc, 0)
|
||||||
return etcderr.InterpretCreateError(err, "service", svc.ID)
|
return etcderr.InterpretCreateError(err, "service", svc.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetService obtains a Service specified by its name.
|
// GetService obtains a Service specified by its name.
|
||||||
func (r *Registry) GetService(name string) (*api.Service, error) {
|
func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error) {
|
||||||
key := makeServiceKey(name)
|
key := makeServiceKey(name)
|
||||||
var svc api.Service
|
var svc api.Service
|
||||||
err := r.ExtractObj(key, &svc, false)
|
err := r.ExtractObj(key, &svc, false)
|
||||||
@ -300,7 +300,7 @@ func (r *Registry) GetService(name string) (*api.Service, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetEndpoints obtains the endpoints for the service identified by 'name'.
|
// GetEndpoints obtains the endpoints for the service identified by 'name'.
|
||||||
func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) {
|
func (r *Registry) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) {
|
||||||
key := makeServiceEndpointsKey(name)
|
key := makeServiceEndpointsKey(name)
|
||||||
var endpoints api.Endpoints
|
var endpoints api.Endpoints
|
||||||
err := r.ExtractObj(key, &endpoints, false)
|
err := r.ExtractObj(key, &endpoints, false)
|
||||||
@ -315,7 +315,7 @@ func makeServiceEndpointsKey(name string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteService deletes a Service specified by its name.
|
// DeleteService deletes a Service specified by its name.
|
||||||
func (r *Registry) DeleteService(name string) error {
|
func (r *Registry) DeleteService(ctx api.Context, name string) error {
|
||||||
key := makeServiceKey(name)
|
key := makeServiceKey(name)
|
||||||
err := r.Delete(key, true)
|
err := r.Delete(key, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -332,13 +332,13 @@ func (r *Registry) DeleteService(name string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// UpdateService replaces an existing Service.
|
// UpdateService replaces an existing Service.
|
||||||
func (r *Registry) UpdateService(svc *api.Service) error {
|
func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {
|
||||||
err := r.SetObj(makeServiceKey(svc.ID), svc)
|
err := r.SetObj(makeServiceKey(svc.ID), svc)
|
||||||
return etcderr.InterpretUpdateError(err, "service", svc.ID)
|
return etcderr.InterpretUpdateError(err, "service", svc.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchServices begins watching for new, changed, or deleted service configurations.
|
// WatchServices begins watching for new, changed, or deleted service configurations.
|
||||||
func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
||||||
if !label.Empty() {
|
if !label.Empty() {
|
||||||
return nil, fmt.Errorf("label selectors are not supported on services")
|
return nil, fmt.Errorf("label selectors are not supported on services")
|
||||||
}
|
}
|
||||||
@ -352,14 +352,14 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListEndpoints obtains a list of Services.
|
// ListEndpoints obtains a list of Services.
|
||||||
func (r *Registry) ListEndpoints() (*api.EndpointsList, error) {
|
func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
|
||||||
list := &api.EndpointsList{}
|
list := &api.EndpointsList{}
|
||||||
err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion)
|
err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion)
|
||||||
return list, err
|
return list, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateEndpoints update Endpoints of a Service.
|
// UpdateEndpoints update Endpoints of a Service.
|
||||||
func (r *Registry) UpdateEndpoints(e *api.Endpoints) error {
|
func (r *Registry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error {
|
||||||
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
|
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
|
||||||
err := r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
|
err := r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
|
||||||
func(input runtime.Object) (runtime.Object, error) {
|
func(input runtime.Object) (runtime.Object, error) {
|
||||||
@ -370,7 +370,7 @@ func (r *Registry) UpdateEndpoints(e *api.Endpoints) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
|
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
|
||||||
func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
||||||
if !label.Empty() {
|
if !label.Empty() {
|
||||||
return nil, fmt.Errorf("label selectors are not supported on endpoints")
|
return nil, fmt.Errorf("label selectors are not supported on endpoints")
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ type BasicManifestFactory struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) {
|
func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) {
|
||||||
envVars, err := service.GetServiceEnvironmentVariables(b.ServiceRegistry, machine)
|
envVars, err := service.GetServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return api.ContainerManifest{}, err
|
return api.ContainerManifest{}, err
|
||||||
}
|
}
|
||||||
|
@ -27,15 +27,15 @@ type Registry interface {
|
|||||||
// ListPods obtains a list of pods having labels which match selector.
|
// ListPods obtains a list of pods having labels which match selector.
|
||||||
ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error)
|
ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error)
|
||||||
// ListPodsPredicate obtains a list of pods for which filter returns true.
|
// ListPodsPredicate obtains a list of pods for which filter returns true.
|
||||||
ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error)
|
ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error)
|
||||||
// Watch for new/changed/deleted pods
|
// Watch for new/changed/deleted pods
|
||||||
WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
|
WatchPods(ctx api.Context, resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
|
||||||
// Get a specific pod
|
// Get a specific pod
|
||||||
GetPod(podID string) (*api.Pod, error)
|
GetPod(ctx api.Context, podID string) (*api.Pod, error)
|
||||||
// Create a pod based on a specification.
|
// Create a pod based on a specification.
|
||||||
CreatePod(pod *api.Pod) error
|
CreatePod(ctx api.Context, pod *api.Pod) error
|
||||||
// Update an existing pod
|
// Update an existing pod
|
||||||
UpdatePod(pod *api.Pod) error
|
UpdatePod(ctx api.Context, pod *api.Pod) error
|
||||||
// Delete an existing pod
|
// Delete an existing pod
|
||||||
DeletePod(podID string) error
|
DeletePod(ctx api.Context, podID string) error
|
||||||
}
|
}
|
||||||
|
@ -81,21 +81,21 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
|
|||||||
pod.CreationTimestamp = util.Now()
|
pod.CreationTimestamp = util.Now()
|
||||||
|
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
if err := rs.registry.CreatePod(pod); err != nil {
|
if err := rs.registry.CreatePod(ctx, pod); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return rs.registry.GetPod(pod.ID)
|
return rs.registry.GetPod(ctx, pod.ID)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
|
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id)
|
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
||||||
pod, err := rs.registry.GetPod(id)
|
pod, err := rs.registry.GetPod(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pod, err
|
return pod, err
|
||||||
}
|
}
|
||||||
@ -132,7 +132,7 @@ func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
|
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
|
||||||
pods, err := rs.registry.ListPodsPredicate(rs.filterFunc(label, field))
|
pods, err := rs.registry.ListPodsPredicate(ctx, rs.filterFunc(label, field))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for i := range pods.Items {
|
for i := range pods.Items {
|
||||||
pod := &pods.Items[i]
|
pod := &pods.Items[i]
|
||||||
@ -150,7 +150,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
|
|||||||
|
|
||||||
// Watch begins watching for new, changed, or deleted pods.
|
// Watch begins watching for new, changed, or deleted pods.
|
||||||
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
||||||
return rs.registry.WatchPods(resourceVersion, rs.filterFunc(label, field))
|
return rs.registry.WatchPods(ctx, resourceVersion, rs.filterFunc(label, field))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*REST) New() runtime.Object {
|
func (*REST) New() runtime.Object {
|
||||||
@ -163,10 +163,10 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
|
|||||||
return nil, errors.NewInvalid("pod", pod.ID, errs)
|
return nil, errors.NewInvalid("pod", pod.ID, errs)
|
||||||
}
|
}
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
if err := rs.registry.UpdatePod(pod); err != nil {
|
if err := rs.registry.UpdatePod(ctx, pod); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return rs.registry.GetPod(pod.ID)
|
return rs.registry.GetPod(ctx, pod.ID)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,12 +25,12 @@ import (
|
|||||||
|
|
||||||
// Registry is an interface for things that know how to store services.
|
// Registry is an interface for things that know how to store services.
|
||||||
type Registry interface {
|
type Registry interface {
|
||||||
ListServices() (*api.ServiceList, error)
|
ListServices(ctx api.Context) (*api.ServiceList, error)
|
||||||
CreateService(svc *api.Service) error
|
CreateService(ctx api.Context, svc *api.Service) error
|
||||||
GetService(name string) (*api.Service, error)
|
GetService(ctx api.Context, name string) (*api.Service, error)
|
||||||
DeleteService(name string) error
|
DeleteService(ctx api.Context, name string) error
|
||||||
UpdateService(svc *api.Service) error
|
UpdateService(ctx api.Context, svc *api.Service) error
|
||||||
WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
|
WatchServices(ctx api.Context, labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
|
||||||
|
|
||||||
// TODO: endpoints and their implementation should be separated, setting endpoints should be
|
// TODO: endpoints and their implementation should be separated, setting endpoints should be
|
||||||
// supported via the API, and the endpoints-controller should use the API to update endpoints.
|
// supported via the API, and the endpoints-controller should use the API to update endpoints.
|
||||||
|
@ -86,27 +86,27 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err := rs.registry.CreateService(srv)
|
err := rs.registry.CreateService(ctx, srv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return rs.registry.GetService(srv.ID)
|
return rs.registry.GetService(ctx, srv.ID)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
|
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
|
||||||
service, err := rs.registry.GetService(id)
|
service, err := rs.registry.GetService(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
rs.deleteExternalLoadBalancer(service)
|
rs.deleteExternalLoadBalancer(service)
|
||||||
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id)
|
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
||||||
s, err := rs.registry.GetService(id)
|
s, err := rs.registry.GetService(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -115,7 +115,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
|||||||
|
|
||||||
// TODO: implement field selector?
|
// TODO: implement field selector?
|
||||||
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
|
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
|
||||||
list, err := rs.registry.ListServices()
|
list, err := rs.registry.ListServices(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -132,7 +132,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
|
|||||||
// Watch returns Services events via a watch.Interface.
|
// Watch returns Services events via a watch.Interface.
|
||||||
// It implements apiserver.ResourceWatcher.
|
// It implements apiserver.ResourceWatcher.
|
||||||
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
||||||
return rs.registry.WatchServices(label, field, resourceVersion)
|
return rs.registry.WatchServices(ctx, label, field, resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*REST) New() runtime.Object {
|
func (*REST) New() runtime.Object {
|
||||||
@ -141,9 +141,9 @@ func (*REST) New() runtime.Object {
|
|||||||
|
|
||||||
// GetServiceEnvironmentVariables populates a list of environment variables that are use
|
// GetServiceEnvironmentVariables populates a list of environment variables that are use
|
||||||
// in the container environment to get access to services.
|
// in the container environment to get access to services.
|
||||||
func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.EnvVar, error) {
|
func GetServiceEnvironmentVariables(ctx api.Context, registry Registry, machine string) ([]api.EnvVar, error) {
|
||||||
var result []api.EnvVar
|
var result []api.EnvVar
|
||||||
services, err := registry.ListServices()
|
services, err := registry.ListServices(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
@ -170,17 +170,17 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
|
|||||||
}
|
}
|
||||||
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
return apiserver.MakeAsync(func() (runtime.Object, error) {
|
||||||
// TODO: check to see if external load balancer status changed
|
// TODO: check to see if external load balancer status changed
|
||||||
err := rs.registry.UpdateService(srv)
|
err := rs.registry.UpdateService(ctx, srv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return rs.registry.GetService(srv.ID)
|
return rs.registry.GetService(ctx, srv.ID)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResourceLocation returns a URL to which one can send traffic for the specified service.
|
// ResourceLocation returns a URL to which one can send traffic for the specified service.
|
||||||
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
|
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
|
||||||
e, err := rs.registry.GetEndpoints(id)
|
e, err := rs.registry.GetEndpoints(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
|
|||||||
endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port))
|
endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port))
|
||||||
}
|
}
|
||||||
// TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop.
|
// TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop.
|
||||||
err = e.serviceRegistry.UpdateEndpoints(&api.Endpoints{
|
err = e.serviceRegistry.UpdateEndpoints(api.NewContext(), &api.Endpoints{
|
||||||
JSONBase: api.JSONBase{ID: service.ID},
|
JSONBase: api.JSONBase{ID: service.ID},
|
||||||
Endpoints: endpoints,
|
Endpoints: endpoints,
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user