Merge pull request #115256 from danwinship/nodeport-addresses-2

clean up kube-proxy nodeport address handling (part 2)
This commit is contained in:
Kubernetes Prow Robot 2023-02-22 13:07:05 -08:00 committed by GitHub
commit b86f94f438
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 111 additions and 97 deletions

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
testingclock "k8s.io/utils/clock/testing"
"github.com/davecgh/go-spew/spew"
@ -131,8 +132,9 @@ type healthzPayload struct {
func TestServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
nodePortAddresses := utilproxy.NewNodePortAddresses([]string{})
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, []string{})
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses)
hcs := hcsi.(*server)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
@ -435,8 +437,9 @@ func TestServerWithSelectiveListeningAddress(t *testing.T) {
// limiting addresses to loop back. We don't want any cleverness here around getting IP for
// machine nor testing ipv6 || ipv4. using loop back guarantees the test will work on any machine
nodePortAddresses := utilproxy.NewNodePortAddresses([]string{"127.0.0.0/8"})
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, []string{"127.0.0.0/8"})
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses)
hcs := hcsi.(*server)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))

View File

@ -52,9 +52,9 @@ type ServiceHealthServer interface {
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses []string) ServiceHealthServer {
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *utilproxy.NodePortAddresses) ServiceHealthServer {
nodeAddresses, err := utilproxy.GetNodeAddresses(nodePortAddresses, utilproxy.RealNetwork{})
nodeAddresses, err := nodePortAddresses.GetNodeAddresses(utilproxy.RealNetwork{})
if err != nil || nodeAddresses.Len() == 0 {
klog.ErrorS(err, "Failed to get node ip address matching node port addresses, health check port will listen to all node addresses", "nodePortAddresses", nodePortAddresses)
nodeAddresses = sets.NewString()
@ -81,7 +81,7 @@ func newServiceHealthServer(hostname string, recorder events.EventRecorder, list
}
// NewServiceHealthServer allocates a new service healthcheck server manager
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses []string) ServiceHealthServer {
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *utilproxy.NodePortAddresses) ServiceHealthServer {
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses)
}

View File

@ -211,8 +211,8 @@ type Proxier struct {
// localhostNodePorts indicates whether we allow NodePort services to be accessed
// via localhost.
localhostNodePorts bool
// Values are as a parameter to select the interfaces where nodePort works.
nodePortAddresses []string
// nodePortAddresses selects the interfaces where nodePort works.
nodePortAddresses *utilproxy.NodePortAddresses
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer utilproxy.NetworkInterfacer
@ -240,9 +240,11 @@ func NewProxier(ipFamily v1.IPFamily,
nodeIP net.IP,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
nodePortAddresses []string,
nodePortAddressStrings []string,
) (*Proxier, error) {
if !utilproxy.ContainsIPv4Loopback(nodePortAddresses) {
nodePortAddresses := utilproxy.NewNodePortAddresses(nodePortAddressStrings)
if !nodePortAddresses.ContainsIPv4Loopback() {
localhostNodePorts = false
}
if localhostNodePorts {
@ -988,20 +990,6 @@ func (proxier *Proxier) syncProxyRules() {
}
proxier.largeClusterMode = (totalEndpoints > largeClusterEndpointsThreshold)
nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
}
// nodeAddresses may contain dual-stack zero-CIDRs if proxier.nodePortAddresses is empty.
// Ensure nodeAddresses only contains the addresses for this proxier's IP family.
for addr := range nodeAddresses {
if utilproxy.IsZeroCIDR(addr) && isIPv6 == netutils.IsIPv6CIDRString(addr) {
// if any of the addresses is zero cidr of this IP family, non-zero IPs can be excluded.
nodeAddresses = sets.NewString(addr)
break
}
}
// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
// metric.
serviceNoLocalEndpointsTotalInternal := 0
@ -1218,7 +1206,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// Capture nodeports.
if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 {
if svcInfo.NodePort() != 0 {
if hasEndpoints {
// Jump to the external destination chain. For better or for
// worse, nodeports are not subect to loadBalancerSourceRanges,
@ -1414,7 +1402,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}
@ -1474,6 +1462,20 @@ func (proxier *Proxier) syncProxyRules() {
// Finally, tail-call to the nodePorts chain. This needs to be after all
// other service portal rules.
nodeAddresses, err := proxier.nodePortAddresses.GetNodeAddresses(proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
}
// nodeAddresses may contain dual-stack zero-CIDRs if proxier.nodePortAddresses is empty.
// Ensure nodeAddresses only contains the addresses for this proxier's IP family.
for addr := range nodeAddresses {
if utilproxy.IsZeroCIDR(addr) && isIPv6 == netutils.IsIPv6CIDRString(addr) {
// if any of the addresses is zero cidr of this IP family, non-zero IPs can be excluded.
nodeAddresses = sets.NewString(addr)
break
}
}
for address := range nodeAddresses {
if utilproxy.IsZeroCIDR(address) {
destinations := []string{"-m", "addrtype", "--dst-type", "LOCAL"}

View File

@ -423,7 +423,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
natRules: utilproxy.LineBuffer{},
nodeIP: netutils.ParseIPSloppy(testNodeIP),
localhostNodePorts: true,
nodePortAddresses: make([]string, 0),
nodePortAddresses: utilproxy.NewNodePortAddresses(nil),
networkInterfacer: networkInterfacer,
}
p.setInitialized(true)
@ -2557,7 +2557,7 @@ func TestNodePort(t *testing.T) {
func TestHealthCheckNodePort(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.nodePortAddresses = []string{"127.0.0.0/8"}
fp.nodePortAddresses = utilproxy.NewNodePortAddresses([]string{"127.0.0.0/8"})
svcIP := "172.30.0.42"
svcPort := 80
@ -3486,7 +3486,7 @@ func TestDisableLocalhostNodePortsIPv4WithNodeAddress(t *testing.T) {
fp.localDetector = proxyutiliptables.NewNoOpLocalDetector()
fp.localhostNodePorts = false
fp.networkInterfacer.InterfaceAddrs()
fp.nodePortAddresses = []string{"127.0.0.0/8"}
fp.nodePortAddresses = utilproxy.NewNodePortAddresses([]string{"127.0.0.0/8"})
expected := dedent.Dedent(`
*filter
@ -3767,7 +3767,7 @@ func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.localDetector = proxyutiliptables.NewNoOpLocalDetector()
fp.nodePortAddresses = []string{"192.168.0.0/24"}
fp.nodePortAddresses = utilproxy.NewNodePortAddresses([]string{"192.168.0.0/24"})
fp.localhostNodePorts = false
expected := dedent.Dedent(`
@ -3816,7 +3816,7 @@ func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
func TestOnlyLocalNodePorts(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.nodePortAddresses = []string{"192.168.0.0/24"}
fp.nodePortAddresses = utilproxy.NewNodePortAddresses([]string{"192.168.0.0/24"})
fp.localhostNodePorts = false
expected := dedent.Dedent(`

View File

@ -276,8 +276,8 @@ type Proxier struct {
netlinkHandle NetLinkHandle
// ipsetList is the list of ipsets that ipvs proxier used.
ipsetList map[string]*IPSet
// Values are as a parameter to select the interfaces which nodeport works.
nodePortAddresses []string
// nodePortAddresses selects the interfaces where nodePort works.
nodePortAddresses *utilproxy.NodePortAddresses
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer utilproxy.NetworkInterfacer
@ -327,7 +327,7 @@ func NewProxier(ipFamily v1.IPFamily,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
scheduler string,
nodePortAddresses []string,
nodePortAddressStrings []string,
kernelHandler KernelHandler,
) (*Proxier, error) {
// Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
@ -409,6 +409,8 @@ func NewProxier(ipFamily v1.IPFamily,
scheduler = defaultScheduler
}
nodePortAddresses := utilproxy.NewNodePortAddresses(nodePortAddressStrings)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
// excludeCIDRs has been validated before, here we just parse it to IPNet list
@ -1028,7 +1030,7 @@ func (proxier *Proxier) syncProxyRules() {
)
if hasNodePort {
nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
nodeAddrSet, err := proxier.nodePortAddresses.GetNodeAddresses(proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node IP address matching nodeport cidr")
} else {

View File

@ -172,7 +172,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
filterRules: utilproxy.LineBuffer{},
netlinkHandle: netlinkHandle,
ipsetList: ipsetList,
nodePortAddresses: make([]string, 0),
nodePortAddresses: utilproxy.NewNodePortAddresses(nil),
networkInterfacer: proxyutiltest.NewFakeNetwork(),
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
ipFamily: ipFamily,
@ -963,7 +963,7 @@ func TestNodePortIPv4(t *testing.T) {
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv4Protocol)
fp.nodePortAddresses = test.nodePortAddresses
fp.nodePortAddresses = utilproxy.NewNodePortAddresses(test.nodePortAddresses)
makeServiceMap(fp, test.services...)
populateEndpointSlices(fp, test.endpoints...)
@ -1308,7 +1308,7 @@ func TestNodePortIPv6(t *testing.T) {
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv6Protocol)
fp.nodePortAddresses = test.nodePortAddresses
fp.nodePortAddresses = utilproxy.NewNodePortAddresses(test.nodePortAddresses)
makeServiceMap(fp, test.services...)
populateEndpointSlices(fp, test.endpoints...)
@ -2071,7 +2071,7 @@ func TestOnlyLocalNodePorts(t *testing.T) {
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
fp.nodePortAddresses = []string{"100.101.102.0/24"}
fp.nodePortAddresses = utilproxy.NewNodePortAddresses([]string{"100.101.102.0/24"})
fp.syncProxyRules()
@ -2159,7 +2159,7 @@ func TestHealthCheckNodePort(t *testing.T) {
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
fp.nodePortAddresses = []string{"100.101.102.0/24"}
fp.nodePortAddresses = utilproxy.NewNodePortAddresses([]string{"100.101.102.0/24"})
fp.syncProxyRules()

View File

@ -24,23 +24,57 @@ import (
netutils "k8s.io/utils/net"
)
// GetNodeAddresses return all matched node IP addresses based on given cidr slice.
// Some callers, e.g. IPVS proxier, need concrete IPs, not ranges, which is why this exists.
// NetworkInterfacer is injected for test purpose.
// We expect the cidrs passed in is already validated.
// Given an empty input `[]`, it will return `0.0.0.0/0` and `::/0` directly.
// If multiple cidrs is given, it will return the minimal IP sets, e.g. given input `[1.2.0.0/16, 0.0.0.0/0]`, it will
// only return `0.0.0.0/0`.
// NOTE: GetNodeAddresses only accepts CIDRs, if you want concrete IPs, e.g. 1.2.3.4, then the input should be 1.2.3.4/32.
func GetNodeAddresses(cidrs []string, nw NetworkInterfacer) (sets.String, error) {
uniqueAddressList := sets.NewString()
if len(cidrs) == 0 {
uniqueAddressList.Insert(IPv4ZeroCIDR)
uniqueAddressList.Insert(IPv6ZeroCIDR)
return uniqueAddressList, nil
// NodePortAddresses is used to handle the --nodeport-addresses flag
type NodePortAddresses struct {
cidrStrings []string
cidrs []*net.IPNet
containsIPv4Loopback bool
}
// RFC 5735 127.0.0.0/8 - This block is assigned for use as the Internet host loopback address
var ipv4LoopbackStart = net.IPv4(127, 0, 0, 0)
// NewNodePortAddresses takes the `--nodeport-addresses` value (which is assumed to
// contain only valid CIDRs) and returns a NodePortAddresses object. If cidrStrings is
// empty, this is treated as `["0.0.0.0/0", "::/0"]`.
func NewNodePortAddresses(cidrStrings []string) *NodePortAddresses {
if len(cidrStrings) == 0 {
cidrStrings = []string{IPv4ZeroCIDR, IPv6ZeroCIDR}
}
npa := &NodePortAddresses{
cidrStrings: cidrStrings,
}
for _, str := range npa.cidrStrings {
_, cidr, _ := netutils.ParseCIDRSloppy(str)
npa.cidrs = append(npa.cidrs, cidr)
if netutils.IsIPv4CIDR(cidr) {
if cidr.IP.IsLoopback() || cidr.Contains(ipv4LoopbackStart) {
npa.containsIPv4Loopback = true
}
}
}
return npa
}
func (npa *NodePortAddresses) String() string {
return fmt.Sprintf("%v", npa.cidrStrings)
}
// GetNodeAddresses return all matched node IP addresses for npa's CIDRs.
// If npa's CIDRs include "0.0.0.0/0" and/or "::/0", then those values will be returned
// verbatim in the response and no actual IPs of that family will be returned.
// If no matching IPs are found, GetNodeAddresses will return an error.
// NetworkInterfacer is injected for test purpose.
func (npa *NodePortAddresses) GetNodeAddresses(nw NetworkInterfacer) (sets.String, error) {
uniqueAddressList := sets.NewString()
// First round of iteration to pick out `0.0.0.0/0` or `::/0` for the sake of excluding non-zero IPs.
for _, cidr := range cidrs {
for _, cidr := range npa.cidrStrings {
if IsZeroCIDR(cidr) {
uniqueAddressList.Insert(cidr)
}
@ -52,12 +86,11 @@ func GetNodeAddresses(cidrs []string, nw NetworkInterfacer) (sets.String, error)
}
// Second round of iteration to parse IPs based on cidr.
for _, cidr := range cidrs {
if IsZeroCIDR(cidr) {
for _, cidr := range npa.cidrs {
if IsZeroCIDR(cidr.String()) {
continue
}
_, ipNet, _ := netutils.ParseCIDRSloppy(cidr)
for _, addr := range addrs {
var ip net.IP
// nw.InterfaceAddrs may return net.IPAddr or net.IPNet on windows, and it will return net.IPNet on linux.
@ -70,7 +103,7 @@ func GetNodeAddresses(cidrs []string, nw NetworkInterfacer) (sets.String, error)
continue
}
if ipNet.Contains(ip) {
if cidr.Contains(ip) {
if netutils.IsIPv6(ip) && !uniqueAddressList.Has(IPv6ZeroCIDR) {
uniqueAddressList.Insert(ip.String())
}
@ -82,35 +115,13 @@ func GetNodeAddresses(cidrs []string, nw NetworkInterfacer) (sets.String, error)
}
if uniqueAddressList.Len() == 0 {
return nil, fmt.Errorf("no addresses found for cidrs %v", cidrs)
return nil, fmt.Errorf("no addresses found for cidrs %v", npa.cidrStrings)
}
return uniqueAddressList, nil
}
// ContainsIPv4Loopback returns true if the input is empty or one of the CIDR contains an IPv4 loopback address.
func ContainsIPv4Loopback(cidrStrings []string) bool {
if len(cidrStrings) == 0 {
return true
}
// RFC 5735 127.0.0.0/8 - This block is assigned for use as the Internet host loopback address
ipv4LoopbackStart := netutils.ParseIPSloppy("127.0.0.0")
for _, cidr := range cidrStrings {
ip, ipnet, err := netutils.ParseCIDRSloppy(cidr)
if err != nil {
continue
}
if netutils.IsIPv6CIDR(ipnet) {
continue
}
if ip.IsLoopback() {
return true
}
if ipnet.Contains(ipv4LoopbackStart) {
return true
}
}
return false
// ContainsIPv4Loopback returns true if npa's CIDRs contain an IPv4 loopback address.
func (npa *NodePortAddresses) ContainsIPv4Loopback() bool {
return npa.containsIPv4Loopback
}

View File

@ -245,7 +245,8 @@ func TestGetNodeAddresses(t *testing.T) {
nw.AddInterfaceAddr(&pair.itf, pair.addrs)
}
addrList, err := GetNodeAddresses(tc.cidrs, nw)
npa := NewNodePortAddresses(tc.cidrs)
addrList, err := npa.GetNodeAddresses(nw)
// The fake InterfaceAddrs() never returns an error, so the only
// error GetNodeAddresses will return is "no addresses found".
if err != nil && tc.expected != nil {
@ -274,11 +275,6 @@ func TestContainsIPv4Loopback(t *testing.T) {
cidrStrings: []string{"224.0.0.0/24", "192.168.0.0/16", "fd00:1:d::/64", "0.0.0.0/0"},
want: true,
},
{
name: "all zeros ipv4 and invalid cidr",
cidrStrings: []string{"invalid.cidr", "192.168.0.0/16", "fd00:1:d::/64", "0.0.0.0/0"},
want: true,
},
{
name: "all zeros ipv6",
cidrStrings: []string{"224.0.0.0/24", "192.168.0.0/16", "fd00:1:d::/64", "::/0"},
@ -309,15 +305,11 @@ func TestContainsIPv4Loopback(t *testing.T) {
cidrStrings: []string{"128.0.2.0/28", "224.0.0.0/24", "192.168.0.0/16", "fd00:1:d::/64"},
want: false,
},
{
name: "invalid cidr",
cidrStrings: []string{"invalid.ip/invalid.mask"},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ContainsIPv4Loopback(tt.cidrStrings); got != tt.want {
npa := NewNodePortAddresses(tt.cidrStrings)
if got := npa.ContainsIPv4Loopback(); got != tt.want {
t.Errorf("ContainsIPv4Loopback() = %v, want %v", got, tt.want)
}
})

View File

@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async"
netutils "k8s.io/utils/net"
)
@ -698,7 +699,10 @@ func NewProxier(
klog.InfoS("ClusterCIDR not specified, unable to distinguish between internal and external traffic")
}
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, []string{} /* windows listen to all node addresses */)
// windows listens to all node addresses
nodePortAddresses := utilproxy.NewNodePortAddresses(nil)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)
hns, supportedFeatures := newHostNetworkService()
hnsNetworkName, err := getNetworkName(config.NetworkName)
if err != nil {