Merge pull request #113277 from manav014/master

Cleanup: kube-proxy internal naming
This commit is contained in:
Kubernetes Prow Robot 2022-11-02 18:21:05 -07:00 committed by GitHub
commit 86a6ace994
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 274 additions and 274 deletions

View File

@ -92,7 +92,7 @@ const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables"
// internal struct for string service information
type servicePortInfo struct {
*proxy.BaseServiceInfo
*proxy.BaseServicePortInfo
// The following fields are computed and stored for performance reasons.
nameString string
clusterPolicyChainName utiliptables.Chain
@ -102,8 +102,8 @@ type servicePortInfo struct {
}
// returns a new proxy.ServicePort which abstracts a serviceInfo
func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
svcPort := &servicePortInfo{BaseServiceInfo: baseInfo}
func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
// Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
@ -157,7 +157,7 @@ type Proxier struct {
serviceChanges *proxy.ServiceChangeTracker
mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
nodeLabels map[string]string
// endpointSlicesSynced, and servicesSynced are set to true
@ -267,7 +267,7 @@ func NewProxier(ipt utiliptables.Interface,
}
proxier := &Proxier{
serviceMap: make(proxy.ServiceMap),
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
@ -733,7 +733,7 @@ func isServiceChainName(chainString string) bool {
// TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
svcProto := svcInfo.Protocol()
@ -812,7 +812,7 @@ func (proxier *Proxier) syncProxyRules() {
serviceChanged = proxier.serviceChanges.PendingChanges()
endpointsChanged = proxier.endpointsChanges.PendingChanges()
}
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
// We need to detect stale connections to UDP Services so we
@ -822,7 +822,7 @@ func (proxier *Proxier) syncProxyRules() {
// merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
@ -935,7 +935,7 @@ func (proxier *Proxier) syncProxyRules() {
// Compute total number of endpoint chains across all services to get
// a sense of how big the cluster is.
totalEndpoints := 0
for svcName := range proxier.serviceMap {
for svcName := range proxier.svcPortMap {
totalEndpoints += len(proxier.endpointsMap[svcName])
}
proxier.largeClusterMode = (totalEndpoints > largeClusterEndpointsThreshold)
@ -961,7 +961,7 @@ func (proxier *Proxier) syncProxyRules() {
serviceNoLocalEndpointsTotalExternal := 0
// Build rules for each service-port.
for svcName, svc := range proxier.serviceMap {
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
@ -1491,7 +1491,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.WriteString("COMMIT\n")
klog.V(2).InfoS("Reloading service iptables data",
"numServices", len(proxier.serviceMap),
"numServices", len(proxier.svcPortMap),
"numEndpoints", totalEndpoints,
"numFilterChains", proxier.filterChains.Lines(),
"numFilterRules", proxier.filterRules.Lines(),

View File

@ -162,7 +162,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) {
}),
)
fp.serviceMap.Update(fp.serviceChanges)
fp.svcPortMap.Update(fp.serviceChanges)
}
// Run the test cases
@ -305,7 +305,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
}),
)
fp.serviceMap.Update(fp.serviceChanges)
fp.svcPortMap.Update(fp.serviceChanges)
}
// Run the test cases
@ -404,7 +404,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
p := &Proxier{
exec: &fakeexec.FakeExec{},
serviceMap: make(proxy.ServiceMap),
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil),
@ -3562,9 +3562,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
for i := range services {
fp.OnServiceAdd(services[i])
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 10 {
t.Errorf("expected service map length 10, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 10 {
t.Errorf("expected service map length 10, got %v", fp.svcPortMap)
}
// The only-local-loadbalancer ones get added
@ -3595,9 +3595,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
fp.OnServiceDelete(services[2])
fp.OnServiceDelete(services[3])
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
@ -3635,9 +3635,9 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
)
// Headless service should be ignored
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
}
// No proxied services, so no healthchecks
@ -3663,9 +3663,9 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
}),
)
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
}
// No proxied services, so no healthchecks
if len(result.HCServiceNodePorts) != 0 {
@ -3703,9 +3703,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
fp.OnServiceAdd(servicev1)
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
@ -3717,9 +3717,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// Change service to load-balancer
fp.OnServiceUpdate(servicev1, servicev2)
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
@ -3731,9 +3731,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// No change; make sure the service map stays the same and there are
// no health-check changes
fp.OnServiceUpdate(servicev2, servicev2)
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
@ -3744,9 +3744,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// And back to ClusterIP
fp.OnServiceUpdate(servicev2, servicev1)
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)

View File

@ -228,7 +228,7 @@ type Proxier struct {
serviceChanges *proxy.ServiceChangeTracker
mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
nodeLabels map[string]string
// initialSync is a bool indicating if the proxier is syncing for the first time.
@ -479,7 +479,7 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{
ipFamily: ipFamily,
serviceMap: make(proxy.ServiceMap),
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
@ -590,14 +590,14 @@ func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
// internal struct for string service information
type servicePortInfo struct {
*proxy.BaseServiceInfo
*proxy.BaseServicePortInfo
// The following fields are computed and stored for performance reasons.
nameString string
}
// returns a new proxy.ServicePort which abstracts a serviceInfo
func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
svcPort := &servicePortInfo{BaseServiceInfo: baseInfo}
func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
// Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
@ -1042,13 +1042,13 @@ func (proxier *Proxier) syncProxyRules() {
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
@ -1107,7 +1107,7 @@ func (proxier *Proxier) syncProxyRules() {
}
hasNodePort := false
for _, svc := range proxier.serviceMap {
for _, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*servicePortInfo)
if ok && svcInfo.NodePort() != 0 {
hasNodePort = true
@ -1159,7 +1159,7 @@ func (proxier *Proxier) syncProxyRules() {
nodeIPs = nodeIPs[:idx]
// Build IPVS rules for each service.
for svcPortName, svcPort := range proxier.serviceMap {
for svcPortName, svcPort := range proxier.svcPortMap {
svcInfo, ok := svcPort.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
@ -1913,7 +1913,7 @@ func (proxier *Proxier) createAndLinkKubeChain() {
// This assumes the proxier mutex is held
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
svcProto := svcInfo.Protocol()
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
@ -2003,7 +2003,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
// filter endpoints if appropriate feature gates are enabled and the
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
svcInfo, ok := proxier.serviceMap[svcPortName]
svcInfo, ok := proxier.svcPortMap[svcPortName]
if !ok {
klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
} else {

View File

@ -135,7 +135,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
}
p := &Proxier{
exec: fexec,
serviceMap: make(proxy.ServiceMap),
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, nil),
@ -2599,9 +2599,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
for i := range services {
fp.OnServiceAdd(services[i])
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 12 {
t.Errorf("expected service map length 12, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 12 {
t.Errorf("expected service map length 12, got %v", fp.svcPortMap)
}
// The only-local-loadbalancer ones get added
@ -2632,9 +2632,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
fp.OnServiceDelete(services[2])
fp.OnServiceDelete(services[3])
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
@ -2679,9 +2679,9 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
)
// Headless service should be ignored
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
}
// No proxied services, so no healthchecks
@ -2709,9 +2709,9 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
}),
)
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
}
// No proxied services, so no healthchecks
if len(result.HCServiceNodePorts) != 0 {
@ -2751,9 +2751,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
fp.OnServiceAdd(servicev1)
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
@ -2765,9 +2765,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// Change service to load-balancer
fp.OnServiceUpdate(servicev1, servicev2)
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
@ -2779,9 +2779,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// No change; make sure the service map stays the same and there are
// no health-check changes
fp.OnServiceUpdate(servicev2, servicev2)
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
@ -2792,9 +2792,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// And back to ClusterIP
fp.OnServiceUpdate(servicev2, servicev1)
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)

View File

@ -37,11 +37,11 @@ import (
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
)
// BaseServiceInfo contains base information that defines a service.
// BaseServicePortInfo contains base information that defines a service.
// This could be used directly by proxier while processing services,
// or can be used for constructing a more specific ServiceInfo struct
// defined by the proxier if needed.
type BaseServiceInfo struct {
type BaseServicePortInfo struct {
clusterIP net.IP
port int
protocol v1.Protocol
@ -58,106 +58,106 @@ type BaseServiceInfo struct {
hintsAnnotation string
}
var _ ServicePort = &BaseServiceInfo{}
var _ ServicePort = &BaseServicePortInfo{}
// String is part of ServicePort interface.
func (info *BaseServiceInfo) String() string {
return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol)
func (bsvcPortInfo *BaseServicePortInfo) String() string {
return fmt.Sprintf("%s:%d/%s", bsvcPortInfo.clusterIP, bsvcPortInfo.port, bsvcPortInfo.protocol)
}
// ClusterIP is part of ServicePort interface.
func (info *BaseServiceInfo) ClusterIP() net.IP {
return info.clusterIP
func (bsvcPortInfo *BaseServicePortInfo) ClusterIP() net.IP {
return bsvcPortInfo.clusterIP
}
// Port is part of ServicePort interface.
func (info *BaseServiceInfo) Port() int {
return info.port
func (bsvcPortInfo *BaseServicePortInfo) Port() int {
return bsvcPortInfo.port
}
// SessionAffinityType is part of the ServicePort interface.
func (info *BaseServiceInfo) SessionAffinityType() v1.ServiceAffinity {
return info.sessionAffinityType
func (bsvcPortInfo *BaseServicePortInfo) SessionAffinityType() v1.ServiceAffinity {
return bsvcPortInfo.sessionAffinityType
}
// StickyMaxAgeSeconds is part of the ServicePort interface
func (info *BaseServiceInfo) StickyMaxAgeSeconds() int {
return info.stickyMaxAgeSeconds
func (bsvcPortInfo *BaseServicePortInfo) StickyMaxAgeSeconds() int {
return bsvcPortInfo.stickyMaxAgeSeconds
}
// Protocol is part of ServicePort interface.
func (info *BaseServiceInfo) Protocol() v1.Protocol {
return info.protocol
func (bsvcPortInfo *BaseServicePortInfo) Protocol() v1.Protocol {
return bsvcPortInfo.protocol
}
// LoadBalancerSourceRanges is part of ServicePort interface
func (info *BaseServiceInfo) LoadBalancerSourceRanges() []string {
return info.loadBalancerSourceRanges
func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerSourceRanges() []string {
return bsvcPortInfo.loadBalancerSourceRanges
}
// HealthCheckNodePort is part of ServicePort interface.
func (info *BaseServiceInfo) HealthCheckNodePort() int {
return info.healthCheckNodePort
func (bsvcPortInfo *BaseServicePortInfo) HealthCheckNodePort() int {
return bsvcPortInfo.healthCheckNodePort
}
// NodePort is part of the ServicePort interface.
func (info *BaseServiceInfo) NodePort() int {
return info.nodePort
func (bsvcPortInfo *BaseServicePortInfo) NodePort() int {
return bsvcPortInfo.nodePort
}
// ExternalIPStrings is part of ServicePort interface.
func (info *BaseServiceInfo) ExternalIPStrings() []string {
return info.externalIPs
func (bsvcPortInfo *BaseServicePortInfo) ExternalIPStrings() []string {
return bsvcPortInfo.externalIPs
}
// LoadBalancerIPStrings is part of ServicePort interface.
func (info *BaseServiceInfo) LoadBalancerIPStrings() []string {
func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerIPStrings() []string {
var ips []string
for _, ing := range info.loadBalancerStatus.Ingress {
for _, ing := range bsvcPortInfo.loadBalancerStatus.Ingress {
ips = append(ips, ing.IP)
}
return ips
}
// ExternalPolicyLocal is part of ServicePort interface.
func (info *BaseServiceInfo) ExternalPolicyLocal() bool {
return info.externalPolicyLocal
func (bsvcPortInfo *BaseServicePortInfo) ExternalPolicyLocal() bool {
return bsvcPortInfo.externalPolicyLocal
}
// InternalPolicyLocal is part of ServicePort interface
func (info *BaseServiceInfo) InternalPolicyLocal() bool {
return info.internalPolicyLocal
func (bsvcPortInfo *BaseServicePortInfo) InternalPolicyLocal() bool {
return bsvcPortInfo.internalPolicyLocal
}
// InternalTrafficPolicy is part of ServicePort interface
func (info *BaseServiceInfo) InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType {
return info.internalTrafficPolicy
func (bsvcPortInfo *BaseServicePortInfo) InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType {
return bsvcPortInfo.internalTrafficPolicy
}
// HintsAnnotation is part of ServicePort interface.
func (info *BaseServiceInfo) HintsAnnotation() string {
return info.hintsAnnotation
func (bsvcPortInfo *BaseServicePortInfo) HintsAnnotation() string {
return bsvcPortInfo.hintsAnnotation
}
// ExternallyAccessible is part of ServicePort interface.
func (info *BaseServiceInfo) ExternallyAccessible() bool {
return info.nodePort != 0 || len(info.loadBalancerStatus.Ingress) != 0 || len(info.externalIPs) != 0
func (bsvcPortInfo *BaseServicePortInfo) ExternallyAccessible() bool {
return bsvcPortInfo.nodePort != 0 || len(bsvcPortInfo.loadBalancerStatus.Ingress) != 0 || len(bsvcPortInfo.externalIPs) != 0
}
// UsesClusterEndpoints is part of ServicePort interface.
func (info *BaseServiceInfo) UsesClusterEndpoints() bool {
func (bsvcPortInfo *BaseServicePortInfo) UsesClusterEndpoints() bool {
// The service port uses Cluster endpoints if the internal traffic policy is "Cluster",
// or if it accepts external traffic at all. (Even if the external traffic policy is
// "Local", we need Cluster endpoints to implement short circuiting.)
return !info.internalPolicyLocal || info.ExternallyAccessible()
return !bsvcPortInfo.internalPolicyLocal || bsvcPortInfo.ExternallyAccessible()
}
// UsesLocalEndpoints is part of ServicePort interface.
func (info *BaseServiceInfo) UsesLocalEndpoints() bool {
return info.internalPolicyLocal || (info.externalPolicyLocal && info.ExternallyAccessible())
func (bsvcPortInfo *BaseServicePortInfo) UsesLocalEndpoints() bool {
return bsvcPortInfo.internalPolicyLocal || (bsvcPortInfo.externalPolicyLocal && bsvcPortInfo.ExternallyAccessible())
}
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServicePortInfo {
externalPolicyLocal := false
if apiservice.ExternalPolicyLocal(service) {
externalPolicyLocal = true
@ -173,7 +173,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
}
clusterIP := utilproxy.GetClusterIPByFamily(sct.ipFamily, service)
info := &BaseServiceInfo{
info := &BaseServicePortInfo{
clusterIP: netutils.ParseIPSloppy(clusterIP),
port: int(port.Port),
protocol: port.Protocol,
@ -245,18 +245,18 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
return info
}
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServiceInfo) ServicePort
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort
// This handler is invoked by the apply function on every change. This function should not modify the
// ServiceMap's but just use the changes for any Proxier specific cleanup.
type processServiceMapChangeFunc func(previous, current ServiceMap)
// ServicePortMap's but just use the changes for any Proxier specific cleanup.
type processServiceMapChangeFunc func(previous, current ServicePortMap)
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
previous ServiceMap
current ServiceMap
previous ServicePortMap
current ServicePortMap
}
// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
@ -352,13 +352,13 @@ type UpdateServiceMapResult struct {
UDPStaleClusterIP sets.String
}
// Update updates ServiceMap base on the given changes.
func (sm ServiceMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
// Update updates ServicePortMap base on the given changes.
func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
result.UDPStaleClusterIP = sets.NewString()
sm.apply(changes, result.UDPStaleClusterIP)
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
// computing this incrementally similarly to svcPortMap.
result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
for svcPortName, info := range sm {
if info.HealthCheckNodePort() != 0 {
@ -369,13 +369,13 @@ func (sm ServiceMap) Update(changes *ServiceChangeTracker) (result UpdateService
return result
}
// ServiceMap maps a service to its ServicePort.
type ServiceMap map[ServicePortName]ServicePort
// ServicePortMap maps a service to its ServicePort.
type ServicePortMap map[ServicePortName]ServicePort
// serviceToServiceMap translates a single Service object to a ServiceMap.
// serviceToServiceMap translates a single Service object to a ServicePortMap.
//
// NOTE: service object should NOT be modified.
func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServiceMap {
func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap {
if service == nil {
return nil
}
@ -389,25 +389,25 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
return nil
}
serviceMap := make(ServiceMap)
svcPortMap := make(ServicePortMap)
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
baseSvcInfo := sct.newBaseServiceInfo(servicePort, service)
if sct.makeServiceInfo != nil {
serviceMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo)
svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo)
} else {
serviceMap[svcPortName] = baseSvcInfo
svcPortMap[svcPortName] = baseSvcInfo
}
}
return serviceMap
return svcPortMap
}
// apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the
// udp protocol service cluster ip when service is deleted from the ServiceMap.
// apply the changes to ServicePortMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the
// udp protocol service cluster ip when service is deleted from the ServicePortMap.
// apply triggers processServiceMapChange on every change.
func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
@ -420,20 +420,20 @@ func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP set
change.previous.filter(change.current)
sm.unmerge(change.previous, UDPStaleClusterIP)
}
// clear changes after applying them to ServiceMap.
// clear changes after applying them to ServicePortMap.
changes.items = make(map[types.NamespacedName]*serviceChange)
metrics.ServiceChangesPending.Set(0)
}
// merge adds other ServiceMap's elements to current ServiceMap.
// merge adds other ServicePortMap's elements to current ServicePortMap.
// If collision, other ALWAYS win. Otherwise add the other to current.
// In other words, if some elements in current collisions with other, update the current by other.
// It returns a string type set which stores all the newly merged services' identifier, ServicePortName.String(), to help users
// tell if a service is deleted or updated.
// The returned value is one of the arguments of ServiceMap.unmerge().
// ServiceMap A Merge ServiceMap B will do following 2 things:
// - update ServiceMap A.
// - produce a string set which stores all other ServiceMap's ServicePortName.String().
// The returned value is one of the arguments of ServicePortMap.unmerge().
// ServicePortMap A Merge ServicePortMap B will do following 2 things:
// - update ServicePortMap A.
// - produce a string set which stores all other ServicePortMap's ServicePortName.String().
//
// For example,
// - A{}
@ -444,8 +444,8 @@ func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP set
// - B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// - A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// - produce string set {"ns/cluster-ip:http"}
func (sm *ServiceMap) merge(other ServiceMap) sets.String {
// existingPorts is going to store all identifiers of all services in `other` ServiceMap.
func (sm *ServicePortMap) merge(other ServicePortMap) sets.String {
// existingPorts is going to store all identifiers of all services in `other` ServicePortMap.
existingPorts := sets.NewString()
for svcPortName, info := range other {
// Take ServicePortName.String() as the newly merged service's identifier and put it into existingPorts.
@ -461,8 +461,8 @@ func (sm *ServiceMap) merge(other ServiceMap) sets.String {
return existingPorts
}
// filter filters out elements from ServiceMap base on given ports string sets.
func (sm *ServiceMap) filter(other ServiceMap) {
// filter filters out elements from ServicePortMap base on given ports string sets.
func (sm *ServicePortMap) filter(other ServicePortMap) {
for svcPortName := range *sm {
// skip the delete for Update event.
if _, ok := other[svcPortName]; ok {
@ -471,9 +471,9 @@ func (sm *ServiceMap) filter(other ServiceMap) {
}
}
// unmerge deletes all other ServiceMap's elements from current ServiceMap. We pass in the UDPStaleClusterIP strings sets
// unmerge deletes all other ServicePortMap's elements from current ServicePortMap. We pass in the UDPStaleClusterIP strings sets
// for storing the stale udp service cluster IPs. We will clear stale udp connection base on UDPStaleClusterIP later
func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
func (sm *ServicePortMap) unmerge(other ServicePortMap, UDPStaleClusterIP sets.String) {
for svcPortName := range other {
info, exists := (*sm)[svcPortName]
if exists {

View File

@ -33,19 +33,19 @@ import (
const testHostname = "test-hostname"
func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServiceInfo)) *BaseServiceInfo {
info := &BaseServiceInfo{
func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServicePortInfo)) *BaseServicePortInfo {
bsvcPortInfo := &BaseServicePortInfo{
clusterIP: netutils.ParseIPSloppy(clusterIP),
port: port,
protocol: v1.Protocol(protocol),
}
if healthcheckNodePort != 0 {
info.healthCheckNodePort = healthcheckNodePort
bsvcPortInfo.healthCheckNodePort = healthcheckNodePort
}
for _, svcInfoFunc := range svcInfoFuncs {
svcInfoFunc(info)
svcInfoFunc(bsvcPortInfo)
}
return info
return bsvcPortInfo
}
func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
@ -96,7 +96,7 @@ func TestServiceToServiceMap(t *testing.T) {
testCases := []struct {
desc string
service *v1.Service
expected map[ServicePortName]*BaseServiceInfo
expected map[ServicePortName]*BaseServicePortInfo
ipFamily v1.IPFamily
}{
{
@ -104,7 +104,7 @@ func TestServiceToServiceMap(t *testing.T) {
ipFamily: v1.IPv4Protocol,
service: nil,
expected: map[ServicePortName]*BaseServiceInfo{},
expected: map[ServicePortName]*BaseServicePortInfo{},
},
{
desc: "headless service",
@ -115,7 +115,7 @@ func TestServiceToServiceMap(t *testing.T) {
svc.Spec.ClusterIP = v1.ClusterIPNone
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
}),
expected: map[ServicePortName]*BaseServiceInfo{},
expected: map[ServicePortName]*BaseServicePortInfo{},
},
{
desc: "headless sctp service",
@ -126,7 +126,7 @@ func TestServiceToServiceMap(t *testing.T) {
svc.Spec.ClusterIP = v1.ClusterIPNone
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 7777, 0, 0)
}),
expected: map[ServicePortName]*BaseServiceInfo{},
expected: map[ServicePortName]*BaseServicePortInfo{},
},
{
desc: "headless service without port",
@ -136,7 +136,7 @@ func TestServiceToServiceMap(t *testing.T) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.ClusterIP = v1.ClusterIPNone
}),
expected: map[ServicePortName]*BaseServiceInfo{},
expected: map[ServicePortName]*BaseServicePortInfo{},
},
{
desc: "cluster ip service",
@ -148,7 +148,7 @@ func TestServiceToServiceMap(t *testing.T) {
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "UDP", 1235, 5321, 0)
}),
expected: map[ServicePortName]*BaseServiceInfo{
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("ns2", "cluster-ip", "p1", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.4", 1234, "UDP", 0),
makeServicePortName("ns2", "cluster-ip", "p2", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.4", 1235, "UDP", 0),
},
@ -163,7 +163,7 @@ func TestServiceToServiceMap(t *testing.T) {
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "UDP", 345, 678, 0)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 344, 677, 0)
}),
expected: map[ServicePortName]*BaseServiceInfo{
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("ns2", "node-port", "port1", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.10", 345, "UDP", 0),
makeServicePortName("ns2", "node-port", "port2", v1.ProtocolTCP): makeTestServiceInfo("172.16.55.10", 344, "TCP", 0),
},
@ -180,12 +180,12 @@ func TestServiceToServiceMap(t *testing.T) {
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port4", "UDP", 8676, 30062, 7001)
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}}
}),
expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(info *BaseServiceInfo) {
info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}}
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}}
}),
makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(info *BaseServiceInfo) {
info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}}
makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}}
}),
},
},
@ -203,12 +203,12 @@ func TestServiceToServiceMap(t *testing.T) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
svc.Spec.HealthCheckNodePort = 345
}),
expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(info *BaseServiceInfo) {
info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}}
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}}
}),
makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(info *BaseServiceInfo) {
info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}}
makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}}
}),
},
},
@ -222,7 +222,7 @@ func TestServiceToServiceMap(t *testing.T) {
svc.Spec.ExternalName = "foo2.bar.com"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "portz", "UDP", 1235, 5321, 0)
}),
expected: map[ServicePortName]*BaseServiceInfo{},
expected: map[ServicePortName]*BaseServicePortInfo{},
},
{
desc: "service with ipv6 clusterIP under ipv4 mode, service should be filtered",
@ -312,11 +312,11 @@ func TestServiceToServiceMap(t *testing.T) {
},
},
},
expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.externalIPs = []string{testExternalIPv4}
info.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}}
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv4}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}}
}),
},
},
@ -350,11 +350,11 @@ func TestServiceToServiceMap(t *testing.T) {
},
},
},
expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.externalIPs = []string{testExternalIPv6}
info.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}}
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv6}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}}
}),
},
},
@ -388,11 +388,11 @@ func TestServiceToServiceMap(t *testing.T) {
},
},
},
expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.externalIPs = []string{testExternalIPv4}
info.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}}
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv4}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}}
}),
},
},
@ -426,11 +426,11 @@ func TestServiceToServiceMap(t *testing.T) {
},
},
},
expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.externalIPs = []string{testExternalIPv6}
info.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
info.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}}
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv6}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}}
}),
},
},
@ -455,9 +455,9 @@ func TestServiceToServiceMap(t *testing.T) {
},
},
},
expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "extra-space", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.loadBalancerSourceRanges = []string{"10.1.2.0/28"}
expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("test", "extra-space", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerSourceRanges = []string{"10.1.2.0/28"}
}),
},
},
@ -473,7 +473,7 @@ func TestServiceToServiceMap(t *testing.T) {
t.Fatalf("expected %d new, got %d: %v", len(tc.expected), len(newServices), spew.Sdump(newServices))
}
for svcKey, expectedInfo := range tc.expected {
svcInfo, exists := newServices[svcKey].(*BaseServiceInfo)
svcInfo, exists := newServices[svcKey].(*BaseServicePortInfo)
if !exists {
t.Fatalf("[%s] expected to find key %s", tc.desc, svcKey)
}
@ -488,7 +488,7 @@ func TestServiceToServiceMap(t *testing.T) {
t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo)
}
for svcKey, expectedInfo := range tc.expected {
svcInfo, _ := newServices[svcKey].(*BaseServiceInfo)
svcInfo, _ := newServices[svcKey].(*BaseServicePortInfo)
if !svcInfo.clusterIP.Equal(expectedInfo.clusterIP) ||
svcInfo.port != expectedInfo.port ||
svcInfo.protocol != expectedInfo.protocol ||
@ -507,14 +507,14 @@ func TestServiceToServiceMap(t *testing.T) {
type FakeProxier struct {
endpointsChanges *EndpointChangeTracker
serviceChanges *ServiceChangeTracker
serviceMap ServiceMap
svcPortMap ServicePortMap
endpointsMap EndpointsMap
hostname string
}
func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
return &FakeProxier{
serviceMap: make(ServiceMap),
svcPortMap: make(ServicePortMap),
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: &EndpointChangeTracker{
@ -568,9 +568,9 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
}
// No proxied services, so no healthchecks
@ -599,9 +599,9 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
}
// No proxied services, so no healthchecks
if len(result.HCServiceNodePorts) != 0 {
@ -671,9 +671,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
t.Errorf("expected %d pending service changes, got %d", len(services), pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 8 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 8 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
// The only-local-loadbalancer ones get added
@ -708,9 +708,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
if pending.Len() != 4 {
t.Errorf("expected 4 pending service changes, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
@ -761,9 +761,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result := fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
@ -779,9 +779,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
@ -797,9 +797,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
@ -814,9 +814,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result = fp.serviceMap.Update(fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)

View File

@ -68,7 +68,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "hints enabled, hints annotation == auto",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -81,7 +81,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "hints, hints annotation == disabled, hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "disabled"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "disabled"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -94,7 +94,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "hints disabled, hints annotation == auto",
hintsEnabled: false,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -108,7 +108,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "hints, hints annotation == aUto (wrong capitalization), hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "aUto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "aUto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -121,7 +121,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "hints, hints annotation empty, hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -134,7 +134,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "externalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
@ -148,7 +148,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "internalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, hintsAnnotation: "auto", externalPolicyLocal: false, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, hintsAnnotation: "auto", externalPolicyLocal: false, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
@ -162,7 +162,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "empty node labels",
hintsEnabled: true,
nodeLabels: map[string]string{},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
@ -172,7 +172,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "empty zone label",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: ""},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
@ -182,7 +182,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "node in different zone, no endpoint filtering",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-b"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
@ -192,7 +192,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "normal endpoint filtering, auto annotation",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -205,7 +205,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "unready endpoint",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -218,7 +218,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "only unready endpoints in same zone (should not filter)",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: false},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -231,7 +231,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "normal endpoint filtering, Auto annotation",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "Auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "Auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -244,7 +244,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "hintsAnnotation empty, no filtering applied",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: ""},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: ""},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -257,7 +257,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "hintsAnnotation disabled, no filtering applied",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "disabled"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "disabled"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -270,7 +270,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "missing hints, no filtering applied",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
@ -283,7 +283,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "multiple hints per endpoint, filtering includes any endpoint with zone included",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-c"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a", "zone-b", "zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b", "zone-c"), Ready: true},
@ -296,7 +296,7 @@ func TestCategorizeEndpoints(t *testing.T) {
name: "conflicting topology and localness require merging allEndpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
@ -308,13 +308,13 @@ func TestCategorizeEndpoints(t *testing.T) {
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80", "10.0.0.2:80"),
}, {
name: "internalTrafficPolicy: Local, with empty endpoints",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true},
endpoints: []Endpoint{},
clusterEndpoints: nil,
localEndpoints: sets.NewString(),
}, {
name: "internalTrafficPolicy: Local, but all endpoints are remote",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
@ -324,7 +324,7 @@ func TestCategorizeEndpoints(t *testing.T) {
onlyRemoteEndpoints: true,
}, {
name: "internalTrafficPolicy: Local, all endpoints are local",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},
@ -333,7 +333,7 @@ func TestCategorizeEndpoints(t *testing.T) {
localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "internalTrafficPolicy: Local, some endpoints are local",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
@ -342,7 +342,7 @@ func TestCategorizeEndpoints(t *testing.T) {
localEndpoints: sets.NewString("10.0.0.0:80"),
}, {
name: "Cluster traffic policy, endpoints not Ready",
serviceInfo: &BaseServiceInfo{},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false},
@ -351,7 +351,7 @@ func TestCategorizeEndpoints(t *testing.T) {
localEndpoints: nil,
}, {
name: "Cluster traffic policy, some endpoints are Ready",
serviceInfo: &BaseServiceInfo{},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true},
@ -361,7 +361,7 @@ func TestCategorizeEndpoints(t *testing.T) {
}, {
name: "Cluster traffic policy, PTE enabled, all endpoints are terminating",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
@ -371,7 +371,7 @@ func TestCategorizeEndpoints(t *testing.T) {
}, {
name: "Cluster traffic policy, PTE disabled, all endpoints are terminating",
pteEnabled: false,
serviceInfo: &BaseServiceInfo{},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
@ -380,7 +380,7 @@ func TestCategorizeEndpoints(t *testing.T) {
localEndpoints: nil,
}, {
name: "iTP: Local, eTP: Cluster, some endpoints local",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: false, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: false, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
@ -390,7 +390,7 @@ func TestCategorizeEndpoints(t *testing.T) {
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Cluster, eTP: Local, some endpoints local",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
@ -400,7 +400,7 @@ func TestCategorizeEndpoints(t *testing.T) {
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, some endpoints local",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
@ -410,7 +410,7 @@ func TestCategorizeEndpoints(t *testing.T) {
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, all endpoints remote",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
@ -420,7 +420,7 @@ func TestCategorizeEndpoints(t *testing.T) {
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, PTE disabled, all endpoints remote and terminating",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
@ -431,7 +431,7 @@ func TestCategorizeEndpoints(t *testing.T) {
}, {
name: "iTP: Local, eTP: Local, PTE enabled, all endpoints remote and terminating",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, externalPolicyLocal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
@ -442,7 +442,7 @@ func TestCategorizeEndpoints(t *testing.T) {
onlyRemoteEndpoints: true,
}, {
name: "iTP: Cluster, eTP: Local, PTE disabled, with terminating endpoints",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true},
@ -455,7 +455,7 @@ func TestCategorizeEndpoints(t *testing.T) {
}, {
name: "iTP: Cluster, eTP: Local, PTE enabled, with terminating endpoints",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: false, externalPolicyLocal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true},
@ -467,7 +467,7 @@ func TestCategorizeEndpoints(t *testing.T) {
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"),
}, {
name: "externalTrafficPolicy ignored if not externally accessible",
serviceInfo: &BaseServiceInfo{externalPolicyLocal: true},
serviceInfo: &BaseServicePortInfo{externalPolicyLocal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},
@ -477,7 +477,7 @@ func TestCategorizeEndpoints(t *testing.T) {
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "no cluster endpoints for iTP:Local internal-only service",
serviceInfo: &BaseServiceInfo{internalPolicyLocal: true},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},

View File

@ -116,7 +116,7 @@ type loadBalancerFlags struct {
// internal struct for string service information
type serviceInfo struct {
*proxy.BaseServiceInfo
*proxy.BaseServicePortInfo
targetPort int
externalIPs []*externalIPInfo
loadBalancerIngressIPs []*loadBalancerIngressInfo
@ -323,7 +323,7 @@ func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap prox
func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) {
svc, exists := proxier.serviceMap[*svcPortName]
svc, exists := proxier.svcPortMap[*svcPortName]
if exists {
svcInfo, ok := svc.(*serviceInfo)
@ -355,7 +355,7 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName)
}
}
func (proxier *Proxier) serviceMapChange(previous, current proxy.ServiceMap) {
func (proxier *Proxier) serviceMapChange(previous, current proxy.ServicePortMap) {
for svcPortName := range current {
proxier.onServiceMapChange(&svcPortName)
}
@ -370,7 +370,7 @@ func (proxier *Proxier) serviceMapChange(previous, current proxy.ServiceMap) {
func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
svc, exists := proxier.serviceMap[*svcPortName]
svc, exists := proxier.svcPortMap[*svcPortName]
if exists {
svcInfo, ok := svc.(*serviceInfo)
@ -458,8 +458,8 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16
}
// returns a new proxy.ServicePort which abstracts a serviceInfo
func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
info := &serviceInfo{BaseServiceInfo: baseInfo}
func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
info := &serviceInfo{BaseServicePortInfo: bsvcPortInfo}
preserveDIP := service.Annotations["preserve-destination"] == "true"
localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal
err := hcn.DSRSupported()
@ -525,7 +525,7 @@ type Proxier struct {
serviceChanges *proxy.ServiceChangeTracker
endPointsRefCount endPointsReferenceCountMap
mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
// endpointSlicesSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating hns policies
@ -699,7 +699,7 @@ func NewProxier(
isIPv6 := netutils.IsIPv6(nodeIP)
proxier := &Proxier{
endPointsRefCount: make(endPointsReferenceCountMap),
serviceMap: make(proxy.ServiceMap),
svcPortMap: make(proxy.ServicePortMap),
endpointsMap: make(proxy.EndpointsMap),
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
@ -986,7 +986,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
}
func (proxier *Proxier) cleanupAllPolicies() {
for svcName, svc := range proxier.serviceMap {
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
@ -1050,13 +1050,13 @@ func (proxier *Proxier) syncProxyRules() {
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).InfoS("Stale udp service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
staleServices.Insert(svcInfo.ClusterIP().String())
}
@ -1093,7 +1093,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(3).InfoS("Syncing Policies")
// Program HNS by adding corresponding policies for each service.
for svcName, svc := range proxier.serviceMap {
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)

View File

@ -138,7 +138,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
networkType: networkType,
}
proxier := &Proxier{
serviceMap: make(proxy.ServiceMap),
svcPortMap: make(proxy.ServicePortMap),
endpointsMap: make(proxy.EndpointsMap),
clusterCIDR: clusterCIDR,
hostname: testHostName,
@ -201,7 +201,7 @@ func TestCreateServiceVip(t *testing.T) {
proxier.setInitialized(true)
proxier.syncProxyRules()
svc := proxier.serviceMap[svcPortName]
svc := proxier.svcPortMap[svcPortName]
svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
@ -707,7 +707,7 @@ func TestCreateLoadBalancer(t *testing.T) {
proxier.setInitialized(true)
proxier.syncProxyRules()
svc := proxier.serviceMap[svcPortName]
svc := proxier.svcPortMap[svcPortName]
svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
@ -770,7 +770,7 @@ func TestCreateDsrLoadBalancer(t *testing.T) {
proxier.setInitialized(true)
proxier.syncProxyRules()
svc := proxier.serviceMap[svcPortName]
svc := proxier.svcPortMap[svcPortName]
svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
@ -840,7 +840,7 @@ func TestEndpointSlice(t *testing.T) {
proxier.setInitialized(true)
proxier.syncProxyRules()
svc := proxier.serviceMap[svcPortName]
svc := proxier.svcPortMap[svcPortName]
svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())