diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index f10643ece38..0117ad38966 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -18,7 +18,6 @@ go_test( "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/ipvs/testing:go_default_library", - "//pkg/proxy/util:go_default_library", "//pkg/proxy/util/iptables:go_default_library", "//pkg/proxy/util/testing:go_default_library", "//pkg/util/async:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 1289883cef1..7bb97a303cb 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -219,7 +219,7 @@ type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap - portsMap map[utilproxy.LocalPort]utilproxy.Closeable + portsMap map[utilnet.LocalPort]utilnet.Closeable nodeLabels map[string]string // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when // corresponding objects are synced after startup. This is used to avoid updating @@ -246,7 +246,7 @@ type Proxier struct { localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP - portMapper utilproxy.PortOpener + portMapper utilnet.PortOpener recorder record.EventRecorder serviceHealthServer healthcheck.ServiceHealthServer @@ -459,7 +459,7 @@ func NewProxier(ipt utiliptables.Interface, proxier := &Proxier{ ipFamily: ipFamily, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portsMap: make(map[utilnet.LocalPort]utilnet.Closeable), serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), @@ -474,7 +474,7 @@ func NewProxier(ipt utiliptables.Interface, localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, - portMapper: &listenPortOpener{}, + portMapper: &utilnet.ListenPortOpener, recorder: recorder, serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, @@ -1093,7 +1093,7 @@ func (proxier *Proxier) syncProxyRules() { } // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} + replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{} // activeIPVSServices represents IPVS service successfully created in this round of sync activeIPVSServices := map[string]bool{} // currentIPVSServices represent IPVS services listed from the system @@ -1164,6 +1164,10 @@ func (proxier *Proxier) syncProxyRules() { continue } isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) + localPortIPFamily := utilnet.IPv4 + if isIPv6 { + localPortIPFamily = utilnet.IPv6 + } protocol := strings.ToLower(string(svcInfo.Protocol())) // Precompute svcNameString; with many services the many calls // to ServicePortName.String() show up in CPU profiles. @@ -1246,17 +1250,18 @@ func (proxier *Proxier) syncProxyRules() { // (because the socket might open but it would never work). if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) { // We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP - lp := utilproxy.LocalPort{ + lp := utilnet.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, + IPFamily: localPortIPFamily, Port: svcInfo.Port(), - Protocol: protocol, + Protocol: utilnet.Protocol(svcInfo.Protocol()), } if proxier.portsMap[lp] != nil { klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) + socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err) @@ -1270,6 +1275,7 @@ func (proxier *Proxier) syncProxyRules() { klog.Error(msg) continue } + klog.V(2).Infof("Opened local port %s", lp.String()) replacementPortsMap[lp] = socket } } // We're holding the port, so it's OK to install IPVS rules. @@ -1430,13 +1436,14 @@ func (proxier *Proxier) syncProxyRules() { continue } - var lps []utilproxy.LocalPort + var lps []utilnet.LocalPort for _, address := range nodeAddresses { - lp := utilproxy.LocalPort{ + lp := utilnet.LocalPort{ Description: "nodePort for " + svcNameString, IP: address, + IPFamily: localPortIPFamily, Port: svcInfo.NodePort(), - Protocol: protocol, + Protocol: utilnet.Protocol(svcInfo.Protocol()), } if utilproxy.IsZeroCIDR(address) { // Empty IP address means all @@ -1456,12 +1463,14 @@ func (proxier *Proxier) syncProxyRules() { // We do not start listening on SCTP ports, according to our agreement in the // SCTP support KEP } else if svcInfo.Protocol() != v1.ProtocolSCTP { - socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) + socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } - if lp.Protocol == "udp" { + klog.V(2).Infof("Opened local port %s", lp.String()) + + if lp.Protocol == utilnet.UDP { conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) } replacementPortsMap[lp] = socket @@ -2194,60 +2203,6 @@ func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, curre return legacyAddrs } -// listenPortOpener opens ports by calling bind() and listen(). -type listenPortOpener struct{} - -// OpenLocalPort holds the given local port open. -func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { - return openLocalPort(lp, isIPv6) -} - -func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use ipvs to redirect traffic. - // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because ipvs would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - var socket utilproxy.Closeable - switch lp.Protocol { - case "tcp": - network := "tcp4" - if isIPv6 { - network = "tcp6" - } - listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) - if err != nil { - return nil, err - } - socket = listener - case "udp": - network := "udp4" - if isIPv6 { - network = "udp6" - } - addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP(network, addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", lp.Protocol) - } - klog.V(2).Infof("Opened local port %s", lp.String()) - return socket, nil -} - // ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets // It will only operate iptables *nat table. // Create and link the kube postrouting chain for SNAT packets. diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 46a0c4c6216..9912244ff2a 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -36,7 +36,6 @@ import ( "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" - utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" @@ -70,12 +69,12 @@ func (f *fakeIPGetter) BindedIPs() (sets.String, error) { // fakePortOpener implements portOpener. type fakePortOpener struct { - openPorts []*utilproxy.LocalPort + openPorts []*utilnet.LocalPort } // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules // to lock a local port. -func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { +func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) { f.openPorts = append(f.openPorts, lp) return nil, nil } @@ -151,8 +150,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u strictARP: false, localDetector: proxyutiliptables.NewNoOpLocalDetector(), hostname: testHostname, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, + portsMap: make(map[utilnet.LocalPort]utilnet.Closeable), + portMapper: &fakePortOpener{[]*utilnet.LocalPort{}}, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), ipvsScheduler: DefaultScheduler, ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},