migrate to use k8s.io/util LocalPort and ListenPortOpener in iptables.proxier

This commit is contained in:
jornshen 2021-01-18 20:44:15 +08:00
parent f81235605a
commit e68e105102
3 changed files with 26 additions and 70 deletions

View File

@ -58,6 +58,7 @@ go_test(
"//vendor/k8s.io/klog/v2:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library",
], ],
) )

View File

@ -186,7 +186,7 @@ type Proxier struct {
mu sync.Mutex // protects the following fields mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable portsMap map[utilnet.LocalPort]utilnet.Closeable
nodeLabels map[string]string nodeLabels map[string]string
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid // when corresponding objects are synced after startup. This is used to avoid
@ -206,7 +206,7 @@ type Proxier struct {
localDetector proxyutiliptables.LocalTrafficDetector localDetector proxyutiliptables.LocalTrafficDetector
hostname string hostname string
nodeIP net.IP nodeIP net.IP
portMapper utilproxy.PortOpener portMapper utilnet.PortOpener
recorder record.EventRecorder recorder record.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer serviceHealthServer healthcheck.ServiceHealthServer
@ -239,14 +239,6 @@ type Proxier struct {
networkInterfacer utilproxy.NetworkInterfacer networkInterfacer utilproxy.NetworkInterfacer
} }
// listenPortOpener opens ports by calling bind() and listen().
type listenPortOpener struct{}
// OpenLocalPort holds the given local port open.
func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
return openLocalPort(lp, isIPv6)
}
// Proxier implements proxy.Provider // Proxier implements proxy.Provider
var _ proxy.Provider = &Proxier{} var _ proxy.Provider = &Proxier{}
@ -303,7 +295,7 @@ func NewProxier(ipt utiliptables.Interface,
} }
proxier := &Proxier{ proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
@ -316,7 +308,7 @@ func NewProxier(ipt utiliptables.Interface,
localDetector: localDetector, localDetector: localDetector,
hostname: hostname, hostname: hostname,
nodeIP: nodeIP, nodeIP: nodeIP,
portMapper: &listenPortOpener{}, portMapper: &utilnet.ListenPortOpener,
recorder: recorder, recorder: recorder,
serviceHealthServer: serviceHealthServer, serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer, healthzServer: healthzServer,
@ -981,7 +973,7 @@ func (proxier *Proxier) syncProxyRules() {
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate the set of local ports that we will be holding open once this update is complete // Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{}
// We are creating those slices ones here to avoid memory reallocations // We are creating those slices ones here to avoid memory reallocations
// in every loop. Note that reuse the memory, instead of doing: // in every loop. Note that reuse the memory, instead of doing:
@ -1017,6 +1009,10 @@ func (proxier *Proxier) syncProxyRules() {
continue continue
} }
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
localPortIPFamily := utilnet.IPv4
if isIPv6 {
localPortIPFamily = utilnet.IPv6
}
protocol := strings.ToLower(string(svcInfo.Protocol())) protocol := strings.ToLower(string(svcInfo.Protocol()))
svcNameString := svcInfo.serviceNameString svcNameString := svcInfo.serviceNameString
@ -1102,17 +1098,18 @@ func (proxier *Proxier) syncProxyRules() {
// machine, hold the local port open so no other process can open it // machine, hold the local port open so no other process can open it
// (because the socket might open but it would never work). // (because the socket might open but it would never work).
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) { if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
lp := utilproxy.LocalPort{ lp := utilnet.LocalPort{
Description: "externalIP for " + svcNameString, Description: "externalIP for " + svcNameString,
IP: externalIP, IP: externalIP,
IPFamily: localPortIPFamily,
Port: svcInfo.Port(), Port: svcInfo.Port(),
Protocol: protocol, Protocol: utilnet.Protocol(svcInfo.Protocol()),
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp] replacementPortsMap[lp] = proxier.portsMap[lp]
} else { } else {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil { if err != nil {
msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err) msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
@ -1126,6 +1123,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "can't open port, skipping externalIP", "port", lp.String()) klog.ErrorS(err, "can't open port, skipping externalIP", "port", lp.String())
continue continue
} }
klog.V(2).InfoS("Opened local port", "port", lp.String())
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
} }
} }
@ -1259,13 +1257,14 @@ func (proxier *Proxier) syncProxyRules() {
continue continue
} }
lps := make([]utilproxy.LocalPort, 0) lps := make([]utilnet.LocalPort, 0)
for address := range nodeAddresses { for address := range nodeAddresses {
lp := utilproxy.LocalPort{ lp := utilnet.LocalPort{
Description: "nodePort for " + svcNameString, Description: "nodePort for " + svcNameString,
IP: address, IP: address,
IPFamily: localPortIPFamily,
Port: svcInfo.NodePort(), Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: utilnet.Protocol(svcInfo.Protocol()),
} }
if utilproxy.IsZeroCIDR(address) { if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all // Empty IP address means all
@ -1283,11 +1282,12 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp] replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.Protocol() != v1.ProtocolSCTP { } else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil { if err != nil {
klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String()) klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String())
continue continue
} }
klog.V(2).InfoS("Opened local port", "port", lp.String())
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
} }
} }
@ -1664,49 +1664,3 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
} }
func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because iptables would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket utilproxy.Closeable
switch lp.Protocol {
case "tcp":
network := "tcp4"
if isIPv6 {
network = "tcp6"
}
listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
socket = listener
case "udp":
network := "udp4"
if isIPv6 {
network = "udp6"
}
addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP(network, addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
}
klog.V(2).InfoS("Opened local port", "port", lp.String())
return socket, nil
}

View File

@ -44,6 +44,7 @@ import (
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/utils/exec" "k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing" fakeexec "k8s.io/utils/exec/testing"
utilnet "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer" utilpointer "k8s.io/utils/pointer"
) )
@ -463,12 +464,12 @@ func (f *fakeCloseable) Close() error {
// fakePortOpener implements portOpener. // fakePortOpener implements portOpener.
type fakePortOpener struct { type fakePortOpener struct {
openPorts []*utilproxy.LocalPort openPorts []*utilnet.LocalPort
} }
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port. // to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) {
f.openPorts = append(f.openPorts, lp) f.openPorts = append(f.openPorts, lp)
return &fakeCloseable{}, nil return &fakeCloseable{}, nil
} }
@ -493,8 +494,8 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro
masqueradeMark: "0x4000", masqueradeMark: "0x4000",
localDetector: detectLocal, localDetector: detectLocal,
hostname: testHostname, hostname: testHostname,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, portMapper: &fakePortOpener{[]*utilnet.LocalPort{}},
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
precomputedProbabilities: make([]string, 0, 1001), precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),