From 110b064d630ca39220696225dd597e7d33b95f4f Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 4 Feb 2018 22:38:39 -0500 Subject: [PATCH] Make Service storage a wrapper around other storages The registry abstraction is unnecessary and adds direct coupling to the core types. By using a wrapper, we carry through the default implementations of the non-mutating operations. The DeleteCollection method is explicitly patched out since it cannot be correctly implemented on the storage currently. As a result, TableConvertor is now exposed. A few other minor refactorings * Corrected the case of some variables * Used functions instead of methods for several helper methods * Removed the legacy Deleter - service was the only remaining consumer --- pkg/registry/core/rest/storage_core.go | 16 +- .../portallocator/controller/repair.go | 14 +- pkg/registry/core/service/proxy.go | 4 +- .../core/service/{ => storage}/rest.go | 245 ++++++++++-------- .../core/service/{ => storage}/rest_test.go | 218 +++++++++++----- pkg/registry/core/service/storage/storage.go | 22 +- .../core/service/storage/storage_test.go | 6 +- pkg/registry/core/service/strategy_test.go | 7 +- .../apiserver/pkg/endpoints/apiserver_test.go | 59 ----- .../apiserver/pkg/endpoints/installer.go | 11 +- .../apiserver/pkg/registry/rest/rest.go | 21 -- test/e2e/apimachinery/table_conversion.go | 12 +- 12 files changed, 349 insertions(+), 286 deletions(-) rename pkg/registry/core/service/{ => storage}/rest.go (79%) rename pkg/registry/core/service/{ => storage}/rest_test.go (90%) diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index bfb8e1a4c13..c937e4f2873 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -44,7 +44,6 @@ import ( "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/registry/core/componentstatus" configmapstore "k8s.io/kubernetes/pkg/registry/core/configmap/storage" - "k8s.io/kubernetes/pkg/registry/core/endpoint" endpointsstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage" eventstore "k8s.io/kubernetes/pkg/registry/core/event/storage" limitrangestore "k8s.io/kubernetes/pkg/registry/core/limitrange/storage" @@ -58,7 +57,6 @@ import ( controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage" resourcequotastore "k8s.io/kubernetes/pkg/registry/core/resourcequota/storage" secretstore "k8s.io/kubernetes/pkg/registry/core/secret/storage" - "k8s.io/kubernetes/pkg/registry/core/service" "k8s.io/kubernetes/pkg/registry/core/service/allocator" serviceallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" @@ -127,7 +125,6 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespacestore.NewREST(restOptionsGetter) endpointsStorage := endpointsstore.NewREST(restOptionsGetter) - endpointRegistry := endpoint.NewRegistry(endpointsStorage) nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport) if err != nil { @@ -148,8 +145,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi serviceAccountStorage = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, nil) } - serviceRESTStorage, serviceStatusStorage := servicestore.NewREST(restOptionsGetter) - serviceRegistry := service.NewRegistry(serviceRESTStorage) + serviceRESTStorage, serviceStatusStorage := servicestore.NewGenericREST(restOptionsGetter) var serviceClusterIPRegistry rangeallocation.RangeRegistry serviceClusterIPRange := c.ServiceIPRange @@ -162,7 +158,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } - ServiceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { + serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { mem := allocator.NewAllocationMap(max, rangeSpec) // TODO etcdallocator package to return a storage interface via the storageFactory etcd := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig) @@ -172,7 +168,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry var serviceNodePortRegistry rangeallocation.RangeRegistry - ServiceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface { + serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface { mem := allocator.NewAllocationMap(max, rangeSpec) // TODO etcdallocator package to return a storage interface via the storageFactory etcd := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig) @@ -183,7 +179,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi controllerStorage := controllerstore.NewStorage(restOptionsGetter) - serviceRest := service.NewStorage(serviceRegistry, endpointRegistry, podStorage.Pod, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport) + serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage, endpointsStorage, podStorage.Pod, serviceClusterIPAllocator, serviceNodePortAllocator, c.ProxyTransport) restStorageMap := map[string]rest.Storage{ "pods": podStorage.Pod, @@ -201,8 +197,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi "replicationControllers": controllerStorage.Controller, "replicationControllers/status": controllerStorage.Status, - "services": serviceRest.Service, - "services/proxy": serviceRest.Proxy, + "services": serviceRest, + "services/proxy": serviceRestProxy, "services/status": serviceStatusStorage, "endpoints": endpointsStorage, diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go index e7024ded053..1e4dc685c3d 100644 --- a/pkg/registry/core/service/portallocator/controller/repair.go +++ b/pkg/registry/core/service/portallocator/controller/repair.go @@ -32,7 +32,6 @@ import ( api "k8s.io/kubernetes/pkg/apis/core" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" "k8s.io/kubernetes/pkg/registry/core/rangeallocation" - "k8s.io/kubernetes/pkg/registry/core/service" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" ) @@ -126,7 +125,7 @@ func (c *Repair) runOnce() error { // Check every Service's ports, and rebuild the state as we think it should be. for i := range list.Items { svc := &list.Items[i] - ports := service.CollectServiceNodePorts(svc) + ports := collectServiceNodePorts(svc) if len(ports) == 0 { continue } @@ -196,3 +195,14 @@ func (c *Repair) runOnce() error { } return nil } + +func collectServiceNodePorts(service *api.Service) []int { + servicePorts := []int{} + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + if servicePort.NodePort != 0 { + servicePorts = append(servicePorts, int(servicePort.NodePort)) + } + } + return servicePorts +} diff --git a/pkg/registry/core/service/proxy.go b/pkg/registry/core/service/proxy.go index 1956f276fb3..23e016b07af 100644 --- a/pkg/registry/core/service/proxy.go +++ b/pkg/registry/core/service/proxy.go @@ -32,7 +32,7 @@ import ( // ProxyREST implements the proxy subresource for a Service type ProxyREST struct { - ServiceRest *REST + Redirector rest.Redirector ProxyTransport http.RoundTripper } @@ -62,7 +62,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti if !ok { return nil, fmt.Errorf("Invalid options object: %#v", opts) } - location, transport, err := r.ServiceRest.ResourceLocation(ctx, id) + location, transport, err := r.Redirector.ResourceLocation(ctx, id) if err != nil { return nil, err } diff --git a/pkg/registry/core/service/rest.go b/pkg/registry/core/service/storage/rest.go similarity index 79% rename from pkg/registry/core/service/rest.go rename to pkg/registry/core/service/storage/rest.go index 72e068d0246..6dea373d189 100644 --- a/pkg/registry/core/service/rest.go +++ b/pkg/registry/core/service/storage/rest.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package service +package storage import ( "fmt" @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" "k8s.io/apimachinery/pkg/runtime" utilnet "k8s.io/apimachinery/pkg/util/net" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -36,31 +37,25 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" utilfeature "k8s.io/apiserver/pkg/util/feature" + apiservice "k8s.io/kubernetes/pkg/api/service" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/helper" "k8s.io/kubernetes/pkg/apis/core/validation" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/registry/core/endpoint" - podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage" + registry "k8s.io/kubernetes/pkg/registry/core/service" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" ) -// ServiceRest includes storage for services and all sub resources -type ServiceRest struct { - Service *REST - Proxy *ProxyREST -} - // REST adapts a service registry into apiserver's RESTStorage model. type REST struct { - registry Registry - endpoints endpoint.Registry + services ServiceStorage + endpoints EndpointsStorage serviceIPs ipallocator.Interface serviceNodePorts portallocator.Interface proxyTransport http.RoundTripper - pods *podstore.REST + pods rest.Getter } // ServiceNodePort includes protocol and port number of a service NodePort. @@ -73,23 +68,50 @@ type ServiceNodePort struct { NodePort int32 } -// NewStorage returns a new REST. -func NewStorage(registry Registry, endpoints endpoint.Registry, pods *podstore.REST, serviceIPs ipallocator.Interface, - serviceNodePorts portallocator.Interface, proxyTransport http.RoundTripper) *ServiceRest { +type ServiceStorage interface { + rest.Getter + rest.Lister + rest.CreaterUpdater + rest.GracefulDeleter + rest.Watcher + rest.TableConvertor + rest.Exporter +} + +type EndpointsStorage interface { + rest.Getter + rest.GracefulDeleter +} + +// NewREST returns a wrapper around the underlying generic storage and performs +// allocations and deallocations of various service related resources like ports. +// TODO: all transactional behavior should be supported from within generic storage +// or the strategy. +func NewREST( + services ServiceStorage, + endpoints EndpointsStorage, + pods rest.Getter, + serviceIPs ipallocator.Interface, + serviceNodePorts portallocator.Interface, + proxyTransport http.RoundTripper, +) (*REST, *registry.ProxyREST) { rest := &REST{ - registry: registry, + services: services, endpoints: endpoints, serviceIPs: serviceIPs, serviceNodePorts: serviceNodePorts, proxyTransport: proxyTransport, pods: pods, } - return &ServiceRest{ - Service: rest, - Proxy: &ProxyREST{ServiceRest: rest, ProxyTransport: proxyTransport}, - } + return rest, ®istry.ProxyREST{Redirector: rest, ProxyTransport: proxyTransport} } +var ( + _ ServiceStorage = &REST{} + _ rest.CategoriesProvider = &REST{} + _ rest.ShortNamesProvider = &REST{} +) + // ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource. func (rs *REST) ShortNames() []string { return []string{"svc"} @@ -100,11 +122,34 @@ func (rs *REST) Categories() []string { return []string{"all"} } -// TODO: implement includeUninitialized by refactoring this to move to store +func (rs *REST) New() runtime.Object { + return rs.services.New() +} + +func (rs *REST) NewList() runtime.Object { + return rs.services.NewList() +} + +func (rs *REST) Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { + return rs.services.Get(ctx, name, options) +} + +func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { + return rs.services.List(ctx, options) +} + +func (rs *REST) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + return rs.services.Watch(ctx, options) +} + +func (rs *REST) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) { + return rs.services.Export(ctx, name, opts) +} + func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) { service := obj.(*api.Service) - if err := rest.BeforeCreate(Strategy, ctx, obj); err != nil { + if err := rest.BeforeCreate(registry.Strategy, ctx, obj); err != nil { return nil, err } @@ -120,7 +165,7 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create var err error if service.Spec.Type != api.ServiceTypeExternalName { - if releaseServiceIP, err = rs.initClusterIP(service); err != nil { + if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil { return nil, err } } @@ -129,14 +174,14 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create defer nodePortOp.Finish() if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { - if err := rs.initNodePorts(service, nodePortOp); err != nil { + if err := initNodePorts(service, nodePortOp); err != nil { return nil, err } } // Handle ExternalTraffic related fields during service creation. if apiservice.NeedsHealthCheck(service) { - if err := rs.allocateHealthCheckNodePort(service, nodePortOp); err != nil { + if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil { return nil, errors.NewInternalError(err) } } @@ -144,16 +189,16 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create return nil, errors.NewInvalid(api.Kind("Service"), service.Name, errs) } - out, err := rs.registry.CreateService(ctx, service, createValidation) + out, err := rs.services.Create(ctx, service, createValidation, includeUninitialized) if err != nil { - err = rest.CheckGeneratedNameError(Strategy, err, service) + err = rest.CheckGeneratedNameError(registry.Strategy, err, service) } if err == nil { el := nodePortOp.Commit() if el != nil { // these should be caught by an eventual reconciliation / restart - glog.Errorf("error(s) committing service node-ports changes: %v", el) + utilruntime.HandleError(fmt.Errorf("error(s) committing service node-ports changes: %v", el)) } releaseServiceIP = false @@ -162,75 +207,56 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create return out, err } -func (rs *REST) Delete(ctx genericapirequest.Context, id string) (runtime.Object, error) { - service, err := rs.registry.GetService(ctx, id, &metav1.GetOptions{}) +func (rs *REST) Delete(ctx genericapirequest.Context, id string, options *metav1.DeleteOptions) (runtime.Object, bool, error) { + // TODO: handle graceful + obj, _, err := rs.services.Delete(ctx, id, options) if err != nil { - return nil, err + return nil, false, err } - err = rs.registry.DeleteService(ctx, id) - if err != nil { - return nil, err - } + svc := obj.(*api.Service) // TODO: can leave dangling endpoints, and potentially return incorrect // endpoints if a new service is created with the same name - err = rs.endpoints.DeleteEndpoints(ctx, id) + _, _, err = rs.endpoints.Delete(ctx, id, &metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { - return nil, err + return nil, false, err } - if helper.IsServiceIPSet(service) { - rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP)) + if helper.IsServiceIPSet(svc) { + rs.serviceIPs.Release(net.ParseIP(svc.Spec.ClusterIP)) } - for _, nodePort := range CollectServiceNodePorts(service) { + for _, nodePort := range collectServiceNodePorts(svc) { err := rs.serviceNodePorts.Release(nodePort) if err != nil { // these should be caught by an eventual reconciliation / restart - glog.Errorf("Error releasing service %s node port %d: %v", service.Name, nodePort, err) + utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err)) } } - if apiservice.NeedsHealthCheck(service) { - nodePort := service.Spec.HealthCheckNodePort + if apiservice.NeedsHealthCheck(svc) { + nodePort := svc.Spec.HealthCheckNodePort if nodePort > 0 { err := rs.serviceNodePorts.Release(int(nodePort)) if err != nil { // these should be caught by an eventual reconciliation / restart - utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", service.Name, nodePort, err)) + utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err)) } } } - return &metav1.Status{Status: metav1.StatusSuccess}, nil -} -func (rs *REST) Get(ctx genericapirequest.Context, id string, options *metav1.GetOptions) (runtime.Object, error) { - return rs.registry.GetService(ctx, id, options) -} - -func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { - return rs.registry.ListServices(ctx, options) -} - -// Watch returns Services events via a watch.Interface. -// It implements rest.Watcher. -func (rs *REST) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { - return rs.registry.WatchServices(ctx, options) -} - -// Export returns Service stripped of cluster-specific information. -// It implements rest.Exporter. -func (rs *REST) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) { - return rs.registry.ExportService(ctx, name, opts) -} - -func (*REST) New() runtime.Object { - return &api.Service{} -} - -func (*REST) NewList() runtime.Object { - return &api.ServiceList{} + // TODO: this is duplicated from the generic storage, when this wrapper is fully removed we can drop this + details := &metav1.StatusDetails{ + Name: svc.Name, + UID: svc.UID, + } + if info, ok := genericapirequest.RequestInfoFrom(ctx); ok { + details.Group = info.APIGroup + details.Kind = info.Resource // legacy behavior + } + status := &metav1.Status{Status: metav1.StatusSuccess, Details: details} + return status, true, nil } // externalTrafficPolicyUpdate adjusts ExternalTrafficPolicy during service update if needed. @@ -267,7 +293,7 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, node // Insert health check node port into the service's HealthCheckNodePort field if needed. case !neededHealthCheckNodePort && needsHealthCheckNodePort: glog.Infof("Transition to LoadBalancer type service with ExternalTrafficPolicy=Local") - if err := rs.allocateHealthCheckNodePort(service, nodePortOp); err != nil { + if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil { return false, errors.NewInternalError(err) } @@ -295,10 +321,11 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, node } func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc) (runtime.Object, bool, error) { - oldService, err := rs.registry.GetService(ctx, name, &metav1.GetOptions{}) + oldObj, err := rs.services.Get(ctx, name, &metav1.GetOptions{}) if err != nil { return nil, false, err } + oldService := oldObj.(*api.Service) obj, err := objInfo.UpdatedObject(ctx, oldService) if err != nil { @@ -331,7 +358,7 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest. // Update service from ExternalName to non-ExternalName, should initialize ClusterIP. if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName { - if releaseServiceIP, err = rs.initClusterIP(service); err != nil { + if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil { return nil, false, err } } @@ -344,11 +371,11 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest. // Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists. if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) && (service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) { - rs.releaseNodePorts(oldService, nodePortOp) + releaseNodePorts(oldService, nodePortOp) } // Update service from any type to NodePort or LoadBalancer, should update NodePort. if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { - if err := rs.updateNodePorts(oldService, service, nodePortOp); err != nil { + if err := updateNodePorts(oldService, service, nodePortOp); err != nil { return nil, false, err } } @@ -368,18 +395,18 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest. return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, errs) } - out, err := rs.registry.UpdateService(ctx, service, createValidation, updateValidation) + out, created, err := rs.services.Update(ctx, service.Name, rest.DefaultUpdatedObjectInfo(service), createValidation, updateValidation) if err == nil { el := nodePortOp.Commit() if el != nil { // problems should be fixed by an eventual reconciliation / restart - glog.Errorf("error(s) committing NodePorts changes: %v", el) + utilruntime.HandleError(fmt.Errorf("error(s) committing NodePorts changes: %v", el)) } releaseServiceIP = false } - return out, false, err + return out, created, err } // Implement Redirector. @@ -395,10 +422,11 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url // If a port *number* was specified, find the corresponding service port name if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil { - svc, err := rs.registry.GetService(ctx, svcName, &metav1.GetOptions{}) + obj, err := rs.services.Get(ctx, svcName, &metav1.GetOptions{}) if err != nil { return nil, nil, err } + svc := obj.(*api.Service) found := false for _, svcPort := range svc.Spec.Ports { if int64(svcPort.Port) == portNum { @@ -413,10 +441,11 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url } } - eps, err := rs.endpoints.GetEndpoints(ctx, svcName, &metav1.GetOptions{}) + obj, err := rs.endpoints.Get(ctx, svcName, &metav1.GetOptions{}) if err != nil { return nil, nil, err } + eps := obj.(*api.Endpoints) if len(eps.Subsets) == 0 { return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName)) } @@ -439,7 +468,7 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)] if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceProxyAllowExternalIPs) { if err := isValidAddress(ctx, &addr, rs.pods); err != nil { - glog.Errorf("Address %v isn't valid (%v)", addr, err) + utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err)) continue } } @@ -450,14 +479,18 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url Host: net.JoinHostPort(ip, strconv.Itoa(port)), }, rs.proxyTransport, nil } - glog.Errorf("Failed to find a valid address, skipping subset: %v", ss) + utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss)) } } } return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id)) } -func isValidAddress(ctx genericapirequest.Context, addr *api.EndpointAddress, pods *podstore.REST) error { +func (r *REST) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) { + return r.services.ConvertToTable(ctx, object, tableOptions) +} + +func isValidAddress(ctx genericapirequest.Context, addr *api.EndpointAddress, pods rest.Getter) error { if addr.TargetRef == nil { return fmt.Errorf("Address has no target ref, skipping: %v", addr) } @@ -503,17 +536,6 @@ func containsNodePort(serviceNodePorts []ServiceNodePort, serviceNodePort Servic return false } -func CollectServiceNodePorts(service *api.Service) []int { - servicePorts := []int{} - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - if servicePort.NodePort != 0 { - servicePorts = append(servicePorts, int(servicePort.NodePort)) - } - } - return servicePorts -} - // Loop through the service ports list, find one with the same port number and // NodePort specified, return this NodePort otherwise return 0. func findRequestedNodePort(port int, servicePorts []api.ServicePort) int { @@ -527,7 +549,7 @@ func findRequestedNodePort(port int, servicePorts []api.ServicePort) int { } // allocateHealthCheckNodePort allocates health check node port to service. -func (rs *REST) allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { +func allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { healthCheckNodePort := service.Spec.HealthCheckNodePort if healthCheckNodePort != 0 { // If the request has a health check nodePort in mind, attempt to reserve it. @@ -550,11 +572,11 @@ func (rs *REST) allocateHealthCheckNodePort(service *api.Service, nodePortOp *po } // The return bool value indicates if a cluster IP is allocated successfully. -func (rs *REST) initClusterIP(service *api.Service) (bool, error) { +func initClusterIP(service *api.Service, serviceIPs ipallocator.Interface) (bool, error) { switch { case service.Spec.ClusterIP == "": // Allocate next available. - ip, err := rs.serviceIPs.AllocateNext() + ip, err := serviceIPs.AllocateNext() if err != nil { // TODO: what error should be returned here? It's not a // field-level validation failure (the field is valid), and it's @@ -565,7 +587,7 @@ func (rs *REST) initClusterIP(service *api.Service) (bool, error) { return true, nil case service.Spec.ClusterIP != api.ClusterIPNone && service.Spec.ClusterIP != "": // Try to respect the requested IP. - if err := rs.serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil { + if err := serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil { // TODO: when validation becomes versioned, this gets more complicated. el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())} return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) @@ -576,7 +598,7 @@ func (rs *REST) initClusterIP(service *api.Service) (bool, error) { return false, nil } -func (rs *REST) initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { +func initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { svcPortToNodePort := map[int]int{} for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] @@ -625,8 +647,8 @@ func (rs *REST) initNodePorts(service *api.Service, nodePortOp *portallocator.Po return nil } -func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { - oldNodePortsNumbers := CollectServiceNodePorts(oldService) +func updateNodePorts(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error { + oldNodePortsNumbers := collectServiceNodePorts(oldService) newNodePorts := []ServiceNodePort{} portAllocated := map[int]bool{} @@ -659,7 +681,7 @@ func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp newNodePorts = append(newNodePorts, nodePort) } - newNodePortsNumbers := CollectServiceNodePorts(newService) + newNodePortsNumbers := collectServiceNodePorts(newService) // The comparison loops are O(N^2), but we don't expect N to be huge // (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot) @@ -673,10 +695,21 @@ func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp return nil } -func (rs *REST) releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) { - nodePorts := CollectServiceNodePorts(service) +func releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) { + nodePorts := collectServiceNodePorts(service) for _, nodePort := range nodePorts { nodePortOp.ReleaseDeferred(nodePort) } } + +func collectServiceNodePorts(service *api.Service) []int { + servicePorts := []int{} + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + if servicePort.NodePort != 0 { + servicePorts = append(servicePorts, int(servicePort.NodePort)) + } + } + return servicePorts +} diff --git a/pkg/registry/core/service/rest_test.go b/pkg/registry/core/service/storage/rest_test.go similarity index 90% rename from pkg/registry/core/service/rest_test.go rename to pkg/registry/core/service/storage/rest_test.go index 3a42aee9a1f..9fbb7f26fcb 100644 --- a/pkg/registry/core/service/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -14,28 +14,33 @@ See the License for the specific language governing permissions and limitations under the License. */ -package service +package storage import ( - "testing" - "net" "reflect" "strings" + "testing" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/watch" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/rest" etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" + "k8s.io/kubernetes/pkg/api/service" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/helper" + endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage" podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" @@ -46,28 +51,121 @@ import ( // It is now testing mostly the same things as other resources but // in a completely different way. We should unify it. +type serviceStorage struct { + GottenID string + UpdatedID string + CreatedID string + DeletedID string + Created bool + DeletedImmediately bool + Service *api.Service + OldService *api.Service + ServiceList *api.ServiceList + Err error +} + +func (s *serviceStorage) Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { + s.GottenID = name + return s.Service, s.Err +} + +func (s *serviceStorage) GetService(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (*api.Service, error) { + return s.Service, s.Err +} + +func (s *serviceStorage) NewList() runtime.Object { + panic("not implemented") +} + +func (s *serviceStorage) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { + ns, _ := genericapirequest.NamespaceFrom(ctx) + + // Copy metadata from internal list into result + res := new(api.ServiceList) + res.TypeMeta = s.ServiceList.TypeMeta + res.ListMeta = s.ServiceList.ListMeta + + if ns != metav1.NamespaceAll { + for _, service := range s.ServiceList.Items { + if ns == service.Namespace { + res.Items = append(res.Items, service) + } + } + } else { + res.Items = append([]api.Service{}, s.ServiceList.Items...) + } + + return res, s.Err +} + +func (s *serviceStorage) New() runtime.Object { + panic("not implemented") +} + +func (s *serviceStorage) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) { + svc := obj.(*api.Service) + s.CreatedID = obj.(metav1.Object).GetName() + s.Service = svc.DeepCopy() + + if s.ServiceList == nil { + s.ServiceList = &api.ServiceList{} + } + + s.ServiceList.Items = append(s.ServiceList.Items, *svc) + return svc, s.Err +} + +func (s *serviceStorage) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc) (runtime.Object, bool, error) { + s.UpdatedID = name + obj, err := objInfo.UpdatedObject(ctx, s.OldService) + if err != nil { + return nil, false, err + } + s.Service = obj.(*api.Service) + return s.Service, s.Created, s.Err +} + +func (s *serviceStorage) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) { + s.DeletedID = name + return s.Service, s.DeletedImmediately, s.Err +} + +func (s *serviceStorage) DeleteCollection(ctx genericapirequest.Context, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { + panic("not implemented") +} + +func (s *serviceStorage) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + panic("not implemented") +} + +func (s *serviceStorage) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) { + panic("not implemented") +} + +func (s *serviceStorage) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) { + panic("not implemented") +} + func generateRandomNodePort() int32 { return int32(rand.IntnRange(30001, 30999)) } -func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry, *etcdtesting.EtcdTestServer) { +func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *serviceStorage, *etcdtesting.EtcdTestServer) { return NewTestRESTWithPods(t, endpoints, nil) } -func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList) (*REST, *registrytest.ServiceRegistry, *etcdtesting.EtcdTestServer) { - registry := registrytest.NewServiceRegistry() - endpointRegistry := ®istrytest.EndpointRegistry{ - Endpoints: endpoints, - } +func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList) (*REST, *serviceStorage, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{ + + serviceStorage := &serviceStorage{} + + podStorage := podstore.NewStorage(generic.RESTOptions{ StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3, ResourcePrefix: "pods", - } - podStorage := podstore.NewStorage(restOptions, nil, nil, nil) - if pods != nil && pods.Items != nil { + }, nil, nil, nil) + if pods != nil && len(pods.Items) > 0 { ctx := genericapirequest.NewDefaultContext() for ix := range pods.Items { key, _ := podStorage.Pod.KeyFunc(ctx, pods.Items[ix].Name) @@ -76,14 +174,29 @@ func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.P } } } + endpointStorage := endpointstore.NewREST(generic.RESTOptions{ + StorageConfig: etcdStorage, + Decorator: generic.UndecoratedStorage, + ResourcePrefix: "endpoints", + }) + if endpoints != nil && len(endpoints.Items) > 0 { + ctx := genericapirequest.NewDefaultContext() + for ix := range endpoints.Items { + key, _ := endpointStorage.KeyFunc(ctx, endpoints.Items[ix].Name) + if err := endpointStorage.Store.Storage.Create(ctx, key, &endpoints.Items[ix], nil, 0); err != nil { + t.Fatalf("Couldn't create endpoint: %v", err) + } + } + } + r := ipallocator.NewCIDRRange(makeIPNet(t)) portRange := utilnet.PortRange{Base: 30000, Size: 1000} portAllocator := portallocator.NewPortAllocator(portRange) - storage := NewStorage(registry, endpointRegistry, podStorage.Pod, r, portAllocator, nil) + rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, r, portAllocator, nil) - return storage.Service, registry, server + return rest, serviceStorage, server } func makeIPNet(t *testing.T) *net.IPNet { @@ -94,15 +207,16 @@ func makeIPNet(t *testing.T) *net.IPNet { return net } -func releaseServiceNodePorts(t *testing.T, ctx genericapirequest.Context, svcName string, rest *REST, registry *registrytest.ServiceRegistry) { - srv, err := registry.GetService(ctx, svcName, &metav1.GetOptions{}) +func releaseServiceNodePorts(t *testing.T, ctx genericapirequest.Context, svcName string, rest *REST, registry ServiceStorage) { + obj, err := registry.Get(ctx, svcName, &metav1.GetOptions{}) if err != nil { t.Errorf("Unexpected error: %v", err) } + srv := obj.(*api.Service) if srv == nil { t.Fatalf("Failed to find service: %s", svcName) } - serviceNodePorts := CollectServiceNodePorts(srv) + serviceNodePorts := collectServiceNodePorts(srv) if len(serviceNodePorts) == 0 { t.Errorf("Failed to find NodePorts of service : %s", srv.Name) } @@ -271,7 +385,7 @@ func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) { if created_service.Name != test.name { t.Errorf("Expected %s, but got %s", test.name, created_service.Name) } - serviceNodePorts := CollectServiceNodePorts(created_service) + serviceNodePorts := collectServiceNodePorts(created_service) if !reflect.DeepEqual(serviceNodePorts, test.expectNodePorts) { t.Errorf("Expected %v, but got %v", test.expectNodePorts, serviceNodePorts) } @@ -348,7 +462,7 @@ func TestServiceRegistryUpdate(t *testing.T) { storage, registry, server := NewTestREST(t, nil) defer server.Terminate(t) - svc, err := registry.CreateService(ctx, &api.Service{ + obj, err := registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", Namespace: metav1.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz1"}, @@ -358,8 +472,8 @@ func TestServiceRegistryUpdate(t *testing.T) { TargetPort: intstr.FromInt(6502), }}, }, - }, rest.ValidateAllObjectFunc) - + }, rest.ValidateAllObjectFunc, false) + svc := obj.(*api.Service) if err != nil { t.Fatalf("Expected no error: %v", err) } @@ -400,7 +514,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { ctx := genericapirequest.NewDefaultContext() storage, registry, server := NewTestREST(t, nil) defer server.Terminate(t) - registry.CreateService(ctx, &api.Service{ + registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, @@ -409,7 +523,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { Protocol: api.ProtocolTCP, }}, }, - }, rest.ValidateAllObjectFunc) + }, rest.ValidateAllObjectFunc, false) failureCases := map[string]api.Service{ "empty ID": { ObjectMeta: metav1.ObjectMeta{Name: ""}, @@ -477,7 +591,7 @@ func TestServiceRegistryExternalService(t *testing.T) { if srv == nil { t.Fatalf("Failed to find service: %s", svc.Name) } - serviceNodePorts := CollectServiceNodePorts(srv) + serviceNodePorts := collectServiceNodePorts(srv) if len(serviceNodePorts) == 0 { t.Errorf("Failed to find NodePorts of service : %s", srv.Name) } @@ -504,8 +618,8 @@ func TestServiceRegistryDelete(t *testing.T) { }}, }, } - registry.CreateService(ctx, svc, rest.ValidateAllObjectFunc) - storage.Delete(ctx, svc.Name) + registry.Create(ctx, svc, rest.ValidateAllObjectFunc, false) + storage.Delete(ctx, svc.Name, &metav1.DeleteOptions{}) if e, a := "foo", registry.DeletedID; e != a { t.Errorf("Expected %v, but got %v", e, a) } @@ -527,8 +641,8 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { }}, }, } - registry.CreateService(ctx, svc, rest.ValidateAllObjectFunc) - storage.Delete(ctx, svc.Name) + registry.Create(ctx, svc, rest.ValidateAllObjectFunc, false) + storage.Delete(ctx, svc.Name, &metav1.DeleteOptions{}) if e, a := "foo", registry.DeletedID; e != a { t.Errorf("Expected %v, but got %v", e, a) } @@ -615,12 +729,12 @@ func TestServiceRegistryGet(t *testing.T) { ctx := genericapirequest.NewDefaultContext() storage, registry, server := NewTestREST(t, nil) defer server.Terminate(t) - registry.CreateService(ctx, &api.Service{ + registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, }, - }, rest.ValidateAllObjectFunc) + }, rest.ValidateAllObjectFunc, false) storage.Get(ctx, "foo", &metav1.GetOptions{}) if e, a := "foo", registry.GottenID; e != a { t.Errorf("Expected %v, but got %v", e, a) @@ -655,22 +769,6 @@ func TestServiceRegistryResourceLocation(t *testing.T) { Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}}, }}, }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: metav1.NamespaceDefault, - }, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{}, - Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}}, - }, { - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Name: "foo", Namespace: metav1.NamespaceDefault}}}, - Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}}, - }, { - Addresses: []api.EndpointAddress{{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Name: "bar", Namespace: metav1.NamespaceDefault}}}, - Ports: []api.EndpointPort{}, - }}, - }, }, } pods := &api.PodList{ @@ -708,7 +806,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { storage, registry, server := NewTestRESTWithPods(t, endpoints, pods) defer server.Terminate(t) for _, name := range []string{"foo", "bad"} { - registry.CreateService(ctx, &api.Service{ + registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, @@ -721,7 +819,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { {Name: "", Port: 93, TargetPort: intstr.FromInt(80)}, }, }, - }, rest.ValidateAllObjectFunc) + }, rest.ValidateAllObjectFunc, false) } redirector := rest.Redirector(storage) @@ -807,19 +905,19 @@ func TestServiceRegistryList(t *testing.T) { ctx := genericapirequest.NewDefaultContext() storage, registry, server := NewTestREST(t, nil) defer server.Terminate(t) - registry.CreateService(ctx, &api.Service{ + registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, }, - }, rest.ValidateAllObjectFunc) - registry.CreateService(ctx, &api.Service{ + }, rest.ValidateAllObjectFunc, false) + registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: metav1.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar2": "baz2"}, }, - }, rest.ValidateAllObjectFunc) - registry.List.ResourceVersion = "1" + }, rest.ValidateAllObjectFunc, false) + registry.ServiceList.ResourceVersion = "1" s, _ := storage.List(ctx, nil) sl := s.(*api.ServiceList) if len(sl.Items) != 2 { @@ -946,7 +1044,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) { t.Errorf("Unexpected ClusterIP: %s", created_service_1.Spec.ClusterIP) } - _, err := storage.Delete(ctx, created_service_1.Name) + _, _, err := storage.Delete(ctx, created_service_1.Name, &metav1.DeleteOptions{}) if err != nil { t.Errorf("Unexpected error deleting service: %v", err) } @@ -1303,7 +1401,7 @@ func TestInitClusterIP(t *testing.T) { } for _, test := range testCases { - hasAllocatedIP, err := storage.initClusterIP(test.svc) + hasAllocatedIP, err := initClusterIP(test.svc, storage.serviceIPs) if err != nil { t.Errorf("%q: unexpected error: %v", test.name, err) } @@ -1488,13 +1586,13 @@ func TestInitNodePorts(t *testing.T) { } for _, test := range testCases { - err := storage.initNodePorts(test.service, nodePortOp) + err := initNodePorts(test.service, nodePortOp) if err != nil { t.Errorf("%q: unexpected error: %v", test.name, err) continue } - serviceNodePorts := CollectServiceNodePorts(test.service) + serviceNodePorts := collectServiceNodePorts(test.service) if len(test.expectSpecifiedNodePorts) == 0 { for _, nodePort := range serviceNodePorts { if !storage.serviceNodePorts.Has(nodePort) { @@ -1758,13 +1856,13 @@ func TestUpdateNodePorts(t *testing.T) { } for _, test := range testCases { - err := storage.updateNodePorts(test.oldService, test.newService, nodePortOp) + err := updateNodePorts(test.oldService, test.newService, nodePortOp) if err != nil { t.Errorf("%q: unexpected error: %v", test.name, err) continue } - serviceNodePorts := CollectServiceNodePorts(test.newService) + serviceNodePorts := collectServiceNodePorts(test.newService) if len(test.expectSpecifiedNodePorts) == 0 { for _, nodePort := range serviceNodePorts { if !storage.serviceNodePorts.Has(nodePort) { diff --git a/pkg/registry/core/service/storage/storage.go b/pkg/registry/core/service/storage/storage.go index 0d492b7e881..131c7c2ba1d 100644 --- a/pkg/registry/core/service/storage/storage.go +++ b/pkg/registry/core/service/storage/storage.go @@ -30,16 +30,17 @@ import ( "k8s.io/kubernetes/pkg/registry/core/service" ) -type REST struct { +type GenericREST struct { *genericregistry.Store } // NewREST returns a RESTStorage object that will work against services. -func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST) { +func NewGenericREST(optsGetter generic.RESTOptionsGetter) (*GenericREST, *StatusREST) { store := &genericregistry.Store{ NewFunc: func() runtime.Object { return &api.Service{} }, NewListFunc: func() runtime.Object { return &api.ServiceList{} }, DefaultQualifiedResource: api.Resource("services"), + ReturnDeletedObject: true, CreateStrategy: service.Strategy, UpdateStrategy: service.Strategy, @@ -55,26 +56,25 @@ func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST) { statusStore := *store statusStore.UpdateStrategy = service.StatusStrategy - return &REST{store}, &StatusREST{store: &statusStore} + return &GenericREST{store}, &StatusREST{store: &statusStore} } -// Implement ShortNamesProvider -var _ rest.ShortNamesProvider = &REST{} +var ( + _ rest.ShortNamesProvider = &GenericREST{} + _ rest.CategoriesProvider = &GenericREST{} +) // ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource. -func (r *REST) ShortNames() []string { +func (r *GenericREST) ShortNames() []string { return []string{"svc"} } -// Implement CategoriesProvider -var _ rest.CategoriesProvider = &REST{} - // Categories implements the CategoriesProvider interface. Returns a list of categories a resource is part of. -func (r *REST) Categories() []string { +func (r *GenericREST) Categories() []string { return []string{"all"} } -// StatusREST implements the REST endpoint for changing the status of a service. +// StatusREST implements the GenericREST endpoint for changing the status of a service. type StatusREST struct { store *genericregistry.Store } diff --git a/pkg/registry/core/service/storage/storage_test.go b/pkg/registry/core/service/storage/storage_test.go index 3964dad1360..99c31a4863e 100644 --- a/pkg/registry/core/service/storage/storage_test.go +++ b/pkg/registry/core/service/storage/storage_test.go @@ -31,7 +31,7 @@ import ( "k8s.io/kubernetes/pkg/registry/registrytest" ) -func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { +func newStorage(t *testing.T) (*GenericREST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{ StorageConfig: etcdStorage, @@ -39,7 +39,7 @@ func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) DeleteCollectionWorkers: 1, ResourcePrefix: "services", } - serviceStorage, statusStorage := NewREST(restOptions) + serviceStorage, statusStorage := NewGenericREST(restOptions) return serviceStorage, statusStorage, server } @@ -125,7 +125,7 @@ func TestDelete(t *testing.T) { storage, _, server := newStorage(t) defer server.Terminate(t) defer storage.Store.DestroyFunc() - test := genericregistrytest.New(t, storage.Store).AllowCreateOnUpdate() + test := genericregistrytest.New(t, storage.Store).AllowCreateOnUpdate().ReturnDeletedObject() test.TestDelete(validService()) } diff --git a/pkg/registry/core/service/strategy_test.go b/pkg/registry/core/service/strategy_test.go index e5b4b4cc5d8..1c5ac1d4308 100644 --- a/pkg/registry/core/service/strategy_test.go +++ b/pkg/registry/core/service/strategy_test.go @@ -27,6 +27,7 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" api "k8s.io/kubernetes/pkg/apis/core" + _ "k8s.io/kubernetes/pkg/apis/core/install" ) func TestExportService(t *testing.T) { @@ -111,17 +112,17 @@ func TestExportService(t *testing.T) { func TestCheckGeneratedNameError(t *testing.T) { expect := errors.NewNotFound(api.Resource("foos"), "bar") - if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{}); err != expect { + if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{}); err != expect { t.Errorf("NotFoundError should be ignored: %v", err) } expect = errors.NewAlreadyExists(api.Resource("foos"), "bar") - if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{}); err != expect { + if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{}); err != expect { t.Errorf("AlreadyExists should be returned when no GenerateName field: %v", err) } expect = errors.NewAlreadyExists(api.Resource("foos"), "bar") - if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{ObjectMeta: metav1.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) { + if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{ObjectMeta: metav1.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) { t.Errorf("expected try again later error: %v", err) } } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 0e92927082c..54172d70e9d 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -2839,65 +2839,6 @@ func TestDeleteWithOptionsQueryAndBody(t *testing.T) { } } -func TestLegacyDelete(t *testing.T) { - storage := map[string]rest.Storage{} - simpleStorage := SimpleRESTStorage{} - ID := "id" - storage["simple"] = LegacyRESTStorage{&simpleStorage} - var _ rest.Deleter = storage["simple"].(LegacyRESTStorage) - handler := handle(storage) - server := httptest.NewServer(handler) - defer server.Close() - - client := http.Client{} - request, err := http.NewRequest("DELETE", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/"+ID, nil) - res, err := client.Do(request) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if res.StatusCode != http.StatusOK { - t.Errorf("unexpected response: %#v", res) - } - if simpleStorage.deleted != ID { - t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID) - } - if simpleStorage.deleteOptions != nil { - t.Errorf("unexpected delete options: %#v", simpleStorage.deleteOptions) - } -} - -func TestLegacyDeleteIgnoresOptions(t *testing.T) { - storage := map[string]rest.Storage{} - simpleStorage := SimpleRESTStorage{} - ID := "id" - storage["simple"] = LegacyRESTStorage{&simpleStorage} - handler := handle(storage) - server := httptest.NewServer(handler) - defer server.Close() - - item := metav1.NewDeleteOptions(300) - body, err := runtime.Encode(codec, item) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - client := http.Client{} - request, err := http.NewRequest("DELETE", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/"+ID, bytes.NewReader(body)) - res, err := client.Do(request) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if res.StatusCode != http.StatusOK { - t.Errorf("unexpected response: %#v", res) - } - if simpleStorage.deleted != ID { - t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID) - } - if simpleStorage.deleteOptions != nil { - t.Errorf("unexpected delete options: %#v", simpleStorage.deleteOptions) - } -} - func TestDeleteInvokesAdmissionControl(t *testing.T) { // TODO: remove mutating deny when we removed it from the endpoint implementation and ported all plugins for _, admit := range []admission.Interface{alwaysMutatingDeny{}, alwaysValidatingDeny{}} { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go index 90b8439b8b5..934ee94f727 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go @@ -227,7 +227,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag lister, isLister := storage.(rest.Lister) getter, isGetter := storage.(rest.Getter) getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions) - deleter, isDeleter := storage.(rest.Deleter) gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter) collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter) updater, isUpdater := storage.(rest.Updater) @@ -273,16 +272,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag var versionedDeleteOptions runtime.Object var versionedDeleterObject interface{} - switch { - case isGracefulDeleter: + if isGracefulDeleter { versionedDeleteOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions")) if err != nil { return nil, err } versionedDeleterObject = indirectArbitraryPointer(versionedDeleteOptions) - isDeleter = true - case isDeleter: - gracefulDeleter = rest.GracefulDeleteAdapter{Deleter: deleter} } versionedStatusPtr, err := a.group.Creater.New(optionsExternalVersion.WithKind("Status")) @@ -416,7 +411,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater) actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher) - actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter) + actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter) actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher) actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter) actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath) @@ -462,7 +457,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater) actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher) - actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter) + actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter) actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher) actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter) actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index 8cf9689ac2a..a2b0930f38a 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -137,16 +137,6 @@ type TableConvertor interface { ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) } -// Deleter is an object that can delete a named RESTful resource. -type Deleter interface { - // Delete finds a resource in the storage and deletes it. - // Although it can return an arbitrary error value, IsNotFound(err) is true for the - // returned error value err when the specified resource is not found. - // Delete *may* return the object that was deleted, or a status object indicating additional - // information about deletion. - Delete(ctx genericapirequest.Context, name string) (runtime.Object, error) -} - // GracefulDeleter knows how to pass deletion options to allow delayed deletion of a // RESTful object. type GracefulDeleter interface { @@ -162,17 +152,6 @@ type GracefulDeleter interface { Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) } -// GracefulDeleteAdapter adapts the Deleter interface to GracefulDeleter -type GracefulDeleteAdapter struct { - Deleter -} - -// Delete implements RESTGracefulDeleter in terms of Deleter -func (w GracefulDeleteAdapter) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) { - obj, err := w.Deleter.Delete(ctx, name) - return obj, true, err -} - // CollectionDeleter is an object that can delete a collection // of RESTful resources. type CollectionDeleter interface { diff --git a/test/e2e/apimachinery/table_conversion.go b/test/e2e/apimachinery/table_conversion.go index d1da1c4bcd7..a9a7c4289ce 100644 --- a/test/e2e/apimachinery/table_conversion.go +++ b/test/e2e/apimachinery/table_conversion.go @@ -24,11 +24,13 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + authorizationv1 "k8s.io/api/authorization/v1" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/printers" "k8s.io/kubernetes/test/e2e/framework" imageutils "k8s.io/kubernetes/test/utils/image" @@ -142,7 +144,15 @@ var _ = SIGDescribe("Servers with support for Table transformation", func() { c := f.ClientSet table := &metav1beta1.Table{} - err := c.CoreV1().RESTClient().Get().Resource("services").SetHeader("Accept", "application/json;as=Table;v=v1beta1;g=meta.k8s.io").Do().Into(table) + sar := &authorizationv1.SelfSubjectAccessReview{ + Spec: authorizationv1.SelfSubjectAccessReviewSpec{ + NonResourceAttributes: &authorizationv1.NonResourceAttributes{ + Path: "/", + Verb: "get", + }, + }, + } + err := c.AuthorizationV1().RESTClient().Post().Resource("selfsubjectaccessreviews").SetHeader("Accept", "application/json;as=Table;v=v1beta1;g=meta.k8s.io").Body(sar).Do().Into(table) Expect(err).To(HaveOccurred()) Expect(err.(errors.APIStatus).Status().Code).To(Equal(int32(406))) })