Make Service storage a wrapper around other storages

The registry abstraction is unnecessary and adds direct coupling to the
core types. By using a wrapper, we carry through the default
implementations of the non-mutating operations. The DeleteCollection
method is explicitly patched out since it cannot be correctly
implemented on the storage currently.

As a result, TableConvertor is now exposed.

A few other minor refactorings

* Corrected the case of some variables
* Used functions instead of methods for several helper methods
* Removed the legacy Deleter - service was the only remaining consumer
This commit is contained in:
Clayton Coleman 2018-02-04 22:38:39 -05:00
parent da564ef4fb
commit 110b064d63
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
12 changed files with 349 additions and 286 deletions

View File

@ -44,7 +44,6 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
configmapstore "k8s.io/kubernetes/pkg/registry/core/configmap/storage"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
endpointsstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
eventstore "k8s.io/kubernetes/pkg/registry/core/event/storage"
limitrangestore "k8s.io/kubernetes/pkg/registry/core/limitrange/storage"
@ -58,7 +57,6 @@ import (
controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
resourcequotastore "k8s.io/kubernetes/pkg/registry/core/resourcequota/storage"
secretstore "k8s.io/kubernetes/pkg/registry/core/secret/storage"
"k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
serviceallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
@ -127,7 +125,6 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespacestore.NewREST(restOptionsGetter)
endpointsStorage := endpointsstore.NewREST(restOptionsGetter)
endpointRegistry := endpoint.NewRegistry(endpointsStorage)
nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
if err != nil {
@ -148,8 +145,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
serviceAccountStorage = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, nil)
}
serviceRESTStorage, serviceStatusStorage := servicestore.NewREST(restOptionsGetter)
serviceRegistry := service.NewRegistry(serviceRESTStorage)
serviceRESTStorage, serviceStatusStorage := servicestore.NewGenericREST(restOptionsGetter)
var serviceClusterIPRegistry rangeallocation.RangeRegistry
serviceClusterIPRange := c.ServiceIPRange
@ -162,7 +158,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
ServiceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
@ -172,7 +168,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
var serviceNodePortRegistry rangeallocation.RangeRegistry
ServiceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
@ -183,7 +179,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
controllerStorage := controllerstore.NewStorage(restOptionsGetter)
serviceRest := service.NewStorage(serviceRegistry, endpointRegistry, podStorage.Pod, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage, endpointsStorage, podStorage.Pod, serviceClusterIPAllocator, serviceNodePortAllocator, c.ProxyTransport)
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
@ -201,8 +197,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest.Service,
"services/proxy": serviceRest.Proxy,
"services": serviceRest,
"services/proxy": serviceRestProxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,

View File

@ -32,7 +32,6 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
"k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
)
@ -126,7 +125,7 @@ func (c *Repair) runOnce() error {
// Check every Service's ports, and rebuild the state as we think it should be.
for i := range list.Items {
svc := &list.Items[i]
ports := service.CollectServiceNodePorts(svc)
ports := collectServiceNodePorts(svc)
if len(ports) == 0 {
continue
}
@ -196,3 +195,14 @@ func (c *Repair) runOnce() error {
}
return nil
}
func collectServiceNodePorts(service *api.Service) []int {
servicePorts := []int{}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
if servicePort.NodePort != 0 {
servicePorts = append(servicePorts, int(servicePort.NodePort))
}
}
return servicePorts
}

View File

@ -32,7 +32,7 @@ import (
// ProxyREST implements the proxy subresource for a Service
type ProxyREST struct {
ServiceRest *REST
Redirector rest.Redirector
ProxyTransport http.RoundTripper
}
@ -62,7 +62,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti
if !ok {
return nil, fmt.Errorf("Invalid options object: %#v", opts)
}
location, transport, err := r.ServiceRest.ResourceLocation(ctx, id)
location, transport, err := r.Redirector.ResourceLocation(ctx, id)
if err != nil {
return nil, err
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package service
package storage
import (
"fmt"
@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -36,31 +37,25 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
utilfeature "k8s.io/apiserver/pkg/util/feature"
apiservice "k8s.io/kubernetes/pkg/api/service"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
registry "k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
)
// ServiceRest includes storage for services and all sub resources
type ServiceRest struct {
Service *REST
Proxy *ProxyREST
}
// REST adapts a service registry into apiserver's RESTStorage model.
type REST struct {
registry Registry
endpoints endpoint.Registry
services ServiceStorage
endpoints EndpointsStorage
serviceIPs ipallocator.Interface
serviceNodePorts portallocator.Interface
proxyTransport http.RoundTripper
pods *podstore.REST
pods rest.Getter
}
// ServiceNodePort includes protocol and port number of a service NodePort.
@ -73,23 +68,50 @@ type ServiceNodePort struct {
NodePort int32
}
// NewStorage returns a new REST.
func NewStorage(registry Registry, endpoints endpoint.Registry, pods *podstore.REST, serviceIPs ipallocator.Interface,
serviceNodePorts portallocator.Interface, proxyTransport http.RoundTripper) *ServiceRest {
type ServiceStorage interface {
rest.Getter
rest.Lister
rest.CreaterUpdater
rest.GracefulDeleter
rest.Watcher
rest.TableConvertor
rest.Exporter
}
type EndpointsStorage interface {
rest.Getter
rest.GracefulDeleter
}
// NewREST returns a wrapper around the underlying generic storage and performs
// allocations and deallocations of various service related resources like ports.
// TODO: all transactional behavior should be supported from within generic storage
// or the strategy.
func NewREST(
services ServiceStorage,
endpoints EndpointsStorage,
pods rest.Getter,
serviceIPs ipallocator.Interface,
serviceNodePorts portallocator.Interface,
proxyTransport http.RoundTripper,
) (*REST, *registry.ProxyREST) {
rest := &REST{
registry: registry,
services: services,
endpoints: endpoints,
serviceIPs: serviceIPs,
serviceNodePorts: serviceNodePorts,
proxyTransport: proxyTransport,
pods: pods,
}
return &ServiceRest{
Service: rest,
Proxy: &ProxyREST{ServiceRest: rest, ProxyTransport: proxyTransport},
}
return rest, &registry.ProxyREST{Redirector: rest, ProxyTransport: proxyTransport}
}
var (
_ ServiceStorage = &REST{}
_ rest.CategoriesProvider = &REST{}
_ rest.ShortNamesProvider = &REST{}
)
// ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
func (rs *REST) ShortNames() []string {
return []string{"svc"}
@ -100,11 +122,34 @@ func (rs *REST) Categories() []string {
return []string{"all"}
}
// TODO: implement includeUninitialized by refactoring this to move to store
func (rs *REST) New() runtime.Object {
return rs.services.New()
}
func (rs *REST) NewList() runtime.Object {
return rs.services.NewList()
}
func (rs *REST) Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
return rs.services.Get(ctx, name, options)
}
func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
return rs.services.List(ctx, options)
}
func (rs *REST) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
return rs.services.Watch(ctx, options)
}
func (rs *REST) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
return rs.services.Export(ctx, name, opts)
}
func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) {
service := obj.(*api.Service)
if err := rest.BeforeCreate(Strategy, ctx, obj); err != nil {
if err := rest.BeforeCreate(registry.Strategy, ctx, obj); err != nil {
return nil, err
}
@ -120,7 +165,7 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create
var err error
if service.Spec.Type != api.ServiceTypeExternalName {
if releaseServiceIP, err = rs.initClusterIP(service); err != nil {
if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
return nil, err
}
}
@ -129,14 +174,14 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create
defer nodePortOp.Finish()
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
if err := rs.initNodePorts(service, nodePortOp); err != nil {
if err := initNodePorts(service, nodePortOp); err != nil {
return nil, err
}
}
// Handle ExternalTraffic related fields during service creation.
if apiservice.NeedsHealthCheck(service) {
if err := rs.allocateHealthCheckNodePort(service, nodePortOp); err != nil {
if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
return nil, errors.NewInternalError(err)
}
}
@ -144,16 +189,16 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create
return nil, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
}
out, err := rs.registry.CreateService(ctx, service, createValidation)
out, err := rs.services.Create(ctx, service, createValidation, includeUninitialized)
if err != nil {
err = rest.CheckGeneratedNameError(Strategy, err, service)
err = rest.CheckGeneratedNameError(registry.Strategy, err, service)
}
if err == nil {
el := nodePortOp.Commit()
if el != nil {
// these should be caught by an eventual reconciliation / restart
glog.Errorf("error(s) committing service node-ports changes: %v", el)
utilruntime.HandleError(fmt.Errorf("error(s) committing service node-ports changes: %v", el))
}
releaseServiceIP = false
@ -162,75 +207,56 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create
return out, err
}
func (rs *REST) Delete(ctx genericapirequest.Context, id string) (runtime.Object, error) {
service, err := rs.registry.GetService(ctx, id, &metav1.GetOptions{})
func (rs *REST) Delete(ctx genericapirequest.Context, id string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
// TODO: handle graceful
obj, _, err := rs.services.Delete(ctx, id, options)
if err != nil {
return nil, err
return nil, false, err
}
err = rs.registry.DeleteService(ctx, id)
if err != nil {
return nil, err
}
svc := obj.(*api.Service)
// TODO: can leave dangling endpoints, and potentially return incorrect
// endpoints if a new service is created with the same name
err = rs.endpoints.DeleteEndpoints(ctx, id)
_, _, err = rs.endpoints.Delete(ctx, id, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return nil, err
return nil, false, err
}
if helper.IsServiceIPSet(service) {
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
if helper.IsServiceIPSet(svc) {
rs.serviceIPs.Release(net.ParseIP(svc.Spec.ClusterIP))
}
for _, nodePort := range CollectServiceNodePorts(service) {
for _, nodePort := range collectServiceNodePorts(svc) {
err := rs.serviceNodePorts.Release(nodePort)
if err != nil {
// these should be caught by an eventual reconciliation / restart
glog.Errorf("Error releasing service %s node port %d: %v", service.Name, nodePort, err)
utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err))
}
}
if apiservice.NeedsHealthCheck(service) {
nodePort := service.Spec.HealthCheckNodePort
if apiservice.NeedsHealthCheck(svc) {
nodePort := svc.Spec.HealthCheckNodePort
if nodePort > 0 {
err := rs.serviceNodePorts.Release(int(nodePort))
if err != nil {
// these should be caught by an eventual reconciliation / restart
utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", service.Name, nodePort, err))
utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err))
}
}
}
return &metav1.Status{Status: metav1.StatusSuccess}, nil
}
func (rs *REST) Get(ctx genericapirequest.Context, id string, options *metav1.GetOptions) (runtime.Object, error) {
return rs.registry.GetService(ctx, id, options)
}
func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
return rs.registry.ListServices(ctx, options)
}
// Watch returns Services events via a watch.Interface.
// It implements rest.Watcher.
func (rs *REST) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
return rs.registry.WatchServices(ctx, options)
}
// Export returns Service stripped of cluster-specific information.
// It implements rest.Exporter.
func (rs *REST) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
return rs.registry.ExportService(ctx, name, opts)
}
func (*REST) New() runtime.Object {
return &api.Service{}
}
func (*REST) NewList() runtime.Object {
return &api.ServiceList{}
// TODO: this is duplicated from the generic storage, when this wrapper is fully removed we can drop this
details := &metav1.StatusDetails{
Name: svc.Name,
UID: svc.UID,
}
if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
details.Group = info.APIGroup
details.Kind = info.Resource // legacy behavior
}
status := &metav1.Status{Status: metav1.StatusSuccess, Details: details}
return status, true, nil
}
// externalTrafficPolicyUpdate adjusts ExternalTrafficPolicy during service update if needed.
@ -267,7 +293,7 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, node
// Insert health check node port into the service's HealthCheckNodePort field if needed.
case !neededHealthCheckNodePort && needsHealthCheckNodePort:
glog.Infof("Transition to LoadBalancer type service with ExternalTrafficPolicy=Local")
if err := rs.allocateHealthCheckNodePort(service, nodePortOp); err != nil {
if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
return false, errors.NewInternalError(err)
}
@ -295,10 +321,11 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, node
}
func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc) (runtime.Object, bool, error) {
oldService, err := rs.registry.GetService(ctx, name, &metav1.GetOptions{})
oldObj, err := rs.services.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, err
}
oldService := oldObj.(*api.Service)
obj, err := objInfo.UpdatedObject(ctx, oldService)
if err != nil {
@ -331,7 +358,7 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
if releaseServiceIP, err = rs.initClusterIP(service); err != nil {
if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
return nil, false, err
}
}
@ -344,11 +371,11 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
// Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists.
if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) &&
(service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) {
rs.releaseNodePorts(oldService, nodePortOp)
releaseNodePorts(oldService, nodePortOp)
}
// Update service from any type to NodePort or LoadBalancer, should update NodePort.
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
if err := rs.updateNodePorts(oldService, service, nodePortOp); err != nil {
if err := updateNodePorts(oldService, service, nodePortOp); err != nil {
return nil, false, err
}
}
@ -368,18 +395,18 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
}
out, err := rs.registry.UpdateService(ctx, service, createValidation, updateValidation)
out, created, err := rs.services.Update(ctx, service.Name, rest.DefaultUpdatedObjectInfo(service), createValidation, updateValidation)
if err == nil {
el := nodePortOp.Commit()
if el != nil {
// problems should be fixed by an eventual reconciliation / restart
glog.Errorf("error(s) committing NodePorts changes: %v", el)
utilruntime.HandleError(fmt.Errorf("error(s) committing NodePorts changes: %v", el))
}
releaseServiceIP = false
}
return out, false, err
return out, created, err
}
// Implement Redirector.
@ -395,10 +422,11 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url
// If a port *number* was specified, find the corresponding service port name
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
svc, err := rs.registry.GetService(ctx, svcName, &metav1.GetOptions{})
obj, err := rs.services.Get(ctx, svcName, &metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
svc := obj.(*api.Service)
found := false
for _, svcPort := range svc.Spec.Ports {
if int64(svcPort.Port) == portNum {
@ -413,10 +441,11 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url
}
}
eps, err := rs.endpoints.GetEndpoints(ctx, svcName, &metav1.GetOptions{})
obj, err := rs.endpoints.Get(ctx, svcName, &metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
eps := obj.(*api.Endpoints)
if len(eps.Subsets) == 0 {
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
}
@ -439,7 +468,7 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url
addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)]
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceProxyAllowExternalIPs) {
if err := isValidAddress(ctx, &addr, rs.pods); err != nil {
glog.Errorf("Address %v isn't valid (%v)", addr, err)
utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err))
continue
}
}
@ -450,14 +479,18 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, rs.proxyTransport, nil
}
glog.Errorf("Failed to find a valid address, skipping subset: %v", ss)
utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss))
}
}
}
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
func isValidAddress(ctx genericapirequest.Context, addr *api.EndpointAddress, pods *podstore.REST) error {
func (r *REST) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
return r.services.ConvertToTable(ctx, object, tableOptions)
}
func isValidAddress(ctx genericapirequest.Context, addr *api.EndpointAddress, pods rest.Getter) error {
if addr.TargetRef == nil {
return fmt.Errorf("Address has no target ref, skipping: %v", addr)
}
@ -503,17 +536,6 @@ func containsNodePort(serviceNodePorts []ServiceNodePort, serviceNodePort Servic
return false
}
func CollectServiceNodePorts(service *api.Service) []int {
servicePorts := []int{}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
if servicePort.NodePort != 0 {
servicePorts = append(servicePorts, int(servicePort.NodePort))
}
}
return servicePorts
}
// Loop through the service ports list, find one with the same port number and
// NodePort specified, return this NodePort otherwise return 0.
func findRequestedNodePort(port int, servicePorts []api.ServicePort) int {
@ -527,7 +549,7 @@ func findRequestedNodePort(port int, servicePorts []api.ServicePort) int {
}
// allocateHealthCheckNodePort allocates health check node port to service.
func (rs *REST) allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
func allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
healthCheckNodePort := service.Spec.HealthCheckNodePort
if healthCheckNodePort != 0 {
// If the request has a health check nodePort in mind, attempt to reserve it.
@ -550,11 +572,11 @@ func (rs *REST) allocateHealthCheckNodePort(service *api.Service, nodePortOp *po
}
// The return bool value indicates if a cluster IP is allocated successfully.
func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
func initClusterIP(service *api.Service, serviceIPs ipallocator.Interface) (bool, error) {
switch {
case service.Spec.ClusterIP == "":
// Allocate next available.
ip, err := rs.serviceIPs.AllocateNext()
ip, err := serviceIPs.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
@ -565,7 +587,7 @@ func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
return true, nil
case service.Spec.ClusterIP != api.ClusterIPNone && service.Spec.ClusterIP != "":
// Try to respect the requested IP.
if err := rs.serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil {
if err := serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
@ -576,7 +598,7 @@ func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
return false, nil
}
func (rs *REST) initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
func initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
svcPortToNodePort := map[int]int{}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
@ -625,8 +647,8 @@ func (rs *REST) initNodePorts(service *api.Service, nodePortOp *portallocator.Po
return nil
}
func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
oldNodePortsNumbers := CollectServiceNodePorts(oldService)
func updateNodePorts(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
oldNodePortsNumbers := collectServiceNodePorts(oldService)
newNodePorts := []ServiceNodePort{}
portAllocated := map[int]bool{}
@ -659,7 +681,7 @@ func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp
newNodePorts = append(newNodePorts, nodePort)
}
newNodePortsNumbers := CollectServiceNodePorts(newService)
newNodePortsNumbers := collectServiceNodePorts(newService)
// The comparison loops are O(N^2), but we don't expect N to be huge
// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
@ -673,10 +695,21 @@ func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp
return nil
}
func (rs *REST) releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
nodePorts := CollectServiceNodePorts(service)
func releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
nodePorts := collectServiceNodePorts(service)
for _, nodePort := range nodePorts {
nodePortOp.ReleaseDeferred(nodePort)
}
}
func collectServiceNodePorts(service *api.Service) []int {
servicePorts := []int{}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
if servicePort.NodePort != 0 {
servicePorts = append(servicePorts, int(servicePort.NodePort))
}
}
return servicePorts
}

View File

@ -14,28 +14,33 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package service
package storage
import (
"testing"
"net"
"reflect"
"strings"
"testing"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/watch"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/api/service"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
@ -46,28 +51,121 @@ import (
// It is now testing mostly the same things as other resources but
// in a completely different way. We should unify it.
type serviceStorage struct {
GottenID string
UpdatedID string
CreatedID string
DeletedID string
Created bool
DeletedImmediately bool
Service *api.Service
OldService *api.Service
ServiceList *api.ServiceList
Err error
}
func (s *serviceStorage) Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
s.GottenID = name
return s.Service, s.Err
}
func (s *serviceStorage) GetService(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (*api.Service, error) {
return s.Service, s.Err
}
func (s *serviceStorage) NewList() runtime.Object {
panic("not implemented")
}
func (s *serviceStorage) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
ns, _ := genericapirequest.NamespaceFrom(ctx)
// Copy metadata from internal list into result
res := new(api.ServiceList)
res.TypeMeta = s.ServiceList.TypeMeta
res.ListMeta = s.ServiceList.ListMeta
if ns != metav1.NamespaceAll {
for _, service := range s.ServiceList.Items {
if ns == service.Namespace {
res.Items = append(res.Items, service)
}
}
} else {
res.Items = append([]api.Service{}, s.ServiceList.Items...)
}
return res, s.Err
}
func (s *serviceStorage) New() runtime.Object {
panic("not implemented")
}
func (s *serviceStorage) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) {
svc := obj.(*api.Service)
s.CreatedID = obj.(metav1.Object).GetName()
s.Service = svc.DeepCopy()
if s.ServiceList == nil {
s.ServiceList = &api.ServiceList{}
}
s.ServiceList.Items = append(s.ServiceList.Items, *svc)
return svc, s.Err
}
func (s *serviceStorage) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc) (runtime.Object, bool, error) {
s.UpdatedID = name
obj, err := objInfo.UpdatedObject(ctx, s.OldService)
if err != nil {
return nil, false, err
}
s.Service = obj.(*api.Service)
return s.Service, s.Created, s.Err
}
func (s *serviceStorage) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
s.DeletedID = name
return s.Service, s.DeletedImmediately, s.Err
}
func (s *serviceStorage) DeleteCollection(ctx genericapirequest.Context, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
panic("not implemented")
}
func (s *serviceStorage) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
panic("not implemented")
}
func (s *serviceStorage) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
panic("not implemented")
}
func (s *serviceStorage) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
panic("not implemented")
}
func generateRandomNodePort() int32 {
return int32(rand.IntnRange(30001, 30999))
}
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry, *etcdtesting.EtcdTestServer) {
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *serviceStorage, *etcdtesting.EtcdTestServer) {
return NewTestRESTWithPods(t, endpoints, nil)
}
func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList) (*REST, *registrytest.ServiceRegistry, *etcdtesting.EtcdTestServer) {
registry := registrytest.NewServiceRegistry()
endpointRegistry := &registrytest.EndpointRegistry{
Endpoints: endpoints,
}
func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList) (*REST, *serviceStorage, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{
serviceStorage := &serviceStorage{}
podStorage := podstore.NewStorage(generic.RESTOptions{
StorageConfig: etcdStorage,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 3,
ResourcePrefix: "pods",
}
podStorage := podstore.NewStorage(restOptions, nil, nil, nil)
if pods != nil && pods.Items != nil {
}, nil, nil, nil)
if pods != nil && len(pods.Items) > 0 {
ctx := genericapirequest.NewDefaultContext()
for ix := range pods.Items {
key, _ := podStorage.Pod.KeyFunc(ctx, pods.Items[ix].Name)
@ -76,14 +174,29 @@ func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.P
}
}
}
endpointStorage := endpointstore.NewREST(generic.RESTOptions{
StorageConfig: etcdStorage,
Decorator: generic.UndecoratedStorage,
ResourcePrefix: "endpoints",
})
if endpoints != nil && len(endpoints.Items) > 0 {
ctx := genericapirequest.NewDefaultContext()
for ix := range endpoints.Items {
key, _ := endpointStorage.KeyFunc(ctx, endpoints.Items[ix].Name)
if err := endpointStorage.Store.Storage.Create(ctx, key, &endpoints.Items[ix], nil, 0); err != nil {
t.Fatalf("Couldn't create endpoint: %v", err)
}
}
}
r := ipallocator.NewCIDRRange(makeIPNet(t))
portRange := utilnet.PortRange{Base: 30000, Size: 1000}
portAllocator := portallocator.NewPortAllocator(portRange)
storage := NewStorage(registry, endpointRegistry, podStorage.Pod, r, portAllocator, nil)
rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, r, portAllocator, nil)
return storage.Service, registry, server
return rest, serviceStorage, server
}
func makeIPNet(t *testing.T) *net.IPNet {
@ -94,15 +207,16 @@ func makeIPNet(t *testing.T) *net.IPNet {
return net
}
func releaseServiceNodePorts(t *testing.T, ctx genericapirequest.Context, svcName string, rest *REST, registry *registrytest.ServiceRegistry) {
srv, err := registry.GetService(ctx, svcName, &metav1.GetOptions{})
func releaseServiceNodePorts(t *testing.T, ctx genericapirequest.Context, svcName string, rest *REST, registry ServiceStorage) {
obj, err := registry.Get(ctx, svcName, &metav1.GetOptions{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
srv := obj.(*api.Service)
if srv == nil {
t.Fatalf("Failed to find service: %s", svcName)
}
serviceNodePorts := CollectServiceNodePorts(srv)
serviceNodePorts := collectServiceNodePorts(srv)
if len(serviceNodePorts) == 0 {
t.Errorf("Failed to find NodePorts of service : %s", srv.Name)
}
@ -271,7 +385,7 @@ func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) {
if created_service.Name != test.name {
t.Errorf("Expected %s, but got %s", test.name, created_service.Name)
}
serviceNodePorts := CollectServiceNodePorts(created_service)
serviceNodePorts := collectServiceNodePorts(created_service)
if !reflect.DeepEqual(serviceNodePorts, test.expectNodePorts) {
t.Errorf("Expected %v, but got %v", test.expectNodePorts, serviceNodePorts)
}
@ -348,7 +462,7 @@ func TestServiceRegistryUpdate(t *testing.T) {
storage, registry, server := NewTestREST(t, nil)
defer server.Terminate(t)
svc, err := registry.CreateService(ctx, &api.Service{
obj, err := registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", Namespace: metav1.NamespaceDefault},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz1"},
@ -358,8 +472,8 @@ func TestServiceRegistryUpdate(t *testing.T) {
TargetPort: intstr.FromInt(6502),
}},
},
}, rest.ValidateAllObjectFunc)
}, rest.ValidateAllObjectFunc, false)
svc := obj.(*api.Service)
if err != nil {
t.Fatalf("Expected no error: %v", err)
}
@ -400,7 +514,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
defer server.Terminate(t)
registry.CreateService(ctx, &api.Service{
registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
@ -409,7 +523,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
Protocol: api.ProtocolTCP,
}},
},
}, rest.ValidateAllObjectFunc)
}, rest.ValidateAllObjectFunc, false)
failureCases := map[string]api.Service{
"empty ID": {
ObjectMeta: metav1.ObjectMeta{Name: ""},
@ -477,7 +591,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
if srv == nil {
t.Fatalf("Failed to find service: %s", svc.Name)
}
serviceNodePorts := CollectServiceNodePorts(srv)
serviceNodePorts := collectServiceNodePorts(srv)
if len(serviceNodePorts) == 0 {
t.Errorf("Failed to find NodePorts of service : %s", srv.Name)
}
@ -504,8 +618,8 @@ func TestServiceRegistryDelete(t *testing.T) {
}},
},
}
registry.CreateService(ctx, svc, rest.ValidateAllObjectFunc)
storage.Delete(ctx, svc.Name)
registry.Create(ctx, svc, rest.ValidateAllObjectFunc, false)
storage.Delete(ctx, svc.Name, &metav1.DeleteOptions{})
if e, a := "foo", registry.DeletedID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
@ -527,8 +641,8 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
}},
},
}
registry.CreateService(ctx, svc, rest.ValidateAllObjectFunc)
storage.Delete(ctx, svc.Name)
registry.Create(ctx, svc, rest.ValidateAllObjectFunc, false)
storage.Delete(ctx, svc.Name, &metav1.DeleteOptions{})
if e, a := "foo", registry.DeletedID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
@ -615,12 +729,12 @@ func TestServiceRegistryGet(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
defer server.Terminate(t)
registry.CreateService(ctx, &api.Service{
registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
},
}, rest.ValidateAllObjectFunc)
}, rest.ValidateAllObjectFunc, false)
storage.Get(ctx, "foo", &metav1.GetOptions{})
if e, a := "foo", registry.GottenID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
@ -655,22 +769,6 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}},
}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{},
Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}},
}, {
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Name: "foo", Namespace: metav1.NamespaceDefault}}},
Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}},
}, {
Addresses: []api.EndpointAddress{{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Name: "bar", Namespace: metav1.NamespaceDefault}}},
Ports: []api.EndpointPort{},
}},
},
},
}
pods := &api.PodList{
@ -708,7 +806,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
storage, registry, server := NewTestRESTWithPods(t, endpoints, pods)
defer server.Terminate(t)
for _, name := range []string{"foo", "bad"} {
registry.CreateService(ctx, &api.Service{
registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
@ -721,7 +819,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
{Name: "", Port: 93, TargetPort: intstr.FromInt(80)},
},
},
}, rest.ValidateAllObjectFunc)
}, rest.ValidateAllObjectFunc, false)
}
redirector := rest.Redirector(storage)
@ -807,19 +905,19 @@ func TestServiceRegistryList(t *testing.T) {
ctx := genericapirequest.NewDefaultContext()
storage, registry, server := NewTestREST(t, nil)
defer server.Terminate(t)
registry.CreateService(ctx, &api.Service{
registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
},
}, rest.ValidateAllObjectFunc)
registry.CreateService(ctx, &api.Service{
}, rest.ValidateAllObjectFunc, false)
registry.Create(ctx, &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: metav1.NamespaceDefault},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar2": "baz2"},
},
}, rest.ValidateAllObjectFunc)
registry.List.ResourceVersion = "1"
}, rest.ValidateAllObjectFunc, false)
registry.ServiceList.ResourceVersion = "1"
s, _ := storage.List(ctx, nil)
sl := s.(*api.ServiceList)
if len(sl.Items) != 2 {
@ -946,7 +1044,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) {
t.Errorf("Unexpected ClusterIP: %s", created_service_1.Spec.ClusterIP)
}
_, err := storage.Delete(ctx, created_service_1.Name)
_, _, err := storage.Delete(ctx, created_service_1.Name, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("Unexpected error deleting service: %v", err)
}
@ -1303,7 +1401,7 @@ func TestInitClusterIP(t *testing.T) {
}
for _, test := range testCases {
hasAllocatedIP, err := storage.initClusterIP(test.svc)
hasAllocatedIP, err := initClusterIP(test.svc, storage.serviceIPs)
if err != nil {
t.Errorf("%q: unexpected error: %v", test.name, err)
}
@ -1488,13 +1586,13 @@ func TestInitNodePorts(t *testing.T) {
}
for _, test := range testCases {
err := storage.initNodePorts(test.service, nodePortOp)
err := initNodePorts(test.service, nodePortOp)
if err != nil {
t.Errorf("%q: unexpected error: %v", test.name, err)
continue
}
serviceNodePorts := CollectServiceNodePorts(test.service)
serviceNodePorts := collectServiceNodePorts(test.service)
if len(test.expectSpecifiedNodePorts) == 0 {
for _, nodePort := range serviceNodePorts {
if !storage.serviceNodePorts.Has(nodePort) {
@ -1758,13 +1856,13 @@ func TestUpdateNodePorts(t *testing.T) {
}
for _, test := range testCases {
err := storage.updateNodePorts(test.oldService, test.newService, nodePortOp)
err := updateNodePorts(test.oldService, test.newService, nodePortOp)
if err != nil {
t.Errorf("%q: unexpected error: %v", test.name, err)
continue
}
serviceNodePorts := CollectServiceNodePorts(test.newService)
serviceNodePorts := collectServiceNodePorts(test.newService)
if len(test.expectSpecifiedNodePorts) == 0 {
for _, nodePort := range serviceNodePorts {
if !storage.serviceNodePorts.Has(nodePort) {

View File

@ -30,16 +30,17 @@ import (
"k8s.io/kubernetes/pkg/registry/core/service"
)
type REST struct {
type GenericREST struct {
*genericregistry.Store
}
// NewREST returns a RESTStorage object that will work against services.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST) {
func NewGenericREST(optsGetter generic.RESTOptionsGetter) (*GenericREST, *StatusREST) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Service{} },
NewListFunc: func() runtime.Object { return &api.ServiceList{} },
DefaultQualifiedResource: api.Resource("services"),
ReturnDeletedObject: true,
CreateStrategy: service.Strategy,
UpdateStrategy: service.Strategy,
@ -55,26 +56,25 @@ func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST) {
statusStore := *store
statusStore.UpdateStrategy = service.StatusStrategy
return &REST{store}, &StatusREST{store: &statusStore}
return &GenericREST{store}, &StatusREST{store: &statusStore}
}
// Implement ShortNamesProvider
var _ rest.ShortNamesProvider = &REST{}
var (
_ rest.ShortNamesProvider = &GenericREST{}
_ rest.CategoriesProvider = &GenericREST{}
)
// ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
func (r *REST) ShortNames() []string {
func (r *GenericREST) ShortNames() []string {
return []string{"svc"}
}
// Implement CategoriesProvider
var _ rest.CategoriesProvider = &REST{}
// Categories implements the CategoriesProvider interface. Returns a list of categories a resource is part of.
func (r *REST) Categories() []string {
func (r *GenericREST) Categories() []string {
return []string{"all"}
}
// StatusREST implements the REST endpoint for changing the status of a service.
// StatusREST implements the GenericREST endpoint for changing the status of a service.
type StatusREST struct {
store *genericregistry.Store
}

View File

@ -31,7 +31,7 @@ import (
"k8s.io/kubernetes/pkg/registry/registrytest"
)
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
func newStorage(t *testing.T) (*GenericREST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{
StorageConfig: etcdStorage,
@ -39,7 +39,7 @@ func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer)
DeleteCollectionWorkers: 1,
ResourcePrefix: "services",
}
serviceStorage, statusStorage := NewREST(restOptions)
serviceStorage, statusStorage := NewGenericREST(restOptions)
return serviceStorage, statusStorage, server
}
@ -125,7 +125,7 @@ func TestDelete(t *testing.T) {
storage, _, server := newStorage(t)
defer server.Terminate(t)
defer storage.Store.DestroyFunc()
test := genericregistrytest.New(t, storage.Store).AllowCreateOnUpdate()
test := genericregistrytest.New(t, storage.Store).AllowCreateOnUpdate().ReturnDeletedObject()
test.TestDelete(validService())
}

View File

@ -27,6 +27,7 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
api "k8s.io/kubernetes/pkg/apis/core"
_ "k8s.io/kubernetes/pkg/apis/core/install"
)
func TestExportService(t *testing.T) {
@ -111,17 +112,17 @@ func TestExportService(t *testing.T) {
func TestCheckGeneratedNameError(t *testing.T) {
expect := errors.NewNotFound(api.Resource("foos"), "bar")
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{}); err != expect {
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{}); err != expect {
t.Errorf("NotFoundError should be ignored: %v", err)
}
expect = errors.NewAlreadyExists(api.Resource("foos"), "bar")
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{}); err != expect {
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{}); err != expect {
t.Errorf("AlreadyExists should be returned when no GenerateName field: %v", err)
}
expect = errors.NewAlreadyExists(api.Resource("foos"), "bar")
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{ObjectMeta: metav1.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) {
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{ObjectMeta: metav1.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) {
t.Errorf("expected try again later error: %v", err)
}
}

View File

@ -2839,65 +2839,6 @@ func TestDeleteWithOptionsQueryAndBody(t *testing.T) {
}
}
func TestLegacyDelete(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{}
ID := "id"
storage["simple"] = LegacyRESTStorage{&simpleStorage}
var _ rest.Deleter = storage["simple"].(LegacyRESTStorage)
handler := handle(storage)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
request, err := http.NewRequest("DELETE", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/"+ID, nil)
res, err := client.Do(request)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if res.StatusCode != http.StatusOK {
t.Errorf("unexpected response: %#v", res)
}
if simpleStorage.deleted != ID {
t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID)
}
if simpleStorage.deleteOptions != nil {
t.Errorf("unexpected delete options: %#v", simpleStorage.deleteOptions)
}
}
func TestLegacyDeleteIgnoresOptions(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{}
ID := "id"
storage["simple"] = LegacyRESTStorage{&simpleStorage}
handler := handle(storage)
server := httptest.NewServer(handler)
defer server.Close()
item := metav1.NewDeleteOptions(300)
body, err := runtime.Encode(codec, item)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
client := http.Client{}
request, err := http.NewRequest("DELETE", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/"+ID, bytes.NewReader(body))
res, err := client.Do(request)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if res.StatusCode != http.StatusOK {
t.Errorf("unexpected response: %#v", res)
}
if simpleStorage.deleted != ID {
t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID)
}
if simpleStorage.deleteOptions != nil {
t.Errorf("unexpected delete options: %#v", simpleStorage.deleteOptions)
}
}
func TestDeleteInvokesAdmissionControl(t *testing.T) {
// TODO: remove mutating deny when we removed it from the endpoint implementation and ported all plugins
for _, admit := range []admission.Interface{alwaysMutatingDeny{}, alwaysValidatingDeny{}} {

View File

@ -227,7 +227,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
deleter, isDeleter := storage.(rest.Deleter)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
@ -273,16 +272,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
var versionedDeleteOptions runtime.Object
var versionedDeleterObject interface{}
switch {
case isGracefulDeleter:
if isGracefulDeleter {
versionedDeleteOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions"))
if err != nil {
return nil, err
}
versionedDeleterObject = indirectArbitraryPointer(versionedDeleteOptions)
isDeleter = true
case isDeleter:
gracefulDeleter = rest.GracefulDeleteAdapter{Deleter: deleter}
}
versionedStatusPtr, err := a.group.Creater.New(optionsExternalVersion.WithKind("Status"))
@ -416,7 +411,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
@ -462,7 +457,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

View File

@ -137,16 +137,6 @@ type TableConvertor interface {
ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error)
}
// Deleter is an object that can delete a named RESTful resource.
type Deleter interface {
// Delete finds a resource in the storage and deletes it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
// Delete *may* return the object that was deleted, or a status object indicating additional
// information about deletion.
Delete(ctx genericapirequest.Context, name string) (runtime.Object, error)
}
// GracefulDeleter knows how to pass deletion options to allow delayed deletion of a
// RESTful object.
type GracefulDeleter interface {
@ -162,17 +152,6 @@ type GracefulDeleter interface {
Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error)
}
// GracefulDeleteAdapter adapts the Deleter interface to GracefulDeleter
type GracefulDeleteAdapter struct {
Deleter
}
// Delete implements RESTGracefulDeleter in terms of Deleter
func (w GracefulDeleteAdapter) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
obj, err := w.Deleter.Delete(ctx, name)
return obj, true, err
}
// CollectionDeleter is an object that can delete a collection
// of RESTful resources.
type CollectionDeleter interface {

View File

@ -24,11 +24,13 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
authorizationv1 "k8s.io/api/authorization/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/printers"
"k8s.io/kubernetes/test/e2e/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -142,7 +144,15 @@ var _ = SIGDescribe("Servers with support for Table transformation", func() {
c := f.ClientSet
table := &metav1beta1.Table{}
err := c.CoreV1().RESTClient().Get().Resource("services").SetHeader("Accept", "application/json;as=Table;v=v1beta1;g=meta.k8s.io").Do().Into(table)
sar := &authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
NonResourceAttributes: &authorizationv1.NonResourceAttributes{
Path: "/",
Verb: "get",
},
},
}
err := c.AuthorizationV1().RESTClient().Post().Resource("selfsubjectaccessreviews").SetHeader("Accept", "application/json;as=Table;v=v1beta1;g=meta.k8s.io").Body(sar).Do().Into(table)
Expect(err).To(HaveOccurred())
Expect(err.(errors.APIStatus).Status().Code).To(Equal(int32(406)))
})