Merge pull request #128216 from pacoxu/revert-127930-kube-proxy-refactor-healthz-metrics-address

Revert "re: kube-proxy: internal config: refactor HealthzAddress and MetricsAddress "
Kubernetes Prow Robot 2024-10-21 10:15:05 +01:00 committed by GitHub
commit d99ee9293c
25 changed files with 369 additions and 578 deletions

View File

@ -19,9 +19,7 @@ package app
import (
"context"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
@ -35,14 +33,15 @@ import (
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
"k8s.io/kube-proxy/config/v1alpha1"
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/kubelet/qos"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
kubeproxyconfigv1alpha1 "k8s.io/kubernetes/pkg/proxy/apis/config/v1alpha1"
"k8s.io/kubernetes/pkg/proxy/apis/config/validation"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/filesystem"
utilflag "k8s.io/kubernetes/pkg/util/flag"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@ -72,6 +71,10 @@ type Options struct {
// master is used to override the kubeconfig's URL to the apiserver.
master string
// healthzPort is the port to be used by the healthz server.
healthzPort int32
// metricsPort is the port to be used by the metrics server.
metricsPort int32
// hostnameOverride, if set from the command line flag, takes precedence over the `HostnameOverride` value from the config file
hostnameOverride string
@ -85,8 +88,6 @@ type Options struct {
ipvsSyncPeriod time.Duration
ipvsMinSyncPeriod time.Duration
clusterCIDRs string
healthzBindAddress string
metricsBindAddress string
}
// AddFlags adds flags to fs and binds them to options.
@ -110,8 +111,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.hostnameOverride, "hostname-override", o.hostnameOverride, "If non-empty, will be used as the name of the Node that kube-proxy is running on. If unset, the node name is assumed to be the same as the node's hostname.")
fs.Var(&utilflag.IPVar{Val: &o.config.BindAddress}, "bind-address", "Overrides kube-proxy's idea of what its node's primary IP is. Note that the name is a historical artifact, and kube-proxy does not actually bind any sockets to this IP. This parameter is ignored if a config file is specified by --config.")
fs.Var(&utilflag.IPPortVar{Val: &o.healthzBindAddress}, "healthz-bind-address", "The IP address and port for the health check server to serve on, defaulting to \"0.0.0.0:10256\". This parameter is ignored if a config file is specified by --config.")
fs.Var(&utilflag.IPPortVar{Val: &o.metricsBindAddress}, "metrics-bind-address", "The IP address and port for the metrics server to serve on, defaulting to \"127.0.0.1:10249\". (Set to \"0.0.0.0:10249\" / \"[::]:10249\" to bind on all interfaces.) Set empty to disable. This parameter is ignored if a config file is specified by --config.")
fs.Var(&utilflag.IPPortVar{Val: &o.config.HealthzBindAddress}, "healthz-bind-address", "The IP address and port for the health check server to serve on, defaulting to \"0.0.0.0:10256\". This parameter is ignored if a config file is specified by --config.")
fs.Var(&utilflag.IPPortVar{Val: &o.config.MetricsBindAddress}, "metrics-bind-address", "The IP address and port for the metrics server to serve on, defaulting to \"127.0.0.1:10249\". (Set to \"0.0.0.0:10249\" / \"[::]:10249\" to bind on all interfaces.) Set empty to disable. This parameter is ignored if a config file is specified by --config.")
fs.BoolVar(&o.config.BindAddressHardFail, "bind-address-hard-fail", o.config.BindAddressHardFail, "If true kube-proxy will treat failure to bind to a port as fatal and exit")
fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "If true enables profiling via web interface on /debug/pprof handler. This parameter is ignored if a config file is specified by --config.")
fs.StringVar(&o.config.ShowHiddenMetricsForVersion, "show-hidden-metrics-for-version", o.config.ShowHiddenMetricsForVersion,
@ -165,6 +166,11 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.config.Linux.Conntrack.UDPStreamTimeout.Duration, "conntrack-udp-timeout-stream", o.config.Linux.Conntrack.UDPStreamTimeout.Duration, "Idle timeout for ASSURED UDP connections (0 to leave as-is)")
fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
fs.Int32Var(&o.healthzPort, "healthz-port", o.healthzPort, "The port to bind the health check server. Use 0 to disable.")
_ = fs.MarkDeprecated("healthz-port", "This flag is deprecated and will be removed in a future release. Please use --healthz-bind-address instead.")
fs.Int32Var(&o.metricsPort, "metrics-port", o.metricsPort, "The port to bind the metrics server. Use 0 to disable.")
_ = fs.MarkDeprecated("metrics-port", "This flag is deprecated and will be removed in a future release. Please use --metrics-bind-address instead.")
logsapi.AddFlags(&o.config.Logging, fs)
}
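
The two restored flags above write straight into the v1alpha1-backed config, with none of the CIDR/port splitting the reverted refactor had introduced. A minimal sketch of that behaviour (hypothetical test-style code; it would have to sit in a _test.go file of the same app package, since it reads the unexported config field):

// Sketch only: exercises the restored --healthz-bind-address / --metrics-bind-address
// flags and shows that they land on the config struct unchanged.
func ExampleOptions_bindAddressFlags() {
	o := NewOptions()
	fs := pflag.NewFlagSet("kube-proxy", pflag.ContinueOnError)
	o.AddFlags(fs)
	_ = fs.Parse([]string{
		"--healthz-bind-address=0.0.0.0:10256",
		"--metrics-bind-address=127.0.0.1:10249",
	})
	fmt.Println(o.config.HealthzBindAddress, o.config.MetricsBindAddress)
	// Output: 0.0.0.0:10256 127.0.0.1:10249
}
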
@ -183,14 +189,21 @@ func newKubeProxyConfiguration() *kubeproxyconfig.KubeProxyConfiguration {
// NewOptions returns initialized Options
func NewOptions() *Options {
return &Options{
config: newKubeProxyConfiguration(),
errCh: make(chan error),
logger: klog.FromContext(context.Background()),
config: newKubeProxyConfiguration(),
healthzPort: ports.ProxyHealthzPort,
metricsPort: ports.ProxyStatusPort,
errCh: make(chan error),
logger: klog.FromContext(context.Background()),
}
}
// Complete completes all the required options.
func (o *Options) Complete(fs *pflag.FlagSet) error {
if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)
o.config.MetricsBindAddress = addressFromDeprecatedFlags(o.config.MetricsBindAddress, o.metricsPort)
}
// Load the config file here in Complete, so that Validate validates the fully-resolved config.
if len(o.ConfigFile) > 0 {
c, err := o.loadConfigFromFile(o.ConfigFile)
@ -315,32 +328,6 @@ func (o *Options) processV1Alpha1Flags(fs *pflag.FlagSet) {
if fs.Changed("cluster-cidr") {
o.config.DetectLocal.ClusterCIDRs = strings.Split(o.clusterCIDRs, ",")
}
if fs.Changed("healthz-bind-address") {
host, port, _ := net.SplitHostPort(o.healthzBindAddress)
ip := netutils.ParseIPSloppy(host)
if ip.IsUnspecified() {
o.config.HealthzBindAddresses = []string{fmt.Sprintf("%s/0", host)}
} else if netutils.IsIPv4(ip) {
o.config.HealthzBindAddresses = []string{fmt.Sprintf("%s/32", host)}
} else {
o.config.HealthzBindAddresses = []string{fmt.Sprintf("%s/128", host)}
}
intPort, _ := strconv.Atoi(port)
o.config.HealthzBindPort = int32(intPort)
}
if fs.Changed("metrics-bind-address") {
host, port, _ := net.SplitHostPort(o.metricsBindAddress)
ip := netutils.ParseIPSloppy(host)
if ip.IsUnspecified() {
o.config.MetricsBindAddresses = []string{fmt.Sprintf("%s/0", host)}
} else if netutils.IsIPv4(ip) {
o.config.MetricsBindAddresses = []string{fmt.Sprintf("%s/32", host)}
} else {
o.config.MetricsBindAddresses = []string{fmt.Sprintf("%s/128", host)}
}
intPort, _ := strconv.Atoi(port)
o.config.MetricsBindPort = int32(intPort)
}
}
// Validate validates all the required options.
@ -429,6 +416,17 @@ func (o *Options) writeConfigFile() (err error) {
return nil
}
// addressFromDeprecatedFlags returns server address from flags
// passed on the command line based on the following rules:
// 1. If port is 0, disable the server (e.g. set address to empty).
// 2. Otherwise, set the port portion of the config accordingly.
func addressFromDeprecatedFlags(addr string, port int32) string {
if port == 0 {
return ""
}
return proxyutil.AppendPortIfNeeded(addr, port)
}
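
A self-contained sketch of the rule this restored helper encodes; the real code delegates the port handling to proxyutil.AppendPortIfNeeded, so the stand-in below (simplified, name hypothetical) only illustrates the common cases: port 0 disables the server, an address that already carries a port is kept, and otherwise the port is appended.

package main

import (
	"fmt"
	"net"
)

// addressFromDeprecatedFlagsSketch is a simplified stand-in for the restored
// helper: port 0 disables the server; otherwise the port is appended unless
// the address already has one.
func addressFromDeprecatedFlagsSketch(addr string, port int32) string {
	if port == 0 {
		return ""
	}
	if _, _, err := net.SplitHostPort(addr); err == nil {
		return addr // already "IP:port" or "[IPv6]:port"
	}
	return net.JoinHostPort(addr, fmt.Sprint(port))
}

func main() {
	fmt.Println(addressFromDeprecatedFlagsSketch("0.0.0.0", 10256))       // 0.0.0.0:10256
	fmt.Println(addressFromDeprecatedFlagsSketch("1.2.3.4:12345", 10256)) // 1.2.3.4:12345
	fmt.Println(addressFromDeprecatedFlagsSketch("fd00:1::5", 10256))     // [fd00:1::5]:10256
	fmt.Println(addressFromDeprecatedFlagsSketch("0.0.0.0", 0))           // "" (server disabled)
}
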
// newLenientSchemeAndCodecs returns a scheme that has only v1alpha1 registered into
// it and a CodecFactory with strict decoding disabled.
func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {

View File

@ -89,130 +89,94 @@ nodePortAddresses:
`
testCases := []struct {
name string
mode string
bindAddress string
clusterCIDR string
healthzBindAddress string
metricsBindAddress string
extraConfig string
expectedHealthzBindAddresses []string
expectedHealthzBindPort int32
expectedMetricsBindAddresses []string
expectedMetricsBindPort int32
name string
mode string
bindAddress string
clusterCIDR string
healthzBindAddress string
metricsBindAddress string
extraConfig string
}{
{
name: "iptables mode, IPv4 all-zeros bind address",
mode: "iptables",
bindAddress: "0.0.0.0",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
expectedHealthzBindAddresses: []string{"1.2.3.4/32"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"2.3.4.5/32"},
expectedMetricsBindPort: int32(23456),
name: "iptables mode, IPv4 all-zeros bind address",
mode: "iptables",
bindAddress: "0.0.0.0",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
},
{
name: "iptables mode, non-zeros IPv4 config",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
expectedHealthzBindAddresses: []string{"1.2.3.4/32"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"2.3.4.5/32"},
expectedMetricsBindPort: int32(23456),
name: "iptables mode, non-zeros IPv4 config",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
},
{
// Test for 'bindAddress: "::"' (IPv6 all-zeros) in kube-proxy
// config file. The user will need to put quotes around '::' since
// 'bindAddress: ::' is invalid yaml syntax.
name: "iptables mode, IPv6 \"::\" bind address",
mode: "iptables",
bindAddress: "\"::\"",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
expectedHealthzBindAddresses: []string{"fd00:1::5/128"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"fd00:2::5/128"},
expectedMetricsBindPort: int32(23456),
name: "iptables mode, IPv6 \"::\" bind address",
mode: "iptables",
bindAddress: "\"::\"",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
},
{
// Test for 'bindAddress: "[::]"' (IPv6 all-zeros in brackets)
// in kube-proxy config file. The user will need to use
// surrounding quotes here since 'bindAddress: [::]' is invalid
// yaml syntax.
name: "iptables mode, IPv6 \"[::]\" bind address",
mode: "iptables",
bindAddress: "\"[::]\"",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
expectedHealthzBindAddresses: []string{"fd00:1::5/128"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"fd00:2::5/128"},
expectedMetricsBindPort: int32(23456),
name: "iptables mode, IPv6 \"[::]\" bind address",
mode: "iptables",
bindAddress: "\"[::]\"",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
},
{
// Test for 'bindAddress: ::0' (another form of IPv6 all-zeros).
// No surrounding quotes are required around '::0'.
name: "iptables mode, IPv6 ::0 bind address",
mode: "iptables",
bindAddress: "::0",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
expectedHealthzBindAddresses: []string{"fd00:1::5/128"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"fd00:2::5/128"},
expectedMetricsBindPort: int32(23456),
name: "iptables mode, IPv6 ::0 bind address",
mode: "iptables",
bindAddress: "::0",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
},
{
name: "ipvs mode, IPv6 config",
mode: "ipvs",
bindAddress: "2001:db8::1",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
expectedHealthzBindAddresses: []string{"fd00:1::5/128"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"fd00:2::5/128"},
expectedMetricsBindPort: int32(23456),
name: "ipvs mode, IPv6 config",
mode: "ipvs",
bindAddress: "2001:db8::1",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
},
{
// Test for unknown field within config.
// For v1alpha1 a lenient path is implemented and will throw a
// strict decoding warning instead of failing to load
name: "unknown field",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
extraConfig: "foo: bar",
expectedHealthzBindAddresses: []string{"1.2.3.4/32"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"2.3.4.5/32"},
expectedMetricsBindPort: int32(23456),
name: "unknown field",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
extraConfig: "foo: bar",
},
{
// Test for duplicate field within config.
// For v1alpha1 a lenient path is implemented and will throw a
// strict decoding warning instead of failing to load
name: "duplicate field",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
extraConfig: "bindAddress: 9.8.7.6",
expectedHealthzBindAddresses: []string{"1.2.3.4/32"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"2.3.4.5/32"},
expectedMetricsBindPort: int32(23456),
name: "duplicate field",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
extraConfig: "bindAddress: 9.8.7.6",
},
}
@ -245,10 +209,9 @@ nodePortAddresses:
MasqueradeAll: true,
OOMScoreAdj: ptr.To[int32](17),
},
FeatureGates: map[string]bool{},
HealthzBindAddresses: tc.expectedHealthzBindAddresses,
HealthzBindPort: tc.expectedHealthzBindPort,
HostnameOverride: "foo",
FeatureGates: map[string]bool{},
HealthzBindAddress: tc.healthzBindAddress,
HostnameOverride: "foo",
IPTables: kubeproxyconfig.KubeProxyIPTablesConfiguration{
MasqueradeBit: ptr.To[int32](17),
LocalhostNodePorts: ptr.To(true),
@ -259,11 +222,10 @@ nodePortAddresses:
NFTables: kubeproxyconfig.KubeProxyNFTablesConfiguration{
MasqueradeBit: ptr.To[int32](18),
},
MetricsBindAddresses: tc.expectedMetricsBindAddresses,
MetricsBindPort: tc.expectedMetricsBindPort,
Mode: kubeproxyconfig.ProxyMode(tc.mode),
NodePortAddresses: []string{"10.20.30.40/16", "fd00:1::0/64"},
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
MetricsBindAddress: tc.metricsBindAddress,
Mode: kubeproxyconfig.ProxyMode(tc.mode),
NodePortAddresses: []string{"10.20.30.40/16", "fd00:1::0/64"},
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
DetectLocal: kubeproxyconfig.DetectLocalConfiguration{
BridgeInterface: "cbr0",
ClusterCIDRs: strings.Split(tc.clusterCIDR, ","),
@ -492,36 +454,6 @@ func TestProcessV1Alpha1Flags(t *testing.T) {
return reflect.DeepEqual(config.DetectLocal.ClusterCIDRs, []string{"2002:0:0:1234::/64", "10.0.0.0/14"})
},
},
{
name: "metrics and healthz address ipv4",
flags: []string{
"--healthz-bind-address=0.0.0.0:54321",
"--metrics-bind-address=127.0.0.1:3306",
},
validate: func(config *kubeproxyconfig.KubeProxyConfiguration) bool {
if reflect.DeepEqual(config.HealthzBindAddresses, []string{"0.0.0.0/0"}) &&
reflect.DeepEqual(config.MetricsBindAddresses, []string{"127.0.0.1/32"}) &&
config.HealthzBindPort == 54321 && config.MetricsBindPort == 3306 {
return true
}
return false
},
},
{
name: "metrics and healthz address ipv6",
flags: []string{
"--healthz-bind-address=[fd00:4321::2]:9090",
"--metrics-bind-address=[::1]:8080",
},
validate: func(config *kubeproxyconfig.KubeProxyConfiguration) bool {
if reflect.DeepEqual(config.HealthzBindAddresses, []string{"fd00:4321::2/128"}) &&
reflect.DeepEqual(config.MetricsBindAddresses, []string{"::1/128"}) &&
config.HealthzBindPort == 9090 && config.MetricsBindPort == 8080 {
return true
}
return false
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -676,3 +608,71 @@ kind: KubeProxyConfiguration
})
}
}
func TestAddressFromDeprecatedFlags(t *testing.T) {
testCases := []struct {
name string
healthzPort int32
healthzBindAddress string
metricsPort int32
metricsBindAddress string
expHealthz string
expMetrics string
}{
{
name: "IPv4 bind address",
healthzBindAddress: "1.2.3.4",
healthzPort: 12345,
metricsBindAddress: "2.3.4.5",
metricsPort: 23456,
expHealthz: "1.2.3.4:12345",
expMetrics: "2.3.4.5:23456",
},
{
name: "IPv4 bind address has port",
healthzBindAddress: "1.2.3.4:12345",
healthzPort: 23456,
metricsBindAddress: "2.3.4.5:12345",
metricsPort: 23456,
expHealthz: "1.2.3.4:12345",
expMetrics: "2.3.4.5:12345",
},
{
name: "IPv6 bind address",
healthzBindAddress: "fd00:1::5",
healthzPort: 12345,
metricsBindAddress: "fd00:1::6",
metricsPort: 23456,
expHealthz: "[fd00:1::5]:12345",
expMetrics: "[fd00:1::6]:23456",
},
{
name: "IPv6 bind address has port",
healthzBindAddress: "[fd00:1::5]:12345",
healthzPort: 56789,
metricsBindAddress: "[fd00:1::6]:56789",
metricsPort: 12345,
expHealthz: "[fd00:1::5]:12345",
expMetrics: "[fd00:1::6]:56789",
},
{
name: "Invalid IPv6 Config",
healthzBindAddress: "[fd00:1::5]",
healthzPort: 12345,
metricsBindAddress: "[fd00:1::6]",
metricsPort: 56789,
expHealthz: "[fd00:1::5]",
expMetrics: "[fd00:1::6]",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gotHealthz := addressFromDeprecatedFlags(tc.healthzBindAddress, tc.healthzPort)
gotMetrics := addressFromDeprecatedFlags(tc.metricsBindAddress, tc.metricsPort)
require.Equal(t, tc.expHealthz, gotHealthz)
require.Equal(t, tc.expMetrics, gotMetrics)
})
}
}

View File

@ -25,7 +25,6 @@ import (
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/spf13/cobra"
@ -221,8 +220,8 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig
Namespace: "",
}
if len(config.HealthzBindAddresses) > 0 {
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddresses, config.HealthzBindPort, 2*config.SyncPeriod.Duration)
if len(config.HealthzBindAddress) > 0 {
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.SyncPeriod.Duration)
}
err = s.platformSetup(ctx)
@ -278,7 +277,7 @@ func checkBadConfig(s *ProxyServer) error {
}
}
// Warn if NodeAddressHandler does not limit connections on all IP families that
// Warn if NodePortAddresses does not limit connections on all IP families that
// seem to be in use.
cidrsByFamily := proxyutil.MapCIDRsByIPFamily(s.Config.NodePortAddresses)
if len(s.Config.NodePortAddresses) == 0 {
@ -313,7 +312,7 @@ func checkBadIPConfig(s *ProxyServer, dualStackSupported bool) (err error, fatal
clusterType = fmt.Sprintf("%s-only", s.PrimaryIPFamily)
}
if badCIDRs(s.Config.DetectLocal.ClusterCIDRs, badFamily, false) {
if badCIDRs(s.Config.DetectLocal.ClusterCIDRs, badFamily) {
errors = append(errors, fmt.Errorf("cluster is %s but clusterCIDRs contains only IPv%s addresses", clusterType, badFamily))
if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeClusterCIDR && !dualStackSupported {
// This has always been a fatal error
@ -321,7 +320,7 @@ func checkBadIPConfig(s *ProxyServer, dualStackSupported bool) (err error, fatal
}
}
if badCIDRs(s.podCIDRs, badFamily, false) {
if badCIDRs(s.podCIDRs, badFamily) {
errors = append(errors, fmt.Errorf("cluster is %s but node.spec.podCIDRs contains only IPv%s addresses", clusterType, badFamily))
if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
// This has always been a fatal error
@ -336,41 +335,49 @@ func checkBadIPConfig(s *ProxyServer, dualStackSupported bool) (err error, fatal
// In some cases, wrong-IP-family is only a problem when the secondary IP family
// isn't present at all.
if !dualStackSupported {
if badCIDRs(s.Config.IPVS.ExcludeCIDRs, badFamily, false) {
if badCIDRs(s.Config.IPVS.ExcludeCIDRs, badFamily) {
errors = append(errors, fmt.Errorf("cluster is %s but ipvs.excludeCIDRs contains only IPv%s addresses", clusterType, badFamily))
}
if badCIDRs(s.Config.HealthzBindAddresses, badFamily, true) {
errors = append(errors, fmt.Errorf("cluster is %s but healthzBindAddresses doesn't contain IPv%s cidr", clusterType, badFamily))
if badBindAddress(s.Config.HealthzBindAddress, badFamily) {
errors = append(errors, fmt.Errorf("cluster is %s but healthzBindAddress is IPv%s", clusterType, badFamily))
}
if badCIDRs(s.Config.MetricsBindAddresses, badFamily, true) {
errors = append(errors, fmt.Errorf("cluster is %s but metricsBindAddresses doesn't contain IPv%s cidr", clusterType, badFamily))
if badBindAddress(s.Config.MetricsBindAddress, badFamily) {
errors = append(errors, fmt.Errorf("cluster is %s but metricsBindAddress is IPv%s", clusterType, badFamily))
}
}
// Note that s.Config.NodeAddressHandler gets checked as part of checkBadConfig()
// Note that s.Config.NodePortAddresses gets checked as part of checkBadConfig()
// so it doesn't need to be checked here.
return utilerrors.NewAggregate(errors), fatal
}
// badCIDRs returns true if cidrs is a non-empty list of CIDRs, all of wrongFamily.
// If allowUnspecified is false, unspecified addresses '0.0.0.0' and '::' will not be treated
// as part of either family.
func badCIDRs(cidrStrings []string, wrongFamily netutils.IPFamily, allowUnspecified bool) bool {
if len(cidrStrings) == 0 {
func badCIDRs(cidrs []string, wrongFamily netutils.IPFamily) bool {
if len(cidrs) == 0 {
return false
}
for _, cidrString := range cidrStrings {
ip, cidr, _ := netutils.ParseCIDRSloppy(cidrString)
maskSize, _ := cidr.Mask.Size()
if netutils.IPFamilyOf(ip) != wrongFamily || (allowUnspecified && (ip.IsUnspecified() && maskSize == 0)) {
for _, cidr := range cidrs {
if netutils.IPFamilyOfCIDRString(cidr) != wrongFamily {
return false
}
}
return true
}
// badBindAddress returns true if bindAddress is an "IP:port" string where IP is a
// non-zero IP of wrongFamily.
func badBindAddress(bindAddress string, wrongFamily netutils.IPFamily) bool {
if host, _, _ := net.SplitHostPort(bindAddress); host != "" {
ip := netutils.ParseIPSloppy(host)
if ip != nil && netutils.IPFamilyOf(ip) == wrongFamily && !ip.IsUnspecified() {
return true
}
}
return false
}
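
A self-contained sketch showing how the restored check classifies a few bind addresses; the helper body is copied from above, and the k8s.io/utils/net calls (ParseIPSloppy, IPFamilyOf) are the same ones the diff uses. Addresses are illustrative.

package main

import (
	"fmt"
	"net"

	netutils "k8s.io/utils/net"
)

// badBindAddress reports whether bindAddress is "IP:port" with a non-zero IP
// of wrongFamily (same logic as the restored helper above).
func badBindAddress(bindAddress string, wrongFamily netutils.IPFamily) bool {
	if host, _, _ := net.SplitHostPort(bindAddress); host != "" {
		ip := netutils.ParseIPSloppy(host)
		if ip != nil && netutils.IPFamilyOf(ip) == wrongFamily && !ip.IsUnspecified() {
			return true
		}
	}
	return false
}

func main() {
	// On an IPv6-only cluster the "wrong" family is IPv4:
	fmt.Println(badBindAddress("10.0.0.1:9999", netutils.IPv4))  // true  - IPv4-only bind address
	fmt.Println(badBindAddress("0.0.0.0:9999", netutils.IPv4))   // false - unspecified, binds both families
	fmt.Println(badBindAddress("[fd01::1]:9999", netutils.IPv4)) // false - already IPv6
}
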
// createClient creates a kube client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed.
func createClient(ctx context.Context, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
@ -412,7 +419,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
}
fn := func() {
err := hz.Run(ctx)
err := hz.Run()
if err != nil {
logger.Error(err, "Healthz server failed")
if errCh != nil {
@ -428,9 +435,8 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
go wait.Until(fn, 5*time.Second, ctx.Done())
}
func serveMetrics(ctx context.Context, cidrStrings []string, port int32, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
logger := klog.FromContext(ctx)
if len(cidrStrings) == 0 {
func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
if len(bindAddress) == 0 {
return
}
@ -453,62 +459,18 @@ func serveMetrics(ctx context.Context, cidrStrings []string, port int32, proxyMo
configz.InstallHandler(proxyMux)
var nodeIPs []net.IP
for _, ipFamily := range []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol} {
nah := proxyutil.NewNodeAddressHandler(ipFamily, cidrStrings)
if nah.MatchAll() {
// Some cloud-providers may assign IPs to a node after kube-proxy
// startup. The only way to listen on those IPs is to bind the server
// on 0.0.0.0. To handle this case we skip filtering NodeIPs by CIDRs
// and listen on 0.0.0.0 if any of the given CIDRs is a zero-cidr.
// (ref: https://github.com/kubernetes/kubernetes/pull/126889)
nodeIPs = []net.IP{net.IPv4zero}
break
} else {
ips, err := nah.GetNodeIPs(proxyutil.RealNetwork{})
nodeIPs = append(nodeIPs, ips...)
if err != nil {
logger.Error(err, "failed to get node IPs for metrics server", "ipFamily", ipFamily)
}
}
if len(nodeIPs) == 0 {
logger.Info("failed to get any node ip matching metricsBindAddresses", "metricsBindAddresses", cidrStrings)
}
}
var addrs []string
for _, nodeIP := range nodeIPs {
if nodeIP.IsLinkLocalUnicast() || nodeIP.IsLinkLocalMulticast() {
continue
}
addrs = append(addrs, net.JoinHostPort(nodeIP.String(), strconv.Itoa(int(port))))
}
fn := func() {
var err error
defer func() {
if err != nil {
err = fmt.Errorf("starting metrics server failed: %w", err)
utilruntime.HandleError(err)
if errCh != nil {
errCh <- err
// if in hardfail mode, never retry again
blockCh := make(chan error)
<-blockCh
}
err := http.ListenAndServe(bindAddress, proxyMux)
if err != nil {
err = fmt.Errorf("starting metrics server failed: %w", err)
utilruntime.HandleError(err)
if errCh != nil {
errCh <- err
// if in hardfail mode, never retry again
blockCh := make(chan error)
<-blockCh
}
}()
listener, err := netutils.MultiListen(ctx, "tcp", addrs...)
if err != nil {
return
}
server := &http.Server{Handler: proxyMux}
err = server.Serve(listener)
if err != nil {
return
}
}
go wait.Until(fn, 5*time.Second, wait.NeverStop)
}
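
For contrast with the removed MultiListen-based path, the restored flow boils down to a single http.ListenAndServe on MetricsBindAddress, and is skipped entirely when the address is empty. A minimal stand-alone sketch (hypothetical mux and address, not the actual kube-proxy handlers):

package main

import (
	"log"
	"net/http"
)

func main() {
	// An empty MetricsBindAddress means the metrics server is never started.
	addr := "127.0.0.1:10249" // hypothetical value of config.MetricsBindAddress
	if addr == "" {
		return
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write([]byte("# metrics placeholder\n"))
	})
	// One listener, one address - no node-IP discovery involved.
	log.Fatal(http.ListenAndServe(addr, mux))
}
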
@ -550,7 +512,7 @@ func (s *ProxyServer) Run(ctx context.Context) error {
serveHealthz(ctx, s.HealthzServer, healthzErrCh)
// Start up a metrics server if requested
serveMetrics(ctx, s.Config.MetricsBindAddresses, s.Config.MetricsBindPort, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
if err != nil {

View File

@ -551,8 +551,7 @@ func Test_checkBadIPConfig(t *testing.T) {
name: "ok IPv4 metricsBindAddress",
proxy: &ProxyServer{
Config: &kubeproxyconfig.KubeProxyConfiguration{
MetricsBindAddresses: []string{"10.0.0.0/24"},
MetricsBindPort: 9999,
MetricsBindAddress: "10.0.0.1:9999",
},
PrimaryIPFamily: v1.IPv4Protocol,
},
@ -563,8 +562,7 @@ func Test_checkBadIPConfig(t *testing.T) {
name: "ok IPv6 metricsBindAddress",
proxy: &ProxyServer{
Config: &kubeproxyconfig.KubeProxyConfiguration{
MetricsBindAddresses: []string{"fd01:2345::/64"},
MetricsBindPort: 9999,
MetricsBindAddress: "[fd01:2345::1]:9999",
},
PrimaryIPFamily: v1.IPv6Protocol,
},
@ -575,8 +573,7 @@ func Test_checkBadIPConfig(t *testing.T) {
name: "ok unspecified wrong-family metricsBindAddress",
proxy: &ProxyServer{
Config: &kubeproxyconfig.KubeProxyConfiguration{
MetricsBindAddresses: []string{"0.0.0.0/0"},
MetricsBindPort: 9999,
MetricsBindAddress: "0.0.0.0:9999",
},
PrimaryIPFamily: v1.IPv6Protocol,
},
@ -587,8 +584,7 @@ func Test_checkBadIPConfig(t *testing.T) {
name: "wrong family metricsBindAddress",
proxy: &ProxyServer{
Config: &kubeproxyconfig.KubeProxyConfiguration{
MetricsBindAddresses: []string{"10.0.0.0/24"},
MetricsBindPort: 9999,
MetricsBindAddress: "10.0.0.1:9999",
},
PrimaryIPFamily: v1.IPv6Protocol,
},

View File

@ -97,7 +97,7 @@ func (s *ProxyServer) createProxier(ctx context.Context, config *proxyconfigapi.
s.NodeIPs,
s.Recorder,
s.HealthzServer,
int(config.HealthzBindPort),
config.HealthzBindAddress,
config.Winkernel,
)
} else {
@ -109,7 +109,7 @@ func (s *ProxyServer) createProxier(ctx context.Context, config *proxyconfigapi.
s.NodeIPs[s.PrimaryIPFamily],
s.Recorder,
s.HealthzServer,
int(config.HealthzBindPort),
config.HealthzBindAddress,
config.Winkernel,
)
}

View File

@ -91,13 +91,11 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.Linux.Conntrack.TCPCloseWaitTimeout = &metav1.Duration{Duration: time.Duration(c.Int63()) * time.Hour}
obj.Linux.Conntrack.TCPEstablishedTimeout = &metav1.Duration{Duration: time.Duration(c.Int63()) * time.Hour}
obj.FeatureGates = map[string]bool{c.RandString(): true}
obj.HealthzBindAddresses = []string{fmt.Sprintf("%d.%d.%d.%d/32", c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(256))}
obj.HealthzBindPort = c.Int31() % 65536
obj.HealthzBindAddress = fmt.Sprintf("%d.%d.%d.%d:%d", c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(65536))
obj.IPTables.MasqueradeBit = ptr.To(c.Int31())
obj.IPTables.LocalhostNodePorts = ptr.To(c.RandBool())
obj.NFTables.MasqueradeBit = ptr.To(c.Int31())
obj.MetricsBindAddresses = []string{fmt.Sprintf("%d.%d.%d.%d/32", c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(256))}
obj.MetricsBindPort = c.Int31() % 65536
obj.MetricsBindAddress = fmt.Sprintf("%d.%d.%d.%d:%d", c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(65536))
obj.Linux.OOMScoreAdj = ptr.To(c.Int31())
obj.ClientConnection.ContentType = "bar"
obj.NodePortAddresses = []string{"1.2.3.0/24"}

View File

@ -183,16 +183,15 @@ type KubeProxyConfiguration struct {
// primary IP is. Note that the name is a historical artifact, and kube-proxy does
// not actually bind any sockets to this IP.
BindAddress string
// healthzBindAddresses is a list of CIDR ranges that contains a valid node IP on which
// the healthz server will be served on, defaulting to [ "0.0.0.0/0", "::/0" ].
HealthzBindAddresses []string
// healthzBindPort is the port on which healthz server will be exposed, defaulting to 10256.
HealthzBindPort int32
// metricsBindAddresses is a list of CIDR ranges that contains a valid node IP on which
// the metrics server will be served on, defaulting to [ "127.0.0.0/8", "::1/128" ].
MetricsBindAddresses []string
// metricsBindPort is the port on which metrics server will be exposed, defaulting to 10249.
MetricsBindPort int32
// healthzBindAddress is the IP address and port for the health check server to
// serve on, defaulting to "0.0.0.0:10256" (if bindAddress is unset or IPv4), or
// "[::]:10256" (if bindAddress is IPv6).
HealthzBindAddress string
// metricsBindAddress is the IP address and port for the metrics server to serve
// on, defaulting to "127.0.0.1:10249" (if bindAddress is unset or IPv4), or
// "[::1]:10249" (if bindAddress is IPv6). (Set to "0.0.0.0:10249" / "[::]:10249"
// to bind on all interfaces.)
MetricsBindAddress string
// bindAddressHardFail, if true, tells kube-proxy to treat failure to bind to a
// port as fatal and exit
BindAddressHardFail bool

View File

@ -17,15 +17,11 @@ limitations under the License.
package v1alpha1
import (
"fmt"
"net"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/kube-proxy/config/v1alpha1"
"k8s.io/kubernetes/pkg/proxy/apis/config"
netutils "k8s.io/utils/net"
)
// Convert_config_KubeProxyConfiguration_To_v1alpha1_KubeProxyConfiguration is defined here, because public conversion is not auto-generated due to existing warnings.
@ -58,15 +54,6 @@ func Convert_config_KubeProxyConfiguration_To_v1alpha1_KubeProxyConfiguration(in
if len(in.DetectLocal.ClusterCIDRs) > 0 {
out.ClusterCIDR = strings.Join(in.DetectLocal.ClusterCIDRs, ",")
}
if len(in.HealthzBindAddresses) > 0 && in.HealthzBindPort > 0 {
host, _, _ := netutils.ParseCIDRSloppy(in.HealthzBindAddresses[0])
out.HealthzBindAddress = net.JoinHostPort(host.String(), strconv.Itoa(int(in.HealthzBindPort)))
}
if len(in.MetricsBindAddresses) > 0 && in.MetricsBindPort > 0 {
host, _, _ := netutils.ParseCIDRSloppy(in.MetricsBindAddresses[0])
out.MetricsBindAddress = net.JoinHostPort(host.String(), strconv.Itoa(int(in.MetricsBindPort)))
}
return nil
}
@ -100,33 +87,6 @@ func Convert_v1alpha1_KubeProxyConfiguration_To_config_KubeProxyConfiguration(in
if len(in.ClusterCIDR) > 0 {
out.DetectLocal.ClusterCIDRs = strings.Split(in.ClusterCIDR, ",")
}
var prefix int
host, portStr, _ := net.SplitHostPort(in.HealthzBindAddress)
port, _ := strconv.Atoi(portStr)
hostIP := netutils.ParseIPSloppy(host)
if hostIP.IsUnspecified() {
prefix = 0
} else if netutils.IsIPv4(hostIP) {
prefix = 32
} else {
prefix = 128
}
out.HealthzBindAddresses = []string{fmt.Sprintf("%s/%d", hostIP.String(), prefix)}
out.HealthzBindPort = int32(port)
host, portStr, _ = net.SplitHostPort(in.MetricsBindAddress)
port, _ = strconv.Atoi(portStr)
hostIP = netutils.ParseIPSloppy(host)
if hostIP.IsUnspecified() {
prefix = 0
} else if netutils.IsIPv4(hostIP) {
prefix = 32
} else {
prefix = 128
}
out.MetricsBindAddresses = []string{fmt.Sprintf("%s/%d", hostIP.String(), prefix)}
out.MetricsBindPort = int32(port)
return nil
}

View File

@ -138,8 +138,8 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_config_KubeProxyConfiguratio
out.Logging = in.Logging
out.HostnameOverride = in.HostnameOverride
out.BindAddress = in.BindAddress
// WARNING: in.HealthzBindAddress requires manual conversion: does not exist in peer-type
// WARNING: in.MetricsBindAddress requires manual conversion: does not exist in peer-type
out.HealthzBindAddress = in.HealthzBindAddress
out.MetricsBindAddress = in.MetricsBindAddress
out.BindAddressHardFail = in.BindAddressHardFail
out.EnableProfiling = in.EnableProfiling
out.ShowHiddenMetricsForVersion = in.ShowHiddenMetricsForVersion
@ -180,10 +180,8 @@ func autoConvert_config_KubeProxyConfiguration_To_v1alpha1_KubeProxyConfiguratio
out.Logging = in.Logging
out.HostnameOverride = in.HostnameOverride
out.BindAddress = in.BindAddress
// WARNING: in.HealthzBindAddresses requires manual conversion: does not exist in peer-type
// WARNING: in.HealthzBindPort requires manual conversion: does not exist in peer-type
// WARNING: in.MetricsBindAddresses requires manual conversion: does not exist in peer-type
// WARNING: in.MetricsBindPort requires manual conversion: does not exist in peer-type
out.HealthzBindAddress = in.HealthzBindAddress
out.MetricsBindAddress = in.MetricsBindAddress
out.BindAddressHardFail = in.BindAddressHardFail
out.EnableProfiling = in.EnableProfiling
out.ShowHiddenMetricsForVersion = in.ShowHiddenMetricsForVersion

View File

@ -74,14 +74,10 @@ func Validate(config *kubeproxyconfig.KubeProxyConfiguration) field.ErrorList {
allErrs = append(allErrs, field.Invalid(newPath.Child("BindAddress"), config.BindAddress, "not a valid textual representation of an IP address"))
}
if len(config.HealthzBindAddresses) > 0 {
allErrs = append(allErrs, validateDualStackCIDRStrings(config.HealthzBindAddresses, newPath.Child("HealthzBindAddresses"))...)
if config.HealthzBindAddress != "" {
allErrs = append(allErrs, validateHostPort(config.HealthzBindAddress, newPath.Child("HealthzBindAddress"))...)
}
if config.HealthzBindPort > 0 {
allErrs = append(allErrs, validatePort(config.HealthzBindPort, newPath.Child("HealthzBindPort"))...)
}
allErrs = append(allErrs, validateDualStackCIDRStrings(config.MetricsBindAddresses, newPath.Child("MetricsBindAddresses"))...)
allErrs = append(allErrs, validatePort(config.MetricsBindPort, newPath.Child("MetricsBindPort"))...)
allErrs = append(allErrs, validateHostPort(config.MetricsBindAddress, newPath.Child("MetricsBindAddress"))...)
allErrs = append(allErrs, validateKubeProxyNodePortAddress(config.NodePortAddresses, newPath.Child("NodePortAddresses"))...)
allErrs = append(allErrs, validateShowHiddenMetricsVersion(config.ShowHiddenMetricsForVersion, newPath.Child("ShowHiddenMetricsForVersion"))...)
@ -351,11 +347,3 @@ func validateDetectLocalConfiguration(mode kubeproxyconfig.LocalMode, config kub
}
return allErrs
}
func validatePort(port int32, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if port < 1 || port > 65535 {
allErrs = append(allErrs, field.Invalid(fldPath, port, "must be a valid port"))
}
return allErrs
}
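
The restored validateHostPort (called above) expects a literal "IP:port" value, which is also why the removed validatePort helper is no longer needed: the port is checked as part of the address. A rough, hypothetical approximation of those checks, inferred from the error strings in the validation tests later in this diff (the real function returns a field.ErrorList rather than an error):

package main

import (
	"fmt"
	"net"
	"strconv"

	netutils "k8s.io/utils/net"
)

// validateHostPortSketch approximates the restored validateHostPort: the value
// must split as "IP:port", the host must be a literal IP, and the port must be
// in 1-65535. Hypothetical stand-in, not the actual kube-proxy validation code.
func validateHostPortSketch(input string) error {
	host, port, err := net.SplitHostPort(input)
	if err != nil {
		return fmt.Errorf("%q: must be IP:port", input)
	}
	if netutils.ParseIPSloppy(host) == nil {
		return fmt.Errorf("%q: must be a valid IP", host)
	}
	if p, err := strconv.Atoi(port); err != nil || p < 1 || p > 65535 {
		return fmt.Errorf("%q: must be a valid port", port)
	}
	return nil
}

func main() {
	fmt.Println(validateHostPortSketch("0.0.0.0:10256")) // <nil>
	fmt.Println(validateHostPortSketch("127.0.0.1"))     // "127.0.0.1": must be IP:port
	fmt.Println(validateHostPortSketch("[::1]:10249"))   // <nil>
}
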

View File

@ -33,12 +33,10 @@ import (
func TestValidateKubeProxyConfiguration(t *testing.T) {
baseConfig := &kubeproxyconfig.KubeProxyConfiguration{
BindAddress: "192.168.59.103",
HealthzBindAddresses: []string{"0.0.0.0/0"},
HealthzBindPort: 10256,
MetricsBindAddresses: []string{"127.0.0.0/8"},
MetricsBindPort: 10249,
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
BindAddress: "192.168.59.103",
HealthzBindAddress: "0.0.0.0:10256",
MetricsBindAddress: "127.0.0.1:10249",
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
DetectLocal: kubeproxyconfig.DetectLocalConfiguration{
ClusterCIDRs: []string{"192.168.59.0/24"},
},
@ -79,20 +77,20 @@ func TestValidateKubeProxyConfiguration(t *testing.T) {
},
"empty HealthzBindAddress": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.HealthzBindAddresses = []string{}
config.HealthzBindAddress = ""
},
},
"IPv6": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.BindAddress = "fd00:192:168:59::103"
config.HealthzBindAddresses = []string{}
config.MetricsBindAddresses = []string{"::1/128"}
config.HealthzBindAddress = ""
config.MetricsBindAddress = "[::1]:10249"
config.DetectLocal.ClusterCIDRs = []string{"fd00:192:168:59::/64"}
},
},
"alternate healthz port": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.HealthzBindPort = 12345
config.HealthzBindAddress = "0.0.0.0:12345"
},
},
"ClusterCIDR is wrong IP family": {
@ -127,29 +125,17 @@ func TestValidateKubeProxyConfiguration(t *testing.T) {
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("BindAddress"), "10.10.12.11:2000", "not a valid textual representation of an IP address")},
},
"invalid HealthzBindAddresses": {
"invalid HealthzBindAddress": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.HealthzBindAddresses = []string{"0.0.0.0"}
config.HealthzBindAddress = "0.0.0.0"
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("HealthzBindAddresses").Index(0), "0.0.0.0", "must be a valid CIDR block (e.g. 10.100.0.0/16 or fde4:8dba:82e1::/48)")},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("HealthzBindAddress"), "0.0.0.0", "must be IP:port")},
},
"invalid HealthzBindPort": {
"invalid MetricsBindAddress": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.HealthzBindPort = 1234567
config.MetricsBindAddress = "127.0.0.1"
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("HealthzBindPort"), int32(1234567), "must be a valid port")},
},
"invalid MetricsBindAddresses": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.MetricsBindAddresses = []string{"127.0.0.1"}
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("MetricsBindAddresses").Index(0), "127.0.0.1", "must be a valid CIDR block (e.g. 10.100.0.0/16 or fde4:8dba:82e1::/48)")},
},
"invalid MetricsBindPort": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.MetricsBindPort = 5432100
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("MetricsBindPort"), int32(5432100), "must be a valid port")},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("MetricsBindAddress"), "127.0.0.1", "must be IP:port")},
},
"ConfigSyncPeriod must be > 0": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {

View File

@ -62,16 +62,6 @@ func (in *KubeProxyConfiguration) DeepCopyInto(out *KubeProxyConfiguration) {
}
out.ClientConnection = in.ClientConnection
in.Logging.DeepCopyInto(&out.Logging)
if in.HealthzBindAddresses != nil {
in, out := &in.HealthzBindAddresses, &out.HealthzBindAddresses
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.MetricsBindAddresses != nil {
in, out := &in.MetricsBindAddresses, &out.MetricsBindAddresses
*out = make([]string, len(*in))
copy(*out, *in)
}
in.IPTables.DeepCopyInto(&out.IPTables)
in.IPVS.DeepCopyInto(&out.IPVS)
out.Winkernel = in.Winkernel

View File

@ -17,25 +17,22 @@ limitations under the License.
package healthcheck
import (
"context"
"net"
"net/http"
netutils "k8s.io/utils/net"
)
// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
type listener interface {
// Listen is very much like netutils.MultiListen, except the second arg (network) is
// Listen is very much like net.Listen, except the first arg (network) is
// fixed to be "tcp".
Listen(ctx context.Context, addrs ...string) (net.Listener, error)
Listen(addr string) (net.Listener, error)
}
// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
type httpServerFactory interface {
// New creates an instance of a type satisfying HTTPServer. This is
// designed to include http.Server.
New(handler http.Handler) httpServer
New(addr string, handler http.Handler) httpServer
}
// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
@ -48,8 +45,8 @@ type httpServer interface {
// Implement listener in terms of net.Listen.
type stdNetListener struct{}
func (stdNetListener) Listen(ctx context.Context, addrs ...string) (net.Listener, error) {
return netutils.MultiListen(ctx, "tcp", addrs...)
func (stdNetListener) Listen(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
var _ listener = stdNetListener{}
@ -57,8 +54,9 @@ var _ listener = stdNetListener{}
// Implement httpServerFactory in terms of http.Server.
type stdHTTPServerFactory struct{}
func (stdHTTPServerFactory) New(handler http.Handler) httpServer {
func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
return &http.Server{
Addr: addr,
Handler: handler,
}
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package healthcheck
import (
"context"
"encoding/json"
"net"
"net/http"
@ -27,20 +26,18 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/metrics/testutil"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/apimachinery/pkg/util/sets"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
testingclock "k8s.io/utils/clock/testing"
netutils "k8s.io/utils/net"
)
type fakeListener struct {
@ -57,17 +54,17 @@ func (fake *fakeListener) hasPort(addr string) bool {
return fake.openPorts.Has(addr)
}
func (fake *fakeListener) Listen(_ context.Context, addrs ...string) (net.Listener, error) {
fake.openPorts.Insert(addrs...)
func (fake *fakeListener) Listen(addr string) (net.Listener, error) {
fake.openPorts.Insert(addr)
return &fakeNetListener{
parent: fake,
addrs: addrs,
addr: addr,
}, nil
}
type fakeNetListener struct {
parent *fakeListener
addrs []string
addr string
}
type fakeAddr struct {
@ -85,7 +82,7 @@ func (fake *fakeNetListener) Accept() (net.Conn, error) {
}
func (fake *fakeNetListener) Close() error {
fake.parent.openPorts.Delete(fake.addrs...)
fake.parent.openPorts.Delete(fake.addr)
return nil
}
@ -100,13 +97,15 @@ func newFakeHTTPServerFactory() *fakeHTTPServerFactory {
return &fakeHTTPServerFactory{}
}
func (fake *fakeHTTPServerFactory) New(handler http.Handler) httpServer {
func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
return &fakeHTTPServer{
addr: addr,
handler: handler,
}
}
type fakeHTTPServer struct {
addr string
handler http.Handler
}
@ -151,10 +150,10 @@ func (fake fakeProxierHealthChecker) IsHealthy() bool {
func TestServer(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
nodeAddressHandler := proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{})
nodePortAddresses := proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{})
proxyChecker := &fakeProxierHealthChecker{true}
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodeAddressHandler, proxyChecker)
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker)
hcs := hcsi.(*server)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))
@ -465,61 +464,14 @@ type serverTest struct {
tracking503 int
}
func TestProxierHealthServer_NodeAddresses(t *testing.T) {
fakeInterfacer := proxyutiltest.NewFakeNetwork()
itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
addrs := []net.Addr{
&net.IPNet{IP: netutils.ParseIPSloppy("172.18.0.2"), Mask: net.CIDRMask(24, 32)},
&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::1"), Mask: net.CIDRMask(64, 128)},
}
fakeInterfacer.AddInterfaceAddr(&itf, addrs)
testCases := []struct {
name string
cidrStrings []string
expectedAddrs []string
}{
{
name: "ipv4 zero cidr",
cidrStrings: []string{"0.0.0.0/0", "2001:db8::/64"},
expectedAddrs: []string{"0.0.0.0:10256"},
},
{
name: "ipv6 zero cidr",
cidrStrings: []string{"172.18.0.0/24", "::/0"},
expectedAddrs: []string{"0.0.0.0:10256"},
},
{
name: "non zero cidrs",
cidrStrings: []string{"172.18.0.0/16", "2001:db8::/64"},
expectedAddrs: []string{"172.18.0.2:10256", "[2001:db8::1]:10256"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
hs := newProxierHealthServer(listener, httpFactory, fakeClock, fakeInterfacer, tc.cidrStrings, 10256, 10*time.Second)
require.Equal(t, tc.expectedAddrs, hs.addrs)
})
}
}
func TestHealthzServer(t *testing.T) {
metrics.RegisterMetrics("")
listener := newFakeListener()
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
fakeInterfacer := proxyutiltest.NewFakeNetwork()
itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("127.0.0.1"), Mask: net.CIDRMask(24, 32)}}
fakeInterfacer.AddInterfaceAddr(&itf, addrs)
hs := newProxierHealthServer(listener, httpFactory, fakeClock, fakeInterfacer, []string{"127.0.0.0/8"}, 10256, 10*time.Second)
server := hs.httpFactory.New(healthzHandler{hs: hs})
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
hsTest := &serverTest{
server: server,
@ -553,12 +505,8 @@ func TestLivezServer(t *testing.T) {
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
fakeInterfacer := proxyutiltest.NewFakeNetwork()
itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("127.0.0.1"), Mask: net.CIDRMask(24, 32)}}
fakeInterfacer.AddInterfaceAddr(&itf, addrs)
hs := newProxierHealthServer(listener, httpFactory, fakeClock, fakeInterfacer, []string{"127.0.0.0/8"}, 10256, 10*time.Second)
server := hs.httpFactory.New(livezHandler{hs: hs})
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
hsTest := &serverTest{
server: server,
@ -716,9 +664,9 @@ func TestServerWithSelectiveListeningAddress(t *testing.T) {
// limiting addresses to loop back. We don't want any cleverness here around getting IP for
// machine nor testing ipv6 || ipv4. using loop back guarantees the test will work on any machine
nodeAddressHandler := proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{"127.0.0.0/8"})
nodePortAddresses := proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"127.0.0.0/8"})
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodeAddressHandler, proxyChecker)
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory, nodePortAddresses, proxyChecker)
hcs := hcsi.(*server)
if len(hcs.services) != 0 {
t.Errorf("expected 0 services, got %d", len(hcs.services))

View File

@ -17,18 +17,14 @@ limitations under the License.
package healthcheck
import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/utils/clock"
)
@ -50,7 +46,7 @@ type ProxierHealthServer struct {
httpFactory httpServerFactory
clock clock.Clock
addrs []string
addr string
healthTimeout time.Duration
lock sync.RWMutex
@ -60,45 +56,16 @@ type ProxierHealthServer struct {
}
// NewProxierHealthServer returns a proxier health http server.
func NewProxierHealthServer(cidrStrings []string, port int32, healthTimeout time.Duration) *ProxierHealthServer {
return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, proxyutil.RealNetwork{}, cidrStrings, port, healthTimeout)
func NewProxierHealthServer(addr string, healthTimeout time.Duration) *ProxierHealthServer {
return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
}
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, nw proxyutil.NetworkInterfacer, cidrStrings []string, port int32, healthTimeout time.Duration) *ProxierHealthServer {
var nodeIPs []net.IP
for _, ipFamily := range []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol} {
nah := proxyutil.NewNodeAddressHandler(ipFamily, cidrStrings)
if nah.MatchAll() {
// Some cloud-providers may assign IPs to a node after kube-proxy
// startup. The only way to listen on those IPs is to bind the server
// on 0.0.0.0. To handle this case we skip filtering NodeIPs by CIDRs
// and listen on 0.0.0.0 if any of the given CIDRs is a zero-cidr.
// (ref: https://github.com/kubernetes/kubernetes/pull/126889)
nodeIPs = []net.IP{net.IPv4zero}
break
} else {
ips, err := nah.GetNodeIPs(nw)
nodeIPs = append(nodeIPs, ips...)
if err != nil {
klog.V(3).ErrorS(err, "Failed to get node IPs for healthz server", "ipFamily", ipFamily)
return nil
}
}
}
var addrs []string
for _, nodeIP := range nodeIPs {
if nodeIP.IsLinkLocalUnicast() || nodeIP.IsLinkLocalMulticast() {
continue
}
addrs = append(addrs, net.JoinHostPort(nodeIP.String(), strconv.Itoa(int(port))))
}
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *ProxierHealthServer {
return &ProxierHealthServer{
listener: listener,
httpFactory: httpServerFactory,
clock: c,
addrs: addrs,
addr: addr,
healthTimeout: healthTimeout,
lastUpdatedMap: make(map[v1.IPFamily]time.Time),
@ -195,18 +162,18 @@ func (hs *ProxierHealthServer) NodeEligible() bool {
}
// Run starts the healthz HTTP server and blocks until it exits.
func (hs *ProxierHealthServer) Run(ctx context.Context) error {
func (hs *ProxierHealthServer) Run() error {
serveMux := http.NewServeMux()
serveMux.Handle("/healthz", healthzHandler{hs: hs})
serveMux.Handle("/livez", livezHandler{hs: hs})
server := hs.httpFactory.New(serveMux)
server := hs.httpFactory.New(hs.addr, serveMux)
listener, err := hs.listener.Listen(ctx, hs.addrs...)
listener, err := hs.listener.Listen(hs.addr)
if err != nil {
return fmt.Errorf("failed to start proxier healthz on %s: %w", hs.addrs, err)
return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
}
klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addrs)
klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addr)
if err := server.Serve(listener); err != nil {
return fmt.Errorf("proxier healthz closed with error: %v", err)

View File

@ -17,7 +17,6 @@ limitations under the License.
package healthcheck
import (
"context"
"fmt"
"net"
"net/http"
@ -58,17 +57,17 @@ type proxierHealthChecker interface {
IsHealthy() bool
}
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodeAddressHandler *proxyutil.NodeAddressHandler, healthzServer proxierHealthChecker) ServiceHealthServer {
func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
// It doesn't matter whether we listen on "0.0.0.0", "::", or ""; go
// treats them all the same.
nodeIPs := []net.IP{net.IPv4zero}
if !nodeAddressHandler.MatchAll() {
ips, err := nodeAddressHandler.GetNodeIPs(proxyutil.RealNetwork{})
if !nodePortAddresses.MatchAll() {
ips, err := nodePortAddresses.GetNodeIPs(proxyutil.RealNetwork{})
if err == nil {
nodeIPs = ips
} else {
klog.ErrorS(err, "Failed to get node ip address matching node port addresses, health check port will listen to all node addresses", "nodeAddresses", nodeAddressHandler)
klog.ErrorS(err, "Failed to get node ip address matching node port addresses, health check port will listen to all node addresses", "nodePortAddresses", nodePortAddresses)
}
}
@ -84,7 +83,7 @@ func newServiceHealthServer(hostname string, recorder events.EventRecorder, list
}
// NewServiceHealthServer allocates a new service healthcheck server manager
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *proxyutil.NodeAddressHandler, healthzServer proxierHealthChecker) ServiceHealthServer {
func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses *proxyutil.NodePortAddresses, healthzServer proxierHealthChecker) ServiceHealthServer {
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses, healthzServer)
}
@ -171,9 +170,9 @@ func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
for _, ip := range hcs.nodeIPs {
addr := net.JoinHostPort(ip.String(), fmt.Sprint(hcI.port))
// create http server
httpSrv := hcs.httpFactory.New(hcHandler{name: hcI.nsn, hcs: hcs})
httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
// start listener
listener, err = hcs.listener.Listen(context.TODO(), addr)
listener, err = hcs.listener.Listen(addr)
if err != nil {
// must close whatever have been previously opened
// to allow a retry/or port ownership change as needed

View File

@ -205,8 +205,8 @@ type Proxier struct {
// conntrackTCPLiberal indicates whether the system sets the kernel nf_conntrack_tcp_be_liberal
conntrackTCPLiberal bool
// nodeAddressHandler selects the interfaces where nodePort works.
nodeAddressHandler *proxyutil.NodeAddressHandler
// nodePortAddresses selects the interfaces where nodePort works.
nodePortAddresses *proxyutil.NodePortAddresses
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer proxyutil.NetworkInterfacer
@ -244,9 +244,9 @@ func NewProxier(ctx context.Context,
initOnly bool,
) (*Proxier, error) {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "ipFamily", ipFamily)
nodeAddressHandler := proxyutil.NewNodeAddressHandler(ipFamily, nodePortAddressStrings)
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
if !nodeAddressHandler.ContainsIPv4Loopback() {
if !nodePortAddresses.ContainsIPv4Loopback() {
localhostNodePorts = false
}
if localhostNodePorts {
@ -277,7 +277,7 @@ func NewProxier(ctx context.Context,
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
logger.V(2).Info("Using iptables mark for masquerade", "mark", masqueradeMark)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodeAddressHandler, healthzServer)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
nfacctRunner, err := nfacct.New()
if err != nil {
logger.Error(err, "Failed to create nfacct runner, nfacct based metrics won't be available")
@ -310,7 +310,7 @@ func NewProxier(ctx context.Context,
natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.NewLineBuffer(),
localhostNodePorts: localhostNodePorts,
nodeAddressHandler: nodeAddressHandler,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
conntrackTCPLiberal: conntrackTCPLiberal,
logger: logger,
@ -1447,7 +1447,7 @@ func (proxier *Proxier) syncProxyRules() {
// Finally, tail-call to the nodePorts chain. This needs to be after all
// other service portal rules.
if proxier.nodeAddressHandler.MatchAll() {
if proxier.nodePortAddresses.MatchAll() {
destinations := []string{"-m", "addrtype", "--dst-type", "LOCAL"}
// Block localhost nodePorts if they are not supported. (For IPv6 they never
// work, and for IPv4 they only work if we previously set `route_localnet`.)
@ -1463,9 +1463,9 @@ func (proxier *Proxier) syncProxyRules() {
destinations,
"-j", string(kubeNodePortsChain))
} else {
nodeIPs, err := proxier.nodeAddressHandler.GetNodeIPs(proxier.networkInterfacer)
nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
if err != nil {
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodeAddressHandler)
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
}
for _, ip := range nodeIPs {
if ip.IsLoopback() {
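This MatchAll/GetNodeIPs split is the pattern the revert restores at every place NodePort rules are generated: a match-all configuration needs only a single address-type match, otherwise the concrete node IPs are resolved and loopback addresses are skipped. A hedged sketch of that decision follows; nodePortDestinations is an invented helper name for illustration, not kube-proxy code.

package example

import (
	"net"

	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
)

// nodePortDestinations is a hypothetical helper illustrating the restored
// MatchAll/GetNodeIPs behavior. When matchAll is true the caller emits one
// "--dst-type LOCAL" style rule; otherwise it emits one rule per returned IP.
func nodePortDestinations(npa *proxyutil.NodePortAddresses, nw proxyutil.NetworkInterfacer) (ips []net.IP, matchAll bool, err error) {
	if npa.MatchAll() {
		return nil, true, nil
	}
	all, err := npa.GetNodeIPs(nw)
	if err != nil {
		return nil, false, err
	}
	for _, ip := range all {
		if ip.IsLoopback() {
			// Loopback node IPs never get NodePort rules, mirroring the
			// callers shown in this diff.
			continue
		}
		ips = append(ips, ip)
	}
	return ips, false, nil
}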

View File

@ -135,7 +135,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
natRules: proxyutil.NewLineBuffer(),
nodeIP: netutils.ParseIPSloppy(testNodeIP),
localhostNodePorts: true,
nodeAddressHandler: proxyutil.NewNodeAddressHandler(ipfamily, nil),
nodePortAddresses: proxyutil.NewNodePortAddresses(ipfamily, nil),
networkInterfacer: networkInterfacer,
nfAcctCounters: map[string]bool{
metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: true,
@ -2352,7 +2352,7 @@ func TestNodePorts(t *testing.T) {
fp := NewFakeProxier(ipt)
fp.localhostNodePorts = tc.localhostNodePorts
if tc.nodePortAddresses != nil {
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(tc.family, tc.nodePortAddresses)
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(tc.family, tc.nodePortAddresses)
}
makeServiceMap(fp,
@ -2500,7 +2500,7 @@ func TestNodePorts(t *testing.T) {
func TestHealthCheckNodePort(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{"127.0.0.0/8"})
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"127.0.0.0/8"})
svcIP := "172.30.0.42"
svcPort := 80

View File

@ -227,8 +227,8 @@ type Proxier struct {
netlinkHandle NetLinkHandle
// ipsetList is the list of ipsets that ipvs proxier used.
ipsetList map[string]*IPSet
// nodeAddressHandler selects the interfaces where nodePort works.
nodeAddressHandler *proxyutil.NodeAddressHandler
// nodePortAddresses selects the interfaces where nodePort works.
nodePortAddresses *proxyutil.NodePortAddresses
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer proxyutil.NetworkInterfacer
@ -365,9 +365,9 @@ func NewProxier(
scheduler = defaultScheduler
}
nodeAddressHandler := proxyutil.NewNodeAddressHandler(ipFamily, nodePortAddressStrings)
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodeAddressHandler, healthzServer)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
// excludeCIDRs has been validated before, here we just parse it to IPNet list
parsedExcludeCIDRs, _ := netutils.ParseCIDRs(excludeCIDRs)
@ -402,7 +402,7 @@ func NewProxier(
filterRules: proxyutil.NewLineBuffer(),
netlinkHandle: NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
ipset: ipset,
nodeAddressHandler: nodeAddressHandler,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
logger: logger,
@ -1000,12 +1000,12 @@ func (proxier *Proxier) syncProxyRules() {
// can be reused for all nodePort services.
var nodeIPs []net.IP
if hasNodePort {
if proxier.nodeAddressHandler.MatchAll() {
if proxier.nodePortAddresses.MatchAll() {
for _, ipStr := range nodeAddressSet.UnsortedList() {
nodeIPs = append(nodeIPs, netutils.ParseIPSloppy(ipStr))
}
} else {
allNodeIPs, err := proxier.nodeAddressHandler.GetNodeIPs(proxier.networkInterfacer)
allNodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
if err != nil {
proxier.logger.Error(err, "Failed to get node IP address matching nodeport cidr")
} else {

View File

@ -161,7 +161,7 @@ func NewFakeProxier(ctx context.Context, ipt utiliptables.Interface, ipvs utilip
filterRules: proxyutil.NewLineBuffer(),
netlinkHandle: netlinkHandle,
ipsetList: ipsetList,
nodeAddressHandler: proxyutil.NewNodeAddressHandler(ipFamily, nil),
nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil),
networkInterfacer: proxyutiltest.NewFakeNetwork(),
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
ipFamily: ipFamily,
@ -951,7 +951,7 @@ func TestNodePortIPv4(t *testing.T) {
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv4Protocol)
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, test.nodePortAddresses)
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, test.nodePortAddresses)
makeServiceMap(fp, test.services...)
populateEndpointSlices(fp, test.endpoints...)
@ -1294,7 +1294,7 @@ func TestNodePortIPv6(t *testing.T) {
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv6Protocol)
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv6Protocol, test.nodePortAddresses)
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv6Protocol, test.nodePortAddresses)
makeServiceMap(fp, test.services...)
populateEndpointSlices(fp, test.endpoints...)
@ -2054,7 +2054,7 @@ func TestOnlyLocalNodePorts(t *testing.T) {
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{"100.101.102.0/24"})
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"100.101.102.0/24"})
fp.syncProxyRules()
@ -2142,7 +2142,7 @@ func TestHealthCheckNodePort(t *testing.T) {
addrs1 := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::"), Mask: net.CIDRMask(64, 128)}}
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(v1.IPv4Protocol, []string{"100.101.102.0/24"})
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"100.101.102.0/24"})
fp.syncProxyRules()

View File

@ -180,8 +180,8 @@ type Proxier struct {
serviceHealthServer healthcheck.ServiceHealthServer
healthzServer *healthcheck.ProxierHealthServer
// nodeAddressHandler selects the interfaces where nodePort works.
nodeAddressHandler *proxyutil.NodeAddressHandler
// nodePortAddresses selects the interfaces where nodePort works.
nodePortAddresses *proxyutil.NodePortAddresses
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer proxyutil.NetworkInterfacer
@ -240,9 +240,9 @@ func NewProxier(ctx context.Context,
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
logger.V(2).Info("Using nftables mark for masquerade", "mark", masqueradeMark)
nodeAddressHandler := proxyutil.NewNodeAddressHandler(ipFamily, nodePortAddressStrings)
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodeAddressHandler, healthzServer)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
proxier := &Proxier{
ipFamily: ipFamily,
@ -262,7 +262,7 @@ func NewProxier(ctx context.Context,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
nodeAddressHandler: nodeAddressHandler,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
staleChains: make(map[string]time.Time),
logger: logger,
@ -574,7 +574,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
Type: ipvX_addr,
Comment: ptr.To("IPs that accept NodePort traffic"),
})
if proxier.nodeAddressHandler.MatchAll() {
if proxier.nodePortAddresses.MatchAll() {
tx.Delete(&knftables.Set{
Name: nodePortIPsSet,
})
@ -582,9 +582,9 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
tx.Flush(&knftables.Set{
Name: nodePortIPsSet,
})
nodeIPs, err := proxier.nodeAddressHandler.GetNodeIPs(proxier.networkInterfacer)
nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
if err != nil {
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodeAddressHandler)
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
}
for _, ip := range nodeIPs {
if ip.IsLoopback() {
@ -632,7 +632,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
),
})
if proxier.nodeAddressHandler.MatchAll() {
if proxier.nodePortAddresses.MatchAll() {
tx.Add(&knftables.Rule{
Chain: nodePortEndpointsCheckChain,
Rule: knftables.Concat(
@ -686,7 +686,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
"vmap", "@", serviceIPsMap,
),
})
if proxier.nodeAddressHandler.MatchAll() {
if proxier.nodePortAddresses.MatchAll() {
tx.Add(&knftables.Rule{
Chain: servicesChain,
Rule: knftables.Concat(

View File

@ -128,7 +128,7 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
hostname: testHostname,
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
nodeIP: nodeIP,
nodeAddressHandler: proxyutil.NewNodeAddressHandler(ipFamily, nodePortAddresses),
nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nodePortAddresses),
networkInterfacer: networkInterfacer,
staleChains: make(map[string]time.Time),
serviceCIDRs: serviceCIDRs,
@ -959,7 +959,7 @@ func TestNodePorts(t *testing.T) {
nodeIP = testNodeIPv6
}
if tc.nodePortAddresses != nil {
fp.nodeAddressHandler = proxyutil.NewNodeAddressHandler(tc.family, tc.nodePortAddresses)
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(tc.family, tc.nodePortAddresses)
}
makeServiceMap(fp,

View File

@ -24,9 +24,8 @@ import (
netutils "k8s.io/utils/net"
)
// NodeAddressHandler is used to handle NodePortAddresses,
// HealthzBindAddresses and MetricsBindAddresses.
type NodeAddressHandler struct {
// NodePortAddresses is used to handle the --nodeport-addresses flag
type NodePortAddresses struct {
cidrStrings []string
cidrs []*net.IPNet
@ -37,65 +36,64 @@ type NodeAddressHandler struct {
// RFC 5735 127.0.0.0/8 - This block is assigned for use as the Internet host loopback address
var ipv4LoopbackStart = net.IPv4(127, 0, 0, 0)
// NewNodeAddressHandler takes an IP family and the CIDR strings (
// NodePortAddresses, HealthzBindAddresses or MetricsBindAddresses, which is
// NewNodePortAddresses takes an IP family and the `--nodeport-addresses` value (which is
// assumed to contain only valid CIDRs, potentially of both IP families) and returns a
// NodeAddressHandler object for the given family. If there are no CIDRs of the given
// NodePortAddresses object for the given family. If there are no CIDRs of the given
// family then the CIDR "0.0.0.0/0" or "::/0" will be added (even if there are CIDRs of
// the other family).
func NewNodeAddressHandler(family v1.IPFamily, cidrStrings []string) *NodeAddressHandler {
nah := &NodeAddressHandler{}
func NewNodePortAddresses(family v1.IPFamily, cidrStrings []string) *NodePortAddresses {
npa := &NodePortAddresses{}
// Filter CIDRs to correct family
for _, str := range cidrStrings {
if (family == v1.IPv4Protocol) == netutils.IsIPv4CIDRString(str) {
nah.cidrStrings = append(nah.cidrStrings, str)
npa.cidrStrings = append(npa.cidrStrings, str)
}
}
if len(nah.cidrStrings) == 0 {
if len(npa.cidrStrings) == 0 {
if family == v1.IPv4Protocol {
nah.cidrStrings = []string{IPv4ZeroCIDR}
npa.cidrStrings = []string{IPv4ZeroCIDR}
} else {
nah.cidrStrings = []string{IPv6ZeroCIDR}
npa.cidrStrings = []string{IPv6ZeroCIDR}
}
}
// Now parse
for _, str := range nah.cidrStrings {
for _, str := range npa.cidrStrings {
_, cidr, _ := netutils.ParseCIDRSloppy(str)
if netutils.IsIPv4CIDR(cidr) {
if cidr.IP.IsLoopback() || cidr.Contains(ipv4LoopbackStart) {
nah.containsIPv4Loopback = true
npa.containsIPv4Loopback = true
}
}
if IsZeroCIDR(str) {
// Ignore everything else
nah.cidrs = []*net.IPNet{cidr}
nah.matchAll = true
npa.cidrs = []*net.IPNet{cidr}
npa.matchAll = true
break
}
nah.cidrs = append(nah.cidrs, cidr)
npa.cidrs = append(npa.cidrs, cidr)
}
return nah
return npa
}
func (nah *NodeAddressHandler) String() string {
return fmt.Sprintf("%v", nah.cidrStrings)
func (npa *NodePortAddresses) String() string {
return fmt.Sprintf("%v", npa.cidrStrings)
}
// MatchAll returns true if nah matches all node IPs (of nah's given family)
func (nah *NodeAddressHandler) MatchAll() bool {
return nah.matchAll
// MatchAll returns true if npa matches all node IPs (of npa's given family)
func (npa *NodePortAddresses) MatchAll() bool {
return npa.matchAll
}
// GetNodeIPs returns all matched node IP addresses for nah's CIDRs. If no matching
// GetNodeIPs returns all matched node IP addresses for npa's CIDRs. If no matching
// IPs are found, it returns an empty list.
// NetworkInterfacer is injected for test purpose.
func (nah *NodeAddressHandler) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error) {
func (npa *NodePortAddresses) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error) {
addrs, err := nw.InterfaceAddrs()
if err != nil {
return nil, fmt.Errorf("error listing all interfaceAddrs from host, error: %v", err)
@ -103,7 +101,7 @@ func (nah *NodeAddressHandler) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error
// Use a map to dedup matches
addresses := make(map[string]net.IP)
for _, cidr := range nah.cidrs {
for _, cidr := range npa.cidrs {
for _, addr := range addrs {
var ip net.IP
// nw.InterfaceAddrs may return net.IPAddr or net.IPNet on windows, and it will return net.IPNet on linux.
@ -130,7 +128,7 @@ func (nah *NodeAddressHandler) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error
return ips, nil
}
// ContainsIPv4Loopback returns true if nah's CIDRs contain an IPv4 loopback address.
func (nah *NodeAddressHandler) ContainsIPv4Loopback() bool {
return nah.containsIPv4Loopback
// ContainsIPv4Loopback returns true if npa's CIDRs contain an IPv4 loopback address.
func (npa *NodePortAddresses) ContainsIPv4Loopback() bool {
return npa.containsIPv4Loopback
}
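Taken together, the restored type behaves roughly as in the usage sketch below. The values, the example package, and the import path for the fake NetworkInterfacer are assumptions for illustration; only the NodePortAddresses API itself comes from the file above.

package example

import (
	"fmt"
	"net"

	v1 "k8s.io/api/core/v1"
	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
	proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
	netutils "k8s.io/utils/net"
)

func main() {
	// CIDRs of both families may be passed; the constructor keeps only the
	// ones matching the requested family, so this yields {"10.0.0.0/8"}.
	npa := proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"10.0.0.0/8", "2001:db8::/64"})
	fmt.Println(npa.MatchAll())             // false: a specific CIDR was given
	fmt.Println(npa.ContainsIPv4Loopback()) // false: 10.0.0.0/8 contains no loopback address

	// With no CIDRs of the requested family, the zero CIDR is substituted
	// and MatchAll reports true.
	all := proxyutil.NewNodePortAddresses(v1.IPv4Protocol, []string{"2001:db8::/64"})
	fmt.Println(all.MatchAll()) // true

	// GetNodeIPs resolves the CIDRs against the node's interfaces; a fake
	// NetworkInterfacer keeps the example hermetic.
	nw := proxyutiltest.NewFakeNetwork()
	itf := net.Interface{Index: 0, MTU: 0, Name: "eth0"}
	addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("10.1.2.3"), Mask: net.CIDRMask(8, 32)}}
	nw.AddInterfaceAddr(&itf, addrs)
	ips, err := npa.GetNodeIPs(nw)
	fmt.Println(ips, err) // [10.1.2.3] <nil>
}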

View File

@ -379,13 +379,13 @@ func TestGetNodeIPs(t *testing.T) {
}
for _, family := range []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol} {
nah := NewNodeAddressHandler(family, tc.cidrs)
npa := NewNodePortAddresses(family, tc.cidrs)
if nah.MatchAll() != tc.expected[family].matchAll {
if npa.MatchAll() != tc.expected[family].matchAll {
t.Errorf("unexpected MatchAll(%s), expected: %v", family, tc.expected[family].matchAll)
}
ips, err := nah.GetNodeIPs(nw)
ips, err := npa.GetNodeIPs(nw)
expectedIPs := tc.expected[family].ips
// The fake InterfaceAddrs() never returns an error, so
@ -451,13 +451,13 @@ func TestContainsIPv4Loopback(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nah := NewNodeAddressHandler(v1.IPv4Protocol, tt.cidrStrings)
if got := nah.ContainsIPv4Loopback(); got != tt.want {
npa := NewNodePortAddresses(v1.IPv4Protocol, tt.cidrStrings)
if got := npa.ContainsIPv4Loopback(); got != tt.want {
t.Errorf("IPv4 ContainsIPv4Loopback() = %v, want %v", got, tt.want)
}
// ContainsIPv4Loopback should always be false for family=IPv6
nah = NewNodeAddressHandler(v1.IPv6Protocol, tt.cidrStrings)
if got := nah.ContainsIPv4Loopback(); got {
npa = NewNodePortAddresses(v1.IPv6Protocol, tt.cidrStrings)
if got := npa.ContainsIPv4Loopback(); got {
t.Errorf("IPv6 ContainsIPv4Loopback() = %v, want %v", got, false)
}
})

View File

@ -677,7 +677,7 @@ func NewProxier(
nodeIP net.IP,
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
healthzPort int,
healthzBindAddress string,
config config.KubeProxyWinkernelConfiguration,
) (*Proxier, error) {
if nodeIP == nil {
@ -686,8 +686,14 @@ func NewProxier(
}
// windows listens to all node addresses
nodeAddressHandler := proxyutil.NewNodeAddressHandler(ipFamily, nil)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodeAddressHandler, healthzServer)
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
var healthzPort int
if len(healthzBindAddress) > 0 {
_, port, _ := net.SplitHostPort(healthzBindAddress)
healthzPort, _ = strconv.Atoi(port)
}
hcnImpl := newHcnImpl()
hns, supportedFeatures := newHostNetworkService(hcnImpl)
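The port extraction a few lines above can ignore parse errors because the bind address has already passed config validation. For readers wiring the same idea up elsewhere, here is a standalone sketch with the errors surfaced; healthzPortFromBindAddress is an invented name, not part of kube-proxy.

package example

import (
	"fmt"
	"net"
	"strconv"
)

// healthzPortFromBindAddress extracts the numeric port from a host:port
// string such as "0.0.0.0:10256". An empty address means healthz is
// disabled, reported here as port 0.
func healthzPortFromBindAddress(bindAddress string) (int, error) {
	if bindAddress == "" {
		return 0, nil
	}
	_, portStr, err := net.SplitHostPort(bindAddress)
	if err != nil {
		return 0, fmt.Errorf("invalid healthz bind address %q: %w", bindAddress, err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return 0, fmt.Errorf("invalid healthz port %q: %w", portStr, err)
	}
	return port, nil
}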
@ -808,14 +814,14 @@ func NewDualStackProxier(
nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
healthzPort int,
healthzBindAddress string,
config config.KubeProxyWinkernelConfiguration,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer,
healthzPort, config)
healthzBindAddress, config)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv4Protocol])
@ -823,7 +829,7 @@ func NewDualStackProxier(
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer,
healthzPort, config)
healthzBindAddress, config)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv6Protocol])
}