Merge pull request #114725 from danwinship/kube-proxy-startup-cleanup

(minor) kube-proxy startup cleanup
This commit is contained in:
Kubernetes Prow Robot 2023-01-05 13:57:59 -08:00 committed by GitHub
commit fcaa32bd99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 40 additions and 41 deletions

View File

@ -54,6 +54,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/ipvs" "k8s.io/kubernetes/pkg/proxy/ipvs"
proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics" proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
utilipset "k8s.io/kubernetes/pkg/util/ipset" utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@ -145,8 +146,10 @@ func newProxyServer(
klog.V(2).InfoS("DetectLocalMode", "LocalMode", string(detectLocalMode)) klog.V(2).InfoS("DetectLocalMode", "LocalMode", string(detectLocalMode))
primaryFamily := v1.IPv4Protocol
primaryProtocol := utiliptables.ProtocolIPv4 primaryProtocol := utiliptables.ProtocolIPv4
if netutils.IsIPv6(nodeIP) { if netutils.IsIPv6(nodeIP) {
primaryFamily = v1.IPv6Protocol
primaryProtocol = utiliptables.ProtocolIPv6 primaryProtocol = utiliptables.ProtocolIPv6
} }
execer := exec.New() execer := exec.New()
@ -165,10 +168,21 @@ func newProxyServer(
ipt[1] = iptInterface ipt[1] = iptInterface
} }
for _, perFamilyIpt := range ipt { nodePortAddresses := config.NodePortAddresses
if !perFamilyIpt.Present() {
klog.InfoS("kube-proxy running in single-stack mode, this ipFamily is not supported", "ipFamily", perFamilyIpt.Protocol()) if !ipt[0].Present() {
dualStack = false return nil, fmt.Errorf("iptables is not supported for primary IP family %q", primaryProtocol)
} else if !ipt[1].Present() {
klog.InfoS("kube-proxy running in single-stack mode: secondary ipFamily is not supported", "ipFamily", ipt[1].Protocol())
dualStack = false
// Validate NodePortAddresses is single-stack
npaByFamily := proxyutil.MapCIDRsByIPFamily(config.NodePortAddresses)
secondaryFamily := proxyutil.OtherIPFamily(primaryFamily)
badAddrs := npaByFamily[secondaryFamily]
if len(badAddrs) > 0 {
klog.InfoS("Ignoring --nodeport-addresses of the wrong family", "ipFamily", secondaryFamily, "addresses", badAddrs)
nodePortAddresses = npaByFamily[primaryFamily]
} }
} }
@ -204,7 +218,7 @@ func newProxyServer(
nodeIPTuple(config.BindAddress), nodeIPTuple(config.BindAddress),
recorder, recorder,
healthzServer, healthzServer,
config.NodePortAddresses, nodePortAddresses,
) )
} else { } else {
// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support). // Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
@ -216,6 +230,7 @@ func newProxyServer(
// TODO this has side effects that should only happen when Run() is invoked. // TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewProxier( proxier, err = iptables.NewProxier(
primaryFamily,
iptInterface, iptInterface,
utilsysctl.New(), utilsysctl.New(),
execer, execer,
@ -229,7 +244,7 @@ func newProxyServer(
nodeIP, nodeIP,
recorder, recorder,
healthzServer, healthzServer,
config.NodePortAddresses, nodePortAddresses,
) )
} }
@ -279,7 +294,7 @@ func newProxyServer(
recorder, recorder,
healthzServer, healthzServer,
config.IPVS.Scheduler, config.IPVS.Scheduler,
config.NodePortAddresses, nodePortAddresses,
kernelHandler, kernelHandler,
) )
} else { } else {
@ -290,6 +305,7 @@ func newProxyServer(
} }
proxier, err = ipvs.NewProxier( proxier, err = ipvs.NewProxier(
primaryFamily,
iptInterface, iptInterface,
ipvsInterface, ipvsInterface,
ipsetInterface, ipsetInterface,
@ -310,7 +326,7 @@ func newProxyServer(
recorder, recorder,
healthzServer, healthzServer,
config.IPVS.Scheduler, config.IPVS.Scheduler,
config.NodePortAddresses, nodePortAddresses,
kernelHandler, kernelHandler,
) )
} }

View File

@ -85,9 +85,14 @@ func NewHollowProxyOrDie(
klog.InfoS("can't determine this node's IP, assuming 127.0.0.1") klog.InfoS("can't determine this node's IP, assuming 127.0.0.1")
nodeIP = netutils.ParseIPSloppy("127.0.0.1") nodeIP = netutils.ParseIPSloppy("127.0.0.1")
} }
family := v1.IPv4Protocol
if iptInterface.IsIPv6() {
family = v1.IPv6Protocol
}
// Real proxier with fake iptables, sysctl, etc underneath it. // Real proxier with fake iptables, sysctl, etc underneath it.
//var err error //var err error
proxier, err = iptables.NewProxier( proxier, err = iptables.NewProxier(
family,
iptInterface, iptInterface,
sysctl, sysctl,
execer, execer,

View File

@ -221,7 +221,8 @@ var _ proxy.Provider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock. // An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and // Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails. // will not terminate if a particular iptables call fails.
func NewProxier(ipt utiliptables.Interface, func NewProxier(ipFamily v1.IPFamily,
ipt utiliptables.Interface,
sysctl utilsysctl.Interface, sysctl utilsysctl.Interface,
exec utilexec.Interface, exec utilexec.Interface,
syncPeriod time.Duration, syncPeriod time.Duration,
@ -259,18 +260,6 @@ func NewProxier(ipt utiliptables.Interface,
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
ipFamily := v1.IPv4Protocol
if ipt.IsIPv6() {
ipFamily = v1.IPv6Protocol
}
ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
nodePortAddresses = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.InfoS("Found node IPs of the wrong family", "ipFamily", ipFamily, "IPs", strings.Join(ips, ","))
}
proxier := &Proxier{ proxier := &Proxier{
svcPortMap: make(proxy.ServicePortMap), svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
@ -337,14 +326,14 @@ func NewDualStackProxier(
) (proxy.Provider, error) { ) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier // Create an ipv4 instance of the single-stack proxier
ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses) ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
ipv4Proxier, err := NewProxier(ipt[0], sysctl, ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], sysctl,
exec, syncPeriod, minSyncPeriod, masqueradeAll, localhostNodePorts, masqueradeBit, localDetectors[0], hostname, exec, syncPeriod, minSyncPeriod, masqueradeAll, localhostNodePorts, masqueradeBit, localDetectors[0], hostname,
nodeIP[0], recorder, healthzServer, ipFamilyMap[v1.IPv4Protocol]) nodeIP[0], recorder, healthzServer, ipFamilyMap[v1.IPv4Protocol])
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err) return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
} }
ipv6Proxier, err := NewProxier(ipt[1], sysctl, ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], sysctl,
exec, syncPeriod, minSyncPeriod, masqueradeAll, false, masqueradeBit, localDetectors[1], hostname, exec, syncPeriod, minSyncPeriod, masqueradeAll, false, masqueradeBit, localDetectors[1], hostname,
nodeIP[1], recorder, healthzServer, ipFamilyMap[v1.IPv6Protocol]) nodeIP[1], recorder, healthzServer, ipFamilyMap[v1.IPv6Protocol])
if err != nil { if err != nil {

View File

@ -354,7 +354,8 @@ var _ proxy.Provider = &Proxier{}
// An error will be returned if it fails to update or acquire the initial lock. // An error will be returned if it fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and // Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails. // will not terminate if a particular iptables or ipvs call fails.
func NewProxier(ipt utiliptables.Interface, func NewProxier(ipFamily v1.IPFamily,
ipt utiliptables.Interface,
ipvs utilipvs.Interface, ipvs utilipvs.Interface,
ipset utilipset.Interface, ipset utilipset.Interface,
sysctl utilsysctl.Interface, sysctl utilsysctl.Interface,
@ -449,11 +450,6 @@ func NewProxier(ipt utiliptables.Interface,
masqueradeValue := 1 << uint(masqueradeBit) masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue) masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
ipFamily := v1.IPv4Protocol
if ipt.IsIPv6() {
ipFamily = v1.IPv6Protocol
}
klog.V(2).InfoS("Record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily) klog.V(2).InfoS("Record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)
if len(scheduler) == 0 { if len(scheduler) == 0 {
@ -463,13 +459,6 @@ func NewProxier(ipt utiliptables.Interface,
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
nodePortAddresses = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.InfoS("Found node IPs of the wrong family", "ipFamily", ipFamily, "IPs", ips)
}
// excludeCIDRs has been validated before, here we just parse it to IPNet list // excludeCIDRs has been validated before, here we just parse it to IPNet list
parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs) parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs)
@ -551,7 +540,7 @@ func NewDualStackProxier(
ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses) ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
// Create an ipv4 instance of the single-stack proxier // Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(ipt[0], ipvs, safeIpset, sysctl, ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], ipvs, safeIpset, sysctl,
exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP, exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit, tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
localDetectors[0], hostname, nodeIP[0], localDetectors[0], hostname, nodeIP[0],
@ -560,7 +549,7 @@ func NewDualStackProxier(
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err) return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
} }
ipv6Proxier, err := NewProxier(ipt[1], ipvs, safeIpset, sysctl, ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], ipvs, safeIpset, sysctl,
exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP, exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit, tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
localDetectors[1], hostname, nodeIP[1], localDetectors[1], hostname, nodeIP[1],

View File

@ -2117,11 +2117,11 @@ func TestOnlyLocalNodePorts(t *testing.T) {
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}} addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs) fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1) fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
fp.nodePortAddresses = []string{"100.101.102.0/24", "2001:db8::0/64"} fp.nodePortAddresses = []string{"100.101.102.0/24"}
fp.syncProxyRules() fp.syncProxyRules()
// Expect 2 (matching ipvs IPFamily field) services and 1 destination // Expect 2 services and 1 destination
epVS := &netlinktest.ExpectedVirtualServer{ epVS := &netlinktest.ExpectedVirtualServer{
VSNum: 2, IP: nodeIP.String(), Port: uint16(svcNodePort), Protocol: string(v1.ProtocolTCP), VSNum: 2, IP: nodeIP.String(), Port: uint16(svcNodePort), Protocol: string(v1.ProtocolTCP),
RS: []netlinktest.ExpectedRealServer{{ RS: []netlinktest.ExpectedRealServer{{
@ -2205,7 +2205,7 @@ func TestHealthCheckNodePort(t *testing.T) {
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}} addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs) fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1) fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
fp.nodePortAddresses = []string{"100.101.102.0/24", "2001:db8::0/64"} fp.nodePortAddresses = []string{"100.101.102.0/24"}
fp.syncProxyRules() fp.syncProxyRules()