mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
migrate to use k8s.io/util LocalPort and ListenPortOpener in ipvs.proxier
This commit is contained in:
parent
e68e105102
commit
97a5a3d4d5
@ -18,7 +18,6 @@ go_test(
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/ipvs/testing:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/proxy/util/iptables:go_default_library",
|
||||
"//pkg/proxy/util/testing:go_default_library",
|
||||
"//pkg/util/async:go_default_library",
|
||||
|
@ -219,7 +219,7 @@ type Proxier struct {
|
||||
mu sync.Mutex // protects the following fields
|
||||
serviceMap proxy.ServiceMap
|
||||
endpointsMap proxy.EndpointsMap
|
||||
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
|
||||
portsMap map[utilnet.LocalPort]utilnet.Closeable
|
||||
nodeLabels map[string]string
|
||||
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when
|
||||
// corresponding objects are synced after startup. This is used to avoid updating
|
||||
@ -246,7 +246,7 @@ type Proxier struct {
|
||||
localDetector proxyutiliptables.LocalTrafficDetector
|
||||
hostname string
|
||||
nodeIP net.IP
|
||||
portMapper utilproxy.PortOpener
|
||||
portMapper utilnet.PortOpener
|
||||
recorder record.EventRecorder
|
||||
|
||||
serviceHealthServer healthcheck.ServiceHealthServer
|
||||
@ -459,7 +459,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
|
||||
proxier := &Proxier{
|
||||
ipFamily: ipFamily,
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
|
||||
serviceMap: make(proxy.ServiceMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
@ -474,7 +474,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
localDetector: localDetector,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
portMapper: &utilnet.ListenPortOpener,
|
||||
recorder: recorder,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
@ -1093,7 +1093,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// Accumulate the set of local ports that we will be holding open once this update is complete
|
||||
replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
|
||||
replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{}
|
||||
// activeIPVSServices represents IPVS service successfully created in this round of sync
|
||||
activeIPVSServices := map[string]bool{}
|
||||
// currentIPVSServices represent IPVS services listed from the system
|
||||
@ -1164,6 +1164,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
continue
|
||||
}
|
||||
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
|
||||
localPortIPFamily := utilnet.IPv4
|
||||
if isIPv6 {
|
||||
localPortIPFamily = utilnet.IPv6
|
||||
}
|
||||
protocol := strings.ToLower(string(svcInfo.Protocol()))
|
||||
// Precompute svcNameString; with many services the many calls
|
||||
// to ServicePortName.String() show up in CPU profiles.
|
||||
@ -1246,17 +1250,18 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// (because the socket might open but it would never work).
|
||||
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
|
||||
// We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
|
||||
lp := utilproxy.LocalPort{
|
||||
lp := utilnet.LocalPort{
|
||||
Description: "externalIP for " + svcNameString,
|
||||
IP: externalIP,
|
||||
IPFamily: localPortIPFamily,
|
||||
Port: svcInfo.Port(),
|
||||
Protocol: protocol,
|
||||
Protocol: utilnet.Protocol(svcInfo.Protocol()),
|
||||
}
|
||||
if proxier.portsMap[lp] != nil {
|
||||
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
|
||||
replacementPortsMap[lp] = proxier.portsMap[lp]
|
||||
} else {
|
||||
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
|
||||
socket, err := proxier.portMapper.OpenLocalPort(&lp)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
|
||||
|
||||
@ -1270,6 +1275,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
klog.Error(msg)
|
||||
continue
|
||||
}
|
||||
klog.V(2).Infof("Opened local port %s", lp.String())
|
||||
replacementPortsMap[lp] = socket
|
||||
}
|
||||
} // We're holding the port, so it's OK to install IPVS rules.
|
||||
@ -1430,13 +1436,14 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
continue
|
||||
}
|
||||
|
||||
var lps []utilproxy.LocalPort
|
||||
var lps []utilnet.LocalPort
|
||||
for _, address := range nodeAddresses {
|
||||
lp := utilproxy.LocalPort{
|
||||
lp := utilnet.LocalPort{
|
||||
Description: "nodePort for " + svcNameString,
|
||||
IP: address,
|
||||
IPFamily: localPortIPFamily,
|
||||
Port: svcInfo.NodePort(),
|
||||
Protocol: protocol,
|
||||
Protocol: utilnet.Protocol(svcInfo.Protocol()),
|
||||
}
|
||||
if utilproxy.IsZeroCIDR(address) {
|
||||
// Empty IP address means all
|
||||
@ -1456,12 +1463,14 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// We do not start listening on SCTP ports, according to our agreement in the
|
||||
// SCTP support KEP
|
||||
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
|
||||
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
|
||||
socket, err := proxier.portMapper.OpenLocalPort(&lp)
|
||||
if err != nil {
|
||||
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
|
||||
continue
|
||||
}
|
||||
if lp.Protocol == "udp" {
|
||||
klog.V(2).Infof("Opened local port %s", lp.String())
|
||||
|
||||
if lp.Protocol == utilnet.UDP {
|
||||
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
|
||||
}
|
||||
replacementPortsMap[lp] = socket
|
||||
@ -2194,60 +2203,6 @@ func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, curre
|
||||
return legacyAddrs
|
||||
}
|
||||
|
||||
// listenPortOpener opens ports by calling bind() and listen().
|
||||
type listenPortOpener struct{}
|
||||
|
||||
// OpenLocalPort holds the given local port open.
|
||||
func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
|
||||
return openLocalPort(lp, isIPv6)
|
||||
}
|
||||
|
||||
func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
|
||||
// For ports on node IPs, open the actual port and hold it, even though we
|
||||
// use ipvs to redirect traffic.
|
||||
// This ensures a) that it's safe to use that port and b) that (a) stays
|
||||
// true. The risk is that some process on the node (e.g. sshd or kubelet)
|
||||
// is using a port and we give that same port out to a Service. That would
|
||||
// be bad because ipvs would silently claim the traffic but the process
|
||||
// would never know.
|
||||
// NOTE: We should not need to have a real listen()ing socket - bind()
|
||||
// should be enough, but I can't figure out a way to e2e test without
|
||||
// it. Tools like 'ss' and 'netstat' do not show sockets that are
|
||||
// bind()ed but not listen()ed, and at least the default debian netcat
|
||||
// has no way to avoid about 10 seconds of retries.
|
||||
var socket utilproxy.Closeable
|
||||
switch lp.Protocol {
|
||||
case "tcp":
|
||||
network := "tcp4"
|
||||
if isIPv6 {
|
||||
network = "tcp6"
|
||||
}
|
||||
listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
socket = listener
|
||||
case "udp":
|
||||
network := "udp4"
|
||||
if isIPv6 {
|
||||
network = "udp6"
|
||||
}
|
||||
addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := net.ListenUDP(network, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
socket = conn
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
|
||||
}
|
||||
klog.V(2).Infof("Opened local port %s", lp.String())
|
||||
return socket, nil
|
||||
}
|
||||
|
||||
// ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets
|
||||
// It will only operate iptables *nat table.
|
||||
// Create and link the kube postrouting chain for SNAT packets.
|
||||
|
@ -36,7 +36,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
|
||||
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
@ -70,12 +69,12 @@ func (f *fakeIPGetter) BindedIPs() (sets.String, error) {
|
||||
|
||||
// fakePortOpener implements portOpener.
|
||||
type fakePortOpener struct {
|
||||
openPorts []*utilproxy.LocalPort
|
||||
openPorts []*utilnet.LocalPort
|
||||
}
|
||||
|
||||
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
|
||||
// to lock a local port.
|
||||
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
|
||||
func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) {
|
||||
f.openPorts = append(f.openPorts, lp)
|
||||
return nil, nil
|
||||
}
|
||||
@ -151,8 +150,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
|
||||
strictARP: false,
|
||||
localDetector: proxyutiliptables.NewNoOpLocalDetector(),
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
|
||||
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
|
||||
portMapper: &fakePortOpener{[]*utilnet.LocalPort{}},
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
ipvsScheduler: DefaultScheduler,
|
||||
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
|
||||
|
Loading…
Reference in New Issue
Block a user