Svc REST: De-layer Create

Gut the "outer" Create() and move it to the inner BeginCreate().  This
uses a "transaction" type to make cleanup functions easy to read.

Background:

Service has an "outer" and "inner" REST handler.  This is because of how we do IP and port allocations synchronously, but since we don't have API transactions, we need to roll those back in case of a failure.  Both layers use the same `Strategy`, but the outer calls into the inner, which causes a lot of complexity in the code (including an open-coded partial reimplementation of a date-unknown snapshot of the generic REST code) and results in `Prepare` and `Validate` hooks being called twice.

The "normal" REST flow seems to be:

```
mutating webhooks
generic REST store Create {
    cleanup = BeginCreate
    BeforeCreate {
        strategy.PrepareForCreate {
            dropDisabledFields
        }
        strategy.Validate
        strategy.Canonicalize
    }
    createValidation (validating webhooks)
    storage Create
    cleanup
    AfterCreate
    Decorator
}
```

Service (before this commit) does:

```
mutating webhooks
svc custom Create {
    BeforeCreate {
        strategy.PrepareForCreate {
            dropDisabledFields
        }
        strategy.Validate
        strategy.Canonicalize
    }
    Allocations
    inner (generic) Create {
        cleanup = BeginCreate
        BeforeCreate {
            strategy.PrepareForCreate {
                dropDisabledFields
            }
            strategy.Validate
            strategy.Canonicalize
        }
        createValidation (validating webhooks)
        storage Create
        cleanup
        AfterCreate
        Decorator
    }
}
```

After this commit:

```
mutating webhooks
generic REST store Create {
    cleanup = BeginCreate
        Allocations
    BeforeCreate {
        strategy.PrepareForCreate {
            dropDisabledFields
        }
        strategy.Validate
        strategy.Canonicalize
    }
    createValidation (validating webhooks)
    storage Create
    cleanup
    AfterCreate
        Rollback allocations on error
    Decorator
}
```

This same fix pattern will be applied to Delete and Update in subsequent
commits.
This commit is contained in:
Tim Hockin 2020-11-17 16:30:46 -08:00
parent 5e7e35ca45
commit 634055bded
6 changed files with 130 additions and 104 deletions

View File

@ -261,7 +261,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator
}
serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPRange, secondaryServiceClusterIPAllocator != nil)
serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPAllocator.IPFamily(), serviceIPAllocators, serviceNodePortAllocator)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}

View File

@ -177,41 +177,7 @@ func (rs *REST) Watch(ctx context.Context, options *metainternalversion.ListOpti
}
func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
// DeepCopy to prevent writes here propagating back to tests.
obj = obj.DeepCopyObject()
service := obj.(*api.Service)
if err := rest.BeforeCreate(rs.strategy, ctx, obj); err != nil {
return nil, err
}
// Allocate IPs and ports. If we had a transactional store, this would just
// be part of the larger transaction. We don't have that, so we have to do
// it manually. This has to happen here and not in any earlier hooks (e.g.
// defaulting) because it needs to be aware of flags and be able to access
// API storage.
txn, err := rs.alloc.allocateCreate(service, dryrun.IsDryRun(options.DryRun))
if err != nil {
return nil, err
}
defer func() {
if txn != nil {
txn.Revert()
}
}()
out, err := rs.services.Create(ctx, service, createValidation, options)
if err != nil {
err = rest.CheckGeneratedNameError(ctx, rs.strategy, err, service)
}
if err == nil {
txn.Commit()
txn = nil
}
return out, err
return rs.services.Create(ctx, obj, createValidation, options)
}
func (al *RESTAllocStuff) allocateCreate(service *api.Service, dryRun bool) (transaction, error) {

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@ -60,6 +61,7 @@ import (
// in a completely different way. We should unify it.
type serviceStorage struct {
inner *GenericREST
Services map[string]*api.Service
}
@ -118,12 +120,16 @@ func (s *serviceStorage) New() runtime.Object {
}
func (s *serviceStorage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if dryrun.IsDryRun(options.DryRun) {
return obj, nil
ret, err := s.inner.Create(ctx, obj, createValidation, options)
if err != nil {
return ret, err
}
svc := obj.(*api.Service)
if dryrun.IsDryRun(options.DryRun) {
return ret.DeepCopyObject(), nil
}
svc := ret.(*api.Service)
s.saveService(svc)
s.Services[svc.Name].ResourceVersion = "1"
return s.Services[svc.Name].DeepCopy(), nil
}
@ -173,8 +179,6 @@ func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*REST, *etcd3testing.
func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Pod, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
serviceStorage := &serviceStorage{}
podStorage, err := podstore.NewStorage(generic.RESTOptions{
StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "pods"}),
Decorator: generic.UndecoratedStorage,
@ -246,11 +250,29 @@ func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Po
if rSecondary != nil {
ipAllocators[rSecondary.IPFamily()] = rSecondary
}
rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, rPrimary.IPFamily(), ipAllocators, portAllocator, nil)
inner := newInnerREST(t, etcdStorage, ipAllocators, portAllocator)
rest, _ := NewREST(inner, endpointStorage, podStorage.Pod, rPrimary.IPFamily(), ipAllocators, portAllocator, nil)
return rest, server
}
// This bridges to the "inner" REST implementation so tests continue to run
// during the delayering of service REST code.
func newInnerREST(t *testing.T, etcdStorage *storagebackend.ConfigForResource, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) *serviceStorage {
restOptions := generic.RESTOptions{
StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}),
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 1,
ResourcePrefix: "services",
}
inner, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, portAlloc)
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
return &serviceStorage{inner: inner}
}
func makeIPNet(t *testing.T) *net.IPNet {
_, net, err := netutils.ParseCIDRSloppy("1.2.3.0/24")
if err != nil {

View File

@ -18,13 +18,13 @@ package storage
import (
"context"
"net"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
@ -32,8 +32,9 @@ import (
printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
printerstorage "k8s.io/kubernetes/pkg/printers/storage"
"k8s.io/kubernetes/pkg/registry/core/service"
registry "k8s.io/kubernetes/pkg/registry/core/service"
svcreg "k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
netutil "k8s.io/utils/net"
@ -41,13 +42,19 @@ import (
type GenericREST struct {
*genericregistry.Store
primaryIPFamily *api.IPFamily
secondaryFamily *api.IPFamily
primaryIPFamily api.IPFamily
secondaryIPFamily api.IPFamily
alloc RESTAllocStuff
}
// NewGenericREST returns a RESTStorage object that will work against services.
func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet, hasSecondary bool) (*GenericREST, *StatusREST, error) {
strategy, _ := registry.StrategyForServiceCIDRs(serviceCIDR, hasSecondary)
func NewGenericREST(
optsGetter generic.RESTOptionsGetter,
serviceIPFamily api.IPFamily,
ipAllocs map[api.IPFamily]ipallocator.Interface,
portAlloc portallocator.Interface) (*GenericREST, *StatusREST, error) {
strategy, _ := svcreg.StrategyForServiceCIDRs(ipAllocs[serviceIPFamily].CIDR(), len(ipAllocs) > 1)
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Service{} },
@ -72,22 +79,12 @@ func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet,
statusStore.UpdateStrategy = statusStrategy
statusStore.ResetFieldsStrategy = statusStrategy
ipv4 := api.IPv4Protocol
ipv6 := api.IPv6Protocol
var primaryIPFamily *api.IPFamily
var secondaryFamily *api.IPFamily
if netutil.IsIPv6CIDR(&serviceCIDR) {
primaryIPFamily = &ipv6
if hasSecondary {
secondaryFamily = &ipv4
}
} else {
primaryIPFamily = &ipv4
if hasSecondary {
secondaryFamily = &ipv6
}
var primaryIPFamily api.IPFamily = serviceIPFamily
var secondaryIPFamily api.IPFamily = "" // sentinel value
if len(ipAllocs) > 1 {
secondaryIPFamily = otherFamily(serviceIPFamily)
}
genericStore := &GenericREST{store, primaryIPFamily, secondaryFamily}
genericStore := &GenericREST{store, primaryIPFamily, secondaryIPFamily, makeAlloc(serviceIPFamily, ipAllocs, portAlloc)}
store.Decorator = genericStore.defaultOnRead
store.BeginCreate = genericStore.beginCreate
store.BeginUpdate = genericStore.beginUpdate
@ -95,6 +92,15 @@ func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet,
return genericStore, &StatusREST{store: &statusStore}, nil
}
// otherFamily returns the non-selected IPFamily. This assumes the input is
// valid.
func otherFamily(fam api.IPFamily) api.IPFamily {
if fam == api.IPv4Protocol {
return api.IPv6Protocol
}
return api.IPv4Protocol
}
var (
_ rest.ShortNamesProvider = &GenericREST{}
_ rest.CategoriesProvider = &GenericREST{}
@ -196,7 +202,7 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) {
preferDualStack := api.IPFamilyPolicyPreferDualStack
// headless services
if len(service.Spec.ClusterIPs) == 1 && service.Spec.ClusterIPs[0] == api.ClusterIPNone {
service.Spec.IPFamilies = []api.IPFamily{*r.primaryIPFamily}
service.Spec.IPFamilies = []api.IPFamily{r.primaryIPFamily}
// headless+selectorless
// headless+selectorless takes both families. Why?
@ -205,7 +211,7 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) {
// it to PreferDualStack on any cluster (single or dualstack configured).
if len(service.Spec.Selector) == 0 {
service.Spec.IPFamilyPolicy = &preferDualStack
if *r.primaryIPFamily == api.IPv4Protocol {
if r.primaryIPFamily == api.IPv4Protocol {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
} else {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv4Protocol)
@ -216,8 +222,8 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) {
// selector and will have to follow how the cluster is configured. If the cluster is
// configured to dual stack then the service defaults to PreferDualStack. Otherwise we
// default it to SingleStack.
if r.secondaryFamily != nil {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, *r.secondaryFamily)
if r.secondaryIPFamily != "" {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, r.secondaryIPFamily)
service.Spec.IPFamilyPolicy = &preferDualStack
} else {
service.Spec.IPFamilyPolicy = &singleStack
@ -246,13 +252,27 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) {
func (r *GenericREST) beginCreate(ctx context.Context, obj runtime.Object, options *metav1.CreateOptions) (genericregistry.FinishFunc, error) {
svc := obj.(*api.Service)
// FIXME: remove this when implementing
_ = svc
// Make sure ClusterIP and ClusterIPs are in sync. This has to happen
// early, before anyone looks at them.
// NOTE: the args are (old, new)
svcreg.NormalizeClusterIPs(nil, svc)
// Allocate IPs and ports. If we had a transactional store, this would just
// be part of the larger transaction. We don't have that, so we have to do
// it manually. This has to happen here and not in any earlier hooks (e.g.
// defaulting) because it needs to be aware of flags and be able to access
// API storage.
txn, err := r.alloc.allocateCreate(svc, dryrun.IsDryRun(options.DryRun))
if err != nil {
return nil, err
}
// Our cleanup callback
finish := func(_ context.Context, success bool) {
if success {
txn.Commit()
} else {
txn.Revert()
}
}
@ -263,9 +283,10 @@ func (r *GenericREST) beginUpdate(ctx context.Context, obj, oldObj runtime.Objec
newSvc := obj.(*api.Service)
oldSvc := oldObj.(*api.Service)
// FIXME: remove these when implementing
_ = oldSvc
_ = newSvc
// Make sure ClusterIP and ClusterIPs are in sync. This has to happen
// early, before anyone looks at them.
// NOTE: the args are (old, new)
svcreg.NormalizeClusterIPs(oldSvc, newSvc)
// Our cleanup callback
finish := func(_ context.Context, success bool) {

View File

@ -17,6 +17,8 @@ limitations under the License.
package storage
import (
"fmt"
"net"
"reflect"
"testing"
@ -30,6 +32,7 @@ import (
genericregistrytest "k8s.io/apiserver/pkg/registry/generic/testing"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/registrytest"
netutils "k8s.io/utils/net"
@ -38,6 +41,14 @@ import (
"k8s.io/kubernetes/pkg/features"
)
func makeIPAllocator(cidr *net.IPNet) ipallocator.Interface {
al, err := ipallocator.NewInMemory(cidr)
if err != nil {
panic(fmt.Sprintf("error creating IP allocator: %v", err))
}
return al
}
func newStorage(t *testing.T) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{
@ -46,7 +57,10 @@ func newStorage(t *testing.T) (*GenericREST, *StatusREST, *etcd3testing.EtcdTest
DeleteCollectionWorkers: 1,
ResourcePrefix: "services",
}
serviceStorage, statusStorage, err := NewGenericREST(restOptions, *makeIPNet(t), false)
ipAllocs := map[api.IPFamily]ipallocator.Interface{
api.IPv4Protocol: makeIPAllocator(makeIPNet(t)),
}
serviceStorage, statusStorage, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, nil)
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
@ -427,7 +441,10 @@ func TestServiceDefaultOnRead(t *testing.T) {
t.Fatalf("failed to parse CIDR")
}
serviceStorage, _, err := NewGenericREST(restOptions, *cidr, false)
ipAllocs := map[api.IPFamily]ipallocator.Interface{
api.IPv4Protocol: makeIPAllocator(cidr),
}
serviceStorage, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, nil)
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
@ -471,7 +488,7 @@ func TestServiceDefaultOnRead(t *testing.T) {
}
func TestServiceDefaulting(t *testing.T) {
makeStorage := func(t *testing.T, primaryCIDR string, isDualStack bool) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) {
makeStorage := func(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{
StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}),
@ -480,12 +497,19 @@ func TestServiceDefaulting(t *testing.T) {
ResourcePrefix: "services",
}
_, cidr, err := netutils.ParseCIDRSloppy(primaryCIDR)
if err != nil {
t.Fatalf("failed to parse CIDR %s", primaryCIDR)
ipAllocs := map[api.IPFamily]ipallocator.Interface{}
for _, fam := range ipFamilies {
switch fam {
case api.IPv4Protocol:
_, cidr, _ := netutils.ParseCIDRSloppy("10.0.0.0/16")
ipAllocs[fam] = makeIPAllocator(cidr)
case api.IPv6Protocol:
_, cidr, _ := netutils.ParseCIDRSloppy("2000::/108")
ipAllocs[fam] = makeIPAllocator(cidr)
}
}
serviceStorage, statusStorage, err := NewGenericREST(restOptions, *(cidr), isDualStack)
serviceStorage, statusStorage, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, nil)
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
@ -493,35 +517,25 @@ func TestServiceDefaulting(t *testing.T) {
}
testCases := []struct {
name string
primaryCIDR string
PrimaryIPv6 bool
isDualStack bool
name string
ipFamilies []api.IPFamily
}{
{
name: "IPv4 single stack cluster",
primaryCIDR: "10.0.0.0/16",
PrimaryIPv6: false,
isDualStack: false,
name: "IPv4 single stack cluster",
ipFamilies: []api.IPFamily{api.IPv4Protocol},
},
{
name: "IPv6 single stack cluster",
primaryCIDR: "2000::/108",
PrimaryIPv6: true,
isDualStack: false,
name: "IPv6 single stack cluster",
ipFamilies: []api.IPFamily{api.IPv6Protocol},
},
{
name: "IPv4, IPv6 dual stack cluster",
primaryCIDR: "10.0.0.0/16",
PrimaryIPv6: false,
isDualStack: true,
name: "IPv4, IPv6 dual stack cluster",
ipFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol},
},
{
name: "IPv6, IPv4 dual stack cluster",
primaryCIDR: "2000::/108",
PrimaryIPv6: true,
isDualStack: true,
name: "IPv6, IPv4 dual stack cluster",
ipFamilies: []api.IPFamily{api.IPv6Protocol, api.IPv4Protocol},
},
}
@ -533,7 +547,7 @@ func TestServiceDefaulting(t *testing.T) {
// this func only works with dual stack feature gate on.
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)()
storage, _, server := makeStorage(t, testCase.primaryCIDR, testCase.isDualStack)
storage, _, server := makeStorage(t, testCase.ipFamilies)
defer server.Terminate(t)
defer storage.Store.DestroyFunc()
@ -551,7 +565,7 @@ func TestServiceDefaulting(t *testing.T) {
defaultedServiceList.Items[0].Spec.IPFamilyPolicy = &singleStack
// primary family
if testCase.PrimaryIPv6 {
if testCase.ipFamilies[0] == api.IPv6Protocol {
// no selector, gets both families
defaultedServiceList.Items[1].Spec.IPFamilyPolicy = &preferDualStack
defaultedServiceList.Items[1].Spec.IPFamilies = []api.IPFamily{api.IPv6Protocol, api.IPv4Protocol}
@ -559,7 +573,7 @@ func TestServiceDefaulting(t *testing.T) {
//assume single stack for w/selector
defaultedServiceList.Items[0].Spec.IPFamilies = []api.IPFamily{api.IPv6Protocol}
// make dualstacked. if needed
if testCase.isDualStack {
if len(testCase.ipFamilies) > 1 {
defaultedServiceList.Items[0].Spec.IPFamilyPolicy = &preferDualStack
defaultedServiceList.Items[0].Spec.IPFamilies = append(defaultedServiceList.Items[0].Spec.IPFamilies, api.IPv4Protocol)
}
@ -571,7 +585,7 @@ func TestServiceDefaulting(t *testing.T) {
// assume single stack for w/selector
defaultedServiceList.Items[0].Spec.IPFamilies = []api.IPFamily{api.IPv4Protocol}
// make dualstacked. if needed
if testCase.isDualStack {
if len(testCase.ipFamilies) > 1 {
defaultedServiceList.Items[0].Spec.IPFamilyPolicy = &preferDualStack
defaultedServiceList.Items[0].Spec.IPFamilies = append(defaultedServiceList.Items[0].Spec.IPFamilies, api.IPv6Protocol)
}

View File

@ -109,6 +109,7 @@ func (strategy svcStrategy) PrepareForCreate(ctx context.Context, obj runtime.Ob
service := obj.(*api.Service)
service.Status = api.ServiceStatus{}
//FIXME: Normalize is now called from BeginCreate in pkg/registry/core/service/storage
NormalizeClusterIPs(nil, service)
dropServiceDisabledFields(service, nil)
}
@ -120,6 +121,7 @@ func (strategy svcStrategy) PrepareForUpdate(ctx context.Context, obj, old runti
newService.Status = oldService.Status
patchAllocatedValues(newService, oldService)
//FIXME: Normalize is now called from BeginUpdate in pkg/registry/core/service/storage
NormalizeClusterIPs(oldService, newService)
dropServiceDisabledFields(newService, oldService)
dropTypeDependentFields(newService, oldService)
@ -362,6 +364,7 @@ func patchAllocatedValues(newSvc, oldSvc *api.Service) {
// NormalizeClusterIPs adjust clusterIPs based on ClusterIP. This must not
// consider any other fields.
//FIXME: move this to pkg/registry/core/service/storage
func NormalizeClusterIPs(oldSvc, newSvc *api.Service) {
// In all cases here, we don't need to over-think the inputs. Validation
// will be called on the new object soon enough. All this needs to do is