Merge pull request #51074 from m1093782566/util-port

Automatic merge from submit-queue (batch tested with PRs 51229, 50131, 51074, 51167, 51213)

[proxy] Clean up LocalPort related functions and structures in proxier.go

**What this PR does / why we need it**:

See, https://github.com/kubernetes/kubernetes/blob/master/pkg/proxy/iptables/proxier.go#L1694

I think RevertPorts() is independent from iptables, and would be used by other proxiers which needs to hold/close local port.

Perhaps we can move RevertPorts() from proxier.go to pkg/proxy/util package so that it can be consumed among different proxiers. And, reduce codes in proxier.go

**Which issue this PR fixes**:

fixes #51073 

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-08-25 03:16:06 -07:00 committed by GitHub
commit c358544e83
6 changed files with 214 additions and 151 deletions

View File

@ -43,6 +43,7 @@ go_test(
deps = [ deps = [
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/proxy:go_default_library", "//pkg/proxy:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library", "//pkg/util/async:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library", "//pkg/util/iptables/testing:go_default_library",

View File

@ -369,7 +369,7 @@ type Proxier struct {
mu sync.Mutex // protects the following fields mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap serviceMap proxyServiceMap
endpointsMap proxyEndpointsMap endpointsMap proxyEndpointsMap
portsMap map[localPort]closeable portsMap map[utilproxy.LocalPort]utilproxy.Closeable
// endpointsSynced and servicesSynced are set to true when corresponding // endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables // objects are synced after startup. This is used to avoid updating iptables
// with some partial data after kube-proxy restart. // with some partial data after kube-proxy restart.
@ -386,7 +386,7 @@ type Proxier struct {
clusterCIDR string clusterCIDR string
hostname string hostname string
nodeIP net.IP nodeIP net.IP
portMapper portOpener portMapper utilproxy.PortOpener
recorder record.EventRecorder recorder record.EventRecorder
healthChecker healthcheck.Server healthChecker healthcheck.Server
healthzServer healthcheck.HealthzUpdater healthzServer healthcheck.HealthzUpdater
@ -405,32 +405,11 @@ type Proxier struct {
natRules *bytes.Buffer natRules *bytes.Buffer
} }
type localPort struct {
desc string
ip string
port int
protocol string
}
func (lp *localPort) String() string {
return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
}
type closeable interface {
Close() error
}
// portOpener is an interface around port opening/closing.
// Abstracted out for testing.
type portOpener interface {
OpenLocalPort(lp *localPort) (closeable, error)
}
// listenPortOpener opens ports by calling bind() and listen(). // listenPortOpener opens ports by calling bind() and listen().
type listenPortOpener struct{} type listenPortOpener struct{}
// OpenLocalPort holds the given local port open. // OpenLocalPort holds the given local port open.
func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
return openLocalPort(lp) return openLocalPort(lp)
} }
@ -491,7 +470,7 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
proxier := &Proxier{ proxier := &Proxier{
portsMap: make(map[localPort]closeable), portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap), endpointsMap: make(proxyEndpointsMap),
@ -1126,7 +1105,7 @@ func (proxier *Proxier) syncProxyRules() {
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate the set of local ports that we will be holding open once this update is complete // Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[localPort]closeable{} replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
// We are creating those slices ones here to avoid memory reallocations // We are creating those slices ones here to avoid memory reallocations
// in every loop. Note that reuse the memory, instead of doing: // in every loop. Note that reuse the memory, instead of doing:
@ -1200,11 +1179,11 @@ func (proxier *Proxier) syncProxyRules() {
if local, err := utilproxy.IsLocalIP(externalIP); err != nil { if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
glog.Errorf("can't determine if IP is local, assuming not: %v", err) glog.Errorf("can't determine if IP is local, assuming not: %v", err)
} else if local { } else if local {
lp := localPort{ lp := utilproxy.LocalPort{
desc: "externalIP for " + svcNameString, Description: "externalIP for " + svcNameString,
ip: externalIP, IP: externalIP,
port: svcInfo.port, Port: svcInfo.port,
protocol: protocol, Protocol: protocol,
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
@ -1337,11 +1316,11 @@ func (proxier *Proxier) syncProxyRules() {
if svcInfo.nodePort != 0 { if svcInfo.nodePort != 0 {
// Hold the local port open so no other process can open it // Hold the local port open so no other process can open it
// (because the socket might open but it would never work). // (because the socket might open but it would never work).
lp := localPort{ lp := utilproxy.LocalPort{
desc: "nodePort for " + svcNameString, Description: "nodePort for " + svcNameString,
ip: "", IP: "",
port: svcInfo.nodePort, Port: svcInfo.nodePort,
protocol: protocol, Protocol: protocol,
} }
if proxier.portsMap[lp] != nil { if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
@ -1352,14 +1331,14 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue continue
} }
if lp.protocol == "udp" { if lp.Protocol == "udp" {
// TODO: We might have multiple services using the same port, and this will clear conntrack for all of them. // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common. // This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881 // See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.port) err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port)
if err != nil { if err != nil {
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.port, err) glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
} }
} }
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
@ -1601,7 +1580,8 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Revert new local ports. // Revert new local ports.
revertPorts(replacementPortsMap, proxier.portsMap) glog.V(2).Infof("Closing local ports after iptables-restore failure")
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return return
} }
@ -1651,7 +1631,7 @@ func writeLine(buf *bytes.Buffer, words ...string) {
} }
} }
func openLocalPort(lp *localPort) (closeable, error) { func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we // For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic. // use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays // This ensures a) that it's safe to use that port and b) that (a) stays
@ -1664,16 +1644,16 @@ func openLocalPort(lp *localPort) (closeable, error) {
// it. Tools like 'ss' and 'netstat' do not show sockets that are // it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat // bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries. // has no way to avoid about 10 seconds of retries.
var socket closeable var socket utilproxy.Closeable
switch lp.protocol { switch lp.Protocol {
case "tcp": case "tcp":
listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))) listener, err := net.Listen("tcp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil { if err != nil {
return nil, err return nil, err
} }
socket = listener socket = listener
case "udp": case "udp":
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port))) addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1683,20 +1663,8 @@ func openLocalPort(lp *localPort) (closeable, error) {
} }
socket = conn socket = conn
default: default:
return nil, fmt.Errorf("unknown protocol %q", lp.protocol) return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
} }
glog.V(2).Infof("Opened local port %s", lp.String()) glog.V(2).Infof("Opened local port %s", lp.String())
return socket, nil return socket, nil
} }
// revertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only
// closes the ports opened in this sync.
func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) {
for k, v := range replacementPortsMap {
// Only close newly opened local ports - leave ones that were open before this update
if originalPortsMap[k] == nil {
glog.V(2).Infof("Closing local port %s after iptables-restore failure", k.String())
v.Close()
}
}
}

View File

@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
@ -258,103 +259,14 @@ func (c *fakeClosable) Close() error {
return nil return nil
} }
func TestRevertPorts(t *testing.T) {
testCases := []struct {
replacementPorts []localPort
existingPorts []localPort
expectToBeClose []bool
}{
{
replacementPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
existingPorts: []localPort{},
expectToBeClose: []bool{true, true, true},
},
{
replacementPorts: []localPort{},
existingPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
expectToBeClose: []bool{},
},
{
replacementPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
existingPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
expectToBeClose: []bool{false, false, false},
},
{
replacementPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
existingPorts: []localPort{
{port: 5001},
{port: 5003},
},
expectToBeClose: []bool{false, true, false},
},
{
replacementPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
},
existingPorts: []localPort{
{port: 5001},
{port: 5002},
{port: 5003},
{port: 5004},
},
expectToBeClose: []bool{false, false, false},
},
}
for i, tc := range testCases {
replacementPortsMap := make(map[localPort]closeable)
for _, lp := range tc.replacementPorts {
replacementPortsMap[lp] = &fakeClosable{}
}
existingPortsMap := make(map[localPort]closeable)
for _, lp := range tc.existingPorts {
existingPortsMap[lp] = &fakeClosable{}
}
revertPorts(replacementPortsMap, existingPortsMap)
for j, expectation := range tc.expectToBeClose {
if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation {
t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i)
}
}
for _, lp := range tc.existingPorts {
if existingPortsMap[lp].(*fakeClosable).closed == true {
t.Errorf("Expect existing localport %v to be false in test case %v", lp, i)
}
}
}
}
// fakePortOpener implements portOpener. // fakePortOpener implements portOpener.
type fakePortOpener struct { type fakePortOpener struct {
openPorts []*localPort openPorts []*utilproxy.LocalPort
} }
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port. // to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) { func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
f.openPorts = append(f.openPorts, lp) f.openPorts = append(f.openPorts, lp)
return nil, nil return nil, nil
} }
@ -395,8 +307,8 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
iptables: ipt, iptables: ipt,
clusterCIDR: "10.0.0.0/24", clusterCIDR: "10.0.0.0/24",
hostname: testHostname, hostname: testHostname,
portsMap: make(map[localPort]closeable), portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portMapper: &fakePortOpener{[]*localPort{}}, portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
healthChecker: newFakeHealthChecker(), healthChecker: newFakeHealthChecker(),
precomputedProbabilities: make([]string, 0, 1001), precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),

View File

@ -4,6 +4,7 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"conntrack.go", "conntrack.go",
"port.go",
"utils.go", "utils.go",
], ],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
@ -20,6 +21,7 @@ go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"conntrack_test.go", "conntrack_test.go",
"port_test.go",
"utils_test.go", "utils_test.go",
], ],
library = ":go_default_library", library = ":go_default_library",

64
pkg/proxy/util/port.go Normal file
View File

@ -0,0 +1,64 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"github.com/golang/glog"
)
// LocalPort describes a port on specific IP address and protocol
type LocalPort struct {
// Description is the identity message of a given local port.
Description string
// IP is the IP address part of a given local port.
// If this string is empty, the port binds to all local IP addresses.
IP string
// Port is the port part of a given local port.
Port int
// Protocol is the protocol part of a given local port.
// The value is assumed to be lower-case. For example, "udp" not "UDP", "tcp" not "TCP".
Protocol string
}
func (lp *LocalPort) String() string {
return fmt.Sprintf("%q (%s:%d/%s)", lp.Description, lp.IP, lp.Port, lp.Protocol)
}
// Closeable is an interface around closing an port.
type Closeable interface {
Close() error
}
// PortOpener is an interface around port opening/closing.
// Abstracted out for testing.
type PortOpener interface {
OpenLocalPort(lp *LocalPort) (Closeable, error)
}
// RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only
// closes the ports opened in this sync.
func RevertPorts(replacementPortsMap, originalPortsMap map[LocalPort]Closeable) {
for k, v := range replacementPortsMap {
// Only close newly opened local ports - leave ones that were open before this update
if originalPortsMap[k] == nil {
glog.V(2).Infof("Closing local port %s", k.String())
v.Close()
}
}
}

116
pkg/proxy/util/port_test.go Normal file
View File

@ -0,0 +1,116 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import "testing"
type fakeClosable struct {
closed bool
}
func (c *fakeClosable) Close() error {
c.closed = true
return nil
}
func TestRevertPorts(t *testing.T) {
testCases := []struct {
replacementPorts []LocalPort
existingPorts []LocalPort
expectToBeClose []bool
}{
{
replacementPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []LocalPort{},
expectToBeClose: []bool{true, true, true},
},
{
replacementPorts: []LocalPort{},
existingPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
expectToBeClose: []bool{},
},
{
replacementPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
expectToBeClose: []bool{false, false, false},
},
{
replacementPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []LocalPort{
{Port: 5001},
{Port: 5003},
},
expectToBeClose: []bool{false, true, false},
},
{
replacementPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
},
existingPorts: []LocalPort{
{Port: 5001},
{Port: 5002},
{Port: 5003},
{Port: 5004},
},
expectToBeClose: []bool{false, false, false},
},
}
for i, tc := range testCases {
replacementPortsMap := make(map[LocalPort]Closeable)
for _, lp := range tc.replacementPorts {
replacementPortsMap[lp] = &fakeClosable{}
}
existingPortsMap := make(map[LocalPort]Closeable)
for _, lp := range tc.existingPorts {
existingPortsMap[lp] = &fakeClosable{}
}
RevertPorts(replacementPortsMap, existingPortsMap)
for j, expectation := range tc.expectToBeClose {
if replacementPortsMap[tc.replacementPorts[j]].(*fakeClosable).closed != expectation {
t.Errorf("Expect replacement localport %v to be %v in test case %v", tc.replacementPorts[j], expectation, i)
}
}
for _, lp := range tc.existingPorts {
if existingPortsMap[lp].(*fakeClosable).closed == true {
t.Errorf("Expect existing localport %v to be false in test case %v", lp, i)
}
}
}
}