diff --git a/pkg/registry/core/service/storage/rest.go b/pkg/registry/core/service/storage/rest.go index 1af49f057e2..093232c8692 100644 --- a/pkg/registry/core/service/storage/rest.go +++ b/pkg/registry/core/service/storage/rest.go @@ -193,7 +193,7 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation // TODO: this should probably move to strategy.PrepareForCreate() defer func() { - released, err := rs.releaseClusterIPs(toReleaseClusterIPs) + released, err := rs.alloc.releaseClusterIPs(toReleaseClusterIPs) if err != nil { klog.Warningf("failed to release clusterIPs for failed new service:%v allocated:%v released:%v error:%v", service.Name, toReleaseClusterIPs, released, err) @@ -203,13 +203,13 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation // try set ip families (for missing ip families) // we do it here, since we want this to be visible // even when dryRun == true - if err := rs.tryDefaultValidateServiceClusterIPFields(nil, service); err != nil { + if err := rs.alloc.tryDefaultValidateServiceClusterIPFields(nil, service); err != nil { return nil, err } var err error if !dryrun.IsDryRun(options.DryRun) { - toReleaseClusterIPs, err = rs.allocServiceClusterIPs(service) + toReleaseClusterIPs, err = rs.alloc.allocServiceClusterIPs(service) if err != nil { return nil, err } @@ -283,7 +283,7 @@ func (rs *REST) Delete(ctx context.Context, id string, deleteValidation rest.Val return nil, false, err } - rs.releaseAllocatedResources(svc) + rs.alloc.releaseAllocatedResources(svc) } // TODO: this is duplicated from the generic storage, when this wrapper is fully removed we can drop this @@ -299,11 +299,11 @@ func (rs *REST) Delete(ctx context.Context, id string, deleteValidation rest.Val return status, true, nil } -func (rs *REST) releaseAllocatedResources(svc *api.Service) { - rs.releaseServiceClusterIPs(svc) +func (al *RESTAllocStuff) releaseAllocatedResources(svc *api.Service) { + al.releaseServiceClusterIPs(svc) for _, nodePort := range collectServiceNodePorts(svc) { - err := rs.alloc.serviceNodePorts.Release(nodePort) + err := al.serviceNodePorts.Release(nodePort) if err != nil { // these should be caught by an eventual reconciliation / restart utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err)) @@ -313,7 +313,7 @@ func (rs *REST) releaseAllocatedResources(svc *api.Service) { if apiservice.NeedsHealthCheck(svc) { nodePort := svc.Spec.HealthCheckNodePort if nodePort > 0 { - err := rs.alloc.serviceNodePorts.Release(int(nodePort)) + err := al.serviceNodePorts.Release(int(nodePort)) if err != nil { // these should be caught by an eventual reconciliation / restart utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err)) @@ -337,7 +337,7 @@ func shouldAllocateNodePorts(service *api.Service) bool { // healthCheckNodePortUpdate handles HealthCheckNodePort allocation/release // and adjusts HealthCheckNodePort during service update if needed. -func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, nodePortOp *portallocator.PortAllocationOperation) (bool, error) { +func (al *RESTAllocStuff) healthCheckNodePortUpdate(oldService, service *api.Service, nodePortOp *portallocator.PortAllocationOperation) (bool, error) { neededHealthCheckNodePort := apiservice.NeedsHealthCheck(oldService) oldHealthCheckNodePort := oldService.Spec.HealthCheckNodePort @@ -420,13 +420,13 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj // on success: any ip that should be released, will be released defer func() { // release the allocated, this is expected to be cleared if the entire function ran to success - if allocated_released, err := rs.releaseClusterIPs(allocated); err != nil { + if allocated_released, err := rs.alloc.releaseClusterIPs(allocated); err != nil { klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. Allocated/Released:%v/%v", service.Namespace, service.Name, err, allocated, allocated_released) } // performRelease is set when the enture function ran to success if performRelease { - if toReleaseIPs_released, err := rs.releaseClusterIPs(toReleaseIPs); err != nil { + if toReleaseIPs_released, err := rs.alloc.releaseClusterIPs(toReleaseIPs); err != nil { klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. ShouldRelease/Released:%v/%v", service.Namespace, service.Name, err, toReleaseIPs, toReleaseIPs_released) } } @@ -436,12 +436,12 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj defer nodePortOp.Finish() // try set ip families (for missing ip families) - if err := rs.tryDefaultValidateServiceClusterIPFields(oldService, service); err != nil { + if err := rs.alloc.tryDefaultValidateServiceClusterIPFields(oldService, service); err != nil { return nil, false, err } if !dryrun.IsDryRun(options.DryRun) { - allocated, toReleaseIPs, err = rs.handleClusterIPsForUpdatedService(oldService, service) + allocated, toReleaseIPs, err = rs.alloc.handleClusterIPsForUpdatedService(oldService, service) if err != nil { return nil, false, err } @@ -464,7 +464,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj } // Handle ExternalTraffic related updates. - success, err := rs.healthCheckNodePortUpdate(oldService, service, nodePortOp) + success, err := rs.alloc.healthCheckNodePortUpdate(oldService, service, nodePortOp) if !success || err != nil { return nil, false, err } @@ -571,11 +571,11 @@ func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableO return r.services.ConvertToTable(ctx, object, tableOptions) } -func (rs *REST) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]string) (map[api.IPFamily]string, error) { +func (al *RESTAllocStuff) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]string) (map[api.IPFamily]string, error) { allocated := make(map[api.IPFamily]string) for family, ip := range toAlloc { - allocator := rs.alloc.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate + allocator := al.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate if ip == "" { allocatedIP, err := allocator.AllocateNext() if err != nil { @@ -595,14 +595,14 @@ func (rs *REST) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]s } // releases clusterIPs per family -func (rs *REST) releaseClusterIPs(toRelease map[api.IPFamily]string) (map[api.IPFamily]string, error) { +func (al *RESTAllocStuff) releaseClusterIPs(toRelease map[api.IPFamily]string) (map[api.IPFamily]string, error) { if toRelease == nil { return nil, nil } released := make(map[api.IPFamily]string) for family, ip := range toRelease { - allocator, ok := rs.alloc.serviceIPAllocatorsByFamily[family] + allocator, ok := al.serviceIPAllocatorsByFamily[family] if !ok { // cluster was configured for dual stack, then single stack klog.V(4).Infof("delete service. Not releasing ClusterIP:%v because IPFamily:%v is no longer configured on server", ip, family) @@ -621,25 +621,25 @@ func (rs *REST) releaseClusterIPs(toRelease map[api.IPFamily]string) (map[api.IP // standard allocator for dualstackgate==Off, hard wired dependency // and ignores policy, families and clusterIPs -func (rs *REST) allocServiceClusterIP(service *api.Service) (map[api.IPFamily]string, error) { +func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service) (map[api.IPFamily]string, error) { toAlloc := make(map[api.IPFamily]string) // get clusterIP.. empty string if user did not specify an ip - toAlloc[rs.alloc.defaultServiceIPFamily] = service.Spec.ClusterIP + toAlloc[al.defaultServiceIPFamily] = service.Spec.ClusterIP // alloc - allocated, err := rs.allocClusterIPs(service, toAlloc) + allocated, err := al.allocClusterIPs(service, toAlloc) // set if err == nil { - service.Spec.ClusterIP = allocated[rs.alloc.defaultServiceIPFamily] - service.Spec.ClusterIPs = []string{allocated[rs.alloc.defaultServiceIPFamily]} + service.Spec.ClusterIP = allocated[al.defaultServiceIPFamily] + service.Spec.ClusterIPs = []string{allocated[al.defaultServiceIPFamily]} } return allocated, err } // allocates ClusterIPs for a service -func (rs *REST) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]string, error) { +func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]string, error) { // external name don't get ClusterIPs if service.Spec.Type == api.ServiceTypeExternalName { return nil, nil @@ -651,7 +651,7 @@ func (rs *REST) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]s } if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { - return rs.allocServiceClusterIP(service) + return al.allocServiceClusterIP(service) } toAlloc := make(map[api.IPFamily]string) @@ -674,7 +674,7 @@ func (rs *REST) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]s } // allocate - allocated, err := rs.allocClusterIPs(service, toAlloc) + allocated, err := al.allocClusterIPs(service, toAlloc) // set if successful if err == nil { @@ -702,7 +702,7 @@ func (rs *REST) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]s // this func does not perform actual release of clusterIPs. it returns // a map[family]ip for the caller to release when everything else has // executed successfully -func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, service *api.Service) (allocated map[api.IPFamily]string, toRelease map[api.IPFamily]string, err error) { +func (al *RESTAllocStuff) handleClusterIPsForUpdatedService(oldService *api.Service, service *api.Service) (allocated map[api.IPFamily]string, toRelease map[api.IPFamily]string, err error) { // We don't want to upgrade (add an IP) or downgrade (remove an IP) // following a cluster downgrade/upgrade to/from dual-stackness @@ -725,7 +725,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi // CASE A: // Update service from ExternalName to non-ExternalName, should initialize ClusterIP. if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName { - allocated, err := rs.allocServiceClusterIPs(service) + allocated, err := al.allocServiceClusterIPs(service) return allocated, nil, err } @@ -741,7 +741,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi toRelease = make(map[api.IPFamily]string) if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { // for non dual stack enabled cluster we use clusterIPs - toRelease[rs.alloc.defaultServiceIPFamily] = oldService.Spec.ClusterIP + toRelease[al.defaultServiceIPFamily] = oldService.Spec.ClusterIP } else { // dual stack is enabled, collect ClusterIPs by families for i, family := range oldService.Spec.IPFamilies { @@ -771,7 +771,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi toAllocate[service.Spec.IPFamilies[1]] = service.Spec.ClusterIPs[1] // allocate - allocated, err := rs.allocClusterIPs(service, toAllocate) + allocated, err := al.allocClusterIPs(service, toAllocate) // set if successful if err == nil { service.Spec.ClusterIPs[1] = allocated[service.Spec.IPFamilies[1]] @@ -792,7 +792,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi } // for pre dual stack (gate == off). Hardwired to ClusterIP and ignores all new fields -func (rs *REST) releaseServiceClusterIP(service *api.Service) (released map[api.IPFamily]string, err error) { +func (al *RESTAllocStuff) releaseServiceClusterIP(service *api.Service) (released map[api.IPFamily]string, err error) { toRelease := make(map[api.IPFamily]string) // we need to do that to handle cases where allocator is no longer configured on @@ -803,11 +803,11 @@ func (rs *REST) releaseServiceClusterIP(service *api.Service) (released map[api. toRelease[api.IPv4Protocol] = service.Spec.ClusterIP } - return rs.releaseClusterIPs(toRelease) + return al.releaseClusterIPs(toRelease) } // releases allocated ClusterIPs for service that is about to be deleted -func (rs *REST) releaseServiceClusterIPs(service *api.Service) (released map[api.IPFamily]string, err error) { +func (al *RESTAllocStuff) releaseServiceClusterIPs(service *api.Service) (released map[api.IPFamily]string, err error) { // external name don't get ClusterIPs if service.Spec.Type == api.ServiceTypeExternalName { return nil, nil @@ -819,7 +819,7 @@ func (rs *REST) releaseServiceClusterIPs(service *api.Service) (released map[api } if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { - return rs.releaseServiceClusterIP(service) + return al.releaseServiceClusterIP(service) } toRelease := make(map[api.IPFamily]string) @@ -830,7 +830,7 @@ func (rs *REST) releaseServiceClusterIPs(service *api.Service) (released map[api toRelease[api.IPv4Protocol] = ip } } - return rs.releaseClusterIPs(toRelease) + return al.releaseClusterIPs(toRelease) } // tests if two preferred dual-stack service have matching ClusterIPFields @@ -897,7 +897,7 @@ func isMatchingPreferDualStackClusterIPFields(oldService, service *api.Service) // attempts to default service ip families according to cluster configuration // while ensuring that provided families are configured on cluster. -func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *api.Service) error { +func (al *RESTAllocStuff) tryDefaultValidateServiceClusterIPFields(oldService, service *api.Service) error { // can not do anything here if service.Spec.Type == api.ServiceTypeExternalName { return nil @@ -952,14 +952,14 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap if i >= len(service.Spec.IPFamilies) { if isIPv6 { // first make sure that family(ip) is configured - if _, found := rs.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol]; !found { + if _, found := al.serviceIPAllocatorsByFamily[api.IPv6Protocol]; !found { el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs, "may not use IPv6 on a cluster which is not configured for it")} return errors.NewInvalid(api.Kind("Service"), service.Name, el) } service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol) } else { // first make sure that family(ip) is configured - if _, found := rs.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol]; !found { + if _, found := al.serviceIPAllocatorsByFamily[api.IPv4Protocol]; !found { el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs, "may not use IPv4 on a cluster which is not configured for it")} return errors.NewInvalid(api.Kind("Service"), service.Name, el) } @@ -986,7 +986,7 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap // If IPFamilies was not set by the user, start with the default // family. if len(service.Spec.IPFamilies) == 0 { - service.Spec.IPFamilies = []api.IPFamily{rs.alloc.defaultServiceIPFamily} + service.Spec.IPFamilies = []api.IPFamily{al.defaultServiceIPFamily} } // this follows headful services. With one exception on a single stack @@ -1013,13 +1013,13 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap // asking for dual stack on a non dual stack cluster // should fail without assigning any family - if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) == api.IPFamilyPolicyRequireDualStack && len(rs.alloc.serviceIPAllocatorsByFamily) < 2 { + if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) == api.IPFamilyPolicyRequireDualStack && len(al.serviceIPAllocatorsByFamily) < 2 { el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy, "Cluster is not configured for dual stack services")) } // if there is a family requested then it has to be configured on cluster for i, ipFamily := range service.Spec.IPFamilies { - if _, found := rs.alloc.serviceIPAllocatorsByFamily[ipFamily]; !found { + if _, found := al.serviceIPAllocatorsByFamily[ipFamily]; !found { el = append(el, field.Invalid(field.NewPath("spec", "ipFamilies").Index(i), service.Spec.ClusterIPs, fmt.Sprintf("ipfamily %v is not configured on cluster", ipFamily))) } } @@ -1038,14 +1038,14 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap // nil families, gets cluster default (if feature flag is not in effect, the strategy will take care of removing it) if len(service.Spec.IPFamilies) == 0 { - service.Spec.IPFamilies = []api.IPFamily{rs.alloc.defaultServiceIPFamily} + service.Spec.IPFamilies = []api.IPFamily{al.defaultServiceIPFamily} } // is this service looking for dual stack, and this cluster does have two families? // if so, then append the missing family if *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(service.Spec.IPFamilies) == 1 && - len(rs.alloc.serviceIPAllocatorsByFamily) == 2 { + len(al.serviceIPAllocatorsByFamily) == 2 { if service.Spec.IPFamilies[0] == api.IPv4Protocol { service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol) diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go index 1c2a9b59add..43eb8ef56fa 100644 --- a/pkg/registry/core/service/storage/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -3710,7 +3710,7 @@ func TestDefaultingValidation(t *testing.T) { testCase.modifyRest(storage) } - err := storage.tryDefaultValidateServiceClusterIPFields(testCase.oldSvc, testCase.svc) + err := storage.alloc.tryDefaultValidateServiceClusterIPFields(testCase.oldSvc, testCase.svc) if err != nil && !testCase.expectError { t.Fatalf("error %v was not expected", err) }