mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
consume endpoints IPPart function in util
This commit is contained in:
parent
1ed1200143
commit
d96409178b
@ -166,26 +166,9 @@ type endpointsInfo struct {
|
||||
chainName utiliptables.Chain
|
||||
}
|
||||
|
||||
// Returns just the IP part of an IP or IP:port or endpoint string. If the IP
|
||||
// part is an IPv6 address enclosed in brackets (e.g. "[fd00:1::5]:9999"),
|
||||
// then the brackets are stripped as well.
|
||||
func ipPart(s string) string {
|
||||
if ip := net.ParseIP(s); ip != nil {
|
||||
// IP address without port
|
||||
return s
|
||||
}
|
||||
// Must be IP:port
|
||||
ip, _, err := net.SplitHostPort(s)
|
||||
if err != nil {
|
||||
glog.Errorf("Error parsing '%s': %v", s, err)
|
||||
return ""
|
||||
}
|
||||
return ip
|
||||
}
|
||||
|
||||
// Returns just the IP part of the endpoint.
|
||||
// IPPart returns just the IP part of the endpoint.
|
||||
func (e *endpointsInfo) IPPart() string {
|
||||
return ipPart(e.endpoint)
|
||||
return utilproxy.IPPart(e.endpoint)
|
||||
}
|
||||
|
||||
// Returns the endpoint chain name for a given endpointsInfo.
|
||||
@ -944,7 +927,7 @@ type endpointServicePair struct {
|
||||
}
|
||||
|
||||
func (esp *endpointServicePair) IPPart() string {
|
||||
return ipPart(esp.endpoint)
|
||||
return utilproxy.IPPart(esp.endpoint)
|
||||
}
|
||||
|
||||
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
|
||||
@ -953,7 +936,7 @@ func (esp *endpointServicePair) IPPart() string {
|
||||
func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
|
||||
for epSvcPair := range connectionMap {
|
||||
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
|
||||
endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")]
|
||||
endpointIP := utilproxy.IPPart(epSvcPair.endpoint)
|
||||
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.clusterIP.String(), endpointIP)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.servicePortName.String(), err)
|
||||
@ -962,16 +945,6 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
|
||||
}
|
||||
}
|
||||
|
||||
// hostAddress returns a host address of the form <ip-address>/32 for
|
||||
// IPv4 and <ip-address>/128 for IPv6
|
||||
func hostAddress(ip net.IP) string {
|
||||
len := 32
|
||||
if ip.To4() == nil {
|
||||
len = 128
|
||||
}
|
||||
return fmt.Sprintf("%s/%d", ip.String(), len)
|
||||
}
|
||||
|
||||
// This is where all of the iptables-save/restore calls happen.
|
||||
// The only other iptables rules are those that are setup in iptablesInit()
|
||||
// This assumes proxier.mu is NOT held
|
||||
@ -1189,7 +1162,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", hostAddress(svcInfo.clusterIP),
|
||||
"-d", utilproxy.ToCIDR(svcInfo.clusterIP),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
)
|
||||
if proxier.masqueradeAll {
|
||||
@ -1243,7 +1216,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", hostAddress(net.ParseIP(externalIP)),
|
||||
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
)
|
||||
// We have to SNAT packets to external IPs.
|
||||
@ -1269,7 +1242,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", hostAddress(net.ParseIP(externalIP)),
|
||||
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
"-j", "REJECT",
|
||||
)
|
||||
@ -1295,7 +1268,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", hostAddress(net.ParseIP(ingress.IP)),
|
||||
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
)
|
||||
// jump to service firewall chain
|
||||
@ -1333,7 +1306,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
|
||||
// Need to add the following rule to allow request on host.
|
||||
if allowFromNode {
|
||||
writeLine(proxier.natRules, append(args, "-s", hostAddress(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...)
|
||||
writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1417,7 +1390,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", hostAddress(svcInfo.clusterIP),
|
||||
"-d", utilproxy.ToCIDR(svcInfo.clusterIP),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
"-j", "REJECT",
|
||||
)
|
||||
@ -1489,7 +1462,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
)
|
||||
// Handle traffic that loops back to the originator with SNAT.
|
||||
writeLine(proxier.natRules, append(args,
|
||||
"-s", hostAddress(net.ParseIP(epIP)),
|
||||
"-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
|
||||
"-j", string(KubeMarkMasqChain))...)
|
||||
// Update client-affinity lists.
|
||||
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
|
||||
|
@ -56,52 +56,6 @@ func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expected
|
||||
}
|
||||
}
|
||||
|
||||
func TestIpPart(t *testing.T) {
|
||||
const noError = ""
|
||||
|
||||
testCases := []struct {
|
||||
endpoint string
|
||||
expectedIP string
|
||||
expectedError string
|
||||
}{
|
||||
{"1.2.3.4", "1.2.3.4", noError},
|
||||
{"1.2.3.4:9999", "1.2.3.4", noError},
|
||||
{"2001:db8::1:1", "2001:db8::1:1", noError},
|
||||
{"[2001:db8::2:2]:9999", "2001:db8::2:2", noError},
|
||||
{"1.2.3.4::9999", "", "too many colons"},
|
||||
{"1.2.3.4:[0]", "", "unexpected '[' in address"},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
ip := ipPart(tc.endpoint)
|
||||
if tc.expectedError == noError {
|
||||
if ip != tc.expectedIP {
|
||||
t.Errorf("Unexpected IP for %s: Expected: %s, Got %s", tc.endpoint, tc.expectedIP, ip)
|
||||
}
|
||||
} else if ip != "" {
|
||||
t.Errorf("Error did not occur for %s, expected: '%s' error", tc.endpoint, tc.expectedError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHostAddress(t *testing.T) {
|
||||
testCases := []struct {
|
||||
ip string
|
||||
expectedAddr string
|
||||
}{
|
||||
{"1.2.3.4", "1.2.3.4/32"},
|
||||
{"2001:db8::1:1", "2001:db8::1:1/128"},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
ip := net.ParseIP(tc.ip)
|
||||
addr := hostAddress(ip)
|
||||
if addr != tc.expectedAddr {
|
||||
t.Errorf("Unexpected host address for %s: Expected: %s, Got %s", tc.ip, tc.expectedAddr, addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadLinesFromByteBuffer(t *testing.T) {
|
||||
testFn := func(byteArray []byte, expected []string) {
|
||||
index := 0
|
||||
@ -272,6 +226,10 @@ func TestDeleteEndpointConnections(t *testing.T) {
|
||||
endpoint: "10.240.0.5:80",
|
||||
servicePortName: svc2,
|
||||
},
|
||||
{
|
||||
endpoint: "[fd00:1::5]:8080",
|
||||
servicePortName: svc2,
|
||||
},
|
||||
}
|
||||
|
||||
expectCommandExecCount := 0
|
||||
@ -281,7 +239,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
|
||||
svcInfo := fakeProxier.serviceMap[testCases[i].servicePortName]
|
||||
if svcInfo.protocol == api.ProtocolUDP {
|
||||
svcIp := svcInfo.clusterIP.String()
|
||||
endpointIp := strings.Split(testCases[i].endpoint, ":")[0]
|
||||
endpointIp := utilproxy.IPPart(testCases[i].endpoint)
|
||||
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", svcIp, endpointIp)
|
||||
execCommand := strings.Join(fcmd.CombinedOutputLog[expectCommandExecCount], " ")
|
||||
if expectCommand != execCommand {
|
||||
|
@ -478,10 +478,7 @@ func (e *endpointsInfo) String() string {
|
||||
|
||||
// IPPart returns just the IP part of the endpoint.
|
||||
func (e *endpointsInfo) IPPart() string {
|
||||
if index := strings.Index(e.endpoint, ":"); index != -1 {
|
||||
return e.endpoint[0:index]
|
||||
}
|
||||
return e.endpoint
|
||||
return utilproxy.IPPart(e.endpoint)
|
||||
}
|
||||
|
||||
type endpointServicePair struct {
|
||||
@ -1262,7 +1259,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
|
||||
for epSvcPair := range connectionMap {
|
||||
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
|
||||
endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")]
|
||||
endpointIP := utilproxy.IPPart(epSvcPair.endpoint)
|
||||
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.clusterIP.String(), endpointIP)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.servicePortName.String(), err)
|
||||
|
@ -4,6 +4,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"conntrack.go",
|
||||
"endpoints.go",
|
||||
"port.go",
|
||||
"utils.go",
|
||||
],
|
||||
@ -21,6 +22,7 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"conntrack_test.go",
|
||||
"endpoints_test.go",
|
||||
"port_test.go",
|
||||
"utils_test.go",
|
||||
],
|
||||
|
Loading…
Reference in New Issue
Block a user