Merge pull request #76160 from JacobTanenbaum/BaseServiceInfo-cleanup

enforce the interface relationship between ServicePort and BaseServiceInfo
This commit is contained in:
Kubernetes Prow Robot 2019-06-13 20:37:13 -07:00 committed by GitHub
commit 0c9964fac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 232 additions and 195 deletions

View File

@ -156,7 +156,7 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.B
// Store the following for performance reasons. // Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
protocol := strings.ToLower(string(info.Protocol)) protocol := strings.ToLower(string(info.Protocol()))
info.serviceNameString = svcPortName.String() info.serviceNameString = svcPortName.String()
info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol) info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol)
info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol) info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol)
@ -620,14 +620,14 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo
// TODO: move it to util // TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap { for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.GetNodePort() nodePort := svcInfo.NodePort()
var err error var err error
if nodePort != 0 { if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
} else { } else {
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
} }
if err != nil { if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
@ -689,9 +689,9 @@ func (proxier *Proxier) syncProxyRules() {
staleServices := serviceUpdateResult.UDPStaleClusterIP staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap // merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString()) klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIPString()) staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP) staleServices.Insert(extIP)
} }
@ -820,8 +820,8 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
continue continue
} }
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP) isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
protocol := strings.ToLower(string(svcInfo.Protocol)) protocol := strings.ToLower(string(svcInfo.Protocol()))
svcNameString := svcInfo.serviceNameString svcNameString := svcInfo.serviceNameString
hasEndpoints := len(proxier.endpointsMap[svcName]) > 0 hasEndpoints := len(proxier.endpointsMap[svcName]) > 0
@ -837,7 +837,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
svcXlbChain := svcInfo.serviceLBChainName svcXlbChain := svcInfo.serviceLBChainName
if svcInfo.OnlyNodeLocalEndpoints { if svcInfo.OnlyNodeLocalEndpoints() {
// Only for services request OnlyLocal traffic // Only for services request OnlyLocal traffic
// create the per-service LB chain, retaining counters if possible. // create the per-service LB chain, retaining counters if possible.
if lbChain, ok := existingNATChains[svcXlbChain]; ok { if lbChain, ok := existingNATChains[svcXlbChain]; ok {
@ -854,8 +854,8 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(svcInfo.ClusterIP), "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
) )
if proxier.masqueradeAll { if proxier.masqueradeAll {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
@ -874,24 +874,24 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(svcInfo.ClusterIP), "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
"-j", "REJECT", "-j", "REJECT",
) )
} }
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPs { for _, externalIP := range svcInfo.ExternalIPStrings() {
// If the "external" IP happens to be an IP that is local to this // If the "external" IP happens to be an IP that is local to this
// machine, hold the local port open so no other process can open it // machine, hold the local port open so no other process can open it
// (because the socket might open but it would never work). // (because the socket might open but it would never work).
if local, err := utilproxy.IsLocalIP(externalIP); err != nil { if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
klog.Errorf("can't determine if IP is local, assuming not: %v", err) klog.Errorf("can't determine if IP is local, assuming not: %v", err)
} else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) { } else if local && (svcInfo.Protocol() != v1.ProtocolSCTP) {
lp := utilproxy.LocalPort{ lp := utilproxy.LocalPort{
Description: "externalIP for " + svcNameString, Description: "externalIP for " + svcNameString,
IP: externalIP, IP: externalIP,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
@ -922,7 +922,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
) )
// We have to SNAT packets to external IPs. // We have to SNAT packets to external IPs.
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
@ -946,7 +946,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
"-j", "REJECT", "-j", "REJECT",
) )
} }
@ -954,8 +954,8 @@ func (proxier *Proxier) syncProxyRules() {
// Capture load-balancer ingress. // Capture load-balancer ingress.
fwChain := svcInfo.serviceFirewallChainName fwChain := svcInfo.serviceFirewallChainName
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { for _, ingress := range svcInfo.LoadBalancerIPStrings() {
if ingress.IP != "" { if ingress != "" {
if hasEndpoints { if hasEndpoints {
// create service firewall chain // create service firewall chain
if chain, ok := existingNATChains[fwChain]; ok { if chain, ok := existingNATChains[fwChain]; ok {
@ -972,8 +972,8 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
) )
// jump to service firewall chain // jump to service firewall chain
writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
@ -987,18 +987,18 @@ func (proxier *Proxier) syncProxyRules() {
chosenChain := svcXlbChain chosenChain := svcXlbChain
// If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP. // If we are proxying only locally, we can retain the source IP.
if !svcInfo.OnlyNodeLocalEndpoints { if !svcInfo.OnlyNodeLocalEndpoints() {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
chosenChain = svcChain chosenChain = svcChain
} }
if len(svcInfo.LoadBalancerSourceRanges) == 0 { if len(svcInfo.LoadBalancerSourceRanges()) == 0 {
// allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
} else { } else {
// firewall filter based on each source range // firewall filter based on each source range
allowFromNode := false allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges { for _, src := range svcInfo.LoadBalancerSourceRanges() {
writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
// ignore error because it has been validated // ignore error because it has been validated
_, cidr, _ := net.ParseCIDR(src) _, cidr, _ := net.ParseCIDR(src)
@ -1010,7 +1010,7 @@ func (proxier *Proxier) syncProxyRules() {
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host. // Need to add the following rule to allow request on host.
if allowFromNode { if allowFromNode {
writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...) writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress)), "-j", string(chosenChain))...)
} }
} }
@ -1023,8 +1023,8 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
"--dport", strconv.Itoa(svcInfo.Port), "--dport", strconv.Itoa(svcInfo.Port()),
"-j", "REJECT", "-j", "REJECT",
) )
} }
@ -1034,7 +1034,7 @@ func (proxier *Proxier) syncProxyRules() {
// Capture nodeports. If we had more than 2 rules it might be // Capture nodeports. If we had more than 2 rules it might be
// worthwhile to make a new per-service chain for nodeport rules, but // worthwhile to make a new per-service chain for nodeport rules, but
// with just 2 rules it ends up being a waste and a cognitive burden. // with just 2 rules it ends up being a waste and a cognitive burden.
if svcInfo.NodePort != 0 { if svcInfo.NodePort() != 0 {
// Hold the local port open so no other process can open it // Hold the local port open so no other process can open it
// (because the socket might open but it would never work). // (because the socket might open but it would never work).
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
@ -1048,7 +1048,7 @@ func (proxier *Proxier) syncProxyRules() {
lp := utilproxy.LocalPort{ lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString, Description: "nodePort for " + svcNameString,
IP: address, IP: address,
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
} }
if utilproxy.IsZeroCIDR(address) { if utilproxy.IsZeroCIDR(address) {
@ -1066,7 +1066,7 @@ func (proxier *Proxier) syncProxyRules() {
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp] replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.GetProtocol() != v1.ProtocolSCTP { } else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp) socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil { if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
@ -1091,9 +1091,9 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeNodePortsChain), "-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort), "--dport", strconv.Itoa(svcInfo.NodePort()),
) )
if !svcInfo.OnlyNodeLocalEndpoints { if !svcInfo.OnlyNodeLocalEndpoints() {
// Nodeports need SNAT, unless they're local. // Nodeports need SNAT, unless they're local.
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Jump to the service chain. // Jump to the service chain.
@ -1117,7 +1117,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", "addrtype", "--dst-type", "LOCAL", "-m", "addrtype", "--dst-type", "LOCAL",
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort), "--dport", strconv.Itoa(svcInfo.NodePort()),
"-j", "REJECT", "-j", "REJECT",
) )
} }
@ -1153,7 +1153,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// First write session affinity rules, if applicable. // First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains { for _, endpointChain := range endpointChains {
args = append(args[:0], args = append(args[:0],
"-A", string(svcChain), "-A", string(svcChain),
@ -1161,7 +1161,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.appendServiceCommentLocked(args, svcNameString) proxier.appendServiceCommentLocked(args, svcNameString)
args = append(args, args = append(args,
"-m", "recent", "--name", string(endpointChain), "-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap", "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(endpointChain), "-j", string(endpointChain),
) )
writeLine(proxier.natRules, args...) writeLine(proxier.natRules, args...)
@ -1174,7 +1174,7 @@ func (proxier *Proxier) syncProxyRules() {
localEndpointChains := make([]utiliptables.Chain, 0) localEndpointChains := make([]utiliptables.Chain, 0)
for i, endpointChain := range endpointChains { for i, endpointChain := range endpointChains {
// Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. // Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
if svcInfo.OnlyNodeLocalEndpoints && endpoints[i].IsLocal { if svcInfo.OnlyNodeLocalEndpoints() && endpoints[i].IsLocal {
// These slices parallel each other; must be kept in sync // These slices parallel each other; must be kept in sync
localEndpoints = append(localEndpoints, endpoints[i]) localEndpoints = append(localEndpoints, endpoints[i])
localEndpointChains = append(localEndpointChains, endpointChains[i]) localEndpointChains = append(localEndpointChains, endpointChains[i])
@ -1207,7 +1207,7 @@ func (proxier *Proxier) syncProxyRules() {
"-s", utilproxy.ToCIDR(net.ParseIP(epIP)), "-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
"-j", string(KubeMarkMasqChain))...) "-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists. // Update client-affinity lists.
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
} }
// DNAT to final destination. // DNAT to final destination.
@ -1216,7 +1216,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// The logic below this applies only if this service is marked as OnlyLocal // The logic below this applies only if this service is marked as OnlyLocal
if !svcInfo.OnlyNodeLocalEndpoints { if !svcInfo.OnlyNodeLocalEndpoints() {
continue continue
} }
@ -1257,13 +1257,13 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(proxier.natRules, args...) writeLine(proxier.natRules, args...)
} else { } else {
// First write session affinity rules only over local endpoints, if applicable. // First write session affinity rules only over local endpoints, if applicable.
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range localEndpointChains { for _, endpointChain := range localEndpointChains {
writeLine(proxier.natRules, writeLine(proxier.natRules,
"-A", string(svcXlbChain), "-A", string(svcXlbChain),
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
"-m", "recent", "--name", string(endpointChain), "-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap", "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(endpointChain)) "-j", string(endpointChain))
} }
} }

View File

@ -147,30 +147,18 @@ func TestGetChainLinesMultipleTables(t *testing.T) {
checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected) checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
} }
func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol v1.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo {
return &serviceInfo{
BaseServiceInfo: &proxy.BaseServiceInfo{
SessionAffinityType: v1.ServiceAffinityNone, // default
StickyMaxAgeSeconds: int(v1.DefaultClientIPServiceAffinitySeconds), // default
ClusterIP: ip,
Port: port,
Protocol: protocol,
OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
},
}
}
func TestDeleteEndpointConnections(t *testing.T) { func TestDeleteEndpointConnections(t *testing.T) {
const ( const (
UDP = v1.ProtocolUDP UDP = v1.ProtocolUDP
TCP = v1.ProtocolTCP TCP = v1.ProtocolTCP
SCTP = v1.ProtocolSCTP SCTP = v1.ProtocolSCTP
) )
testCases := []struct { testCases := []struct {
description string description string
svcName string svcName string
svcIP string svcIP string
svcPort int svcPort int32
protocol v1.Protocol protocol v1.Protocol
endpoint string // IP:port endpoint endpoint string // IP:port endpoint
epSvcPair proxy.ServiceEndpoint // Will be generated by test epSvcPair proxy.ServiceEndpoint // Will be generated by test
@ -237,21 +225,6 @@ func TestDeleteEndpointConnections(t *testing.T) {
}, },
} }
// Create a service map that has service info entries for all test cases
// and generate an endpoint service pair for each test case
serviceMap := make(map[proxy.ServicePortName]proxy.ServicePort)
for i, tc := range testCases {
svc := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
}
serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false)
testCases[i].epSvcPair = proxy.ServiceEndpoint{
Endpoint: tc.endpoint,
ServicePortName: svc,
}
}
// Create a fake executor for the conntrack utility. This should only be // Create a fake executor for the conntrack utility. This should only be
// invoked for UDP connections, since no conntrack cleanup is needed for TCP // invoked for UDP connections, since no conntrack cleanup is needed for TCP
fcmd := fakeexec.FakeCmd{} fcmd := fakeexec.FakeCmd{}
@ -276,16 +249,43 @@ func TestDeleteEndpointConnections(t *testing.T) {
} }
} }
// Create a proxier using the fake conntrack executor and service map ipt := iptablestest.NewFake()
fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap} fp := NewFakeProxier(ipt)
fp.exec = &fexec
for _, tc := range testCases {
makeServiceMap(fp,
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
svc.Spec.ClusterIP = tc.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: tc.svcPort,
Protocol: tc.protocol,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
}),
)
proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
}
// Run the test cases // Run the test cases
for _, tc := range testCases { for _, tc := range testCases {
priorExecs := fexec.CommandCalls priorExecs := fexec.CommandCalls
priorGlogErrs := klog.Stats.Error.Lines() priorGlogErrs := klog.Stats.Error.Lines()
input := []proxy.ServiceEndpoint{tc.epSvcPair} svc := proxy.ServicePortName{
fakeProxier.deleteEndpointConnections(input) NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
}
input := []proxy.ServiceEndpoint{
{
Endpoint: tc.endpoint,
ServicePortName: svc,
},
}
fp.deleteEndpointConnections(input)
// For UDP connections, check the executed conntrack command // For UDP connections, check the executed conntrack command
var expExecs int var expExecs int

View File

@ -758,9 +758,9 @@ func (proxier *Proxier) syncProxyRules() {
staleServices := serviceUpdateResult.UDPStaleClusterIP staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap // merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString()) klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIPString()) staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP) staleServices.Insert(extIP)
} }
@ -815,7 +815,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
continue continue
} }
protocol := strings.ToLower(string(svcInfo.Protocol)) protocol := strings.ToLower(string(svcInfo.Protocol()))
// Precompute svcNameString; with many services the many calls // Precompute svcNameString; with many services the many calls
// to ServicePortName.String() show up in CPU profiles. // to ServicePortName.String() show up in CPU profiles.
svcNameString := svcName.String() svcNameString := svcName.String()
@ -853,8 +853,8 @@ func (proxier *Proxier) syncProxyRules() {
// Capture the clusterIP. // Capture the clusterIP.
// ipset call // ipset call
entry := &utilipset.Entry{ entry := &utilipset.Entry{
IP: svcInfo.ClusterIP.String(), IP: svcInfo.ClusterIP().String(),
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
} }
@ -867,15 +867,15 @@ func (proxier *Proxier) syncProxyRules() {
proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
// ipvs call // ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: svcInfo.ClusterIP, Address: svcInfo.ClusterIP(),
Port: uint16(svcInfo.Port), Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol), Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler, Scheduler: proxier.ipvsScheduler,
} }
// Set session affinity flag and timeout for IPVS service // Set session affinity flag and timeout for IPVS service
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true); err == nil {
@ -891,16 +891,16 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPs { for _, externalIP := range svcInfo.ExternalIPStrings() {
if local, err := utilproxy.IsLocalIP(externalIP); err != nil { if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
klog.Errorf("can't determine if IP is local, assuming not: %v", err) klog.Errorf("can't determine if IP is local, assuming not: %v", err)
// We do not start listening on SCTP ports, according to our agreement in the // We do not start listening on SCTP ports, according to our agreement in the
// SCTP support KEP // SCTP support KEP
} else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) { } else if local && (svcInfo.Protocol() != v1.ProtocolSCTP) {
lp := utilproxy.LocalPort{ lp := utilproxy.LocalPort{
Description: "externalIP for " + svcNameString, Description: "externalIP for " + svcNameString,
IP: externalIP, IP: externalIP,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
@ -928,7 +928,7 @@ func (proxier *Proxier) syncProxyRules() {
// ipset call // ipset call
entry := &utilipset.Entry{ entry := &utilipset.Entry{
IP: externalIP, IP: externalIP,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
} }
@ -942,13 +942,13 @@ func (proxier *Proxier) syncProxyRules() {
// ipvs call // ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: net.ParseIP(externalIP), Address: net.ParseIP(externalIP),
Port: uint16(svcInfo.Port), Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol), Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler, Scheduler: proxier.ipvsScheduler,
} }
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
@ -962,12 +962,12 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { for _, ingress := range svcInfo.LoadBalancerIPStrings() {
if ingress.IP != "" { if ingress != "" {
// ipset call // ipset call
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: ingress.IP, IP: ingress,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
} }
@ -981,14 +981,14 @@ func (proxier *Proxier) syncProxyRules() {
} }
proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String()) proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
if svcInfo.OnlyNodeLocalEndpoints { if svcInfo.OnlyNodeLocalEndpoints() {
if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid { if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name)) klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
continue continue
} }
proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String()) proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
} }
if len(svcInfo.LoadBalancerSourceRanges) != 0 { if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
// This currently works for loadbalancers that preserves source ips. // This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
@ -998,11 +998,11 @@ func (proxier *Proxier) syncProxyRules() {
} }
proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String()) proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
allowFromNode := false allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges { for _, src := range svcInfo.LoadBalancerSourceRanges() {
// ipset call // ipset call
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: ingress.IP, IP: ingress,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
Net: src, Net: src,
SetType: utilipset.HashIPPortNet, SetType: utilipset.HashIPPortNet,
@ -1025,10 +1025,10 @@ func (proxier *Proxier) syncProxyRules() {
// Need to add the following rule to allow request on host. // Need to add the following rule to allow request on host.
if allowFromNode { if allowFromNode {
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: ingress.IP, IP: ingress,
Port: svcInfo.Port, Port: svcInfo.Port(),
Protocol: protocol, Protocol: protocol,
IP2: ingress.IP, IP2: ingress,
SetType: utilipset.HashIPPortIP, SetType: utilipset.HashIPPortIP,
} }
// enumerate all white list source ip // enumerate all white list source ip
@ -1042,19 +1042,19 @@ func (proxier *Proxier) syncProxyRules() {
// ipvs call // ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: net.ParseIP(ingress.IP), Address: net.ParseIP(ingress),
Port: uint16(svcInfo.Port), Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol), Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler, Scheduler: proxier.ipvsScheduler,
} }
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
} }
} else { } else {
@ -1063,7 +1063,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
} }
if svcInfo.NodePort != 0 { if svcInfo.NodePort() != 0 {
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil { if err != nil {
klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err) klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
@ -1075,7 +1075,7 @@ func (proxier *Proxier) syncProxyRules() {
lp := utilproxy.LocalPort{ lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString, Description: "nodePort for " + svcNameString,
IP: address, IP: address,
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
} }
if utilproxy.IsZeroCIDR(address) { if utilproxy.IsZeroCIDR(address) {
@ -1095,14 +1095,14 @@ func (proxier *Proxier) syncProxyRules() {
replacementPortsMap[lp] = proxier.portsMap[lp] replacementPortsMap[lp] = proxier.portsMap[lp]
// We do not start listening on SCTP ports, according to our agreement in the // We do not start listening on SCTP ports, according to our agreement in the
// SCTP support KEP // SCTP support KEP
} else if svcInfo.GetProtocol() != v1.ProtocolSCTP { } else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp) socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil { if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue continue
} }
if lp.Protocol == "udp" { if lp.Protocol == "udp" {
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP) isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
} }
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
@ -1111,13 +1111,14 @@ func (proxier *Proxier) syncProxyRules() {
// Nodeports need SNAT, unless they're local. // Nodeports need SNAT, unless they're local.
// ipset call // ipset call
var nodePortSet *IPSet var nodePortSet *IPSet
switch protocol { switch protocol {
case "tcp": case "tcp":
nodePortSet = proxier.ipsetList[kubeNodePortSetTCP] nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
entry = &utilipset.Entry{ entry = &utilipset.Entry{
// No need to provide ip info // No need to provide ip info
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.BitmapPort, SetType: utilipset.BitmapPort,
} }
@ -1125,7 +1126,7 @@ func (proxier *Proxier) syncProxyRules() {
nodePortSet = proxier.ipsetList[kubeNodePortSetUDP] nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
entry = &utilipset.Entry{ entry = &utilipset.Entry{
// No need to provide ip info // No need to provide ip info
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.BitmapPort, SetType: utilipset.BitmapPort,
} }
@ -1133,7 +1134,7 @@ func (proxier *Proxier) syncProxyRules() {
nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP] nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: proxier.nodeIP.String(), IP: proxier.nodeIP.String(),
Port: svcInfo.NodePort, Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
} }
@ -1150,7 +1151,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Add externaltrafficpolicy=local type nodeport entry // Add externaltrafficpolicy=local type nodeport entry
if svcInfo.OnlyNodeLocalEndpoints { if svcInfo.OnlyNodeLocalEndpoints() {
var nodePortLocalSet *IPSet var nodePortLocalSet *IPSet
switch protocol { switch protocol {
case "tcp": case "tcp":
@ -1189,18 +1190,18 @@ func (proxier *Proxier) syncProxyRules() {
// ipvs call // ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: nodeIP, Address: nodeIP,
Port: uint16(svcInfo.NodePort), Port: uint16(svcInfo.NodePort()),
Protocol: string(svcInfo.Protocol), Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler, Scheduler: proxier.ipvsScheduler,
} }
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false); err == nil { if err := proxier.syncService(svcNameString, serv, false); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
} }
} else { } else {
@ -1529,9 +1530,9 @@ func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptabl
// This assumes the proxier mutex is held // This assumes the proxier mutex is held
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap { for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
} }

View File

@ -40,60 +40,85 @@ import (
// or can be used for constructing a more specific ServiceInfo struct // or can be used for constructing a more specific ServiceInfo struct
// defined by the proxier if needed. // defined by the proxier if needed.
type BaseServiceInfo struct { type BaseServiceInfo struct {
ClusterIP net.IP clusterIP net.IP
Port int port int
Protocol v1.Protocol protocol v1.Protocol
NodePort int nodePort int
LoadBalancerStatus v1.LoadBalancerStatus loadBalancerStatus v1.LoadBalancerStatus
SessionAffinityType v1.ServiceAffinity sessionAffinityType v1.ServiceAffinity
StickyMaxAgeSeconds int stickyMaxAgeSeconds int
ExternalIPs []string externalIPs []string
LoadBalancerSourceRanges []string loadBalancerSourceRanges []string
HealthCheckNodePort int healthCheckNodePort int
OnlyNodeLocalEndpoints bool onlyNodeLocalEndpoints bool
} }
var _ ServicePort = &BaseServiceInfo{} var _ ServicePort = &BaseServiceInfo{}
// String is part of ServicePort interface. // String is part of ServicePort interface.
func (info *BaseServiceInfo) String() string { func (info *BaseServiceInfo) String() string {
return fmt.Sprintf("%s:%d/%s", info.ClusterIP, info.Port, info.Protocol) return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol)
} }
// ClusterIPString is part of ServicePort interface. // ClusterIP is part of ServicePort interface.
func (info *BaseServiceInfo) ClusterIPString() string { func (info *BaseServiceInfo) ClusterIP() net.IP {
return info.ClusterIP.String() return info.clusterIP
} }
// GetProtocol is part of ServicePort interface. // Port is part of ServicePort interface.
func (info *BaseServiceInfo) GetProtocol() v1.Protocol { func (info *BaseServiceInfo) Port() int {
return info.Protocol return info.port
} }
// GetHealthCheckNodePort is part of ServicePort interface. // SessionAffinityType is part of the ServicePort interface.
func (info *BaseServiceInfo) GetHealthCheckNodePort() int { func (info *BaseServiceInfo) SessionAffinityType() v1.ServiceAffinity {
return info.HealthCheckNodePort return info.sessionAffinityType
} }
// GetNodePort is part of the ServicePort interface. // StickyMaxAgeSeconds is part of the ServicePort interface
func (info *BaseServiceInfo) GetNodePort() int { func (info *BaseServiceInfo) StickyMaxAgeSeconds() int {
return info.NodePort return info.stickyMaxAgeSeconds
}
// Protocol is part of ServicePort interface.
func (info *BaseServiceInfo) Protocol() v1.Protocol {
return info.protocol
}
// LoadBalancerSourceRanges is part of ServicePort interface
func (info *BaseServiceInfo) LoadBalancerSourceRanges() []string {
return info.loadBalancerSourceRanges
}
// HealthCheckNodePort is part of ServicePort interface.
func (info *BaseServiceInfo) HealthCheckNodePort() int {
return info.healthCheckNodePort
}
// NodePort is part of the ServicePort interface.
func (info *BaseServiceInfo) NodePort() int {
return info.nodePort
} }
// ExternalIPStrings is part of ServicePort interface. // ExternalIPStrings is part of ServicePort interface.
func (info *BaseServiceInfo) ExternalIPStrings() []string { func (info *BaseServiceInfo) ExternalIPStrings() []string {
return info.ExternalIPs return info.externalIPs
} }
// LoadBalancerIPStrings is part of ServicePort interface. // LoadBalancerIPStrings is part of ServicePort interface.
func (info *BaseServiceInfo) LoadBalancerIPStrings() []string { func (info *BaseServiceInfo) LoadBalancerIPStrings() []string {
var ips []string var ips []string
for _, ing := range info.LoadBalancerStatus.Ingress { for _, ing := range info.loadBalancerStatus.Ingress {
ips = append(ips, ing.IP) ips = append(ips, ing.IP)
} }
return ips return ips
} }
// OnlyNodeLocalEndpoints is part of ServicePort interface.
func (info *BaseServiceInfo) OnlyNodeLocalEndpoints() bool {
return info.onlyNodeLocalEndpoints
}
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
onlyNodeLocalEndpoints := false onlyNodeLocalEndpoints := false
if apiservice.RequestsOnlyLocalTraffic(service) { if apiservice.RequestsOnlyLocalTraffic(service) {
@ -105,32 +130,32 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
} }
info := &BaseServiceInfo{ info := &BaseServiceInfo{
ClusterIP: net.ParseIP(service.Spec.ClusterIP), clusterIP: net.ParseIP(service.Spec.ClusterIP),
Port: int(port.Port), port: int(port.Port),
Protocol: port.Protocol, protocol: port.Protocol,
NodePort: int(port.NodePort), nodePort: int(port.NodePort),
// Deep-copy in case the service instance changes // Deep-copy in case the service instance changes
LoadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(), loadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(),
SessionAffinityType: service.Spec.SessionAffinity, sessionAffinityType: service.Spec.SessionAffinity,
StickyMaxAgeSeconds: stickyMaxAgeSeconds, stickyMaxAgeSeconds: stickyMaxAgeSeconds,
OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints, onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
} }
if sct.isIPv6Mode == nil { if sct.isIPv6Mode == nil {
info.ExternalIPs = make([]string, len(service.Spec.ExternalIPs)) info.externalIPs = make([]string, len(service.Spec.ExternalIPs))
info.LoadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges)) info.loadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges))
copy(info.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
copy(info.ExternalIPs, service.Spec.ExternalIPs) copy(info.externalIPs, service.Spec.ExternalIPs)
} else { } else {
// Filter out the incorrect IP version case. // Filter out the incorrect IP version case.
// If ExternalIPs and LoadBalancerSourceRanges on service contains incorrect IP versions, // If ExternalIPs and LoadBalancerSourceRanges on service contains incorrect IP versions,
// only filter out the incorrect ones. // only filter out the incorrect ones.
var incorrectIPs []string var incorrectIPs []string
info.ExternalIPs, incorrectIPs = utilproxy.FilterIncorrectIPVersion(service.Spec.ExternalIPs, *sct.isIPv6Mode) info.externalIPs, incorrectIPs = utilproxy.FilterIncorrectIPVersion(service.Spec.ExternalIPs, *sct.isIPv6Mode)
if len(incorrectIPs) > 0 { if len(incorrectIPs) > 0 {
utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "externalIPs", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "externalIPs", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID)
} }
info.LoadBalancerSourceRanges, incorrectIPs = utilproxy.FilterIncorrectCIDRVersion(service.Spec.LoadBalancerSourceRanges, *sct.isIPv6Mode) info.loadBalancerSourceRanges, incorrectIPs = utilproxy.FilterIncorrectCIDRVersion(service.Spec.LoadBalancerSourceRanges, *sct.isIPv6Mode)
if len(incorrectIPs) > 0 { if len(incorrectIPs) > 0 {
utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "loadBalancerSourceRanges", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "loadBalancerSourceRanges", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID)
} }
@ -141,7 +166,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
if p == 0 { if p == 0 {
klog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name) klog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name)
} else { } else {
info.HealthCheckNodePort = int(p) info.healthCheckNodePort = int(p)
} }
} }
@ -239,8 +264,8 @@ func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (res
// computing this incrementally similarly to serviceMap. // computing this incrementally similarly to serviceMap.
result.HCServiceNodePorts = make(map[types.NamespacedName]uint16) result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap { for svcPortName, info := range serviceMap {
if info.GetHealthCheckNodePort() != 0 { if info.HealthCheckNodePort() != 0 {
result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort()) result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
} }
} }
@ -355,8 +380,8 @@ func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
info, exists := (*sm)[svcPortName] info, exists := (*sm)[svcPortName]
if exists { if exists {
klog.V(1).Infof("Removing service port %q", svcPortName) klog.V(1).Infof("Removing service port %q", svcPortName)
if info.GetProtocol() == v1.ProtocolUDP { if info.Protocol() == v1.ProtocolUDP {
UDPStaleClusterIP.Insert(info.ClusterIPString()) UDPStaleClusterIP.Insert(info.ClusterIP().String())
} }
delete(*sm, svcPortName) delete(*sm, svcPortName)
} else { } else {

View File

@ -33,12 +33,12 @@ const testHostname = "test-hostname"
func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServiceInfo)) *BaseServiceInfo { func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int, svcInfoFuncs ...func(*BaseServiceInfo)) *BaseServiceInfo {
info := &BaseServiceInfo{ info := &BaseServiceInfo{
ClusterIP: net.ParseIP(clusterIP), clusterIP: net.ParseIP(clusterIP),
Port: port, port: port,
Protocol: v1.Protocol(protocol), protocol: v1.Protocol(protocol),
} }
if healthcheckNodePort != 0 { if healthcheckNodePort != 0 {
info.HealthCheckNodePort = healthcheckNodePort info.healthCheckNodePort = healthcheckNodePort
} }
for _, svcInfoFunc := range svcInfoFuncs { for _, svcInfoFunc := range svcInfoFuncs {
svcInfoFunc(info) svcInfoFunc(info)
@ -269,8 +269,8 @@ func TestServiceToServiceMap(t *testing.T) {
}, },
expected: map[ServicePortName]*BaseServiceInfo{ expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "validIPv4", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { makeServicePortName("test", "validIPv4", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.ExternalIPs = []string{testExternalIPv4} info.externalIPs = []string{testExternalIPv4}
info.LoadBalancerSourceRanges = []string{testSourceRangeIPv4} info.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
}), }),
}, },
isIPv6Mode: &falseVal, isIPv6Mode: &falseVal,
@ -297,8 +297,8 @@ func TestServiceToServiceMap(t *testing.T) {
}, },
expected: map[ServicePortName]*BaseServiceInfo{ expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "validIPv6", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) { makeServicePortName("test", "validIPv6", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.ExternalIPs = []string{testExternalIPv6} info.externalIPs = []string{testExternalIPv6}
info.LoadBalancerSourceRanges = []string{testSourceRangeIPv6} info.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
}), }),
}, },
isIPv6Mode: &trueVal, isIPv6Mode: &trueVal,
@ -325,8 +325,8 @@ func TestServiceToServiceMap(t *testing.T) {
}, },
expected: map[ServicePortName]*BaseServiceInfo{ expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) { makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort"): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.ExternalIPs = []string{testExternalIPv4} info.externalIPs = []string{testExternalIPv4}
info.LoadBalancerSourceRanges = []string{testSourceRangeIPv4} info.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
}), }),
}, },
isIPv6Mode: &falseVal, isIPv6Mode: &falseVal,
@ -353,8 +353,8 @@ func TestServiceToServiceMap(t *testing.T) {
}, },
expected: map[ServicePortName]*BaseServiceInfo{ expected: map[ServicePortName]*BaseServiceInfo{
makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) { makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort"): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(info *BaseServiceInfo) {
info.ExternalIPs = []string{testExternalIPv6} info.externalIPs = []string{testExternalIPv6}
info.LoadBalancerSourceRanges = []string{testSourceRangeIPv6} info.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
}), }),
}, },
isIPv6Mode: &trueVal, isIPv6Mode: &trueVal,
@ -371,12 +371,12 @@ func TestServiceToServiceMap(t *testing.T) {
} }
for svcKey, expectedInfo := range tc.expected { for svcKey, expectedInfo := range tc.expected {
svcInfo := newServices[svcKey].(*BaseServiceInfo) svcInfo := newServices[svcKey].(*BaseServiceInfo)
if !svcInfo.ClusterIP.Equal(expectedInfo.ClusterIP) || if !svcInfo.clusterIP.Equal(expectedInfo.clusterIP) ||
svcInfo.Port != expectedInfo.Port || svcInfo.port != expectedInfo.port ||
svcInfo.Protocol != expectedInfo.Protocol || svcInfo.protocol != expectedInfo.protocol ||
svcInfo.HealthCheckNodePort != expectedInfo.HealthCheckNodePort || svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort ||
!sets.NewString(svcInfo.ExternalIPs...).Equal(sets.NewString(expectedInfo.ExternalIPs...)) || !sets.NewString(svcInfo.externalIPs...).Equal(sets.NewString(expectedInfo.externalIPs...)) ||
!sets.NewString(svcInfo.LoadBalancerSourceRanges...).Equal(sets.NewString(expectedInfo.LoadBalancerSourceRanges...)) { !sets.NewString(svcInfo.loadBalancerSourceRanges...).Equal(sets.NewString(expectedInfo.loadBalancerSourceRanges...)) {
t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo) t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo)
} }
} }

View File

@ -18,6 +18,7 @@ package proxy
import ( import (
"fmt" "fmt"
"net"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -52,18 +53,28 @@ func (spn ServicePortName) String() string {
type ServicePort interface { type ServicePort interface {
// String returns service string. An example format can be: `IP:Port/Protocol`. // String returns service string. An example format can be: `IP:Port/Protocol`.
String() string String() string
// ClusterIPString returns service cluster IP in string format. // GetClusterIP returns service cluster IP in net.IP format.
ClusterIPString() string ClusterIP() net.IP
// GetPort returns service port if present. If return 0 means not present.
Port() int
// GetSessionAffinityType returns service session affinity type
SessionAffinityType() v1.ServiceAffinity
// GetStickyMaxAgeSeconds returns service max connection age
StickyMaxAgeSeconds() int
// ExternalIPStrings returns service ExternalIPs as a string array. // ExternalIPStrings returns service ExternalIPs as a string array.
ExternalIPStrings() []string ExternalIPStrings() []string
// LoadBalancerIPStrings returns service LoadBalancerIPs as a string array. // LoadBalancerIPStrings returns service LoadBalancerIPs as a string array.
LoadBalancerIPStrings() []string LoadBalancerIPStrings() []string
// GetProtocol returns service protocol. // GetProtocol returns service protocol.
GetProtocol() v1.Protocol Protocol() v1.Protocol
// LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not
LoadBalancerSourceRanges() []string
// GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present. // GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present.
GetHealthCheckNodePort() int HealthCheckNodePort() int
// GetNodePort returns a service Node port if present. If return 0, it means not present. // GetNodePort returns a service Node port if present. If return 0, it means not present.
GetNodePort() int NodePort() int
// GetOnlyNodeLocalEndpoints returns if a service has only node local endpoints
OnlyNodeLocalEndpoints() bool
} }
// Endpoint in an interface which abstracts information about an endpoint. // Endpoint in an interface which abstracts information about an endpoint.