diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1946e73c940..998672681cb 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -92,7 +92,7 @@ const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables" // internal struct for string service information type servicePortInfo struct { - *proxy.BaseServiceInfo + *proxy.BaseServicePortInfo // The following fields are computed and stored for performance reasons. nameString string clusterPolicyChainName utiliptables.Chain @@ -102,8 +102,8 @@ type servicePortInfo struct { } // returns a new proxy.ServicePort which abstracts a serviceInfo -func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort { - svcPort := &servicePortInfo{BaseServiceInfo: baseInfo} +func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort { + svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo} // Store the following for performance reasons. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} @@ -157,7 +157,7 @@ type Proxier struct { serviceChanges *proxy.ServiceChangeTracker mu sync.Mutex // protects the following fields - serviceMap proxy.ServiceMap + svcPortMap proxy.ServicePortMap endpointsMap proxy.EndpointsMap nodeLabels map[string]string // endpointSlicesSynced, and servicesSynced are set to true @@ -267,7 +267,7 @@ func NewProxier(ipt utiliptables.Interface, } proxier := &Proxier{ - serviceMap: make(proxy.ServiceMap), + svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), @@ -733,7 +733,7 @@ func isServiceChainName(chainString string) bool { // TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() svcProto := svcInfo.Protocol() @@ -812,7 +812,7 @@ func (proxier *Proxier) syncProxyRules() { serviceChanged = proxier.serviceChanges.PendingChanges() endpointsChanged = proxier.endpointsChanges.PendingChanges() } - serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges) + serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) // We need to detect stale connections to UDP Services so we @@ -822,7 +822,7 @@ func (proxier *Proxier) syncProxyRules() { // merge stale services gathered from updateEndpointsMap // an UDP service that changes from 0 to non-0 endpoints is considered stale. 
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { @@ -935,7 +935,7 @@ func (proxier *Proxier) syncProxyRules() { // Compute total number of endpoint chains across all services to get // a sense of how big the cluster is. totalEndpoints := 0 - for svcName := range proxier.serviceMap { + for svcName := range proxier.svcPortMap { totalEndpoints += len(proxier.endpointsMap[svcName]) } proxier.largeClusterMode = (totalEndpoints > largeClusterEndpointsThreshold) @@ -961,7 +961,7 @@ func (proxier *Proxier) syncProxyRules() { serviceNoLocalEndpointsTotalExternal := 0 // Build rules for each service-port. - for svcName, svc := range proxier.serviceMap { + for svcName, svc := range proxier.svcPortMap { svcInfo, ok := svc.(*servicePortInfo) if !ok { klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName) @@ -1491,7 +1491,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.iptablesData.WriteString("COMMIT\n") klog.V(2).InfoS("Reloading service iptables data", - "numServices", len(proxier.serviceMap), + "numServices", len(proxier.svcPortMap), "numEndpoints", totalEndpoints, "numFilterChains", proxier.filterChains.Lines(), "numFilterRules", proxier.filterRules.Lines(), diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 38d5f2acd1e..6e965f7a7b0 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -162,7 +162,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { }), ) - fp.serviceMap.Update(fp.serviceChanges) + fp.svcPortMap.Update(fp.serviceChanges) } // Run the test cases @@ -305,7 +305,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { }), ) - fp.serviceMap.Update(fp.serviceChanges) + fp.svcPortMap.Update(fp.serviceChanges) } // Run the test cases @@ -404,7 +404,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { p := &Proxier{ exec: &fakeexec.FakeExec{}, - serviceMap: make(proxy.ServiceMap), + svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil), @@ -3562,9 +3562,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { for i := range services { fp.OnServiceAdd(services[i]) } - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 10 { - t.Errorf("expected service map length 10, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 10 { + t.Errorf("expected service map length 10, got %v", fp.svcPortMap) } // The only-local-loadbalancer ones get added @@ -3595,9 +3595,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 1 { - t.Errorf("expected service map length 1, got %v", fp.serviceMap) + result = 
fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 1 { + t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { @@ -3635,9 +3635,9 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { ) // Headless service should be ignored - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 0 { - t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 0 { + t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } // No proxied services, so no healthchecks @@ -3663,9 +3663,9 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), ) - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 0 { - t.Errorf("expected service map length 0, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 0 { + t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } // No proxied services, so no healthchecks if len(result.HCServiceNodePorts) != 0 { @@ -3703,9 +3703,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.OnServiceAdd(servicev1) - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) @@ -3717,9 +3717,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 1 { t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) @@ -3731,9 +3731,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 1 { t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) @@ -3744,9 +3744,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 8e73e9e45b1..6991bd92a37 100644 --- 
a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -228,7 +228,7 @@ type Proxier struct { serviceChanges *proxy.ServiceChangeTracker mu sync.Mutex // protects the following fields - serviceMap proxy.ServiceMap + svcPortMap proxy.ServicePortMap endpointsMap proxy.EndpointsMap nodeLabels map[string]string // initialSync is a bool indicating if the proxier is syncing for the first time. @@ -479,7 +479,7 @@ func NewProxier(ipt utiliptables.Interface, proxier := &Proxier{ ipFamily: ipFamily, - serviceMap: make(proxy.ServiceMap), + svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil), @@ -590,14 +590,14 @@ func filterCIDRs(wantIPv6 bool, cidrs []string) []string { // internal struct for string service information type servicePortInfo struct { - *proxy.BaseServiceInfo + *proxy.BaseServicePortInfo // The following fields are computed and stored for performance reasons. nameString string } // returns a new proxy.ServicePort which abstracts a serviceInfo -func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort { - svcPort := &servicePortInfo{BaseServiceInfo: baseInfo} +func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort { + svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo} // Store the following for performance reasons. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} @@ -1042,13 +1042,13 @@ func (proxier *Proxier) syncProxyRules() { // We assume that if this was called, we really want to sync them, // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. - serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges) + serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) staleServices.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { @@ -1107,7 +1107,7 @@ func (proxier *Proxier) syncProxyRules() { } hasNodePort := false - for _, svc := range proxier.serviceMap { + for _, svc := range proxier.svcPortMap { svcInfo, ok := svc.(*servicePortInfo) if ok && svcInfo.NodePort() != 0 { hasNodePort = true @@ -1159,7 +1159,7 @@ func (proxier *Proxier) syncProxyRules() { nodeIPs = nodeIPs[:idx] // Build IPVS rules for each service. 
- for svcPortName, svcPort := range proxier.serviceMap { + for svcPortName, svcPort := range proxier.svcPortMap { svcInfo, ok := svcPort.(*servicePortInfo) if !ok { klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName) @@ -1913,7 +1913,7 @@ func (proxier *Proxier) createAndLinkKubeChain() { // This assumes the proxier mutex is held func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) svcProto := svcInfo.Protocol() err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) @@ -2003,7 +2003,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode // filter endpoints if appropriate feature gates are enabled and the // Service does not have conflicting configuration such as // externalTrafficPolicy=Local. - svcInfo, ok := proxier.serviceMap[svcPortName] + svcInfo, ok := proxier.svcPortMap[svcPortName] if !ok { klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName) } else { diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index d95715bced0..6db5ef1a51e 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -135,7 +135,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u } p := &Proxier{ exec: fexec, - serviceMap: make(proxy.ServiceMap), + svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, nil), @@ -2599,9 +2599,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { for i := range services { fp.OnServiceAdd(services[i]) } - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 12 { - t.Errorf("expected service map length 12, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 12 { + t.Errorf("expected service map length 12, got %v", fp.svcPortMap) } // The only-local-loadbalancer ones get added @@ -2632,9 +2632,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 1 { - t.Errorf("expected service map length 1, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 1 { + t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { @@ -2679,9 +2679,9 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { ) // Headless service should be ignored - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 0 { - t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 0 { + t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } // No proxied services, so no healthchecks @@ -2709,9 +2709,9 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), ) - 
result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 0 { - t.Errorf("expected service map length 0, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 0 { + t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } // No proxied services, so no healthchecks if len(result.HCServiceNodePorts) != 0 { @@ -2751,9 +2751,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.OnServiceAdd(servicev1) - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) @@ -2765,9 +2765,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 1 { t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) @@ -2779,9 +2779,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 1 { t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) @@ -2792,9 +2792,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index e3d39f4c7cf..a9a66e02785 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -37,11 +37,11 @@ import ( utilproxy "k8s.io/kubernetes/pkg/proxy/util" ) -// BaseServiceInfo contains base information that defines a service. +// BaseServicePortInfo contains base information that defines a service. // This could be used directly by proxier while processing services, // or can be used for constructing a more specific ServiceInfo struct // defined by the proxier if needed. -type BaseServiceInfo struct { +type BaseServicePortInfo struct { clusterIP net.IP port int protocol v1.Protocol @@ -58,106 +58,106 @@ type BaseServiceInfo struct { hintsAnnotation string } -var _ ServicePort = &BaseServiceInfo{} +var _ ServicePort = &BaseServicePortInfo{} // String is part of ServicePort interface. 
-func (info *BaseServiceInfo) String() string { - return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol) +func (bsvcPortInfo *BaseServicePortInfo) String() string { + return fmt.Sprintf("%s:%d/%s", bsvcPortInfo.clusterIP, bsvcPortInfo.port, bsvcPortInfo.protocol) } // ClusterIP is part of ServicePort interface. -func (info *BaseServiceInfo) ClusterIP() net.IP { - return info.clusterIP +func (bsvcPortInfo *BaseServicePortInfo) ClusterIP() net.IP { + return bsvcPortInfo.clusterIP } // Port is part of ServicePort interface. -func (info *BaseServiceInfo) Port() int { - return info.port +func (bsvcPortInfo *BaseServicePortInfo) Port() int { + return bsvcPortInfo.port } // SessionAffinityType is part of the ServicePort interface. -func (info *BaseServiceInfo) SessionAffinityType() v1.ServiceAffinity { - return info.sessionAffinityType +func (bsvcPortInfo *BaseServicePortInfo) SessionAffinityType() v1.ServiceAffinity { + return bsvcPortInfo.sessionAffinityType } // StickyMaxAgeSeconds is part of the ServicePort interface -func (info *BaseServiceInfo) StickyMaxAgeSeconds() int { - return info.stickyMaxAgeSeconds +func (bsvcPortInfo *BaseServicePortInfo) StickyMaxAgeSeconds() int { + return bsvcPortInfo.stickyMaxAgeSeconds } // Protocol is part of ServicePort interface. -func (info *BaseServiceInfo) Protocol() v1.Protocol { - return info.protocol +func (bsvcPortInfo *BaseServicePortInfo) Protocol() v1.Protocol { + return bsvcPortInfo.protocol } // LoadBalancerSourceRanges is part of ServicePort interface -func (info *BaseServiceInfo) LoadBalancerSourceRanges() []string { - return info.loadBalancerSourceRanges +func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerSourceRanges() []string { + return bsvcPortInfo.loadBalancerSourceRanges } // HealthCheckNodePort is part of ServicePort interface. -func (info *BaseServiceInfo) HealthCheckNodePort() int { - return info.healthCheckNodePort +func (bsvcPortInfo *BaseServicePortInfo) HealthCheckNodePort() int { + return bsvcPortInfo.healthCheckNodePort } // NodePort is part of the ServicePort interface. -func (info *BaseServiceInfo) NodePort() int { - return info.nodePort +func (bsvcPortInfo *BaseServicePortInfo) NodePort() int { + return bsvcPortInfo.nodePort } // ExternalIPStrings is part of ServicePort interface. -func (info *BaseServiceInfo) ExternalIPStrings() []string { - return info.externalIPs +func (bsvcPortInfo *BaseServicePortInfo) ExternalIPStrings() []string { + return bsvcPortInfo.externalIPs } // LoadBalancerIPStrings is part of ServicePort interface. -func (info *BaseServiceInfo) LoadBalancerIPStrings() []string { +func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerIPStrings() []string { var ips []string - for _, ing := range info.loadBalancerStatus.Ingress { + for _, ing := range bsvcPortInfo.loadBalancerStatus.Ingress { ips = append(ips, ing.IP) } return ips } // ExternalPolicyLocal is part of ServicePort interface. 
-func (info *BaseServiceInfo) ExternalPolicyLocal() bool { - return info.externalPolicyLocal +func (bsvcPortInfo *BaseServicePortInfo) ExternalPolicyLocal() bool { + return bsvcPortInfo.externalPolicyLocal } // InternalPolicyLocal is part of ServicePort interface -func (info *BaseServiceInfo) InternalPolicyLocal() bool { - return info.internalPolicyLocal +func (bsvcPortInfo *BaseServicePortInfo) InternalPolicyLocal() bool { + return bsvcPortInfo.internalPolicyLocal } // InternalTrafficPolicy is part of ServicePort interface -func (info *BaseServiceInfo) InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType { - return info.internalTrafficPolicy +func (bsvcPortInfo *BaseServicePortInfo) InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType { + return bsvcPortInfo.internalTrafficPolicy } // HintsAnnotation is part of ServicePort interface. -func (info *BaseServiceInfo) HintsAnnotation() string { - return info.hintsAnnotation +func (bsvcPortInfo *BaseServicePortInfo) HintsAnnotation() string { + return bsvcPortInfo.hintsAnnotation } // ExternallyAccessible is part of ServicePort interface. -func (info *BaseServiceInfo) ExternallyAccessible() bool { - return info.nodePort != 0 || len(info.loadBalancerStatus.Ingress) != 0 || len(info.externalIPs) != 0 +func (bsvcPortInfo *BaseServicePortInfo) ExternallyAccessible() bool { + return bsvcPortInfo.nodePort != 0 || len(bsvcPortInfo.loadBalancerStatus.Ingress) != 0 || len(bsvcPortInfo.externalIPs) != 0 } // UsesClusterEndpoints is part of ServicePort interface. -func (info *BaseServiceInfo) UsesClusterEndpoints() bool { +func (bsvcPortInfo *BaseServicePortInfo) UsesClusterEndpoints() bool { // The service port uses Cluster endpoints if the internal traffic policy is "Cluster", // or if it accepts external traffic at all. (Even if the external traffic policy is // "Local", we need Cluster endpoints to implement short circuiting.) - return !info.internalPolicyLocal || info.ExternallyAccessible() + return !bsvcPortInfo.internalPolicyLocal || bsvcPortInfo.ExternallyAccessible() } // UsesLocalEndpoints is part of ServicePort interface. -func (info *BaseServiceInfo) UsesLocalEndpoints() bool { - return info.internalPolicyLocal || (info.externalPolicyLocal && info.ExternallyAccessible()) +func (bsvcPortInfo *BaseServicePortInfo) UsesLocalEndpoints() bool { + return bsvcPortInfo.internalPolicyLocal || (bsvcPortInfo.externalPolicyLocal && bsvcPortInfo.ExternallyAccessible()) } -func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { +func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServicePortInfo { externalPolicyLocal := false if apiservice.ExternalPolicyLocal(service) { externalPolicyLocal = true @@ -173,7 +173,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic } clusterIP := utilproxy.GetClusterIPByFamily(sct.ipFamily, service) - info := &BaseServiceInfo{ + info := &BaseServicePortInfo{ clusterIP: netutils.ParseIPSloppy(clusterIP), port: int(port.Port), protocol: port.Protocol, @@ -245,18 +245,18 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic return info } -type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServiceInfo) ServicePort +type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort // This handler is invoked by the apply function on every change. 
This function should not modify the -// ServiceMap's but just use the changes for any Proxier specific cleanup. -type processServiceMapChangeFunc func(previous, current ServiceMap) +// ServicePortMap's but just use the changes for any Proxier specific cleanup. +type processServiceMapChangeFunc func(previous, current ServicePortMap) // serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, // changes are accumulated, i.e. previous is state from before applying the changes, // current is state after applying all of the changes. type serviceChange struct { - previous ServiceMap - current ServiceMap + previous ServicePortMap + current ServicePortMap } // ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of @@ -352,13 +352,13 @@ type UpdateServiceMapResult struct { UDPStaleClusterIP sets.String } -// Update updates ServiceMap base on the given changes. -func (sm ServiceMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) { +// Update updates ServicePortMap base on the given changes. +func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) { result.UDPStaleClusterIP = sets.NewString() sm.apply(changes, result.UDPStaleClusterIP) // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to serviceMap. + // computing this incrementally similarly to svcPortMap. result.HCServiceNodePorts = make(map[types.NamespacedName]uint16) for svcPortName, info := range sm { if info.HealthCheckNodePort() != 0 { @@ -369,13 +369,13 @@ func (sm ServiceMap) Update(changes *ServiceChangeTracker) (result UpdateService return result } -// ServiceMap maps a service to its ServicePort. -type ServiceMap map[ServicePortName]ServicePort +// ServicePortMap maps a service to its ServicePort. +type ServicePortMap map[ServicePortName]ServicePort -// serviceToServiceMap translates a single Service object to a ServiceMap. +// serviceToServiceMap translates a single Service object to a ServicePortMap. // // NOTE: service object should NOT be modified. -func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServiceMap { +func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap { if service == nil { return nil } @@ -389,25 +389,25 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic return nil } - serviceMap := make(ServiceMap) + svcPortMap := make(ServicePortMap) svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol} baseSvcInfo := sct.newBaseServiceInfo(servicePort, service) if sct.makeServiceInfo != nil { - serviceMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo) + svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo) } else { - serviceMap[svcPortName] = baseSvcInfo + svcPortMap[svcPortName] = baseSvcInfo } } - return serviceMap + return svcPortMap } -// apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the -// udp protocol service cluster ip when service is deleted from the ServiceMap. +// apply the changes to ServicePortMap and update the stale udp cluster IP set. 
The UDPStaleClusterIP argument is passed in to store the +// udp protocol service cluster ip when service is deleted from the ServicePortMap. // apply triggers processServiceMapChange on every change. -func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) { +func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) { changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { @@ -420,20 +420,20 @@ func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP set change.previous.filter(change.current) sm.unmerge(change.previous, UDPStaleClusterIP) } - // clear changes after applying them to ServiceMap. + // clear changes after applying them to ServicePortMap. changes.items = make(map[types.NamespacedName]*serviceChange) metrics.ServiceChangesPending.Set(0) } -// merge adds other ServiceMap's elements to current ServiceMap. +// merge adds other ServicePortMap's elements to current ServicePortMap. // If collision, other ALWAYS win. Otherwise add the other to current. // In other words, if some elements in current collisions with other, update the current by other. // It returns a string type set which stores all the newly merged services' identifier, ServicePortName.String(), to help users // tell if a service is deleted or updated. -// The returned value is one of the arguments of ServiceMap.unmerge(). -// ServiceMap A Merge ServiceMap B will do following 2 things: -// - update ServiceMap A. -// - produce a string set which stores all other ServiceMap's ServicePortName.String(). +// The returned value is one of the arguments of ServicePortMap.unmerge(). +// ServicePortMap A Merge ServicePortMap B will do following 2 things: +// - update ServicePortMap A. +// - produce a string set which stores all other ServicePortMap's ServicePortName.String(). // // For example, // - A{} @@ -444,8 +444,8 @@ func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP set // - B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}} // - A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}} // - produce string set {"ns/cluster-ip:http"} -func (sm *ServiceMap) merge(other ServiceMap) sets.String { - // existingPorts is going to store all identifiers of all services in `other` ServiceMap. +func (sm *ServicePortMap) merge(other ServicePortMap) sets.String { + // existingPorts is going to store all identifiers of all services in `other` ServicePortMap. existingPorts := sets.NewString() for svcPortName, info := range other { // Take ServicePortName.String() as the newly merged service's identifier and put it into existingPorts. @@ -461,8 +461,8 @@ func (sm *ServiceMap) merge(other ServiceMap) sets.String { return existingPorts } -// filter filters out elements from ServiceMap base on given ports string sets. -func (sm *ServiceMap) filter(other ServiceMap) { +// filter filters out elements from ServicePortMap base on given ports string sets. +func (sm *ServicePortMap) filter(other ServicePortMap) { for svcPortName := range *sm { // skip the delete for Update event. if _, ok := other[svcPortName]; ok { @@ -471,9 +471,9 @@ func (sm *ServiceMap) filter(other ServiceMap) { } } -// unmerge deletes all other ServiceMap's elements from current ServiceMap. We pass in the UDPStaleClusterIP strings sets +// unmerge deletes all other ServicePortMap's elements from current ServicePortMap. 
We pass in the UDPStaleClusterIP strings sets // for storing the stale udp service cluster IPs. We will clear stale udp connection base on UDPStaleClusterIP later -func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) { +func (sm *ServicePortMap) unmerge(other ServicePortMap, UDPStaleClusterIP sets.String) { for svcPortName := range other { info, exists := (*sm)[svcPortName] if exists { diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index f7959446d2c..885d8de7db2 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -33,19 +33,19 @@ import ( const testHostname = "test-hostname" -func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServiceInfo)) *BaseServiceInfo { - info := &BaseServiceInfo{ +func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServicePortInfo)) *BaseServicePortInfo { + bsvcPortInfo := &BaseServicePortInfo{ clusterIP: netutils.ParseIPSloppy(clusterIP), port: port, protocol: v1.Protocol(protocol), } if healthcheckNodePort != 0 { - info.healthCheckNodePort = healthcheckNodePort + bsvcPortInfo.healthCheckNodePort = healthcheckNodePort } for _, svcInfoFunc := range svcInfoFuncs { - svcInfoFunc(info) + svcInfoFunc(bsvcPortInfo) } - return info + return bsvcPortInfo } func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service { @@ -96,7 +96,7 @@ func TestServiceToServiceMap(t *testing.T) { testCases := []struct { desc string service *v1.Service - expected map[ServicePortName]*BaseServiceInfo + expected map[ServicePortName]*BaseServicePortInfo ipFamily v1.IPFamily }{ { @@ -104,7 +104,7 @@ func TestServiceToServiceMap(t *testing.T) { ipFamily: v1.IPv4Protocol, service: nil, - expected: map[ServicePortName]*BaseServiceInfo{}, + expected: map[ServicePortName]*BaseServicePortInfo{}, }, { desc: "headless service", @@ -115,7 +115,7 @@ func TestServiceToServiceMap(t *testing.T) { svc.Spec.ClusterIP = v1.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) }), - expected: map[ServicePortName]*BaseServiceInfo{}, + expected: map[ServicePortName]*BaseServicePortInfo{}, }, { desc: "headless sctp service", @@ -126,7 +126,7 @@ func TestServiceToServiceMap(t *testing.T) { svc.Spec.ClusterIP = v1.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 7777, 0, 0) }), - expected: map[ServicePortName]*BaseServiceInfo{}, + expected: map[ServicePortName]*BaseServicePortInfo{}, }, { desc: "headless service without port", @@ -136,7 +136,7 @@ func TestServiceToServiceMap(t *testing.T) { svc.Spec.Type = v1.ServiceTypeClusterIP svc.Spec.ClusterIP = v1.ClusterIPNone }), - expected: map[ServicePortName]*BaseServiceInfo{}, + expected: map[ServicePortName]*BaseServicePortInfo{}, }, { desc: "cluster ip service", @@ -148,7 +148,7 @@ func TestServiceToServiceMap(t *testing.T) { svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "UDP", 1235, 5321, 0) }), - expected: map[ServicePortName]*BaseServiceInfo{ + expected: map[ServicePortName]*BaseServicePortInfo{ makeServicePortName("ns2", "cluster-ip", "p1", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.4", 1234, "UDP", 0), makeServicePortName("ns2", "cluster-ip", "p2", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.4", 1235, "UDP", 0), }, @@ -163,7 +163,7 @@ func TestServiceToServiceMap(t *testing.T) { svc.Spec.Ports = 
addTestPort(svc.Spec.Ports, "port1", "UDP", 345, 678, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 344, 677, 0) }), - expected: map[ServicePortName]*BaseServiceInfo{ + expected: map[ServicePortName]*BaseServicePortInfo{ makeServicePortName("ns2", "node-port", "port1", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.10", 345, "UDP", 0), makeServicePortName("ns2", "node-port", "port2", v1.ProtocolTCP): makeTestServiceInfo("172.16.55.10", 344, "TCP", 0), }, @@ -180,12 +180,12 @@ func TestServiceToServiceMap(t *testing.T) { svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port4", "UDP", 8676, 30062, 7001) svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} }), - expected: map[ServicePortName]*BaseServiceInfo{ - makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(info *BaseServiceInfo) { - info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} + expected: map[ServicePortName]*BaseServicePortInfo{ + makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} }), - makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(info *BaseServiceInfo) { - info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} + makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} }), }, }, @@ -203,12 +203,12 @@ func TestServiceToServiceMap(t *testing.T) { svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal svc.Spec.HealthCheckNodePort = 345 }), - expected: map[ServicePortName]*BaseServiceInfo{ - makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(info *BaseServiceInfo) { - info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}} + expected: map[ServicePortName]*BaseServicePortInfo{ + makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}} }), - makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(info *BaseServiceInfo) { - info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}} + makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}} }), }, }, @@ -222,7 +222,7 @@ func TestServiceToServiceMap(t *testing.T) { svc.Spec.ExternalName = "foo2.bar.com" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "portz", "UDP", 1235, 5321, 0) }), - expected: map[ServicePortName]*BaseServiceInfo{}, + expected: map[ServicePortName]*BaseServicePortInfo{}, }, { desc: "service with ipv6 clusterIP under ipv4 mode, service should be filtered", @@ -312,11 +312,11 @@ func TestServiceToServiceMap(t 
*testing.T) { }, }, }, - expected: map[ServicePortName]*BaseServiceInfo{ - makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.externalIPs = []string{testExternalIPv4} - info.loadBalancerSourceRanges = []string{testSourceRangeIPv4} - info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}} + expected: map[ServicePortName]*BaseServicePortInfo{ + makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.externalIPs = []string{testExternalIPv4} + bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4} + bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}} }), }, }, @@ -350,11 +350,11 @@ func TestServiceToServiceMap(t *testing.T) { }, }, }, - expected: map[ServicePortName]*BaseServiceInfo{ - makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.externalIPs = []string{testExternalIPv6} - info.loadBalancerSourceRanges = []string{testSourceRangeIPv6} - info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}} + expected: map[ServicePortName]*BaseServicePortInfo{ + makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.externalIPs = []string{testExternalIPv6} + bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6} + bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}} }), }, }, @@ -388,11 +388,11 @@ func TestServiceToServiceMap(t *testing.T) { }, }, }, - expected: map[ServicePortName]*BaseServiceInfo{ - makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.externalIPs = []string{testExternalIPv4} - info.loadBalancerSourceRanges = []string{testSourceRangeIPv4} - info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}} + expected: map[ServicePortName]*BaseServicePortInfo{ + makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.externalIPs = []string{testExternalIPv4} + bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4} + bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}} }), }, }, @@ -426,11 +426,11 @@ func TestServiceToServiceMap(t *testing.T) { }, }, }, - expected: map[ServicePortName]*BaseServiceInfo{ - makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.externalIPs = []string{testExternalIPv6} - info.loadBalancerSourceRanges = []string{testSourceRangeIPv6} - info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}} + expected: map[ServicePortName]*BaseServicePortInfo{ + makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.externalIPs = []string{testExternalIPv6} + 
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6} + bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}} }), }, }, @@ -455,9 +455,9 @@ func TestServiceToServiceMap(t *testing.T) { }, }, }, - expected: map[ServicePortName]*BaseServiceInfo{ - makeServicePortName("test", "extra-space", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.loadBalancerSourceRanges = []string{"10.1.2.0/28"} + expected: map[ServicePortName]*BaseServicePortInfo{ + makeServicePortName("test", "extra-space", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { + bsvcPortInfo.loadBalancerSourceRanges = []string{"10.1.2.0/28"} }), }, }, @@ -473,7 +473,7 @@ func TestServiceToServiceMap(t *testing.T) { t.Fatalf("expected %d new, got %d: %v", len(tc.expected), len(newServices), spew.Sdump(newServices)) } for svcKey, expectedInfo := range tc.expected { - svcInfo, exists := newServices[svcKey].(*BaseServiceInfo) + svcInfo, exists := newServices[svcKey].(*BaseServicePortInfo) if !exists { t.Fatalf("[%s] expected to find key %s", tc.desc, svcKey) } @@ -488,7 +488,7 @@ func TestServiceToServiceMap(t *testing.T) { t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo) } for svcKey, expectedInfo := range tc.expected { - svcInfo, _ := newServices[svcKey].(*BaseServiceInfo) + svcInfo, _ := newServices[svcKey].(*BaseServicePortInfo) if !svcInfo.clusterIP.Equal(expectedInfo.clusterIP) || svcInfo.port != expectedInfo.port || svcInfo.protocol != expectedInfo.protocol || @@ -507,14 +507,14 @@ func TestServiceToServiceMap(t *testing.T) { type FakeProxier struct { endpointsChanges *EndpointChangeTracker serviceChanges *ServiceChangeTracker - serviceMap ServiceMap + svcPortMap ServicePortMap endpointsMap EndpointsMap hostname string } func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { return &FakeProxier{ - serviceMap: make(ServiceMap), + svcPortMap: make(ServicePortMap), serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), endpointsMap: make(EndpointsMap), endpointsChanges: &EndpointChangeTracker{ @@ -568,9 +568,9 @@ func TestServiceMapUpdateHeadless(t *testing.T) { if pending.Len() != 0 { t.Errorf("expected 0 pending service changes, got %d", pending.Len()) } - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 0 { - t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 0 { + t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } // No proxied services, so no healthchecks @@ -599,9 +599,9 @@ func TestUpdateServiceTypeExternalName(t *testing.T) { if pending.Len() != 0 { t.Errorf("expected 0 pending service changes, got %d", pending.Len()) } - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 0 { - t.Errorf("expected service map length 0, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 0 { + t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } // No proxied services, so no healthchecks if len(result.HCServiceNodePorts) != 0 { @@ -671,9 +671,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected %d pending service changes, got %d", len(services), pending.Len()) } - result := fp.serviceMap.Update(fp.serviceChanges) - if 
len(fp.serviceMap) != 8 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 8 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } // The only-local-loadbalancer ones get added @@ -708,9 +708,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { if pending.Len() != 4 { t.Errorf("expected 4 pending service changes, got %d", pending.Len()) } - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 1 { - t.Errorf("expected service map length 1, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 1 { + t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { @@ -761,9 +761,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if pending.Len() != 1 { t.Errorf("expected 1 pending service change, got %d", pending.Len()) } - result := fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result := fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) @@ -779,9 +779,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if pending.Len() != 1 { t.Errorf("expected 1 pending service change, got %d", pending.Len()) } - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 1 { t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) @@ -797,9 +797,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if pending.Len() != 0 { t.Errorf("expected 0 pending service changes, got %d", pending.Len()) } - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 1 { t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) @@ -814,9 +814,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if pending.Len() != 1 { t.Errorf("expected 1 pending service change, got %d", pending.Len()) } - result = fp.serviceMap.Update(fp.serviceChanges) - if len(fp.serviceMap) != 2 { - t.Errorf("expected service map length 2, got %v", fp.serviceMap) + result = fp.svcPortMap.Update(fp.serviceChanges) + if len(fp.svcPortMap) != 2 { + t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } if len(result.HCServiceNodePorts) != 0 { t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go index 362af7f2d0e..3dcc3a38bac 100644 --- a/pkg/proxy/topology_test.go +++ b/pkg/proxy/topology_test.go @@ -68,7 +68,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "hints enabled, hints annotation == auto", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - 
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -81,7 +81,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "hints, hints annotation == disabled, hints ignored", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "disabled"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "disabled"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -94,7 +94,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "hints disabled, hints annotation == auto", hintsEnabled: false, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -108,7 +108,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "hints, hints annotation == aUto (wrong capitalization), hints ignored", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "aUto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "aUto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -121,7 +121,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "hints, hints annotation empty, hints ignored", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{}, + serviceInfo: &BaseServicePortInfo{}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -134,7 +134,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "externalTrafficPolicy: Local, topology ignored for Local endpoints", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true}, @@ -148,7 +148,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "internalTrafficPolicy: Local, topology ignored for Local endpoints", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, hintsAnnotation: "auto", externalPolicyLocal: false, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, hintsAnnotation: 
"auto", externalPolicyLocal: false, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true}, @@ -162,7 +162,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "empty node labels", hintsEnabled: true, nodeLabels: map[string]string{}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, @@ -172,7 +172,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "empty zone label", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: ""}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, @@ -182,7 +182,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "node in different zone, no endpoint filtering", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-b"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, @@ -192,7 +192,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "normal endpoint filtering, auto annotation", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -205,7 +205,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "unready endpoint", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -218,7 +218,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "only unready endpoints in same zone (should not filter)", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: false}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -231,7 +231,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "normal endpoint filtering, Auto annotation", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "Auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "Auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, 
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -244,7 +244,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "hintsAnnotation empty, no filtering applied", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: ""}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: ""}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -257,7 +257,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "hintsAnnotation disabled, no filtering applied", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "disabled"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "disabled"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -270,7 +270,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "missing hints, no filtering applied", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, @@ -283,7 +283,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "multiple hints per endpoint, filtering includes any endpoint with zone included", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-c"}, - serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a", "zone-b", "zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b", "zone-c"), Ready: true}, @@ -296,7 +296,7 @@ func TestCategorizeEndpoints(t *testing.T) { name: "conflicting topology and localness require merging allEndpoints", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true}, @@ -308,13 +308,13 @@ func TestCategorizeEndpoints(t *testing.T) { allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80", "10.0.0.2:80"), }, { name: "internalTrafficPolicy: Local, with empty endpoints", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true}, endpoints: []Endpoint{}, clusterEndpoints: nil, localEndpoints: sets.NewString(), }, { name: "internalTrafficPolicy: Local, but all endpoints are remote", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true}, + 
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, @@ -324,7 +324,7 @@ func TestCategorizeEndpoints(t *testing.T) { onlyRemoteEndpoints: true, }, { name: "internalTrafficPolicy: Local, all endpoints are local", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, @@ -333,7 +333,7 @@ func TestCategorizeEndpoints(t *testing.T) { localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "internalTrafficPolicy: Local, some endpoints are local", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, @@ -342,7 +342,7 @@ func TestCategorizeEndpoints(t *testing.T) { localEndpoints: sets.NewString("10.0.0.0:80"), }, { name: "Cluster traffic policy, endpoints not Ready", - serviceInfo: &BaseServiceInfo{}, + serviceInfo: &BaseServicePortInfo{}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false}, @@ -351,7 +351,7 @@ func TestCategorizeEndpoints(t *testing.T) { localEndpoints: nil, }, { name: "Cluster traffic policy, some endpoints are Ready", - serviceInfo: &BaseServiceInfo{}, + serviceInfo: &BaseServicePortInfo{}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true}, @@ -361,7 +361,7 @@ func TestCategorizeEndpoints(t *testing.T) { }, { name: "Cluster traffic policy, PTE enabled, all endpoints are terminating", pteEnabled: true, - serviceInfo: &BaseServiceInfo{}, + serviceInfo: &BaseServicePortInfo{}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, @@ -371,7 +371,7 @@ func TestCategorizeEndpoints(t *testing.T) { }, { name: "Cluster traffic policy, PTE disabled, all endpoints are terminating", pteEnabled: false, - serviceInfo: &BaseServiceInfo{}, + serviceInfo: &BaseServicePortInfo{}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, @@ -380,7 +380,7 @@ func TestCategorizeEndpoints(t *testing.T) { localEndpoints: nil, }, { name: "iTP: Local, eTP: Cluster, some endpoints local", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: false, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: false, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, @@ -390,7 +390,7 @@ func TestCategorizeEndpoints(t *testing.T) { allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "iTP: Cluster, eTP: 
Local, some endpoints local", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, @@ -400,7 +400,7 @@ func TestCategorizeEndpoints(t *testing.T) { allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "iTP: Local, eTP: Local, some endpoints local", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, @@ -410,7 +410,7 @@ func TestCategorizeEndpoints(t *testing.T) { allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "iTP: Local, eTP: Local, all endpoints remote", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, @@ -420,7 +420,7 @@ func TestCategorizeEndpoints(t *testing.T) { allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "iTP: Local, eTP: Local, PTE disabled, all endpoints remote and terminating", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, @@ -431,7 +431,7 @@ func TestCategorizeEndpoints(t *testing.T) { }, { name: "iTP: Local, eTP: Local, PTE enabled, all endpoints remote and terminating", pteEnabled: true, - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, @@ -442,7 +442,7 @@ func TestCategorizeEndpoints(t *testing.T) { onlyRemoteEndpoints: true, }, { name: "iTP: Cluster, eTP: Local, PTE disabled, with terminating endpoints", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true}, @@ -455,7 +455,7 @@ func TestCategorizeEndpoints(t *testing.T) { }, { name: "iTP: Cluster, eTP: Local, PTE enabled, with terminating endpoints", pteEnabled: true, - 
serviceInfo: &BaseServiceInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true}, @@ -467,7 +467,7 @@ func TestCategorizeEndpoints(t *testing.T) { allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"), }, { name: "externalTrafficPolicy ignored if not externally accessible", - serviceInfo: &BaseServiceInfo{externalPolicyLocal: true}, + serviceInfo: &BaseServicePortInfo{externalPolicyLocal: true}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, @@ -477,7 +477,7 @@ func TestCategorizeEndpoints(t *testing.T) { allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "no cluster endpoints for iTP:Local internal-only service", - serviceInfo: &BaseServiceInfo{internalPolicyLocal: true}, + serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 235ea7680d2..887c9adece3 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -116,7 +116,7 @@ type loadBalancerFlags struct { // internal struct for string service information type serviceInfo struct { - *proxy.BaseServiceInfo + *proxy.BaseServicePortInfo targetPort int externalIPs []*externalIPInfo loadBalancerIngressIPs []*loadBalancerIngressInfo @@ -323,7 +323,7 @@ func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap prox func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) { - svc, exists := proxier.serviceMap[*svcPortName] + svc, exists := proxier.svcPortMap[*svcPortName] if exists { svcInfo, ok := svc.(*serviceInfo) @@ -355,7 +355,7 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) } } -func (proxier *Proxier) serviceMapChange(previous, current proxy.ServiceMap) { +func (proxier *Proxier) serviceMapChange(previous, current proxy.ServicePortMap) { for svcPortName := range current { proxier.onServiceMapChange(&svcPortName) } @@ -370,7 +370,7 @@ func (proxier *Proxier) serviceMapChange(previous, current proxy.ServiceMap) { func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) { - svc, exists := proxier.serviceMap[*svcPortName] + svc, exists := proxier.svcPortMap[*svcPortName] if exists { svcInfo, ok := svc.(*serviceInfo) @@ -458,8 +458,8 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 } // returns a new proxy.ServicePort which abstracts a serviceInfo -func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort { - info := &serviceInfo{BaseServiceInfo: baseInfo} +func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort { + info := &serviceInfo{BaseServicePortInfo: bsvcPortInfo} preserveDIP := service.Annotations["preserve-destination"] == "true" localTrafficDSR := service.Spec.ExternalTrafficPolicy == 
     err := hcn.DSRSupported()
@@ -525,7 +525,7 @@ type Proxier struct {
     serviceChanges *proxy.ServiceChangeTracker
     endPointsRefCount endPointsReferenceCountMap
     mu sync.Mutex // protects the following fields
-    serviceMap proxy.ServiceMap
+    svcPortMap proxy.ServicePortMap
     endpointsMap proxy.EndpointsMap
     // endpointSlicesSynced and servicesSynced are set to true when corresponding
     // objects are synced after startup. This is used to avoid updating hns policies
@@ -699,7 +699,7 @@ func NewProxier(
     isIPv6 := netutils.IsIPv6(nodeIP)
     proxier := &Proxier{
         endPointsRefCount: make(endPointsReferenceCountMap),
-        serviceMap: make(proxy.ServiceMap),
+        svcPortMap: make(proxy.ServicePortMap),
         endpointsMap: make(proxy.EndpointsMap),
         masqueradeAll: masqueradeAll,
         masqueradeMark: masqueradeMark,
@@ -986,7 +986,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
 }
 func (proxier *Proxier) cleanupAllPolicies() {
-    for svcName, svc := range proxier.serviceMap {
+    for svcName, svc := range proxier.svcPortMap {
         svcInfo, ok := svc.(*serviceInfo)
         if !ok {
             klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
@@ -1050,13 +1050,13 @@ func (proxier *Proxier) syncProxyRules() {
     // We assume that if this was called, we really want to sync them,
     // even if nothing changed in the meantime. In other words, callers are
     // responsible for detecting no-op changes and not calling this function.
-    serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
+    serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
     endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
     staleServices := serviceUpdateResult.UDPStaleClusterIP
     // merge stale services gathered from updateEndpointsMap
     for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
-        if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
+        if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
             klog.V(2).InfoS("Stale udp service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
             staleServices.Insert(svcInfo.ClusterIP().String())
         }
@@ -1093,7 +1093,7 @@ func (proxier *Proxier) syncProxyRules() {
     klog.V(3).InfoS("Syncing Policies")
     // Program HNS by adding corresponding policies for each service.
-    for svcName, svc := range proxier.serviceMap {
+    for svcName, svc := range proxier.svcPortMap {
         svcInfo, ok := svc.(*serviceInfo)
         if !ok {
             klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go
index a7ac7b36528..46a3d4fba81 100644
--- a/pkg/proxy/winkernel/proxier_test.go
+++ b/pkg/proxy/winkernel/proxier_test.go
@@ -138,7 +138,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
         networkType: networkType,
     }
     proxier := &Proxier{
-        serviceMap: make(proxy.ServiceMap),
+        svcPortMap: make(proxy.ServicePortMap),
         endpointsMap: make(proxy.EndpointsMap),
         clusterCIDR: clusterCIDR,
         hostname: testHostName,
@@ -201,7 +201,7 @@ func TestCreateServiceVip(t *testing.T) {
     proxier.setInitialized(true)
     proxier.syncProxyRules()
-    svc := proxier.serviceMap[svcPortName]
+    svc := proxier.svcPortMap[svcPortName]
     svcInfo, ok := svc.(*serviceInfo)
     if !ok {
         t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
@@ -707,7 +707,7 @@ func TestCreateLoadBalancer(t *testing.T) {
     proxier.setInitialized(true)
     proxier.syncProxyRules()
-    svc := proxier.serviceMap[svcPortName]
+    svc := proxier.svcPortMap[svcPortName]
     svcInfo, ok := svc.(*serviceInfo)
     if !ok {
         t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
@@ -770,7 +770,7 @@ func TestCreateDsrLoadBalancer(t *testing.T) {
     proxier.setInitialized(true)
     proxier.syncProxyRules()
-    svc := proxier.serviceMap[svcPortName]
+    svc := proxier.svcPortMap[svcPortName]
     svcInfo, ok := svc.(*serviceInfo)
     if !ok {
         t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
@@ -840,7 +840,7 @@ func TestEndpointSlice(t *testing.T) {
     proxier.setInitialized(true)
     proxier.syncProxyRules()
-    svc := proxier.serviceMap[svcPortName]
+    svc := proxier.svcPortMap[svcPortName]
     svcInfo, ok := svc.(*serviceInfo)
     if !ok {
         t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())