mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #59510 from smarterclayton/services_table
Automatic merge from submit-queue (batch tested with PRs 60106, 59510, 60263, 60063, 59088). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Refactor service storage to remove registry wrapper This exposes the correct table exporter to the API endpoint, which is a prereq for server side GET to beta. Removing the use of the registry simplifies a few complex changes but results in test abstractions changing. Part of #58536
This commit is contained in:
commit
3a399c05f5
31
api/openapi-spec/swagger.json
generated
31
api/openapi-spec/swagger.json
generated
@ -8687,6 +8687,37 @@
|
||||
"core_v1"
|
||||
],
|
||||
"operationId": "deleteCoreV1NamespacedService",
|
||||
"parameters": [
|
||||
{
|
||||
"name": "body",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.DeleteOptions"
|
||||
}
|
||||
},
|
||||
{
|
||||
"uniqueItems": true,
|
||||
"type": "integer",
|
||||
"description": "The duration in seconds before the object should be deleted. Value must be non-negative integer. The value zero indicates delete immediately. If this value is nil, the default grace period for the specified type will be used. Defaults to a per object value if not specified. zero means delete immediately.",
|
||||
"name": "gracePeriodSeconds",
|
||||
"in": "query"
|
||||
},
|
||||
{
|
||||
"uniqueItems": true,
|
||||
"type": "boolean",
|
||||
"description": "Deprecated: please use the PropagationPolicy, this field will be deprecated in 1.7. Should the dependent objects be orphaned. If true/false, the \"orphan\" finalizer will be added to/removed from the object's finalizers list. Either this field or PropagationPolicy may be set, but not both.",
|
||||
"name": "orphanDependents",
|
||||
"in": "query"
|
||||
},
|
||||
{
|
||||
"uniqueItems": true,
|
||||
"type": "string",
|
||||
"description": "Whether and how garbage collection will be performed. Either this field or OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set in the metadata.finalizers and the resource-specific default policy. Acceptable values are: 'Orphan' - orphan the dependents; 'Background' - allow the garbage collector to delete the dependents in the background; 'Foreground' - a cascading policy that deletes all dependents in the foreground.",
|
||||
"name": "propagationPolicy",
|
||||
"in": "query"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
|
32
api/swagger-spec/v1.json
generated
32
api/swagger-spec/v1.json
generated
@ -16451,6 +16451,38 @@
|
||||
"required": false,
|
||||
"allowMultiple": false
|
||||
},
|
||||
{
|
||||
"type": "v1.DeleteOptions",
|
||||
"paramType": "body",
|
||||
"name": "body",
|
||||
"description": "",
|
||||
"required": true,
|
||||
"allowMultiple": false
|
||||
},
|
||||
{
|
||||
"type": "integer",
|
||||
"paramType": "query",
|
||||
"name": "gracePeriodSeconds",
|
||||
"description": "The duration in seconds before the object should be deleted. Value must be non-negative integer. The value zero indicates delete immediately. If this value is nil, the default grace period for the specified type will be used. Defaults to a per object value if not specified. zero means delete immediately.",
|
||||
"required": false,
|
||||
"allowMultiple": false
|
||||
},
|
||||
{
|
||||
"type": "boolean",
|
||||
"paramType": "query",
|
||||
"name": "orphanDependents",
|
||||
"description": "Deprecated: please use the PropagationPolicy, this field will be deprecated in 1.7. Should the dependent objects be orphaned. If true/false, the \"orphan\" finalizer will be added to/removed from the object's finalizers list. Either this field or PropagationPolicy may be set, but not both.",
|
||||
"required": false,
|
||||
"allowMultiple": false
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"paramType": "query",
|
||||
"name": "propagationPolicy",
|
||||
"description": "Whether and how garbage collection will be performed. Either this field or OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set in the metadata.finalizers and the resource-specific default policy. Acceptable values are: 'Orphan' - orphan the dependents; 'Background' - allow the garbage collector to delete the dependents in the background; 'Foreground' - a cascading policy that deletes all dependents in the foreground.",
|
||||
"required": false,
|
||||
"allowMultiple": false
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"paramType": "path",
|
||||
|
32
docs/api-reference/v1/operations.html
generated
32
docs/api-reference/v1/operations.html
generated
@ -18245,6 +18245,38 @@ span.icon > [class^="icon-"], span.icon > [class*=" icon-"] { cursor: default; }
|
||||
<td class="tableblock halign-left valign-top"></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">BodyParameter</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">body</p></td>
|
||||
<td class="tableblock halign-left valign-top"></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">true</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock"><a href="../definitions#_v1_deleteoptions">v1.DeleteOptions</a></p></td>
|
||||
<td class="tableblock halign-left valign-top"></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">QueryParameter</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">gracePeriodSeconds</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">The duration in seconds before the object should be deleted. Value must be non-negative integer. The value zero indicates delete immediately. If this value is nil, the default grace period for the specified type will be used. Defaults to a per object value if not specified. zero means delete immediately.</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">false</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">integer (int32)</p></td>
|
||||
<td class="tableblock halign-left valign-top"></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">QueryParameter</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">orphanDependents</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">Deprecated: please use the PropagationPolicy, this field will be deprecated in 1.7. Should the dependent objects be orphaned. If true/false, the "orphan" finalizer will be added to/removed from the object’s finalizers list. Either this field or PropagationPolicy may be set, but not both.</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">false</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">boolean</p></td>
|
||||
<td class="tableblock halign-left valign-top"></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">QueryParameter</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">propagationPolicy</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">Whether and how garbage collection will be performed. Either this field or OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set in the metadata.finalizers and the resource-specific default policy. Acceptable values are: <em>Orphan</em> - orphan the dependents; <em>Background</em> - allow the garbage collector to delete the dependents in the background; <em>Foreground</em> - a cascading policy that deletes all dependents in the foreground.</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">false</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">string</p></td>
|
||||
<td class="tableblock halign-left valign-top"></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">PathParameter</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">namespace</p></td>
|
||||
<td class="tableblock halign-left valign-top"><p class="tableblock">object name and auth scope, such as for teams and projects</p></td>
|
||||
|
@ -30,7 +30,6 @@ go_library(
|
||||
"//pkg/master/ports:go_default_library",
|
||||
"//pkg/registry/core/componentstatus:go_default_library",
|
||||
"//pkg/registry/core/configmap/storage:go_default_library",
|
||||
"//pkg/registry/core/endpoint:go_default_library",
|
||||
"//pkg/registry/core/endpoint/storage:go_default_library",
|
||||
"//pkg/registry/core/event/storage:go_default_library",
|
||||
"//pkg/registry/core/limitrange/storage:go_default_library",
|
||||
@ -44,7 +43,6 @@ go_library(
|
||||
"//pkg/registry/core/replicationcontroller/storage:go_default_library",
|
||||
"//pkg/registry/core/resourcequota/storage:go_default_library",
|
||||
"//pkg/registry/core/secret/storage:go_default_library",
|
||||
"//pkg/registry/core/service:go_default_library",
|
||||
"//pkg/registry/core/service/allocator:go_default_library",
|
||||
"//pkg/registry/core/service/allocator/storage:go_default_library",
|
||||
"//pkg/registry/core/service/ipallocator:go_default_library",
|
||||
|
@ -44,7 +44,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
|
||||
configmapstore "k8s.io/kubernetes/pkg/registry/core/configmap/storage"
|
||||
"k8s.io/kubernetes/pkg/registry/core/endpoint"
|
||||
endpointsstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
|
||||
eventstore "k8s.io/kubernetes/pkg/registry/core/event/storage"
|
||||
limitrangestore "k8s.io/kubernetes/pkg/registry/core/limitrange/storage"
|
||||
@ -58,7 +57,6 @@ import (
|
||||
controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
|
||||
resourcequotastore "k8s.io/kubernetes/pkg/registry/core/resourcequota/storage"
|
||||
secretstore "k8s.io/kubernetes/pkg/registry/core/secret/storage"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
|
||||
serviceallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
|
||||
@ -127,7 +125,6 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
|
||||
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespacestore.NewREST(restOptionsGetter)
|
||||
|
||||
endpointsStorage := endpointsstore.NewREST(restOptionsGetter)
|
||||
endpointRegistry := endpoint.NewRegistry(endpointsStorage)
|
||||
|
||||
nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
|
||||
if err != nil {
|
||||
@ -148,8 +145,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
|
||||
serviceAccountStorage = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, nil)
|
||||
}
|
||||
|
||||
serviceRESTStorage, serviceStatusStorage := servicestore.NewREST(restOptionsGetter)
|
||||
serviceRegistry := service.NewRegistry(serviceRESTStorage)
|
||||
serviceRESTStorage, serviceStatusStorage := servicestore.NewGenericREST(restOptionsGetter)
|
||||
|
||||
var serviceClusterIPRegistry rangeallocation.RangeRegistry
|
||||
serviceClusterIPRange := c.ServiceIPRange
|
||||
@ -162,7 +158,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
|
||||
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
|
||||
}
|
||||
|
||||
ServiceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
|
||||
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
|
||||
mem := allocator.NewAllocationMap(max, rangeSpec)
|
||||
// TODO etcdallocator package to return a storage interface via the storageFactory
|
||||
etcd := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
|
||||
@ -172,7 +168,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
|
||||
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
|
||||
|
||||
var serviceNodePortRegistry rangeallocation.RangeRegistry
|
||||
ServiceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
|
||||
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
|
||||
mem := allocator.NewAllocationMap(max, rangeSpec)
|
||||
// TODO etcdallocator package to return a storage interface via the storageFactory
|
||||
etcd := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
|
||||
@ -183,7 +179,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
|
||||
|
||||
controllerStorage := controllerstore.NewStorage(restOptionsGetter)
|
||||
|
||||
serviceRest := service.NewStorage(serviceRegistry, endpointRegistry, podStorage.Pod, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
|
||||
serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage, endpointsStorage, podStorage.Pod, serviceClusterIPAllocator, serviceNodePortAllocator, c.ProxyTransport)
|
||||
|
||||
restStorageMap := map[string]rest.Storage{
|
||||
"pods": podStorage.Pod,
|
||||
@ -201,8 +197,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
|
||||
"replicationControllers": controllerStorage.Controller,
|
||||
"replicationControllers/status": controllerStorage.Status,
|
||||
|
||||
"services": serviceRest.Service,
|
||||
"services/proxy": serviceRest.Proxy,
|
||||
"services": serviceRest,
|
||||
"services/proxy": serviceRestProxy,
|
||||
"services/status": serviceStatusStorage,
|
||||
|
||||
"endpoints": endpointsStorage,
|
||||
|
@ -12,65 +12,40 @@ go_library(
|
||||
"doc.go",
|
||||
"proxy.go",
|
||||
"registry.go",
|
||||
"rest.go",
|
||||
"strategy.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/registry/core/service",
|
||||
deps = [
|
||||
"//pkg/api/legacyscheme:go_default_library",
|
||||
"//pkg/api/service:go_default_library",
|
||||
"//pkg/apis/core:go_default_library",
|
||||
"//pkg/apis/core/helper:go_default_library",
|
||||
"//pkg/apis/core/validation:go_default_library",
|
||||
"//pkg/capabilities:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/registry/core/endpoint:go_default_library",
|
||||
"//pkg/registry/core/pod/storage:go_default_library",
|
||||
"//pkg/registry/core/service/ipallocator:go_default_library",
|
||||
"//pkg/registry/core/service/portallocator:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"rest_test.go",
|
||||
"strategy_test.go",
|
||||
],
|
||||
srcs = ["strategy_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/api/service:go_default_library",
|
||||
"//pkg/apis/core:go_default_library",
|
||||
"//pkg/apis/core/helper:go_default_library",
|
||||
"//pkg/registry/core/pod/storage:go_default_library",
|
||||
"//pkg/registry/core/service/ipallocator:go_default_library",
|
||||
"//pkg/registry/core/service/portallocator:go_default_library",
|
||||
"//pkg/registry/registrytest:go_default_library",
|
||||
"//pkg/apis/core/install:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -15,7 +15,6 @@ go_library(
|
||||
"//pkg/apis/core:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
|
||||
"//pkg/registry/core/rangeallocation:go_default_library",
|
||||
"//pkg/registry/core/service:go_default_library",
|
||||
"//pkg/registry/core/service/portallocator:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
|
@ -32,7 +32,6 @@ import (
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
|
||||
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
|
||||
)
|
||||
|
||||
@ -126,7 +125,7 @@ func (c *Repair) runOnce() error {
|
||||
// Check every Service's ports, and rebuild the state as we think it should be.
|
||||
for i := range list.Items {
|
||||
svc := &list.Items[i]
|
||||
ports := service.CollectServiceNodePorts(svc)
|
||||
ports := collectServiceNodePorts(svc)
|
||||
if len(ports) == 0 {
|
||||
continue
|
||||
}
|
||||
@ -196,3 +195,14 @@ func (c *Repair) runOnce() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func collectServiceNodePorts(service *api.Service) []int {
|
||||
servicePorts := []int{}
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
if servicePort.NodePort != 0 {
|
||||
servicePorts = append(servicePorts, int(servicePort.NodePort))
|
||||
}
|
||||
}
|
||||
return servicePorts
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ import (
|
||||
|
||||
// ProxyREST implements the proxy subresource for a Service
|
||||
type ProxyREST struct {
|
||||
ServiceRest *REST
|
||||
Redirector rest.Redirector
|
||||
ProxyTransport http.RoundTripper
|
||||
}
|
||||
|
||||
@ -62,7 +62,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Invalid options object: %#v", opts)
|
||||
}
|
||||
location, transport, err := r.ServiceRest.ResourceLocation(ctx, id)
|
||||
location, transport, err := r.Redirector.ResourceLocation(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -8,38 +8,74 @@ load(
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["storage_test.go"],
|
||||
srcs = [
|
||||
"rest_test.go",
|
||||
"storage_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/api/service:go_default_library",
|
||||
"//pkg/apis/core:go_default_library",
|
||||
"//pkg/apis/core/helper:go_default_library",
|
||||
"//pkg/registry/core/endpoint/storage:go_default_library",
|
||||
"//pkg/registry/core/pod/storage:go_default_library",
|
||||
"//pkg/registry/core/service/ipallocator:go_default_library",
|
||||
"//pkg/registry/core/service/portallocator:go_default_library",
|
||||
"//pkg/registry/registrytest:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic/testing:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["storage.go"],
|
||||
srcs = [
|
||||
"rest.go",
|
||||
"storage.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/registry/core/service/storage",
|
||||
deps = [
|
||||
"//pkg/api/service:go_default_library",
|
||||
"//pkg/apis/core:go_default_library",
|
||||
"//pkg/apis/core/helper:go_default_library",
|
||||
"//pkg/apis/core/validation:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/printers:go_default_library",
|
||||
"//pkg/printers/internalversion:go_default_library",
|
||||
"//pkg/printers/storage:go_default_library",
|
||||
"//pkg/registry/core/service:go_default_library",
|
||||
"//pkg/registry/core/service/ipallocator:go_default_library",
|
||||
"//pkg/registry/core/service/portallocator:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
@ -36,31 +37,25 @@ import (
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
|
||||
apiservice "k8s.io/kubernetes/pkg/api/service"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/apis/core/helper"
|
||||
"k8s.io/kubernetes/pkg/apis/core/validation"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/registry/core/endpoint"
|
||||
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
|
||||
registry "k8s.io/kubernetes/pkg/registry/core/service"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
|
||||
)
|
||||
|
||||
// ServiceRest includes storage for services and all sub resources
|
||||
type ServiceRest struct {
|
||||
Service *REST
|
||||
Proxy *ProxyREST
|
||||
}
|
||||
|
||||
// REST adapts a service registry into apiserver's RESTStorage model.
|
||||
type REST struct {
|
||||
registry Registry
|
||||
endpoints endpoint.Registry
|
||||
services ServiceStorage
|
||||
endpoints EndpointsStorage
|
||||
serviceIPs ipallocator.Interface
|
||||
serviceNodePorts portallocator.Interface
|
||||
proxyTransport http.RoundTripper
|
||||
pods *podstore.REST
|
||||
pods rest.Getter
|
||||
}
|
||||
|
||||
// ServiceNodePort includes protocol and port number of a service NodePort.
|
||||
@ -73,23 +68,50 @@ type ServiceNodePort struct {
|
||||
NodePort int32
|
||||
}
|
||||
|
||||
// NewStorage returns a new REST.
|
||||
func NewStorage(registry Registry, endpoints endpoint.Registry, pods *podstore.REST, serviceIPs ipallocator.Interface,
|
||||
serviceNodePorts portallocator.Interface, proxyTransport http.RoundTripper) *ServiceRest {
|
||||
type ServiceStorage interface {
|
||||
rest.Getter
|
||||
rest.Lister
|
||||
rest.CreaterUpdater
|
||||
rest.GracefulDeleter
|
||||
rest.Watcher
|
||||
rest.TableConvertor
|
||||
rest.Exporter
|
||||
}
|
||||
|
||||
type EndpointsStorage interface {
|
||||
rest.Getter
|
||||
rest.GracefulDeleter
|
||||
}
|
||||
|
||||
// NewREST returns a wrapper around the underlying generic storage and performs
|
||||
// allocations and deallocations of various service related resources like ports.
|
||||
// TODO: all transactional behavior should be supported from within generic storage
|
||||
// or the strategy.
|
||||
func NewREST(
|
||||
services ServiceStorage,
|
||||
endpoints EndpointsStorage,
|
||||
pods rest.Getter,
|
||||
serviceIPs ipallocator.Interface,
|
||||
serviceNodePorts portallocator.Interface,
|
||||
proxyTransport http.RoundTripper,
|
||||
) (*REST, *registry.ProxyREST) {
|
||||
rest := &REST{
|
||||
registry: registry,
|
||||
services: services,
|
||||
endpoints: endpoints,
|
||||
serviceIPs: serviceIPs,
|
||||
serviceNodePorts: serviceNodePorts,
|
||||
proxyTransport: proxyTransport,
|
||||
pods: pods,
|
||||
}
|
||||
return &ServiceRest{
|
||||
Service: rest,
|
||||
Proxy: &ProxyREST{ServiceRest: rest, ProxyTransport: proxyTransport},
|
||||
}
|
||||
return rest, ®istry.ProxyREST{Redirector: rest, ProxyTransport: proxyTransport}
|
||||
}
|
||||
|
||||
var (
|
||||
_ ServiceStorage = &REST{}
|
||||
_ rest.CategoriesProvider = &REST{}
|
||||
_ rest.ShortNamesProvider = &REST{}
|
||||
)
|
||||
|
||||
// ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
|
||||
func (rs *REST) ShortNames() []string {
|
||||
return []string{"svc"}
|
||||
@ -100,11 +122,34 @@ func (rs *REST) Categories() []string {
|
||||
return []string{"all"}
|
||||
}
|
||||
|
||||
// TODO: implement includeUninitialized by refactoring this to move to store
|
||||
func (rs *REST) New() runtime.Object {
|
||||
return rs.services.New()
|
||||
}
|
||||
|
||||
func (rs *REST) NewList() runtime.Object {
|
||||
return rs.services.NewList()
|
||||
}
|
||||
|
||||
func (rs *REST) Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
return rs.services.Get(ctx, name, options)
|
||||
}
|
||||
|
||||
func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
|
||||
return rs.services.List(ctx, options)
|
||||
}
|
||||
|
||||
func (rs *REST) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
|
||||
return rs.services.Watch(ctx, options)
|
||||
}
|
||||
|
||||
func (rs *REST) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
|
||||
return rs.services.Export(ctx, name, opts)
|
||||
}
|
||||
|
||||
func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) {
|
||||
service := obj.(*api.Service)
|
||||
|
||||
if err := rest.BeforeCreate(Strategy, ctx, obj); err != nil {
|
||||
if err := rest.BeforeCreate(registry.Strategy, ctx, obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -120,7 +165,7 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create
|
||||
|
||||
var err error
|
||||
if service.Spec.Type != api.ServiceTypeExternalName {
|
||||
if releaseServiceIP, err = rs.initClusterIP(service); err != nil {
|
||||
if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -129,14 +174,14 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create
|
||||
defer nodePortOp.Finish()
|
||||
|
||||
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
|
||||
if err := rs.initNodePorts(service, nodePortOp); err != nil {
|
||||
if err := initNodePorts(service, nodePortOp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Handle ExternalTraffic related fields during service creation.
|
||||
if apiservice.NeedsHealthCheck(service) {
|
||||
if err := rs.allocateHealthCheckNodePort(service, nodePortOp); err != nil {
|
||||
if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
|
||||
return nil, errors.NewInternalError(err)
|
||||
}
|
||||
}
|
||||
@ -144,16 +189,16 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create
|
||||
return nil, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
|
||||
}
|
||||
|
||||
out, err := rs.registry.CreateService(ctx, service, createValidation)
|
||||
out, err := rs.services.Create(ctx, service, createValidation, includeUninitialized)
|
||||
if err != nil {
|
||||
err = rest.CheckGeneratedNameError(Strategy, err, service)
|
||||
err = rest.CheckGeneratedNameError(registry.Strategy, err, service)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
el := nodePortOp.Commit()
|
||||
if el != nil {
|
||||
// these should be caught by an eventual reconciliation / restart
|
||||
glog.Errorf("error(s) committing service node-ports changes: %v", el)
|
||||
utilruntime.HandleError(fmt.Errorf("error(s) committing service node-ports changes: %v", el))
|
||||
}
|
||||
|
||||
releaseServiceIP = false
|
||||
@ -162,75 +207,56 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, create
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (rs *REST) Delete(ctx genericapirequest.Context, id string) (runtime.Object, error) {
|
||||
service, err := rs.registry.GetService(ctx, id, &metav1.GetOptions{})
|
||||
func (rs *REST) Delete(ctx genericapirequest.Context, id string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
|
||||
// TODO: handle graceful
|
||||
obj, _, err := rs.services.Delete(ctx, id, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
err = rs.registry.DeleteService(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
svc := obj.(*api.Service)
|
||||
|
||||
// TODO: can leave dangling endpoints, and potentially return incorrect
|
||||
// endpoints if a new service is created with the same name
|
||||
err = rs.endpoints.DeleteEndpoints(ctx, id)
|
||||
_, _, err = rs.endpoints.Delete(ctx, id, &metav1.DeleteOptions{})
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if helper.IsServiceIPSet(service) {
|
||||
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
|
||||
if helper.IsServiceIPSet(svc) {
|
||||
rs.serviceIPs.Release(net.ParseIP(svc.Spec.ClusterIP))
|
||||
}
|
||||
|
||||
for _, nodePort := range CollectServiceNodePorts(service) {
|
||||
for _, nodePort := range collectServiceNodePorts(svc) {
|
||||
err := rs.serviceNodePorts.Release(nodePort)
|
||||
if err != nil {
|
||||
// these should be caught by an eventual reconciliation / restart
|
||||
glog.Errorf("Error releasing service %s node port %d: %v", service.Name, nodePort, err)
|
||||
utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err))
|
||||
}
|
||||
}
|
||||
|
||||
if apiservice.NeedsHealthCheck(service) {
|
||||
nodePort := service.Spec.HealthCheckNodePort
|
||||
if apiservice.NeedsHealthCheck(svc) {
|
||||
nodePort := svc.Spec.HealthCheckNodePort
|
||||
if nodePort > 0 {
|
||||
err := rs.serviceNodePorts.Release(int(nodePort))
|
||||
if err != nil {
|
||||
// these should be caught by an eventual reconciliation / restart
|
||||
utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", service.Name, nodePort, err))
|
||||
utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
return &metav1.Status{Status: metav1.StatusSuccess}, nil
|
||||
}
|
||||
|
||||
func (rs *REST) Get(ctx genericapirequest.Context, id string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
return rs.registry.GetService(ctx, id, options)
|
||||
}
|
||||
|
||||
func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
|
||||
return rs.registry.ListServices(ctx, options)
|
||||
}
|
||||
|
||||
// Watch returns Services events via a watch.Interface.
|
||||
// It implements rest.Watcher.
|
||||
func (rs *REST) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
|
||||
return rs.registry.WatchServices(ctx, options)
|
||||
}
|
||||
|
||||
// Export returns Service stripped of cluster-specific information.
|
||||
// It implements rest.Exporter.
|
||||
func (rs *REST) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
|
||||
return rs.registry.ExportService(ctx, name, opts)
|
||||
}
|
||||
|
||||
func (*REST) New() runtime.Object {
|
||||
return &api.Service{}
|
||||
}
|
||||
|
||||
func (*REST) NewList() runtime.Object {
|
||||
return &api.ServiceList{}
|
||||
// TODO: this is duplicated from the generic storage, when this wrapper is fully removed we can drop this
|
||||
details := &metav1.StatusDetails{
|
||||
Name: svc.Name,
|
||||
UID: svc.UID,
|
||||
}
|
||||
if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
|
||||
details.Group = info.APIGroup
|
||||
details.Kind = info.Resource // legacy behavior
|
||||
}
|
||||
status := &metav1.Status{Status: metav1.StatusSuccess, Details: details}
|
||||
return status, true, nil
|
||||
}
|
||||
|
||||
// externalTrafficPolicyUpdate adjusts ExternalTrafficPolicy during service update if needed.
|
||||
@ -267,7 +293,7 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, node
|
||||
// Insert health check node port into the service's HealthCheckNodePort field if needed.
|
||||
case !neededHealthCheckNodePort && needsHealthCheckNodePort:
|
||||
glog.Infof("Transition to LoadBalancer type service with ExternalTrafficPolicy=Local")
|
||||
if err := rs.allocateHealthCheckNodePort(service, nodePortOp); err != nil {
|
||||
if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
|
||||
return false, errors.NewInternalError(err)
|
||||
}
|
||||
|
||||
@ -295,10 +321,11 @@ func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, node
|
||||
}
|
||||
|
||||
func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc) (runtime.Object, bool, error) {
|
||||
oldService, err := rs.registry.GetService(ctx, name, &metav1.GetOptions{})
|
||||
oldObj, err := rs.services.Get(ctx, name, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
oldService := oldObj.(*api.Service)
|
||||
|
||||
obj, err := objInfo.UpdatedObject(ctx, oldService)
|
||||
if err != nil {
|
||||
@ -331,7 +358,7 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
|
||||
|
||||
// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
|
||||
if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
|
||||
if releaseServiceIP, err = rs.initClusterIP(service); err != nil {
|
||||
if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
@ -344,11 +371,11 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
|
||||
// Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists.
|
||||
if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) &&
|
||||
(service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) {
|
||||
rs.releaseNodePorts(oldService, nodePortOp)
|
||||
releaseNodePorts(oldService, nodePortOp)
|
||||
}
|
||||
// Update service from any type to NodePort or LoadBalancer, should update NodePort.
|
||||
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
|
||||
if err := rs.updateNodePorts(oldService, service, nodePortOp); err != nil {
|
||||
if err := updateNodePorts(oldService, service, nodePortOp); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
@ -368,18 +395,18 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
|
||||
return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
|
||||
}
|
||||
|
||||
out, err := rs.registry.UpdateService(ctx, service, createValidation, updateValidation)
|
||||
out, created, err := rs.services.Update(ctx, service.Name, rest.DefaultUpdatedObjectInfo(service), createValidation, updateValidation)
|
||||
if err == nil {
|
||||
el := nodePortOp.Commit()
|
||||
if el != nil {
|
||||
// problems should be fixed by an eventual reconciliation / restart
|
||||
glog.Errorf("error(s) committing NodePorts changes: %v", el)
|
||||
utilruntime.HandleError(fmt.Errorf("error(s) committing NodePorts changes: %v", el))
|
||||
}
|
||||
|
||||
releaseServiceIP = false
|
||||
}
|
||||
|
||||
return out, false, err
|
||||
return out, created, err
|
||||
}
|
||||
|
||||
// Implement Redirector.
|
||||
@ -395,10 +422,11 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url
|
||||
|
||||
// If a port *number* was specified, find the corresponding service port name
|
||||
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
|
||||
svc, err := rs.registry.GetService(ctx, svcName, &metav1.GetOptions{})
|
||||
obj, err := rs.services.Get(ctx, svcName, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
svc := obj.(*api.Service)
|
||||
found := false
|
||||
for _, svcPort := range svc.Spec.Ports {
|
||||
if int64(svcPort.Port) == portNum {
|
||||
@ -413,10 +441,11 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url
|
||||
}
|
||||
}
|
||||
|
||||
eps, err := rs.endpoints.GetEndpoints(ctx, svcName, &metav1.GetOptions{})
|
||||
obj, err := rs.endpoints.Get(ctx, svcName, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
eps := obj.(*api.Endpoints)
|
||||
if len(eps.Subsets) == 0 {
|
||||
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
|
||||
}
|
||||
@ -439,7 +468,7 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url
|
||||
addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)]
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceProxyAllowExternalIPs) {
|
||||
if err := isValidAddress(ctx, &addr, rs.pods); err != nil {
|
||||
glog.Errorf("Address %v isn't valid (%v)", addr, err)
|
||||
utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
@ -450,14 +479,18 @@ func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url
|
||||
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
|
||||
}, rs.proxyTransport, nil
|
||||
}
|
||||
glog.Errorf("Failed to find a valid address, skipping subset: %v", ss)
|
||||
utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss))
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
|
||||
}
|
||||
|
||||
func isValidAddress(ctx genericapirequest.Context, addr *api.EndpointAddress, pods *podstore.REST) error {
|
||||
func (r *REST) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
|
||||
return r.services.ConvertToTable(ctx, object, tableOptions)
|
||||
}
|
||||
|
||||
func isValidAddress(ctx genericapirequest.Context, addr *api.EndpointAddress, pods rest.Getter) error {
|
||||
if addr.TargetRef == nil {
|
||||
return fmt.Errorf("Address has no target ref, skipping: %v", addr)
|
||||
}
|
||||
@ -503,17 +536,6 @@ func containsNodePort(serviceNodePorts []ServiceNodePort, serviceNodePort Servic
|
||||
return false
|
||||
}
|
||||
|
||||
func CollectServiceNodePorts(service *api.Service) []int {
|
||||
servicePorts := []int{}
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
if servicePort.NodePort != 0 {
|
||||
servicePorts = append(servicePorts, int(servicePort.NodePort))
|
||||
}
|
||||
}
|
||||
return servicePorts
|
||||
}
|
||||
|
||||
// Loop through the service ports list, find one with the same port number and
|
||||
// NodePort specified, return this NodePort otherwise return 0.
|
||||
func findRequestedNodePort(port int, servicePorts []api.ServicePort) int {
|
||||
@ -527,7 +549,7 @@ func findRequestedNodePort(port int, servicePorts []api.ServicePort) int {
|
||||
}
|
||||
|
||||
// allocateHealthCheckNodePort allocates health check node port to service.
|
||||
func (rs *REST) allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
|
||||
func allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
|
||||
healthCheckNodePort := service.Spec.HealthCheckNodePort
|
||||
if healthCheckNodePort != 0 {
|
||||
// If the request has a health check nodePort in mind, attempt to reserve it.
|
||||
@ -550,11 +572,11 @@ func (rs *REST) allocateHealthCheckNodePort(service *api.Service, nodePortOp *po
|
||||
}
|
||||
|
||||
// The return bool value indicates if a cluster IP is allocated successfully.
|
||||
func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
|
||||
func initClusterIP(service *api.Service, serviceIPs ipallocator.Interface) (bool, error) {
|
||||
switch {
|
||||
case service.Spec.ClusterIP == "":
|
||||
// Allocate next available.
|
||||
ip, err := rs.serviceIPs.AllocateNext()
|
||||
ip, err := serviceIPs.AllocateNext()
|
||||
if err != nil {
|
||||
// TODO: what error should be returned here? It's not a
|
||||
// field-level validation failure (the field is valid), and it's
|
||||
@ -565,7 +587,7 @@ func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
|
||||
return true, nil
|
||||
case service.Spec.ClusterIP != api.ClusterIPNone && service.Spec.ClusterIP != "":
|
||||
// Try to respect the requested IP.
|
||||
if err := rs.serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil {
|
||||
if err := serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil {
|
||||
// TODO: when validation becomes versioned, this gets more complicated.
|
||||
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())}
|
||||
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
|
||||
@ -576,7 +598,7 @@ func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (rs *REST) initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
|
||||
func initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
|
||||
svcPortToNodePort := map[int]int{}
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
@ -625,8 +647,8 @@ func (rs *REST) initNodePorts(service *api.Service, nodePortOp *portallocator.Po
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
|
||||
oldNodePortsNumbers := CollectServiceNodePorts(oldService)
|
||||
func updateNodePorts(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
|
||||
oldNodePortsNumbers := collectServiceNodePorts(oldService)
|
||||
newNodePorts := []ServiceNodePort{}
|
||||
portAllocated := map[int]bool{}
|
||||
|
||||
@ -659,7 +681,7 @@ func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp
|
||||
newNodePorts = append(newNodePorts, nodePort)
|
||||
}
|
||||
|
||||
newNodePortsNumbers := CollectServiceNodePorts(newService)
|
||||
newNodePortsNumbers := collectServiceNodePorts(newService)
|
||||
|
||||
// The comparison loops are O(N^2), but we don't expect N to be huge
|
||||
// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
|
||||
@ -673,10 +695,21 @@ func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *REST) releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
|
||||
nodePorts := CollectServiceNodePorts(service)
|
||||
func releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
|
||||
nodePorts := collectServiceNodePorts(service)
|
||||
|
||||
for _, nodePort := range nodePorts {
|
||||
nodePortOp.ReleaseDeferred(nodePort)
|
||||
}
|
||||
}
|
||||
|
||||
func collectServiceNodePorts(service *api.Service) []int {
|
||||
servicePorts := []int{}
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
if servicePort.NodePort != 0 {
|
||||
servicePorts = append(servicePorts, int(servicePort.NodePort))
|
||||
}
|
||||
}
|
||||
return servicePorts
|
||||
}
|
@ -14,28 +14,33 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
package storage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/rand"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/service"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/apis/core/helper"
|
||||
endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
|
||||
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
|
||||
@ -46,28 +51,121 @@ import (
|
||||
// It is now testing mostly the same things as other resources but
|
||||
// in a completely different way. We should unify it.
|
||||
|
||||
type serviceStorage struct {
|
||||
GottenID string
|
||||
UpdatedID string
|
||||
CreatedID string
|
||||
DeletedID string
|
||||
Created bool
|
||||
DeletedImmediately bool
|
||||
Service *api.Service
|
||||
OldService *api.Service
|
||||
ServiceList *api.ServiceList
|
||||
Err error
|
||||
}
|
||||
|
||||
func (s *serviceStorage) Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
|
||||
s.GottenID = name
|
||||
return s.Service, s.Err
|
||||
}
|
||||
|
||||
func (s *serviceStorage) GetService(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (*api.Service, error) {
|
||||
return s.Service, s.Err
|
||||
}
|
||||
|
||||
func (s *serviceStorage) NewList() runtime.Object {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (s *serviceStorage) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
|
||||
ns, _ := genericapirequest.NamespaceFrom(ctx)
|
||||
|
||||
// Copy metadata from internal list into result
|
||||
res := new(api.ServiceList)
|
||||
res.TypeMeta = s.ServiceList.TypeMeta
|
||||
res.ListMeta = s.ServiceList.ListMeta
|
||||
|
||||
if ns != metav1.NamespaceAll {
|
||||
for _, service := range s.ServiceList.Items {
|
||||
if ns == service.Namespace {
|
||||
res.Items = append(res.Items, service)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
res.Items = append([]api.Service{}, s.ServiceList.Items...)
|
||||
}
|
||||
|
||||
return res, s.Err
|
||||
}
|
||||
|
||||
func (s *serviceStorage) New() runtime.Object {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (s *serviceStorage) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) {
|
||||
svc := obj.(*api.Service)
|
||||
s.CreatedID = obj.(metav1.Object).GetName()
|
||||
s.Service = svc.DeepCopy()
|
||||
|
||||
if s.ServiceList == nil {
|
||||
s.ServiceList = &api.ServiceList{}
|
||||
}
|
||||
|
||||
s.ServiceList.Items = append(s.ServiceList.Items, *svc)
|
||||
return svc, s.Err
|
||||
}
|
||||
|
||||
func (s *serviceStorage) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc) (runtime.Object, bool, error) {
|
||||
s.UpdatedID = name
|
||||
obj, err := objInfo.UpdatedObject(ctx, s.OldService)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
s.Service = obj.(*api.Service)
|
||||
return s.Service, s.Created, s.Err
|
||||
}
|
||||
|
||||
func (s *serviceStorage) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
|
||||
s.DeletedID = name
|
||||
return s.Service, s.DeletedImmediately, s.Err
|
||||
}
|
||||
|
||||
func (s *serviceStorage) DeleteCollection(ctx genericapirequest.Context, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (s *serviceStorage) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (s *serviceStorage) ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (s *serviceStorage) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func generateRandomNodePort() int32 {
|
||||
return int32(rand.IntnRange(30001, 30999))
|
||||
}
|
||||
|
||||
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry, *etcdtesting.EtcdTestServer) {
|
||||
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *serviceStorage, *etcdtesting.EtcdTestServer) {
|
||||
return NewTestRESTWithPods(t, endpoints, nil)
|
||||
}
|
||||
|
||||
func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList) (*REST, *registrytest.ServiceRegistry, *etcdtesting.EtcdTestServer) {
|
||||
registry := registrytest.NewServiceRegistry()
|
||||
endpointRegistry := ®istrytest.EndpointRegistry{
|
||||
Endpoints: endpoints,
|
||||
}
|
||||
func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList) (*REST, *serviceStorage, *etcdtesting.EtcdTestServer) {
|
||||
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
|
||||
restOptions := generic.RESTOptions{
|
||||
|
||||
serviceStorage := &serviceStorage{}
|
||||
|
||||
podStorage := podstore.NewStorage(generic.RESTOptions{
|
||||
StorageConfig: etcdStorage,
|
||||
Decorator: generic.UndecoratedStorage,
|
||||
DeleteCollectionWorkers: 3,
|
||||
ResourcePrefix: "pods",
|
||||
}
|
||||
podStorage := podstore.NewStorage(restOptions, nil, nil, nil)
|
||||
if pods != nil && pods.Items != nil {
|
||||
}, nil, nil, nil)
|
||||
if pods != nil && len(pods.Items) > 0 {
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
for ix := range pods.Items {
|
||||
key, _ := podStorage.Pod.KeyFunc(ctx, pods.Items[ix].Name)
|
||||
@ -76,14 +174,29 @@ func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.P
|
||||
}
|
||||
}
|
||||
}
|
||||
endpointStorage := endpointstore.NewREST(generic.RESTOptions{
|
||||
StorageConfig: etcdStorage,
|
||||
Decorator: generic.UndecoratedStorage,
|
||||
ResourcePrefix: "endpoints",
|
||||
})
|
||||
if endpoints != nil && len(endpoints.Items) > 0 {
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
for ix := range endpoints.Items {
|
||||
key, _ := endpointStorage.KeyFunc(ctx, endpoints.Items[ix].Name)
|
||||
if err := endpointStorage.Store.Storage.Create(ctx, key, &endpoints.Items[ix], nil, 0); err != nil {
|
||||
t.Fatalf("Couldn't create endpoint: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r := ipallocator.NewCIDRRange(makeIPNet(t))
|
||||
|
||||
portRange := utilnet.PortRange{Base: 30000, Size: 1000}
|
||||
portAllocator := portallocator.NewPortAllocator(portRange)
|
||||
|
||||
storage := NewStorage(registry, endpointRegistry, podStorage.Pod, r, portAllocator, nil)
|
||||
rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, r, portAllocator, nil)
|
||||
|
||||
return storage.Service, registry, server
|
||||
return rest, serviceStorage, server
|
||||
}
|
||||
|
||||
func makeIPNet(t *testing.T) *net.IPNet {
|
||||
@ -94,15 +207,16 @@ func makeIPNet(t *testing.T) *net.IPNet {
|
||||
return net
|
||||
}
|
||||
|
||||
func releaseServiceNodePorts(t *testing.T, ctx genericapirequest.Context, svcName string, rest *REST, registry *registrytest.ServiceRegistry) {
|
||||
srv, err := registry.GetService(ctx, svcName, &metav1.GetOptions{})
|
||||
func releaseServiceNodePorts(t *testing.T, ctx genericapirequest.Context, svcName string, rest *REST, registry ServiceStorage) {
|
||||
obj, err := registry.Get(ctx, svcName, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
srv := obj.(*api.Service)
|
||||
if srv == nil {
|
||||
t.Fatalf("Failed to find service: %s", svcName)
|
||||
}
|
||||
serviceNodePorts := CollectServiceNodePorts(srv)
|
||||
serviceNodePorts := collectServiceNodePorts(srv)
|
||||
if len(serviceNodePorts) == 0 {
|
||||
t.Errorf("Failed to find NodePorts of service : %s", srv.Name)
|
||||
}
|
||||
@ -271,7 +385,7 @@ func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) {
|
||||
if created_service.Name != test.name {
|
||||
t.Errorf("Expected %s, but got %s", test.name, created_service.Name)
|
||||
}
|
||||
serviceNodePorts := CollectServiceNodePorts(created_service)
|
||||
serviceNodePorts := collectServiceNodePorts(created_service)
|
||||
if !reflect.DeepEqual(serviceNodePorts, test.expectNodePorts) {
|
||||
t.Errorf("Expected %v, but got %v", test.expectNodePorts, serviceNodePorts)
|
||||
}
|
||||
@ -348,7 +462,7 @@ func TestServiceRegistryUpdate(t *testing.T) {
|
||||
storage, registry, server := NewTestREST(t, nil)
|
||||
defer server.Terminate(t)
|
||||
|
||||
svc, err := registry.CreateService(ctx, &api.Service{
|
||||
obj, err := registry.Create(ctx, &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", Namespace: metav1.NamespaceDefault},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz1"},
|
||||
@ -358,8 +472,8 @@ func TestServiceRegistryUpdate(t *testing.T) {
|
||||
TargetPort: intstr.FromInt(6502),
|
||||
}},
|
||||
},
|
||||
}, rest.ValidateAllObjectFunc)
|
||||
|
||||
}, rest.ValidateAllObjectFunc, false)
|
||||
svc := obj.(*api.Service)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: %v", err)
|
||||
}
|
||||
@ -400,7 +514,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
storage, registry, server := NewTestREST(t, nil)
|
||||
defer server.Terminate(t)
|
||||
registry.CreateService(ctx, &api.Service{
|
||||
registry.Create(ctx, &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
@ -409,7 +523,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
|
||||
Protocol: api.ProtocolTCP,
|
||||
}},
|
||||
},
|
||||
}, rest.ValidateAllObjectFunc)
|
||||
}, rest.ValidateAllObjectFunc, false)
|
||||
failureCases := map[string]api.Service{
|
||||
"empty ID": {
|
||||
ObjectMeta: metav1.ObjectMeta{Name: ""},
|
||||
@ -477,7 +591,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
|
||||
if srv == nil {
|
||||
t.Fatalf("Failed to find service: %s", svc.Name)
|
||||
}
|
||||
serviceNodePorts := CollectServiceNodePorts(srv)
|
||||
serviceNodePorts := collectServiceNodePorts(srv)
|
||||
if len(serviceNodePorts) == 0 {
|
||||
t.Errorf("Failed to find NodePorts of service : %s", srv.Name)
|
||||
}
|
||||
@ -504,8 +618,8 @@ func TestServiceRegistryDelete(t *testing.T) {
|
||||
}},
|
||||
},
|
||||
}
|
||||
registry.CreateService(ctx, svc, rest.ValidateAllObjectFunc)
|
||||
storage.Delete(ctx, svc.Name)
|
||||
registry.Create(ctx, svc, rest.ValidateAllObjectFunc, false)
|
||||
storage.Delete(ctx, svc.Name, &metav1.DeleteOptions{})
|
||||
if e, a := "foo", registry.DeletedID; e != a {
|
||||
t.Errorf("Expected %v, but got %v", e, a)
|
||||
}
|
||||
@ -527,8 +641,8 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
|
||||
}},
|
||||
},
|
||||
}
|
||||
registry.CreateService(ctx, svc, rest.ValidateAllObjectFunc)
|
||||
storage.Delete(ctx, svc.Name)
|
||||
registry.Create(ctx, svc, rest.ValidateAllObjectFunc, false)
|
||||
storage.Delete(ctx, svc.Name, &metav1.DeleteOptions{})
|
||||
if e, a := "foo", registry.DeletedID; e != a {
|
||||
t.Errorf("Expected %v, but got %v", e, a)
|
||||
}
|
||||
@ -615,12 +729,12 @@ func TestServiceRegistryGet(t *testing.T) {
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
storage, registry, server := NewTestREST(t, nil)
|
||||
defer server.Terminate(t)
|
||||
registry.CreateService(ctx, &api.Service{
|
||||
registry.Create(ctx, &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
},
|
||||
}, rest.ValidateAllObjectFunc)
|
||||
}, rest.ValidateAllObjectFunc, false)
|
||||
storage.Get(ctx, "foo", &metav1.GetOptions{})
|
||||
if e, a := "foo", registry.GottenID; e != a {
|
||||
t.Errorf("Expected %v, but got %v", e, a)
|
||||
@ -655,22 +769,6 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
|
||||
Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}},
|
||||
}},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: metav1.NamespaceDefault,
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{},
|
||||
Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}},
|
||||
}, {
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Name: "foo", Namespace: metav1.NamespaceDefault}}},
|
||||
Ports: []api.EndpointPort{{Name: "", Port: 80}, {Name: "p", Port: 93}},
|
||||
}, {
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Name: "bar", Namespace: metav1.NamespaceDefault}}},
|
||||
Ports: []api.EndpointPort{},
|
||||
}},
|
||||
},
|
||||
},
|
||||
}
|
||||
pods := &api.PodList{
|
||||
@ -708,7 +806,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
|
||||
storage, registry, server := NewTestRESTWithPods(t, endpoints, pods)
|
||||
defer server.Terminate(t)
|
||||
for _, name := range []string{"foo", "bad"} {
|
||||
registry.CreateService(ctx, &api.Service{
|
||||
registry.Create(ctx, &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
@ -721,7 +819,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
|
||||
{Name: "", Port: 93, TargetPort: intstr.FromInt(80)},
|
||||
},
|
||||
},
|
||||
}, rest.ValidateAllObjectFunc)
|
||||
}, rest.ValidateAllObjectFunc, false)
|
||||
}
|
||||
redirector := rest.Redirector(storage)
|
||||
|
||||
@ -807,19 +905,19 @@ func TestServiceRegistryList(t *testing.T) {
|
||||
ctx := genericapirequest.NewDefaultContext()
|
||||
storage, registry, server := NewTestREST(t, nil)
|
||||
defer server.Terminate(t)
|
||||
registry.CreateService(ctx, &api.Service{
|
||||
registry.Create(ctx, &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
},
|
||||
}, rest.ValidateAllObjectFunc)
|
||||
registry.CreateService(ctx, &api.Service{
|
||||
}, rest.ValidateAllObjectFunc, false)
|
||||
registry.Create(ctx, &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: metav1.NamespaceDefault},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar2": "baz2"},
|
||||
},
|
||||
}, rest.ValidateAllObjectFunc)
|
||||
registry.List.ResourceVersion = "1"
|
||||
}, rest.ValidateAllObjectFunc, false)
|
||||
registry.ServiceList.ResourceVersion = "1"
|
||||
s, _ := storage.List(ctx, nil)
|
||||
sl := s.(*api.ServiceList)
|
||||
if len(sl.Items) != 2 {
|
||||
@ -946,7 +1044,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) {
|
||||
t.Errorf("Unexpected ClusterIP: %s", created_service_1.Spec.ClusterIP)
|
||||
}
|
||||
|
||||
_, err := storage.Delete(ctx, created_service_1.Name)
|
||||
_, _, err := storage.Delete(ctx, created_service_1.Name, &metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error deleting service: %v", err)
|
||||
}
|
||||
@ -1303,7 +1401,7 @@ func TestInitClusterIP(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
hasAllocatedIP, err := storage.initClusterIP(test.svc)
|
||||
hasAllocatedIP, err := initClusterIP(test.svc, storage.serviceIPs)
|
||||
if err != nil {
|
||||
t.Errorf("%q: unexpected error: %v", test.name, err)
|
||||
}
|
||||
@ -1488,13 +1586,13 @@ func TestInitNodePorts(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
err := storage.initNodePorts(test.service, nodePortOp)
|
||||
err := initNodePorts(test.service, nodePortOp)
|
||||
if err != nil {
|
||||
t.Errorf("%q: unexpected error: %v", test.name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
serviceNodePorts := CollectServiceNodePorts(test.service)
|
||||
serviceNodePorts := collectServiceNodePorts(test.service)
|
||||
if len(test.expectSpecifiedNodePorts) == 0 {
|
||||
for _, nodePort := range serviceNodePorts {
|
||||
if !storage.serviceNodePorts.Has(nodePort) {
|
||||
@ -1758,13 +1856,13 @@ func TestUpdateNodePorts(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
err := storage.updateNodePorts(test.oldService, test.newService, nodePortOp)
|
||||
err := updateNodePorts(test.oldService, test.newService, nodePortOp)
|
||||
if err != nil {
|
||||
t.Errorf("%q: unexpected error: %v", test.name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
serviceNodePorts := CollectServiceNodePorts(test.newService)
|
||||
serviceNodePorts := collectServiceNodePorts(test.newService)
|
||||
if len(test.expectSpecifiedNodePorts) == 0 {
|
||||
for _, nodePort := range serviceNodePorts {
|
||||
if !storage.serviceNodePorts.Has(nodePort) {
|
@ -30,16 +30,17 @@ import (
|
||||
"k8s.io/kubernetes/pkg/registry/core/service"
|
||||
)
|
||||
|
||||
type REST struct {
|
||||
type GenericREST struct {
|
||||
*genericregistry.Store
|
||||
}
|
||||
|
||||
// NewREST returns a RESTStorage object that will work against services.
|
||||
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST) {
|
||||
func NewGenericREST(optsGetter generic.RESTOptionsGetter) (*GenericREST, *StatusREST) {
|
||||
store := &genericregistry.Store{
|
||||
NewFunc: func() runtime.Object { return &api.Service{} },
|
||||
NewListFunc: func() runtime.Object { return &api.ServiceList{} },
|
||||
DefaultQualifiedResource: api.Resource("services"),
|
||||
ReturnDeletedObject: true,
|
||||
|
||||
CreateStrategy: service.Strategy,
|
||||
UpdateStrategy: service.Strategy,
|
||||
@ -55,26 +56,25 @@ func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST) {
|
||||
|
||||
statusStore := *store
|
||||
statusStore.UpdateStrategy = service.StatusStrategy
|
||||
return &REST{store}, &StatusREST{store: &statusStore}
|
||||
return &GenericREST{store}, &StatusREST{store: &statusStore}
|
||||
}
|
||||
|
||||
// Implement ShortNamesProvider
|
||||
var _ rest.ShortNamesProvider = &REST{}
|
||||
var (
|
||||
_ rest.ShortNamesProvider = &GenericREST{}
|
||||
_ rest.CategoriesProvider = &GenericREST{}
|
||||
)
|
||||
|
||||
// ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
|
||||
func (r *REST) ShortNames() []string {
|
||||
func (r *GenericREST) ShortNames() []string {
|
||||
return []string{"svc"}
|
||||
}
|
||||
|
||||
// Implement CategoriesProvider
|
||||
var _ rest.CategoriesProvider = &REST{}
|
||||
|
||||
// Categories implements the CategoriesProvider interface. Returns a list of categories a resource is part of.
|
||||
func (r *REST) Categories() []string {
|
||||
func (r *GenericREST) Categories() []string {
|
||||
return []string{"all"}
|
||||
}
|
||||
|
||||
// StatusREST implements the REST endpoint for changing the status of a service.
|
||||
// StatusREST implements the GenericREST endpoint for changing the status of a service.
|
||||
type StatusREST struct {
|
||||
store *genericregistry.Store
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||
)
|
||||
|
||||
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
|
||||
func newStorage(t *testing.T) (*GenericREST, *StatusREST, *etcdtesting.EtcdTestServer) {
|
||||
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
|
||||
restOptions := generic.RESTOptions{
|
||||
StorageConfig: etcdStorage,
|
||||
@ -39,7 +39,7 @@ func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer)
|
||||
DeleteCollectionWorkers: 1,
|
||||
ResourcePrefix: "services",
|
||||
}
|
||||
serviceStorage, statusStorage := NewREST(restOptions)
|
||||
serviceStorage, statusStorage := NewGenericREST(restOptions)
|
||||
return serviceStorage, statusStorage, server
|
||||
}
|
||||
|
||||
@ -125,7 +125,7 @@ func TestDelete(t *testing.T) {
|
||||
storage, _, server := newStorage(t)
|
||||
defer server.Terminate(t)
|
||||
defer storage.Store.DestroyFunc()
|
||||
test := genericregistrytest.New(t, storage.Store).AllowCreateOnUpdate()
|
||||
test := genericregistrytest.New(t, storage.Store).AllowCreateOnUpdate().ReturnDeletedObject()
|
||||
test.TestDelete(validService())
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
)
|
||||
|
||||
func TestExportService(t *testing.T) {
|
||||
@ -111,17 +112,17 @@ func TestExportService(t *testing.T) {
|
||||
|
||||
func TestCheckGeneratedNameError(t *testing.T) {
|
||||
expect := errors.NewNotFound(api.Resource("foos"), "bar")
|
||||
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{}); err != expect {
|
||||
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{}); err != expect {
|
||||
t.Errorf("NotFoundError should be ignored: %v", err)
|
||||
}
|
||||
|
||||
expect = errors.NewAlreadyExists(api.Resource("foos"), "bar")
|
||||
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{}); err != expect {
|
||||
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{}); err != expect {
|
||||
t.Errorf("AlreadyExists should be returned when no GenerateName field: %v", err)
|
||||
}
|
||||
|
||||
expect = errors.NewAlreadyExists(api.Resource("foos"), "bar")
|
||||
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Pod{ObjectMeta: metav1.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) {
|
||||
if err := rest.CheckGeneratedNameError(Strategy, expect, &api.Service{ObjectMeta: metav1.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) {
|
||||
t.Errorf("expected try again later error: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -2839,65 +2839,6 @@ func TestDeleteWithOptionsQueryAndBody(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLegacyDelete(t *testing.T) {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := SimpleRESTStorage{}
|
||||
ID := "id"
|
||||
storage["simple"] = LegacyRESTStorage{&simpleStorage}
|
||||
var _ rest.Deleter = storage["simple"].(LegacyRESTStorage)
|
||||
handler := handle(storage)
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
client := http.Client{}
|
||||
request, err := http.NewRequest("DELETE", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/"+ID, nil)
|
||||
res, err := client.Do(request)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if res.StatusCode != http.StatusOK {
|
||||
t.Errorf("unexpected response: %#v", res)
|
||||
}
|
||||
if simpleStorage.deleted != ID {
|
||||
t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID)
|
||||
}
|
||||
if simpleStorage.deleteOptions != nil {
|
||||
t.Errorf("unexpected delete options: %#v", simpleStorage.deleteOptions)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLegacyDeleteIgnoresOptions(t *testing.T) {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := SimpleRESTStorage{}
|
||||
ID := "id"
|
||||
storage["simple"] = LegacyRESTStorage{&simpleStorage}
|
||||
handler := handle(storage)
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
item := metav1.NewDeleteOptions(300)
|
||||
body, err := runtime.Encode(codec, item)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
client := http.Client{}
|
||||
request, err := http.NewRequest("DELETE", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/"+ID, bytes.NewReader(body))
|
||||
res, err := client.Do(request)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if res.StatusCode != http.StatusOK {
|
||||
t.Errorf("unexpected response: %#v", res)
|
||||
}
|
||||
if simpleStorage.deleted != ID {
|
||||
t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID)
|
||||
}
|
||||
if simpleStorage.deleteOptions != nil {
|
||||
t.Errorf("unexpected delete options: %#v", simpleStorage.deleteOptions)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteInvokesAdmissionControl(t *testing.T) {
|
||||
// TODO: remove mutating deny when we removed it from the endpoint implementation and ported all plugins
|
||||
for _, admit := range []admission.Interface{alwaysMutatingDeny{}, alwaysValidatingDeny{}} {
|
||||
|
@ -227,7 +227,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
lister, isLister := storage.(rest.Lister)
|
||||
getter, isGetter := storage.(rest.Getter)
|
||||
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
|
||||
deleter, isDeleter := storage.(rest.Deleter)
|
||||
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
|
||||
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
|
||||
updater, isUpdater := storage.(rest.Updater)
|
||||
@ -273,16 +272,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
|
||||
var versionedDeleteOptions runtime.Object
|
||||
var versionedDeleterObject interface{}
|
||||
switch {
|
||||
case isGracefulDeleter:
|
||||
if isGracefulDeleter {
|
||||
versionedDeleteOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
versionedDeleterObject = indirectArbitraryPointer(versionedDeleteOptions)
|
||||
isDeleter = true
|
||||
case isDeleter:
|
||||
gracefulDeleter = rest.GracefulDeleteAdapter{Deleter: deleter}
|
||||
}
|
||||
|
||||
versionedStatusPtr, err := a.group.Creater.New(optionsExternalVersion.WithKind("Status"))
|
||||
@ -416,7 +411,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
}
|
||||
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
|
||||
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
|
||||
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter)
|
||||
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
|
||||
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
|
||||
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
|
||||
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
|
||||
@ -462,7 +457,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
}
|
||||
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
|
||||
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
|
||||
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter)
|
||||
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
|
||||
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
|
||||
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
|
||||
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
|
||||
|
@ -137,16 +137,6 @@ type TableConvertor interface {
|
||||
ConvertToTable(ctx genericapirequest.Context, object runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error)
|
||||
}
|
||||
|
||||
// Deleter is an object that can delete a named RESTful resource.
|
||||
type Deleter interface {
|
||||
// Delete finds a resource in the storage and deletes it.
|
||||
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
|
||||
// returned error value err when the specified resource is not found.
|
||||
// Delete *may* return the object that was deleted, or a status object indicating additional
|
||||
// information about deletion.
|
||||
Delete(ctx genericapirequest.Context, name string) (runtime.Object, error)
|
||||
}
|
||||
|
||||
// GracefulDeleter knows how to pass deletion options to allow delayed deletion of a
|
||||
// RESTful object.
|
||||
type GracefulDeleter interface {
|
||||
@ -162,17 +152,6 @@ type GracefulDeleter interface {
|
||||
Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error)
|
||||
}
|
||||
|
||||
// GracefulDeleteAdapter adapts the Deleter interface to GracefulDeleter
|
||||
type GracefulDeleteAdapter struct {
|
||||
Deleter
|
||||
}
|
||||
|
||||
// Delete implements RESTGracefulDeleter in terms of Deleter
|
||||
func (w GracefulDeleteAdapter) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
|
||||
obj, err := w.Deleter.Delete(ctx, name)
|
||||
return obj, true, err
|
||||
}
|
||||
|
||||
// CollectionDeleter is an object that can delete a collection
|
||||
// of RESTful resources.
|
||||
type CollectionDeleter interface {
|
||||
|
@ -38,6 +38,7 @@ go_library(
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/api/admissionregistration/v1alpha1:go_default_library",
|
||||
"//vendor/k8s.io/api/admissionregistration/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/api/authorization/v1:go_default_library",
|
||||
"//vendor/k8s.io/api/batch/v1:go_default_library",
|
||||
"//vendor/k8s.io/api/batch/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
|
@ -24,11 +24,13 @@ import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
authorizationv1 "k8s.io/api/authorization/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"k8s.io/kubernetes/pkg/printers"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
@ -142,7 +144,15 @@ var _ = SIGDescribe("Servers with support for Table transformation", func() {
|
||||
c := f.ClientSet
|
||||
|
||||
table := &metav1beta1.Table{}
|
||||
err := c.CoreV1().RESTClient().Get().Resource("services").SetHeader("Accept", "application/json;as=Table;v=v1beta1;g=meta.k8s.io").Do().Into(table)
|
||||
sar := &authorizationv1.SelfSubjectAccessReview{
|
||||
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
|
||||
NonResourceAttributes: &authorizationv1.NonResourceAttributes{
|
||||
Path: "/",
|
||||
Verb: "get",
|
||||
},
|
||||
},
|
||||
}
|
||||
err := c.AuthorizationV1().RESTClient().Post().Resource("selfsubjectaccessreviews").SetHeader("Accept", "application/json;as=Table;v=v1beta1;g=meta.k8s.io").Body(sar).Do().Into(table)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.(errors.APIStatus).Status().Code).To(Equal(int32(406)))
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user