diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index d4795d93b69..c257e08e447 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -58,6 +58,7 @@ go_test( "//vendor/k8s.io/klog/v2:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ], ) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index be9804148e6..efb6efdc175 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -186,7 +186,7 @@ type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap - portsMap map[utilproxy.LocalPort]utilproxy.Closeable + portsMap map[utilnet.LocalPort]utilnet.Closeable nodeLabels map[string]string // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true // when corresponding objects are synced after startup. This is used to avoid @@ -206,7 +206,7 @@ type Proxier struct { localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP - portMapper utilproxy.PortOpener + portMapper utilnet.PortOpener recorder record.EventRecorder serviceHealthServer healthcheck.ServiceHealthServer @@ -239,14 +239,6 @@ type Proxier struct { networkInterfacer utilproxy.NetworkInterfacer } -// listenPortOpener opens ports by calling bind() and listen(). -type listenPortOpener struct{} - -// OpenLocalPort holds the given local port open. -func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { - return openLocalPort(lp, isIPv6) -} - // Proxier implements proxy.Provider var _ proxy.Provider = &Proxier{} @@ -303,7 +295,7 @@ func NewProxier(ipt utiliptables.Interface, } proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portsMap: make(map[utilnet.LocalPort]utilnet.Closeable), serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), @@ -316,7 +308,7 @@ func NewProxier(ipt utiliptables.Interface, localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, - portMapper: &listenPortOpener{}, + portMapper: &utilnet.ListenPortOpener, recorder: recorder, serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, @@ -981,7 +973,7 @@ func (proxier *Proxier) syncProxyRules() { activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} + replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{} // We are creating those slices ones here to avoid memory reallocations // in every loop. Note that reuse the memory, instead of doing: @@ -1017,6 +1009,10 @@ func (proxier *Proxier) syncProxyRules() { continue } isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) + localPortIPFamily := utilnet.IPv4 + if isIPv6 { + localPortIPFamily = utilnet.IPv6 + } protocol := strings.ToLower(string(svcInfo.Protocol())) svcNameString := svcInfo.serviceNameString @@ -1102,17 +1098,18 @@ func (proxier *Proxier) syncProxyRules() { // machine, hold the local port open so no other process can open it // (because the socket might open but it would never work). if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) { - lp := utilproxy.LocalPort{ + lp := utilnet.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, + IPFamily: localPortIPFamily, Port: svcInfo.Port(), - Protocol: protocol, + Protocol: utilnet.Protocol(svcInfo.Protocol()), } if proxier.portsMap[lp] != nil { klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) + socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err) @@ -1126,6 +1123,7 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "can't open port, skipping externalIP", "port", lp.String()) continue } + klog.V(2).InfoS("Opened local port", "port", lp.String()) replacementPortsMap[lp] = socket } } @@ -1259,13 +1257,14 @@ func (proxier *Proxier) syncProxyRules() { continue } - lps := make([]utilproxy.LocalPort, 0) + lps := make([]utilnet.LocalPort, 0) for address := range nodeAddresses { - lp := utilproxy.LocalPort{ + lp := utilnet.LocalPort{ Description: "nodePort for " + svcNameString, IP: address, + IPFamily: localPortIPFamily, Port: svcInfo.NodePort(), - Protocol: protocol, + Protocol: utilnet.Protocol(svcInfo.Protocol()), } if utilproxy.IsZeroCIDR(address) { // Empty IP address means all @@ -1283,11 +1282,12 @@ func (proxier *Proxier) syncProxyRules() { klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else if svcInfo.Protocol() != v1.ProtocolSCTP { - socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) + socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String()) continue } + klog.V(2).InfoS("Opened local port", "port", lp.String()) replacementPortsMap[lp] = socket } } @@ -1664,49 +1664,3 @@ func (proxier *Proxier) syncProxyRules() { klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } - -func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use iptables to redirect traffic. - // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because iptables would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - var socket utilproxy.Closeable - switch lp.Protocol { - case "tcp": - network := "tcp4" - if isIPv6 { - network = "tcp6" - } - listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) - if err != nil { - return nil, err - } - socket = listener - case "udp": - network := "udp4" - if isIPv6 { - network = "udp6" - } - addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP(network, addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", lp.Protocol) - } - klog.V(2).InfoS("Opened local port", "port", lp.String()) - return socket, nil -} diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index cb3a96dccba..2d4f2d992ec 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -44,6 +44,7 @@ import ( iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" + utilnet "k8s.io/utils/net" utilpointer "k8s.io/utils/pointer" ) @@ -463,12 +464,12 @@ func (f *fakeCloseable) Close() error { // fakePortOpener implements portOpener. type fakePortOpener struct { - openPorts []*utilproxy.LocalPort + openPorts []*utilnet.LocalPort } // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules // to lock a local port. -func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { +func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) { f.openPorts = append(f.openPorts, lp) return &fakeCloseable{}, nil } @@ -493,8 +494,8 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro masqueradeMark: "0x4000", localDetector: detectLocal, hostname: testHostname, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, + portsMap: make(map[utilnet.LocalPort]utilnet.Closeable), + portMapper: &fakePortOpener{[]*utilnet.LocalPort{}}, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil),