From a7fd545d49cf55773363150ff406a7b6b3337d7e Mon Sep 17 00:00:00 2001 From: m1093782566 Date: Tue, 22 Aug 2017 10:49:24 +0800 Subject: [PATCH] clean up LocalPort in proxier.go --- pkg/proxy/iptables/BUILD | 1 + pkg/proxy/iptables/proxier.go | 84 +++++++-------------- pkg/proxy/iptables/proxier_test.go | 98 ++---------------------- pkg/proxy/util/BUILD | 2 + pkg/proxy/util/port.go | 64 ++++++++++++++++ pkg/proxy/util/port_test.go | 116 +++++++++++++++++++++++++++++ 6 files changed, 214 insertions(+), 151 deletions(-) create mode 100644 pkg/proxy/util/port.go create mode 100644 pkg/proxy/util/port_test.go diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index e6c45650b38..ac49fb2093e 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -43,6 +43,7 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/util:go_default_library", "//pkg/util/async:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 6f2e6668427..3ba5dcd0c62 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -369,7 +369,7 @@ type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxyServiceMap endpointsMap proxyEndpointsMap - portsMap map[localPort]closeable + portsMap map[utilproxy.LocalPort]utilproxy.Closeable // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. This is used to avoid updating iptables // with some partial data after kube-proxy restart. @@ -386,7 +386,7 @@ type Proxier struct { clusterCIDR string hostname string nodeIP net.IP - portMapper portOpener + portMapper utilproxy.PortOpener recorder record.EventRecorder healthChecker healthcheck.Server healthzServer healthcheck.HealthzUpdater @@ -405,32 +405,11 @@ type Proxier struct { natRules *bytes.Buffer } -type localPort struct { - desc string - ip string - port int - protocol string -} - -func (lp *localPort) String() string { - return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol) -} - -type closeable interface { - Close() error -} - -// portOpener is an interface around port opening/closing. -// Abstracted out for testing. -type portOpener interface { - OpenLocalPort(lp *localPort) (closeable, error) -} - // listenPortOpener opens ports by calling bind() and listen(). type listenPortOpener struct{} // OpenLocalPort holds the given local port open. -func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { +func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) { return openLocalPort(lp) } @@ -491,7 +470,7 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps proxier := &Proxier{ - portsMap: make(map[localPort]closeable), + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), @@ -1126,7 +1105,7 @@ func (proxier *Proxier) syncProxyRules() { activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[localPort]closeable{} + replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} // We are creating those slices ones here to avoid memory reallocations // in every loop. Note that reuse the memory, instead of doing: @@ -1200,11 +1179,11 @@ func (proxier *Proxier) syncProxyRules() { if local, err := utilproxy.IsLocalIP(externalIP); err != nil { glog.Errorf("can't determine if IP is local, assuming not: %v", err) } else if local { - lp := localPort{ - desc: "externalIP for " + svcNameString, - ip: externalIP, - port: svcInfo.port, - protocol: protocol, + lp := utilproxy.LocalPort{ + Description: "externalIP for " + svcNameString, + IP: externalIP, + Port: svcInfo.port, + Protocol: protocol, } if proxier.portsMap[lp] != nil { glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) @@ -1337,11 +1316,11 @@ func (proxier *Proxier) syncProxyRules() { if svcInfo.nodePort != 0 { // Hold the local port open so no other process can open it // (because the socket might open but it would never work). - lp := localPort{ - desc: "nodePort for " + svcNameString, - ip: "", - port: svcInfo.nodePort, - protocol: protocol, + lp := utilproxy.LocalPort{ + Description: "nodePort for " + svcNameString, + IP: "", + Port: svcInfo.nodePort, + Protocol: protocol, } if proxier.portsMap[lp] != nil { glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) @@ -1352,14 +1331,14 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } - if lp.protocol == "udp" { + if lp.Protocol == "udp" { // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them. // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. // This only affects UDP connections, which are not common. // See issue: https://github.com/kubernetes/kubernetes/issues/49881 - err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.port) + err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port) if err != nil { - glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.port, err) + glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err) } } replacementPortsMap[lp] = socket @@ -1601,7 +1580,8 @@ func (proxier *Proxier) syncProxyRules() { } // Revert new local ports. - revertPorts(replacementPortsMap, proxier.portsMap) + glog.V(2).Infof("Closing local ports after iptables-restore failure") + utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return } @@ -1651,7 +1631,7 @@ func writeLine(buf *bytes.Buffer, words ...string) { } } -func openLocalPort(lp *localPort) (closeable, error) { +func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) { // For ports on node IPs, open the actual port and hold it, even though we // use iptables to redirect traffic. // This ensures a) that it's safe to use that port and b) that (a) stays @@ -1664,16 +1644,16 @@ func openLocalPort(lp *localPort) (closeable, error) { // it. Tools like 'ss' and 'netstat' do not show sockets that are // bind()ed but not listen()ed, and at least the default debian netcat // has no way to avoid about 10 seconds of retries. - var socket closeable - switch lp.protocol { + var socket utilproxy.Closeable + switch lp.Protocol { case "tcp": - listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))) + listener, err := net.Listen("tcp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) if err != nil { return nil, err } socket = listener case "udp": - addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))) + addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))) if err != nil { return nil, err } @@ -1683,20 +1663,8 @@ func openLocalPort(lp *localPort) (closeable, error) { } socket = conn default: - return nil, fmt.Errorf("unknown protocol %q", lp.protocol) + return nil, fmt.Errorf("unknown protocol %q", lp.Protocol) } glog.V(2).Infof("Opened local port %s", lp.String()) return socket, nil } - -// revertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only -// closes the ports opened in this sync. -func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) { - for k, v := range replacementPortsMap { - // Only close newly opened local ports - leave ones that were open before this update - if originalPortsMap[k] == nil { - glog.V(2).Infof("Closing local port %s after iptables-restore failure", k.String()) - v.Close() - } - } -} diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 35f288c53b3..e9057f423af 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" @@ -258,103 +259,14 @@ func (c *fakeClosable) Close() error { return nil } -func TestRevertPorts(t *testing.T) { - testCases := []struct { - replacementPorts []localPort - existingPorts []localPort - expectToBeClose []bool - }{ - { - replacementPorts: []localPort{ - {port: 5001}, - {port: 5002}, - {port: 5003}, - }, - existingPorts: []localPort{}, - expectToBeClose: []bool{true, true, true}, - }, - { - replacementPorts: []localPort{}, - existingPorts: []localPort{ - {port: 5001}, - {port: 5002}, - {port: 5003}, - }, - expectToBeClose: []bool{}, - }, - { - replacementPorts: []localPort{ - {port: 5001}, - {port: 5002}, - {port: 5003}, - }, - existingPorts: []localPort{ - {port: 5001}, - {port: 5002}, - {port: 5003}, - }, - expectToBeClose: []bool{false, false, false}, - }, - { - replacementPorts: []localPort{ - {port: 5001}, - {port: 5002}, - {port: 5003}, - }, - existingPorts: []localPort{ - {port: 5001}, - {port: 5003}, - }, - expectToBeClose: []bool{false, true, false}, - }, - { - replacementPorts: []localPort{ - {port: 5001}, - {port: 5002}, - {port: 5003}, - }, - existingPorts: []localPort{ - {port: 5001}, - {port: 5002}, - {port: 5003}, - {port: 5004}, - }, - expectToBeClose: []bool{false, false, false}, - }, - } - - for i, tc := range testCases { - replacementPortsMap := make(map[localPort]closeable) - for _, lp := range tc.replacementPorts { - replacementPortsMap[lp] = &fakeClosable{} - } - existingPortsMap := make(map[localPort]closeable) - for _, lp := range tc.existingPorts { - existingPortsMap[lp] = &fakeClosable{} - } - revertPorts(replacementPortsMap, existingPortsMap) - for j, expectation := range tc.expectToBeClose { - if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation { - t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i) - } - } - for _, lp := range tc.existingPorts { - if existingPortsMap[lp].(*fakeClosable).closed == true { - t.Errorf("Expect existing localport %v to be false in test case %v", lp, i) - } - } - } - -} - // fakePortOpener implements portOpener. type fakePortOpener struct { - openPorts []*localPort + openPorts []*utilproxy.LocalPort } // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules // to lock a local port. -func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) { +func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) { f.openPorts = append(f.openPorts, lp) return nil, nil } @@ -395,8 +307,8 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { iptables: ipt, clusterCIDR: "10.0.0.0/24", hostname: testHostname, - portsMap: make(map[localPort]closeable), - portMapper: &fakePortOpener{[]*localPort{}}, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, healthChecker: newFakeHealthChecker(), precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), diff --git a/pkg/proxy/util/BUILD b/pkg/proxy/util/BUILD index 9cbb8476aac..0fb554dc0cf 100644 --- a/pkg/proxy/util/BUILD +++ b/pkg/proxy/util/BUILD @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "conntrack.go", + "port.go", "utils.go", ], visibility = ["//visibility:public"], @@ -20,6 +21,7 @@ go_test( name = "go_default_test", srcs = [ "conntrack_test.go", + "port_test.go", "utils_test.go", ], library = ":go_default_library", diff --git a/pkg/proxy/util/port.go b/pkg/proxy/util/port.go new file mode 100644 index 00000000000..fd1a024dac8 --- /dev/null +++ b/pkg/proxy/util/port.go @@ -0,0 +1,64 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + + "github.com/golang/glog" +) + +// LocalPort describes a port on specific IP address and protocol +type LocalPort struct { + // Description is the identity message of a given local port. + Description string + // IP is the IP address part of a given local port. + // If this string is empty, the port binds to all local IP addresses. + IP string + // Port is the port part of a given local port. + Port int + // Protocol is the protocol part of a given local port. + // The value is assumed to be lower-case. For example, "udp" not "UDP", "tcp" not "TCP". + Protocol string +} + +func (lp *LocalPort) String() string { + return fmt.Sprintf("%q (%s:%d/%s)", lp.Description, lp.IP, lp.Port, lp.Protocol) +} + +// Closeable is an interface around closing an port. +type Closeable interface { + Close() error +} + +// PortOpener is an interface around port opening/closing. +// Abstracted out for testing. +type PortOpener interface { + OpenLocalPort(lp *LocalPort) (Closeable, error) +} + +// RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only +// closes the ports opened in this sync. +func RevertPorts(replacementPortsMap, originalPortsMap map[LocalPort]Closeable) { + for k, v := range replacementPortsMap { + // Only close newly opened local ports - leave ones that were open before this update + if originalPortsMap[k] == nil { + glog.V(2).Infof("Closing local port %s", k.String()) + v.Close() + } + } +} diff --git a/pkg/proxy/util/port_test.go b/pkg/proxy/util/port_test.go new file mode 100644 index 00000000000..7f1cb0f9e1b --- /dev/null +++ b/pkg/proxy/util/port_test.go @@ -0,0 +1,116 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import "testing" + +type fakeClosable struct { + closed bool +} + +func (c *fakeClosable) Close() error { + c.closed = true + return nil +} + +func TestRevertPorts(t *testing.T) { + testCases := []struct { + replacementPorts []LocalPort + existingPorts []LocalPort + expectToBeClose []bool + }{ + { + replacementPorts: []LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + existingPorts: []LocalPort{}, + expectToBeClose: []bool{true, true, true}, + }, + { + replacementPorts: []LocalPort{}, + existingPorts: []LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + expectToBeClose: []bool{}, + }, + { + replacementPorts: []LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + existingPorts: []LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + expectToBeClose: []bool{false, false, false}, + }, + { + replacementPorts: []LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + existingPorts: []LocalPort{ + {Port: 5001}, + {Port: 5003}, + }, + expectToBeClose: []bool{false, true, false}, + }, + { + replacementPorts: []LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + }, + existingPorts: []LocalPort{ + {Port: 5001}, + {Port: 5002}, + {Port: 5003}, + {Port: 5004}, + }, + expectToBeClose: []bool{false, false, false}, + }, + } + + for i, tc := range testCases { + replacementPortsMap := make(map[LocalPort]Closeable) + for _, lp := range tc.replacementPorts { + replacementPortsMap[lp] = &fakeClosable{} + } + existingPortsMap := make(map[LocalPort]Closeable) + for _, lp := range tc.existingPorts { + existingPortsMap[lp] = &fakeClosable{} + } + RevertPorts(replacementPortsMap, existingPortsMap) + for j, expectation := range tc.expectToBeClose { + if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation { + t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i) + } + } + for _, lp := range tc.existingPorts { + if existingPortsMap[lp].(*fakeClosable).closed == true { + t.Errorf("Expect existing localport %v to be false in test case %v", lp, i) + } + } + } +}