mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #126010 from aroradaman/proxy-options-refactor
Kube-Proxy options refactor
This commit is contained in:
commit
d31ba91240
464
cmd/kube-proxy/app/options.go
Normal file
464
cmd/kube-proxy/app/options.go
Normal file
@ -0,0 +1,464 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kube-proxy/config/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/cluster/ports"
|
||||
"k8s.io/kubernetes/pkg/kubelet/qos"
|
||||
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
|
||||
kubeproxyconfigv1alpha1 "k8s.io/kubernetes/pkg/proxy/apis/config/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/proxy/apis/config/validation"
|
||||
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
|
||||
"k8s.io/kubernetes/pkg/util/filesystem"
|
||||
utilflag "k8s.io/kubernetes/pkg/util/flag"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
// Options contains everything necessary to create and run a proxy server.
|
||||
type Options struct {
|
||||
// ConfigFile is the location of the proxy server's configuration file.
|
||||
ConfigFile string
|
||||
// WriteConfigTo is the path where the default configuration will be written.
|
||||
WriteConfigTo string
|
||||
// CleanupAndExit, when true, makes the proxy server clean up iptables and ipvs rules, then exit.
|
||||
CleanupAndExit bool
|
||||
// InitAndExit, when true, makes the proxy server makes configurations that need privileged access, then exit.
|
||||
InitAndExit bool
|
||||
// WindowsService should be set to true if kube-proxy is running as a service on Windows.
|
||||
// Its corresponding flag only gets registered in Windows builds
|
||||
WindowsService bool
|
||||
// config is the proxy server's configuration object.
|
||||
config *kubeproxyconfig.KubeProxyConfiguration
|
||||
// watcher is used to watch on the update change of ConfigFile
|
||||
watcher filesystem.FSWatcher
|
||||
// proxyServer is the interface to run the proxy server
|
||||
proxyServer proxyRun
|
||||
// errCh is the channel that errors will be sent
|
||||
errCh chan error
|
||||
|
||||
// The fields below here are placeholders for flags that can't be directly mapped into
|
||||
// config.KubeProxyConfiguration.
|
||||
//
|
||||
// TODO remove these fields once the deprecated flags are removed.
|
||||
|
||||
// master is used to override the kubeconfig's URL to the apiserver.
|
||||
master string
|
||||
// healthzPort is the port to be used by the healthz server.
|
||||
healthzPort int32
|
||||
// metricsPort is the port to be used by the metrics server.
|
||||
metricsPort int32
|
||||
|
||||
// hostnameOverride, if set from the command line flag, takes precedence over the `HostnameOverride` value from the config file
|
||||
hostnameOverride string
|
||||
|
||||
logger klog.Logger
|
||||
}
|
||||
|
||||
// AddFlags adds flags to fs and binds them to options.
|
||||
func (o *Options) AddFlags(fs *pflag.FlagSet) {
|
||||
o.addOSFlags(fs)
|
||||
|
||||
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
|
||||
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the default configuration values to this file and exit.")
|
||||
|
||||
fs.BoolVar(&o.CleanupAndExit, "cleanup", o.CleanupAndExit, "If true cleanup iptables and ipvs rules and exit.")
|
||||
|
||||
fs.Var(cliflag.NewMapStringBool(&o.config.FeatureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
|
||||
"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n")+"\n"+
|
||||
"This parameter is ignored if a config file is specified by --config.")
|
||||
|
||||
fs.StringVar(&o.config.ClientConnection.Kubeconfig, "kubeconfig", o.config.ClientConnection.Kubeconfig, "Path to kubeconfig file with authorization information (the master location can be overridden by the master flag).")
|
||||
fs.StringVar(&o.master, "master", o.master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
||||
fs.StringVar(&o.config.ClientConnection.ContentType, "kube-api-content-type", o.config.ClientConnection.ContentType, "Content type of requests sent to apiserver.")
|
||||
fs.Int32Var(&o.config.ClientConnection.Burst, "kube-api-burst", o.config.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver")
|
||||
fs.Float32Var(&o.config.ClientConnection.QPS, "kube-api-qps", o.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver")
|
||||
|
||||
fs.StringVar(&o.hostnameOverride, "hostname-override", o.hostnameOverride, "If non-empty, will be used as the name of the Node that kube-proxy is running on. If unset, the node name is assumed to be the same as the node's hostname.")
|
||||
fs.Var(&utilflag.IPVar{Val: &o.config.BindAddress}, "bind-address", "Overrides kube-proxy's idea of what its node's primary IP is. Note that the name is a historical artifact, and kube-proxy does not actually bind any sockets to this IP. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.Var(&utilflag.IPPortVar{Val: &o.config.HealthzBindAddress}, "healthz-bind-address", "The IP address and port for the health check server to serve on, defaulting to \"0.0.0.0:10256\". This parameter is ignored if a config file is specified by --config.")
|
||||
fs.Var(&utilflag.IPPortVar{Val: &o.config.MetricsBindAddress}, "metrics-bind-address", "The IP address and port for the metrics server to serve on, defaulting to \"127.0.0.1:10249\". (Set to \"0.0.0.0:10249\" / \"[::]:10249\" to bind on all interfaces.) Set empty to disable. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.BoolVar(&o.config.BindAddressHardFail, "bind-address-hard-fail", o.config.BindAddressHardFail, "If true kube-proxy will treat failure to bind to a port as fatal and exit")
|
||||
fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "If true enables profiling via web interface on /debug/pprof handler. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.StringVar(&o.config.ShowHiddenMetricsForVersion, "show-hidden-metrics-for-version", o.config.ShowHiddenMetricsForVersion,
|
||||
"The previous version for which you want to show hidden metrics. "+
|
||||
"Only the previous minor version is meaningful, other values will not be allowed. "+
|
||||
"The format is <major>.<minor>, e.g.: '1.16'. "+
|
||||
"The purpose of this format is make sure you have the opportunity to notice if the next release hides additional metrics, "+
|
||||
"rather than being surprised when they are permanently removed in the release after that. "+
|
||||
"This parameter is ignored if a config file is specified by --config.")
|
||||
fs.BoolVar(&o.InitAndExit, "init-only", o.InitAndExit, "If true, perform any initialization steps that must be done with full root privileges, and then exit. After doing this, you can run kube-proxy again with only the CAP_NET_ADMIN capability.")
|
||||
fs.Var(&o.config.Mode, "proxy-mode", "Which proxy mode to use: on Linux this can be 'iptables' (default) or 'ipvs'. On Windows the only supported value is 'kernelspace'."+
|
||||
"This parameter is ignored if a config file is specified by --config.")
|
||||
|
||||
fs.Int32Var(o.config.IPTables.MasqueradeBit, "iptables-masquerade-bit", ptr.Deref(o.config.IPTables.MasqueradeBit, 14), "If using the iptables or ipvs proxy mode, the bit of the fwmark space to mark packets requiring SNAT with. Must be within the range [0, 31].")
|
||||
fs.BoolVar(&o.config.IPTables.MasqueradeAll, "masquerade-all", o.config.IPTables.MasqueradeAll, "If using the iptables or ipvs proxy mode, SNAT all traffic sent via Service cluster IPs. This may be required with some CNI plugins.")
|
||||
fs.BoolVar(o.config.IPTables.LocalhostNodePorts, "iptables-localhost-nodeports", ptr.Deref(o.config.IPTables.LocalhostNodePorts, true), "If false, kube-proxy will disable the legacy behavior of allowing NodePort services to be accessed via localhost. (Applies only to iptables mode and IPv4; localhost NodePorts are never allowed with other proxy modes or with IPv6.)")
|
||||
fs.DurationVar(&o.config.IPTables.SyncPeriod.Duration, "iptables-sync-period", o.config.IPTables.SyncPeriod.Duration, "An interval (e.g. '5s', '1m', '2h22m') indicating how frequently various re-synchronizing and cleanup operations are performed. Must be greater than 0.")
|
||||
fs.DurationVar(&o.config.IPTables.MinSyncPeriod.Duration, "iptables-min-sync-period", o.config.IPTables.MinSyncPeriod.Duration, "The minimum period between iptables rule resyncs (e.g. '5s', '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will result in an immediate iptables resync.")
|
||||
|
||||
fs.DurationVar(&o.config.IPVS.SyncPeriod.Duration, "ipvs-sync-period", o.config.IPVS.SyncPeriod.Duration, "An interval (e.g. '5s', '1m', '2h22m') indicating how frequently various re-synchronizing and cleanup operations are performed. Must be greater than 0.")
|
||||
fs.DurationVar(&o.config.IPVS.MinSyncPeriod.Duration, "ipvs-min-sync-period", o.config.IPVS.MinSyncPeriod.Duration, "The minimum period between IPVS rule resyncs (e.g. '5s', '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will result in an immediate IPVS resync.")
|
||||
fs.StringVar(&o.config.IPVS.Scheduler, "ipvs-scheduler", o.config.IPVS.Scheduler, "The ipvs scheduler type when proxy mode is ipvs")
|
||||
fs.StringSliceVar(&o.config.IPVS.ExcludeCIDRs, "ipvs-exclude-cidrs", o.config.IPVS.ExcludeCIDRs, "A comma-separated list of CIDRs which the ipvs proxier should not touch when cleaning up IPVS rules.")
|
||||
fs.BoolVar(&o.config.IPVS.StrictARP, "ipvs-strict-arp", o.config.IPVS.StrictARP, "Enable strict ARP by setting arp_ignore to 1 and arp_announce to 2")
|
||||
fs.DurationVar(&o.config.IPVS.TCPTimeout.Duration, "ipvs-tcp-timeout", o.config.IPVS.TCPTimeout.Duration, "The timeout for idle IPVS TCP connections, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
|
||||
fs.DurationVar(&o.config.IPVS.TCPFinTimeout.Duration, "ipvs-tcpfin-timeout", o.config.IPVS.TCPFinTimeout.Duration, "The timeout for IPVS TCP connections after receiving a FIN packet, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
|
||||
fs.DurationVar(&o.config.IPVS.UDPTimeout.Duration, "ipvs-udp-timeout", o.config.IPVS.UDPTimeout.Duration, "The timeout for IPVS UDP packets, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
|
||||
|
||||
fs.Var(&o.config.DetectLocalMode, "detect-local-mode", "Mode to use to detect local traffic. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.StringVar(&o.config.DetectLocal.BridgeInterface, "pod-bridge-interface", o.config.DetectLocal.BridgeInterface, "A bridge interface name. When --detect-local-mode is set to BridgeInterface, kube-proxy will consider traffic to be local if it originates from this bridge.")
|
||||
fs.StringVar(&o.config.DetectLocal.InterfaceNamePrefix, "pod-interface-name-prefix", o.config.DetectLocal.InterfaceNamePrefix, "An interface name prefix. When --detect-local-mode is set to InterfaceNamePrefix, kube-proxy will consider traffic to be local if it originates from any interface whose name begins with this prefix.")
|
||||
fs.StringVar(&o.config.ClusterCIDR, "cluster-cidr", o.config.ClusterCIDR, "The CIDR range of the pods in the cluster. (For dual-stack clusters, this can be a comma-separated dual-stack pair of CIDR ranges.). When --detect-local-mode is set to ClusterCIDR, kube-proxy will consider traffic to be local if its source IP is in this range. (Otherwise it is not used.) "+
|
||||
"This parameter is ignored if a config file is specified by --config.")
|
||||
|
||||
fs.StringSliceVar(&o.config.NodePortAddresses, "nodeport-addresses", o.config.NodePortAddresses,
|
||||
"A list of CIDR ranges that contain valid node IPs, or alternatively, the single string 'primary'. If set to a list of CIDRs, connections to NodePort services will only be accepted on node IPs in one of the indicated ranges. If set to 'primary', NodePort services will only be accepted on the node's primary IP(s) according to the Node object. If unset, NodePort connections will be accepted on all local IPs. This parameter is ignored if a config file is specified by --config.")
|
||||
|
||||
fs.Int32Var(o.config.OOMScoreAdj, "oom-score-adj", ptr.Deref(o.config.OOMScoreAdj, int32(qos.KubeProxyOOMScoreAdj)), "The oom-score-adj value for kube-proxy process. Values must be within the range [-1000, 1000]. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.Int32Var(o.config.Conntrack.MaxPerCore, "conntrack-max-per-core", *o.config.Conntrack.MaxPerCore,
|
||||
"Maximum number of NAT connections to track per CPU core (0 to leave the limit as-is and ignore conntrack-min).")
|
||||
fs.Int32Var(o.config.Conntrack.Min, "conntrack-min", *o.config.Conntrack.Min,
|
||||
"Minimum number of conntrack entries to allocate, regardless of conntrack-max-per-core (set conntrack-max-per-core=0 to leave the limit as-is).")
|
||||
|
||||
fs.DurationVar(&o.config.Conntrack.TCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", o.config.Conntrack.TCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
|
||||
fs.DurationVar(
|
||||
&o.config.Conntrack.TCPCloseWaitTimeout.Duration, "conntrack-tcp-timeout-close-wait",
|
||||
o.config.Conntrack.TCPCloseWaitTimeout.Duration,
|
||||
"NAT timeout for TCP connections in the CLOSE_WAIT state")
|
||||
fs.BoolVar(&o.config.Conntrack.TCPBeLiberal, "conntrack-tcp-be-liberal", o.config.Conntrack.TCPBeLiberal, "Enable liberal mode for tracking TCP packets by setting nf_conntrack_tcp_be_liberal to 1")
|
||||
fs.DurationVar(&o.config.Conntrack.UDPTimeout.Duration, "conntrack-udp-timeout", o.config.Conntrack.UDPTimeout.Duration, "Idle timeout for UNREPLIED UDP connections (0 to leave as-is)")
|
||||
fs.DurationVar(&o.config.Conntrack.UDPStreamTimeout.Duration, "conntrack-udp-timeout-stream", o.config.Conntrack.UDPStreamTimeout.Duration, "Idle timeout for ASSURED UDP connections (0 to leave as-is)")
|
||||
|
||||
fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
|
||||
|
||||
fs.Int32Var(&o.healthzPort, "healthz-port", o.healthzPort, "The port to bind the health check server. Use 0 to disable.")
|
||||
_ = fs.MarkDeprecated("healthz-port", "This flag is deprecated and will be removed in a future release. Please use --healthz-bind-address instead.")
|
||||
fs.Int32Var(&o.metricsPort, "metrics-port", o.metricsPort, "The port to bind the metrics server. Use 0 to disable.")
|
||||
_ = fs.MarkDeprecated("metrics-port", "This flag is deprecated and will be removed in a future release. Please use --metrics-bind-address instead.")
|
||||
fs.Var(utilflag.PortRangeVar{Val: &o.config.PortRange}, "proxy-port-range", "This was previously used to configure the userspace proxy, but is now unused.")
|
||||
_ = fs.MarkDeprecated("proxy-port-range", "This flag has no effect and will be removed in a future release.")
|
||||
|
||||
logsapi.AddFlags(&o.config.Logging, fs)
|
||||
}
|
||||
|
||||
// newKubeProxyConfiguration returns a KubeProxyConfiguration with default values
|
||||
func newKubeProxyConfiguration() *kubeproxyconfig.KubeProxyConfiguration {
|
||||
versionedConfig := &v1alpha1.KubeProxyConfiguration{}
|
||||
proxyconfigscheme.Scheme.Default(versionedConfig)
|
||||
internalConfig, err := proxyconfigscheme.Scheme.ConvertToVersion(versionedConfig, kubeproxyconfig.SchemeGroupVersion)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to create default config: %v", err))
|
||||
}
|
||||
|
||||
return internalConfig.(*kubeproxyconfig.KubeProxyConfiguration)
|
||||
}
|
||||
|
||||
// NewOptions returns initialized Options
|
||||
func NewOptions() *Options {
|
||||
return &Options{
|
||||
config: newKubeProxyConfiguration(),
|
||||
healthzPort: ports.ProxyHealthzPort,
|
||||
metricsPort: ports.ProxyStatusPort,
|
||||
errCh: make(chan error),
|
||||
logger: klog.FromContext(context.Background()),
|
||||
}
|
||||
}
|
||||
|
||||
// Complete completes all the required options.
|
||||
func (o *Options) Complete(fs *pflag.FlagSet) error {
|
||||
if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
|
||||
o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)
|
||||
o.config.MetricsBindAddress = addressFromDeprecatedFlags(o.config.MetricsBindAddress, o.metricsPort)
|
||||
}
|
||||
|
||||
// Load the config file here in Complete, so that Validate validates the fully-resolved config.
|
||||
if len(o.ConfigFile) > 0 {
|
||||
c, err := o.loadConfigFromFile(o.ConfigFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Before we overwrite the config which holds the parsed
|
||||
// command line parameters, we need to copy all modified
|
||||
// logging settings over to the loaded config (i.e. logging
|
||||
// command line flags have priority). Otherwise `--config
|
||||
// ... -v=5` doesn't work (config resets verbosity even
|
||||
// when it contains no logging settings).
|
||||
_ = copyLogsFromFlags(fs, &c.Logging)
|
||||
o.config = c
|
||||
|
||||
if err := o.initWatcher(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
o.platformApplyDefaults(o.config)
|
||||
|
||||
if err := o.processHostnameOverrideFlag(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return utilfeature.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates)
|
||||
}
|
||||
|
||||
// copyLogsFromFlags applies the logging flags from the given flag set to the given
|
||||
// configuration. Fields for which the corresponding flag was not used are left
|
||||
// unmodified. For fields that have multiple values (like vmodule), the values from
|
||||
// the flags get joined so that the command line flags have priority.
|
||||
//
|
||||
// TODO (pohly): move this to logsapi
|
||||
func copyLogsFromFlags(from *pflag.FlagSet, to *logsapi.LoggingConfiguration) error {
|
||||
var cloneFS pflag.FlagSet
|
||||
logsapi.AddFlags(to, &cloneFS)
|
||||
vmodule := to.VModule
|
||||
to.VModule = nil
|
||||
var err error
|
||||
cloneFS.VisitAll(func(f *pflag.Flag) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
fsFlag := from.Lookup(f.Name)
|
||||
if fsFlag == nil {
|
||||
err = fmt.Errorf("logging flag %s not found in flag set", f.Name)
|
||||
return
|
||||
}
|
||||
if !fsFlag.Changed {
|
||||
return
|
||||
}
|
||||
if setErr := f.Value.Set(fsFlag.Value.String()); setErr != nil {
|
||||
err = fmt.Errorf("copying flag %s value: %w", f.Name, setErr)
|
||||
return
|
||||
}
|
||||
})
|
||||
to.VModule = append(to.VModule, vmodule...)
|
||||
return err
|
||||
}
|
||||
|
||||
// Creates a new filesystem watcher and adds watches for the config file.
|
||||
func (o *Options) initWatcher() error {
|
||||
fswatcher := filesystem.NewFsnotifyWatcher()
|
||||
err := fswatcher.Init(o.eventHandler, o.errorHandler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = fswatcher.AddWatch(o.ConfigFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.watcher = fswatcher
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Options) eventHandler(ent fsnotify.Event) {
|
||||
if ent.Has(fsnotify.Write) || ent.Has(fsnotify.Rename) {
|
||||
// error out when ConfigFile is updated
|
||||
o.errCh <- fmt.Errorf("content of the proxy server's configuration file was updated")
|
||||
return
|
||||
}
|
||||
o.errCh <- nil
|
||||
}
|
||||
|
||||
func (o *Options) errorHandler(err error) {
|
||||
o.errCh <- err
|
||||
}
|
||||
|
||||
// processHostnameOverrideFlag processes hostname-override flag
|
||||
func (o *Options) processHostnameOverrideFlag() error {
|
||||
// Check if hostname-override flag is set and use value since configFile always overrides
|
||||
if len(o.hostnameOverride) > 0 {
|
||||
hostName := strings.TrimSpace(o.hostnameOverride)
|
||||
if len(hostName) == 0 {
|
||||
return fmt.Errorf("empty hostname-override is invalid")
|
||||
}
|
||||
o.config.HostnameOverride = strings.ToLower(hostName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate validates all the required options.
|
||||
func (o *Options) Validate() error {
|
||||
if errs := validation.Validate(o.config); len(errs) != 0 {
|
||||
return errs.ToAggregate()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run runs the specified ProxyServer.
|
||||
func (o *Options) Run(ctx context.Context) error {
|
||||
defer close(o.errCh)
|
||||
if len(o.WriteConfigTo) > 0 {
|
||||
return o.writeConfigFile()
|
||||
}
|
||||
|
||||
err := platformCleanup(ctx, o.config.Mode, o.CleanupAndExit)
|
||||
if o.CleanupAndExit {
|
||||
return err
|
||||
}
|
||||
// We ignore err otherwise; the cleanup is best-effort, and the backends will have
|
||||
// logged messages if they failed in interesting ways.
|
||||
|
||||
proxyServer, err := newProxyServer(ctx, o.config, o.master, o.InitAndExit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if o.InitAndExit {
|
||||
return nil
|
||||
}
|
||||
|
||||
o.proxyServer = proxyServer
|
||||
return o.runLoop(ctx)
|
||||
}
|
||||
|
||||
// runLoop will watch on the update change of the proxy server's configuration file.
|
||||
// Return an error when updated
|
||||
func (o *Options) runLoop(ctx context.Context) error {
|
||||
if o.watcher != nil {
|
||||
o.watcher.Run()
|
||||
}
|
||||
|
||||
// run the proxy in goroutine
|
||||
go func() {
|
||||
err := o.proxyServer.Run(ctx)
|
||||
o.errCh <- err
|
||||
}()
|
||||
|
||||
for {
|
||||
err := <-o.errCh
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Options) writeConfigFile() (err error) {
|
||||
const mediaType = runtime.ContentTypeYAML
|
||||
info, ok := runtime.SerializerInfoForMediaType(proxyconfigscheme.Codecs.SupportedMediaTypes(), mediaType)
|
||||
if !ok {
|
||||
return fmt.Errorf("unable to locate encoder -- %q is not a supported media type", mediaType)
|
||||
}
|
||||
|
||||
encoder := proxyconfigscheme.Codecs.EncoderForVersion(info.Serializer, v1alpha1.SchemeGroupVersion)
|
||||
|
||||
configFile, err := os.Create(o.WriteConfigTo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
ferr := configFile.Close()
|
||||
if ferr != nil && err == nil {
|
||||
err = ferr
|
||||
}
|
||||
}()
|
||||
|
||||
if err = encoder.Encode(o.config, configFile); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
o.logger.Info("Wrote configuration", "file", o.WriteConfigTo)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// addressFromDeprecatedFlags returns server address from flags
|
||||
// passed on the command line based on the following rules:
|
||||
// 1. If port is 0, disable the server (e.g. set address to empty).
|
||||
// 2. Otherwise, set the port portion of the config accordingly.
|
||||
func addressFromDeprecatedFlags(addr string, port int32) string {
|
||||
if port == 0 {
|
||||
return ""
|
||||
}
|
||||
return proxyutil.AppendPortIfNeeded(addr, port)
|
||||
}
|
||||
|
||||
// newLenientSchemeAndCodecs returns a scheme that has only v1alpha1 registered into
|
||||
// it and a CodecFactory with strict decoding disabled.
|
||||
func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
|
||||
lenientScheme := runtime.NewScheme()
|
||||
if err := kubeproxyconfig.AddToScheme(lenientScheme); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to add kube-proxy config API to lenient scheme: %w", err)
|
||||
}
|
||||
if err := kubeproxyconfigv1alpha1.AddToScheme(lenientScheme); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to add kube-proxy config v1alpha1 API to lenient scheme: %w", err)
|
||||
}
|
||||
lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict)
|
||||
return lenientScheme, &lenientCodecs, nil
|
||||
}
|
||||
|
||||
// loadConfigFromFile loads the contents of file and decodes it as a
|
||||
// KubeProxyConfiguration object.
|
||||
func (o *Options) loadConfigFromFile(file string) (*kubeproxyconfig.KubeProxyConfiguration, error) {
|
||||
data, err := os.ReadFile(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return o.loadConfig(data)
|
||||
}
|
||||
|
||||
// loadConfig decodes a serialized KubeProxyConfiguration to the internal type.
|
||||
func (o *Options) loadConfig(data []byte) (*kubeproxyconfig.KubeProxyConfiguration, error) {
|
||||
|
||||
configObj, gvk, err := proxyconfigscheme.Codecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
// Try strict decoding first. If that fails decode with a lenient
|
||||
// decoder, which has only v1alpha1 registered, and log a warning.
|
||||
// The lenient path is to be dropped when support for v1alpha1 is dropped.
|
||||
if !runtime.IsStrictDecodingError(err) {
|
||||
return nil, fmt.Errorf("failed to decode: %w", err)
|
||||
}
|
||||
|
||||
_, lenientCodecs, lenientErr := newLenientSchemeAndCodecs()
|
||||
if lenientErr != nil {
|
||||
return nil, lenientErr
|
||||
}
|
||||
|
||||
configObj, gvk, lenientErr = lenientCodecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if lenientErr != nil {
|
||||
// Lenient decoding failed with the current version, return the
|
||||
// original strict error.
|
||||
return nil, fmt.Errorf("failed lenient decoding: %w", err)
|
||||
}
|
||||
|
||||
// Continue with the v1alpha1 object that was decoded leniently, but emit a warning.
|
||||
o.logger.Info("Using lenient decoding as strict decoding failed", "err", err)
|
||||
}
|
||||
|
||||
proxyConfig, ok := configObj.(*kubeproxyconfig.KubeProxyConfiguration)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("got unexpected config type: %v", gvk)
|
||||
}
|
||||
return proxyConfig, nil
|
||||
}
|
571
cmd/kube-proxy/app/options_test.go
Normal file
571
cmd/kube-proxy/app/options_test.go
Normal file
@ -0,0 +1,571 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
componentbaseconfig "k8s.io/component-base/config"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
// TestLoadConfig tests proper operation of loadConfig()
|
||||
func TestLoadConfig(t *testing.T) {
|
||||
|
||||
yamlTemplate := `apiVersion: kubeproxy.config.k8s.io/v1alpha1
|
||||
bindAddress: %s
|
||||
clientConnection:
|
||||
acceptContentTypes: "abc"
|
||||
burst: 100
|
||||
contentType: content-type
|
||||
kubeconfig: "/path/to/kubeconfig"
|
||||
qps: 7
|
||||
clusterCIDR: "%s"
|
||||
configSyncPeriod: 15s
|
||||
conntrack:
|
||||
maxPerCore: 2
|
||||
min: 1
|
||||
tcpCloseWaitTimeout: 10s
|
||||
tcpEstablishedTimeout: 20s
|
||||
healthzBindAddress: "%s"
|
||||
hostnameOverride: "foo"
|
||||
iptables:
|
||||
masqueradeAll: true
|
||||
masqueradeBit: 17
|
||||
minSyncPeriod: 10s
|
||||
syncPeriod: 60s
|
||||
localhostNodePorts: true
|
||||
ipvs:
|
||||
minSyncPeriod: 10s
|
||||
syncPeriod: 60s
|
||||
excludeCIDRs:
|
||||
- "10.20.30.40/16"
|
||||
- "fd00:1::0/64"
|
||||
nftables:
|
||||
masqueradeAll: true
|
||||
masqueradeBit: 18
|
||||
minSyncPeriod: 10s
|
||||
syncPeriod: 60s
|
||||
kind: KubeProxyConfiguration
|
||||
metricsBindAddress: "%s"
|
||||
mode: "%s"
|
||||
oomScoreAdj: 17
|
||||
portRange: "2-7"
|
||||
detectLocalMode: "ClusterCIDR"
|
||||
detectLocal:
|
||||
bridgeInterface: "cbr0"
|
||||
interfaceNamePrefix: "veth"
|
||||
nodePortAddresses:
|
||||
- "10.20.30.40/16"
|
||||
- "fd00:1::0/64"
|
||||
`
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
mode string
|
||||
bindAddress string
|
||||
clusterCIDR string
|
||||
healthzBindAddress string
|
||||
metricsBindAddress string
|
||||
extraConfig string
|
||||
}{
|
||||
{
|
||||
name: "iptables mode, IPv4 all-zeros bind address",
|
||||
mode: "iptables",
|
||||
bindAddress: "0.0.0.0",
|
||||
clusterCIDR: "1.2.3.0/24",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
metricsBindAddress: "2.3.4.5:23456",
|
||||
},
|
||||
{
|
||||
name: "iptables mode, non-zeros IPv4 config",
|
||||
mode: "iptables",
|
||||
bindAddress: "9.8.7.6",
|
||||
clusterCIDR: "1.2.3.0/24",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
metricsBindAddress: "2.3.4.5:23456",
|
||||
},
|
||||
{
|
||||
// Test for 'bindAddress: "::"' (IPv6 all-zeros) in kube-proxy
|
||||
// config file. The user will need to put quotes around '::' since
|
||||
// 'bindAddress: ::' is invalid yaml syntax.
|
||||
name: "iptables mode, IPv6 \"::\" bind address",
|
||||
mode: "iptables",
|
||||
bindAddress: "\"::\"",
|
||||
clusterCIDR: "fd00:1::0/64",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
metricsBindAddress: "[fd00:2::5]:23456",
|
||||
},
|
||||
{
|
||||
// Test for 'bindAddress: "[::]"' (IPv6 all-zeros in brackets)
|
||||
// in kube-proxy config file. The user will need to use
|
||||
// surrounding quotes here since 'bindAddress: [::]' is invalid
|
||||
// yaml syntax.
|
||||
name: "iptables mode, IPv6 \"[::]\" bind address",
|
||||
mode: "iptables",
|
||||
bindAddress: "\"[::]\"",
|
||||
clusterCIDR: "fd00:1::0/64",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
metricsBindAddress: "[fd00:2::5]:23456",
|
||||
},
|
||||
{
|
||||
// Test for 'bindAddress: ::0' (another form of IPv6 all-zeros).
|
||||
// No surrounding quotes are required around '::0'.
|
||||
name: "iptables mode, IPv6 ::0 bind address",
|
||||
mode: "iptables",
|
||||
bindAddress: "::0",
|
||||
clusterCIDR: "fd00:1::0/64",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
metricsBindAddress: "[fd00:2::5]:23456",
|
||||
},
|
||||
{
|
||||
name: "ipvs mode, IPv6 config",
|
||||
mode: "ipvs",
|
||||
bindAddress: "2001:db8::1",
|
||||
clusterCIDR: "fd00:1::0/64",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
metricsBindAddress: "[fd00:2::5]:23456",
|
||||
},
|
||||
{
|
||||
// Test for unknown field within config.
|
||||
// For v1alpha1 a lenient path is implemented and will throw a
|
||||
// strict decoding warning instead of failing to load
|
||||
name: "unknown field",
|
||||
mode: "iptables",
|
||||
bindAddress: "9.8.7.6",
|
||||
clusterCIDR: "1.2.3.0/24",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
metricsBindAddress: "2.3.4.5:23456",
|
||||
extraConfig: "foo: bar",
|
||||
},
|
||||
{
|
||||
// Test for duplicate field within config.
|
||||
// For v1alpha1 a lenient path is implemented and will throw a
|
||||
// strict decoding warning instead of failing to load
|
||||
name: "duplicate field",
|
||||
mode: "iptables",
|
||||
bindAddress: "9.8.7.6",
|
||||
clusterCIDR: "1.2.3.0/24",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
metricsBindAddress: "2.3.4.5:23456",
|
||||
extraConfig: "bindAddress: 9.8.7.6",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
expBindAddr := tc.bindAddress
|
||||
if tc.bindAddress[0] == '"' {
|
||||
// Surrounding double quotes will get stripped by the yaml parser.
|
||||
expBindAddr = expBindAddr[1 : len(tc.bindAddress)-1]
|
||||
}
|
||||
expected := &kubeproxyconfig.KubeProxyConfiguration{
|
||||
BindAddress: expBindAddr,
|
||||
ClientConnection: componentbaseconfig.ClientConnectionConfiguration{
|
||||
AcceptContentTypes: "abc",
|
||||
Burst: 100,
|
||||
ContentType: "content-type",
|
||||
Kubeconfig: "/path/to/kubeconfig",
|
||||
QPS: 7,
|
||||
},
|
||||
ClusterCIDR: tc.clusterCIDR,
|
||||
ConfigSyncPeriod: metav1.Duration{Duration: 15 * time.Second},
|
||||
Conntrack: kubeproxyconfig.KubeProxyConntrackConfiguration{
|
||||
MaxPerCore: ptr.To[int32](2),
|
||||
Min: ptr.To[int32](1),
|
||||
TCPCloseWaitTimeout: &metav1.Duration{Duration: 10 * time.Second},
|
||||
TCPEstablishedTimeout: &metav1.Duration{Duration: 20 * time.Second},
|
||||
},
|
||||
FeatureGates: map[string]bool{},
|
||||
HealthzBindAddress: tc.healthzBindAddress,
|
||||
HostnameOverride: "foo",
|
||||
IPTables: kubeproxyconfig.KubeProxyIPTablesConfiguration{
|
||||
MasqueradeAll: true,
|
||||
MasqueradeBit: ptr.To[int32](17),
|
||||
LocalhostNodePorts: ptr.To(true),
|
||||
MinSyncPeriod: metav1.Duration{Duration: 10 * time.Second},
|
||||
SyncPeriod: metav1.Duration{Duration: 60 * time.Second},
|
||||
},
|
||||
IPVS: kubeproxyconfig.KubeProxyIPVSConfiguration{
|
||||
MinSyncPeriod: metav1.Duration{Duration: 10 * time.Second},
|
||||
SyncPeriod: metav1.Duration{Duration: 60 * time.Second},
|
||||
ExcludeCIDRs: []string{"10.20.30.40/16", "fd00:1::0/64"},
|
||||
},
|
||||
NFTables: kubeproxyconfig.KubeProxyNFTablesConfiguration{
|
||||
MasqueradeAll: true,
|
||||
MasqueradeBit: ptr.To[int32](18),
|
||||
MinSyncPeriod: metav1.Duration{Duration: 10 * time.Second},
|
||||
SyncPeriod: metav1.Duration{Duration: 60 * time.Second},
|
||||
},
|
||||
MetricsBindAddress: tc.metricsBindAddress,
|
||||
Mode: kubeproxyconfig.ProxyMode(tc.mode),
|
||||
OOMScoreAdj: ptr.To[int32](17),
|
||||
PortRange: "2-7",
|
||||
NodePortAddresses: []string{"10.20.30.40/16", "fd00:1::0/64"},
|
||||
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
|
||||
DetectLocal: kubeproxyconfig.DetectLocalConfiguration{
|
||||
BridgeInterface: "cbr0",
|
||||
InterfaceNamePrefix: "veth",
|
||||
},
|
||||
Logging: logsapi.LoggingConfiguration{
|
||||
Format: "text",
|
||||
FlushFrequency: logsapi.TimeOrMetaDuration{Duration: metav1.Duration{Duration: 5 * time.Second}, SerializeAsString: true},
|
||||
},
|
||||
}
|
||||
|
||||
options := NewOptions()
|
||||
|
||||
baseYAML := fmt.Sprintf(
|
||||
yamlTemplate, tc.bindAddress, tc.clusterCIDR,
|
||||
tc.healthzBindAddress, tc.metricsBindAddress, tc.mode)
|
||||
|
||||
// Append additional configuration to the base yaml template
|
||||
yaml := fmt.Sprintf("%s\n%s", baseYAML, tc.extraConfig)
|
||||
|
||||
config, err := options.loadConfig([]byte(yaml))
|
||||
|
||||
require.NoError(t, err, "unexpected error for %s: %v", tc.name, err)
|
||||
|
||||
if diff := cmp.Diff(config, expected); diff != "" {
|
||||
t.Fatalf("unexpected config, diff = %s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadConfigFailures tests failure modes for loadConfig()
|
||||
func TestLoadConfigFailures(t *testing.T) {
|
||||
// TODO(phenixblue): Uncomment below template when v1alpha2+ of kube-proxy config is
|
||||
// released with strict decoding. These associated tests will fail with
|
||||
// the lenient codec and only one config API version.
|
||||
/*
|
||||
yamlTemplate := `bindAddress: 0.0.0.0
|
||||
clusterCIDR: "1.2.3.0/24"
|
||||
configSyncPeriod: 15s
|
||||
kind: KubeProxyConfiguration`
|
||||
*/
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
config string
|
||||
expErr string
|
||||
checkFn func(err error) bool
|
||||
}{
|
||||
{
|
||||
name: "Decode error test",
|
||||
config: "Twas bryllyg, and ye slythy toves",
|
||||
expErr: "could not find expected ':'",
|
||||
},
|
||||
{
|
||||
name: "Bad config type test",
|
||||
config: "kind: KubeSchedulerConfiguration",
|
||||
expErr: "no kind",
|
||||
},
|
||||
{
|
||||
name: "Missing quotes around :: bindAddress",
|
||||
config: "bindAddress: ::",
|
||||
expErr: "mapping values are not allowed in this context",
|
||||
},
|
||||
// TODO(phenixblue): Uncomment below tests when v1alpha2+ of kube-proxy config is
|
||||
// released with strict decoding. These tests will fail with the
|
||||
// lenient codec and only one config API version.
|
||||
/*
|
||||
{
|
||||
name: "Duplicate fields",
|
||||
config: fmt.Sprintf("%s\nbindAddress: 1.2.3.4", yamlTemplate),
|
||||
checkFn: kuberuntime.IsStrictDecodingError,
|
||||
},
|
||||
{
|
||||
name: "Unknown field",
|
||||
config: fmt.Sprintf("%s\nfoo: bar", yamlTemplate),
|
||||
checkFn: kuberuntime.IsStrictDecodingError,
|
||||
},
|
||||
*/
|
||||
}
|
||||
|
||||
version := "apiVersion: kubeproxy.config.k8s.io/v1alpha1"
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
options := NewOptions()
|
||||
config := fmt.Sprintf("%s\n%s", version, tc.config)
|
||||
_, err := options.loadConfig([]byte(config))
|
||||
|
||||
require.Error(t, err, tc.name)
|
||||
require.Contains(t, err.Error(), tc.expErr)
|
||||
|
||||
if tc.checkFn != nil {
|
||||
require.True(t, tc.checkFn(err), tc.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestProcessHostnameOverrideFlag tests processing hostname-override arg
|
||||
func TestProcessHostnameOverrideFlag(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
hostnameOverrideFlag string
|
||||
expectedHostname string
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "Hostname from config file",
|
||||
hostnameOverrideFlag: "",
|
||||
expectedHostname: "foo",
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Hostname from flag",
|
||||
hostnameOverrideFlag: " bar ",
|
||||
expectedHostname: "bar",
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Hostname is space",
|
||||
hostnameOverrideFlag: " ",
|
||||
expectError: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
options := NewOptions()
|
||||
options.config = &kubeproxyconfig.KubeProxyConfiguration{
|
||||
HostnameOverride: "foo",
|
||||
}
|
||||
|
||||
options.hostnameOverride = tc.hostnameOverrideFlag
|
||||
|
||||
err := options.processHostnameOverrideFlag()
|
||||
if tc.expectError {
|
||||
if err == nil {
|
||||
t.Fatalf("should error for this case %s", tc.name)
|
||||
}
|
||||
} else {
|
||||
require.NoError(t, err, "unexpected error %v", err)
|
||||
if tc.expectedHostname != options.config.HostnameOverride {
|
||||
t.Fatalf("expected hostname: %s, but got: %s", tc.expectedHostname, options.config.HostnameOverride)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestOptionsComplete checks that command line flags are combined with a
|
||||
// config properly.
|
||||
func TestOptionsComplete(t *testing.T) {
|
||||
header := `apiVersion: kubeproxy.config.k8s.io/v1alpha1
|
||||
kind: KubeProxyConfiguration
|
||||
`
|
||||
|
||||
// Determine default config (depends on platform defaults).
|
||||
o := NewOptions()
|
||||
require.NoError(t, o.Complete(new(pflag.FlagSet)))
|
||||
expected := o.config
|
||||
|
||||
config := header + `logging:
|
||||
format: json
|
||||
flushFrequency: 1s
|
||||
verbosity: 10
|
||||
vmodule:
|
||||
- filePattern: foo.go
|
||||
verbosity: 6
|
||||
- filePattern: bar.go
|
||||
verbosity: 8
|
||||
`
|
||||
expectedLoggingConfig := logsapi.LoggingConfiguration{
|
||||
Format: "json",
|
||||
FlushFrequency: logsapi.TimeOrMetaDuration{Duration: metav1.Duration{Duration: time.Second}, SerializeAsString: true},
|
||||
Verbosity: 10,
|
||||
VModule: []logsapi.VModuleItem{
|
||||
{
|
||||
FilePattern: "foo.go",
|
||||
Verbosity: 6,
|
||||
},
|
||||
{
|
||||
FilePattern: "bar.go",
|
||||
Verbosity: 8,
|
||||
},
|
||||
},
|
||||
Options: logsapi.FormatOptions{
|
||||
JSON: logsapi.JSONOptions{
|
||||
OutputRoutingOptions: logsapi.OutputRoutingOptions{
|
||||
InfoBufferSize: resource.QuantityValue{Quantity: resource.MustParse("0")},
|
||||
},
|
||||
},
|
||||
Text: logsapi.TextOptions{
|
||||
OutputRoutingOptions: logsapi.OutputRoutingOptions{
|
||||
InfoBufferSize: resource.QuantityValue{Quantity: resource.MustParse("0")},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range map[string]struct {
|
||||
config string
|
||||
flags []string
|
||||
expected *kubeproxyconfig.KubeProxyConfiguration
|
||||
}{
|
||||
"empty": {
|
||||
expected: expected,
|
||||
},
|
||||
"empty-config": {
|
||||
config: header,
|
||||
expected: expected,
|
||||
},
|
||||
"logging-config": {
|
||||
config: config,
|
||||
expected: func() *kubeproxyconfig.KubeProxyConfiguration {
|
||||
c := expected.DeepCopy()
|
||||
c.Logging = *expectedLoggingConfig.DeepCopy()
|
||||
return c
|
||||
}(),
|
||||
},
|
||||
"flags": {
|
||||
flags: []string{
|
||||
"-v=7",
|
||||
"--vmodule", "goo.go=8",
|
||||
},
|
||||
expected: func() *kubeproxyconfig.KubeProxyConfiguration {
|
||||
c := expected.DeepCopy()
|
||||
c.Logging.Verbosity = 7
|
||||
c.Logging.VModule = append(c.Logging.VModule, logsapi.VModuleItem{
|
||||
FilePattern: "goo.go",
|
||||
Verbosity: 8,
|
||||
})
|
||||
return c
|
||||
}(),
|
||||
},
|
||||
"both": {
|
||||
config: config,
|
||||
flags: []string{
|
||||
"-v=7",
|
||||
"--vmodule", "goo.go=8",
|
||||
"--ipvs-scheduler", "some-scheduler", // Overwritten by config.
|
||||
},
|
||||
expected: func() *kubeproxyconfig.KubeProxyConfiguration {
|
||||
c := expected.DeepCopy()
|
||||
c.Logging = *expectedLoggingConfig.DeepCopy()
|
||||
// Flag wins.
|
||||
c.Logging.Verbosity = 7
|
||||
// Flag and config get merged with command line flags first.
|
||||
c.Logging.VModule = append([]logsapi.VModuleItem{
|
||||
{
|
||||
FilePattern: "goo.go",
|
||||
Verbosity: 8,
|
||||
},
|
||||
}, c.Logging.VModule...)
|
||||
return c
|
||||
}(),
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
options := NewOptions()
|
||||
fs := new(pflag.FlagSet)
|
||||
options.AddFlags(fs)
|
||||
flags := tc.flags
|
||||
if len(tc.config) > 0 {
|
||||
tmp := t.TempDir()
|
||||
configFile := path.Join(tmp, "kube-proxy.conf")
|
||||
require.NoError(t, os.WriteFile(configFile, []byte(tc.config), 0666))
|
||||
flags = append(flags, "--config", configFile)
|
||||
}
|
||||
require.NoError(t, fs.Parse(flags))
|
||||
require.NoError(t, options.Complete(fs))
|
||||
require.Equal(t, tc.expected, options.config)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddressFromDeprecatedFlags(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
healthzPort int32
|
||||
healthzBindAddress string
|
||||
metricsPort int32
|
||||
metricsBindAddress string
|
||||
expHealthz string
|
||||
expMetrics string
|
||||
}{
|
||||
{
|
||||
name: "IPv4 bind address",
|
||||
healthzBindAddress: "1.2.3.4",
|
||||
healthzPort: 12345,
|
||||
metricsBindAddress: "2.3.4.5",
|
||||
metricsPort: 23456,
|
||||
expHealthz: "1.2.3.4:12345",
|
||||
expMetrics: "2.3.4.5:23456",
|
||||
},
|
||||
{
|
||||
name: "IPv4 bind address has port",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
healthzPort: 23456,
|
||||
metricsBindAddress: "2.3.4.5:12345",
|
||||
metricsPort: 23456,
|
||||
expHealthz: "1.2.3.4:12345",
|
||||
expMetrics: "2.3.4.5:12345",
|
||||
},
|
||||
{
|
||||
name: "IPv6 bind address",
|
||||
healthzBindAddress: "fd00:1::5",
|
||||
healthzPort: 12345,
|
||||
metricsBindAddress: "fd00:1::6",
|
||||
metricsPort: 23456,
|
||||
expHealthz: "[fd00:1::5]:12345",
|
||||
expMetrics: "[fd00:1::6]:23456",
|
||||
},
|
||||
{
|
||||
name: "IPv6 bind address has port",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
healthzPort: 56789,
|
||||
metricsBindAddress: "[fd00:1::6]:56789",
|
||||
metricsPort: 12345,
|
||||
expHealthz: "[fd00:1::5]:12345",
|
||||
expMetrics: "[fd00:1::6]:56789",
|
||||
},
|
||||
{
|
||||
name: "Invalid IPv6 Config",
|
||||
healthzBindAddress: "[fd00:1::5]",
|
||||
healthzPort: 12345,
|
||||
metricsBindAddress: "[fd00:1::6]",
|
||||
metricsPort: 56789,
|
||||
expHealthz: "[fd00:1::5]",
|
||||
expMetrics: "[fd00:1::6]",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
gotHealthz := addressFromDeprecatedFlags(tc.healthzBindAddress, tc.healthzPort)
|
||||
gotMetrics := addressFromDeprecatedFlags(tc.metricsBindAddress, tc.metricsPort)
|
||||
|
||||
require.Equal(t, tc.expHealthz, gotHealthz)
|
||||
require.Equal(t, tc.expMetrics, gotMetrics)
|
||||
})
|
||||
}
|
||||
}
|
@ -28,16 +28,12 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/selection"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
@ -67,27 +63,19 @@ import (
|
||||
"k8s.io/component-base/version/verflag"
|
||||
nodeutil "k8s.io/component-helpers/node/util"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kube-proxy/config/v1alpha1"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/cluster/ports"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/qos"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/apis"
|
||||
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
|
||||
kubeproxyconfigv1alpha1 "k8s.io/kubernetes/pkg/proxy/apis/config/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/proxy/apis/config/validation"
|
||||
"k8s.io/kubernetes/pkg/proxy/config"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
|
||||
"k8s.io/kubernetes/pkg/util/filesystem"
|
||||
utilflag "k8s.io/kubernetes/pkg/util/flag"
|
||||
utilnode "k8s.io/kubernetes/pkg/util/node"
|
||||
"k8s.io/kubernetes/pkg/util/oom"
|
||||
netutils "k8s.io/utils/net"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -100,425 +88,6 @@ type proxyRun interface {
|
||||
Run(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Options contains everything necessary to create and run a proxy server.
|
||||
type Options struct {
|
||||
// ConfigFile is the location of the proxy server's configuration file.
|
||||
ConfigFile string
|
||||
// WriteConfigTo is the path where the default configuration will be written.
|
||||
WriteConfigTo string
|
||||
// CleanupAndExit, when true, makes the proxy server clean up iptables and ipvs rules, then exit.
|
||||
CleanupAndExit bool
|
||||
// InitAndExit, when true, makes the proxy server makes configurations that need privileged access, then exit.
|
||||
InitAndExit bool
|
||||
// WindowsService should be set to true if kube-proxy is running as a service on Windows.
|
||||
// Its corresponding flag only gets registered in Windows builds
|
||||
WindowsService bool
|
||||
// config is the proxy server's configuration object.
|
||||
config *kubeproxyconfig.KubeProxyConfiguration
|
||||
// watcher is used to watch on the update change of ConfigFile
|
||||
watcher filesystem.FSWatcher
|
||||
// proxyServer is the interface to run the proxy server
|
||||
proxyServer proxyRun
|
||||
// errCh is the channel that errors will be sent
|
||||
errCh chan error
|
||||
|
||||
// The fields below here are placeholders for flags that can't be directly mapped into
|
||||
// config.KubeProxyConfiguration.
|
||||
//
|
||||
// TODO remove these fields once the deprecated flags are removed.
|
||||
|
||||
// master is used to override the kubeconfig's URL to the apiserver.
|
||||
master string
|
||||
// healthzPort is the port to be used by the healthz server.
|
||||
healthzPort int32
|
||||
// metricsPort is the port to be used by the metrics server.
|
||||
metricsPort int32
|
||||
|
||||
// hostnameOverride, if set from the command line flag, takes precedence over the `HostnameOverride` value from the config file
|
||||
hostnameOverride string
|
||||
|
||||
logger klog.Logger
|
||||
}
|
||||
|
||||
// AddFlags adds flags to fs and binds them to options.
|
||||
func (o *Options) AddFlags(fs *pflag.FlagSet) {
|
||||
o.addOSFlags(fs)
|
||||
|
||||
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
|
||||
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the default configuration values to this file and exit.")
|
||||
|
||||
fs.BoolVar(&o.CleanupAndExit, "cleanup", o.CleanupAndExit, "If true cleanup iptables and ipvs rules and exit.")
|
||||
|
||||
fs.Var(cliflag.NewMapStringBool(&o.config.FeatureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
|
||||
"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n")+"\n"+
|
||||
"This parameter is ignored if a config file is specified by --config.")
|
||||
|
||||
fs.StringVar(&o.config.ClientConnection.Kubeconfig, "kubeconfig", o.config.ClientConnection.Kubeconfig, "Path to kubeconfig file with authorization information (the master location can be overridden by the master flag).")
|
||||
fs.StringVar(&o.master, "master", o.master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
||||
fs.StringVar(&o.config.ClientConnection.ContentType, "kube-api-content-type", o.config.ClientConnection.ContentType, "Content type of requests sent to apiserver.")
|
||||
fs.Int32Var(&o.config.ClientConnection.Burst, "kube-api-burst", o.config.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver")
|
||||
fs.Float32Var(&o.config.ClientConnection.QPS, "kube-api-qps", o.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver")
|
||||
|
||||
fs.StringVar(&o.hostnameOverride, "hostname-override", o.hostnameOverride, "If non-empty, will be used as the name of the Node that kube-proxy is running on. If unset, the node name is assumed to be the same as the node's hostname.")
|
||||
fs.Var(&utilflag.IPVar{Val: &o.config.BindAddress}, "bind-address", "Overrides kube-proxy's idea of what its node's primary IP is. Note that the name is a historical artifact, and kube-proxy does not actually bind any sockets to this IP. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.Var(&utilflag.IPPortVar{Val: &o.config.HealthzBindAddress}, "healthz-bind-address", "The IP address and port for the health check server to serve on, defaulting to \"0.0.0.0:10256\". This parameter is ignored if a config file is specified by --config.")
|
||||
fs.Var(&utilflag.IPPortVar{Val: &o.config.MetricsBindAddress}, "metrics-bind-address", "The IP address and port for the metrics server to serve on, defaulting to \"127.0.0.1:10249\". (Set to \"0.0.0.0:10249\" / \"[::]:10249\" to bind on all interfaces.) Set empty to disable. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.BoolVar(&o.config.BindAddressHardFail, "bind-address-hard-fail", o.config.BindAddressHardFail, "If true kube-proxy will treat failure to bind to a port as fatal and exit")
|
||||
fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "If true enables profiling via web interface on /debug/pprof handler. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.StringVar(&o.config.ShowHiddenMetricsForVersion, "show-hidden-metrics-for-version", o.config.ShowHiddenMetricsForVersion,
|
||||
"The previous version for which you want to show hidden metrics. "+
|
||||
"Only the previous minor version is meaningful, other values will not be allowed. "+
|
||||
"The format is <major>.<minor>, e.g.: '1.16'. "+
|
||||
"The purpose of this format is make sure you have the opportunity to notice if the next release hides additional metrics, "+
|
||||
"rather than being surprised when they are permanently removed in the release after that. "+
|
||||
"This parameter is ignored if a config file is specified by --config.")
|
||||
fs.BoolVar(&o.InitAndExit, "init-only", o.InitAndExit, "If true, perform any initialization steps that must be done with full root privileges, and then exit. After doing this, you can run kube-proxy again with only the CAP_NET_ADMIN capability.")
|
||||
fs.Var(&o.config.Mode, "proxy-mode", "Which proxy mode to use: on Linux this can be 'iptables' (default) or 'ipvs'. On Windows the only supported value is 'kernelspace'."+
|
||||
"This parameter is ignored if a config file is specified by --config.")
|
||||
|
||||
fs.Int32Var(o.config.IPTables.MasqueradeBit, "iptables-masquerade-bit", ptr.Deref(o.config.IPTables.MasqueradeBit, 14), "If using the iptables or ipvs proxy mode, the bit of the fwmark space to mark packets requiring SNAT with. Must be within the range [0, 31].")
|
||||
fs.BoolVar(&o.config.IPTables.MasqueradeAll, "masquerade-all", o.config.IPTables.MasqueradeAll, "If using the iptables or ipvs proxy mode, SNAT all traffic sent via Service cluster IPs. This may be required with some CNI plugins.")
|
||||
fs.BoolVar(o.config.IPTables.LocalhostNodePorts, "iptables-localhost-nodeports", ptr.Deref(o.config.IPTables.LocalhostNodePorts, true), "If false, kube-proxy will disable the legacy behavior of allowing NodePort services to be accessed via localhost. (Applies only to iptables mode and IPv4; localhost NodePorts are never allowed with other proxy modes or with IPv6.)")
|
||||
fs.DurationVar(&o.config.IPTables.SyncPeriod.Duration, "iptables-sync-period", o.config.IPTables.SyncPeriod.Duration, "An interval (e.g. '5s', '1m', '2h22m') indicating how frequently various re-synchronizing and cleanup operations are performed. Must be greater than 0.")
|
||||
fs.DurationVar(&o.config.IPTables.MinSyncPeriod.Duration, "iptables-min-sync-period", o.config.IPTables.MinSyncPeriod.Duration, "The minimum period between iptables rule resyncs (e.g. '5s', '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will result in an immediate iptables resync.")
|
||||
|
||||
fs.DurationVar(&o.config.IPVS.SyncPeriod.Duration, "ipvs-sync-period", o.config.IPVS.SyncPeriod.Duration, "An interval (e.g. '5s', '1m', '2h22m') indicating how frequently various re-synchronizing and cleanup operations are performed. Must be greater than 0.")
|
||||
fs.DurationVar(&o.config.IPVS.MinSyncPeriod.Duration, "ipvs-min-sync-period", o.config.IPVS.MinSyncPeriod.Duration, "The minimum period between IPVS rule resyncs (e.g. '5s', '1m', '2h22m'). A value of 0 means every Service or EndpointSlice change will result in an immediate IPVS resync.")
|
||||
fs.StringVar(&o.config.IPVS.Scheduler, "ipvs-scheduler", o.config.IPVS.Scheduler, "The ipvs scheduler type when proxy mode is ipvs")
|
||||
fs.StringSliceVar(&o.config.IPVS.ExcludeCIDRs, "ipvs-exclude-cidrs", o.config.IPVS.ExcludeCIDRs, "A comma-separated list of CIDRs which the ipvs proxier should not touch when cleaning up IPVS rules.")
|
||||
fs.BoolVar(&o.config.IPVS.StrictARP, "ipvs-strict-arp", o.config.IPVS.StrictARP, "Enable strict ARP by setting arp_ignore to 1 and arp_announce to 2")
|
||||
fs.DurationVar(&o.config.IPVS.TCPTimeout.Duration, "ipvs-tcp-timeout", o.config.IPVS.TCPTimeout.Duration, "The timeout for idle IPVS TCP connections, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
|
||||
fs.DurationVar(&o.config.IPVS.TCPFinTimeout.Duration, "ipvs-tcpfin-timeout", o.config.IPVS.TCPFinTimeout.Duration, "The timeout for IPVS TCP connections after receiving a FIN packet, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
|
||||
fs.DurationVar(&o.config.IPVS.UDPTimeout.Duration, "ipvs-udp-timeout", o.config.IPVS.UDPTimeout.Duration, "The timeout for IPVS UDP packets, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
|
||||
|
||||
fs.Var(&o.config.DetectLocalMode, "detect-local-mode", "Mode to use to detect local traffic. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.StringVar(&o.config.DetectLocal.BridgeInterface, "pod-bridge-interface", o.config.DetectLocal.BridgeInterface, "A bridge interface name. When --detect-local-mode is set to BridgeInterface, kube-proxy will consider traffic to be local if it originates from this bridge.")
|
||||
fs.StringVar(&o.config.DetectLocal.InterfaceNamePrefix, "pod-interface-name-prefix", o.config.DetectLocal.InterfaceNamePrefix, "An interface name prefix. When --detect-local-mode is set to InterfaceNamePrefix, kube-proxy will consider traffic to be local if it originates from any interface whose name begins with this prefix.")
|
||||
fs.StringVar(&o.config.ClusterCIDR, "cluster-cidr", o.config.ClusterCIDR, "The CIDR range of the pods in the cluster. (For dual-stack clusters, this can be a comma-separated dual-stack pair of CIDR ranges.). When --detect-local-mode is set to ClusterCIDR, kube-proxy will consider traffic to be local if its source IP is in this range. (Otherwise it is not used.) "+
|
||||
"This parameter is ignored if a config file is specified by --config.")
|
||||
|
||||
fs.StringSliceVar(&o.config.NodePortAddresses, "nodeport-addresses", o.config.NodePortAddresses,
|
||||
"A list of CIDR ranges that contain valid node IPs, or alternatively, the single string 'primary'. If set to a list of CIDRs, connections to NodePort services will only be accepted on node IPs in one of the indicated ranges. If set to 'primary', NodePort services will only be accepted on the node's primary IP(s) according to the Node object. If unset, NodePort connections will be accepted on all local IPs. This parameter is ignored if a config file is specified by --config.")
|
||||
|
||||
fs.Int32Var(o.config.OOMScoreAdj, "oom-score-adj", ptr.Deref(o.config.OOMScoreAdj, int32(qos.KubeProxyOOMScoreAdj)), "The oom-score-adj value for kube-proxy process. Values must be within the range [-1000, 1000]. This parameter is ignored if a config file is specified by --config.")
|
||||
fs.Int32Var(o.config.Conntrack.MaxPerCore, "conntrack-max-per-core", *o.config.Conntrack.MaxPerCore,
|
||||
"Maximum number of NAT connections to track per CPU core (0 to leave the limit as-is and ignore conntrack-min).")
|
||||
fs.Int32Var(o.config.Conntrack.Min, "conntrack-min", *o.config.Conntrack.Min,
|
||||
"Minimum number of conntrack entries to allocate, regardless of conntrack-max-per-core (set conntrack-max-per-core=0 to leave the limit as-is).")
|
||||
|
||||
fs.DurationVar(&o.config.Conntrack.TCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", o.config.Conntrack.TCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
|
||||
fs.DurationVar(
|
||||
&o.config.Conntrack.TCPCloseWaitTimeout.Duration, "conntrack-tcp-timeout-close-wait",
|
||||
o.config.Conntrack.TCPCloseWaitTimeout.Duration,
|
||||
"NAT timeout for TCP connections in the CLOSE_WAIT state")
|
||||
fs.BoolVar(&o.config.Conntrack.TCPBeLiberal, "conntrack-tcp-be-liberal", o.config.Conntrack.TCPBeLiberal, "Enable liberal mode for tracking TCP packets by setting nf_conntrack_tcp_be_liberal to 1")
|
||||
fs.DurationVar(&o.config.Conntrack.UDPTimeout.Duration, "conntrack-udp-timeout", o.config.Conntrack.UDPTimeout.Duration, "Idle timeout for UNREPLIED UDP connections (0 to leave as-is)")
|
||||
fs.DurationVar(&o.config.Conntrack.UDPStreamTimeout.Duration, "conntrack-udp-timeout-stream", o.config.Conntrack.UDPStreamTimeout.Duration, "Idle timeout for ASSURED UDP connections (0 to leave as-is)")
|
||||
|
||||
fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
|
||||
|
||||
fs.Int32Var(&o.healthzPort, "healthz-port", o.healthzPort, "The port to bind the health check server. Use 0 to disable.")
|
||||
_ = fs.MarkDeprecated("healthz-port", "This flag is deprecated and will be removed in a future release. Please use --healthz-bind-address instead.")
|
||||
fs.Int32Var(&o.metricsPort, "metrics-port", o.metricsPort, "The port to bind the metrics server. Use 0 to disable.")
|
||||
_ = fs.MarkDeprecated("metrics-port", "This flag is deprecated and will be removed in a future release. Please use --metrics-bind-address instead.")
|
||||
fs.Var(utilflag.PortRangeVar{Val: &o.config.PortRange}, "proxy-port-range", "This was previously used to configure the userspace proxy, but is now unused.")
|
||||
_ = fs.MarkDeprecated("proxy-port-range", "This flag has no effect and will be removed in a future release.")
|
||||
|
||||
logsapi.AddFlags(&o.config.Logging, fs)
|
||||
}
|
||||
|
||||
// newKubeProxyConfiguration returns a KubeProxyConfiguration with default values
|
||||
func newKubeProxyConfiguration() *kubeproxyconfig.KubeProxyConfiguration {
|
||||
versionedConfig := &v1alpha1.KubeProxyConfiguration{}
|
||||
proxyconfigscheme.Scheme.Default(versionedConfig)
|
||||
internalConfig, err := proxyconfigscheme.Scheme.ConvertToVersion(versionedConfig, kubeproxyconfig.SchemeGroupVersion)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to create default config: %v", err))
|
||||
}
|
||||
|
||||
return internalConfig.(*kubeproxyconfig.KubeProxyConfiguration)
|
||||
}
|
||||
|
||||
// NewOptions returns initialized Options
|
||||
func NewOptions() *Options {
|
||||
return &Options{
|
||||
config: newKubeProxyConfiguration(),
|
||||
healthzPort: ports.ProxyHealthzPort,
|
||||
metricsPort: ports.ProxyStatusPort,
|
||||
errCh: make(chan error),
|
||||
logger: klog.FromContext(context.Background()),
|
||||
}
|
||||
}
|
||||
|
||||
// Complete completes all the required options.
|
||||
func (o *Options) Complete(fs *pflag.FlagSet) error {
|
||||
if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
|
||||
o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)
|
||||
o.config.MetricsBindAddress = addressFromDeprecatedFlags(o.config.MetricsBindAddress, o.metricsPort)
|
||||
}
|
||||
|
||||
// Load the config file here in Complete, so that Validate validates the fully-resolved config.
|
||||
if len(o.ConfigFile) > 0 {
|
||||
c, err := o.loadConfigFromFile(o.ConfigFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Before we overwrite the config which holds the parsed
|
||||
// command line parameters, we need to copy all modified
|
||||
// logging settings over to the loaded config (i.e. logging
|
||||
// command line flags have priority). Otherwise `--config
|
||||
// ... -v=5` doesn't work (config resets verbosity even
|
||||
// when it contains no logging settings).
|
||||
copyLogsFromFlags(fs, &c.Logging)
|
||||
o.config = c
|
||||
|
||||
if err := o.initWatcher(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
o.platformApplyDefaults(o.config)
|
||||
|
||||
if err := o.processHostnameOverrideFlag(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return utilfeature.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates)
|
||||
}
|
||||
|
||||
// copyLogsFromFlags applies the logging flags from the given flag set to the given
|
||||
// configuration. Fields for which the corresponding flag was not used are left
|
||||
// unmodified. For fields that have multiple values (like vmodule), the values from
|
||||
// the flags get joined so that the command line flags have priority.
|
||||
//
|
||||
// TODO (pohly): move this to logsapi
|
||||
func copyLogsFromFlags(from *pflag.FlagSet, to *logsapi.LoggingConfiguration) error {
|
||||
var cloneFS pflag.FlagSet
|
||||
logsapi.AddFlags(to, &cloneFS)
|
||||
vmodule := to.VModule
|
||||
to.VModule = nil
|
||||
var err error
|
||||
cloneFS.VisitAll(func(f *pflag.Flag) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
fsFlag := from.Lookup(f.Name)
|
||||
if fsFlag == nil {
|
||||
err = fmt.Errorf("logging flag %s not found in flag set", f.Name)
|
||||
return
|
||||
}
|
||||
if !fsFlag.Changed {
|
||||
return
|
||||
}
|
||||
if setErr := f.Value.Set(fsFlag.Value.String()); setErr != nil {
|
||||
err = fmt.Errorf("copying flag %s value: %v", f.Name, setErr)
|
||||
return
|
||||
}
|
||||
})
|
||||
to.VModule = append(to.VModule, vmodule...)
|
||||
return err
|
||||
}
|
||||
|
||||
// Creates a new filesystem watcher and adds watches for the config file.
|
||||
func (o *Options) initWatcher() error {
|
||||
fswatcher := filesystem.NewFsnotifyWatcher()
|
||||
err := fswatcher.Init(o.eventHandler, o.errorHandler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = fswatcher.AddWatch(o.ConfigFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.watcher = fswatcher
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Options) eventHandler(ent fsnotify.Event) {
|
||||
if ent.Has(fsnotify.Write) || ent.Has(fsnotify.Rename) {
|
||||
// error out when ConfigFile is updated
|
||||
o.errCh <- fmt.Errorf("content of the proxy server's configuration file was updated")
|
||||
return
|
||||
}
|
||||
o.errCh <- nil
|
||||
}
|
||||
|
||||
func (o *Options) errorHandler(err error) {
|
||||
o.errCh <- err
|
||||
}
|
||||
|
||||
// processHostnameOverrideFlag processes hostname-override flag
|
||||
func (o *Options) processHostnameOverrideFlag() error {
|
||||
// Check if hostname-override flag is set and use value since configFile always overrides
|
||||
if len(o.hostnameOverride) > 0 {
|
||||
hostName := strings.TrimSpace(o.hostnameOverride)
|
||||
if len(hostName) == 0 {
|
||||
return fmt.Errorf("empty hostname-override is invalid")
|
||||
}
|
||||
o.config.HostnameOverride = strings.ToLower(hostName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate validates all the required options.
|
||||
func (o *Options) Validate() error {
|
||||
if errs := validation.Validate(o.config); len(errs) != 0 {
|
||||
return errs.ToAggregate()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run runs the specified ProxyServer.
|
||||
func (o *Options) Run(ctx context.Context) error {
|
||||
defer close(o.errCh)
|
||||
if len(o.WriteConfigTo) > 0 {
|
||||
return o.writeConfigFile()
|
||||
}
|
||||
|
||||
err := platformCleanup(ctx, o.config.Mode, o.CleanupAndExit)
|
||||
if o.CleanupAndExit {
|
||||
return err
|
||||
}
|
||||
// We ignore err otherwise; the cleanup is best-effort, and the backends will have
|
||||
// logged messages if they failed in interesting ways.
|
||||
|
||||
proxyServer, err := newProxyServer(ctx, o.config, o.master, o.InitAndExit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if o.InitAndExit {
|
||||
return nil
|
||||
}
|
||||
|
||||
o.proxyServer = proxyServer
|
||||
return o.runLoop(ctx)
|
||||
}
|
||||
|
||||
// runLoop will watch on the update change of the proxy server's configuration file.
|
||||
// Return an error when updated
|
||||
func (o *Options) runLoop(ctx context.Context) error {
|
||||
if o.watcher != nil {
|
||||
o.watcher.Run()
|
||||
}
|
||||
|
||||
// run the proxy in goroutine
|
||||
go func() {
|
||||
err := o.proxyServer.Run(ctx)
|
||||
o.errCh <- err
|
||||
}()
|
||||
|
||||
for {
|
||||
err := <-o.errCh
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Options) writeConfigFile() (err error) {
|
||||
const mediaType = runtime.ContentTypeYAML
|
||||
info, ok := runtime.SerializerInfoForMediaType(proxyconfigscheme.Codecs.SupportedMediaTypes(), mediaType)
|
||||
if !ok {
|
||||
return fmt.Errorf("unable to locate encoder -- %q is not a supported media type", mediaType)
|
||||
}
|
||||
|
||||
encoder := proxyconfigscheme.Codecs.EncoderForVersion(info.Serializer, v1alpha1.SchemeGroupVersion)
|
||||
|
||||
configFile, err := os.Create(o.WriteConfigTo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
ferr := configFile.Close()
|
||||
if ferr != nil && err == nil {
|
||||
err = ferr
|
||||
}
|
||||
}()
|
||||
|
||||
if err = encoder.Encode(o.config, configFile); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
o.logger.Info("Wrote configuration", "file", o.WriteConfigTo)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// addressFromDeprecatedFlags returns server address from flags
|
||||
// passed on the command line based on the following rules:
|
||||
// 1. If port is 0, disable the server (e.g. set address to empty).
|
||||
// 2. Otherwise, set the port portion of the config accordingly.
|
||||
func addressFromDeprecatedFlags(addr string, port int32) string {
|
||||
if port == 0 {
|
||||
return ""
|
||||
}
|
||||
return proxyutil.AppendPortIfNeeded(addr, port)
|
||||
}
|
||||
|
||||
// newLenientSchemeAndCodecs returns a scheme that has only v1alpha1 registered into
|
||||
// it and a CodecFactory with strict decoding disabled.
|
||||
func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
|
||||
lenientScheme := runtime.NewScheme()
|
||||
if err := kubeproxyconfig.AddToScheme(lenientScheme); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to add kube-proxy config API to lenient scheme: %v", err)
|
||||
}
|
||||
if err := kubeproxyconfigv1alpha1.AddToScheme(lenientScheme); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to add kube-proxy config v1alpha1 API to lenient scheme: %v", err)
|
||||
}
|
||||
lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict)
|
||||
return lenientScheme, &lenientCodecs, nil
|
||||
}
|
||||
|
||||
// loadConfigFromFile loads the contents of file and decodes it as a
|
||||
// KubeProxyConfiguration object.
|
||||
func (o *Options) loadConfigFromFile(file string) (*kubeproxyconfig.KubeProxyConfiguration, error) {
|
||||
data, err := os.ReadFile(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return o.loadConfig(data)
|
||||
}
|
||||
|
||||
// loadConfig decodes a serialized KubeProxyConfiguration to the internal type.
|
||||
func (o *Options) loadConfig(data []byte) (*kubeproxyconfig.KubeProxyConfiguration, error) {
|
||||
|
||||
configObj, gvk, err := proxyconfigscheme.Codecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
// Try strict decoding first. If that fails decode with a lenient
|
||||
// decoder, which has only v1alpha1 registered, and log a warning.
|
||||
// The lenient path is to be dropped when support for v1alpha1 is dropped.
|
||||
if !runtime.IsStrictDecodingError(err) {
|
||||
return nil, fmt.Errorf("failed to decode: %w", err)
|
||||
}
|
||||
|
||||
_, lenientCodecs, lenientErr := newLenientSchemeAndCodecs()
|
||||
if lenientErr != nil {
|
||||
return nil, lenientErr
|
||||
}
|
||||
|
||||
configObj, gvk, lenientErr = lenientCodecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if lenientErr != nil {
|
||||
// Lenient decoding failed with the current version, return the
|
||||
// original strict error.
|
||||
return nil, fmt.Errorf("failed lenient decoding: %v", err)
|
||||
}
|
||||
|
||||
// Continue with the v1alpha1 object that was decoded leniently, but emit a warning.
|
||||
o.logger.Info("Using lenient decoding as strict decoding failed", "err", err)
|
||||
}
|
||||
|
||||
proxyConfig, ok := configObj.(*kubeproxyconfig.KubeProxyConfiguration)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("got unexpected config type: %v", gvk)
|
||||
}
|
||||
return proxyConfig, nil
|
||||
}
|
||||
|
||||
// NewProxyCommand creates a *cobra.Command object with default parameters
|
||||
func NewProxyCommand() *cobra.Command {
|
||||
opts := NewOptions()
|
||||
@ -545,7 +114,7 @@ with the apiserver API to configure the proxy.`,
|
||||
|
||||
logs.InitLogs()
|
||||
if err := logsapi.ValidateAndApplyAsField(&opts.config.Logging, utilfeature.DefaultFeatureGate, field.NewPath("logging")); err != nil {
|
||||
return fmt.Errorf("initialize logging: %v", err)
|
||||
return fmt.Errorf("initialize logging: %w", err)
|
||||
}
|
||||
|
||||
cliflag.PrintFlags(cmd.Flags())
|
||||
@ -680,7 +249,7 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig
|
||||
err, fatal := checkBadIPConfig(s, dualStackSupported)
|
||||
if err != nil {
|
||||
if fatal {
|
||||
return nil, fmt.Errorf("kube-proxy configuration is incorrect: %v", err)
|
||||
return nil, fmt.Errorf("kube-proxy configuration is incorrect: %w", err)
|
||||
}
|
||||
logger.Error(err, "Kube-proxy configuration may be incomplete or incorrect")
|
||||
}
|
||||
@ -859,7 +428,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errC
|
||||
if err != nil {
|
||||
logger.Error(err, "Healthz server failed")
|
||||
if errCh != nil {
|
||||
errCh <- fmt.Errorf("healthz server failed: %v", err)
|
||||
errCh <- fmt.Errorf("healthz server failed: %w", err)
|
||||
// if in hardfail mode, never retry again
|
||||
blockCh := make(chan error)
|
||||
<-blockCh
|
||||
@ -898,7 +467,7 @@ func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enabl
|
||||
fn := func() {
|
||||
err := http.ListenAndServe(bindAddress, proxyMux)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("starting metrics server failed: %v", err)
|
||||
err = fmt.Errorf("starting metrics server failed: %w", err)
|
||||
utilruntime.HandleError(err)
|
||||
if errCh != nil {
|
||||
errCh <- err
|
||||
|
@ -20,496 +20,18 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||
componentbaseconfig "k8s.io/component-base/config"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
netutils "k8s.io/utils/net"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
// TestLoadConfig tests proper operation of loadConfig()
|
||||
func TestLoadConfig(t *testing.T) {
|
||||
|
||||
yamlTemplate := `apiVersion: kubeproxy.config.k8s.io/v1alpha1
|
||||
bindAddress: %s
|
||||
clientConnection:
|
||||
acceptContentTypes: "abc"
|
||||
burst: 100
|
||||
contentType: content-type
|
||||
kubeconfig: "/path/to/kubeconfig"
|
||||
qps: 7
|
||||
clusterCIDR: "%s"
|
||||
configSyncPeriod: 15s
|
||||
conntrack:
|
||||
maxPerCore: 2
|
||||
min: 1
|
||||
tcpCloseWaitTimeout: 10s
|
||||
tcpEstablishedTimeout: 20s
|
||||
healthzBindAddress: "%s"
|
||||
hostnameOverride: "foo"
|
||||
iptables:
|
||||
masqueradeAll: true
|
||||
masqueradeBit: 17
|
||||
minSyncPeriod: 10s
|
||||
syncPeriod: 60s
|
||||
localhostNodePorts: true
|
||||
ipvs:
|
||||
minSyncPeriod: 10s
|
||||
syncPeriod: 60s
|
||||
excludeCIDRs:
|
||||
- "10.20.30.40/16"
|
||||
- "fd00:1::0/64"
|
||||
nftables:
|
||||
masqueradeAll: true
|
||||
masqueradeBit: 18
|
||||
minSyncPeriod: 10s
|
||||
syncPeriod: 60s
|
||||
kind: KubeProxyConfiguration
|
||||
metricsBindAddress: "%s"
|
||||
mode: "%s"
|
||||
oomScoreAdj: 17
|
||||
portRange: "2-7"
|
||||
detectLocalMode: "ClusterCIDR"
|
||||
detectLocal:
|
||||
bridgeInterface: "cbr0"
|
||||
interfaceNamePrefix: "veth"
|
||||
nodePortAddresses:
|
||||
- "10.20.30.40/16"
|
||||
- "fd00:1::0/64"
|
||||
`
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
mode string
|
||||
bindAddress string
|
||||
clusterCIDR string
|
||||
healthzBindAddress string
|
||||
metricsBindAddress string
|
||||
extraConfig string
|
||||
}{
|
||||
{
|
||||
name: "iptables mode, IPv4 all-zeros bind address",
|
||||
mode: "iptables",
|
||||
bindAddress: "0.0.0.0",
|
||||
clusterCIDR: "1.2.3.0/24",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
metricsBindAddress: "2.3.4.5:23456",
|
||||
},
|
||||
{
|
||||
name: "iptables mode, non-zeros IPv4 config",
|
||||
mode: "iptables",
|
||||
bindAddress: "9.8.7.6",
|
||||
clusterCIDR: "1.2.3.0/24",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
metricsBindAddress: "2.3.4.5:23456",
|
||||
},
|
||||
{
|
||||
// Test for 'bindAddress: "::"' (IPv6 all-zeros) in kube-proxy
|
||||
// config file. The user will need to put quotes around '::' since
|
||||
// 'bindAddress: ::' is invalid yaml syntax.
|
||||
name: "iptables mode, IPv6 \"::\" bind address",
|
||||
mode: "iptables",
|
||||
bindAddress: "\"::\"",
|
||||
clusterCIDR: "fd00:1::0/64",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
metricsBindAddress: "[fd00:2::5]:23456",
|
||||
},
|
||||
{
|
||||
// Test for 'bindAddress: "[::]"' (IPv6 all-zeros in brackets)
|
||||
// in kube-proxy config file. The user will need to use
|
||||
// surrounding quotes here since 'bindAddress: [::]' is invalid
|
||||
// yaml syntax.
|
||||
name: "iptables mode, IPv6 \"[::]\" bind address",
|
||||
mode: "iptables",
|
||||
bindAddress: "\"[::]\"",
|
||||
clusterCIDR: "fd00:1::0/64",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
metricsBindAddress: "[fd00:2::5]:23456",
|
||||
},
|
||||
{
|
||||
// Test for 'bindAddress: ::0' (another form of IPv6 all-zeros).
|
||||
// No surrounding quotes are required around '::0'.
|
||||
name: "iptables mode, IPv6 ::0 bind address",
|
||||
mode: "iptables",
|
||||
bindAddress: "::0",
|
||||
clusterCIDR: "fd00:1::0/64",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
metricsBindAddress: "[fd00:2::5]:23456",
|
||||
},
|
||||
{
|
||||
name: "ipvs mode, IPv6 config",
|
||||
mode: "ipvs",
|
||||
bindAddress: "2001:db8::1",
|
||||
clusterCIDR: "fd00:1::0/64",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
metricsBindAddress: "[fd00:2::5]:23456",
|
||||
},
|
||||
{
|
||||
// Test for unknown field within config.
|
||||
// For v1alpha1 a lenient path is implemented and will throw a
|
||||
// strict decoding warning instead of failing to load
|
||||
name: "unknown field",
|
||||
mode: "iptables",
|
||||
bindAddress: "9.8.7.6",
|
||||
clusterCIDR: "1.2.3.0/24",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
metricsBindAddress: "2.3.4.5:23456",
|
||||
extraConfig: "foo: bar",
|
||||
},
|
||||
{
|
||||
// Test for duplicate field within config.
|
||||
// For v1alpha1 a lenient path is implemented and will throw a
|
||||
// strict decoding warning instead of failing to load
|
||||
name: "duplicate field",
|
||||
mode: "iptables",
|
||||
bindAddress: "9.8.7.6",
|
||||
clusterCIDR: "1.2.3.0/24",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
metricsBindAddress: "2.3.4.5:23456",
|
||||
extraConfig: "bindAddress: 9.8.7.6",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
expBindAddr := tc.bindAddress
|
||||
if tc.bindAddress[0] == '"' {
|
||||
// Surrounding double quotes will get stripped by the yaml parser.
|
||||
expBindAddr = expBindAddr[1 : len(tc.bindAddress)-1]
|
||||
}
|
||||
expected := &kubeproxyconfig.KubeProxyConfiguration{
|
||||
BindAddress: expBindAddr,
|
||||
ClientConnection: componentbaseconfig.ClientConnectionConfiguration{
|
||||
AcceptContentTypes: "abc",
|
||||
Burst: 100,
|
||||
ContentType: "content-type",
|
||||
Kubeconfig: "/path/to/kubeconfig",
|
||||
QPS: 7,
|
||||
},
|
||||
ClusterCIDR: tc.clusterCIDR,
|
||||
ConfigSyncPeriod: metav1.Duration{Duration: 15 * time.Second},
|
||||
Conntrack: kubeproxyconfig.KubeProxyConntrackConfiguration{
|
||||
MaxPerCore: ptr.To[int32](2),
|
||||
Min: ptr.To[int32](1),
|
||||
TCPCloseWaitTimeout: &metav1.Duration{Duration: 10 * time.Second},
|
||||
TCPEstablishedTimeout: &metav1.Duration{Duration: 20 * time.Second},
|
||||
},
|
||||
FeatureGates: map[string]bool{},
|
||||
HealthzBindAddress: tc.healthzBindAddress,
|
||||
HostnameOverride: "foo",
|
||||
IPTables: kubeproxyconfig.KubeProxyIPTablesConfiguration{
|
||||
MasqueradeAll: true,
|
||||
MasqueradeBit: ptr.To[int32](17),
|
||||
LocalhostNodePorts: ptr.To(true),
|
||||
MinSyncPeriod: metav1.Duration{Duration: 10 * time.Second},
|
||||
SyncPeriod: metav1.Duration{Duration: 60 * time.Second},
|
||||
},
|
||||
IPVS: kubeproxyconfig.KubeProxyIPVSConfiguration{
|
||||
MinSyncPeriod: metav1.Duration{Duration: 10 * time.Second},
|
||||
SyncPeriod: metav1.Duration{Duration: 60 * time.Second},
|
||||
ExcludeCIDRs: []string{"10.20.30.40/16", "fd00:1::0/64"},
|
||||
},
|
||||
NFTables: kubeproxyconfig.KubeProxyNFTablesConfiguration{
|
||||
MasqueradeAll: true,
|
||||
MasqueradeBit: ptr.To[int32](18),
|
||||
MinSyncPeriod: metav1.Duration{Duration: 10 * time.Second},
|
||||
SyncPeriod: metav1.Duration{Duration: 60 * time.Second},
|
||||
},
|
||||
MetricsBindAddress: tc.metricsBindAddress,
|
||||
Mode: kubeproxyconfig.ProxyMode(tc.mode),
|
||||
OOMScoreAdj: ptr.To[int32](17),
|
||||
PortRange: "2-7",
|
||||
NodePortAddresses: []string{"10.20.30.40/16", "fd00:1::0/64"},
|
||||
DetectLocalMode: kubeproxyconfig.LocalModeClusterCIDR,
|
||||
DetectLocal: kubeproxyconfig.DetectLocalConfiguration{
|
||||
BridgeInterface: string("cbr0"),
|
||||
InterfaceNamePrefix: string("veth"),
|
||||
},
|
||||
Logging: logsapi.LoggingConfiguration{
|
||||
Format: "text",
|
||||
FlushFrequency: logsapi.TimeOrMetaDuration{Duration: metav1.Duration{Duration: 5 * time.Second}, SerializeAsString: true},
|
||||
},
|
||||
}
|
||||
|
||||
options := NewOptions()
|
||||
|
||||
baseYAML := fmt.Sprintf(
|
||||
yamlTemplate, tc.bindAddress, tc.clusterCIDR,
|
||||
tc.healthzBindAddress, tc.metricsBindAddress, tc.mode)
|
||||
|
||||
// Append additional configuration to the base yaml template
|
||||
yaml := fmt.Sprintf("%s\n%s", baseYAML, tc.extraConfig)
|
||||
|
||||
config, err := options.loadConfig([]byte(yaml))
|
||||
|
||||
assert.NoError(t, err, "unexpected error for %s: %v", tc.name, err)
|
||||
|
||||
if diff := cmp.Diff(config, expected); diff != "" {
|
||||
t.Fatalf("unexpected config for %s, diff = %s", tc.name, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadConfigFailures tests failure modes for loadConfig()
|
||||
func TestLoadConfigFailures(t *testing.T) {
|
||||
// TODO(phenixblue): Uncomment below template when v1alpha2+ of kube-proxy config is
|
||||
// released with strict decoding. These associated tests will fail with
|
||||
// the lenient codec and only one config API version.
|
||||
/*
|
||||
yamlTemplate := `bindAddress: 0.0.0.0
|
||||
clusterCIDR: "1.2.3.0/24"
|
||||
configSyncPeriod: 15s
|
||||
kind: KubeProxyConfiguration`
|
||||
*/
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
config string
|
||||
expErr string
|
||||
checkFn func(err error) bool
|
||||
}{
|
||||
{
|
||||
name: "Decode error test",
|
||||
config: "Twas bryllyg, and ye slythy toves",
|
||||
expErr: "could not find expected ':'",
|
||||
},
|
||||
{
|
||||
name: "Bad config type test",
|
||||
config: "kind: KubeSchedulerConfiguration",
|
||||
expErr: "no kind",
|
||||
},
|
||||
{
|
||||
name: "Missing quotes around :: bindAddress",
|
||||
config: "bindAddress: ::",
|
||||
expErr: "mapping values are not allowed in this context",
|
||||
},
|
||||
// TODO(phenixblue): Uncomment below tests when v1alpha2+ of kube-proxy config is
|
||||
// released with strict decoding. These tests will fail with the
|
||||
// lenient codec and only one config API version.
|
||||
/*
|
||||
{
|
||||
name: "Duplicate fields",
|
||||
config: fmt.Sprintf("%s\nbindAddress: 1.2.3.4", yamlTemplate),
|
||||
checkFn: kuberuntime.IsStrictDecodingError,
|
||||
},
|
||||
{
|
||||
name: "Unknown field",
|
||||
config: fmt.Sprintf("%s\nfoo: bar", yamlTemplate),
|
||||
checkFn: kuberuntime.IsStrictDecodingError,
|
||||
},
|
||||
*/
|
||||
}
|
||||
|
||||
version := "apiVersion: kubeproxy.config.k8s.io/v1alpha1"
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
options := NewOptions()
|
||||
config := fmt.Sprintf("%s\n%s", version, tc.config)
|
||||
_, err := options.loadConfig([]byte(config))
|
||||
|
||||
if assert.Error(t, err, tc.name) {
|
||||
if tc.expErr != "" {
|
||||
assert.Contains(t, err.Error(), tc.expErr)
|
||||
}
|
||||
if tc.checkFn != nil {
|
||||
assert.True(t, tc.checkFn(err), tc.name)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestProcessHostnameOverrideFlag tests processing hostname-override arg
|
||||
func TestProcessHostnameOverrideFlag(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
hostnameOverrideFlag string
|
||||
expectedHostname string
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "Hostname from config file",
|
||||
hostnameOverrideFlag: "",
|
||||
expectedHostname: "foo",
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Hostname from flag",
|
||||
hostnameOverrideFlag: " bar ",
|
||||
expectedHostname: "bar",
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Hostname is space",
|
||||
hostnameOverrideFlag: " ",
|
||||
expectError: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
options := NewOptions()
|
||||
options.config = &kubeproxyconfig.KubeProxyConfiguration{
|
||||
HostnameOverride: "foo",
|
||||
}
|
||||
|
||||
options.hostnameOverride = tc.hostnameOverrideFlag
|
||||
|
||||
err := options.processHostnameOverrideFlag()
|
||||
if tc.expectError {
|
||||
if err == nil {
|
||||
t.Fatalf("should error for this case %s", tc.name)
|
||||
}
|
||||
} else {
|
||||
assert.NoError(t, err, "unexpected error %v", err)
|
||||
if tc.expectedHostname != options.config.HostnameOverride {
|
||||
t.Fatalf("expected hostname: %s, but got: %s", tc.expectedHostname, options.config.HostnameOverride)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestOptionsComplete checks that command line flags are combined with a
|
||||
// config properly.
|
||||
func TestOptionsComplete(t *testing.T) {
|
||||
header := `apiVersion: kubeproxy.config.k8s.io/v1alpha1
|
||||
kind: KubeProxyConfiguration
|
||||
`
|
||||
|
||||
// Determine default config (depends on platform defaults).
|
||||
o := NewOptions()
|
||||
require.NoError(t, o.Complete(new(pflag.FlagSet)))
|
||||
expected := o.config
|
||||
|
||||
config := header + `logging:
|
||||
format: json
|
||||
flushFrequency: 1s
|
||||
verbosity: 10
|
||||
vmodule:
|
||||
- filePattern: foo.go
|
||||
verbosity: 6
|
||||
- filePattern: bar.go
|
||||
verbosity: 8
|
||||
`
|
||||
expectedLoggingConfig := logsapi.LoggingConfiguration{
|
||||
Format: "json",
|
||||
FlushFrequency: logsapi.TimeOrMetaDuration{Duration: metav1.Duration{Duration: time.Second}, SerializeAsString: true},
|
||||
Verbosity: 10,
|
||||
VModule: []logsapi.VModuleItem{
|
||||
{
|
||||
FilePattern: "foo.go",
|
||||
Verbosity: 6,
|
||||
},
|
||||
{
|
||||
FilePattern: "bar.go",
|
||||
Verbosity: 8,
|
||||
},
|
||||
},
|
||||
Options: logsapi.FormatOptions{
|
||||
JSON: logsapi.JSONOptions{
|
||||
OutputRoutingOptions: logsapi.OutputRoutingOptions{
|
||||
InfoBufferSize: resource.QuantityValue{Quantity: resource.MustParse("0")},
|
||||
},
|
||||
},
|
||||
Text: logsapi.TextOptions{
|
||||
OutputRoutingOptions: logsapi.OutputRoutingOptions{
|
||||
InfoBufferSize: resource.QuantityValue{Quantity: resource.MustParse("0")},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range map[string]struct {
|
||||
config string
|
||||
flags []string
|
||||
expected *kubeproxyconfig.KubeProxyConfiguration
|
||||
}{
|
||||
"empty": {
|
||||
expected: expected,
|
||||
},
|
||||
"empty-config": {
|
||||
config: header,
|
||||
expected: expected,
|
||||
},
|
||||
"logging-config": {
|
||||
config: config,
|
||||
expected: func() *kubeproxyconfig.KubeProxyConfiguration {
|
||||
c := expected.DeepCopy()
|
||||
c.Logging = *expectedLoggingConfig.DeepCopy()
|
||||
return c
|
||||
}(),
|
||||
},
|
||||
"flags": {
|
||||
flags: []string{
|
||||
"-v=7",
|
||||
"--vmodule", "goo.go=8",
|
||||
},
|
||||
expected: func() *kubeproxyconfig.KubeProxyConfiguration {
|
||||
c := expected.DeepCopy()
|
||||
c.Logging.Verbosity = 7
|
||||
c.Logging.VModule = append(c.Logging.VModule, logsapi.VModuleItem{
|
||||
FilePattern: "goo.go",
|
||||
Verbosity: 8,
|
||||
})
|
||||
return c
|
||||
}(),
|
||||
},
|
||||
"both": {
|
||||
config: config,
|
||||
flags: []string{
|
||||
"-v=7",
|
||||
"--vmodule", "goo.go=8",
|
||||
"--ipvs-scheduler", "some-scheduler", // Overwritten by config.
|
||||
},
|
||||
expected: func() *kubeproxyconfig.KubeProxyConfiguration {
|
||||
c := expected.DeepCopy()
|
||||
c.Logging = *expectedLoggingConfig.DeepCopy()
|
||||
// Flag wins.
|
||||
c.Logging.Verbosity = 7
|
||||
// Flag and config get merged with command line flags first.
|
||||
c.Logging.VModule = append([]logsapi.VModuleItem{
|
||||
{
|
||||
FilePattern: "goo.go",
|
||||
Verbosity: 8,
|
||||
},
|
||||
}, c.Logging.VModule...)
|
||||
return c
|
||||
}(),
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
options := NewOptions()
|
||||
fs := new(pflag.FlagSet)
|
||||
options.AddFlags(fs)
|
||||
flags := tc.flags
|
||||
if len(tc.config) > 0 {
|
||||
tmp := t.TempDir()
|
||||
configFile := path.Join(tmp, "kube-proxy.conf")
|
||||
require.NoError(t, ioutil.WriteFile(configFile, []byte(tc.config), 0666))
|
||||
flags = append(flags, "--config", configFile)
|
||||
}
|
||||
require.NoError(t, fs.Parse(flags))
|
||||
require.NoError(t, options.Complete(fs))
|
||||
assert.Equal(t, tc.expected, options.config)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeProxyServerLongRun struct{}
|
||||
|
||||
// Run runs the specified ProxyServer.
|
||||
@ -539,82 +61,6 @@ func (s *fakeProxyServerError) CleanupAndExit() error {
|
||||
return errors.New("mocking error from ProxyServer.CleanupAndExit()")
|
||||
}
|
||||
|
||||
func TestAddressFromDeprecatedFlags(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
healthzPort int32
|
||||
healthzBindAddress string
|
||||
metricsPort int32
|
||||
metricsBindAddress string
|
||||
expHealthz string
|
||||
expMetrics string
|
||||
}{
|
||||
{
|
||||
name: "IPv4 bind address",
|
||||
healthzBindAddress: "1.2.3.4",
|
||||
healthzPort: 12345,
|
||||
metricsBindAddress: "2.3.4.5",
|
||||
metricsPort: 23456,
|
||||
expHealthz: "1.2.3.4:12345",
|
||||
expMetrics: "2.3.4.5:23456",
|
||||
},
|
||||
{
|
||||
name: "IPv4 bind address has port",
|
||||
healthzBindAddress: "1.2.3.4:12345",
|
||||
healthzPort: 23456,
|
||||
metricsBindAddress: "2.3.4.5:12345",
|
||||
metricsPort: 23456,
|
||||
expHealthz: "1.2.3.4:12345",
|
||||
expMetrics: "2.3.4.5:12345",
|
||||
},
|
||||
{
|
||||
name: "IPv6 bind address",
|
||||
healthzBindAddress: "fd00:1::5",
|
||||
healthzPort: 12345,
|
||||
metricsBindAddress: "fd00:1::6",
|
||||
metricsPort: 23456,
|
||||
expHealthz: "[fd00:1::5]:12345",
|
||||
expMetrics: "[fd00:1::6]:23456",
|
||||
},
|
||||
{
|
||||
name: "IPv6 bind address has port",
|
||||
healthzBindAddress: "[fd00:1::5]:12345",
|
||||
healthzPort: 56789,
|
||||
metricsBindAddress: "[fd00:1::6]:56789",
|
||||
metricsPort: 12345,
|
||||
expHealthz: "[fd00:1::5]:12345",
|
||||
expMetrics: "[fd00:1::6]:56789",
|
||||
},
|
||||
{
|
||||
name: "Invalid IPv6 Config",
|
||||
healthzBindAddress: "[fd00:1::5]",
|
||||
healthzPort: 12345,
|
||||
metricsBindAddress: "[fd00:1::6]",
|
||||
metricsPort: 56789,
|
||||
expHealthz: "[fd00:1::5]",
|
||||
expMetrics: "[fd00:1::6]",
|
||||
},
|
||||
}
|
||||
|
||||
for i := range testCases {
|
||||
gotHealthz := addressFromDeprecatedFlags(testCases[i].healthzBindAddress, testCases[i].healthzPort)
|
||||
gotMetrics := addressFromDeprecatedFlags(testCases[i].metricsBindAddress, testCases[i].metricsPort)
|
||||
|
||||
errFn := func(name, except, got string) {
|
||||
t.Errorf("case %s: expected %v, got %v", name, except, got)
|
||||
}
|
||||
|
||||
if gotHealthz != testCases[i].expHealthz {
|
||||
errFn(testCases[i].name, testCases[i].expHealthz, gotHealthz)
|
||||
}
|
||||
|
||||
if gotMetrics != testCases[i].expMetrics {
|
||||
errFn(testCases[i].name, testCases[i].expMetrics, gotMetrics)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func makeNodeWithAddress(name, primaryIP string) *v1.Node {
|
||||
node := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
Loading…
Reference in New Issue
Block a user