Svc REST: De-layering done! Convert to 1 layer

This is the culmination of all the previous commits which made this last
move less dramatic.  More tests and cleanup commits will follow.

Background, for future archaeologists:

Service has (had) an "outer" and "inner" REST handler.  This is because of how we do IP and port allocations synchronously, but since we don't have API transactions, we need to roll those back in case of a failure.  Both layers use the same `Strategy`, but the outer calls into the inner, which causes a lot of complexity in the code (including an open-coded partial reimplementation of a date-unknown snapshot of the generic REST code) and results in `Prepare` and `Validate` hooks being called twice.

The "normal" REST flow seems to be:

```
mutating webhooks
generic REST store Create {
    cleanup = BeginCreate
    BeforeCreate {
        strategy.PrepareForCreate {
            dropDisabledFields
        }
        strategy.Validate
        strategy.Canonicalize
    }
    createValidation (validating webhooks)
    storage Create
    cleanup
    AfterCreate
    Decorator
}
```

Service (before this series of commits) does:

```
mutating webhooks
svc custom Create {
    BeforeCreate {
        strategy.PrepareForCreate {
            dropDisabledFields
        }
        strategy.Validate
        strategy.Canonicalize
    }
    Allocations
    inner (generic) Create {
        cleanup = BeginCreate
        BeforeCreate {
            strategy.PrepareForCreate {
                dropDisabledFields
            }
            strategy.Validate
            strategy.Canonicalize
        }
        createValidation (validating webhooks)
        storage Create
        cleanup
        AfterCreate
        Decorator
    }
}
```

After this:

```
mutating webhooks
generic REST store Create {
    cleanup = BeginCreate
        Allocations
    BeforeCreate {
        strategy.PrepareForCreate {
            dropDisabledFields
        }
        strategy.Validate
        strategy.Canonicalize
    }
    createValidation (validating webhooks)
    storage Create
    cleanup
    AfterCreate
        Rollback allocations on error
    Decorator
}
```
This commit is contained in:
Tim Hockin 2020-12-05 16:37:29 -08:00
parent cf4804643a
commit 8e68b587e8
3 changed files with 7 additions and 185 deletions

View File

@ -261,7 +261,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator
} }
serviceRESTStorage, serviceStatusStorage, _, err := servicestore.NewGenericREST( serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewGenericREST(
restOptionsGetter, restOptionsGetter,
serviceClusterIPAllocator.IPFamily(), serviceClusterIPAllocator.IPFamily(),
serviceIPAllocators, serviceIPAllocators,
@ -273,11 +273,6 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
} }
serviceRest, serviceRestProxy := servicestore.NewREST(
serviceRESTStorage,
serviceClusterIPAllocator.IPFamily(),
c.ProxyTransport)
restStorageMap := map[string]rest.Storage{ restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod, "pods": podStorage.Pod,
"pods/attach": podStorage.Attach, "pods/attach": podStorage.Attach,
@ -294,8 +289,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
"replicationControllers": controllerStorage.Controller, "replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status, "replicationControllers/status": controllerStorage.Status,
"services": serviceRest, "services": serviceRESTStorage,
"services/proxy": serviceRestProxy, "services/proxy": serviceRESTProxy,
"services/status": serviceStatusStorage, "services/status": serviceStatusStorage,
"endpoints": endpointsStorage, "endpoints": endpointsStorage,

View File

@ -37,7 +37,6 @@ import (
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/validation" "k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
registry "k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator" "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
@ -81,26 +80,6 @@ type ServiceStorage interface {
rest.Redirector rest.Redirector
} }
// NewREST returns a wrapper around the underlying generic storage and performs
// allocations and deallocations of various service related resources like ports.
// TODO: all transactional behavior should be supported from within generic storage
// or the strategy.
func NewREST(
services ServiceStorage,
defaultFamily api.IPFamily,
proxyTransport http.RoundTripper,
) (*REST, *registry.ProxyREST) {
klog.V(0).Infof("the default service ipfamily for this cluster is: %s", string(defaultFamily))
rest := &REST{
services: services,
proxyTransport: proxyTransport,
}
return rest, &registry.ProxyREST{Redirector: rest, ProxyTransport: proxyTransport}
}
// This is a trasitionary function to facilitate service REST flattening. // This is a trasitionary function to facilitate service REST flattening.
func makeAlloc(defaultFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) RESTAllocStuff { func makeAlloc(defaultFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) RESTAllocStuff {
return RESTAllocStuff{ return RESTAllocStuff{

View File

@ -17,28 +17,18 @@ limitations under the License.
package storage package storage
import ( import (
"context"
"fmt"
"net" "net"
"net/http"
"net/url"
"reflect" "reflect"
"sort"
"testing" "testing"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/watch"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/util/dryrun"
svctest "k8s.io/kubernetes/pkg/api/service/testing" svctest "k8s.io/kubernetes/pkg/api/service/testing"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage" endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
@ -47,143 +37,9 @@ import (
"k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/registry/registrytest"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer" utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
) )
// TODO(wojtek-t): Cleanup this file. func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *etcd3testing.EtcdTestServer) {
// It is now testing mostly the same things as other resources but
// in a completely different way. We should unify it.
type serviceStorage struct {
inner *GenericREST
Services map[string]*api.Service
}
func (s *serviceStorage) saveService(svc *api.Service) {
if s.Services == nil {
s.Services = map[string]*api.Service{}
}
s.Services[svc.Name] = svc.DeepCopy()
}
func (s *serviceStorage) NamespaceScoped() bool {
return true
}
func (s *serviceStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
if s.Services[name] == nil {
return nil, fmt.Errorf("service %q not found", name)
}
return s.Services[name].DeepCopy(), nil
}
func getService(getter rest.Getter, ctx context.Context, name string, options *metav1.GetOptions) (*api.Service, error) {
obj, err := getter.Get(ctx, name, options)
if err != nil {
return nil, err
}
return obj.(*api.Service), nil
}
func (s *serviceStorage) NewList() runtime.Object {
panic("not implemented")
}
func (s *serviceStorage) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
ns, _ := genericapirequest.NamespaceFrom(ctx)
keys := make([]string, 0, len(s.Services))
for k := range s.Services {
keys = append(keys, k)
}
sort.Strings(keys)
res := new(api.ServiceList)
for _, k := range keys {
svc := s.Services[k]
if ns == metav1.NamespaceAll || ns == svc.Namespace {
res.Items = append(res.Items, *svc)
}
}
return res, nil
}
func (s *serviceStorage) New() runtime.Object {
panic("not implemented")
}
func (s *serviceStorage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
ret, err := s.inner.Create(ctx, obj, createValidation, options)
if err != nil {
return ret, err
}
if dryrun.IsDryRun(options.DryRun) {
return ret.DeepCopyObject(), nil
}
svc := ret.(*api.Service)
s.saveService(svc)
return s.Services[svc.Name].DeepCopy(), nil
}
func (s *serviceStorage) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
ret, created, err := s.inner.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
return ret, created, err
}
if dryrun.IsDryRun(options.DryRun) {
return ret.DeepCopyObject(), created, err
}
svc := ret.(*api.Service)
s.saveService(svc)
return s.Services[name].DeepCopy(), created, nil
}
func (s *serviceStorage) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
ret, del, err := s.inner.Delete(ctx, name, deleteValidation, options)
if err != nil {
return ret, del, err
}
if dryrun.IsDryRun(options.DryRun) {
return ret.DeepCopyObject(), del, nil
}
delete(s.Services, name)
return ret.DeepCopyObject(), del, err
}
func (s *serviceStorage) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
panic("not implemented")
}
func (s *serviceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
panic("not implemented")
}
func (s *serviceStorage) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
panic("not implemented")
}
func (s *serviceStorage) StorageVersion() runtime.GroupVersioner {
panic("not implemented")
}
// GetResetFields implements rest.ResetFieldsStrategy
func (s *serviceStorage) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set {
//FIXME: should panic?
return nil
}
// ResourceLocation implements rest.Redirector
func (s *serviceStorage) ResourceLocation(ctx context.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error) {
panic("not implemented")
}
func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "") etcdStorage, server := registrytest.NewEtcdStorage(t, "")
var rPrimary ipallocator.Interface var rPrimary ipallocator.Interface
@ -228,15 +84,6 @@ func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*REST, *etcd3testing.
ipAllocators[rSecondary.IPFamily()] = rSecondary ipAllocators[rSecondary.IPFamily()] = rSecondary
} }
inner := newInnerREST(t, etcdStorage, ipAllocators, portAllocator)
rest, _ := NewREST(inner, rPrimary.IPFamily(), nil)
return rest, server
}
// This bridges to the "inner" REST implementation so tests continue to run
// during the delayering of service REST code.
func newInnerREST(t *testing.T, etcdStorage *storagebackend.ConfigForResource, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) *serviceStorage {
restOptions := generic.RESTOptions{ restOptions := generic.RESTOptions{
StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}), StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}),
Decorator: generic.UndecoratedStorage, Decorator: generic.UndecoratedStorage,
@ -249,11 +96,12 @@ func newInnerREST(t *testing.T, etcdStorage *storagebackend.ConfigForResource, i
ResourcePrefix: "endpoints", ResourcePrefix: "endpoints",
}) })
inner, _, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, portAlloc, endpoints, nil, nil) rest, _, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocators, portAllocator, endpoints, nil, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err) t.Fatalf("unexpected error from REST storage: %v", err)
} }
return &serviceStorage{inner: inner}
return rest, server
} }
func makeIPNet(t *testing.T) *net.IPNet { func makeIPNet(t *testing.T) *net.IPNet {