From e68e105102bf4c539fb3a9b86de6b38fe9aeaa6d Mon Sep 17 00:00:00 2001 From: jornshen Date: Mon, 18 Jan 2021 20:44:15 +0800 Subject: [PATCH 1/4] migrate to use k8s.io/util LocalPort and ListenPortOpener in iptables.proxier --- pkg/proxy/iptables/BUILD | 1 + pkg/proxy/iptables/proxier.go | 86 +++++++----------------------- pkg/proxy/iptables/proxier_test.go | 9 ++-- 3 files changed, 26 insertions(+), 70 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index d4795d93b69..c257e08e447 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -58,6 +58,7 @@ go_test( "//vendor/k8s.io/klog/v2:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ], ) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index be9804148e6..efb6efdc175 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -186,7 +186,7 @@ type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap - portsMap map[utilproxy.LocalPort]utilproxy.Closeable + portsMap map[utilnet.LocalPort]utilnet.Closeable nodeLabels map[string]string // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true // when corresponding objects are synced after startup. This is used to avoid @@ -206,7 +206,7 @@ type Proxier struct { localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP - portMapper utilproxy.PortOpener + portMapper utilnet.PortOpener recorder record.EventRecorder serviceHealthServer healthcheck.ServiceHealthServer @@ -239,14 +239,6 @@ type Proxier struct { networkInterfacer utilproxy.NetworkInterfacer } -// listenPortOpener opens ports by calling bind() and listen(). -type listenPortOpener struct{} - -// OpenLocalPort holds the given local port open. -func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { - return openLocalPort(lp, isIPv6) -} - // Proxier implements proxy.Provider var _ proxy.Provider = &Proxier{} @@ -303,7 +295,7 @@ func NewProxier(ipt utiliptables.Interface, } proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portsMap: make(map[utilnet.LocalPort]utilnet.Closeable), serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), @@ -316,7 +308,7 @@ func NewProxier(ipt utiliptables.Interface, localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, - portMapper: &listenPortOpener{}, + portMapper: &utilnet.ListenPortOpener, recorder: recorder, serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, @@ -981,7 +973,7 @@ func (proxier *Proxier) syncProxyRules() { activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} + replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{} // We are creating those slices ones here to avoid memory reallocations // in every loop. Note that reuse the memory, instead of doing: @@ -1017,6 +1009,10 @@ func (proxier *Proxier) syncProxyRules() { continue } isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) + localPortIPFamily := utilnet.IPv4 + if isIPv6 { + localPortIPFamily = utilnet.IPv6 + } protocol := strings.ToLower(string(svcInfo.Protocol())) svcNameString := svcInfo.serviceNameString @@ -1102,17 +1098,18 @@ func (proxier *Proxier) syncProxyRules() { // machine, hold the local port open so no other process can open it // (because the socket might open but it would never work). if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) { - lp := utilproxy.LocalPort{ + lp := utilnet.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, + IPFamily: localPortIPFamily, Port: svcInfo.Port(), - Protocol: protocol, + Protocol: utilnet.Protocol(svcInfo.Protocol()), } if proxier.portsMap[lp] != nil { klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) + socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err) @@ -1126,6 +1123,7 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "can't open port, skipping externalIP", "port", lp.String()) continue } + klog.V(2).InfoS("Opened local port", "port", lp.String()) replacementPortsMap[lp] = socket } } @@ -1259,13 +1257,14 @@ func (proxier *Proxier) syncProxyRules() { continue } - lps := make([]utilproxy.LocalPort, 0) + lps := make([]utilnet.LocalPort, 0) for address := range nodeAddresses { - lp := utilproxy.LocalPort{ + lp := utilnet.LocalPort{ Description: "nodePort for " + svcNameString, IP: address, + IPFamily: localPortIPFamily, Port: svcInfo.NodePort(), - Protocol: protocol, + Protocol: utilnet.Protocol(svcInfo.Protocol()), } if utilproxy.IsZeroCIDR(address) { // Empty IP address means all @@ -1283,11 +1282,12 @@ func (proxier *Proxier) syncProxyRules() { klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else if svcInfo.Protocol() != v1.ProtocolSCTP { - socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) + socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String()) continue } + klog.V(2).InfoS("Opened local port", "port", lp.String()) replacementPortsMap[lp] = socket } } @@ -1664,49 +1664,3 @@ func (proxier *Proxier) syncProxyRules() { klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } - -func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use iptables to redirect traffic. - // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because iptables would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - var socket utilproxy.Closeable - switch lp.Protocol { - case "tcp": - network := "tcp4" - if isIPv6 { - network = "tcp6" - } - listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) - if err != nil { - return nil, err - } - socket = listener - case "udp": - network := "udp4" - if isIPv6 { - network = "udp6" - } - addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP(network, addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", lp.Protocol) - } - klog.V(2).InfoS("Opened local port", "port", lp.String()) - return socket, nil -} diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index cb3a96dccba..2d4f2d992ec 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -44,6 +44,7 @@ import ( iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" + utilnet "k8s.io/utils/net" utilpointer "k8s.io/utils/pointer" ) @@ -463,12 +464,12 @@ func (f *fakeCloseable) Close() error { // fakePortOpener implements portOpener. type fakePortOpener struct { - openPorts []*utilproxy.LocalPort + openPorts []*utilnet.LocalPort } // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules // to lock a local port. -func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { +func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) { f.openPorts = append(f.openPorts, lp) return &fakeCloseable{}, nil } @@ -493,8 +494,8 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro masqueradeMark: "0x4000", localDetector: detectLocal, hostname: testHostname, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, + portsMap: make(map[utilnet.LocalPort]utilnet.Closeable), + portMapper: &fakePortOpener{[]*utilnet.LocalPort{}}, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), From 97a5a3d4d5ac6aea65d34a37da744ccbdb1f2005 Mon Sep 17 00:00:00 2001 From: jornshen Date: Mon, 18 Jan 2021 21:16:16 +0800 Subject: [PATCH 2/4] migrate to use k8s.io/util LocalPort and ListenPortOpener in ipvs.proxier --- pkg/proxy/ipvs/BUILD | 1 - pkg/proxy/ipvs/proxier.go | 89 +++++++++------------------------- pkg/proxy/ipvs/proxier_test.go | 9 ++-- 3 files changed, 26 insertions(+), 73 deletions(-) diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index f10643ece38..0117ad38966 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -18,7 +18,6 @@ go_test( "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/ipvs/testing:go_default_library", - "//pkg/proxy/util:go_default_library", "//pkg/proxy/util/iptables:go_default_library", "//pkg/proxy/util/testing:go_default_library", "//pkg/util/async:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 1289883cef1..7bb97a303cb 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -219,7 +219,7 @@ type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap - portsMap map[utilproxy.LocalPort]utilproxy.Closeable + portsMap map[utilnet.LocalPort]utilnet.Closeable nodeLabels map[string]string // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when // corresponding objects are synced after startup. This is used to avoid updating @@ -246,7 +246,7 @@ type Proxier struct { localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP - portMapper utilproxy.PortOpener + portMapper utilnet.PortOpener recorder record.EventRecorder serviceHealthServer healthcheck.ServiceHealthServer @@ -459,7 +459,7 @@ func NewProxier(ipt utiliptables.Interface, proxier := &Proxier{ ipFamily: ipFamily, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portsMap: make(map[utilnet.LocalPort]utilnet.Closeable), serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), @@ -474,7 +474,7 @@ func NewProxier(ipt utiliptables.Interface, localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, - portMapper: &listenPortOpener{}, + portMapper: &utilnet.ListenPortOpener, recorder: recorder, serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, @@ -1093,7 +1093,7 @@ func (proxier *Proxier) syncProxyRules() { } // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} + replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{} // activeIPVSServices represents IPVS service successfully created in this round of sync activeIPVSServices := map[string]bool{} // currentIPVSServices represent IPVS services listed from the system @@ -1164,6 +1164,10 @@ func (proxier *Proxier) syncProxyRules() { continue } isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) + localPortIPFamily := utilnet.IPv4 + if isIPv6 { + localPortIPFamily = utilnet.IPv6 + } protocol := strings.ToLower(string(svcInfo.Protocol())) // Precompute svcNameString; with many services the many calls // to ServicePortName.String() show up in CPU profiles. @@ -1246,17 +1250,18 @@ func (proxier *Proxier) syncProxyRules() { // (because the socket might open but it would never work). if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) { // We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP - lp := utilproxy.LocalPort{ + lp := utilnet.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, + IPFamily: localPortIPFamily, Port: svcInfo.Port(), - Protocol: protocol, + Protocol: utilnet.Protocol(svcInfo.Protocol()), } if proxier.portsMap[lp] != nil { klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) + socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err) @@ -1270,6 +1275,7 @@ func (proxier *Proxier) syncProxyRules() { klog.Error(msg) continue } + klog.V(2).Infof("Opened local port %s", lp.String()) replacementPortsMap[lp] = socket } } // We're holding the port, so it's OK to install IPVS rules. @@ -1430,13 +1436,14 @@ func (proxier *Proxier) syncProxyRules() { continue } - var lps []utilproxy.LocalPort + var lps []utilnet.LocalPort for _, address := range nodeAddresses { - lp := utilproxy.LocalPort{ + lp := utilnet.LocalPort{ Description: "nodePort for " + svcNameString, IP: address, + IPFamily: localPortIPFamily, Port: svcInfo.NodePort(), - Protocol: protocol, + Protocol: utilnet.Protocol(svcInfo.Protocol()), } if utilproxy.IsZeroCIDR(address) { // Empty IP address means all @@ -1456,12 +1463,14 @@ func (proxier *Proxier) syncProxyRules() { // We do not start listening on SCTP ports, according to our agreement in the // SCTP support KEP } else if svcInfo.Protocol() != v1.ProtocolSCTP { - socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) + socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } - if lp.Protocol == "udp" { + klog.V(2).Infof("Opened local port %s", lp.String()) + + if lp.Protocol == utilnet.UDP { conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) } replacementPortsMap[lp] = socket @@ -2194,60 +2203,6 @@ func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, curre return legacyAddrs } -// listenPortOpener opens ports by calling bind() and listen(). -type listenPortOpener struct{} - -// OpenLocalPort holds the given local port open. -func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { - return openLocalPort(lp, isIPv6) -} - -func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use ipvs to redirect traffic. - // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because ipvs would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - var socket utilproxy.Closeable - switch lp.Protocol { - case "tcp": - network := "tcp4" - if isIPv6 { - network = "tcp6" - } - listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) - if err != nil { - return nil, err - } - socket = listener - case "udp": - network := "udp4" - if isIPv6 { - network = "udp6" - } - addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP(network, addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", lp.Protocol) - } - klog.V(2).Infof("Opened local port %s", lp.String()) - return socket, nil -} - // ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets // It will only operate iptables *nat table. // Create and link the kube postrouting chain for SNAT packets. diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 46a0c4c6216..9912244ff2a 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -36,7 +36,6 @@ import ( "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" - utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" @@ -70,12 +69,12 @@ func (f *fakeIPGetter) BindedIPs() (sets.String, error) { // fakePortOpener implements portOpener. type fakePortOpener struct { - openPorts []*utilproxy.LocalPort + openPorts []*utilnet.LocalPort } // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules // to lock a local port. -func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { +func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) { f.openPorts = append(f.openPorts, lp) return nil, nil } @@ -151,8 +150,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u strictARP: false, localDetector: proxyutiliptables.NewNoOpLocalDetector(), hostname: testHostname, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, + portsMap: make(map[utilnet.LocalPort]utilnet.Closeable), + portMapper: &fakePortOpener{[]*utilnet.LocalPort{}}, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), ipvsScheduler: DefaultScheduler, ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, From d8d6a0223b444a7ab4192377bbea67588f5bcfda Mon Sep 17 00:00:00 2001 From: jornshen Date: Mon, 18 Jan 2021 21:26:55 +0800 Subject: [PATCH 3/4] clear no use LocalPort in winkernel --- pkg/proxy/winkernel/BUILD | 2 -- pkg/proxy/winkernel/proxier.go | 3 --- pkg/proxy/winkernel/proxier_test.go | 2 -- 3 files changed, 7 deletions(-) diff --git a/pkg/proxy/winkernel/BUILD b/pkg/proxy/winkernel/BUILD index 5817e7ad0bf..8b697069ba0 100644 --- a/pkg/proxy/winkernel/BUILD +++ b/pkg/proxy/winkernel/BUILD @@ -23,7 +23,6 @@ go_library( "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/metaproxier:go_default_library", "//pkg/proxy/metrics:go_default_library", - "//pkg/proxy/util:go_default_library", "//pkg/util/async:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", @@ -67,7 +66,6 @@ go_test( "@io_bazel_rules_go//go/platform:windows": [ "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", - "//pkg/proxy/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 3ab83f4b160..fa585a62403 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -50,7 +50,6 @@ import ( "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" "k8s.io/kubernetes/pkg/proxy/metrics" - utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" utilnet "k8s.io/utils/net" ) @@ -446,7 +445,6 @@ type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap - portsMap map[utilproxy.LocalPort]utilproxy.Closeable // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. This is used to avoid updating hns policies // with some partial data after kube-proxy restart. @@ -619,7 +617,6 @@ func NewProxier( endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.WindowsEndpointSliceProxying) proxier := &Proxier{ endPointsRefCount: make(endPointsReferenceCountMap), - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxy.ServiceMap), endpointsMap: make(proxy.EndpointsMap), masqueradeAll: masqueradeAll, diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 03e7d0bef4e..be7b51ee28b 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" - utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilpointer "k8s.io/utils/pointer" "net" "strings" @@ -119,7 +118,6 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust networkType: networkType, } proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxy.ServiceMap), endpointsMap: make(proxy.EndpointsMap), clusterCIDR: clusterCIDR, From 00e26e97855dbc47e47dcd09569430642470a480 Mon Sep 17 00:00:00 2001 From: jornshen Date: Mon, 18 Jan 2021 21:38:32 +0800 Subject: [PATCH 4/4] clear pkg/proxy/port.go port_test.go file --- pkg/proxy/util/BUILD | 3 +- pkg/proxy/util/port.go | 67 ---------------- pkg/proxy/util/port_test.go | 145 ----------------------------------- pkg/proxy/util/utils.go | 12 +++ pkg/proxy/util/utils_test.go | 97 +++++++++++++++++++++++ 5 files changed, 110 insertions(+), 214 deletions(-) delete mode 100644 pkg/proxy/util/port.go delete mode 100644 pkg/proxy/util/port_test.go diff --git a/pkg/proxy/util/BUILD b/pkg/proxy/util/BUILD index 4f1ce9864b1..842bb1f7947 100644 --- a/pkg/proxy/util/BUILD +++ b/pkg/proxy/util/BUILD @@ -5,7 +5,6 @@ go_library( srcs = [ "endpoints.go", "network.go", - "port.go", "utils.go", ], importpath = "k8s.io/kubernetes/pkg/proxy/util", @@ -27,7 +26,6 @@ go_test( name = "go_default_test", srcs = [ "endpoints_test.go", - "port_test.go", "utils_test.go", ], embed = [":go_default_library"], @@ -36,6 +34,7 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], ) diff --git a/pkg/proxy/util/port.go b/pkg/proxy/util/port.go deleted file mode 100644 index e256bd9ec44..00000000000 --- a/pkg/proxy/util/port.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "fmt" - "net" - "strconv" - - "k8s.io/klog/v2" -) - -// LocalPort describes a port on specific IP address and protocol -type LocalPort struct { - // Description is the identity message of a given local port. - Description string - // IP is the IP address part of a given local port. - // If this string is empty, the port binds to all local IP addresses. - IP string - // Port is the port part of a given local port. - Port int - // Protocol is the protocol part of a given local port. - // The value is assumed to be lower-case. For example, "udp" not "UDP", "tcp" not "TCP". - Protocol string -} - -func (lp *LocalPort) String() string { - ipPort := net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)) - return fmt.Sprintf("%q (%s/%s)", lp.Description, ipPort, lp.Protocol) -} - -// Closeable is an interface around closing a port. -type Closeable interface { - Close() error -} - -// PortOpener is an interface around port opening/closing. -// Abstracted out for testing. -type PortOpener interface { - OpenLocalPort(lp *LocalPort, isIPv6 bool) (Closeable, error) -} - -// RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only -// closes the ports opened in this sync. -func RevertPorts(replacementPortsMap, originalPortsMap map[LocalPort]Closeable) { - for k, v := range replacementPortsMap { - // Only close newly opened local ports - leave ones that were open before this update - if originalPortsMap[k] == nil { - klog.V(2).Infof("Closing local port %s", k.String()) - v.Close() - } - } -} diff --git a/pkg/proxy/util/port_test.go b/pkg/proxy/util/port_test.go deleted file mode 100644 index 7295fd5000c..00000000000 --- a/pkg/proxy/util/port_test.go +++ /dev/null @@ -1,145 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import "testing" - -type fakeClosable struct { - closed bool -} - -func (c *fakeClosable) Close() error { - c.closed = true - return nil -} - -func TestLocalPortString(t *testing.T) { - testCases := []struct { - description string - ip string - port int - protocol string - expectedStr string - }{ - {"IPv4 UDP", "1.2.3.4", 9999, "udp", "\"IPv4 UDP\" (1.2.3.4:9999/udp)"}, - {"IPv4 TCP", "5.6.7.8", 1053, "tcp", "\"IPv4 TCP\" (5.6.7.8:1053/tcp)"}, - {"IPv6 TCP", "2001:db8::1", 80, "tcp", "\"IPv6 TCP\" ([2001:db8::1]:80/tcp)"}, - {"IPv4 SCTP", "9.10.11.12", 7777, "sctp", "\"IPv4 SCTP\" (9.10.11.12:7777/sctp)"}, - {"IPv6 SCTP", "2001:db8::2", 80, "sctp", "\"IPv6 SCTP\" ([2001:db8::2]:80/sctp)"}, - } - - for _, tc := range testCases { - lp := &LocalPort{ - Description: tc.description, - IP: tc.ip, - Port: tc.port, - Protocol: tc.protocol, - } - str := lp.String() - if str != tc.expectedStr { - t.Errorf("Unexpected output for %s, expected: %s, got: %s", tc.description, tc.expectedStr, str) - } - } -} - -func TestRevertPorts(t *testing.T) { - testCases := []struct { - replacementPorts []LocalPort - existingPorts []LocalPort - expectToBeClose []bool - }{ - { - replacementPorts: []LocalPort{ - {Port: 5001}, - {Port: 5002}, - {Port: 5003}, - }, - existingPorts: []LocalPort{}, - expectToBeClose: []bool{true, true, true}, - }, - { - replacementPorts: []LocalPort{}, - existingPorts: []LocalPort{ - {Port: 5001}, - {Port: 5002}, - {Port: 5003}, - }, - expectToBeClose: []bool{}, - }, - { - replacementPorts: []LocalPort{ - {Port: 5001}, - {Port: 5002}, - {Port: 5003}, - }, - existingPorts: []LocalPort{ - {Port: 5001}, - {Port: 5002}, - {Port: 5003}, - }, - expectToBeClose: []bool{false, false, false}, - }, - { - replacementPorts: []LocalPort{ - {Port: 5001}, - {Port: 5002}, - {Port: 5003}, - }, - existingPorts: []LocalPort{ - {Port: 5001}, - {Port: 5003}, - }, - expectToBeClose: []bool{false, true, false}, - }, - { - replacementPorts: []LocalPort{ - {Port: 5001}, - {Port: 5002}, - {Port: 5003}, - }, - existingPorts: []LocalPort{ - {Port: 5001}, - {Port: 5002}, - {Port: 5003}, - {Port: 5004}, - }, - expectToBeClose: []bool{false, false, false}, - }, - } - - for i, tc := range testCases { - replacementPortsMap := make(map[LocalPort]Closeable) - for _, lp := range tc.replacementPorts { - replacementPortsMap[lp] = &fakeClosable{} - } - existingPortsMap := make(map[LocalPort]Closeable) - for _, lp := range tc.existingPorts { - existingPortsMap[lp] = &fakeClosable{} - } - RevertPorts(replacementPortsMap, existingPortsMap) - for j, expectation := range tc.expectToBeClose { - if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation { - t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i) - } - } - for _, lp := range tc.existingPorts { - if existingPortsMap[lp].(*fakeClosable).closed == true { - t.Errorf("Expect existing localport %v to be false in test case %v", lp, i) - } - } - } -} diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index b4ce2772292..45b46582de5 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -482,3 +482,15 @@ func WriteBytesLine(buf *bytes.Buffer, bytes []byte) { buf.Write(bytes) buf.WriteByte('\n') } + +// RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only +// closes the ports opened in this sync. +func RevertPorts(replacementPortsMap, originalPortsMap map[utilnet.LocalPort]utilnet.Closeable) { + for k, v := range replacementPortsMap { + // Only close newly opened local ports - leave ones that were open before this update + if originalPortsMap[k] == nil { + klog.V(2).Infof("Closing local port %s", k.String()) + v.Close() + } + } +} diff --git a/pkg/proxy/util/utils_test.go b/pkg/proxy/util/utils_test.go index 895b3b7c245..f2467405f25 100644 --- a/pkg/proxy/util/utils_test.go +++ b/pkg/proxy/util/utils_test.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" fake "k8s.io/kubernetes/pkg/proxy/util/testing" + utilnet "k8s.io/utils/net" ) func TestValidateWorks(t *testing.T) { @@ -1048,7 +1049,103 @@ func TestGetClusterIPByFamily(t *testing.T) { } }) } +} +type fakeClosable struct { + closed bool +} + +func (c *fakeClosable) Close() error { + c.closed = true + return nil +} + +func TestRevertPorts(t *testing.T) { + testCases := []struct { + replacementPorts []utilnet.LocalPort + existingPorts []utilnet.LocalPort + expectToBeClose []bool + }{ + { + replacementPorts: []utilnet.LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + existingPorts: []utilnet.LocalPort{}, + expectToBeClose: []bool{true, true, true}, + }, + { + replacementPorts: []utilnet.LocalPort{}, + existingPorts: []utilnet.LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + expectToBeClose: []bool{}, + }, + { + replacementPorts: []utilnet.LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + existingPorts: []utilnet.LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + expectToBeClose: []bool{false, false, false}, + }, + { + replacementPorts: []utilnet.LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + existingPorts: []utilnet.LocalPort{ + {Port: 5001}, + {Port: 5003}, + }, + expectToBeClose: []bool{false, true, false}, + }, + { + replacementPorts: []utilnet.LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + existingPorts: []utilnet.LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + {Port: 5004}, + }, + expectToBeClose: []bool{false, false, false}, + }, + } + + for i, tc := range testCases { + replacementPortsMap := make(map[utilnet.LocalPort]utilnet.Closeable) + for _, lp := range tc.replacementPorts { + replacementPortsMap[lp] = &fakeClosable{} + } + existingPortsMap := make(map[utilnet.LocalPort]utilnet.Closeable) + for _, lp := range tc.existingPorts { + existingPortsMap[lp] = &fakeClosable{} + } + RevertPorts(replacementPortsMap, existingPortsMap) + for j, expectation := range tc.expectToBeClose { + if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation { + t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i) + } + } + for _, lp := range tc.existingPorts { + if existingPortsMap[lp].(*fakeClosable).closed == true { + t.Errorf("Expect existing localport %v to be false in test case %v", lp, i) + } + } + } } func TestWriteLine(t *testing.T) {