mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
ipvs: remove port opener
This commit is contained in:
parent
407dcf5164
commit
c4a00b7d90
@ -222,7 +222,6 @@ type Proxier struct {
|
||||
mu sync.Mutex // protects the following fields
|
||||
serviceMap proxy.ServiceMap
|
||||
endpointsMap proxy.EndpointsMap
|
||||
portsMap map[netutils.LocalPort]netutils.Closeable
|
||||
nodeLabels map[string]string
|
||||
// endpointSlicesSynced, and servicesSynced are set to true when
|
||||
// corresponding objects are synced after startup. This is used to avoid updating
|
||||
@ -248,7 +247,6 @@ type Proxier struct {
|
||||
localDetector proxyutiliptables.LocalTrafficDetector
|
||||
hostname string
|
||||
nodeIP net.IP
|
||||
portMapper netutils.PortOpener
|
||||
recorder events.EventRecorder
|
||||
|
||||
serviceHealthServer healthcheck.ServiceHealthServer
|
||||
@ -453,7 +451,6 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
|
||||
proxier := &Proxier{
|
||||
ipFamily: ipFamily,
|
||||
portsMap: make(map[netutils.LocalPort]netutils.Closeable),
|
||||
serviceMap: make(proxy.ServiceMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
@ -468,7 +465,6 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
localDetector: localDetector,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &netutils.ListenPortOpener,
|
||||
recorder: recorder,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
@ -1061,8 +1057,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
set.resetEntries()
|
||||
}
|
||||
|
||||
// Accumulate the set of local ports that we will be holding open once this update is complete
|
||||
replacementPortsMap := map[netutils.LocalPort]netutils.Closeable{}
|
||||
// activeIPVSServices represents IPVS service successfully created in this round of sync
|
||||
activeIPVSServices := map[string]bool{}
|
||||
// currentIPVSServices represent IPVS services listed from the system
|
||||
@ -1127,8 +1121,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// reset slice to filtered entries
|
||||
nodeIPs = nodeIPs[:idx]
|
||||
|
||||
localAddrSet := utilproxy.GetLocalAddrSet()
|
||||
|
||||
// Build IPVS rules for each service.
|
||||
for svcName, svc := range proxier.serviceMap {
|
||||
svcInfo, ok := svc.(*serviceInfo)
|
||||
@ -1222,41 +1214,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
// Capture externalIPs.
|
||||
for _, externalIP := range svcInfo.ExternalIPStrings() {
|
||||
// If the "external" IP happens to be an IP that is local to this
|
||||
// machine, hold the local port open so no other process can open it
|
||||
// (because the socket might open but it would never work).
|
||||
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(netutils.ParseIPSloppy(externalIP)) {
|
||||
// We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
|
||||
lp := netutils.LocalPort{
|
||||
Description: "externalIP for " + svcNameString,
|
||||
IP: externalIP,
|
||||
IPFamily: localPortIPFamily,
|
||||
Port: svcInfo.Port(),
|
||||
Protocol: netutils.Protocol(svcInfo.Protocol()),
|
||||
}
|
||||
if proxier.portsMap[lp] != nil {
|
||||
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
|
||||
replacementPortsMap[lp] = proxier.portsMap[lp]
|
||||
} else {
|
||||
socket, err := proxier.portMapper.OpenLocalPort(&lp)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
|
||||
|
||||
proxier.recorder.Eventf(
|
||||
&v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: proxier.hostname,
|
||||
UID: types.UID(proxier.hostname),
|
||||
Namespace: "",
|
||||
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
|
||||
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
|
||||
continue
|
||||
}
|
||||
klog.V(2).InfoS("Opened local port", "port", lp)
|
||||
replacementPortsMap[lp] = socket
|
||||
}
|
||||
} // We're holding the port, so it's OK to install IPVS rules.
|
||||
|
||||
// ipset call
|
||||
entry := &utilipset.Entry{
|
||||
IP: externalIP,
|
||||
@ -1430,33 +1387,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
// For ports on node IPs, open the actual port and hold it.
|
||||
for _, lp := range lps {
|
||||
if proxier.portsMap[lp] != nil {
|
||||
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
|
||||
replacementPortsMap[lp] = proxier.portsMap[lp]
|
||||
// We do not start listening on SCTP ports, according to our agreement in the
|
||||
// SCTP support KEP
|
||||
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
|
||||
socket, err := proxier.portMapper.OpenLocalPort(&lp)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
|
||||
|
||||
proxier.recorder.Eventf(
|
||||
&v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: proxier.hostname,
|
||||
UID: types.UID(proxier.hostname),
|
||||
Namespace: "",
|
||||
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
|
||||
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
|
||||
continue
|
||||
}
|
||||
klog.V(2).InfoS("Opened local port", "port", lp)
|
||||
|
||||
if lp.Protocol == netutils.UDP {
|
||||
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
|
||||
}
|
||||
replacementPortsMap[lp] = socket
|
||||
} // We're holding the port, so it's OK to install ipvs rules.
|
||||
if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == netutils.UDP {
|
||||
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
|
||||
}
|
||||
}
|
||||
|
||||
// Nodeports need SNAT, unless they're local.
|
||||
@ -1614,8 +1547,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes()))
|
||||
}
|
||||
metrics.IptablesRestoreFailuresTotal.Inc()
|
||||
// Revert new local ports.
|
||||
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
|
||||
return
|
||||
}
|
||||
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
|
||||
@ -1626,14 +1557,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}
|
||||
|
||||
// Close old local ports and save new ones.
|
||||
for k, v := range proxier.portsMap {
|
||||
if replacementPortsMap[k] == nil {
|
||||
v.Close()
|
||||
}
|
||||
}
|
||||
proxier.portsMap = replacementPortsMap
|
||||
|
||||
// Get legacy bind address
|
||||
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
|
||||
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
|
||||
|
@ -70,18 +70,6 @@ func (f *fakeIPGetter) BindedIPs() (sets.String, error) {
|
||||
return f.bindedIPs, nil
|
||||
}
|
||||
|
||||
// fakePortOpener implements portOpener.
|
||||
type fakePortOpener struct {
|
||||
openPorts []*netutils.LocalPort
|
||||
}
|
||||
|
||||
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
|
||||
// to lock a local port.
|
||||
func (f *fakePortOpener) OpenLocalPort(lp *netutils.LocalPort) (netutils.Closeable, error) {
|
||||
f.openPorts = append(f.openPorts, lp)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// fakeKernelHandler implements KernelHandler.
|
||||
type fakeKernelHandler struct {
|
||||
modules []string
|
||||
@ -153,8 +141,6 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
|
||||
strictARP: false,
|
||||
localDetector: proxyutiliptables.NewNoOpLocalDetector(),
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[netutils.LocalPort]netutils.Closeable),
|
||||
portMapper: &fakePortOpener{[]*netutils.LocalPort{}},
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
ipvsScheduler: DefaultScheduler,
|
||||
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
|
||||
|
Loading…
Reference in New Issue
Block a user