diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 05e6ae1bcd3..9f5d7a668c5 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -254,16 +254,24 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } + serviceIPAllocators := map[api.IPFamily]ipallocator.Interface{ + serviceClusterIPAllocator.IPFamily(): serviceClusterIPAllocator, + } + if secondaryServiceClusterIPAllocator != nil { + serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator + } + serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPRange, secondaryServiceClusterIPAllocator != nil) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } - serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage, + serviceRest, serviceRestProxy := servicestore.NewREST( + serviceRESTStorage, endpointsStorage, podStorage.Pod, - serviceClusterIPAllocator, - secondaryServiceClusterIPAllocator, + serviceClusterIPAllocator.IPFamily(), + serviceIPAllocators, serviceNodePortAllocator, c.ProxyTransport) diff --git a/pkg/registry/core/service/storage/rest.go b/pkg/registry/core/service/storage/rest.go index 0b30eb7cf07..1af49f057e2 100644 --- a/pkg/registry/core/service/storage/rest.go +++ b/pkg/registry/core/service/storage/rest.go @@ -51,14 +51,20 @@ import ( // REST adapts a service registry into apiserver's RESTStorage model. type REST struct { - strategy rest.RESTCreateUpdateStrategy - services ServiceStorage - endpoints EndpointsStorage + strategy rest.RESTCreateUpdateStrategy + services ServiceStorage + endpoints EndpointsStorage + alloc RESTAllocStuff + proxyTransport http.RoundTripper + pods rest.Getter +} + +// RESTAllocStuff is a temporary struct to facilitate the flattening of service +// REST layers. It will be cleaned up over a series of commits. +type RESTAllocStuff struct { serviceIPAllocatorsByFamily map[api.IPFamily]ipallocator.Interface defaultServiceIPFamily api.IPFamily // --service-cluster-ip-range[0] serviceNodePorts portallocator.Interface - proxyTransport http.RoundTripper - pods rest.Getter } // ServiceNodePort includes protocol and port number of a service NodePort. @@ -95,54 +101,37 @@ func NewREST( services ServiceStorage, endpoints EndpointsStorage, pods rest.Getter, - serviceIPs ipallocator.Interface, - secondaryServiceIPs ipallocator.Interface, - serviceNodePorts portallocator.Interface, + defaultFamily api.IPFamily, + ipAllocs map[api.IPFamily]ipallocator.Interface, + portAlloc portallocator.Interface, proxyTransport http.RoundTripper, ) (*REST, *registry.ProxyREST) { - strategy, _ := registry.StrategyForServiceCIDRs(serviceIPs.CIDR(), secondaryServiceIPs != nil) + strategy, _ := registry.StrategyForServiceCIDRs(ipAllocs[defaultFamily].CIDR(), len(ipAllocs) > 1) - byIPFamily := make(map[api.IPFamily]ipallocator.Interface) - - // detect this cluster default Service IPFamily (ipfamily of --service-cluster-ip-range[0]) - serviceIPFamily := api.IPv4Protocol - cidr := serviceIPs.CIDR() - if netutils.IsIPv6CIDR(&cidr) { - serviceIPFamily = api.IPv6Protocol - } - - // add primary family - byIPFamily[serviceIPFamily] = serviceIPs - - if secondaryServiceIPs != nil { - // process secondary family - secondaryServiceIPFamily := api.IPv6Protocol - - // get family of secondary - if serviceIPFamily == api.IPv6Protocol { - secondaryServiceIPFamily = api.IPv4Protocol - } - // add it - byIPFamily[secondaryServiceIPFamily] = secondaryServiceIPs - } - - klog.V(0).Infof("the default service ipfamily for this cluster is: %s", string(serviceIPFamily)) + klog.V(0).Infof("the default service ipfamily for this cluster is: %s", string(defaultFamily)) rest := &REST{ - strategy: strategy, - services: services, - endpoints: endpoints, - serviceIPAllocatorsByFamily: byIPFamily, - serviceNodePorts: serviceNodePorts, - defaultServiceIPFamily: serviceIPFamily, - proxyTransport: proxyTransport, - pods: pods, + strategy: strategy, + services: services, + endpoints: endpoints, + proxyTransport: proxyTransport, + pods: pods, + alloc: makeAlloc(defaultFamily, ipAllocs, portAlloc), } return rest, ®istry.ProxyREST{Redirector: rest, ProxyTransport: proxyTransport} } +// This is a trasitionary function to facilitate service REST flattening. +func makeAlloc(defaultFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) RESTAllocStuff { + return RESTAllocStuff{ + defaultServiceIPFamily: defaultFamily, + serviceIPAllocatorsByFamily: ipAllocs, + serviceNodePorts: portAlloc, + } +} + var ( _ ServiceStorage = &REST{} _ rest.CategoriesProvider = &REST{} @@ -226,7 +215,7 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation } } - nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun)) + nodePortOp := portallocator.StartOperation(rs.alloc.serviceNodePorts, dryrun.IsDryRun(options.DryRun)) defer nodePortOp.Finish() if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { @@ -314,7 +303,7 @@ func (rs *REST) releaseAllocatedResources(svc *api.Service) { rs.releaseServiceClusterIPs(svc) for _, nodePort := range collectServiceNodePorts(svc) { - err := rs.serviceNodePorts.Release(nodePort) + err := rs.alloc.serviceNodePorts.Release(nodePort) if err != nil { // these should be caught by an eventual reconciliation / restart utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err)) @@ -324,7 +313,7 @@ func (rs *REST) releaseAllocatedResources(svc *api.Service) { if apiservice.NeedsHealthCheck(svc) { nodePort := svc.Spec.HealthCheckNodePort if nodePort > 0 { - err := rs.serviceNodePorts.Release(int(nodePort)) + err := rs.alloc.serviceNodePorts.Release(int(nodePort)) if err != nil { // these should be caught by an eventual reconciliation / restart utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err)) @@ -443,7 +432,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj } }() - nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun)) + nodePortOp := portallocator.StartOperation(rs.alloc.serviceNodePorts, dryrun.IsDryRun(options.DryRun)) defer nodePortOp.Finish() // try set ip families (for missing ip families) @@ -586,7 +575,7 @@ func (rs *REST) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]s allocated := make(map[api.IPFamily]string) for family, ip := range toAlloc { - allocator := rs.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate + allocator := rs.alloc.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate if ip == "" { allocatedIP, err := allocator.AllocateNext() if err != nil { @@ -613,7 +602,7 @@ func (rs *REST) releaseClusterIPs(toRelease map[api.IPFamily]string) (map[api.IP released := make(map[api.IPFamily]string) for family, ip := range toRelease { - allocator, ok := rs.serviceIPAllocatorsByFamily[family] + allocator, ok := rs.alloc.serviceIPAllocatorsByFamily[family] if !ok { // cluster was configured for dual stack, then single stack klog.V(4).Infof("delete service. Not releasing ClusterIP:%v because IPFamily:%v is no longer configured on server", ip, family) @@ -636,14 +625,14 @@ func (rs *REST) allocServiceClusterIP(service *api.Service) (map[api.IPFamily]st toAlloc := make(map[api.IPFamily]string) // get clusterIP.. empty string if user did not specify an ip - toAlloc[rs.defaultServiceIPFamily] = service.Spec.ClusterIP + toAlloc[rs.alloc.defaultServiceIPFamily] = service.Spec.ClusterIP // alloc allocated, err := rs.allocClusterIPs(service, toAlloc) // set if err == nil { - service.Spec.ClusterIP = allocated[rs.defaultServiceIPFamily] - service.Spec.ClusterIPs = []string{allocated[rs.defaultServiceIPFamily]} + service.Spec.ClusterIP = allocated[rs.alloc.defaultServiceIPFamily] + service.Spec.ClusterIPs = []string{allocated[rs.alloc.defaultServiceIPFamily]} } return allocated, err @@ -752,7 +741,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi toRelease = make(map[api.IPFamily]string) if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { // for non dual stack enabled cluster we use clusterIPs - toRelease[rs.defaultServiceIPFamily] = oldService.Spec.ClusterIP + toRelease[rs.alloc.defaultServiceIPFamily] = oldService.Spec.ClusterIP } else { // dual stack is enabled, collect ClusterIPs by families for i, family := range oldService.Spec.IPFamilies { @@ -963,14 +952,14 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap if i >= len(service.Spec.IPFamilies) { if isIPv6 { // first make sure that family(ip) is configured - if _, found := rs.serviceIPAllocatorsByFamily[api.IPv6Protocol]; !found { + if _, found := rs.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol]; !found { el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs, "may not use IPv6 on a cluster which is not configured for it")} return errors.NewInvalid(api.Kind("Service"), service.Name, el) } service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol) } else { // first make sure that family(ip) is configured - if _, found := rs.serviceIPAllocatorsByFamily[api.IPv4Protocol]; !found { + if _, found := rs.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol]; !found { el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs, "may not use IPv4 on a cluster which is not configured for it")} return errors.NewInvalid(api.Kind("Service"), service.Name, el) } @@ -997,7 +986,7 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap // If IPFamilies was not set by the user, start with the default // family. if len(service.Spec.IPFamilies) == 0 { - service.Spec.IPFamilies = []api.IPFamily{rs.defaultServiceIPFamily} + service.Spec.IPFamilies = []api.IPFamily{rs.alloc.defaultServiceIPFamily} } // this follows headful services. With one exception on a single stack @@ -1024,13 +1013,13 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap // asking for dual stack on a non dual stack cluster // should fail without assigning any family - if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) == api.IPFamilyPolicyRequireDualStack && len(rs.serviceIPAllocatorsByFamily) < 2 { + if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) == api.IPFamilyPolicyRequireDualStack && len(rs.alloc.serviceIPAllocatorsByFamily) < 2 { el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy, "Cluster is not configured for dual stack services")) } // if there is a family requested then it has to be configured on cluster for i, ipFamily := range service.Spec.IPFamilies { - if _, found := rs.serviceIPAllocatorsByFamily[ipFamily]; !found { + if _, found := rs.alloc.serviceIPAllocatorsByFamily[ipFamily]; !found { el = append(el, field.Invalid(field.NewPath("spec", "ipFamilies").Index(i), service.Spec.ClusterIPs, fmt.Sprintf("ipfamily %v is not configured on cluster", ipFamily))) } } @@ -1049,14 +1038,14 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap // nil families, gets cluster default (if feature flag is not in effect, the strategy will take care of removing it) if len(service.Spec.IPFamilies) == 0 { - service.Spec.IPFamilies = []api.IPFamily{rs.defaultServiceIPFamily} + service.Spec.IPFamilies = []api.IPFamily{rs.alloc.defaultServiceIPFamily} } // is this service looking for dual stack, and this cluster does have two families? // if so, then append the missing family if *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(service.Spec.IPFamilies) == 1 && - len(rs.serviceIPAllocatorsByFamily) == 2 { + len(rs.alloc.serviceIPAllocatorsByFamily) == 2 { if service.Spec.IPFamilies[0] == api.IPv4Protocol { service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol) diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go index c5ef587e006..1c2a9b59add 100644 --- a/pkg/registry/core/service/storage/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -240,7 +240,13 @@ func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Po t.Fatalf("cannot create port allocator %v", err) } - rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, rPrimary, rSecondary, portAllocator, nil) + ipAllocators := map[api.IPFamily]ipallocator.Interface{ + rPrimary.IPFamily(): rPrimary, + } + if rSecondary != nil { + ipAllocators[rSecondary.IPFamily()] = rSecondary + } + rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, rPrimary.IPFamily(), ipAllocators, portAllocator, nil) return rest, server } @@ -332,7 +338,7 @@ func TestServiceRegistryCreate(t *testing.T) { } for i, family := range createdService.Spec.IPFamilies { - allocator := storage.serviceIPAllocatorsByFamily[family] + allocator := storage.alloc.serviceIPAllocatorsByFamily[family] c := allocator.CIDR() cidr := &c if !cidr.Contains(netutils.ParseIPSloppy(createdService.Spec.ClusterIPs[i])) { @@ -399,7 +405,7 @@ func TestServiceRegistryCreateDryRun(t *testing.T) { } for i, family := range tc.svc.Spec.IPFamilies { - alloc := storage.serviceIPAllocatorsByFamily[family] + alloc := storage.alloc.serviceIPAllocatorsByFamily[family] if ipIsAllocated(t, alloc, tc.svc.Spec.ClusterIPs[i]) { t.Errorf("unexpected side effect: ip allocated %v", tc.svc.Spec.ClusterIPs[i]) } @@ -429,7 +435,7 @@ func TestDryRunNodePort(t *testing.T) { if createdSvc.Spec.Ports[0].NodePort == 0 { t.Errorf("expected NodePort value assigned") } - if portIsAllocated(t, storage.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) { + if portIsAllocated(t, storage.alloc.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) { t.Errorf("unexpected side effect: NodePort allocated") } _, err = getService(storage, ctx, svc.Name, &metav1.GetOptions{}) @@ -456,7 +462,7 @@ func TestDryRunNodePort(t *testing.T) { t.Errorf("Expected %v, but got %v", expectNodePorts, actualNodePorts) } for i := range svc.Spec.Ports { - if portIsAllocated(t, storage.serviceNodePorts, svc.Spec.Ports[i].NodePort) { + if portIsAllocated(t, storage.alloc.serviceNodePorts, svc.Spec.Ports[i].NodePort) { t.Errorf("unexpected side effect: NodePort allocated") } } @@ -560,7 +566,7 @@ func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) { for i := range serviceNodePorts { nodePort := serviceNodePorts[i] // Release the node port at the end of the test case. - storage.serviceNodePorts.Release(nodePort) + storage.alloc.serviceNodePorts.Release(nodePort) } } } @@ -902,7 +908,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) { if created { t.Errorf("expected not created") } - if portIsAllocated(t, storage.serviceNodePorts, new1.Spec.Ports[0].NodePort) { + if portIsAllocated(t, storage.alloc.serviceNodePorts, new1.Spec.Ports[0].NodePort) { t.Errorf("unexpected side effect: NodePort allocated") } @@ -915,7 +921,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) { if err != nil { t.Fatalf("Expected no error: %v", err) } - if ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], new2.Spec.ClusterIP) { + if ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], new2.Spec.ClusterIP) { t.Errorf("unexpected side effect: ip allocated") } @@ -925,10 +931,10 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) { t.Fatalf("Expected no error: %v", err) } svc = obj.(*api.Service) - if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], svc.Spec.ClusterIP) { + if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], svc.Spec.ClusterIP) { t.Errorf("expected IP to be allocated") } - if !portIsAllocated(t, storage.serviceNodePorts, svc.Spec.Ports[0].NodePort) { + if !portIsAllocated(t, storage.alloc.serviceNodePorts, svc.Spec.Ports[0].NodePort) { t.Errorf("expected NodePort to be allocated") } @@ -939,7 +945,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) { if err != nil { t.Fatalf("Expected no error: %v", err) } - if !portIsAllocated(t, storage.serviceNodePorts, svc.Spec.Ports[0].NodePort) { + if !portIsAllocated(t, storage.alloc.serviceNodePorts, svc.Spec.Ports[0].NodePort) { t.Errorf("unexpected side effect: NodePort unallocated") } @@ -957,7 +963,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) { if err != nil { t.Fatalf("expected no error: %v", err) } - if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], svc.Spec.ClusterIP) { + if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], svc.Spec.ClusterIP) { t.Errorf("unexpected side effect: ip unallocated") } } @@ -1130,14 +1136,14 @@ func TestServiceRegistryDeleteDryRun(t *testing.T) { if createdSvc.Spec.ClusterIP == "" { t.Fatalf("expected ClusterIP to be set") } - if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], createdSvc.Spec.ClusterIP) { + if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], createdSvc.Spec.ClusterIP) { t.Errorf("expected ClusterIP to be allocated") } _, _, err = storage.Delete(ctx, svc.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{DryRun: []string{metav1.DryRunAll}}) if err != nil { t.Fatalf("Expected no error: %v", err) } - if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], createdSvc.Spec.ClusterIP) { + if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], createdSvc.Spec.ClusterIP) { t.Errorf("unexpected side effect: ip unallocated") } @@ -1151,7 +1157,7 @@ func TestServiceRegistryDeleteDryRun(t *testing.T) { if createdSvc.Spec.Ports[0].NodePort == 0 { t.Fatalf("expected NodePort to be set") } - if !portIsAllocated(t, storage.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) { + if !portIsAllocated(t, storage.alloc.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) { t.Errorf("expected NodePort to be allocated") } @@ -1161,7 +1167,7 @@ func TestServiceRegistryDeleteDryRun(t *testing.T) { if err != nil { t.Fatalf("Expected no error: %v", err) } - if !portIsAllocated(t, storage.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) { + if !portIsAllocated(t, storage.alloc.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) { t.Errorf("unexpected side effect: NodePort unallocated") } } @@ -1189,7 +1195,7 @@ func TestDualStackServiceRegistryDeleteDryRun(t *testing.T) { t.Fatalf("Expected no error: %v", err) } for i, family := range dualstack_svc.Spec.IPFamilies { - if !ipIsAllocated(t, dualstack_storage.serviceIPAllocatorsByFamily[family], dualstack_svc.Spec.ClusterIPs[i]) { + if !ipIsAllocated(t, dualstack_storage.alloc.serviceIPAllocatorsByFamily[family], dualstack_svc.Spec.ClusterIPs[i]) { t.Errorf("unexpected side effect: ip unallocated %v", dualstack_svc.Spec.ClusterIPs[i]) } } @@ -1492,7 +1498,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) { testIPs := []string{"1.2.3.93", "1.2.3.94", "1.2.3.95", "1.2.3.96"} testIP := "not-an-ip" for _, ip := range testIPs { - if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily].(*ipallocator.Range), ip) { + if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily].(*ipallocator.Range), ip) { testIP = ip break } @@ -1581,7 +1587,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) { testIPs := []string{"1.2.3.93", "1.2.3.94", "1.2.3.95", "1.2.3.96"} testIP := "" for _, ip := range testIPs { - if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily].(*ipallocator.Range), ip) { + if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily].(*ipallocator.Range), ip) { testIP = ip break } @@ -1955,7 +1961,7 @@ func TestInitClusterIP(t *testing.T) { // pre allocate ips if any for family, ip := range test.preAllocateClusterIPs { - allocator, ok := storage.serviceIPAllocatorsByFamily[family] + allocator, ok := storage.alloc.serviceIPAllocatorsByFamily[family] if !ok { t.Fatalf("test is incorrect, allocator does not exist on rest") } @@ -1986,7 +1992,7 @@ func TestInitClusterIP(t *testing.T) { if netutils.IsIPv6String(ip) { family = api.IPv6Protocol } - allocator := storage.serviceIPAllocatorsByFamily[family] + allocator := storage.alloc.serviceIPAllocatorsByFamily[family] if !ipIsAllocated(t, allocator, ip) { t.Fatalf("expected ip:%v to be allocated by %v allocator. it was not", ip, family) } @@ -2021,7 +2027,7 @@ func TestInitClusterIP(t *testing.T) { return } - shouldUpgrade := len(newSvc.Spec.IPFamilies) == 2 && *(newSvc.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(storage.serviceIPAllocatorsByFamily) == 2 + shouldUpgrade := len(newSvc.Spec.IPFamilies) == 2 && *(newSvc.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(storage.alloc.serviceIPAllocatorsByFamily) == 2 if shouldUpgrade && len(newSvc.Spec.ClusterIPs) < 2 { t.Fatalf("Service should have been upgraded %+v", newSvc) } @@ -2037,7 +2043,7 @@ func TestInitClusterIP(t *testing.T) { func TestInitNodePorts(t *testing.T) { storage, server := NewTestREST(t, []api.IPFamily{api.IPv4Protocol}) defer server.Terminate(t) - nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false) + nodePortOp := portallocator.StartOperation(storage.alloc.serviceNodePorts, false) testCases := []struct { name string @@ -2103,7 +2109,7 @@ func TestInitNodePorts(t *testing.T) { serviceNodePorts := collectServiceNodePorts(test.service) if len(test.expectSpecifiedNodePorts) == 0 { for _, nodePort := range serviceNodePorts { - if !storage.serviceNodePorts.Has(nodePort) { + if !storage.alloc.serviceNodePorts.Has(nodePort) { t.Errorf("%q: unexpected NodePort %d, out of range", test.name, nodePort) } } @@ -2113,7 +2119,7 @@ func TestInitNodePorts(t *testing.T) { for i := range serviceNodePorts { nodePort := serviceNodePorts[i] // Release the node port at the end of the test case. - storage.serviceNodePorts.Release(nodePort) + storage.alloc.serviceNodePorts.Release(nodePort) } } } @@ -2121,7 +2127,7 @@ func TestInitNodePorts(t *testing.T) { func TestUpdateNodePorts(t *testing.T) { storage, server := NewTestREST(t, []api.IPFamily{api.IPv4Protocol}) defer server.Terminate(t) - nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false) + nodePortOp := portallocator.StartOperation(storage.alloc.serviceNodePorts, false) testCases := []struct { name string @@ -2218,7 +2224,7 @@ func TestUpdateNodePorts(t *testing.T) { serviceNodePorts := collectServiceNodePorts(test.newService) if len(test.expectSpecifiedNodePorts) == 0 { for _, nodePort := range serviceNodePorts { - if !storage.serviceNodePorts.Has(nodePort) { + if !storage.alloc.serviceNodePorts.Has(nodePort) { t.Errorf("%q: unexpected NodePort %d, out of range", test.name, nodePort) } } @@ -2228,7 +2234,7 @@ func TestUpdateNodePorts(t *testing.T) { for i := range serviceNodePorts { nodePort := serviceNodePorts[i] // Release the node port at the end of the test case. - storage.serviceNodePorts.Release(nodePort) + storage.alloc.serviceNodePorts.Release(nodePort) } } } @@ -2386,7 +2392,7 @@ func TestServiceUpgrade(t *testing.T) { createdSvc := obj.(*api.Service) // allocated IP for family, ip := range testCase.allocateIPsBeforeUpdate { - alloc := storage.serviceIPAllocatorsByFamily[family] + alloc := storage.alloc.serviceIPAllocatorsByFamily[family] if err := alloc.Allocate(netutils.ParseIPSloppy(ip)); err != nil { t.Fatalf("test is incorrect, unable to preallocate ip:%v", ip) } @@ -2418,7 +2424,7 @@ func TestServiceUpgrade(t *testing.T) { updatedSvc := updated.(*api.Service) isValidClusterIPFields(t, storage, updatedSvc, updatedSvc) - shouldUpgrade := len(createdSvc.Spec.IPFamilies) == 2 && *(createdSvc.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(storage.serviceIPAllocatorsByFamily) == 2 + shouldUpgrade := len(createdSvc.Spec.IPFamilies) == 2 && *(createdSvc.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(storage.alloc.serviceIPAllocatorsByFamily) == 2 if shouldUpgrade && len(updatedSvc.Spec.ClusterIPs) < 2 { t.Fatalf("Service should have been upgraded %+v", createdSvc) } @@ -2430,7 +2436,7 @@ func TestServiceUpgrade(t *testing.T) { // make sure that ips were allocated, correctly for i, family := range updatedSvc.Spec.IPFamilies { ip := updatedSvc.Spec.ClusterIPs[i] - allocator := storage.serviceIPAllocatorsByFamily[family] + allocator := storage.alloc.serviceIPAllocatorsByFamily[family] if !ipIsAllocated(t, allocator, ip) { t.Fatalf("expected ip:%v to be allocated by %v allocator. it was not", ip, family) } @@ -2562,7 +2568,7 @@ func TestServiceDowngrade(t *testing.T) { if shouldDowngrade { releasedIP := copySvc.Spec.ClusterIPs[1] releasedIPFamily := copySvc.Spec.IPFamilies[1] - allocator := storage.serviceIPAllocatorsByFamily[releasedIPFamily] + allocator := storage.alloc.serviceIPAllocatorsByFamily[releasedIPFamily] if ipIsAllocated(t, allocator, releasedIP) { t.Fatalf("expected ip:%v to be released by %v allocator. it was not", releasedIP, releasedIPFamily) @@ -2579,28 +2585,28 @@ func TestDefaultingValidation(t *testing.T) { // takes in REST and modify it for a specific config fnMakeSingleStackIPv4Allocator := func(rest *REST) { - rest.defaultServiceIPFamily = api.IPv4Protocol - rest.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{api.IPv4Protocol: rest.serviceIPAllocatorsByFamily[api.IPv4Protocol]} + rest.alloc.defaultServiceIPFamily = api.IPv4Protocol + rest.alloc.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{api.IPv4Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol]} } fnMakeSingleStackIPv6Allocator := func(rest *REST) { - rest.defaultServiceIPFamily = api.IPv6Protocol - rest.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{api.IPv6Protocol: rest.serviceIPAllocatorsByFamily[api.IPv6Protocol]} + rest.alloc.defaultServiceIPFamily = api.IPv6Protocol + rest.alloc.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{api.IPv6Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol]} } fnMakeDualStackStackIPv4IPv6Allocator := func(rest *REST) { - rest.defaultServiceIPFamily = api.IPv4Protocol - rest.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{ - api.IPv6Protocol: rest.serviceIPAllocatorsByFamily[api.IPv6Protocol], - api.IPv4Protocol: rest.serviceIPAllocatorsByFamily[api.IPv4Protocol], + rest.alloc.defaultServiceIPFamily = api.IPv4Protocol + rest.alloc.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{ + api.IPv6Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol], + api.IPv4Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol], } } fnMakeDualStackStackIPv6IPv4Allocator := func(rest *REST) { - rest.defaultServiceIPFamily = api.IPv6Protocol - rest.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{ - api.IPv6Protocol: rest.serviceIPAllocatorsByFamily[api.IPv6Protocol], - api.IPv4Protocol: rest.serviceIPAllocatorsByFamily[api.IPv4Protocol], + rest.alloc.defaultServiceIPFamily = api.IPv6Protocol + rest.alloc.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{ + api.IPv6Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol], + api.IPv4Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol], } }