mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Svc REST: move allocator methods -> alloc object
Move all allocator-related methods onto the alloc object so it can be used in either REST layer. There's an INORDINATE amount of test code here and I am skeptical that it is all useful. That's for later commits.
This commit is contained in:
parent
89587b3c6a
commit
b76a8c3c40
@ -193,7 +193,7 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
|
||||
|
||||
// TODO: this should probably move to strategy.PrepareForCreate()
|
||||
defer func() {
|
||||
released, err := rs.releaseClusterIPs(toReleaseClusterIPs)
|
||||
released, err := rs.alloc.releaseClusterIPs(toReleaseClusterIPs)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to release clusterIPs for failed new service:%v allocated:%v released:%v error:%v",
|
||||
service.Name, toReleaseClusterIPs, released, err)
|
||||
@ -203,13 +203,13 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
|
||||
// try set ip families (for missing ip families)
|
||||
// we do it here, since we want this to be visible
|
||||
// even when dryRun == true
|
||||
if err := rs.tryDefaultValidateServiceClusterIPFields(nil, service); err != nil {
|
||||
if err := rs.alloc.tryDefaultValidateServiceClusterIPFields(nil, service); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var err error
|
||||
if !dryrun.IsDryRun(options.DryRun) {
|
||||
toReleaseClusterIPs, err = rs.allocServiceClusterIPs(service)
|
||||
toReleaseClusterIPs, err = rs.alloc.allocServiceClusterIPs(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -283,7 +283,7 @@ func (rs *REST) Delete(ctx context.Context, id string, deleteValidation rest.Val
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
rs.releaseAllocatedResources(svc)
|
||||
rs.alloc.releaseAllocatedResources(svc)
|
||||
}
|
||||
|
||||
// TODO: this is duplicated from the generic storage, when this wrapper is fully removed we can drop this
|
||||
@ -299,11 +299,11 @@ func (rs *REST) Delete(ctx context.Context, id string, deleteValidation rest.Val
|
||||
return status, true, nil
|
||||
}
|
||||
|
||||
func (rs *REST) releaseAllocatedResources(svc *api.Service) {
|
||||
rs.releaseServiceClusterIPs(svc)
|
||||
func (al *RESTAllocStuff) releaseAllocatedResources(svc *api.Service) {
|
||||
al.releaseServiceClusterIPs(svc)
|
||||
|
||||
for _, nodePort := range collectServiceNodePorts(svc) {
|
||||
err := rs.alloc.serviceNodePorts.Release(nodePort)
|
||||
err := al.serviceNodePorts.Release(nodePort)
|
||||
if err != nil {
|
||||
// these should be caught by an eventual reconciliation / restart
|
||||
utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err))
|
||||
@ -313,7 +313,7 @@ func (rs *REST) releaseAllocatedResources(svc *api.Service) {
|
||||
if apiservice.NeedsHealthCheck(svc) {
|
||||
nodePort := svc.Spec.HealthCheckNodePort
|
||||
if nodePort > 0 {
|
||||
err := rs.alloc.serviceNodePorts.Release(int(nodePort))
|
||||
err := al.serviceNodePorts.Release(int(nodePort))
|
||||
if err != nil {
|
||||
// these should be caught by an eventual reconciliation / restart
|
||||
utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err))
|
||||
@ -337,7 +337,7 @@ func shouldAllocateNodePorts(service *api.Service) bool {
|
||||
|
||||
// healthCheckNodePortUpdate handles HealthCheckNodePort allocation/release
|
||||
// and adjusts HealthCheckNodePort during service update if needed.
|
||||
func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, nodePortOp *portallocator.PortAllocationOperation) (bool, error) {
|
||||
func (al *RESTAllocStuff) healthCheckNodePortUpdate(oldService, service *api.Service, nodePortOp *portallocator.PortAllocationOperation) (bool, error) {
|
||||
neededHealthCheckNodePort := apiservice.NeedsHealthCheck(oldService)
|
||||
oldHealthCheckNodePort := oldService.Spec.HealthCheckNodePort
|
||||
|
||||
@ -420,13 +420,13 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
|
||||
// on success: any ip that should be released, will be released
|
||||
defer func() {
|
||||
// release the allocated, this is expected to be cleared if the entire function ran to success
|
||||
if allocated_released, err := rs.releaseClusterIPs(allocated); err != nil {
|
||||
if allocated_released, err := rs.alloc.releaseClusterIPs(allocated); err != nil {
|
||||
klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. Allocated/Released:%v/%v", service.Namespace, service.Name, err, allocated, allocated_released)
|
||||
|
||||
}
|
||||
// performRelease is set when the enture function ran to success
|
||||
if performRelease {
|
||||
if toReleaseIPs_released, err := rs.releaseClusterIPs(toReleaseIPs); err != nil {
|
||||
if toReleaseIPs_released, err := rs.alloc.releaseClusterIPs(toReleaseIPs); err != nil {
|
||||
klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. ShouldRelease/Released:%v/%v", service.Namespace, service.Name, err, toReleaseIPs, toReleaseIPs_released)
|
||||
}
|
||||
}
|
||||
@ -436,12 +436,12 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
|
||||
defer nodePortOp.Finish()
|
||||
|
||||
// try set ip families (for missing ip families)
|
||||
if err := rs.tryDefaultValidateServiceClusterIPFields(oldService, service); err != nil {
|
||||
if err := rs.alloc.tryDefaultValidateServiceClusterIPFields(oldService, service); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if !dryrun.IsDryRun(options.DryRun) {
|
||||
allocated, toReleaseIPs, err = rs.handleClusterIPsForUpdatedService(oldService, service)
|
||||
allocated, toReleaseIPs, err = rs.alloc.handleClusterIPsForUpdatedService(oldService, service)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
@ -464,7 +464,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
|
||||
}
|
||||
|
||||
// Handle ExternalTraffic related updates.
|
||||
success, err := rs.healthCheckNodePortUpdate(oldService, service, nodePortOp)
|
||||
success, err := rs.alloc.healthCheckNodePortUpdate(oldService, service, nodePortOp)
|
||||
if !success || err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
@ -571,11 +571,11 @@ func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableO
|
||||
return r.services.ConvertToTable(ctx, object, tableOptions)
|
||||
}
|
||||
|
||||
func (rs *REST) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]string) (map[api.IPFamily]string, error) {
|
||||
func (al *RESTAllocStuff) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]string) (map[api.IPFamily]string, error) {
|
||||
allocated := make(map[api.IPFamily]string)
|
||||
|
||||
for family, ip := range toAlloc {
|
||||
allocator := rs.alloc.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate
|
||||
allocator := al.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate
|
||||
if ip == "" {
|
||||
allocatedIP, err := allocator.AllocateNext()
|
||||
if err != nil {
|
||||
@ -595,14 +595,14 @@ func (rs *REST) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]s
|
||||
}
|
||||
|
||||
// releases clusterIPs per family
|
||||
func (rs *REST) releaseClusterIPs(toRelease map[api.IPFamily]string) (map[api.IPFamily]string, error) {
|
||||
func (al *RESTAllocStuff) releaseClusterIPs(toRelease map[api.IPFamily]string) (map[api.IPFamily]string, error) {
|
||||
if toRelease == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
released := make(map[api.IPFamily]string)
|
||||
for family, ip := range toRelease {
|
||||
allocator, ok := rs.alloc.serviceIPAllocatorsByFamily[family]
|
||||
allocator, ok := al.serviceIPAllocatorsByFamily[family]
|
||||
if !ok {
|
||||
// cluster was configured for dual stack, then single stack
|
||||
klog.V(4).Infof("delete service. Not releasing ClusterIP:%v because IPFamily:%v is no longer configured on server", ip, family)
|
||||
@ -621,25 +621,25 @@ func (rs *REST) releaseClusterIPs(toRelease map[api.IPFamily]string) (map[api.IP
|
||||
|
||||
// standard allocator for dualstackgate==Off, hard wired dependency
|
||||
// and ignores policy, families and clusterIPs
|
||||
func (rs *REST) allocServiceClusterIP(service *api.Service) (map[api.IPFamily]string, error) {
|
||||
func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service) (map[api.IPFamily]string, error) {
|
||||
toAlloc := make(map[api.IPFamily]string)
|
||||
|
||||
// get clusterIP.. empty string if user did not specify an ip
|
||||
toAlloc[rs.alloc.defaultServiceIPFamily] = service.Spec.ClusterIP
|
||||
toAlloc[al.defaultServiceIPFamily] = service.Spec.ClusterIP
|
||||
// alloc
|
||||
allocated, err := rs.allocClusterIPs(service, toAlloc)
|
||||
allocated, err := al.allocClusterIPs(service, toAlloc)
|
||||
|
||||
// set
|
||||
if err == nil {
|
||||
service.Spec.ClusterIP = allocated[rs.alloc.defaultServiceIPFamily]
|
||||
service.Spec.ClusterIPs = []string{allocated[rs.alloc.defaultServiceIPFamily]}
|
||||
service.Spec.ClusterIP = allocated[al.defaultServiceIPFamily]
|
||||
service.Spec.ClusterIPs = []string{allocated[al.defaultServiceIPFamily]}
|
||||
}
|
||||
|
||||
return allocated, err
|
||||
}
|
||||
|
||||
// allocates ClusterIPs for a service
|
||||
func (rs *REST) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]string, error) {
|
||||
func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]string, error) {
|
||||
// external name don't get ClusterIPs
|
||||
if service.Spec.Type == api.ServiceTypeExternalName {
|
||||
return nil, nil
|
||||
@ -651,7 +651,7 @@ func (rs *REST) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]s
|
||||
}
|
||||
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
|
||||
return rs.allocServiceClusterIP(service)
|
||||
return al.allocServiceClusterIP(service)
|
||||
}
|
||||
|
||||
toAlloc := make(map[api.IPFamily]string)
|
||||
@ -674,7 +674,7 @@ func (rs *REST) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]s
|
||||
}
|
||||
|
||||
// allocate
|
||||
allocated, err := rs.allocClusterIPs(service, toAlloc)
|
||||
allocated, err := al.allocClusterIPs(service, toAlloc)
|
||||
|
||||
// set if successful
|
||||
if err == nil {
|
||||
@ -702,7 +702,7 @@ func (rs *REST) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]s
|
||||
// this func does not perform actual release of clusterIPs. it returns
|
||||
// a map[family]ip for the caller to release when everything else has
|
||||
// executed successfully
|
||||
func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, service *api.Service) (allocated map[api.IPFamily]string, toRelease map[api.IPFamily]string, err error) {
|
||||
func (al *RESTAllocStuff) handleClusterIPsForUpdatedService(oldService *api.Service, service *api.Service) (allocated map[api.IPFamily]string, toRelease map[api.IPFamily]string, err error) {
|
||||
|
||||
// We don't want to upgrade (add an IP) or downgrade (remove an IP)
|
||||
// following a cluster downgrade/upgrade to/from dual-stackness
|
||||
@ -725,7 +725,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi
|
||||
// CASE A:
|
||||
// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
|
||||
if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
|
||||
allocated, err := rs.allocServiceClusterIPs(service)
|
||||
allocated, err := al.allocServiceClusterIPs(service)
|
||||
return allocated, nil, err
|
||||
}
|
||||
|
||||
@ -741,7 +741,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi
|
||||
toRelease = make(map[api.IPFamily]string)
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
|
||||
// for non dual stack enabled cluster we use clusterIPs
|
||||
toRelease[rs.alloc.defaultServiceIPFamily] = oldService.Spec.ClusterIP
|
||||
toRelease[al.defaultServiceIPFamily] = oldService.Spec.ClusterIP
|
||||
} else {
|
||||
// dual stack is enabled, collect ClusterIPs by families
|
||||
for i, family := range oldService.Spec.IPFamilies {
|
||||
@ -771,7 +771,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi
|
||||
toAllocate[service.Spec.IPFamilies[1]] = service.Spec.ClusterIPs[1]
|
||||
|
||||
// allocate
|
||||
allocated, err := rs.allocClusterIPs(service, toAllocate)
|
||||
allocated, err := al.allocClusterIPs(service, toAllocate)
|
||||
// set if successful
|
||||
if err == nil {
|
||||
service.Spec.ClusterIPs[1] = allocated[service.Spec.IPFamilies[1]]
|
||||
@ -792,7 +792,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi
|
||||
}
|
||||
|
||||
// for pre dual stack (gate == off). Hardwired to ClusterIP and ignores all new fields
|
||||
func (rs *REST) releaseServiceClusterIP(service *api.Service) (released map[api.IPFamily]string, err error) {
|
||||
func (al *RESTAllocStuff) releaseServiceClusterIP(service *api.Service) (released map[api.IPFamily]string, err error) {
|
||||
toRelease := make(map[api.IPFamily]string)
|
||||
|
||||
// we need to do that to handle cases where allocator is no longer configured on
|
||||
@ -803,11 +803,11 @@ func (rs *REST) releaseServiceClusterIP(service *api.Service) (released map[api.
|
||||
toRelease[api.IPv4Protocol] = service.Spec.ClusterIP
|
||||
}
|
||||
|
||||
return rs.releaseClusterIPs(toRelease)
|
||||
return al.releaseClusterIPs(toRelease)
|
||||
}
|
||||
|
||||
// releases allocated ClusterIPs for service that is about to be deleted
|
||||
func (rs *REST) releaseServiceClusterIPs(service *api.Service) (released map[api.IPFamily]string, err error) {
|
||||
func (al *RESTAllocStuff) releaseServiceClusterIPs(service *api.Service) (released map[api.IPFamily]string, err error) {
|
||||
// external name don't get ClusterIPs
|
||||
if service.Spec.Type == api.ServiceTypeExternalName {
|
||||
return nil, nil
|
||||
@ -819,7 +819,7 @@ func (rs *REST) releaseServiceClusterIPs(service *api.Service) (released map[api
|
||||
}
|
||||
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
|
||||
return rs.releaseServiceClusterIP(service)
|
||||
return al.releaseServiceClusterIP(service)
|
||||
}
|
||||
|
||||
toRelease := make(map[api.IPFamily]string)
|
||||
@ -830,7 +830,7 @@ func (rs *REST) releaseServiceClusterIPs(service *api.Service) (released map[api
|
||||
toRelease[api.IPv4Protocol] = ip
|
||||
}
|
||||
}
|
||||
return rs.releaseClusterIPs(toRelease)
|
||||
return al.releaseClusterIPs(toRelease)
|
||||
}
|
||||
|
||||
// tests if two preferred dual-stack service have matching ClusterIPFields
|
||||
@ -897,7 +897,7 @@ func isMatchingPreferDualStackClusterIPFields(oldService, service *api.Service)
|
||||
|
||||
// attempts to default service ip families according to cluster configuration
|
||||
// while ensuring that provided families are configured on cluster.
|
||||
func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *api.Service) error {
|
||||
func (al *RESTAllocStuff) tryDefaultValidateServiceClusterIPFields(oldService, service *api.Service) error {
|
||||
// can not do anything here
|
||||
if service.Spec.Type == api.ServiceTypeExternalName {
|
||||
return nil
|
||||
@ -952,14 +952,14 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap
|
||||
if i >= len(service.Spec.IPFamilies) {
|
||||
if isIPv6 {
|
||||
// first make sure that family(ip) is configured
|
||||
if _, found := rs.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol]; !found {
|
||||
if _, found := al.serviceIPAllocatorsByFamily[api.IPv6Protocol]; !found {
|
||||
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs, "may not use IPv6 on a cluster which is not configured for it")}
|
||||
return errors.NewInvalid(api.Kind("Service"), service.Name, el)
|
||||
}
|
||||
service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
|
||||
} else {
|
||||
// first make sure that family(ip) is configured
|
||||
if _, found := rs.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol]; !found {
|
||||
if _, found := al.serviceIPAllocatorsByFamily[api.IPv4Protocol]; !found {
|
||||
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs, "may not use IPv4 on a cluster which is not configured for it")}
|
||||
return errors.NewInvalid(api.Kind("Service"), service.Name, el)
|
||||
}
|
||||
@ -986,7 +986,7 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap
|
||||
// If IPFamilies was not set by the user, start with the default
|
||||
// family.
|
||||
if len(service.Spec.IPFamilies) == 0 {
|
||||
service.Spec.IPFamilies = []api.IPFamily{rs.alloc.defaultServiceIPFamily}
|
||||
service.Spec.IPFamilies = []api.IPFamily{al.defaultServiceIPFamily}
|
||||
}
|
||||
|
||||
// this follows headful services. With one exception on a single stack
|
||||
@ -1013,13 +1013,13 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap
|
||||
|
||||
// asking for dual stack on a non dual stack cluster
|
||||
// should fail without assigning any family
|
||||
if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) == api.IPFamilyPolicyRequireDualStack && len(rs.alloc.serviceIPAllocatorsByFamily) < 2 {
|
||||
if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) == api.IPFamilyPolicyRequireDualStack && len(al.serviceIPAllocatorsByFamily) < 2 {
|
||||
el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy, "Cluster is not configured for dual stack services"))
|
||||
}
|
||||
|
||||
// if there is a family requested then it has to be configured on cluster
|
||||
for i, ipFamily := range service.Spec.IPFamilies {
|
||||
if _, found := rs.alloc.serviceIPAllocatorsByFamily[ipFamily]; !found {
|
||||
if _, found := al.serviceIPAllocatorsByFamily[ipFamily]; !found {
|
||||
el = append(el, field.Invalid(field.NewPath("spec", "ipFamilies").Index(i), service.Spec.ClusterIPs, fmt.Sprintf("ipfamily %v is not configured on cluster", ipFamily)))
|
||||
}
|
||||
}
|
||||
@ -1038,14 +1038,14 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap
|
||||
|
||||
// nil families, gets cluster default (if feature flag is not in effect, the strategy will take care of removing it)
|
||||
if len(service.Spec.IPFamilies) == 0 {
|
||||
service.Spec.IPFamilies = []api.IPFamily{rs.alloc.defaultServiceIPFamily}
|
||||
service.Spec.IPFamilies = []api.IPFamily{al.defaultServiceIPFamily}
|
||||
}
|
||||
|
||||
// is this service looking for dual stack, and this cluster does have two families?
|
||||
// if so, then append the missing family
|
||||
if *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack &&
|
||||
len(service.Spec.IPFamilies) == 1 &&
|
||||
len(rs.alloc.serviceIPAllocatorsByFamily) == 2 {
|
||||
len(al.serviceIPAllocatorsByFamily) == 2 {
|
||||
|
||||
if service.Spec.IPFamilies[0] == api.IPv4Protocol {
|
||||
service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
|
||||
|
@ -3710,7 +3710,7 @@ func TestDefaultingValidation(t *testing.T) {
|
||||
testCase.modifyRest(storage)
|
||||
}
|
||||
|
||||
err := storage.tryDefaultValidateServiceClusterIPFields(testCase.oldSvc, testCase.svc)
|
||||
err := storage.alloc.tryDefaultValidateServiceClusterIPFields(testCase.oldSvc, testCase.svc)
|
||||
if err != nil && !testCase.expectError {
|
||||
t.Fatalf("error %v was not expected", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user