Merge pull request #126889 from aroradaman/kube-proxy-refactor-healthz-metrics-address

kube-proxy: internal config: refactor HealthzAddress and MetricsAddress
This commit is contained in:
Kubernetes Prow Robot 2024-10-01 14:45:49 +01:00 committed by GitHub
commit 98657377dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 521 additions and 318 deletions

View File

@ -19,7 +19,9 @@ package app
import (
"context"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
@ -33,15 +35,14 @@ import (
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
"k8s.io/kube-proxy/config/v1alpha1"
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/kubelet/qos"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
kubeproxyconfigv1alpha1 "k8s.io/kubernetes/pkg/proxy/apis/config/v1alpha1"
"k8s.io/kubernetes/pkg/proxy/apis/config/validation"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/filesystem"
utilflag "k8s.io/kubernetes/pkg/util/flag"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@ -71,10 +72,6 @@ type Options struct {
// master is used to override the kubeconfig's URL to the apiserver.
master string
// healthzPort is the port to be used by the healthz server.
healthzPort int32
// metricsPort is the port to be used by the metrics server.
metricsPort int32
// hostnameOverride, if set from the command line flag, takes precedence over the `HostnameOverride` value from the config file
hostnameOverride string
@ -88,6 +85,8 @@ type Options struct {
ipvsSyncPeriod time.Duration
ipvsMinSyncPeriod time.Duration
clusterCIDRs string
healthzBindAddress string
metricsBindAddress string
}
// AddFlags adds flags to fs and binds them to options.
@ -111,8 +110,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.hostnameOverride, "hostname-override", o.hostnameOverride, "If non-empty, will be used as the name of the Node that kube-proxy is running on. If unset, the node name is assumed to be the same as the node's hostname.")
fs.Var(&utilflag.IPVar{Val: &o.config.BindAddress}, "bind-address", "Overrides kube-proxy's idea of what its node's primary IP is. Note that the name is a historical artifact, and kube-proxy does not actually bind any sockets to this IP. This parameter is ignored if a config file is specified by --config.")
fs.Var(&utilflag.IPPortVar{Val: &o.config.HealthzBindAddress}, "healthz-bind-address", "The IP address and port for the health check server to serve on, defaulting to \"0.0.0.0:10256\". This parameter is ignored if a config file is specified by --config.")
fs.Var(&utilflag.IPPortVar{Val: &o.config.MetricsBindAddress}, "metrics-bind-address", "The IP address and port for the metrics server to serve on, defaulting to \"127.0.0.1:10249\". (Set to \"0.0.0.0:10249\" / \"[::]:10249\" to bind on all interfaces.) Set empty to disable. This parameter is ignored if a config file is specified by --config.")
fs.Var(&utilflag.IPPortVar{Val: &o.healthzBindAddress}, "healthz-bind-address", "The IP address and port for the health check server to serve on, defaulting to \"0.0.0.0:10256\". This parameter is ignored if a config file is specified by --config.")
fs.Var(&utilflag.IPPortVar{Val: &o.metricsBindAddress}, "metrics-bind-address", "The IP address and port for the metrics server to serve on, defaulting to \"127.0.0.1:10249\". (Set to \"0.0.0.0:10249\" / \"[::]:10249\" to bind on all interfaces.) Set empty to disable. This parameter is ignored if a config file is specified by --config.")
fs.BoolVar(&o.config.BindAddressHardFail, "bind-address-hard-fail", o.config.BindAddressHardFail, "If true kube-proxy will treat failure to bind to a port as fatal and exit")
fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "If true enables profiling via web interface on /debug/pprof handler. This parameter is ignored if a config file is specified by --config.")
fs.StringVar(&o.config.ShowHiddenMetricsForVersion, "show-hidden-metrics-for-version", o.config.ShowHiddenMetricsForVersion,
@ -166,11 +165,6 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.config.Linux.Conntrack.UDPStreamTimeout.Duration, "conntrack-udp-timeout-stream", o.config.Linux.Conntrack.UDPStreamTimeout.Duration, "Idle timeout for ASSURED UDP connections (0 to leave as-is)")
fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
fs.Int32Var(&o.healthzPort, "healthz-port", o.healthzPort, "The port to bind the health check server. Use 0 to disable.")
_ = fs.MarkDeprecated("healthz-port", "This flag is deprecated and will be removed in a future release. Please use --healthz-bind-address instead.")
fs.Int32Var(&o.metricsPort, "metrics-port", o.metricsPort, "The port to bind the metrics server. Use 0 to disable.")
_ = fs.MarkDeprecated("metrics-port", "This flag is deprecated and will be removed in a future release. Please use --metrics-bind-address instead.")
logsapi.AddFlags(&o.config.Logging, fs)
}
@ -189,21 +183,14 @@ func newKubeProxyConfiguration() *kubeproxyconfig.KubeProxyConfiguration {
// NewOptions returns initialized Options
func NewOptions() *Options {
return &Options{
config: newKubeProxyConfiguration(),
healthzPort: ports.ProxyHealthzPort,
metricsPort: ports.ProxyStatusPort,
errCh: make(chan error),
logger: klog.FromContext(context.Background()),
config: newKubeProxyConfiguration(),
errCh: make(chan error),
logger: klog.FromContext(context.Background()),
}
}
// Complete completes all the required options.
func (o *Options) Complete(fs *pflag.FlagSet) error {
if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)
o.config.MetricsBindAddress = addressFromDeprecatedFlags(o.config.MetricsBindAddress, o.metricsPort)
}
// Load the config file here in Complete, so that Validate validates the fully-resolved config.
if len(o.ConfigFile) > 0 {
c, err := o.loadConfigFromFile(o.ConfigFile)
@ -328,6 +315,32 @@ func (o *Options) processV1Alpha1Flags(fs *pflag.FlagSet) {
if fs.Changed("cluster-cidr") {
o.config.DetectLocal.ClusterCIDRs = strings.Split(o.clusterCIDRs, ",")
}
if fs.Changed("healthz-bind-address") {
host, port, _ := net.SplitHostPort(o.healthzBindAddress)
ip := netutils.ParseIPSloppy(host)
if ip.IsUnspecified() {
o.config.HealthzBindAddresses = []string{fmt.Sprintf("%s/0", host)}
} else if netutils.IsIPv4(ip) {
o.config.HealthzBindAddresses = []string{fmt.Sprintf("%s/32", host)}
} else {
o.config.HealthzBindAddresses = []string{fmt.Sprintf("%s/128", host)}
}
intPort, _ := strconv.Atoi(port)
o.config.HealthzBindPort = int32(intPort)
}
if fs.Changed("metrics-bind-address") {
host, port, _ := net.SplitHostPort(o.metricsBindAddress)
ip := netutils.ParseIPSloppy(host)
if ip.IsUnspecified() {
o.config.MetricsBindAddresses = []string{fmt.Sprintf("%s/0", host)}
} else if netutils.IsIPv4(ip) {
o.config.MetricsBindAddresses = []string{fmt.Sprintf("%s/32", host)}
} else {
o.config.MetricsBindAddresses = []string{fmt.Sprintf("%s/128", host)}
}
intPort, _ := strconv.Atoi(port)
o.config.MetricsBindPort = int32(intPort)
}
}
// Validate validates all the required options.
@ -416,17 +429,6 @@ func (o *Options) writeConfigFile() (err error) {
return nil
}
// addressFromDeprecatedFlags returns server address from flags
// passed on the command line based on the following rules:
// 1. If port is 0, disable the server (e.g. set address to empty).
// 2. Otherwise, set the port portion of the config accordingly.
func addressFromDeprecatedFlags(addr string, port int32) string {
if port == 0 {
return ""
}
return proxyutil.AppendPortIfNeeded(addr, port)
}
// newLenientSchemeAndCodecs returns a scheme that has only v1alpha1 registered into
// it and a CodecFactory with strict decoding disabled.
func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {

View File

@ -89,94 +89,130 @@ nodePortAddresses:
`
testCases := []struct {
name string
mode string
bindAddress string
clusterCIDR string
healthzBindAddress string
metricsBindAddress string
extraConfig string
name string
mode string
bindAddress string
clusterCIDR string
healthzBindAddress string
metricsBindAddress string
extraConfig string
expectedHealthzBindAddresses []string
expectedHealthzBindPort int32
expectedMetricsBindAddresses []string
expectedMetricsBindPort int32
}{
{
name: "iptables mode, IPv4 all-zeros bind address",
mode: "iptables",
bindAddress: "0.0.0.0",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
name: "iptables mode, IPv4 all-zeros bind address",
mode: "iptables",
bindAddress: "0.0.0.0",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
expectedHealthzBindAddresses: []string{"1.2.3.4/32"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"2.3.4.5/32"},
expectedMetricsBindPort: int32(23456),
},
{
name: "iptables mode, non-zeros IPv4 config",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
name: "iptables mode, non-zeros IPv4 config",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
expectedHealthzBindAddresses: []string{"1.2.3.4/32"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"2.3.4.5/32"},
expectedMetricsBindPort: int32(23456),
},
{
// Test for 'bindAddress: "::"' (IPv6 all-zeros) in kube-proxy
// config file. The user will need to put quotes around '::' since
// 'bindAddress: ::' is invalid yaml syntax.
name: "iptables mode, IPv6 \"::\" bind address",
mode: "iptables",
bindAddress: "\"::\"",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
name: "iptables mode, IPv6 \"::\" bind address",
mode: "iptables",
bindAddress: "\"::\"",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
expectedHealthzBindAddresses: []string{"fd00:1::5/128"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"fd00:2::5/128"},
expectedMetricsBindPort: int32(23456),
},
{
// Test for 'bindAddress: "[::]"' (IPv6 all-zeros in brackets)
// in kube-proxy config file. The user will need to use
// surrounding quotes here since 'bindAddress: [::]' is invalid
// yaml syntax.
name: "iptables mode, IPv6 \"[::]\" bind address",
mode: "iptables",
bindAddress: "\"[::]\"",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
name: "iptables mode, IPv6 \"[::]\" bind address",
mode: "iptables",
bindAddress: "\"[::]\"",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
expectedHealthzBindAddresses: []string{"fd00:1::5/128"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"fd00:2::5/128"},
expectedMetricsBindPort: int32(23456),
},
{
// Test for 'bindAddress: ::0' (another form of IPv6 all-zeros).
// No surrounding quotes are required around '::0'.
name: "iptables mode, IPv6 ::0 bind address",
mode: "iptables",
bindAddress: "::0",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
name: "iptables mode, IPv6 ::0 bind address",
mode: "iptables",
bindAddress: "::0",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
expectedHealthzBindAddresses: []string{"fd00:1::5/128"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"fd00:2::5/128"},
expectedMetricsBindPort: int32(23456),
},
{
name: "ipvs mode, IPv6 config",
mode: "ipvs",
bindAddress: "2001:db8::1",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
name: "ipvs mode, IPv6 config",
mode: "ipvs",
bindAddress: "2001:db8::1",
clusterCIDR: "fd00:1::0/64",
healthzBindAddress: "[fd00:1::5]:12345",
metricsBindAddress: "[fd00:2::5]:23456",
expectedHealthzBindAddresses: []string{"fd00:1::5/128"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"fd00:2::5/128"},
expectedMetricsBindPort: int32(23456),
},
{
// Test for unknown field within config.
// For v1alpha1 a lenient path is implemented and will throw a
// strict decoding warning instead of failing to load
name: "unknown field",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
extraConfig: "foo: bar",
name: "unknown field",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
extraConfig: "foo: bar",
expectedHealthzBindAddresses: []string{"1.2.3.4/32"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"2.3.4.5/32"},
expectedMetricsBindPort: int32(23456),
},
{
// Test for duplicate field within config.
// For v1alpha1 a lenient path is implemented and will throw a
// strict decoding warning instead of failing to load
name: "duplicate field",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
extraConfig: "bindAddress: 9.8.7.6",
name: "duplicate field",
mode: "iptables",
bindAddress: "9.8.7.6",
clusterCIDR: "1.2.3.0/24",
healthzBindAddress: "1.2.3.4:12345",
metricsBindAddress: "2.3.4.5:23456",
extraConfig: "bindAddress: 9.8.7.6",
expectedHealthzBindAddresses: []string{"1.2.3.4/32"},
expectedHealthzBindPort: int32(12345),
expectedMetricsBindAddresses: []string{"2.3.4.5/32"},
expectedMetricsBindPort: int32(23456),
},
}
@ -209,9 +245,10 @@ nodePortAddresses:
MasqueradeAll: true,
OOMScoreAdj: ptr.To[int32](17),
},
FeatureGates: map[string]bool{},
HealthzBindAddress: tc.healthzBindAddress,
HostnameOverride: "foo",
FeatureGates: map[string]bool{},
HealthzBindAddresses: tc.expectedHealthzBindAddresses,
HealthzBindPort: tc.expectedHealthzBindPort,
HostnameOverride: "foo",
IPTables: kubeproxyconfig.KubeProxyIPTablesConfiguration{
MasqueradeBit: ptr.To[int32](17),
LocalhostNodePorts: ptr.To(true),
@ -222,10 +259,11 @@ nodePortAddresses:
NFTables: kubeproxyconfig.KubeProxyNFTablesConfiguration{
MasqueradeBit: ptr.To[int32](18),
},
MetricsBindAddress: tc.metricsBindAddress,
Mode: kubeproxyconfig.ProxyMode(tc.mode),
NodePortAddresses: []string{"10.20.30.40/16", "fd00:1::0/64"},
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
MetricsBindAddresses: tc.expectedMetricsBindAddresses,
MetricsBindPort: tc.expectedMetricsBindPort,
Mode: kubeproxyconfig.ProxyMode(tc.mode),
NodePortAddresses: []string{"10.20.30.40/16", "fd00:1::0/64"},
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
DetectLocal: kubeproxyconfig.DetectLocalConfiguration{
BridgeInterface: "cbr0",
ClusterCIDRs: strings.Split(tc.clusterCIDR, ","),
@ -454,6 +492,36 @@ func TestProcessV1Alpha1Flags(t *testing.T) {
return reflect.DeepEqual(config.DetectLocal.ClusterCIDRs, []string{"2002:0:0:1234::/64", "10.0.0.0/14"})
},
},
{
name: "metrics and healthz address ipv4",
flags: []string{
"--healthz-bind-address=0.0.0.0:54321",
"--metrics-bind-address=127.0.0.1:3306",
},
validate: func(config *kubeproxyconfig.KubeProxyConfiguration) bool {
if reflect.DeepEqual(config.HealthzBindAddresses, []string{"0.0.0.0/0"}) &&
reflect.DeepEqual(config.MetricsBindAddresses, []string{"127.0.0.1/32"}) &&
config.HealthzBindPort == 54321 && config.MetricsBindPort == 3306 {
return true
}
return false
},
},
{
name: "metrics and healthz address ipv6",
flags: []string{
"--healthz-bind-address=[fd00:4321::2]:9090",
"--metrics-bind-address=[::1]:8080",
},
validate: func(config *kubeproxyconfig.KubeProxyConfiguration) bool {
if reflect.DeepEqual(config.HealthzBindAddresses, []string{"fd00:4321::2/128"}) &&
reflect.DeepEqual(config.MetricsBindAddresses, []string{"::1/128"}) &&
config.HealthzBindPort == 9090 && config.MetricsBindPort == 8080 {
return true
}
return false
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -592,71 +660,3 @@ kind: KubeProxyConfiguration
})
}
}
func TestAddressFromDeprecatedFlags(t *testing.T) {
testCases := []struct {
name string
healthzPort int32
healthzBindAddress string
metricsPort int32
metricsBindAddress string
expHealthz string
expMetrics string
}{
{
name: "IPv4 bind address",
healthzBindAddress: "1.2.3.4",
healthzPort: 12345,
metricsBindAddress: "2.3.4.5",
metricsPort: 23456,
expHealthz: "1.2.3.4:12345",
expMetrics: "2.3.4.5:23456",
},
{
name: "IPv4 bind address has port",
healthzBindAddress: "1.2.3.4:12345",
healthzPort: 23456,
metricsBindAddress: "2.3.4.5:12345",
metricsPort: 23456,
expHealthz: "1.2.3.4:12345",
expMetrics: "2.3.4.5:12345",
},
{
name: "IPv6 bind address",
healthzBindAddress: "fd00:1::5",
healthzPort: 12345,
metricsBindAddress: "fd00:1::6",
metricsPort: 23456,
expHealthz: "[fd00:1::5]:12345",
expMetrics: "[fd00:1::6]:23456",
},
{
name: "IPv6 bind address has port",
healthzBindAddress: "[fd00:1::5]:12345",
healthzPort: 56789,
metricsBindAddress: "[fd00:1::6]:56789",
metricsPort: 12345,
expHealthz: "[fd00:1::5]:12345",
expMetrics: "[fd00:1::6]:56789",
},
{
name: "Invalid IPv6 Config",
healthzBindAddress: "[fd00:1::5]",
healthzPort: 12345,
metricsBindAddress: "[fd00:1::6]",
metricsPort: 56789,
expHealthz: "[fd00:1::5]",
expMetrics: "[fd00:1::6]",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gotHealthz := addressFromDeprecatedFlags(tc.healthzBindAddress, tc.healthzPort)
gotMetrics := addressFromDeprecatedFlags(tc.metricsBindAddress, tc.metricsPort)
require.Equal(t, tc.expHealthz, gotHealthz)
require.Equal(t, tc.expMetrics, gotMetrics)
})
}
}

View File

@ -25,6 +25,7 @@ import (
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/spf13/cobra"
@ -220,8 +221,8 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig
Namespace: "",
}
if len(config.HealthzBindAddress) > 0 {
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.SyncPeriod.Duration)
if len(config.HealthzBindAddresses) > 0 {
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddresses, config.HealthzBindPort, 2*config.SyncPeriod.Duration)
}
err = s.platformSetup(ctx)
@ -312,7 +313,7 @@ func checkBadIPConfig(s *ProxyServer, dualStackSupported bool) (err error, fatal
clusterType = fmt.Sprintf("%s-only", s.PrimaryIPFamily)
}
if badCIDRs(s.Config.DetectLocal.ClusterCIDRs, badFamily) {
if badCIDRs(s.Config.DetectLocal.ClusterCIDRs, badFamily, false) {
errors = append(errors, fmt.Errorf("cluster is %s but clusterCIDRs contains only IPv%s addresses", clusterType, badFamily))
if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeClusterCIDR && !dualStackSupported {
// This has always been a fatal error
@ -320,7 +321,7 @@ func checkBadIPConfig(s *ProxyServer, dualStackSupported bool) (err error, fatal
}
}
if badCIDRs(s.podCIDRs, badFamily) {
if badCIDRs(s.podCIDRs, badFamily, false) {
errors = append(errors, fmt.Errorf("cluster is %s but node.spec.podCIDRs contains only IPv%s addresses", clusterType, badFamily))
if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
// This has always been a fatal error
@ -335,15 +336,15 @@ func checkBadIPConfig(s *ProxyServer, dualStackSupported bool) (err error, fatal
// In some cases, wrong-IP-family is only a problem when the secondary IP family
// isn't present at all.
if !dualStackSupported {
if badCIDRs(s.Config.IPVS.ExcludeCIDRs, badFamily) {
if badCIDRs(s.Config.IPVS.ExcludeCIDRs, badFamily, false) {
errors = append(errors, fmt.Errorf("cluster is %s but ipvs.excludeCIDRs contains only IPv%s addresses", clusterType, badFamily))
}
if badBindAddress(s.Config.HealthzBindAddress, badFamily) {
errors = append(errors, fmt.Errorf("cluster is %s but healthzBindAddress is IPv%s", clusterType, badFamily))
if badCIDRs(s.Config.HealthzBindAddresses, badFamily, true) {
errors = append(errors, fmt.Errorf("cluster is %s but healthzBindAddresses doesn't contain IPv%s cidr", clusterType, badFamily))
}
if badBindAddress(s.Config.MetricsBindAddress, badFamily) {
errors = append(errors, fmt.Errorf("cluster is %s but metricsBindAddress is IPv%s", clusterType, badFamily))
if badCIDRs(s.Config.MetricsBindAddresses, badFamily, true) {
errors = append(errors, fmt.Errorf("cluster is %s but metricsBindAddresses doesn't contain IPv%s cidr", clusterType, badFamily))
}
}
@ -354,30 +355,22 @@ func checkBadIPConfig(s *ProxyServer, dualStackSupported bool) (err error, fatal
}
// badCIDRs returns true if cidrs is a non-empty list of CIDRs, all of wrongFamily.
func badCIDRs(cidrs []string, wrongFamily netutils.IPFamily) bool {
if len(cidrs) == 0 {
// If allowUnspecified is false, unspecified addresses '0.0.0.0' and '::' will not be treated
// as part of either family.
func badCIDRs(cidrStrings []string, wrongFamily netutils.IPFamily, allowUnspecified bool) bool {
if len(cidrStrings) == 0 {
return false
}
for _, cidr := range cidrs {
if netutils.IPFamilyOfCIDRString(cidr) != wrongFamily {
for _, cidrString := range cidrStrings {
ip, cidr, _ := netutils.ParseCIDRSloppy(cidrString)
maskSize, _ := cidr.Mask.Size()
if netutils.IPFamilyOf(ip) != wrongFamily || (allowUnspecified && (ip.IsUnspecified() && maskSize == 0)) {
return false
}
}
return true
}
// badBindAddress returns true if bindAddress is an "IP:port" string where IP is a
// non-zero IP of wrongFamily.
func badBindAddress(bindAddress string, wrongFamily netutils.IPFamily) bool {
if host, _, _ := net.SplitHostPort(bindAddress); host != "" {
ip := netutils.ParseIPSloppy(host)
if ip != nil && netutils.IPFamilyOf(ip) == wrongFamily && !ip.IsUnspecified() {
return true
}
}
return false
}
// createClient creates a kube client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed.
func createClient(ctx context.Context, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
@ -419,7 +412,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
}
fn := func() {
err := hz.Run()
err := hz.Run(ctx)
if err != nil {
logger.Error(err, "Healthz server failed")
if errCh != nil {
@ -435,8 +428,9 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
go wait.Until(fn, 5*time.Second, ctx.Done())
}
func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
if len(bindAddress) == 0 {
func serveMetrics(ctx context.Context, cidrStrings []string, port int32, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
logger := klog.FromContext(ctx)
if len(cidrStrings) == 0 {
return
}
@ -458,19 +452,47 @@ func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enabl
}
configz.InstallHandler(proxyMux)
nodeIPs, err := proxyutil.FilterInterfaceAddrsByCIDRStrings(proxyutil.RealNetwork{}, cidrStrings)
if err != nil {
logger.Error(err, "failed to get node IPs for metrics server")
}
if len(nodeIPs) == 0 {
logger.Info("failed to get any node ip matching metricsBindAddresses", "metricsBindAddresses", cidrStrings)
}
var addrs []string
for _, nodeIP := range nodeIPs {
if nodeIP.IsLinkLocalUnicast() || nodeIP.IsLinkLocalMulticast() {
continue
}
addrs = append(addrs, net.JoinHostPort(nodeIP.String(), strconv.Itoa(int(port))))
}
fn := func() {
err := http.ListenAndServe(bindAddress, proxyMux)
if err != nil {
err = fmt.Errorf("starting metrics server failed: %w", err)
utilruntime.HandleError(err)
if errCh != nil {
errCh <- err
// if in hardfail mode, never retry again
blockCh := make(chan error)
<-blockCh
var err error
defer func() {
if err != nil {
err = fmt.Errorf("starting metrics server failed: %w", err)
utilruntime.HandleError(err)
if errCh != nil {
errCh <- err
// if in hardfail mode, never retry again
blockCh := make(chan error)
<-blockCh
}
}
}()
listener, err := netutils.MultiListen(ctx, "tcp", addrs...)
if err != nil {
return
}
server := &http.Server{Handler: proxyMux}
err = server.Serve(listener)
if err != nil {
return
}
}
go wait.Until(fn, 5*time.Second, wait.NeverStop)
}
@ -512,7 +534,7 @@ func (s *ProxyServer) Run(ctx context.Context) error {
serveHealthz(ctx, s.HealthzServer, healthzErrCh)
// Start up a metrics server if requested
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
serveMetrics(ctx, s.Config.MetricsBindAddresses, s.Config.MetricsBindPort, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
if err != nil {

View File

@ -551,7 +551,8 @@ func Test_checkBadIPConfig(t *testing.T) {
name: "ok IPv4 metricsBindAddress",
proxy: &ProxyServer{
Config: &kubeproxyconfig.KubeProxyConfiguration{
MetricsBindAddress: "10.0.0.1:9999",
MetricsBindAddresses: []string{"10.0.0.0/24"},
MetricsBindPort: 9999,
},
PrimaryIPFamily: v1.IPv4Protocol,
},
@ -562,7 +563,8 @@ func Test_checkBadIPConfig(t *testing.T) {
name: "ok IPv6 metricsBindAddress",
proxy: &ProxyServer{
Config: &kubeproxyconfig.KubeProxyConfiguration{
MetricsBindAddress: "[fd01:2345::1]:9999",
MetricsBindAddresses: []string{"fd01:2345::/64"},
MetricsBindPort: 9999,
},
PrimaryIPFamily: v1.IPv6Protocol,
},
@ -573,7 +575,8 @@ func Test_checkBadIPConfig(t *testing.T) {
name: "ok unspecified wrong-family metricsBindAddress",
proxy: &ProxyServer{
Config: &kubeproxyconfig.KubeProxyConfiguration{
MetricsBindAddress: "0.0.0.0:9999",
MetricsBindAddresses: []string{"0.0.0.0/0"},
MetricsBindPort: 9999,
},
PrimaryIPFamily: v1.IPv6Protocol,
},
@ -584,7 +587,8 @@ func Test_checkBadIPConfig(t *testing.T) {
name: "wrong family metricsBindAddress",
proxy: &ProxyServer{
Config: &kubeproxyconfig.KubeProxyConfiguration{
MetricsBindAddress: "10.0.0.1:9999",
MetricsBindAddresses: []string{"10.0.0.0/24"},
MetricsBindPort: 9999,
},
PrimaryIPFamily: v1.IPv6Protocol,
},

View File

@ -97,7 +97,7 @@ func (s *ProxyServer) createProxier(ctx context.Context, config *proxyconfigapi.
s.NodeIPs,
s.Recorder,
s.HealthzServer,
config.HealthzBindAddress,
int(config.HealthzBindPort),
config.Winkernel,
)
} else {
@ -109,7 +109,7 @@ func (s *ProxyServer) createProxier(ctx context.Context, config *proxyconfigapi.
s.NodeIPs[s.PrimaryIPFamily],
s.Recorder,
s.HealthzServer,
config.HealthzBindAddress,
int(config.HealthzBindPort),
config.Winkernel,
)
}

View File

@ -91,11 +91,13 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.Linux.Conntrack.TCPCloseWaitTimeout = &metav1.Duration{Duration: time.Duration(c.Int63()) * time.Hour}
obj.Linux.Conntrack.TCPEstablishedTimeout = &metav1.Duration{Duration: time.Duration(c.Int63()) * time.Hour}
obj.FeatureGates = map[string]bool{c.RandString(): true}
obj.HealthzBindAddress = fmt.Sprintf("%d.%d.%d.%d:%d", c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(65536))
obj.HealthzBindAddresses = []string{fmt.Sprintf("%d.%d.%d.%d/32", c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(256))}
obj.HealthzBindPort = c.Int31() % 65536
obj.IPTables.MasqueradeBit = ptr.To(c.Int31())
obj.IPTables.LocalhostNodePorts = ptr.To(c.RandBool())
obj.NFTables.MasqueradeBit = ptr.To(c.Int31())
obj.MetricsBindAddress = fmt.Sprintf("%d.%d.%d.%d:%d", c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(65536))
obj.MetricsBindAddresses = []string{fmt.Sprintf("%d.%d.%d.%d/32", c.Intn(256), c.Intn(256), c.Intn(256), c.Intn(256))}
obj.MetricsBindPort = c.Int31() % 65536
obj.Linux.OOMScoreAdj = ptr.To(c.Int31())
obj.ClientConnection.ContentType = "bar"
obj.NodePortAddresses = []string{"1.2.3.0/24"}

View File

@ -183,15 +183,16 @@ type KubeProxyConfiguration struct {
// primary IP is. Note that the name is a historical artifact, and kube-proxy does
// not actually bind any sockets to this IP.
BindAddress string
// healthzBindAddress is the IP address and port for the health check server to
// serve on, defaulting to "0.0.0.0:10256" (if bindAddress is unset or IPv4), or
// "[::]:10256" (if bindAddress is IPv6).
HealthzBindAddress string
// metricsBindAddress is the IP address and port for the metrics server to serve
// on, defaulting to "127.0.0.1:10249" (if bindAddress is unset or IPv4), or
// "[::1]:10249" (if bindAddress is IPv6). (Set to "0.0.0.0:10249" / "[::]:10249"
// to bind on all interfaces.)
MetricsBindAddress string
// healthzBindAddresses is a list of CIDR ranges that contains a valid node IP on which
// the healthz server will be served on, defaulting to [ "0.0.0.0/0", "::/0" ].
HealthzBindAddresses []string
// healthzBindPort is the port on which healthz server will be exposed, defaulting to 10256.
HealthzBindPort int32
// metricsBindAddresses is a list of CIDR ranges that contains a valid node IP on which
// the metrics server will be served on, defaulting to [ "127.0.0.0/8", "::1/128" ].
MetricsBindAddresses []string
// metricsBindPort is the port on which metrics server will be exposed, defaulting to 10249.
MetricsBindPort int32
// bindAddressHardFail, if true, tells kube-proxy to treat failure to bind to a
// port as fatal and exit
BindAddressHardFail bool

View File

@ -17,11 +17,15 @@ limitations under the License.
package v1alpha1
import (
"fmt"
"net"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/kube-proxy/config/v1alpha1"
"k8s.io/kubernetes/pkg/proxy/apis/config"
netutils "k8s.io/utils/net"
)
// Convert_config_KubeProxyConfiguration_To_v1alpha1_KubeProxyConfiguration is defined here, because public conversion is not auto-generated due to existing warnings.
@ -54,6 +58,15 @@ func Convert_config_KubeProxyConfiguration_To_v1alpha1_KubeProxyConfiguration(in
if len(in.DetectLocal.ClusterCIDRs) > 0 {
out.ClusterCIDR = strings.Join(in.DetectLocal.ClusterCIDRs, ",")
}
if len(in.HealthzBindAddresses) > 0 && in.HealthzBindPort > 0 {
host, _, _ := netutils.ParseCIDRSloppy(in.HealthzBindAddresses[0])
out.HealthzBindAddress = net.JoinHostPort(host.String(), strconv.Itoa(int(in.HealthzBindPort)))
}
if len(in.MetricsBindAddresses) > 0 && in.MetricsBindPort > 0 {
host, _, _ := netutils.ParseCIDRSloppy(in.MetricsBindAddresses[0])
out.MetricsBindAddress = net.JoinHostPort(host.String(), strconv.Itoa(int(in.MetricsBindPort)))
}
return nil
}
@ -87,6 +100,33 @@ func Convert_v1alpha1_KubeProxyConfiguration_To_config_KubeProxyConfiguration(in
if len(in.ClusterCIDR) > 0 {
out.DetectLocal.ClusterCIDRs = strings.Split(in.ClusterCIDR, ",")
}
var prefix int
host, portStr, _ := net.SplitHostPort(in.HealthzBindAddress)
port, _ := strconv.Atoi(portStr)
hostIP := netutils.ParseIPSloppy(host)
if hostIP.IsUnspecified() {
prefix = 0
} else if netutils.IsIPv4(hostIP) {
prefix = 32
} else {
prefix = 128
}
out.HealthzBindAddresses = []string{fmt.Sprintf("%s/%d", hostIP.String(), prefix)}
out.HealthzBindPort = int32(port)
host, portStr, _ = net.SplitHostPort(in.MetricsBindAddress)
port, _ = strconv.Atoi(portStr)
hostIP = netutils.ParseIPSloppy(host)
if hostIP.IsUnspecified() {
prefix = 0
} else if netutils.IsIPv4(hostIP) {
prefix = 32
} else {
prefix = 128
}
out.MetricsBindAddresses = []string{fmt.Sprintf("%s/%d", hostIP.String(), prefix)}
out.MetricsBindPort = int32(port)
return nil
}

View File

@ -138,8 +138,8 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_config_KubeProxyConfiguratio
out.Logging = in.Logging
out.HostnameOverride = in.HostnameOverride
out.BindAddress = in.BindAddress
out.HealthzBindAddress = in.HealthzBindAddress
out.MetricsBindAddress = in.MetricsBindAddress
// WARNING: in.HealthzBindAddress requires manual conversion: does not exist in peer-type
// WARNING: in.MetricsBindAddress requires manual conversion: does not exist in peer-type
out.BindAddressHardFail = in.BindAddressHardFail
out.EnableProfiling = in.EnableProfiling
out.ShowHiddenMetricsForVersion = in.ShowHiddenMetricsForVersion
@ -180,8 +180,10 @@ func autoConvert_config_KubeProxyConfiguration_To_v1alpha1_KubeProxyConfiguratio
out.Logging = in.Logging
out.HostnameOverride = in.HostnameOverride
out.BindAddress = in.BindAddress
out.HealthzBindAddress = in.HealthzBindAddress
out.MetricsBindAddress = in.MetricsBindAddress
// WARNING: in.HealthzBindAddresses requires manual conversion: does not exist in peer-type
// WARNING: in.HealthzBindPort requires manual conversion: does not exist in peer-type
// WARNING: in.MetricsBindAddresses requires manual conversion: does not exist in peer-type
// WARNING: in.MetricsBindPort requires manual conversion: does not exist in peer-type
out.BindAddressHardFail = in.BindAddressHardFail
out.EnableProfiling = in.EnableProfiling
out.ShowHiddenMetricsForVersion = in.ShowHiddenMetricsForVersion

View File

@ -74,10 +74,14 @@ func Validate(config *kubeproxyconfig.KubeProxyConfiguration) field.ErrorList {
allErrs = append(allErrs, field.Invalid(newPath.Child("BindAddress"), config.BindAddress, "not a valid textual representation of an IP address"))
}
if config.HealthzBindAddress != "" {
allErrs = append(allErrs, validateHostPort(config.HealthzBindAddress, newPath.Child("HealthzBindAddress"))...)
if len(config.HealthzBindAddresses) > 0 {
allErrs = append(allErrs, validateDualStackCIDRStrings(config.HealthzBindAddresses, newPath.Child("HealthzBindAddresses"))...)
}
allErrs = append(allErrs, validateHostPort(config.MetricsBindAddress, newPath.Child("MetricsBindAddress"))...)
if config.HealthzBindPort > 0 {
allErrs = append(allErrs, validatePort(config.HealthzBindPort, newPath.Child("HealthzBindPort"))...)
}
allErrs = append(allErrs, validateDualStackCIDRStrings(config.MetricsBindAddresses, newPath.Child("MetricsBindAddresses"))...)
allErrs = append(allErrs, validatePort(config.MetricsBindPort, newPath.Child("MetricsBindPort"))...)
allErrs = append(allErrs, validateKubeProxyNodePortAddress(config.NodePortAddresses, newPath.Child("NodePortAddresses"))...)
allErrs = append(allErrs, validateShowHiddenMetricsVersion(config.ShowHiddenMetricsForVersion, newPath.Child("ShowHiddenMetricsForVersion"))...)
@ -347,3 +351,11 @@ func validateDetectLocalConfiguration(mode kubeproxyconfig.LocalMode, config kub
}
return allErrs
}
func validatePort(port int32, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if port < 1 || port > 65535 {
allErrs = append(allErrs, field.Invalid(fldPath, port, "must be a valid port"))
}
return allErrs
}

View File

@ -33,10 +33,12 @@ import (
func TestValidateKubeProxyConfiguration(t *testing.T) {
baseConfig := &kubeproxyconfig.KubeProxyConfiguration{
BindAddress: "192.168.59.103",
HealthzBindAddress: "0.0.0.0:10256",
MetricsBindAddress: "127.0.0.1:10249",
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
BindAddress: "192.168.59.103",
HealthzBindAddresses: []string{"0.0.0.0/0"},
HealthzBindPort: 10256,
MetricsBindAddresses: []string{"127.0.0.0/8"},
MetricsBindPort: 10249,
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
DetectLocal: kubeproxyconfig.DetectLocalConfiguration{
ClusterCIDRs: []string{"192.168.59.0/24"},
},
@ -77,20 +79,20 @@ func TestValidateKubeProxyConfiguration(t *testing.T) {
},
"empty HealthzBindAddress": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.HealthzBindAddress = ""
config.HealthzBindAddresses = []string{}
},
},
"IPv6": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.BindAddress = "fd00:192:168:59::103"
config.HealthzBindAddress = ""
config.MetricsBindAddress = "[::1]:10249"
config.HealthzBindAddresses = []string{}
config.MetricsBindAddresses = []string{"::1/128"}
config.DetectLocal.ClusterCIDRs = []string{"fd00:192:168:59::/64"}
},
},
"alternate healthz port": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.HealthzBindAddress = "0.0.0.0:12345"
config.HealthzBindPort = 12345
},
},
"ClusterCIDR is wrong IP family": {
@ -125,17 +127,29 @@ func TestValidateKubeProxyConfiguration(t *testing.T) {
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("BindAddress"), "10.10.12.11:2000", "not a valid textual representation of an IP address")},
},
"invalid HealthzBindAddress": {
"invalid HealthzBindAddresses": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.HealthzBindAddress = "0.0.0.0"
config.HealthzBindAddresses = []string{"0.0.0.0"}
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("HealthzBindAddress"), "0.0.0.0", "must be IP:port")},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("HealthzBindAddresses").Index(0), "0.0.0.0", "must be a valid CIDR block (e.g. 10.100.0.0/16 or fde4:8dba:82e1::/48)")},
},
"invalid MetricsBindAddress": {
"invalid HealthzBindPort": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.MetricsBindAddress = "127.0.0.1"
config.HealthzBindPort = 1234567
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("MetricsBindAddress"), "127.0.0.1", "must be IP:port")},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("HealthzBindPort"), int32(1234567), "must be a valid port")},
},
"invalid MetricsBindAddresses": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.MetricsBindAddresses = []string{"127.0.0.1"}
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("MetricsBindAddresses").Index(0), "127.0.0.1", "must be a valid CIDR block (e.g. 10.100.0.0/16 or fde4:8dba:82e1::/48)")},
},
"invalid MetricsBindPort": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {
config.MetricsBindPort = 5432100
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("MetricsBindPort"), int32(5432100), "must be a valid port")},
},
"ConfigSyncPeriod must be > 0": {
mutateConfigFunc: func(config *kubeproxyconfig.KubeProxyConfiguration) {

View File

@ -62,6 +62,16 @@ func (in *KubeProxyConfiguration) DeepCopyInto(out *KubeProxyConfiguration) {
}
out.ClientConnection = in.ClientConnection
in.Logging.DeepCopyInto(&out.Logging)
if in.HealthzBindAddresses != nil {
in, out := &in.HealthzBindAddresses, &out.HealthzBindAddresses
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.MetricsBindAddresses != nil {
in, out := &in.MetricsBindAddresses, &out.MetricsBindAddresses
*out = make([]string, len(*in))
copy(*out, *in)
}
in.IPTables.DeepCopyInto(&out.IPTables)
in.IPVS.DeepCopyInto(&out.IPVS)
out.Winkernel = in.Winkernel

View File

@ -17,22 +17,25 @@ limitations under the License.
package healthcheck
import (
"context"
"net"
"net/http"
netutils "k8s.io/utils/net"
)
// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
type listener interface {
// Listen is very much like net.Listen, except the first arg (network) is
// Listen is very much like netutils.MultiListen, except the second arg (network) is
// fixed to be "tcp".
Listen(addr string) (net.Listener, error)
Listen(ctx context.Context, addrs ...string) (net.Listener, error)
}
// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
type httpServerFactory interface {
// New creates an instance of a type satisfying HTTPServer. This is
// designed to include http.Server.
New(addr string, handler http.Handler) httpServer
New(handler http.Handler) httpServer
}
// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
@ -45,8 +48,8 @@ type httpServer interface {
// Implement listener in terms of net.Listen.
type stdNetListener struct{}
func (stdNetListener) Listen(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
func (stdNetListener) Listen(ctx context.Context, addrs ...string) (net.Listener, error) {
return netutils.MultiListen(ctx, "tcp", addrs...)
}
var _ listener = stdNetListener{}
@ -54,9 +57,8 @@ var _ listener = stdNetListener{}
// Implement httpServerFactory in terms of http.Server.
type stdHTTPServerFactory struct{}
func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
func (stdHTTPServerFactory) New(handler http.Handler) httpServer {
return &http.Server{
Addr: addr,
Handler: handler,
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package healthcheck
import (
"context"
"encoding/json"
"net"
"net/http"
@ -26,18 +27,19 @@ import (
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/metrics/testutil"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/apimachinery/pkg/util/sets"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
testingclock "k8s.io/utils/clock/testing"
netutils "k8s.io/utils/net"
)
type fakeListener struct {
@ -54,17 +56,17 @@ func (fake *fakeListener) hasPort(addr string) bool {
return fake.openPorts.Has(addr)
}
func (fake *fakeListener) Listen(addr string) (net.Listener, error) {
fake.openPorts.Insert(addr)
func (fake *fakeListener) Listen(_ context.Context, addrs ...string) (net.Listener, error) {
fake.openPorts.Insert(addrs...)
return &fakeNetListener{
parent: fake,
addr: addr,
addrs: addrs,
}, nil
}
type fakeNetListener struct {
parent *fakeListener
addr string
addrs []string
}
type fakeAddr struct {
@ -82,7 +84,7 @@ func (fake *fakeNetListener) Accept() (net.Conn, error) {
}
func (fake *fakeNetListener) Close() error {
fake.parent.openPorts.Delete(fake.addr)
fake.parent.openPorts.Delete(fake.addrs...)
return nil
}
@ -97,15 +99,13 @@ func newFakeHTTPServerFactory() *fakeHTTPServerFactory {
return &fakeHTTPServerFactory{}
}
func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
func (fake *fakeHTTPServerFactory) New(handler http.Handler) httpServer {
return &fakeHTTPServer{
addr: addr,
handler: handler,
}
}
type fakeHTTPServer struct {
addr string
handler http.Handler
}
@ -470,8 +470,12 @@ func TestHealthzServer(t *testing.T) {
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
fakeInterfacer := proxyutiltest.NewFakeNetwork()
itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("127.0.0.1"), Mask: net.CIDRMask(24, 32)}}
fakeInterfacer.AddInterfaceAddr(&itf, addrs)
hs := newProxierHealthServer(listener, httpFactory, fakeClock, fakeInterfacer, []string{"127.0.0.0/8"}, 10256, 10*time.Second)
server := hs.httpFactory.New(healthzHandler{hs: hs})
hsTest := &serverTest{
server: server,
@ -505,8 +509,12 @@ func TestLivezServer(t *testing.T) {
httpFactory := newFakeHTTPServerFactory()
fakeClock := testingclock.NewFakeClock(time.Now())
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
fakeInterfacer := proxyutiltest.NewFakeNetwork()
itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
addrs := []net.Addr{&net.IPNet{IP: netutils.ParseIPSloppy("127.0.0.1"), Mask: net.CIDRMask(24, 32)}}
fakeInterfacer.AddInterfaceAddr(&itf, addrs)
hs := newProxierHealthServer(listener, httpFactory, fakeClock, fakeInterfacer, []string{"127.0.0.0/8"}, 10256, 10*time.Second)
server := hs.httpFactory.New(livezHandler{hs: hs})
hsTest := &serverTest{
server: server,

View File

@ -17,14 +17,18 @@ limitations under the License.
package healthcheck
import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/utils/clock"
)
@ -46,7 +50,7 @@ type ProxierHealthServer struct {
httpFactory httpServerFactory
clock clock.Clock
addr string
addrs []string
healthTimeout time.Duration
lock sync.RWMutex
@ -56,16 +60,28 @@ type ProxierHealthServer struct {
}
// NewProxierHealthServer returns a proxier health http server.
func NewProxierHealthServer(addr string, healthTimeout time.Duration) *ProxierHealthServer {
return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
func NewProxierHealthServer(cidrStrings []string, port int32, healthTimeout time.Duration) *ProxierHealthServer {
return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, proxyutil.RealNetwork{}, cidrStrings, port, healthTimeout)
}
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *ProxierHealthServer {
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, nw proxyutil.NetworkInterfacer, cidrStrings []string, port int32, healthTimeout time.Duration) *ProxierHealthServer {
nodeIPs, err := proxyutil.FilterInterfaceAddrsByCIDRStrings(nw, cidrStrings)
if err != nil {
klog.V(3).ErrorS(err, "Failed to get node IPs for healthz server")
return nil
}
var addrs []string
for _, nodeIP := range nodeIPs {
if nodeIP.IsLinkLocalUnicast() || nodeIP.IsLinkLocalMulticast() {
continue
}
addrs = append(addrs, net.JoinHostPort(nodeIP.String(), strconv.Itoa(int(port))))
}
return &ProxierHealthServer{
listener: listener,
httpFactory: httpServerFactory,
clock: c,
addr: addr,
addrs: addrs,
healthTimeout: healthTimeout,
lastUpdatedMap: make(map[v1.IPFamily]time.Time),
@ -162,18 +178,18 @@ func (hs *ProxierHealthServer) NodeEligible() bool {
}
// Run starts the healthz HTTP server and blocks until it exits.
func (hs *ProxierHealthServer) Run() error {
func (hs *ProxierHealthServer) Run(ctx context.Context) error {
serveMux := http.NewServeMux()
serveMux.Handle("/healthz", healthzHandler{hs: hs})
serveMux.Handle("/livez", livezHandler{hs: hs})
server := hs.httpFactory.New(hs.addr, serveMux)
server := hs.httpFactory.New(serveMux)
listener, err := hs.listener.Listen(hs.addr)
listener, err := hs.listener.Listen(ctx, hs.addrs...)
if err != nil {
return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
return fmt.Errorf("failed to start proxier healthz on %s: %w", hs.addrs, err)
}
klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addr)
klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addrs)
if err := server.Serve(listener); err != nil {
return fmt.Errorf("proxier healthz closed with error: %v", err)

View File

@ -17,6 +17,7 @@ limitations under the License.
package healthcheck
import (
"context"
"fmt"
"net"
"net/http"
@ -170,9 +171,9 @@ func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
for _, ip := range hcs.nodeIPs {
addr := net.JoinHostPort(ip.String(), fmt.Sprint(hcI.port))
// create http server
httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
httpSrv := hcs.httpFactory.New(hcHandler{name: hcI.nsn, hcs: hcs})
// start listener
listener, err = hcs.listener.Listen(addr)
listener, err = hcs.listener.Listen(context.TODO(), addr)
if err != nil {
// must close whatever have been previously opened
// to allow a retry/or port ownership change as needed

View File

@ -94,38 +94,7 @@ func (npa *NodePortAddresses) MatchAll() bool {
// IPs are found, it returns an empty list.
// NetworkInterfacer is injected for test purpose.
func (npa *NodePortAddresses) GetNodeIPs(nw NetworkInterfacer) ([]net.IP, error) {
addrs, err := nw.InterfaceAddrs()
if err != nil {
return nil, fmt.Errorf("error listing all interfaceAddrs from host, error: %v", err)
}
// Use a map to dedup matches
addresses := make(map[string]net.IP)
for _, cidr := range npa.cidrs {
for _, addr := range addrs {
var ip net.IP
// nw.InterfaceAddrs may return net.IPAddr or net.IPNet on windows, and it will return net.IPNet on linux.
switch v := addr.(type) {
case *net.IPAddr:
ip = v.IP
case *net.IPNet:
ip = v.IP
default:
continue
}
if cidr.Contains(ip) {
addresses[ip.String()] = ip
}
}
}
ips := make([]net.IP, 0, len(addresses))
for _, ip := range addresses {
ips = append(ips, ip)
}
return ips, nil
return FilterInterfaceAddrsByCIDRs(nw, npa.cidrs)
}
// ContainsIPv4Loopback returns true if npa's CIDRs contain an IPv4 loopback address.

View File

@ -241,3 +241,49 @@ func IsVIPMode(ing v1.LoadBalancerIngress) bool {
}
return *ing.IPMode == v1.LoadBalancerIPModeVIP
}
// FilterInterfaceAddrsByCIDRs filters the IP addresses of the provided NetworkInterfacer,
// returning only those that belong to any of the CIDRs specified in the given list.
func FilterInterfaceAddrsByCIDRs(nw NetworkInterfacer, cidrs []*net.IPNet) ([]net.IP, error) {
addrs, err := nw.InterfaceAddrs()
if err != nil {
return nil, fmt.Errorf("error listing all interfaceAddrs from host, error: %w", err)
}
// Use a map to dedup matches
addresses := make(map[string]net.IP)
for _, cidr := range cidrs {
for _, addr := range addrs {
var ip net.IP
// nw.InterfaceAddrs may return net.IPAddr or net.IPNet on windows, and it will return net.IPNet on linux.
switch v := addr.(type) {
case *net.IPAddr:
ip = v.IP
case *net.IPNet:
ip = v.IP
default:
continue
}
if cidr.Contains(ip) {
addresses[ip.String()] = ip
}
}
}
ips := make([]net.IP, 0, len(addresses))
for _, ip := range addresses {
ips = append(ips, ip)
}
return ips, nil
}
// FilterInterfaceAddrsByCIDRStrings is a wrapper around FilterInterfaceAddrsByCIDRs which accepts CIDRs as list of strings.
func FilterInterfaceAddrsByCIDRStrings(nw NetworkInterfacer, cidrStrings []string) ([]net.IP, error) {
cidrs, err := netutils.ParseCIDRs(cidrStrings)
if err != nil {
return nil, err
}
return FilterInterfaceAddrsByCIDRs(nw, cidrs)
}

View File

@ -21,9 +21,12 @@ import (
"reflect"
"testing"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
netutils "k8s.io/utils/net"
)
@ -703,3 +706,58 @@ func TestIsZeroCIDR(t *testing.T) {
})
}
}
func TestFilterInterfaceAddrsByCIDRs(t *testing.T) {
networkInterfacer := proxyutiltest.NewFakeNetwork()
var itf net.Interface
var addrs []net.Addr
itf = net.Interface{Index: 0, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
addrs = []net.Addr{
&net.IPNet{IP: netutils.ParseIPSloppy("10.10.10.10"), Mask: net.CIDRMask(24, 32)},
&net.IPNet{IP: netutils.ParseIPSloppy("2001:db8::1"), Mask: net.CIDRMask(64, 128)},
}
networkInterfacer.AddInterfaceAddr(&itf, addrs)
itf = net.Interface{Index: 0, MTU: 0, Name: "eth2", HardwareAddr: nil, Flags: 0}
addrs = []net.Addr{
&net.IPNet{IP: netutils.ParseIPSloppy("192.168.0.2"), Mask: net.CIDRMask(24, 32)},
&net.IPNet{IP: netutils.ParseIPSloppy("fd00:4321::2"), Mask: net.CIDRMask(64, 128)},
}
networkInterfacer.AddInterfaceAddr(&itf, addrs)
testCases := []struct {
name string
cidrStrings []string
expected []string
}{
{
name: "ipv4",
cidrStrings: []string{"192.168.0.0/24"},
expected: []string{"192.168.0.2"},
},
{
name: "ipv6",
cidrStrings: []string{"fd00:4321::/64"},
expected: []string{"fd00:4321::2"},
},
{
name: "dual stack",
cidrStrings: []string{"10.10.0.0/16", "2001:db8::/64"},
expected: []string{"10.10.10.10", "2001:db8::1"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cidrs, err := netutils.ParseCIDRs(tc.cidrStrings)
require.NoError(t, err)
ips, err := FilterInterfaceAddrsByCIDRs(networkInterfacer, cidrs)
require.NoError(t, err)
var expected []net.IP
for i := range tc.expected {
expected = append(expected, netutils.ParseIPSloppy(tc.expected[i]))
}
require.Equal(t, expected, ips)
})
}
}

View File

@ -677,7 +677,7 @@ func NewProxier(
nodeIP net.IP,
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
healthzBindAddress string,
healthzPort int,
config config.KubeProxyWinkernelConfiguration,
) (*Proxier, error) {
if nodeIP == nil {
@ -689,12 +689,6 @@ func NewProxier(
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
var healthzPort int
if len(healthzBindAddress) > 0 {
_, port, _ := net.SplitHostPort(healthzBindAddress)
healthzPort, _ = strconv.Atoi(port)
}
hcnImpl := newHcnImpl()
hns, supportedFeatures := newHostNetworkService(hcnImpl)
hnsNetworkName, err := getNetworkName(config.NetworkName)
@ -814,14 +808,14 @@ func NewDualStackProxier(
nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
healthzBindAddress string,
healthzPort int,
config config.KubeProxyWinkernelConfiguration,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer,
healthzBindAddress, config)
healthzPort, config)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv4Protocol])
@ -829,7 +823,7 @@ func NewDualStackProxier(
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer,
healthzBindAddress, config)
healthzPort, config)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv6Protocol])
}