mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 13:02:14 +00:00
pkg/proxy: refactor NodePortAddresses to NodeAddressHandler
Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
parent
4f732a233d
commit
48f1356b2f
@ -277,7 +277,7 @@ func checkBadConfig(s *ProxyServer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Warn if NodePortAddresses does not limit connections on all IP families that
|
// Warn if NodeAddressHandler does not limit connections on all IP families that
|
||||||
// seem to be in use.
|
// seem to be in use.
|
||||||
cidrsByFamily := proxyutil.MapCIDRsByIPFamily(s.Config.NodePortAddresses)
|
cidrsByFamily := proxyutil.MapCIDRsByIPFamily(s.Config.NodePortAddresses)
|
||||||
if len(s.Config.NodePortAddresses) == 0 {
|
if len(s.Config.NodePortAddresses) == 0 {
|
||||||
@ -347,7 +347,7 @@ func checkBadIPConfig(s *ProxyServer, dualStackSupported bool) (err error, fatal
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note that s.Config.NodePortAddresses gets checked as part of checkBadConfig()
|
// Note that s.Config.NodeAddressHandler gets checked as part of checkBadConfig()
|
||||||
// so it doesn't need to be checked here.
|
// so it doesn't need to be checked here.
|
||||||
|
|
||||||
return utilerrors.NewAggregate(errors), fatal
|
return utilerrors.NewAggregate(errors), fatal
|
||||||
|
@ -149,10 +149,10 @@ func (fake fakeProxierHealthChecker) IsHealthy() bool {
|
|||||||
func TestServer(t *testing.T) {
|
func TestServer(t *testing.T) {
|
||||||
listener := newFakeListener()
|
listener := newFakeListener()
|
||||||
httpFactory := newFakeHTTPServerFactory()
|
httpFactory := newFakeHTTPServerFactory()
|
||||||
nodePortAddresses := proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{})
|
nodeAddressHandler := proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{})
|
||||||
proxyChecker := &fakeProxierHealthChecker{true}
|
proxyChecker := &fakeProxierHealthChecker{true}
|
||||||
|
|
||||||
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker)
|
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodeAddressHandler, proxyChecker)
|
||||||
hcs := hcsi.(*server)
|
hcs := hcsi.(*server)
|
||||||
if len(hcs.services) != 0 {
|
if len(hcs.services) != 0 {
|
||||||
t.Errorf("expected 0 services, got %d", len(hcs.services))
|
t.Errorf("expected 0 services, got %d", len(hcs.services))
|
||||||
@ -663,9 +663,9 @@ func TestServerWithSelectiveListeningAddress(t *testing.T) {
|
|||||||
|
|
||||||
// limiting addresses to loop back. We don't want any cleverness here around getting IP for
|
// limiting addresses to loop back. We don't want any cleverness here around getting IP for
|
||||||
// machine nor testing ipv6 || ipv4. using loop back guarantees the test will work on any machine
|
// machine nor testing ipv6 || ipv4. using loop back guarantees the test will work on any machine
|
||||||
nodePortAddresses := proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"127.0.0.0/8"})
|
nodeAddressHandler := proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{"127.0.0.0/8"})
|
||||||
|
|
||||||
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker)
|
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodeAddressHandler, proxyChecker)
|
||||||
hcs := hcsi.(*server)
|
hcs := hcsi.(*server)
|
||||||
if len(hcs.services) != 0 {
|
if len(hcs.services) != 0 {
|
||||||
t.Errorf("expected 0 services, got %d", len(hcs.services))
|
t.Errorf("expected 0 services, got %d", len(hcs.services))
|
||||||
|
@ -58,17 +58,17 @@ type proxierHealthChecker interface {
|
|||||||
IsHealthy() bool
|
IsHealthy() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
|
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodeAddressHandler *proxyutil.NodeAddressHandler, healthzServer proxierHealthChecker) ServiceHealthServer {
|
||||||
// It doesn't matter whether we listen on "0.0.0.0", "::", or ""; go
|
// It doesn't matter whether we listen on "0.0.0.0", "::", or ""; go
|
||||||
// treats them all the same.
|
// treats them all the same.
|
||||||
nodeIPs := []net.IP{net.IPv4zero}
|
nodeIPs := []net.IP{net.IPv4zero}
|
||||||
|
|
||||||
if !nodePortAddresses.MatchAll() {
|
if !nodeAddressHandler.MatchAll() {
|
||||||
ips, err := nodePortAddresses.GetNodeIPs(proxyutil.RealNetwork{})
|
ips, err := nodeAddressHandler.GetNodeIPs(proxyutil.RealNetwork{})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
nodeIPs = ips
|
nodeIPs = ips
|
||||||
} else {
|
} else {
|
||||||
klog.ErrorS(err, "Failed to get node ip address matching node port addresses, health check port will listen to all node addresses", "nodePortAddresses", nodePortAddresses)
|
klog.ErrorS(err, "Failed to get node ip address matching node port addresses, health check port will listen to all node addresses", "nodeAddresses", nodeAddressHandler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,7 +84,7 @@ func newServiceHealthServer(hostname string, recorder events.EventRecorder, list
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewServiceHealthServer allocates a new service healthcheck server manager
|
// NewServiceHealthServer allocates a new service healthcheck server manager
|
||||||
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
|
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *proxyutil.NodeAddressHandler, healthzServer proxierHealthChecker) ServiceHealthServer {
|
||||||
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses, healthzServer)
|
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses, healthzServer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,8 +205,8 @@ type Proxier struct {
|
|||||||
// conntrackTCPLiberal indicates whether the system sets the kernel nf_conntrack_tcp_be_liberal
|
// conntrackTCPLiberal indicates whether the system sets the kernel nf_conntrack_tcp_be_liberal
|
||||||
conntrackTCPLiberal bool
|
conntrackTCPLiberal bool
|
||||||
|
|
||||||
// nodePortAddresses selects the interfaces where nodePort works.
|
// nodeAddressHandler selects the interfaces where nodePort works.
|
||||||
nodePortAddresses *proxyutil.NodePortAddresses
|
nodeAddressHandler *proxyutil.NodeAddressHandler
|
||||||
// networkInterfacer defines an interface for several net library functions.
|
// networkInterfacer defines an interface for several net library functions.
|
||||||
// Inject for test purpose.
|
// Inject for test purpose.
|
||||||
networkInterfacer proxyutil.NetworkInterfacer
|
networkInterfacer proxyutil.NetworkInterfacer
|
||||||
@ -244,9 +244,9 @@ func NewProxier(ctx context.Context,
|
|||||||
initOnly bool,
|
initOnly bool,
|
||||||
) (*Proxier, error) {
|
) (*Proxier, error) {
|
||||||
logger := klog.LoggerWithValues(klog.FromContext(ctx), "ipFamily", ipFamily)
|
logger := klog.LoggerWithValues(klog.FromContext(ctx), "ipFamily", ipFamily)
|
||||||
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
|
nodeAddressHandler := proxyutil.NewNodeAddressHandler(ipFamily, nodePortAddressStrings)
|
||||||
|
|
||||||
if !nodePortAddresses.ContainsIPv4Loopback() {
|
if !nodeAddressHandler.ContainsIPv4Loopback() {
|
||||||
localhostNodePorts = false
|
localhostNodePorts = false
|
||||||
}
|
}
|
||||||
if localhostNodePorts {
|
if localhostNodePorts {
|
||||||
@ -277,7 +277,7 @@ func NewProxier(ctx context.Context,
|
|||||||
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
|
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
|
||||||
logger.V(2).Info("Using iptables mark for masquerade", "mark", masqueradeMark)
|
logger.V(2).Info("Using iptables mark for masquerade", "mark", masqueradeMark)
|
||||||
|
|
||||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
|
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodeAddressHandler, healthzServer)
|
||||||
nfacctRunner, err := nfacct.New()
|
nfacctRunner, err := nfacct.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(err, "Failed to create nfacct runner, nfacct based metrics won't be available")
|
logger.Error(err, "Failed to create nfacct runner, nfacct based metrics won't be available")
|
||||||
@ -310,7 +310,7 @@ func NewProxier(ctx context.Context,
|
|||||||
natChains: proxyutil.NewLineBuffer(),
|
natChains: proxyutil.NewLineBuffer(),
|
||||||
natRules: proxyutil.NewLineBuffer(),
|
natRules: proxyutil.NewLineBuffer(),
|
||||||
localhostNodePorts: localhostNodePorts,
|
localhostNodePorts: localhostNodePorts,
|
||||||
nodePortAddresses: nodePortAddresses,
|
nodeAddressHandler: nodeAddressHandler,
|
||||||
networkInterfacer: proxyutil.RealNetwork{},
|
networkInterfacer: proxyutil.RealNetwork{},
|
||||||
conntrackTCPLiberal: conntrackTCPLiberal,
|
conntrackTCPLiberal: conntrackTCPLiberal,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
@ -1447,7 +1447,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
// Finally, tail-call to the nodePorts chain. This needs to be after all
|
// Finally, tail-call to the nodePorts chain. This needs to be after all
|
||||||
// other service portal rules.
|
// other service portal rules.
|
||||||
if proxier.nodePortAddresses.MatchAll() {
|
if proxier.nodeAddressHandler.MatchAll() {
|
||||||
destinations := []string{"-m", "addrtype", "--dst-type", "LOCAL"}
|
destinations := []string{"-m", "addrtype", "--dst-type", "LOCAL"}
|
||||||
// Block localhost nodePorts if they are not supported. (For IPv6 they never
|
// Block localhost nodePorts if they are not supported. (For IPv6 they never
|
||||||
// work, and for IPv4 they only work if we previously set `route_localnet`.)
|
// work, and for IPv4 they only work if we previously set `route_localnet`.)
|
||||||
@ -1463,9 +1463,9 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
destinations,
|
destinations,
|
||||||
"-j", string(kubeNodePortsChain))
|
"-j", string(kubeNodePortsChain))
|
||||||
} else {
|
} else {
|
||||||
nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
|
nodeIPs, err := proxier.nodeAddressHandler.GetNodeIPs(proxier.networkInterfacer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
|
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodeAddressHandler)
|
||||||
}
|
}
|
||||||
for _, ip := range nodeIPs {
|
for _, ip := range nodeIPs {
|
||||||
if ip.IsLoopback() {
|
if ip.IsLoopback() {
|
||||||
|
@ -134,7 +134,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
|
|||||||
natRules: proxyutil.NewLineBuffer(),
|
natRules: proxyutil.NewLineBuffer(),
|
||||||
nodeIP: netutils.ParseIPSloppy(testNodeIP),
|
nodeIP: netutils.ParseIPSloppy(testNodeIP),
|
||||||
localhostNodePorts: true,
|
localhostNodePorts: true,
|
||||||
nodePortAddresses: proxyutil.NewNodePortAddresses(ipfamily, nil),
|
nodeAddressHandler: proxyutil.NewNodeAddressHandler(ipfamily, nil),
|
||||||
networkInterfacer: networkInterfacer,
|
networkInterfacer: networkInterfacer,
|
||||||
nfAcctCounters: map[string]bool{
|
nfAcctCounters: map[string]bool{
|
||||||
metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: true,
|
metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: true,
|
||||||
@ -2351,7 +2351,7 @@ func TestNodePorts(t *testing.T) {
|
|||||||
fp := NewFakeProxier(ipt)
|
fp := NewFakeProxier(ipt)
|
||||||
fp.localhostNodePorts = tc.localhostNodePorts
|
fp.localhostNodePorts = tc.localhostNodePorts
|
||||||
if tc.nodePortAddresses != nil {
|
if tc.nodePortAddresses != nil {
|
||||||
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(tc.family, tc.nodePortAddresses)
|
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(tc.family, tc.nodePortAddresses)
|
||||||
}
|
}
|
||||||
|
|
||||||
makeServiceMap(fp,
|
makeServiceMap(fp,
|
||||||
@ -2499,7 +2499,7 @@ func TestNodePorts(t *testing.T) {
|
|||||||
func TestHealthCheckNodePort(t *testing.T) {
|
func TestHealthCheckNodePort(t *testing.T) {
|
||||||
ipt := iptablestest.NewFake()
|
ipt := iptablestest.NewFake()
|
||||||
fp := NewFakeProxier(ipt)
|
fp := NewFakeProxier(ipt)
|
||||||
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"127.0.0.0/8"})
|
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{"127.0.0.0/8"})
|
||||||
|
|
||||||
svcIP := "172.30.0.42"
|
svcIP := "172.30.0.42"
|
||||||
svcPort := 80
|
svcPort := 80
|
||||||
|
@ -227,8 +227,8 @@ type Proxier struct {
|
|||||||
netlinkHandle NetLinkHandle
|
netlinkHandle NetLinkHandle
|
||||||
// ipsetList is the list of ipsets that ipvs proxier used.
|
// ipsetList is the list of ipsets that ipvs proxier used.
|
||||||
ipsetList map[string]*IPSet
|
ipsetList map[string]*IPSet
|
||||||
// nodePortAddresses selects the interfaces where nodePort works.
|
// nodeAddressHandler selects the interfaces where nodePort works.
|
||||||
nodePortAddresses *proxyutil.NodePortAddresses
|
nodeAddressHandler *proxyutil.NodeAddressHandler
|
||||||
// networkInterfacer defines an interface for several net library functions.
|
// networkInterfacer defines an interface for several net library functions.
|
||||||
// Inject for test purpose.
|
// Inject for test purpose.
|
||||||
networkInterfacer proxyutil.NetworkInterfacer
|
networkInterfacer proxyutil.NetworkInterfacer
|
||||||
@ -365,9 +365,9 @@ func NewProxier(
|
|||||||
scheduler = defaultScheduler
|
scheduler = defaultScheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
|
nodeAddressHandler := proxyutil.NewNodeAddressHandler(ipFamily, nodePortAddressStrings)
|
||||||
|
|
||||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
|
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodeAddressHandler, healthzServer)
|
||||||
|
|
||||||
// excludeCIDRs has been validated before, here we just parse it to IPNet list
|
// excludeCIDRs has been validated before, here we just parse it to IPNet list
|
||||||
parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs)
|
parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs)
|
||||||
@ -402,7 +402,7 @@ func NewProxier(
|
|||||||
filterRules: proxyutil.NewLineBuffer(),
|
filterRules: proxyutil.NewLineBuffer(),
|
||||||
netlinkHandle: NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
|
netlinkHandle: NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
|
||||||
ipset: ipset,
|
ipset: ipset,
|
||||||
nodePortAddresses: nodePortAddresses,
|
nodeAddressHandler: nodeAddressHandler,
|
||||||
networkInterfacer: proxyutil.RealNetwork{},
|
networkInterfacer: proxyutil.RealNetwork{},
|
||||||
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
|
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
|
||||||
logger: logger,
|
logger: logger,
|
||||||
@ -1000,12 +1000,12 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
// can be reused for all nodePort services.
|
// can be reused for all nodePort services.
|
||||||
var nodeIPs []net.IP
|
var nodeIPs []net.IP
|
||||||
if hasNodePort {
|
if hasNodePort {
|
||||||
if proxier.nodePortAddresses.MatchAll() {
|
if proxier.nodeAddressHandler.MatchAll() {
|
||||||
for _, ipStr := range nodeAddressSet.UnsortedList() {
|
for _, ipStr := range nodeAddressSet.UnsortedList() {
|
||||||
nodeIPs = append(nodeIPs, netutils.ParseIPSloppy(ipStr))
|
nodeIPs = append(nodeIPs, netutils.ParseIPSloppy(ipStr))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
allNodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
|
allNodeIPs, err := proxier.nodeAddressHandler.GetNodeIPs(proxier.networkInterfacer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
proxier.logger.Error(err, "Failed to get node IP address matching nodeport cidr")
|
proxier.logger.Error(err, "Failed to get node IP address matching nodeport cidr")
|
||||||
} else {
|
} else {
|
||||||
|
@ -160,7 +160,7 @@ func NewFakeProxier(ctx context.Context, ipt utiliptables.Interface, ipvs utilip
|
|||||||
filterRules: proxyutil.NewLineBuffer(),
|
filterRules: proxyutil.NewLineBuffer(),
|
||||||
netlinkHandle: netlinkHandle,
|
netlinkHandle: netlinkHandle,
|
||||||
ipsetList: ipsetList,
|
ipsetList: ipsetList,
|
||||||
nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil),
|
nodeAddressHandler: proxyutil.NewNodeAddressHandler(ipFamily, nil),
|
||||||
networkInterfacer: proxyutiltest.NewFakeNetwork(),
|
networkInterfacer: proxyutiltest.NewFakeNetwork(),
|
||||||
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
|
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
|
||||||
ipFamily: ipFamily,
|
ipFamily: ipFamily,
|
||||||
@ -950,7 +950,7 @@ func TestNodePortIPv4(t *testing.T) {
|
|||||||
ipvs := ipvstest.NewFake()
|
ipvs := ipvstest.NewFake()
|
||||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||||
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv4Protocol)
|
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv4Protocol)
|
||||||
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, test.nodePortAddresses)
|
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, test.nodePortAddresses)
|
||||||
|
|
||||||
makeServiceMap(fp, test.services...)
|
makeServiceMap(fp, test.services...)
|
||||||
populateEndpointSlices(fp, test.endpoints...)
|
populateEndpointSlices(fp, test.endpoints...)
|
||||||
@ -1293,7 +1293,7 @@ func TestNodePortIPv6(t *testing.T) {
|
|||||||
ipvs := ipvstest.NewFake()
|
ipvs := ipvstest.NewFake()
|
||||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||||
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv6Protocol)
|
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv6Protocol)
|
||||||
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv6Protocol, test.nodePortAddresses)
|
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv6Protocol, test.nodePortAddresses)
|
||||||
|
|
||||||
makeServiceMap(fp, test.services...)
|
makeServiceMap(fp, test.services...)
|
||||||
populateEndpointSlices(fp, test.endpoints...)
|
populateEndpointSlices(fp, test.endpoints...)
|
||||||
@ -2053,7 +2053,7 @@ func TestOnlyLocalNodePorts(t *testing.T) {
|
|||||||
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
|
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
|
||||||
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
|
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
|
||||||
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
|
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
|
||||||
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"100.101.102.0/24"})
|
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{"100.101.102.0/24"})
|
||||||
|
|
||||||
fp.syncProxyRules()
|
fp.syncProxyRules()
|
||||||
|
|
||||||
@ -2141,7 +2141,7 @@ func TestHealthCheckNodePort(t *testing.T) {
|
|||||||
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
|
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
|
||||||
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
|
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
|
||||||
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
|
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
|
||||||
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"100.101.102.0/24"})
|
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{"100.101.102.0/24"})
|
||||||
|
|
||||||
fp.syncProxyRules()
|
fp.syncProxyRules()
|
||||||
|
|
||||||
|
@ -180,8 +180,8 @@ type Proxier struct {
|
|||||||
serviceHealthServer healthcheck.ServiceHealthServer
|
serviceHealthServer healthcheck.ServiceHealthServer
|
||||||
healthzServer *healthcheck.ProxierHealthServer
|
healthzServer *healthcheck.ProxierHealthServer
|
||||||
|
|
||||||
// nodePortAddresses selects the interfaces where nodePort works.
|
// nodeAddressHandler selects the interfaces where nodePort works.
|
||||||
nodePortAddresses *proxyutil.NodePortAddresses
|
nodeAddressHandler *proxyutil.NodeAddressHandler
|
||||||
// networkInterfacer defines an interface for several net library functions.
|
// networkInterfacer defines an interface for several net library functions.
|
||||||
// Inject for test purpose.
|
// Inject for test purpose.
|
||||||
networkInterfacer proxyutil.NetworkInterfacer
|
networkInterfacer proxyutil.NetworkInterfacer
|
||||||
@ -240,9 +240,9 @@ func NewProxier(ctx context.Context,
|
|||||||
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
|
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
|
||||||
logger.V(2).Info("Using nftables mark for masquerade", "mark", masqueradeMark)
|
logger.V(2).Info("Using nftables mark for masquerade", "mark", masqueradeMark)
|
||||||
|
|
||||||
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
|
nodeAddressHandler := proxyutil.NewNodeAddressHandler(ipFamily, nodePortAddressStrings)
|
||||||
|
|
||||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
|
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodeAddressHandler, healthzServer)
|
||||||
|
|
||||||
proxier := &Proxier{
|
proxier := &Proxier{
|
||||||
ipFamily: ipFamily,
|
ipFamily: ipFamily,
|
||||||
@ -262,7 +262,7 @@ func NewProxier(ctx context.Context,
|
|||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
serviceHealthServer: serviceHealthServer,
|
serviceHealthServer: serviceHealthServer,
|
||||||
healthzServer: healthzServer,
|
healthzServer: healthzServer,
|
||||||
nodePortAddresses: nodePortAddresses,
|
nodeAddressHandler: nodeAddressHandler,
|
||||||
networkInterfacer: proxyutil.RealNetwork{},
|
networkInterfacer: proxyutil.RealNetwork{},
|
||||||
staleChains: make(map[string]time.Time),
|
staleChains: make(map[string]time.Time),
|
||||||
logger: logger,
|
logger: logger,
|
||||||
@ -574,7 +574,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
|
|||||||
Type: ipvX_addr,
|
Type: ipvX_addr,
|
||||||
Comment: ptr.To("IPs that accept NodePort traffic"),
|
Comment: ptr.To("IPs that accept NodePort traffic"),
|
||||||
})
|
})
|
||||||
if proxier.nodePortAddresses.MatchAll() {
|
if proxier.nodeAddressHandler.MatchAll() {
|
||||||
tx.Delete(&knftables.Set{
|
tx.Delete(&knftables.Set{
|
||||||
Name: nodePortIPsSet,
|
Name: nodePortIPsSet,
|
||||||
})
|
})
|
||||||
@ -582,9 +582,9 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
|
|||||||
tx.Flush(&knftables.Set{
|
tx.Flush(&knftables.Set{
|
||||||
Name: nodePortIPsSet,
|
Name: nodePortIPsSet,
|
||||||
})
|
})
|
||||||
nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
|
nodeIPs, err := proxier.nodeAddressHandler.GetNodeIPs(proxier.networkInterfacer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
|
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodeAddressHandler)
|
||||||
}
|
}
|
||||||
for _, ip := range nodeIPs {
|
for _, ip := range nodeIPs {
|
||||||
if ip.IsLoopback() {
|
if ip.IsLoopback() {
|
||||||
@ -632,7 +632,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
|
|||||||
),
|
),
|
||||||
})
|
})
|
||||||
|
|
||||||
if proxier.nodePortAddresses.MatchAll() {
|
if proxier.nodeAddressHandler.MatchAll() {
|
||||||
tx.Add(&knftables.Rule{
|
tx.Add(&knftables.Rule{
|
||||||
Chain: nodePortEndpointsCheckChain,
|
Chain: nodePortEndpointsCheckChain,
|
||||||
Rule: knftables.Concat(
|
Rule: knftables.Concat(
|
||||||
@ -686,7 +686,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
|
|||||||
"vmap", "@", serviceIPsMap,
|
"vmap", "@", serviceIPsMap,
|
||||||
),
|
),
|
||||||
})
|
})
|
||||||
if proxier.nodePortAddresses.MatchAll() {
|
if proxier.nodeAddressHandler.MatchAll() {
|
||||||
tx.Add(&knftables.Rule{
|
tx.Add(&knftables.Rule{
|
||||||
Chain: servicesChain,
|
Chain: servicesChain,
|
||||||
Rule: knftables.Concat(
|
Rule: knftables.Concat(
|
||||||
|
@ -127,7 +127,7 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
|
|||||||
hostname: testHostname,
|
hostname: testHostname,
|
||||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||||
nodeIP: nodeIP,
|
nodeIP: nodeIP,
|
||||||
nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nodePortAddresses),
|
nodeAddressHandler: proxyutil.NewNodeAddressHandler(ipFamily, nodePortAddresses),
|
||||||
networkInterfacer: networkInterfacer,
|
networkInterfacer: networkInterfacer,
|
||||||
staleChains: make(map[string]time.Time),
|
staleChains: make(map[string]time.Time),
|
||||||
serviceCIDRs: serviceCIDRs,
|
serviceCIDRs: serviceCIDRs,
|
||||||
@ -958,7 +958,7 @@ func TestNodePorts(t *testing.T) {
|
|||||||
nodeIP = testNodeIPv6
|
nodeIP = testNodeIPv6
|
||||||
}
|
}
|
||||||
if tc.nodePortAddresses != nil {
|
if tc.nodePortAddresses != nil {
|
||||||
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(tc.family, tc.nodePortAddresses)
|
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(tc.family, tc.nodePortAddresses)
|
||||||
}
|
}
|
||||||
|
|
||||||
makeServiceMap(fp,
|
makeServiceMap(fp,
|
||||||
|
@ -24,8 +24,9 @@ import (
|
|||||||
netutils "k8s.io/utils/net"
|
netutils "k8s.io/utils/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NodePortAddresses is used to handle the --nodeport-addresses flag
|
// NodeAddressHandler is used to handle NodePortAddresses,
|
||||||
type NodePortAddresses struct {
|
// HealthzBindAddresses and MetricsBindAddresses.
|
||||||
|
type NodeAddressHandler struct {
|
||||||
cidrStrings []string
|
cidrStrings []string
|
||||||
|
|
||||||
cidrs []*net.IPNet
|
cidrs []*net.IPNet
|
||||||
@ -36,64 +37,65 @@ type NodePortAddresses struct {
|
|||||||
// RFC 5735 127.0.0.0/8 - This block is assigned for use as the Internet host loopback address
|
// RFC 5735 127.0.0.0/8 - This block is assigned for use as the Internet host loopback address
|
||||||
var ipv4LoopbackStart = net.IPv4(127, 0, 0, 0)
|
var ipv4LoopbackStart = net.IPv4(127, 0, 0, 0)
|
||||||
|
|
||||||
// NewNodePortAddresses takes an IP family and the `--nodeport-addresses` value (which is
|
// NewNodeAddressHandler takes an IP family and the CIDR strings (
|
||||||
|
// NodePortAddresses, HealthzBindAddresses or MetricsBindAddresses, which is
|
||||||
// assumed to contain only valid CIDRs, potentially of both IP families) and returns a
|
// assumed to contain only valid CIDRs, potentially of both IP families) and returns a
|
||||||
// NodePortAddresses object for the given family. If there are no CIDRs of the given
|
// NodeAddressHandler object for the given family. If there are no CIDRs of the given
|
||||||
// family then the CIDR "0.0.0.0/0" or "::/0" will be added (even if there are CIDRs of
|
// family then the CIDR "0.0.0.0/0" or "::/0" will be added (even if there are CIDRs of
|
||||||
// the other family).
|
// the other family).
|
||||||
func NewNodePortAddresses(family v1.IPFamily, cidrStrings []string) *NodePortAddresses {
|
func NewNodeAddressHandler(family v1.IPFamily, cidrStrings []string) *NodeAddressHandler {
|
||||||
npa := &NodePortAddresses{}
|
nah := &NodeAddressHandler{}
|
||||||
|
|
||||||
// Filter CIDRs to correct family
|
// Filter CIDRs to correct family
|
||||||
for _, str := range cidrStrings {
|
for _, str := range cidrStrings {
|
||||||
if (family == v1.IPv4Protocol) == netutils.IsIPv4CIDRString(str) {
|
if (family == v1.IPv4Protocol) == netutils.IsIPv4CIDRString(str) {
|
||||||
npa.cidrStrings = append(npa.cidrStrings, str)
|
nah.cidrStrings = append(nah.cidrStrings, str)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(npa.cidrStrings) == 0 {
|
if len(nah.cidrStrings) == 0 {
|
||||||
if family == v1.IPv4Protocol {
|
if family == v1.IPv4Protocol {
|
||||||
npa.cidrStrings = []string{IPv4ZeroCIDR}
|
nah.cidrStrings = []string{IPv4ZeroCIDR}
|
||||||
} else {
|
} else {
|
||||||
npa.cidrStrings = []string{IPv6ZeroCIDR}
|
nah.cidrStrings = []string{IPv6ZeroCIDR}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now parse
|
// Now parse
|
||||||
for _, str := range npa.cidrStrings {
|
for _, str := range nah.cidrStrings {
|
||||||
_, cidr, _ := netutils.ParseCIDRSloppy(str)
|
_, cidr, _ := netutils.ParseCIDRSloppy(str)
|
||||||
|
|
||||||
if netutils.IsIPv4CIDR(cidr) {
|
if netutils.IsIPv4CIDR(cidr) {
|
||||||
if cidr.IP.IsLoopback() || cidr.Contains(ipv4LoopbackStart) {
|
if cidr.IP.IsLoopback() || cidr.Contains(ipv4LoopbackStart) {
|
||||||
npa.containsIPv4Loopback = true
|
nah.containsIPv4Loopback = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if IsZeroCIDR(str) {
|
if IsZeroCIDR(str) {
|
||||||
// Ignore everything else
|
// Ignore everything else
|
||||||
npa.cidrs = []*net.IPNet{cidr}
|
nah.cidrs = []*net.IPNet{cidr}
|
||||||
npa.matchAll = true
|
nah.matchAll = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
npa.cidrs = append(npa.cidrs, cidr)
|
nah.cidrs = append(nah.cidrs, cidr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return npa
|
return nah
|
||||||
}
|
}
|
||||||
|
|
||||||
func (npa *NodePortAddresses) String() string {
|
func (nah *NodeAddressHandler) String() string {
|
||||||
return fmt.Sprintf("%v", npa.cidrStrings)
|
return fmt.Sprintf("%v", nah.cidrStrings)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MatchAll returns true if npa matches all node IPs (of npa's given family)
|
// MatchAll returns true if nah matches all node IPs (of nah's given family)
|
||||||
func (npa *NodePortAddresses) MatchAll() bool {
|
func (nah *NodeAddressHandler) MatchAll() bool {
|
||||||
return npa.matchAll
|
return nah.matchAll
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNodeIPs return all matched node IP addresses for npa's CIDRs. If no matching
|
// GetNodeIPs return all matched node IP addresses for nah's CIDRs. If no matching
|
||||||
// IPs are found, it returns an empty list.
|
// IPs are found, it returns an empty list.
|
||||||
// NetworkInterfacer is injected for test purpose.
|
// NetworkInterfacer is injected for test purpose.
|
||||||
func (npa *NodePortAddresses) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error) {
|
func (nah *NodeAddressHandler) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error) {
|
||||||
addrs, err := nw.InterfaceAddrs()
|
addrs, err := nw.InterfaceAddrs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error listing all interfaceAddrs from host, error: %v", err)
|
return nil, fmt.Errorf("error listing all interfaceAddrs from host, error: %v", err)
|
||||||
@ -101,7 +103,7 @@ func (npa *NodePortAddresses) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error)
|
|||||||
|
|
||||||
// Use a map to dedup matches
|
// Use a map to dedup matches
|
||||||
addresses := make(map[string]net.IP)
|
addresses := make(map[string]net.IP)
|
||||||
for _, cidr := range npa.cidrs {
|
for _, cidr := range nah.cidrs {
|
||||||
for _, addr := range addrs {
|
for _, addr := range addrs {
|
||||||
var ip net.IP
|
var ip net.IP
|
||||||
// nw.InterfaceAddrs may return net.IPAddr or net.IPNet on windows, and it will return net.IPNet on linux.
|
// nw.InterfaceAddrs may return net.IPAddr or net.IPNet on windows, and it will return net.IPNet on linux.
|
||||||
@ -128,7 +130,7 @@ func (npa *NodePortAddresses) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error)
|
|||||||
return ips, nil
|
return ips, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainsIPv4Loopback returns true if npa's CIDRs contain an IPv4 loopback address.
|
// ContainsIPv4Loopback returns true if nah's CIDRs contain an IPv4 loopback address.
|
||||||
func (npa *NodePortAddresses) ContainsIPv4Loopback() bool {
|
func (nah *NodeAddressHandler) ContainsIPv4Loopback() bool {
|
||||||
return npa.containsIPv4Loopback
|
return nah.containsIPv4Loopback
|
||||||
}
|
}
|
||||||
|
@ -379,13 +379,13 @@ func TestGetNodeIPs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, family := range []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol} {
|
for _, family := range []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol} {
|
||||||
npa := NewNodePortAddresses(family, tc.cidrs)
|
nah := NewNodeAddressHandler(family, tc.cidrs)
|
||||||
|
|
||||||
if npa.MatchAll() != tc.expected[family].matchAll {
|
if nah.MatchAll() != tc.expected[family].matchAll {
|
||||||
t.Errorf("unexpected MatchAll(%s), expected: %v", family, tc.expected[family].matchAll)
|
t.Errorf("unexpected MatchAll(%s), expected: %v", family, tc.expected[family].matchAll)
|
||||||
}
|
}
|
||||||
|
|
||||||
ips, err := npa.GetNodeIPs(nw)
|
ips, err := nah.GetNodeIPs(nw)
|
||||||
expectedIPs := tc.expected[family].ips
|
expectedIPs := tc.expected[family].ips
|
||||||
|
|
||||||
// The fake InterfaceAddrs() never returns an error, so
|
// The fake InterfaceAddrs() never returns an error, so
|
||||||
@ -451,13 +451,13 @@ func TestContainsIPv4Loopback(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
npa := NewNodePortAddresses(v1.IPv4Protocol, tt.cidrStrings)
|
nah := NewNodeAddressHandler(v1.IPv4Protocol, tt.cidrStrings)
|
||||||
if got := npa.ContainsIPv4Loopback(); got != tt.want {
|
if got := nah.ContainsIPv4Loopback(); got != tt.want {
|
||||||
t.Errorf("IPv4 ContainsIPv4Loopback() = %v, want %v", got, tt.want)
|
t.Errorf("IPv4 ContainsIPv4Loopback() = %v, want %v", got, tt.want)
|
||||||
}
|
}
|
||||||
// ContainsIPv4Loopback should always be false for family=IPv6
|
// ContainsIPv4Loopback should always be false for family=IPv6
|
||||||
npa = NewNodePortAddresses(v1.IPv6Protocol, tt.cidrStrings)
|
nah = NewNodeAddressHandler(v1.IPv6Protocol, tt.cidrStrings)
|
||||||
if got := npa.ContainsIPv4Loopback(); got {
|
if got := nah.ContainsIPv4Loopback(); got {
|
||||||
t.Errorf("IPv6 ContainsIPv4Loopback() = %v, want %v", got, false)
|
t.Errorf("IPv6 ContainsIPv4Loopback() = %v, want %v", got, false)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -686,8 +686,8 @@ func NewProxier(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// windows listens to all node addresses
|
// windows listens to all node addresses
|
||||||
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil)
|
nodeAddressHandler := proxyutil.NewNodeAddressHandler(ipFamily, nil)
|
||||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
|
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodeAddressHandler, healthzServer)
|
||||||
|
|
||||||
var healthzPort int
|
var healthzPort int
|
||||||
if len(healthzBindAddress) > 0 {
|
if len(healthzBindAddress) > 0 {
|
||||||
|
Loading…
Reference in New Issue
Block a user