Svc REST: Encapsulate IP and Port allocator logic

Encapsulate the allocator logic so it can be shared across REST
layers while we stage a series of commits to get rid of one layer.
This commit is contained in:
Tim Hockin 2020-11-17 20:42:16 -08:00
parent d13c920606
commit 89587b3c6a
3 changed files with 109 additions and 106 deletions

View File

@ -254,16 +254,24 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
serviceIPAllocators := map[api.IPFamily]ipallocator.Interface{
serviceClusterIPAllocator.IPFamily(): serviceClusterIPAllocator,
}
if secondaryServiceClusterIPAllocator != nil {
serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator
}
serviceRESTStorage, serviceStatusStorage, err := servicestore.NewGenericREST(restOptionsGetter, serviceClusterIPRange, secondaryServiceClusterIPAllocator != nil)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage,
serviceRest, serviceRestProxy := servicestore.NewREST(
serviceRESTStorage,
endpointsStorage,
podStorage.Pod,
serviceClusterIPAllocator,
secondaryServiceClusterIPAllocator,
serviceClusterIPAllocator.IPFamily(),
serviceIPAllocators,
serviceNodePortAllocator,
c.ProxyTransport)

View File

@ -51,14 +51,20 @@ import (
// REST adapts a service registry into apiserver's RESTStorage model.
type REST struct {
strategy rest.RESTCreateUpdateStrategy
services ServiceStorage
endpoints EndpointsStorage
strategy rest.RESTCreateUpdateStrategy
services ServiceStorage
endpoints EndpointsStorage
alloc RESTAllocStuff
proxyTransport http.RoundTripper
pods rest.Getter
}
// RESTAllocStuff is a temporary struct to facilitate the flattening of service
// REST layers. It will be cleaned up over a series of commits.
type RESTAllocStuff struct {
serviceIPAllocatorsByFamily map[api.IPFamily]ipallocator.Interface
defaultServiceIPFamily api.IPFamily // --service-cluster-ip-range[0]
serviceNodePorts portallocator.Interface
proxyTransport http.RoundTripper
pods rest.Getter
}
// ServiceNodePort includes protocol and port number of a service NodePort.
@ -95,54 +101,37 @@ func NewREST(
services ServiceStorage,
endpoints EndpointsStorage,
pods rest.Getter,
serviceIPs ipallocator.Interface,
secondaryServiceIPs ipallocator.Interface,
serviceNodePorts portallocator.Interface,
defaultFamily api.IPFamily,
ipAllocs map[api.IPFamily]ipallocator.Interface,
portAlloc portallocator.Interface,
proxyTransport http.RoundTripper,
) (*REST, *registry.ProxyREST) {
strategy, _ := registry.StrategyForServiceCIDRs(serviceIPs.CIDR(), secondaryServiceIPs != nil)
strategy, _ := registry.StrategyForServiceCIDRs(ipAllocs[defaultFamily].CIDR(), len(ipAllocs) > 1)
byIPFamily := make(map[api.IPFamily]ipallocator.Interface)
// detect this cluster default Service IPFamily (ipfamily of --service-cluster-ip-range[0])
serviceIPFamily := api.IPv4Protocol
cidr := serviceIPs.CIDR()
if netutils.IsIPv6CIDR(&cidr) {
serviceIPFamily = api.IPv6Protocol
}
// add primary family
byIPFamily[serviceIPFamily] = serviceIPs
if secondaryServiceIPs != nil {
// process secondary family
secondaryServiceIPFamily := api.IPv6Protocol
// get family of secondary
if serviceIPFamily == api.IPv6Protocol {
secondaryServiceIPFamily = api.IPv4Protocol
}
// add it
byIPFamily[secondaryServiceIPFamily] = secondaryServiceIPs
}
klog.V(0).Infof("the default service ipfamily for this cluster is: %s", string(serviceIPFamily))
klog.V(0).Infof("the default service ipfamily for this cluster is: %s", string(defaultFamily))
rest := &REST{
strategy: strategy,
services: services,
endpoints: endpoints,
serviceIPAllocatorsByFamily: byIPFamily,
serviceNodePorts: serviceNodePorts,
defaultServiceIPFamily: serviceIPFamily,
proxyTransport: proxyTransport,
pods: pods,
strategy: strategy,
services: services,
endpoints: endpoints,
proxyTransport: proxyTransport,
pods: pods,
alloc: makeAlloc(defaultFamily, ipAllocs, portAlloc),
}
return rest, &registry.ProxyREST{Redirector: rest, ProxyTransport: proxyTransport}
}
// This is a trasitionary function to facilitate service REST flattening.
func makeAlloc(defaultFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) RESTAllocStuff {
return RESTAllocStuff{
defaultServiceIPFamily: defaultFamily,
serviceIPAllocatorsByFamily: ipAllocs,
serviceNodePorts: portAlloc,
}
}
var (
_ ServiceStorage = &REST{}
_ rest.CategoriesProvider = &REST{}
@ -226,7 +215,7 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
}
}
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
nodePortOp := portallocator.StartOperation(rs.alloc.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
defer nodePortOp.Finish()
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
@ -314,7 +303,7 @@ func (rs *REST) releaseAllocatedResources(svc *api.Service) {
rs.releaseServiceClusterIPs(svc)
for _, nodePort := range collectServiceNodePorts(svc) {
err := rs.serviceNodePorts.Release(nodePort)
err := rs.alloc.serviceNodePorts.Release(nodePort)
if err != nil {
// these should be caught by an eventual reconciliation / restart
utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err))
@ -324,7 +313,7 @@ func (rs *REST) releaseAllocatedResources(svc *api.Service) {
if apiservice.NeedsHealthCheck(svc) {
nodePort := svc.Spec.HealthCheckNodePort
if nodePort > 0 {
err := rs.serviceNodePorts.Release(int(nodePort))
err := rs.alloc.serviceNodePorts.Release(int(nodePort))
if err != nil {
// these should be caught by an eventual reconciliation / restart
utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err))
@ -443,7 +432,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj
}
}()
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
nodePortOp := portallocator.StartOperation(rs.alloc.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
defer nodePortOp.Finish()
// try set ip families (for missing ip families)
@ -586,7 +575,7 @@ func (rs *REST) allocClusterIPs(service *api.Service, toAlloc map[api.IPFamily]s
allocated := make(map[api.IPFamily]string)
for family, ip := range toAlloc {
allocator := rs.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate
allocator := rs.alloc.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate
if ip == "" {
allocatedIP, err := allocator.AllocateNext()
if err != nil {
@ -613,7 +602,7 @@ func (rs *REST) releaseClusterIPs(toRelease map[api.IPFamily]string) (map[api.IP
released := make(map[api.IPFamily]string)
for family, ip := range toRelease {
allocator, ok := rs.serviceIPAllocatorsByFamily[family]
allocator, ok := rs.alloc.serviceIPAllocatorsByFamily[family]
if !ok {
// cluster was configured for dual stack, then single stack
klog.V(4).Infof("delete service. Not releasing ClusterIP:%v because IPFamily:%v is no longer configured on server", ip, family)
@ -636,14 +625,14 @@ func (rs *REST) allocServiceClusterIP(service *api.Service) (map[api.IPFamily]st
toAlloc := make(map[api.IPFamily]string)
// get clusterIP.. empty string if user did not specify an ip
toAlloc[rs.defaultServiceIPFamily] = service.Spec.ClusterIP
toAlloc[rs.alloc.defaultServiceIPFamily] = service.Spec.ClusterIP
// alloc
allocated, err := rs.allocClusterIPs(service, toAlloc)
// set
if err == nil {
service.Spec.ClusterIP = allocated[rs.defaultServiceIPFamily]
service.Spec.ClusterIPs = []string{allocated[rs.defaultServiceIPFamily]}
service.Spec.ClusterIP = allocated[rs.alloc.defaultServiceIPFamily]
service.Spec.ClusterIPs = []string{allocated[rs.alloc.defaultServiceIPFamily]}
}
return allocated, err
@ -752,7 +741,7 @@ func (rs *REST) handleClusterIPsForUpdatedService(oldService *api.Service, servi
toRelease = make(map[api.IPFamily]string)
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
// for non dual stack enabled cluster we use clusterIPs
toRelease[rs.defaultServiceIPFamily] = oldService.Spec.ClusterIP
toRelease[rs.alloc.defaultServiceIPFamily] = oldService.Spec.ClusterIP
} else {
// dual stack is enabled, collect ClusterIPs by families
for i, family := range oldService.Spec.IPFamilies {
@ -963,14 +952,14 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap
if i >= len(service.Spec.IPFamilies) {
if isIPv6 {
// first make sure that family(ip) is configured
if _, found := rs.serviceIPAllocatorsByFamily[api.IPv6Protocol]; !found {
if _, found := rs.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol]; !found {
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs, "may not use IPv6 on a cluster which is not configured for it")}
return errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
} else {
// first make sure that family(ip) is configured
if _, found := rs.serviceIPAllocatorsByFamily[api.IPv4Protocol]; !found {
if _, found := rs.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol]; !found {
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs, "may not use IPv4 on a cluster which is not configured for it")}
return errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
@ -997,7 +986,7 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap
// If IPFamilies was not set by the user, start with the default
// family.
if len(service.Spec.IPFamilies) == 0 {
service.Spec.IPFamilies = []api.IPFamily{rs.defaultServiceIPFamily}
service.Spec.IPFamilies = []api.IPFamily{rs.alloc.defaultServiceIPFamily}
}
// this follows headful services. With one exception on a single stack
@ -1024,13 +1013,13 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap
// asking for dual stack on a non dual stack cluster
// should fail without assigning any family
if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) == api.IPFamilyPolicyRequireDualStack && len(rs.serviceIPAllocatorsByFamily) < 2 {
if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) == api.IPFamilyPolicyRequireDualStack && len(rs.alloc.serviceIPAllocatorsByFamily) < 2 {
el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy, "Cluster is not configured for dual stack services"))
}
// if there is a family requested then it has to be configured on cluster
for i, ipFamily := range service.Spec.IPFamilies {
if _, found := rs.serviceIPAllocatorsByFamily[ipFamily]; !found {
if _, found := rs.alloc.serviceIPAllocatorsByFamily[ipFamily]; !found {
el = append(el, field.Invalid(field.NewPath("spec", "ipFamilies").Index(i), service.Spec.ClusterIPs, fmt.Sprintf("ipfamily %v is not configured on cluster", ipFamily)))
}
}
@ -1049,14 +1038,14 @@ func (rs *REST) tryDefaultValidateServiceClusterIPFields(oldService, service *ap
// nil families, gets cluster default (if feature flag is not in effect, the strategy will take care of removing it)
if len(service.Spec.IPFamilies) == 0 {
service.Spec.IPFamilies = []api.IPFamily{rs.defaultServiceIPFamily}
service.Spec.IPFamilies = []api.IPFamily{rs.alloc.defaultServiceIPFamily}
}
// is this service looking for dual stack, and this cluster does have two families?
// if so, then append the missing family
if *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack &&
len(service.Spec.IPFamilies) == 1 &&
len(rs.serviceIPAllocatorsByFamily) == 2 {
len(rs.alloc.serviceIPAllocatorsByFamily) == 2 {
if service.Spec.IPFamilies[0] == api.IPv4Protocol {
service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)

View File

@ -240,7 +240,13 @@ func NewTestRESTWithPods(t *testing.T, endpoints []*api.Endpoints, pods []api.Po
t.Fatalf("cannot create port allocator %v", err)
}
rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, rPrimary, rSecondary, portAllocator, nil)
ipAllocators := map[api.IPFamily]ipallocator.Interface{
rPrimary.IPFamily(): rPrimary,
}
if rSecondary != nil {
ipAllocators[rSecondary.IPFamily()] = rSecondary
}
rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, rPrimary.IPFamily(), ipAllocators, portAllocator, nil)
return rest, server
}
@ -332,7 +338,7 @@ func TestServiceRegistryCreate(t *testing.T) {
}
for i, family := range createdService.Spec.IPFamilies {
allocator := storage.serviceIPAllocatorsByFamily[family]
allocator := storage.alloc.serviceIPAllocatorsByFamily[family]
c := allocator.CIDR()
cidr := &c
if !cidr.Contains(netutils.ParseIPSloppy(createdService.Spec.ClusterIPs[i])) {
@ -399,7 +405,7 @@ func TestServiceRegistryCreateDryRun(t *testing.T) {
}
for i, family := range tc.svc.Spec.IPFamilies {
alloc := storage.serviceIPAllocatorsByFamily[family]
alloc := storage.alloc.serviceIPAllocatorsByFamily[family]
if ipIsAllocated(t, alloc, tc.svc.Spec.ClusterIPs[i]) {
t.Errorf("unexpected side effect: ip allocated %v", tc.svc.Spec.ClusterIPs[i])
}
@ -429,7 +435,7 @@ func TestDryRunNodePort(t *testing.T) {
if createdSvc.Spec.Ports[0].NodePort == 0 {
t.Errorf("expected NodePort value assigned")
}
if portIsAllocated(t, storage.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) {
if portIsAllocated(t, storage.alloc.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) {
t.Errorf("unexpected side effect: NodePort allocated")
}
_, err = getService(storage, ctx, svc.Name, &metav1.GetOptions{})
@ -456,7 +462,7 @@ func TestDryRunNodePort(t *testing.T) {
t.Errorf("Expected %v, but got %v", expectNodePorts, actualNodePorts)
}
for i := range svc.Spec.Ports {
if portIsAllocated(t, storage.serviceNodePorts, svc.Spec.Ports[i].NodePort) {
if portIsAllocated(t, storage.alloc.serviceNodePorts, svc.Spec.Ports[i].NodePort) {
t.Errorf("unexpected side effect: NodePort allocated")
}
}
@ -560,7 +566,7 @@ func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) {
for i := range serviceNodePorts {
nodePort := serviceNodePorts[i]
// Release the node port at the end of the test case.
storage.serviceNodePorts.Release(nodePort)
storage.alloc.serviceNodePorts.Release(nodePort)
}
}
}
@ -902,7 +908,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) {
if created {
t.Errorf("expected not created")
}
if portIsAllocated(t, storage.serviceNodePorts, new1.Spec.Ports[0].NodePort) {
if portIsAllocated(t, storage.alloc.serviceNodePorts, new1.Spec.Ports[0].NodePort) {
t.Errorf("unexpected side effect: NodePort allocated")
}
@ -915,7 +921,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error: %v", err)
}
if ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], new2.Spec.ClusterIP) {
if ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], new2.Spec.ClusterIP) {
t.Errorf("unexpected side effect: ip allocated")
}
@ -925,10 +931,10 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) {
t.Fatalf("Expected no error: %v", err)
}
svc = obj.(*api.Service)
if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], svc.Spec.ClusterIP) {
if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], svc.Spec.ClusterIP) {
t.Errorf("expected IP to be allocated")
}
if !portIsAllocated(t, storage.serviceNodePorts, svc.Spec.Ports[0].NodePort) {
if !portIsAllocated(t, storage.alloc.serviceNodePorts, svc.Spec.Ports[0].NodePort) {
t.Errorf("expected NodePort to be allocated")
}
@ -939,7 +945,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error: %v", err)
}
if !portIsAllocated(t, storage.serviceNodePorts, svc.Spec.Ports[0].NodePort) {
if !portIsAllocated(t, storage.alloc.serviceNodePorts, svc.Spec.Ports[0].NodePort) {
t.Errorf("unexpected side effect: NodePort unallocated")
}
@ -957,7 +963,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) {
if err != nil {
t.Fatalf("expected no error: %v", err)
}
if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], svc.Spec.ClusterIP) {
if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], svc.Spec.ClusterIP) {
t.Errorf("unexpected side effect: ip unallocated")
}
}
@ -1130,14 +1136,14 @@ func TestServiceRegistryDeleteDryRun(t *testing.T) {
if createdSvc.Spec.ClusterIP == "" {
t.Fatalf("expected ClusterIP to be set")
}
if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], createdSvc.Spec.ClusterIP) {
if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], createdSvc.Spec.ClusterIP) {
t.Errorf("expected ClusterIP to be allocated")
}
_, _, err = storage.Delete(ctx, svc.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{DryRun: []string{metav1.DryRunAll}})
if err != nil {
t.Fatalf("Expected no error: %v", err)
}
if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily], createdSvc.Spec.ClusterIP) {
if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily], createdSvc.Spec.ClusterIP) {
t.Errorf("unexpected side effect: ip unallocated")
}
@ -1151,7 +1157,7 @@ func TestServiceRegistryDeleteDryRun(t *testing.T) {
if createdSvc.Spec.Ports[0].NodePort == 0 {
t.Fatalf("expected NodePort to be set")
}
if !portIsAllocated(t, storage.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) {
if !portIsAllocated(t, storage.alloc.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) {
t.Errorf("expected NodePort to be allocated")
}
@ -1161,7 +1167,7 @@ func TestServiceRegistryDeleteDryRun(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error: %v", err)
}
if !portIsAllocated(t, storage.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) {
if !portIsAllocated(t, storage.alloc.serviceNodePorts, createdSvc.Spec.Ports[0].NodePort) {
t.Errorf("unexpected side effect: NodePort unallocated")
}
}
@ -1189,7 +1195,7 @@ func TestDualStackServiceRegistryDeleteDryRun(t *testing.T) {
t.Fatalf("Expected no error: %v", err)
}
for i, family := range dualstack_svc.Spec.IPFamilies {
if !ipIsAllocated(t, dualstack_storage.serviceIPAllocatorsByFamily[family], dualstack_svc.Spec.ClusterIPs[i]) {
if !ipIsAllocated(t, dualstack_storage.alloc.serviceIPAllocatorsByFamily[family], dualstack_svc.Spec.ClusterIPs[i]) {
t.Errorf("unexpected side effect: ip unallocated %v", dualstack_svc.Spec.ClusterIPs[i])
}
}
@ -1492,7 +1498,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) {
testIPs := []string{"1.2.3.93", "1.2.3.94", "1.2.3.95", "1.2.3.96"}
testIP := "not-an-ip"
for _, ip := range testIPs {
if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily].(*ipallocator.Range), ip) {
if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily].(*ipallocator.Range), ip) {
testIP = ip
break
}
@ -1581,7 +1587,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) {
testIPs := []string{"1.2.3.93", "1.2.3.94", "1.2.3.95", "1.2.3.96"}
testIP := ""
for _, ip := range testIPs {
if !ipIsAllocated(t, storage.serviceIPAllocatorsByFamily[storage.defaultServiceIPFamily].(*ipallocator.Range), ip) {
if !ipIsAllocated(t, storage.alloc.serviceIPAllocatorsByFamily[storage.alloc.defaultServiceIPFamily].(*ipallocator.Range), ip) {
testIP = ip
break
}
@ -1955,7 +1961,7 @@ func TestInitClusterIP(t *testing.T) {
// pre allocate ips if any
for family, ip := range test.preAllocateClusterIPs {
allocator, ok := storage.serviceIPAllocatorsByFamily[family]
allocator, ok := storage.alloc.serviceIPAllocatorsByFamily[family]
if !ok {
t.Fatalf("test is incorrect, allocator does not exist on rest")
}
@ -1986,7 +1992,7 @@ func TestInitClusterIP(t *testing.T) {
if netutils.IsIPv6String(ip) {
family = api.IPv6Protocol
}
allocator := storage.serviceIPAllocatorsByFamily[family]
allocator := storage.alloc.serviceIPAllocatorsByFamily[family]
if !ipIsAllocated(t, allocator, ip) {
t.Fatalf("expected ip:%v to be allocated by %v allocator. it was not", ip, family)
}
@ -2021,7 +2027,7 @@ func TestInitClusterIP(t *testing.T) {
return
}
shouldUpgrade := len(newSvc.Spec.IPFamilies) == 2 && *(newSvc.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(storage.serviceIPAllocatorsByFamily) == 2
shouldUpgrade := len(newSvc.Spec.IPFamilies) == 2 && *(newSvc.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(storage.alloc.serviceIPAllocatorsByFamily) == 2
if shouldUpgrade && len(newSvc.Spec.ClusterIPs) < 2 {
t.Fatalf("Service should have been upgraded %+v", newSvc)
}
@ -2037,7 +2043,7 @@ func TestInitClusterIP(t *testing.T) {
func TestInitNodePorts(t *testing.T) {
storage, server := NewTestREST(t, []api.IPFamily{api.IPv4Protocol})
defer server.Terminate(t)
nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false)
nodePortOp := portallocator.StartOperation(storage.alloc.serviceNodePorts, false)
testCases := []struct {
name string
@ -2103,7 +2109,7 @@ func TestInitNodePorts(t *testing.T) {
serviceNodePorts := collectServiceNodePorts(test.service)
if len(test.expectSpecifiedNodePorts) == 0 {
for _, nodePort := range serviceNodePorts {
if !storage.serviceNodePorts.Has(nodePort) {
if !storage.alloc.serviceNodePorts.Has(nodePort) {
t.Errorf("%q: unexpected NodePort %d, out of range", test.name, nodePort)
}
}
@ -2113,7 +2119,7 @@ func TestInitNodePorts(t *testing.T) {
for i := range serviceNodePorts {
nodePort := serviceNodePorts[i]
// Release the node port at the end of the test case.
storage.serviceNodePorts.Release(nodePort)
storage.alloc.serviceNodePorts.Release(nodePort)
}
}
}
@ -2121,7 +2127,7 @@ func TestInitNodePorts(t *testing.T) {
func TestUpdateNodePorts(t *testing.T) {
storage, server := NewTestREST(t, []api.IPFamily{api.IPv4Protocol})
defer server.Terminate(t)
nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false)
nodePortOp := portallocator.StartOperation(storage.alloc.serviceNodePorts, false)
testCases := []struct {
name string
@ -2218,7 +2224,7 @@ func TestUpdateNodePorts(t *testing.T) {
serviceNodePorts := collectServiceNodePorts(test.newService)
if len(test.expectSpecifiedNodePorts) == 0 {
for _, nodePort := range serviceNodePorts {
if !storage.serviceNodePorts.Has(nodePort) {
if !storage.alloc.serviceNodePorts.Has(nodePort) {
t.Errorf("%q: unexpected NodePort %d, out of range", test.name, nodePort)
}
}
@ -2228,7 +2234,7 @@ func TestUpdateNodePorts(t *testing.T) {
for i := range serviceNodePorts {
nodePort := serviceNodePorts[i]
// Release the node port at the end of the test case.
storage.serviceNodePorts.Release(nodePort)
storage.alloc.serviceNodePorts.Release(nodePort)
}
}
}
@ -2386,7 +2392,7 @@ func TestServiceUpgrade(t *testing.T) {
createdSvc := obj.(*api.Service)
// allocated IP
for family, ip := range testCase.allocateIPsBeforeUpdate {
alloc := storage.serviceIPAllocatorsByFamily[family]
alloc := storage.alloc.serviceIPAllocatorsByFamily[family]
if err := alloc.Allocate(netutils.ParseIPSloppy(ip)); err != nil {
t.Fatalf("test is incorrect, unable to preallocate ip:%v", ip)
}
@ -2418,7 +2424,7 @@ func TestServiceUpgrade(t *testing.T) {
updatedSvc := updated.(*api.Service)
isValidClusterIPFields(t, storage, updatedSvc, updatedSvc)
shouldUpgrade := len(createdSvc.Spec.IPFamilies) == 2 && *(createdSvc.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(storage.serviceIPAllocatorsByFamily) == 2
shouldUpgrade := len(createdSvc.Spec.IPFamilies) == 2 && *(createdSvc.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack && len(storage.alloc.serviceIPAllocatorsByFamily) == 2
if shouldUpgrade && len(updatedSvc.Spec.ClusterIPs) < 2 {
t.Fatalf("Service should have been upgraded %+v", createdSvc)
}
@ -2430,7 +2436,7 @@ func TestServiceUpgrade(t *testing.T) {
// make sure that ips were allocated, correctly
for i, family := range updatedSvc.Spec.IPFamilies {
ip := updatedSvc.Spec.ClusterIPs[i]
allocator := storage.serviceIPAllocatorsByFamily[family]
allocator := storage.alloc.serviceIPAllocatorsByFamily[family]
if !ipIsAllocated(t, allocator, ip) {
t.Fatalf("expected ip:%v to be allocated by %v allocator. it was not", ip, family)
}
@ -2562,7 +2568,7 @@ func TestServiceDowngrade(t *testing.T) {
if shouldDowngrade {
releasedIP := copySvc.Spec.ClusterIPs[1]
releasedIPFamily := copySvc.Spec.IPFamilies[1]
allocator := storage.serviceIPAllocatorsByFamily[releasedIPFamily]
allocator := storage.alloc.serviceIPAllocatorsByFamily[releasedIPFamily]
if ipIsAllocated(t, allocator, releasedIP) {
t.Fatalf("expected ip:%v to be released by %v allocator. it was not", releasedIP, releasedIPFamily)
@ -2579,28 +2585,28 @@ func TestDefaultingValidation(t *testing.T) {
// takes in REST and modify it for a specific config
fnMakeSingleStackIPv4Allocator := func(rest *REST) {
rest.defaultServiceIPFamily = api.IPv4Protocol
rest.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{api.IPv4Protocol: rest.serviceIPAllocatorsByFamily[api.IPv4Protocol]}
rest.alloc.defaultServiceIPFamily = api.IPv4Protocol
rest.alloc.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{api.IPv4Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol]}
}
fnMakeSingleStackIPv6Allocator := func(rest *REST) {
rest.defaultServiceIPFamily = api.IPv6Protocol
rest.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{api.IPv6Protocol: rest.serviceIPAllocatorsByFamily[api.IPv6Protocol]}
rest.alloc.defaultServiceIPFamily = api.IPv6Protocol
rest.alloc.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{api.IPv6Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol]}
}
fnMakeDualStackStackIPv4IPv6Allocator := func(rest *REST) {
rest.defaultServiceIPFamily = api.IPv4Protocol
rest.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{
api.IPv6Protocol: rest.serviceIPAllocatorsByFamily[api.IPv6Protocol],
api.IPv4Protocol: rest.serviceIPAllocatorsByFamily[api.IPv4Protocol],
rest.alloc.defaultServiceIPFamily = api.IPv4Protocol
rest.alloc.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{
api.IPv6Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol],
api.IPv4Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol],
}
}
fnMakeDualStackStackIPv6IPv4Allocator := func(rest *REST) {
rest.defaultServiceIPFamily = api.IPv6Protocol
rest.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{
api.IPv6Protocol: rest.serviceIPAllocatorsByFamily[api.IPv6Protocol],
api.IPv4Protocol: rest.serviceIPAllocatorsByFamily[api.IPv4Protocol],
rest.alloc.defaultServiceIPFamily = api.IPv6Protocol
rest.alloc.serviceIPAllocatorsByFamily = map[api.IPFamily]ipallocator.Interface{
api.IPv6Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv6Protocol],
api.IPv4Protocol: rest.alloc.serviceIPAllocatorsByFamily[api.IPv4Protocol],
}
}