diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 9f5d7a668c5..d2fa6a1d45f 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -261,7 +261,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator } - serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPRange, secondaryServiceClusterIPAllocator != nil) + serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPAllocator.IPFamily(), serviceIPAllocators, serviceNodePortAllocator) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } diff --git a/pkg/registry/core/service/storage/rest.go b/pkg/registry/core/service/storage/rest.go index 04d04057646..52e8f199020 100644 --- a/pkg/registry/core/service/storage/rest.go +++ b/pkg/registry/core/service/storage/rest.go @@ -177,41 +177,7 @@ func (rs *REST) Watch(ctx context.Context, options *metainternalversion.ListOpti } func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { - // DeepCopy to prevent writes here propagating back to tests. - obj = obj.DeepCopyObject() - - service := obj.(*api.Service) - - if err := rest.BeforeCreate(rs.strategy, ctx, obj); err != nil { - return nil, err - } - - // Allocate IPs and ports. If we had a transactional store, this would just - // be part of the larger transaction. We don't have that, so we have to do - // it manually. This has to happen here and not in any earlier hooks (e.g. - // defaulting) because it needs to be aware of flags and be able to access - // API storage. - txn, err := rs.alloc.allocateCreate(service, dryrun.IsDryRun(options.DryRun)) - if err != nil { - return nil, err - } - defer func() { - if txn != nil { - txn.Revert() - } - }() - - out, err := rs.services.Create(ctx, service, createValidation, options) - if err != nil { - err = rest.CheckGeneratedNameError(ctx, rs.strategy, err, service) - } - - if err == nil { - txn.Commit() - txn = nil - } - - return out, err + return rs.services.Create(ctx, obj, createValidation, options) } func (al *RESTAllocStuff) allocateCreate(service *api.Service, dryRun bool) (transaction, error) { diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go index 096b0557809..9444a651d6a 100644 --- a/pkg/registry/core/service/storage/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/rest" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/util/dryrun" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -60,6 +61,7 @@ import ( // in a completely different way. We should unify it. type serviceStorage struct { + inner *GenericREST Services map[string]*api.Service } @@ -118,12 +120,16 @@ func (s *serviceStorage) New() runtime.Object { } func (s *serviceStorage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { - if dryrun.IsDryRun(options.DryRun) { - return obj, nil + ret, err := s.inner.Create(ctx, obj, createValidation, options) + if err != nil { + return ret, err } - svc := obj.(*api.Service) + + if dryrun.IsDryRun(options.DryRun) { + return ret.DeepCopyObject(), nil + } + svc := ret.(*api.Service) s.saveService(svc) - s.Services[svc.Name].ResourceVersion = "1" return s.Services[svc.Name].DeepCopy(), nil } @@ -173,8 +179,6 @@ func NewTestREST(t *testing.T, ipFamilies []api.IPFamily) (*REST, *etcd3testing. func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Pod, ipFamilies []api.IPFamily) (*REST, *etcd3testing.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - serviceStorage := &serviceStorage{} - podStorage, err := podstore.NewStorage(generic.RESTOptions{ StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "pods"}), Decorator: generic.UndecoratedStorage, @@ -246,11 +250,29 @@ func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Po if rSecondary != nil { ipAllocators[rSecondary.IPFamily()] = rSecondary } - rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, rPrimary.IPFamily(), ipAllocators, portAllocator, nil) + + inner := newInnerREST(t, etcdStorage, ipAllocators, portAllocator) + rest, _ := NewREST(inner, endpointStorage, podStorage.Pod, rPrimary.IPFamily(), ipAllocators, portAllocator, nil) return rest, server } +// This bridges to the "inner" REST implementation so tests continue to run +// during the delayering of service REST code. +func newInnerREST(t *testing.T, etcdStorage *storagebackend.ConfigForResource, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) *serviceStorage { + restOptions := generic.RESTOptions{ + StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}), + Decorator: generic.UndecoratedStorage, + DeleteCollectionWorkers: 1, + ResourcePrefix: "services", + } + inner, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, portAlloc) + if err != nil { + t.Fatalf("unexpected error from REST storage: %v", err) + } + return &serviceStorage{inner: inner} +} + func makeIPNet(t *testing.T) *net.IPNet { _, net, err := netutils.ParseCIDRSloppy("1.2.3.0/24") if err != nil { diff --git a/pkg/registry/core/service/storage/storage.go b/pkg/registry/core/service/storage/storage.go index ed8e5be0f35..6b0e1a81309 100644 --- a/pkg/registry/core/service/storage/storage.go +++ b/pkg/registry/core/service/storage/storage.go @@ -18,13 +18,13 @@ package storage import ( "context" - "net" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/util/dryrun" utilfeature "k8s.io/apiserver/pkg/util/feature" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/features" @@ -32,8 +32,9 @@ import ( printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" printerstorage "k8s.io/kubernetes/pkg/printers/storage" "k8s.io/kubernetes/pkg/registry/core/service" - registry "k8s.io/kubernetes/pkg/registry/core/service" svcreg "k8s.io/kubernetes/pkg/registry/core/service" + "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" + "k8s.io/kubernetes/pkg/registry/core/service/portallocator" "sigs.k8s.io/structured-merge-diff/v4/fieldpath" netutil "k8s.io/utils/net" @@ -41,13 +42,19 @@ import ( type GenericREST struct { *genericregistry.Store - primaryIPFamily *api.IPFamily - secondaryFamily *api.IPFamily + primaryIPFamily api.IPFamily + secondaryIPFamily api.IPFamily + alloc RESTAllocStuff } // NewGenericREST returns a RESTStorage object that will work against services. -func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet, hasSecondary bool) (*GenericREST, *StatusREST, error) { - strategy, _ := registry.StrategyForServiceCIDRs(serviceCIDR, hasSecondary) +func NewGenericREST( + optsGetter generic.RESTOptionsGetter, + serviceIPFamily api.IPFamily, + ipAllocs map[api.IPFamily]ipallocator.Interface, + portAlloc portallocator.Interface) (*GenericREST, *StatusREST, error) { + + strategy, _ := svcreg.StrategyForServiceCIDRs(ipAllocs[serviceIPFamily].CIDR(), len(ipAllocs) > 1) store := &genericregistry.Store{ NewFunc: func() runtime.Object { return &api.Service{} }, @@ -72,22 +79,12 @@ func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet, statusStore.UpdateStrategy = statusStrategy statusStore.ResetFieldsStrategy = statusStrategy - ipv4 := api.IPv4Protocol - ipv6 := api.IPv6Protocol - var primaryIPFamily *api.IPFamily - var secondaryFamily *api.IPFamily - if netutil.IsIPv6CIDR(&serviceCIDR) { - primaryIPFamily = &ipv6 - if hasSecondary { - secondaryFamily = &ipv4 - } - } else { - primaryIPFamily = &ipv4 - if hasSecondary { - secondaryFamily = &ipv6 - } + var primaryIPFamily api.IPFamily = serviceIPFamily + var secondaryIPFamily api.IPFamily = "" // sentinel value + if len(ipAllocs) > 1 { + secondaryIPFamily = otherFamily(serviceIPFamily) } - genericStore := &GenericREST{store, primaryIPFamily, secondaryFamily} + genericStore := &GenericREST{store, primaryIPFamily, secondaryIPFamily, makeAlloc(serviceIPFamily, ipAllocs, portAlloc)} store.Decorator = genericStore.defaultOnRead store.BeginCreate = genericStore.beginCreate store.BeginUpdate = genericStore.beginUpdate @@ -95,6 +92,15 @@ func NewGenericREST(optsGetter generic.RESTOptionsGetter, serviceCIDR net.IPNet, return genericStore, &StatusREST{store: &statusStore}, nil } +// otherFamily returns the non-selected IPFamily. This assumes the input is +// valid. +func otherFamily(fam api.IPFamily) api.IPFamily { + if fam == api.IPv4Protocol { + return api.IPv6Protocol + } + return api.IPv4Protocol +} + var ( _ rest.ShortNamesProvider = &GenericREST{} _ rest.CategoriesProvider = &GenericREST{} @@ -196,7 +202,7 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) { preferDualStack := api.IPFamilyPolicyPreferDualStack // headless services if len(service.Spec.ClusterIPs) == 1 && service.Spec.ClusterIPs[0] == api.ClusterIPNone { - service.Spec.IPFamilies = []api.IPFamily{*r.primaryIPFamily} + service.Spec.IPFamilies = []api.IPFamily{r.primaryIPFamily} // headless+selectorless // headless+selectorless takes both families. Why? @@ -205,7 +211,7 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) { // it to PreferDualStack on any cluster (single or dualstack configured). if len(service.Spec.Selector) == 0 { service.Spec.IPFamilyPolicy = &preferDualStack - if *r.primaryIPFamily == api.IPv4Protocol { + if r.primaryIPFamily == api.IPv4Protocol { service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol) } else { service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv4Protocol) @@ -216,8 +222,8 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) { // selector and will have to follow how the cluster is configured. If the cluster is // configured to dual stack then the service defaults to PreferDualStack. Otherwise we // default it to SingleStack. - if r.secondaryFamily != nil { - service.Spec.IPFamilies = append(service.Spec.IPFamilies, *r.secondaryFamily) + if r.secondaryIPFamily != "" { + service.Spec.IPFamilies = append(service.Spec.IPFamilies, r.secondaryIPFamily) service.Spec.IPFamilyPolicy = &preferDualStack } else { service.Spec.IPFamilyPolicy = &singleStack @@ -246,13 +252,27 @@ func (r *GenericREST) defaultOnReadService(service *api.Service) { func (r *GenericREST) beginCreate(ctx context.Context, obj runtime.Object, options *metav1.CreateOptions) (genericregistry.FinishFunc, error) { svc := obj.(*api.Service) - // FIXME: remove this when implementing - _ = svc + // Make sure ClusterIP and ClusterIPs are in sync. This has to happen + // early, before anyone looks at them. + // NOTE: the args are (old, new) + svcreg.NormalizeClusterIPs(nil, svc) + + // Allocate IPs and ports. If we had a transactional store, this would just + // be part of the larger transaction. We don't have that, so we have to do + // it manually. This has to happen here and not in any earlier hooks (e.g. + // defaulting) because it needs to be aware of flags and be able to access + // API storage. + txn, err := r.alloc.allocateCreate(svc, dryrun.IsDryRun(options.DryRun)) + if err != nil { + return nil, err + } // Our cleanup callback finish := func(_ context.Context, success bool) { if success { + txn.Commit() } else { + txn.Revert() } } @@ -263,9 +283,10 @@ func (r *GenericREST) beginUpdate(ctx context.Context, obj, oldObj runtime.Objec newSvc := obj.(*api.Service) oldSvc := oldObj.(*api.Service) - // FIXME: remove these when implementing - _ = oldSvc - _ = newSvc + // Make sure ClusterIP and ClusterIPs are in sync. This has to happen + // early, before anyone looks at them. + // NOTE: the args are (old, new) + svcreg.NormalizeClusterIPs(oldSvc, newSvc) // Our cleanup callback finish := func(_ context.Context, success bool) { diff --git a/pkg/registry/core/service/storage/storage_test.go b/pkg/registry/core/service/storage/storage_test.go index 6fb4b76141d..43b72e14350 100644 --- a/pkg/registry/core/service/storage/storage_test.go +++ b/pkg/registry/core/service/storage/storage_test.go @@ -17,6 +17,8 @@ limitations under the License. package storage import ( + "fmt" + "net" "reflect" "testing" @@ -30,6 +32,7 @@ import ( genericregistrytest "k8s.io/apiserver/pkg/registry/generic/testing" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/registrytest" netutils "k8s.io/utils/net" @@ -38,6 +41,14 @@ import ( "k8s.io/kubernetes/pkg/features" ) +func makeIPAllocator(cidr *net.IPNet) ipallocator.Interface { + al, err := ipallocator.NewInMemory(cidr) + if err != nil { + panic(fmt.Sprintf("error creating IP allocator: %v", err)) + } + return al +} + func newStorage(t *testing.T) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{ @@ -46,7 +57,10 @@ func newStorage(t *testing.T) (*GenericREST, *StatusREST, *etcd3testing.EtcdTest DeleteCollectionWorkers: 1, ResourcePrefix: "services", } - serviceStorage, statusStorage, err := NewGenericREST(restOptions, *makeIPNet(t), false) + ipAllocs := map[api.IPFamily]ipallocator.Interface{ + api.IPv4Protocol: makeIPAllocator(makeIPNet(t)), + } + serviceStorage, statusStorage, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, nil) if err != nil { t.Fatalf("unexpected error from REST storage: %v", err) } @@ -427,7 +441,10 @@ func TestServiceDefaultOnRead(t *testing.T) { t.Fatalf("failed to parse CIDR") } - serviceStorage, _, err := NewGenericREST(restOptions, *cidr, false) + ipAllocs := map[api.IPFamily]ipallocator.Interface{ + api.IPv4Protocol: makeIPAllocator(cidr), + } + serviceStorage, _, err := NewGenericREST(restOptions, api.IPv4Protocol, ipAllocs, nil) if err != nil { t.Fatalf("unexpected error from REST storage: %v", err) } @@ -471,7 +488,7 @@ func TestServiceDefaultOnRead(t *testing.T) { } func TestServiceDefaulting(t *testing.T) { - makeStorage := func(t *testing.T, primaryCIDR string, isDualStack bool) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) { + makeStorage := func(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{ StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}), @@ -480,12 +497,19 @@ func TestServiceDefaulting(t *testing.T) { ResourcePrefix: "services", } - _, cidr, err := netutils.ParseCIDRSloppy(primaryCIDR) - if err != nil { - t.Fatalf("failed to parse CIDR %s", primaryCIDR) + ipAllocs := map[api.IPFamily]ipallocator.Interface{} + for _, fam := range ipFamilies { + switch fam { + case api.IPv4Protocol: + _, cidr, _ := netutils.ParseCIDRSloppy("10.0.0.0/16") + ipAllocs[fam] = makeIPAllocator(cidr) + case api.IPv6Protocol: + _, cidr, _ := netutils.ParseCIDRSloppy("2000::/108") + ipAllocs[fam] = makeIPAllocator(cidr) + } } - serviceStorage, statusStorage, err := NewGenericREST(restOptions, *(cidr), isDualStack) + serviceStorage, statusStorage, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, nil) if err != nil { t.Fatalf("unexpected error from REST storage: %v", err) } @@ -493,35 +517,25 @@ func TestServiceDefaulting(t *testing.T) { } testCases := []struct { - name string - primaryCIDR string - PrimaryIPv6 bool - isDualStack bool + name string + ipFamilies []api.IPFamily }{ { - name: "IPv4 single stack cluster", - primaryCIDR: "10.0.0.0/16", - PrimaryIPv6: false, - isDualStack: false, + name: "IPv4 single stack cluster", + ipFamilies: []api.IPFamily{api.IPv4Protocol}, }, { - name: "IPv6 single stack cluster", - primaryCIDR: "2000::/108", - PrimaryIPv6: true, - isDualStack: false, + name: "IPv6 single stack cluster", + ipFamilies: []api.IPFamily{api.IPv6Protocol}, }, { - name: "IPv4, IPv6 dual stack cluster", - primaryCIDR: "10.0.0.0/16", - PrimaryIPv6: false, - isDualStack: true, + name: "IPv4, IPv6 dual stack cluster", + ipFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol}, }, { - name: "IPv6, IPv4 dual stack cluster", - primaryCIDR: "2000::/108", - PrimaryIPv6: true, - isDualStack: true, + name: "IPv6, IPv4 dual stack cluster", + ipFamilies: []api.IPFamily{api.IPv6Protocol, api.IPv4Protocol}, }, } @@ -533,7 +547,7 @@ func TestServiceDefaulting(t *testing.T) { // this func only works with dual stack feature gate on. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)() - storage, _, server := makeStorage(t, testCase.primaryCIDR, testCase.isDualStack) + storage, _, server := makeStorage(t, testCase.ipFamilies) defer server.Terminate(t) defer storage.Store.DestroyFunc() @@ -551,7 +565,7 @@ func TestServiceDefaulting(t *testing.T) { defaultedServiceList.Items[0].Spec.IPFamilyPolicy = &singleStack // primary family - if testCase.PrimaryIPv6 { + if testCase.ipFamilies[0] == api.IPv6Protocol { // no selector, gets both families defaultedServiceList.Items[1].Spec.IPFamilyPolicy = &preferDualStack defaultedServiceList.Items[1].Spec.IPFamilies = []api.IPFamily{api.IPv6Protocol, api.IPv4Protocol} @@ -559,7 +573,7 @@ func TestServiceDefaulting(t *testing.T) { //assume single stack for w/selector defaultedServiceList.Items[0].Spec.IPFamilies = []api.IPFamily{api.IPv6Protocol} // make dualstacked. if needed - if testCase.isDualStack { + if len(testCase.ipFamilies) > 1 { defaultedServiceList.Items[0].Spec.IPFamilyPolicy = &preferDualStack defaultedServiceList.Items[0].Spec.IPFamilies = append(defaultedServiceList.Items[0].Spec.IPFamilies, api.IPv4Protocol) } @@ -571,7 +585,7 @@ func TestServiceDefaulting(t *testing.T) { // assume single stack for w/selector defaultedServiceList.Items[0].Spec.IPFamilies = []api.IPFamily{api.IPv4Protocol} // make dualstacked. if needed - if testCase.isDualStack { + if len(testCase.ipFamilies) > 1 { defaultedServiceList.Items[0].Spec.IPFamilyPolicy = &preferDualStack defaultedServiceList.Items[0].Spec.IPFamilies = append(defaultedServiceList.Items[0].Spec.IPFamilies, api.IPv6Protocol) } diff --git a/pkg/registry/core/service/strategy.go b/pkg/registry/core/service/strategy.go index 10f1f985339..3439b423924 100644 --- a/pkg/registry/core/service/strategy.go +++ b/pkg/registry/core/service/strategy.go @@ -109,6 +109,7 @@ func (strategy svcStrategy) PrepareForCreate(ctx context.Context, obj runtime.Ob service := obj.(*api.Service) service.Status = api.ServiceStatus{} + //FIXME: Normalize is now called from BeginCreate in pkg/registry/core/service/storage NormalizeClusterIPs(nil, service) dropServiceDisabledFields(service, nil) } @@ -120,6 +121,7 @@ func (strategy svcStrategy) PrepareForUpdate(ctx context.Context, obj, old runti newService.Status = oldService.Status patchAllocatedValues(newService, oldService) + //FIXME: Normalize is now called from BeginUpdate in pkg/registry/core/service/storage NormalizeClusterIPs(oldService, newService) dropServiceDisabledFields(newService, oldService) dropTypeDependentFields(newService, oldService) @@ -362,6 +364,7 @@ func patchAllocatedValues(newSvc, oldSvc *api.Service) { // NormalizeClusterIPs adjust clusterIPs based on ClusterIP. This must not // consider any other fields. +//FIXME: move this to pkg/registry/core/service/storage func NormalizeClusterIPs(oldSvc, newSvc *api.Service) { // In all cases here, we don't need to over-think the inputs. Validation // will be called on the new object soon enough. All this needs to do is