From e338c9db4b441b48dbbedc03ab5716d8da9bdc39 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 25 Nov 2020 15:28:35 -0800 Subject: [PATCH] Svc REST: Set Cluster IPs during dry-run Create Dry-run should behave like a real API call and return valid results. --- pkg/api/service/testing/make.go | 7 + pkg/registry/core/service/storage/rest.go | 46 ++-- .../core/service/storage/rest_test.go | 4 +- .../core/service/storage/storage_test.go | 238 +++++++++--------- 4 files changed, 149 insertions(+), 146 deletions(-) diff --git a/pkg/api/service/testing/make.go b/pkg/api/service/testing/make.go index 1aad4814f96..6be0abe6c4a 100644 --- a/pkg/api/service/testing/make.go +++ b/pkg/api/service/testing/make.go @@ -192,6 +192,13 @@ func SetAllocateLoadBalancerNodePorts(val bool) Tweak { } } +// SetUniqueNodePorts sets all nodeports to unique values. +func SetUniqueNodePorts(svc *api.Service) { + for i := range svc.Spec.Ports { + svc.Spec.Ports[i].NodePort = int32(30000 + i) + } +} + // SetHealthCheckNodePort sets the healthCheckNodePort field for a Service. func SetHealthCheckNodePort(value int32) Tweak { return func(svc *api.Service) { diff --git a/pkg/registry/core/service/storage/rest.go b/pkg/registry/core/service/storage/rest.go index ba429b0e087..1f3d07dd805 100644 --- a/pkg/registry/core/service/storage/rest.go +++ b/pkg/registry/core/service/storage/rest.go @@ -190,15 +190,14 @@ func (al *RESTAllocStuff) allocateCreate(service *api.Service, dryRun bool) (tra } // Allocate ClusterIPs - //FIXME: we need to put values in, even if dry run - else validation should - //not pass. It does but that should be fixed. - if !dryRun { - if txn, err := al.allocServiceClusterIPsNew(service); err != nil { - result.Revert() - return nil, err - } else { - result = append(result, txn) - } + //TODO(thockin): validation should not pass with empty clusterIP, but it + //does (and is tested!). Fixing that all is a big PR and will have to + //happen later. + if txn, err := al.allocServiceClusterIPsNew(service, dryRun); err != nil { + result.Revert() + return nil, err + } else { + result = append(result, txn) } // Allocate ports @@ -378,6 +377,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj // on failure: any ip that should be released, will *not* be released // on success: any ip that should be released, will be released defer func() { + //FIXME: plumb dryRun down here // release the allocated, this is expected to be cleared if the entire function ran to success if allocated_released, err := rs.alloc.releaseClusterIPs(allocated); err != nil { klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. Allocated/Released:%v/%v", service.Namespace, service.Name, err, allocated, allocated_released) @@ -385,6 +385,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj } // performRelease is set when the enture function ran to success if performRelease { + //FIXME: plumb dryRun down here if toReleaseIPs_released, err := rs.alloc.releaseClusterIPs(toReleaseIPs); err != nil { klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. ShouldRelease/Released:%v/%v", service.Namespace, service.Name, err, toReleaseIPs, toReleaseIPs_released) } @@ -527,11 +528,14 @@ func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableO return r.services.ConvertToTable(ctx, object, tableOptions) } -func (al *RESTAllocStuff) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]string) (map[api.IPFamily]string, error) { +func (al *RESTAllocStuff) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]string, dryRun bool) (map[api.IPFamily]string, error) { allocated := make(map[api.IPFamily]string) for family, ip := range toAlloc { allocator := al.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate + if dryRun { + allocator = allocator.DryRun() + } if ip == "" { allocatedIP, err := allocator.AllocateNext() if err != nil { @@ -577,13 +581,13 @@ func (al *RESTAllocStuff) releaseClusterIPs(toRelease map[api.IPFamily]string) ( // standard allocator for dualstackgate==Off, hard wired dependency // and ignores policy, families and clusterIPs -func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service) (map[api.IPFamily]string, error) { +func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service, dryRun bool) (map[api.IPFamily]string, error) { toAlloc := make(map[api.IPFamily]string) // get clusterIP.. empty string if user did not specify an ip toAlloc[al.defaultServiceIPFamily] = service.Spec.ClusterIP // alloc - allocated, err := al.allocClusterIPs(service, toAlloc) + allocated, err := al.allocClusterIPs(service, toAlloc, dryRun) // set if err == nil { @@ -595,16 +599,19 @@ func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service) (map[api.I } //FIXME: merge into allocServiceClusterIPs rather than call it -func (al *RESTAllocStuff) allocServiceClusterIPsNew(service *api.Service) (transaction, error) { +func (al *RESTAllocStuff) allocServiceClusterIPsNew(service *api.Service, dryRun bool) (transaction, error) { // clusterIPs that were allocated may need to be released in case of // failure at a higher level. - toReleaseClusterIPs, err := al.allocServiceClusterIPs(service) + toReleaseClusterIPs, err := al.allocServiceClusterIPs(service, dryRun) if err != nil { return nil, err } txn := callbackTransaction{ revert: func() { + if dryRun { + return + } released, err := al.releaseClusterIPs(toReleaseClusterIPs) if err != nil { klog.Warningf("failed to release clusterIPs for failed new service:%v allocated:%v released:%v error:%v", @@ -616,7 +623,7 @@ func (al *RESTAllocStuff) allocServiceClusterIPsNew(service *api.Service) (trans } // allocates ClusterIPs for a service -func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]string, error) { +func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service, dryRun bool) (map[api.IPFamily]string, error) { // external name don't get ClusterIPs if service.Spec.Type == api.ServiceTypeExternalName { return nil, nil @@ -628,7 +635,7 @@ func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api. } if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { - return al.allocServiceClusterIP(service) + return al.allocServiceClusterIP(service, dryRun) } toAlloc := make(map[api.IPFamily]string) @@ -651,7 +658,7 @@ func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api. } // allocate - allocated, err := al.allocClusterIPs(service, toAlloc) + allocated, err := al.allocClusterIPs(service, toAlloc, dryRun) // set if successful if err == nil { @@ -702,7 +709,8 @@ func (al *RESTAllocStuff) handleClusterIPsForUpdatedService(oldService *api.Serv // CASE A: // Update service from ExternalName to non-ExternalName, should initialize ClusterIP. if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName { - allocated, err := al.allocServiceClusterIPs(service) + //FIXME: plumb dryRun down here + allocated, err := al.allocServiceClusterIPs(service, false) return allocated, nil, err } @@ -748,7 +756,7 @@ func (al *RESTAllocStuff) handleClusterIPsForUpdatedService(oldService *api.Serv toAllocate[service.Spec.IPFamilies[1]] = service.Spec.ClusterIPs[1] // allocate - allocated, err := al.allocClusterIPs(service, toAllocate) + allocated, err := al.allocClusterIPs(service, toAllocate, false) //FIXME: plumb dry-run down here // set if successful if err == nil { service.Spec.ClusterIPs[1] = allocated[service.Spec.IPFamilies[1]] diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go index ae3ef8b195a..d00aa81a5bf 100644 --- a/pkg/registry/core/service/storage/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -1062,7 +1062,7 @@ func TestAllocateLoadBalancerNodePorts(t *testing.T) { name: "allocate false, gate on, port specified", svc: svctest.MakeService("alloc-false-specific", svctest.SetTypeLoadBalancer, - svctest.SetNodePorts(30000), + svctest.SetUniqueNodePorts, svctest.SetAllocateLoadBalancerNodePorts(false)), expectNodePorts: true, allocateNodePortGate: true, @@ -1070,7 +1070,7 @@ func TestAllocateLoadBalancerNodePorts(t *testing.T) { name: "allocate true, gate on, port specified", svc: svctest.MakeService("alloc-true-specific", svctest.SetTypeLoadBalancer, - svctest.SetNodePorts(30000), + svctest.SetUniqueNodePorts, svctest.SetAllocateLoadBalancerNodePorts(true)), expectNodePorts: true, allocateNodePortGate: true, diff --git a/pkg/registry/core/service/storage/storage_test.go b/pkg/registry/core/service/storage/storage_test.go index 9e60205e77a..a8fc1f47e1a 100644 --- a/pkg/registry/core/service/storage/storage_test.go +++ b/pkg/registry/core/service/storage/storage_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" + machineryutilnet "k8s.io/apimachinery/pkg/util/net" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" genericregistrytest "k8s.io/apiserver/pkg/registry/generic/testing" @@ -39,6 +40,7 @@ import ( api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" + "k8s.io/kubernetes/pkg/registry/core/service/portallocator" "k8s.io/kubernetes/pkg/registry/registrytest" netutils "k8s.io/utils/net" ) @@ -51,6 +53,14 @@ func makeIPAllocator(cidr *net.IPNet) ipallocator.Interface { return al } +func makePortAllocator(ports machineryutilnet.PortRange) portallocator.Interface { + al, err := portallocator.NewInMemory(ports) + if err != nil { + panic(fmt.Sprintf("error creating port allocator: %v", err)) + } + return al +} + func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusREST, *etcd3testing.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{ @@ -74,7 +84,9 @@ func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*GenericREST, *StatusR } } - serviceStorage, statusStorage, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, nil) + portAlloc := makePortAllocator(*(machineryutilnet.ParsePortRangeOrDie("30000-32767"))) + + serviceStorage, statusStorage, err := NewGenericREST(restOptions, ipFamilies[0], ipAllocs, portAlloc) if err != nil { t.Fatalf("unexpected error from REST storage: %v", err) } @@ -5051,127 +5063,103 @@ func TestCreateInitIPFields(t *testing.T) { } } -// func TestServiceRegistryCreateDryRun(t *testing.T) { -// requireDualStack := api.IPFamilyPolicyRequireDualStack -// testCases := []struct { -// name string -// svc *api.Service -// enableDualStack bool -// }{ -// { -// name: "v4 service featuregate off", -// enableDualStack: false, -// svc: &api.Service{ -// ObjectMeta: metav1.ObjectMeta{Name: "foo"}, -// Spec: api.ServiceSpec{ -// Selector: map[string]string{"bar": "baz"}, -// SessionAffinity: api.ServiceAffinityNone, -// Type: api.ServiceTypeClusterIP, -// ClusterIP: "1.2.3.4", -// ClusterIPs: []string{"1.2.3.4"}, -// Ports: []api.ServicePort{{ -// Port: 6502, -// Protocol: api.ProtocolTCP, -// TargetPort: intstr.FromInt(6502), -// }}, -// }, -// }, -// }, -// { -// name: "v6 service featuregate on but singlestack", -// enableDualStack: true, -// svc: &api.Service{ -// ObjectMeta: metav1.ObjectMeta{Name: "foo"}, -// Spec: api.ServiceSpec{ -// Selector: map[string]string{"bar": "baz"}, -// SessionAffinity: api.ServiceAffinityNone, -// Type: api.ServiceTypeClusterIP, -// IPFamilies: []api.IPFamily{api.IPv6Protocol}, -// ClusterIP: "2000:0:0:0:0:0:0:1", -// ClusterIPs: []string{"2000:0:0:0:0:0:0:1"}, -// Ports: []api.ServicePort{{ -// Port: 6502, -// Protocol: api.ProtocolTCP, -// TargetPort: intstr.FromInt(6502), -// }}, -// }, -// }, -// }, -// { -// name: "dualstack v4,v6 service", -// enableDualStack: true, -// svc: &api.Service{ -// ObjectMeta: metav1.ObjectMeta{Name: "foo"}, -// Spec: api.ServiceSpec{ -// Selector: map[string]string{"bar": "baz"}, -// SessionAffinity: api.ServiceAffinityNone, -// Type: api.ServiceTypeClusterIP, -// IPFamilyPolicy: &requireDualStack, -// ClusterIP: "1.2.3.4", -// ClusterIPs: []string{"1.2.3.4", "2000:0:0:0:0:0:0:1"}, -// IPFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol}, -// Ports: []api.ServicePort{{ -// Port: 6502, -// Protocol: api.ProtocolTCP, -// TargetPort: intstr.FromInt(6502), -// }}, -// }, -// }, -// }, -// { -// name: "dualstack v6,v4 service", -// enableDualStack: true, -// svc: &api.Service{ -// ObjectMeta: metav1.ObjectMeta{Name: "foo"}, -// Spec: api.ServiceSpec{ -// Selector: map[string]string{"bar": "baz"}, -// SessionAffinity: api.ServiceAffinityNone, -// Type: api.ServiceTypeClusterIP, -// IPFamilyPolicy: &requireDualStack, -// ClusterIP: "2000:0:0:0:0:0:0:1", -// ClusterIPs: []string{"2000:0:0:0:0:0:0:1", "1.2.3.4"}, -// IPFamilies: []api.IPFamily{api.IPv6Protocol, api.IPv4Protocol}, -// Ports: []api.ServicePort{{ -// Port: 6502, -// Protocol: api.ProtocolTCP, -// TargetPort: intstr.FromInt(6502), -// }}, -// }, -// }, -// }, -// } -// -// for _, tc := range testCases { -// t.Run(tc.name, func(t *testing.T) { -// defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() -// -// families := []api.IPFamily{api.IPv4Protocol} -// if tc.enableDualStack { -// families = append(families, api.IPv6Protocol) -// } -// storage, registry, server := NewTestREST(t, nil, families) -// defer server.Terminate(t) -// -// ctx := genericapirequest.NewDefaultContext() -// _, err := storage.Create(ctx, tc.svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}) -// if err != nil { -// t.Fatalf("Unexpected error: %v", err) -// } -// -// for i, family := range tc.svc.Spec.IPFamilies { -// alloc := storage.alloc.serviceIPAllocatorsByFamily[family] -// if alloc.Has(netutils.ParseIPSloppy(tc.svc.Spec.ClusterIPs[i])) { -// t.Errorf("unexpected side effect: ip allocated %v", tc.svc.Spec.ClusterIPs[i]) -// } -// } -// -// srv, err := registry.GetService(ctx, tc.svc.Name, &metav1.GetOptions{}) -// if err != nil { -// t.Errorf("unexpected error: %v", err) -// } -// if srv != nil { -// t.Errorf("unexpected service found: %v", srv) -// } -// }) -// } -// } +// Prove that a dry-run create doesn't actually allocate IPs or ports. +func TestCreateDryRun(t *testing.T) { + testCases := []struct { + name string + clusterFamilies []api.IPFamily + enableDualStack bool + svc *api.Service + }{{ + name: "singlestack:v4_gate:off_clusterip:unset", + clusterFamilies: []api.IPFamily{api.IPv4Protocol}, + enableDualStack: false, + svc: svctest.MakeService("foo"), + }, { + name: "singlestack:v4_gate:off_clusterip:set", + clusterFamilies: []api.IPFamily{api.IPv4Protocol}, + enableDualStack: false, + svc: svctest.MakeService("foo", svctest.SetClusterIPs("10.0.0.1")), + }, { + name: "singlestack:v6_gate:on_clusterip:unset", + clusterFamilies: []api.IPFamily{api.IPv6Protocol}, + enableDualStack: true, + svc: svctest.MakeService("foo"), + }, { + name: "singlestack:v6_gate:on_clusterip:set", + clusterFamilies: []api.IPFamily{api.IPv6Protocol}, + enableDualStack: true, + svc: svctest.MakeService("foo", svctest.SetClusterIPs("2000::1")), + }, { + name: "dualstack:v4v6_gate:on_clusterip:unset", + clusterFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol}, + enableDualStack: true, + svc: svctest.MakeService("foo", svctest.SetIPFamilyPolicy(api.IPFamilyPolicyPreferDualStack)), + }, { + name: "dualstack:v4v6_gate:on_clusterip:set", + clusterFamilies: []api.IPFamily{api.IPv4Protocol, api.IPv6Protocol}, + enableDualStack: true, + svc: svctest.MakeService("foo", svctest.SetIPFamilyPolicy(api.IPFamilyPolicyPreferDualStack), svctest.SetClusterIPs("10.0.0.1", "2000::1")), + }, { + name: "singlestack:v4_gate:off_type:NodePort_nodeport:unset", + clusterFamilies: []api.IPFamily{api.IPv4Protocol}, + enableDualStack: false, + svc: svctest.MakeService("foo", svctest.SetTypeNodePort), + }, { + name: "singlestack:v4_gate:on_type:LoadBalancer_nodePort:set", + clusterFamilies: []api.IPFamily{api.IPv4Protocol}, + enableDualStack: true, + svc: svctest.MakeService("foo", svctest.SetTypeLoadBalancer, svctest.SetUniqueNodePorts), + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() + + storage, _, server := newStorage(t, tc.clusterFamilies) + defer server.Terminate(t) + defer storage.Store.DestroyFunc() + + ctx := genericapirequest.NewDefaultContext() + createdObj, err := storage.Create(ctx, tc.svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}) + if err != nil { + t.Fatalf("unexpected error creating service: %v", err) + } + createdSvc := createdObj.(*api.Service) + + // Ensure IPs were allocated + if netutils.ParseIPSloppy(createdSvc.Spec.ClusterIP) == nil { + t.Errorf("expected valid clusterIP: %v", createdSvc.Spec.ClusterIP) + } + + // Ensure the IP allocators are clean. + if !tc.enableDualStack { + if storage.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol].Has(netutils.ParseIPSloppy(createdSvc.Spec.ClusterIP)) { + t.Errorf("expected IP to not be allocated: %v", createdSvc.Spec.ClusterIP) + } + } else { + for _, ip := range createdSvc.Spec.ClusterIPs { + if netutils.ParseIPSloppy(ip) == nil { + t.Errorf("expected valid clusterIP: %v", createdSvc.Spec.ClusterIP) + } + } + for i, fam := range createdSvc.Spec.IPFamilies { + if storage.alloc.serviceIPAllocatorsByFamily[fam].Has(netutils.ParseIPSloppy(createdSvc.Spec.ClusterIPs[i])) { + t.Errorf("expected IP to not be allocated: %v", createdSvc.Spec.ClusterIPs[i]) + } + } + } + + if tc.svc.Spec.Type != api.ServiceTypeClusterIP { + for _, p := range createdSvc.Spec.Ports { + if p.NodePort == 0 { + t.Errorf("expected a NodePort value") + } + if storage.alloc.serviceNodePorts.Has(int(p.NodePort)) { + t.Errorf("expected port to not be allocated: %v", p.NodePort) + } + } + } + }) + } +}