From c0392d72e91d4834f863f6caf3a436d31aada139 Mon Sep 17 00:00:00 2001 From: Jacob Tanenbaum Date: Mon, 8 Apr 2019 14:11:36 -0400 Subject: [PATCH] enforce the interface relationship between ServicePort and BaseServiceInfo Currently the BaseServiceInfo struct implements the ServicePort interface, but only uses that interface sometimes. All the elements of BaseServiceInfo are exported and sometimes the interface is used to access them and othertimes not I extended the ServicePort interface so that all relevent values can be accessed through it and unexported all the elements of BaseServiceInfo --- pkg/proxy/iptables/proxier.go | 84 ++++++++++----------- pkg/proxy/iptables/proxier_test.go | 66 ++++++++--------- pkg/proxy/ipvs/proxier.go | 105 +++++++++++++------------- pkg/proxy/service.go | 115 ++++++++++++++++++----------- pkg/proxy/service_test.go | 36 ++++----- pkg/proxy/types.go | 21 ++++-- 6 files changed, 232 insertions(+), 195 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 967f5f0e7fb..d1f9ee5c7f4 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -156,7 +156,7 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.B // Store the following for performance reasons. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} - protocol := strings.ToLower(string(info.Protocol)) + protocol := strings.ToLower(string(info.Protocol())) info.serviceNameString = svcPortName.String() info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol) info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol) @@ -620,14 +620,14 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo // TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP { + if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - nodePort := svcInfo.GetNodePort() + nodePort := svcInfo.NodePort() var err error if nodePort != 0 { err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) } else { - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) } if err != nil { klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) @@ -689,9 +689,9 @@ func (proxier *Proxier) syncProxyRules() { staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP { - klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString()) - staleServices.Insert(svcInfo.ClusterIPString()) + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { + klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String()) + staleServices.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { staleServices.Insert(extIP) } @@ -820,8 +820,8 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) continue } - isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP) - protocol := strings.ToLower(string(svcInfo.Protocol)) + isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) + protocol := strings.ToLower(string(svcInfo.Protocol())) svcNameString := svcInfo.serviceNameString hasEndpoints := len(proxier.endpointsMap[svcName]) > 0 @@ -837,7 +837,7 @@ func (proxier *Proxier) syncProxyRules() { } svcXlbChain := svcInfo.serviceLBChainName - if svcInfo.OnlyNodeLocalEndpoints { + if svcInfo.OnlyNodeLocalEndpoints() { // Only for services request OnlyLocal traffic // create the per-service LB chain, retaining counters if possible. if lbChain, ok := existingNATChains[svcXlbChain]; ok { @@ -854,8 +854,8 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", utilproxy.ToCIDR(svcInfo.ClusterIP), - "--dport", strconv.Itoa(svcInfo.Port), + "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()), + "--dport", strconv.Itoa(svcInfo.Port()), ) if proxier.masqueradeAll { writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) @@ -874,24 +874,24 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", utilproxy.ToCIDR(svcInfo.ClusterIP), - "--dport", strconv.Itoa(svcInfo.Port), + "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()), + "--dport", strconv.Itoa(svcInfo.Port()), "-j", "REJECT", ) } // Capture externalIPs. - for _, externalIP := range svcInfo.ExternalIPs { + for _, externalIP := range svcInfo.ExternalIPStrings() { // If the "external" IP happens to be an IP that is local to this // machine, hold the local port open so no other process can open it // (because the socket might open but it would never work). if local, err := utilproxy.IsLocalIP(externalIP); err != nil { klog.Errorf("can't determine if IP is local, assuming not: %v", err) - } else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) { + } else if local && (svcInfo.Protocol() != v1.ProtocolSCTP) { lp := utilproxy.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, - Port: svcInfo.Port, + Port: svcInfo.Port(), Protocol: protocol, } if proxier.portsMap[lp] != nil { @@ -922,7 +922,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), "-m", protocol, "-p", protocol, "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), - "--dport", strconv.Itoa(svcInfo.Port), + "--dport", strconv.Itoa(svcInfo.Port()), ) // We have to SNAT packets to external IPs. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) @@ -946,7 +946,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), - "--dport", strconv.Itoa(svcInfo.Port), + "--dport", strconv.Itoa(svcInfo.Port()), "-j", "REJECT", ) } @@ -954,8 +954,8 @@ func (proxier *Proxier) syncProxyRules() { // Capture load-balancer ingress. fwChain := svcInfo.serviceFirewallChainName - for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { - if ingress.IP != "" { + for _, ingress := range svcInfo.LoadBalancerIPStrings() { + if ingress != "" { if hasEndpoints { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { @@ -972,8 +972,8 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), - "--dport", strconv.Itoa(svcInfo.Port), + "-d", utilproxy.ToCIDR(net.ParseIP(ingress)), + "--dport", strconv.Itoa(svcInfo.Port()), ) // jump to service firewall chain writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) @@ -987,18 +987,18 @@ func (proxier *Proxier) syncProxyRules() { chosenChain := svcXlbChain // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. - if !svcInfo.OnlyNodeLocalEndpoints { + if !svcInfo.OnlyNodeLocalEndpoints() { writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) chosenChain = svcChain } - if len(svcInfo.LoadBalancerSourceRanges) == 0 { + if len(svcInfo.LoadBalancerSourceRanges()) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) } else { // firewall filter based on each source range allowFromNode := false - for _, src := range svcInfo.LoadBalancerSourceRanges { + for _, src := range svcInfo.LoadBalancerSourceRanges() { writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) @@ -1010,7 +1010,7 @@ func (proxier *Proxier) syncProxyRules() { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { - writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...) + writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress)), "-j", string(chosenChain))...) } } @@ -1023,8 +1023,8 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), - "--dport", strconv.Itoa(svcInfo.Port), + "-d", utilproxy.ToCIDR(net.ParseIP(ingress)), + "--dport", strconv.Itoa(svcInfo.Port()), "-j", "REJECT", ) } @@ -1034,7 +1034,7 @@ func (proxier *Proxier) syncProxyRules() { // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but // with just 2 rules it ends up being a waste and a cognitive burden. - if svcInfo.NodePort != 0 { + if svcInfo.NodePort() != 0 { // Hold the local port open so no other process can open it // (because the socket might open but it would never work). addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) @@ -1048,7 +1048,7 @@ func (proxier *Proxier) syncProxyRules() { lp := utilproxy.LocalPort{ Description: "nodePort for " + svcNameString, IP: address, - Port: svcInfo.NodePort, + Port: svcInfo.NodePort(), Protocol: protocol, } if utilproxy.IsZeroCIDR(address) { @@ -1066,7 +1066,7 @@ func (proxier *Proxier) syncProxyRules() { if proxier.portsMap[lp] != nil { klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] - } else if svcInfo.GetProtocol() != v1.ProtocolSCTP { + } else if svcInfo.Protocol() != v1.ProtocolSCTP { socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) @@ -1091,9 +1091,9 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeNodePortsChain), "-m", "comment", "--comment", svcNameString, "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.NodePort), + "--dport", strconv.Itoa(svcInfo.NodePort()), ) - if !svcInfo.OnlyNodeLocalEndpoints { + if !svcInfo.OnlyNodeLocalEndpoints() { // Nodeports need SNAT, unless they're local. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. @@ -1117,7 +1117,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "addrtype", "--dst-type", "LOCAL", "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.NodePort), + "--dport", strconv.Itoa(svcInfo.NodePort()), "-j", "REJECT", ) } @@ -1153,7 +1153,7 @@ func (proxier *Proxier) syncProxyRules() { } // First write session affinity rules, if applicable. - if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { for _, endpointChain := range endpointChains { args = append(args[:0], "-A", string(svcChain), @@ -1161,7 +1161,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.appendServiceCommentLocked(args, svcNameString) args = append(args, "-m", "recent", "--name", string(endpointChain), - "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap", + "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", "-j", string(endpointChain), ) writeLine(proxier.natRules, args...) @@ -1174,7 +1174,7 @@ func (proxier *Proxier) syncProxyRules() { localEndpointChains := make([]utiliptables.Chain, 0) for i, endpointChain := range endpointChains { // Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. - if svcInfo.OnlyNodeLocalEndpoints && endpoints[i].IsLocal { + if svcInfo.OnlyNodeLocalEndpoints() && endpoints[i].IsLocal { // These slices parallel each other; must be kept in sync localEndpoints = append(localEndpoints, endpoints[i]) localEndpointChains = append(localEndpointChains, endpointChains[i]) @@ -1207,7 +1207,7 @@ func (proxier *Proxier) syncProxyRules() { "-s", utilproxy.ToCIDR(net.ParseIP(epIP)), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. - if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. @@ -1216,7 +1216,7 @@ func (proxier *Proxier) syncProxyRules() { } // The logic below this applies only if this service is marked as OnlyLocal - if !svcInfo.OnlyNodeLocalEndpoints { + if !svcInfo.OnlyNodeLocalEndpoints() { continue } @@ -1247,13 +1247,13 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.natRules, args...) } else { // First write session affinity rules only over local endpoints, if applicable. - if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { for _, endpointChain := range localEndpointChains { writeLine(proxier.natRules, "-A", string(svcXlbChain), "-m", "comment", "--comment", svcNameString, "-m", "recent", "--name", string(endpointChain), - "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap", + "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", "-j", string(endpointChain)) } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index a23cc247f20..45153eec06f 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -147,30 +147,18 @@ func TestGetChainLinesMultipleTables(t *testing.T) { checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected) } -func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol v1.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo { - return &serviceInfo{ - BaseServiceInfo: &proxy.BaseServiceInfo{ - SessionAffinityType: v1.ServiceAffinityNone, // default - StickyMaxAgeSeconds: int(v1.DefaultClientIPServiceAffinitySeconds), // default - ClusterIP: ip, - Port: port, - Protocol: protocol, - OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints, - }, - } -} - func TestDeleteEndpointConnections(t *testing.T) { const ( UDP = v1.ProtocolUDP TCP = v1.ProtocolTCP SCTP = v1.ProtocolSCTP ) + testCases := []struct { description string svcName string svcIP string - svcPort int + svcPort int32 protocol v1.Protocol endpoint string // IP:port endpoint epSvcPair proxy.ServiceEndpoint // Will be generated by test @@ -237,21 +225,6 @@ func TestDeleteEndpointConnections(t *testing.T) { }, } - // Create a service map that has service info entries for all test cases - // and generate an endpoint service pair for each test case - serviceMap := make(map[proxy.ServicePortName]proxy.ServicePort) - for i, tc := range testCases { - svc := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName}, - Port: "p80", - } - serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false) - testCases[i].epSvcPair = proxy.ServiceEndpoint{ - Endpoint: tc.endpoint, - ServicePortName: svc, - } - } - // Create a fake executor for the conntrack utility. This should only be // invoked for UDP connections, since no conntrack cleanup is needed for TCP fcmd := fakeexec.FakeCmd{} @@ -276,16 +249,43 @@ func TestDeleteEndpointConnections(t *testing.T) { } } - // Create a proxier using the fake conntrack executor and service map - fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap} + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt) + fp.exec = &fexec + + for _, tc := range testCases { + makeServiceMap(fp, + makeTestService("ns1", tc.svcName, func(svc *v1.Service) { + svc.Spec.ClusterIP = tc.svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p80", + Port: tc.svcPort, + Protocol: tc.protocol, + }} + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }), + ) + + proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) + } // Run the test cases for _, tc := range testCases { priorExecs := fexec.CommandCalls priorGlogErrs := klog.Stats.Error.Lines() - input := []proxy.ServiceEndpoint{tc.epSvcPair} - fakeProxier.deleteEndpointConnections(input) + svc := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName}, + Port: "p80", + } + input := []proxy.ServiceEndpoint{ + { + Endpoint: tc.endpoint, + ServicePortName: svc, + }, + } + + fp.deleteEndpointConnections(input) // For UDP connections, check the executed conntrack command var expExecs int diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 48c2d499a3e..64aa1ddee84 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -758,9 +758,9 @@ func (proxier *Proxier) syncProxyRules() { staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP { - klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString()) - staleServices.Insert(svcInfo.ClusterIPString()) + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { + klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String()) + staleServices.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { staleServices.Insert(extIP) } @@ -815,7 +815,7 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) continue } - protocol := strings.ToLower(string(svcInfo.Protocol)) + protocol := strings.ToLower(string(svcInfo.Protocol())) // Precompute svcNameString; with many services the many calls // to ServicePortName.String() show up in CPU profiles. svcNameString := svcName.String() @@ -853,8 +853,8 @@ func (proxier *Proxier) syncProxyRules() { // Capture the clusterIP. // ipset call entry := &utilipset.Entry{ - IP: svcInfo.ClusterIP.String(), - Port: svcInfo.Port, + IP: svcInfo.ClusterIP().String(), + Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } @@ -867,15 +867,15 @@ func (proxier *Proxier) syncProxyRules() { proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) // ipvs call serv := &utilipvs.VirtualServer{ - Address: svcInfo.ClusterIP, - Port: uint16(svcInfo.Port), - Protocol: string(svcInfo.Protocol), + Address: svcInfo.ClusterIP(), + Port: uint16(svcInfo.Port()), + Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } // Set session affinity flag and timeout for IPVS service - if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) + serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcNameString, serv, true); err == nil { @@ -891,16 +891,16 @@ func (proxier *Proxier) syncProxyRules() { } // Capture externalIPs. - for _, externalIP := range svcInfo.ExternalIPs { + for _, externalIP := range svcInfo.ExternalIPStrings() { if local, err := utilproxy.IsLocalIP(externalIP); err != nil { klog.Errorf("can't determine if IP is local, assuming not: %v", err) // We do not start listening on SCTP ports, according to our agreement in the // SCTP support KEP - } else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) { + } else if local && (svcInfo.Protocol() != v1.ProtocolSCTP) { lp := utilproxy.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, - Port: svcInfo.Port, + Port: svcInfo.Port(), Protocol: protocol, } if proxier.portsMap[lp] != nil { @@ -928,7 +928,7 @@ func (proxier *Proxier) syncProxyRules() { // ipset call entry := &utilipset.Entry{ IP: externalIP, - Port: svcInfo.Port, + Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } @@ -942,13 +942,13 @@ func (proxier *Proxier) syncProxyRules() { // ipvs call serv := &utilipvs.VirtualServer{ Address: net.ParseIP(externalIP), - Port: uint16(svcInfo.Port), - Protocol: string(svcInfo.Protocol), + Port: uint16(svcInfo.Port()), + Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } - if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) + serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } if err := proxier.syncService(svcNameString, serv, true); err == nil { activeIPVSServices[serv.String()] = true @@ -962,12 +962,12 @@ func (proxier *Proxier) syncProxyRules() { } // Capture load-balancer ingress. - for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { - if ingress.IP != "" { + for _, ingress := range svcInfo.LoadBalancerIPStrings() { + if ingress != "" { // ipset call entry = &utilipset.Entry{ - IP: ingress.IP, - Port: svcInfo.Port, + IP: ingress, + Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } @@ -981,14 +981,14 @@ func (proxier *Proxier) syncProxyRules() { } proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String()) // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local - if svcInfo.OnlyNodeLocalEndpoints { + if svcInfo.OnlyNodeLocalEndpoints() { if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name)) continue } proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String()) } - if len(svcInfo.LoadBalancerSourceRanges) != 0 { + if len(svcInfo.LoadBalancerSourceRanges()) != 0 { // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. @@ -998,11 +998,11 @@ func (proxier *Proxier) syncProxyRules() { } proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String()) allowFromNode := false - for _, src := range svcInfo.LoadBalancerSourceRanges { + for _, src := range svcInfo.LoadBalancerSourceRanges() { // ipset call entry = &utilipset.Entry{ - IP: ingress.IP, - Port: svcInfo.Port, + IP: ingress, + Port: svcInfo.Port(), Protocol: protocol, Net: src, SetType: utilipset.HashIPPortNet, @@ -1025,10 +1025,10 @@ func (proxier *Proxier) syncProxyRules() { // Need to add the following rule to allow request on host. if allowFromNode { entry = &utilipset.Entry{ - IP: ingress.IP, - Port: svcInfo.Port, + IP: ingress, + Port: svcInfo.Port(), Protocol: protocol, - IP2: ingress.IP, + IP2: ingress, SetType: utilipset.HashIPPortIP, } // enumerate all white list source ip @@ -1042,19 +1042,19 @@ func (proxier *Proxier) syncProxyRules() { // ipvs call serv := &utilipvs.VirtualServer{ - Address: net.ParseIP(ingress.IP), - Port: uint16(svcInfo.Port), - Protocol: string(svcInfo.Protocol), + Address: net.ParseIP(ingress), + Port: uint16(svcInfo.Port()), + Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } - if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) + serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } if err := proxier.syncService(svcNameString, serv, true); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { @@ -1063,7 +1063,7 @@ func (proxier *Proxier) syncProxyRules() { } } - if svcInfo.NodePort != 0 { + if svcInfo.NodePort() != 0 { addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) if err != nil { klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err) @@ -1075,7 +1075,7 @@ func (proxier *Proxier) syncProxyRules() { lp := utilproxy.LocalPort{ Description: "nodePort for " + svcNameString, IP: address, - Port: svcInfo.NodePort, + Port: svcInfo.NodePort(), Protocol: protocol, } if utilproxy.IsZeroCIDR(address) { @@ -1095,14 +1095,14 @@ func (proxier *Proxier) syncProxyRules() { replacementPortsMap[lp] = proxier.portsMap[lp] // We do not start listening on SCTP ports, according to our agreement in the // SCTP support KEP - } else if svcInfo.GetProtocol() != v1.ProtocolSCTP { + } else if svcInfo.Protocol() != v1.ProtocolSCTP { socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } if lp.Protocol == "udp" { - isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP) + isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) } replacementPortsMap[lp] = socket @@ -1111,13 +1111,14 @@ func (proxier *Proxier) syncProxyRules() { // Nodeports need SNAT, unless they're local. // ipset call + var nodePortSet *IPSet switch protocol { case "tcp": nodePortSet = proxier.ipsetList[kubeNodePortSetTCP] entry = &utilipset.Entry{ // No need to provide ip info - Port: svcInfo.NodePort, + Port: svcInfo.NodePort(), Protocol: protocol, SetType: utilipset.BitmapPort, } @@ -1125,7 +1126,7 @@ func (proxier *Proxier) syncProxyRules() { nodePortSet = proxier.ipsetList[kubeNodePortSetUDP] entry = &utilipset.Entry{ // No need to provide ip info - Port: svcInfo.NodePort, + Port: svcInfo.NodePort(), Protocol: protocol, SetType: utilipset.BitmapPort, } @@ -1133,7 +1134,7 @@ func (proxier *Proxier) syncProxyRules() { nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP] entry = &utilipset.Entry{ IP: proxier.nodeIP.String(), - Port: svcInfo.NodePort, + Port: svcInfo.NodePort(), Protocol: protocol, SetType: utilipset.HashIPPort, } @@ -1150,7 +1151,7 @@ func (proxier *Proxier) syncProxyRules() { } // Add externaltrafficpolicy=local type nodeport entry - if svcInfo.OnlyNodeLocalEndpoints { + if svcInfo.OnlyNodeLocalEndpoints() { var nodePortLocalSet *IPSet switch protocol { case "tcp": @@ -1189,18 +1190,18 @@ func (proxier *Proxier) syncProxyRules() { // ipvs call serv := &utilipvs.VirtualServer{ Address: nodeIP, - Port: uint16(svcInfo.NodePort), - Protocol: string(svcInfo.Protocol), + Port: uint16(svcInfo.NodePort()), + Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } - if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) + serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcNameString, serv, false); err == nil { activeIPVSServices[serv.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { @@ -1529,9 +1530,9 @@ func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptabl // This assumes the proxier mutex is held func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP { + if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) + err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index eecc643adb2..09db239a3a9 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -40,60 +40,85 @@ import ( // or can be used for constructing a more specific ServiceInfo struct // defined by the proxier if needed. type BaseServiceInfo struct { - ClusterIP net.IP - Port int - Protocol v1.Protocol - NodePort int - LoadBalancerStatus v1.LoadBalancerStatus - SessionAffinityType v1.ServiceAffinity - StickyMaxAgeSeconds int - ExternalIPs []string - LoadBalancerSourceRanges []string - HealthCheckNodePort int - OnlyNodeLocalEndpoints bool + clusterIP net.IP + port int + protocol v1.Protocol + nodePort int + loadBalancerStatus v1.LoadBalancerStatus + sessionAffinityType v1.ServiceAffinity + stickyMaxAgeSeconds int + externalIPs []string + loadBalancerSourceRanges []string + healthCheckNodePort int + onlyNodeLocalEndpoints bool } var _ ServicePort = &BaseServiceInfo{} // String is part of ServicePort interface. func (info *BaseServiceInfo) String() string { - return fmt.Sprintf("%s:%d/%s", info.ClusterIP, info.Port, info.Protocol) + return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol) } -// ClusterIPString is part of ServicePort interface. -func (info *BaseServiceInfo) ClusterIPString() string { - return info.ClusterIP.String() +// ClusterIP is part of ServicePort interface. +func (info *BaseServiceInfo) ClusterIP() net.IP { + return info.clusterIP } -// GetProtocol is part of ServicePort interface. -func (info *BaseServiceInfo) GetProtocol() v1.Protocol { - return info.Protocol +// Port is part of ServicePort interface. +func (info *BaseServiceInfo) Port() int { + return info.port } -// GetHealthCheckNodePort is part of ServicePort interface. -func (info *BaseServiceInfo) GetHealthCheckNodePort() int { - return info.HealthCheckNodePort +// SessionAffinityType is part of the ServicePort interface. +func (info *BaseServiceInfo) SessionAffinityType() v1.ServiceAffinity { + return info.sessionAffinityType } -// GetNodePort is part of the ServicePort interface. -func (info *BaseServiceInfo) GetNodePort() int { - return info.NodePort +// StickyMaxAgeSeconds is part of the ServicePort interface +func (info *BaseServiceInfo) StickyMaxAgeSeconds() int { + return info.stickyMaxAgeSeconds +} + +// Protocol is part of ServicePort interface. +func (info *BaseServiceInfo) Protocol() v1.Protocol { + return info.protocol +} + +// LoadBalancerSourceRanges is part of ServicePort interface +func (info *BaseServiceInfo) LoadBalancerSourceRanges() []string { + return info.loadBalancerSourceRanges +} + +// HealthCheckNodePort is part of ServicePort interface. +func (info *BaseServiceInfo) HealthCheckNodePort() int { + return info.healthCheckNodePort +} + +// NodePort is part of the ServicePort interface. +func (info *BaseServiceInfo) NodePort() int { + return info.nodePort } // ExternalIPStrings is part of ServicePort interface. func (info *BaseServiceInfo) ExternalIPStrings() []string { - return info.ExternalIPs + return info.externalIPs } // LoadBalancerIPStrings is part of ServicePort interface. func (info *BaseServiceInfo) LoadBalancerIPStrings() []string { var ips []string - for _, ing := range info.LoadBalancerStatus.Ingress { + for _, ing := range info.loadBalancerStatus.Ingress { ips = append(ips, ing.IP) } return ips } +// OnlyNodeLocalEndpoints is part of ServicePort interface. +func (info *BaseServiceInfo) OnlyNodeLocalEndpoints() bool { + return info.onlyNodeLocalEndpoints +} + func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { onlyNodeLocalEndpoints := false if apiservice.RequestsOnlyLocalTraffic(service) { @@ -105,32 +130,32 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) } info := &BaseServiceInfo{ - ClusterIP: net.ParseIP(service.Spec.ClusterIP), - Port: int(port.Port), - Protocol: port.Protocol, - NodePort: int(port.NodePort), + clusterIP: net.ParseIP(service.Spec.ClusterIP), + port: int(port.Port), + protocol: port.Protocol, + nodePort: int(port.NodePort), // Deep-copy in case the service instance changes - LoadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(), - SessionAffinityType: service.Spec.SessionAffinity, - StickyMaxAgeSeconds: stickyMaxAgeSeconds, - OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints, + loadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(), + sessionAffinityType: service.Spec.SessionAffinity, + stickyMaxAgeSeconds: stickyMaxAgeSeconds, + onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, } if sct.isIPv6Mode == nil { - info.ExternalIPs = make([]string, len(service.Spec.ExternalIPs)) - info.LoadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges)) - copy(info.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) - copy(info.ExternalIPs, service.Spec.ExternalIPs) + info.externalIPs = make([]string, len(service.Spec.ExternalIPs)) + info.loadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges)) + copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) + copy(info.externalIPs, service.Spec.ExternalIPs) } else { // Filter out the incorrect IP version case. // If ExternalIPs and LoadBalancerSourceRanges on service contains incorrect IP versions, // only filter out the incorrect ones. var incorrectIPs []string - info.ExternalIPs, incorrectIPs = utilproxy.FilterIncorrectIPVersion(service.Spec.ExternalIPs, *sct.isIPv6Mode) + info.externalIPs, incorrectIPs = utilproxy.FilterIncorrectIPVersion(service.Spec.ExternalIPs, *sct.isIPv6Mode) if len(incorrectIPs) > 0 { utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "externalIPs", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) } - info.LoadBalancerSourceRanges, incorrectIPs = utilproxy.FilterIncorrectCIDRVersion(service.Spec.LoadBalancerSourceRanges, *sct.isIPv6Mode) + info.loadBalancerSourceRanges, incorrectIPs = utilproxy.FilterIncorrectCIDRVersion(service.Spec.LoadBalancerSourceRanges, *sct.isIPv6Mode) if len(incorrectIPs) > 0 { utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "loadBalancerSourceRanges", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) } @@ -141,7 +166,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic if p == 0 { klog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name) } else { - info.HealthCheckNodePort = int(p) + info.healthCheckNodePort = int(p) } } @@ -239,8 +264,8 @@ func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (res // computing this incrementally similarly to serviceMap. result.HCServiceNodePorts = make(map[types.NamespacedName]uint16) for svcPortName, info := range serviceMap { - if info.GetHealthCheckNodePort() != 0 { - result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort()) + if info.HealthCheckNodePort() != 0 { + result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) } } @@ -355,8 +380,8 @@ func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) { info, exists := (*sm)[svcPortName] if exists { klog.V(1).Infof("Removing service port %q", svcPortName) - if info.GetProtocol() == v1.ProtocolUDP { - UDPStaleClusterIP.Insert(info.ClusterIPString()) + if info.Protocol() == v1.ProtocolUDP { + UDPStaleClusterIP.Insert(info.ClusterIP().String()) } delete(*sm, svcPortName) } else { diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index e929013ecc4..f79b5c68029 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -33,12 +33,12 @@ const testHostname = "test-hostname" func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServiceInfo)) *BaseServiceInfo { info := &BaseServiceInfo{ - ClusterIP: net.ParseIP(clusterIP), - Port: port, - Protocol: v1.Protocol(protocol), + clusterIP: net.ParseIP(clusterIP), + port: port, + protocol: v1.Protocol(protocol), } if healthcheckNodePort != 0 { - info.HealthCheckNodePort = healthcheckNodePort + info.healthCheckNodePort = healthcheckNodePort } for _, svcInfoFunc := range svcInfoFuncs { svcInfoFunc(info) @@ -269,8 +269,8 @@ func TestServiceToServiceMap(t *testing.T) { }, expected: map[ServicePortName]*BaseServiceInfo{ makeServicePortName("test", "validIPv4", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.ExternalIPs = []string{testExternalIPv4} - info.LoadBalancerSourceRanges = []string{testSourceRangeIPv4} + info.externalIPs = []string{testExternalIPv4} + info.loadBalancerSourceRanges = []string{testSourceRangeIPv4} }), }, isIPv6Mode: &falseVal, @@ -297,8 +297,8 @@ func TestServiceToServiceMap(t *testing.T) { }, expected: map[ServicePortName]*BaseServiceInfo{ makeServicePortName("test", "validIPv6", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.ExternalIPs = []string{testExternalIPv6} - info.LoadBalancerSourceRanges = []string{testSourceRangeIPv6} + info.externalIPs = []string{testExternalIPv6} + info.loadBalancerSourceRanges = []string{testSourceRangeIPv6} }), }, isIPv6Mode: &trueVal, @@ -325,8 +325,8 @@ func TestServiceToServiceMap(t *testing.T) { }, expected: map[ServicePortName]*BaseServiceInfo{ makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.ExternalIPs = []string{testExternalIPv4} - info.LoadBalancerSourceRanges = []string{testSourceRangeIPv4} + info.externalIPs = []string{testExternalIPv4} + info.loadBalancerSourceRanges = []string{testSourceRangeIPv4} }), }, isIPv6Mode: &falseVal, @@ -353,8 +353,8 @@ func TestServiceToServiceMap(t *testing.T) { }, expected: map[ServicePortName]*BaseServiceInfo{ makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) { - info.ExternalIPs = []string{testExternalIPv6} - info.LoadBalancerSourceRanges = []string{testSourceRangeIPv6} + info.externalIPs = []string{testExternalIPv6} + info.loadBalancerSourceRanges = []string{testSourceRangeIPv6} }), }, isIPv6Mode: &trueVal, @@ -371,12 +371,12 @@ func TestServiceToServiceMap(t *testing.T) { } for svcKey, expectedInfo := range tc.expected { svcInfo := newServices[svcKey].(*BaseServiceInfo) - if !svcInfo.ClusterIP.Equal(expectedInfo.ClusterIP) || - svcInfo.Port != expectedInfo.Port || - svcInfo.Protocol != expectedInfo.Protocol || - svcInfo.HealthCheckNodePort != expectedInfo.HealthCheckNodePort || - !sets.NewString(svcInfo.ExternalIPs...).Equal(sets.NewString(expectedInfo.ExternalIPs...)) || - !sets.NewString(svcInfo.LoadBalancerSourceRanges...).Equal(sets.NewString(expectedInfo.LoadBalancerSourceRanges...)) { + if !svcInfo.clusterIP.Equal(expectedInfo.clusterIP) || + svcInfo.port != expectedInfo.port || + svcInfo.protocol != expectedInfo.protocol || + svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort || + !sets.NewString(svcInfo.externalIPs...).Equal(sets.NewString(expectedInfo.externalIPs...)) || + !sets.NewString(svcInfo.loadBalancerSourceRanges...).Equal(sets.NewString(expectedInfo.loadBalancerSourceRanges...)) { t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo) } } diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 410603db195..5b62f7ed948 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -18,6 +18,7 @@ package proxy import ( "fmt" + "net" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -52,18 +53,28 @@ func (spn ServicePortName) String() string { type ServicePort interface { // String returns service string. An example format can be: `IP:Port/Protocol`. String() string - // ClusterIPString returns service cluster IP in string format. - ClusterIPString() string + // GetClusterIP returns service cluster IP in net.IP format. + ClusterIP() net.IP + // GetPort returns service port if present. If return 0 means not present. + Port() int + // GetSessionAffinityType returns service session affinity type + SessionAffinityType() v1.ServiceAffinity + // GetStickyMaxAgeSeconds returns service max connection age + StickyMaxAgeSeconds() int // ExternalIPStrings returns service ExternalIPs as a string array. ExternalIPStrings() []string // LoadBalancerIPStrings returns service LoadBalancerIPs as a string array. LoadBalancerIPStrings() []string // GetProtocol returns service protocol. - GetProtocol() v1.Protocol + Protocol() v1.Protocol + // LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not + LoadBalancerSourceRanges() []string // GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present. - GetHealthCheckNodePort() int + HealthCheckNodePort() int // GetNodePort returns a service Node port if present. If return 0, it means not present. - GetNodePort() int + NodePort() int + // GetOnlyNodeLocalEndpoints returns if a service has only node local endpoints + OnlyNodeLocalEndpoints() bool } // Endpoint in an interface which abstracts information about an endpoint.