clean up LocalPort in proxier.go

This commit is contained in:
m1093782566
2017-08-22 10:49:24 +08:00
parent e633a1604f
commit a7fd545d49
6 changed files with 214 additions and 151 deletions

View File

@@ -369,7 +369,7 @@ type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
endpointsMap proxyEndpointsMap
portsMap map[localPort]closeable
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables
// with some partial data after kube-proxy restart.
@@ -386,7 +386,7 @@ type Proxier struct {
clusterCIDR string
hostname string
nodeIP net.IP
portMapper portOpener
portMapper utilproxy.PortOpener
recorder record.EventRecorder
healthChecker healthcheck.Server
healthzServer healthcheck.HealthzUpdater
@@ -405,32 +405,11 @@ type Proxier struct {
natRules *bytes.Buffer
}
type localPort struct {
desc string
ip string
port int
protocol string
}
func (lp *localPort) String() string {
return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
}
type closeable interface {
Close() error
}
// portOpener is an interface around port opening/closing.
// Abstracted out for testing.
type portOpener interface {
OpenLocalPort(lp *localPort) (closeable, error)
}
// listenPortOpener opens ports by calling bind() and listen().
type listenPortOpener struct{}
// OpenLocalPort holds the given local port open.
func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
return openLocalPort(lp)
}
@@ -491,7 +470,7 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
proxier := &Proxier{
portsMap: make(map[localPort]closeable),
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
@@ -1126,7 +1105,7 @@ func (proxier *Proxier) syncProxyRules() {
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[localPort]closeable{}
replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
// We are creating those slices ones here to avoid memory reallocations
// in every loop. Note that reuse the memory, instead of doing:
@@ -1200,11 +1179,11 @@ func (proxier *Proxier) syncProxyRules() {
if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
glog.Errorf("can't determine if IP is local, assuming not: %v", err)
} else if local {
lp := localPort{
desc: "externalIP for " + svcNameString,
ip: externalIP,
port: svcInfo.port,
protocol: protocol,
lp := utilproxy.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
Port: svcInfo.port,
Protocol: protocol,
}
if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
@@ -1337,11 +1316,11 @@ func (proxier *Proxier) syncProxyRules() {
if svcInfo.nodePort != 0 {
// Hold the local port open so no other process can open it
// (because the socket might open but it would never work).
lp := localPort{
desc: "nodePort for " + svcNameString,
ip: "",
port: svcInfo.nodePort,
protocol: protocol,
lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString,
IP: "",
Port: svcInfo.nodePort,
Protocol: protocol,
}
if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
@@ -1352,14 +1331,14 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
if lp.protocol == "udp" {
if lp.Protocol == "udp" {
// TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.port)
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port)
if err != nil {
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.port, err)
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
}
}
replacementPortsMap[lp] = socket
@@ -1601,7 +1580,8 @@ func (proxier *Proxier) syncProxyRules() {
}
// Revert new local ports.
revertPorts(replacementPortsMap, proxier.portsMap)
glog.V(2).Infof("Closing local ports after iptables-restore failure")
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return
}
@@ -1651,7 +1631,7 @@ func writeLine(buf *bytes.Buffer, words ...string) {
}
}
func openLocalPort(lp *localPort) (closeable, error) {
func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
@@ -1664,16 +1644,16 @@ func openLocalPort(lp *localPort) (closeable, error) {
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket closeable
switch lp.protocol {
var socket utilproxy.Closeable
switch lp.Protocol {
case "tcp":
listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
listener, err := net.Listen("tcp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
socket = listener
case "udp":
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
@@ -1683,20 +1663,8 @@ func openLocalPort(lp *localPort) (closeable, error) {
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", lp.protocol)
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
}
glog.V(2).Infof("Opened local port %s", lp.String())
return socket, nil
}
// revertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only
// closes the ports opened in this sync.
func revertPorts(replacementPortsMap, originalPortsMap map[localPort]closeable) {
for k, v := range replacementPortsMap {
// Only close newly opened local ports - leave ones that were open before this update
if originalPortsMap[k] == nil {
glog.V(2).Infof("Closing local port %s after iptables-restore failure", k.String())
v.Close()
}
}
}