Migrate cmd/kube-proxy to contextual logging (#122197)

* cmd/kube-proxy support contextual logging

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* use ktesting.NewTestContext(t) in unit test

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* use ktesting.NewTestContext(t) in unit test

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* remove unnecessary blank line & add cmd/kube-proxy to contextual section in logcheck.conf

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* add more contextual logging

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* new lint yaml

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

---------

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
This commit is contained in:
Ziqi Zhao 2024-01-09 00:30:18 +08:00 committed by GitHub
parent c8a718bb87
commit 6b5e973e5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 93 additions and 74 deletions

View File

@ -47,7 +47,9 @@ type Conntracker interface {
SetUDPStreamTimeout(seconds int) error SetUDPStreamTimeout(seconds int) error
} }
type realConntracker struct{} type realConntracker struct {
logger klog.Logger
}
var errReadOnlySysFS = errors.New("readOnlySysFS") var errReadOnlySysFS = errors.New("readOnlySysFS")
@ -55,7 +57,7 @@ func (rct realConntracker) SetMax(max int) error {
if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil { if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil {
return err return err
} }
klog.InfoS("Setting nf_conntrack_max", "nfConntrackMax", max) rct.logger.Info("Setting nf_conntrack_max", "nfConntrackMax", max)
// Linux does not support writing to /sys/module/nf_conntrack/parameters/hashsize // Linux does not support writing to /sys/module/nf_conntrack/parameters/hashsize
// when the writer process is not in the initial network namespace // when the writer process is not in the initial network namespace
@ -78,7 +80,7 @@ func (rct realConntracker) SetMax(max int) error {
// don't set conntrack hashsize and return a special error // don't set conntrack hashsize and return a special error
// errReadOnlySysFS here. The caller should deal with // errReadOnlySysFS here. The caller should deal with
// errReadOnlySysFS differently. // errReadOnlySysFS differently.
writable, err := isSysFSWritable() writable, err := rct.isSysFSWritable()
if err != nil { if err != nil {
return err return err
} }
@ -86,7 +88,7 @@ func (rct realConntracker) SetMax(max int) error {
return errReadOnlySysFS return errReadOnlySysFS
} }
// TODO: generify this and sysctl to a new sysfs.WriteInt() // TODO: generify this and sysctl to a new sysfs.WriteInt()
klog.InfoS("Setting conntrack hashsize", "conntrackHashsize", max/4) rct.logger.Info("Setting conntrack hashsize", "conntrackHashsize", max/4)
return writeIntStringFile("/sys/module/nf_conntrack/parameters/hashsize", max/4) return writeIntStringFile("/sys/module/nf_conntrack/parameters/hashsize", max/4)
} }
@ -110,12 +112,12 @@ func (rct realConntracker) SetUDPStreamTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_udp_timeout_stream", seconds) return rct.setIntSysCtl("nf_conntrack_udp_timeout_stream", seconds)
} }
func (realConntracker) setIntSysCtl(name string, value int) error { func (rct realConntracker) setIntSysCtl(name string, value int) error {
entry := "net/netfilter/" + name entry := "net/netfilter/" + name
sys := sysctl.New() sys := sysctl.New()
if val, _ := sys.GetSysctl(entry); val != value { if val, _ := sys.GetSysctl(entry); val != value {
klog.InfoS("Set sysctl", "entry", entry, "value", value) rct.logger.Info("Set sysctl", "entry", entry, "value", value)
if err := sys.SetSysctl(entry, value); err != nil { if err := sys.SetSysctl(entry, value); err != nil {
return err return err
} }
@ -124,13 +126,13 @@ func (realConntracker) setIntSysCtl(name string, value int) error {
} }
// isSysFSWritable checks /proc/mounts to see whether sysfs is 'rw' or not. // isSysFSWritable checks /proc/mounts to see whether sysfs is 'rw' or not.
func isSysFSWritable() (bool, error) { func (rct realConntracker) isSysFSWritable() (bool, error) {
const permWritable = "rw" const permWritable = "rw"
const sysfsDevice = "sysfs" const sysfsDevice = "sysfs"
m := mount.New("" /* default mount path */) m := mount.New("" /* default mount path */)
mountPoints, err := m.List() mountPoints, err := m.List()
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to list mount points") rct.logger.Error(err, "Failed to list mount points")
return false, err return false, err
} }
@ -142,7 +144,7 @@ func isSysFSWritable() (bool, error) {
if len(mountPoint.Opts) > 0 && mountPoint.Opts[0] == permWritable { if len(mountPoint.Opts) > 0 && mountPoint.Opts[0] == permWritable {
return true, nil return true, nil
} }
klog.ErrorS(nil, "Sysfs is not writable", "mountPoint", mountPoint, "mountOptions", mountPoint.Opts) rct.logger.Error(nil, "Sysfs is not writable", "mountPoint", mountPoint, "mountOptions", mountPoint.Opts)
return false, errReadOnlySysFS return false, errReadOnlySysFS
} }

View File

@ -136,6 +136,8 @@ type Options struct {
// hostnameOverride, if set from the command line flag, takes precedence over the `HostnameOverride` value from the config file // hostnameOverride, if set from the command line flag, takes precedence over the `HostnameOverride` value from the config file
hostnameOverride string hostnameOverride string
logger klog.Logger
} }
// AddFlags adds flags to fs and binds them to options. // AddFlags adds flags to fs and binds them to options.
@ -244,6 +246,7 @@ func NewOptions() *Options {
healthzPort: ports.ProxyHealthzPort, healthzPort: ports.ProxyHealthzPort,
metricsPort: ports.ProxyStatusPort, metricsPort: ports.ProxyStatusPort,
errCh: make(chan error), errCh: make(chan error),
logger: klog.FromContext(context.Background()),
} }
} }
@ -382,7 +385,7 @@ func (o *Options) Run() error {
// We ignore err otherwise; the cleanup is best-effort, and the backends will have // We ignore err otherwise; the cleanup is best-effort, and the backends will have
// logged messages if they failed in interesting ways. // logged messages if they failed in interesting ways.
proxyServer, err := newProxyServer(o.config, o.master, o.InitAndExit) proxyServer, err := newProxyServer(o.logger, o.config, o.master, o.InitAndExit)
if err != nil { if err != nil {
return err return err
} }
@ -440,7 +443,7 @@ func (o *Options) writeConfigFile() (err error) {
return err return err
} }
klog.InfoS("Wrote configuration", "file", o.WriteConfigTo) o.logger.Info("Wrote configuration", "file", o.WriteConfigTo)
return nil return nil
} }
@ -506,7 +509,7 @@ func (o *Options) loadConfig(data []byte) (*kubeproxyconfig.KubeProxyConfigurati
} }
// Continue with the v1alpha1 object that was decoded leniently, but emit a warning. // Continue with the v1alpha1 object that was decoded leniently, but emit a warning.
klog.InfoS("Using lenient decoding as strict decoding failed", "err", err) o.logger.Info("Using lenient decoding as strict decoding failed", "err", err)
} }
proxyConfig, ok := configObj.(*kubeproxyconfig.KubeProxyConfiguration) proxyConfig, ok := configObj.(*kubeproxyconfig.KubeProxyConfiguration)
@ -553,7 +556,7 @@ with the apiserver API to configure the proxy.`,
// add feature enablement metrics // add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics() utilfeature.DefaultMutableFeatureGate.AddMetrics()
if err := opts.Run(); err != nil { if err := opts.Run(); err != nil {
klog.ErrorS(err, "Error running ProxyServer") opts.logger.Error(err, "Error running ProxyServer")
return err return err
} }
@ -595,11 +598,16 @@ type ProxyServer struct {
podCIDRs []string // only used for LocalModeNodeCIDR podCIDRs []string // only used for LocalModeNodeCIDR
Proxier proxy.Provider Proxier proxy.Provider
logger klog.Logger
} }
// newProxyServer creates a ProxyServer based on the given config // newProxyServer creates a ProxyServer based on the given config
func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) { func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
s := &ProxyServer{Config: config} s := &ProxyServer{
Config: config,
logger: logger,
}
cz, err := configz.New(kubeproxyconfig.GroupName) cz, err := configz.New(kubeproxyconfig.GroupName)
if err != nil { if err != nil {
@ -616,13 +624,13 @@ func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master strin
return nil, err return nil, err
} }
s.Client, err = createClient(config.ClientConnection, master) s.Client, err = createClient(logger, config.ClientConnection, master)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rawNodeIPs := getNodeIPs(s.Client, s.Hostname) rawNodeIPs := getNodeIPs(logger, s.Client, s.Hostname)
s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(rawNodeIPs, config.BindAddress) s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(logger, rawNodeIPs, config.BindAddress)
s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()}) s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()})
s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy") s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")
@ -649,9 +657,9 @@ func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master strin
} else if (s.PrimaryIPFamily == v1.IPv4Protocol && !ipv4Supported) || (s.PrimaryIPFamily == v1.IPv6Protocol && !ipv6Supported) { } else if (s.PrimaryIPFamily == v1.IPv4Protocol && !ipv4Supported) || (s.PrimaryIPFamily == v1.IPv6Protocol && !ipv6Supported) {
return nil, fmt.Errorf("no support for primary IP family %q", s.PrimaryIPFamily) return nil, fmt.Errorf("no support for primary IP family %q", s.PrimaryIPFamily)
} else if dualStackSupported { } else if dualStackSupported {
klog.InfoS("kube-proxy running in dual-stack mode", "primary ipFamily", s.PrimaryIPFamily) logger.Info("kube-proxy running in dual-stack mode", "primary ipFamily", s.PrimaryIPFamily)
} else { } else {
klog.InfoS("kube-proxy running in single-stack mode", "ipFamily", s.PrimaryIPFamily) logger.Info("kube-proxy running in single-stack mode", "ipFamily", s.PrimaryIPFamily)
} }
err, fatal := checkIPConfig(s, dualStackSupported) err, fatal := checkIPConfig(s, dualStackSupported)
@ -659,7 +667,7 @@ func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master strin
if fatal { if fatal {
return nil, fmt.Errorf("kube-proxy configuration is incorrect: %v", err) return nil, fmt.Errorf("kube-proxy configuration is incorrect: %v", err)
} }
klog.ErrorS(err, "Kube-proxy configuration may be incomplete or incorrect") logger.Error(err, "Kube-proxy configuration may be incomplete or incorrect")
} }
s.Proxier, err = s.createProxier(config, dualStackSupported, initOnly) s.Proxier, err = s.createProxier(config, dualStackSupported, initOnly)
@ -765,12 +773,12 @@ func badBindAddress(bindAddress string, wrongFamily netutils.IPFamily) bool {
// createClient creates a kube client from the given config and masterOverride. // createClient creates a kube client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed. // TODO remove masterOverride when CLI flags are removed.
func createClient(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) { func createClient(logger klog.Logger, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
var kubeConfig *rest.Config var kubeConfig *rest.Config
var err error var err error
if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 { if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
klog.InfoS("Neither kubeconfig file nor master URL was specified, falling back to in-cluster config") logger.Info("Neither kubeconfig file nor master URL was specified, falling back to in-cluster config")
kubeConfig, err = rest.InClusterConfig() kubeConfig, err = rest.InClusterConfig()
} else { } else {
// This creates a client, first loading any specified kubeconfig // This creates a client, first loading any specified kubeconfig
@ -796,7 +804,7 @@ func createClient(config componentbaseconfig.ClientConnectionConfiguration, mast
return client, nil return client, nil
} }
func serveHealthz(hz *healthcheck.ProxierHealthServer, errCh chan error) { func serveHealthz(logger klog.Logger, hz *healthcheck.ProxierHealthServer, errCh chan error) {
if hz == nil { if hz == nil {
return return
} }
@ -804,7 +812,7 @@ func serveHealthz(hz *healthcheck.ProxierHealthServer, errCh chan error) {
fn := func() { fn := func() {
err := hz.Run() err := hz.Run()
if err != nil { if err != nil {
klog.ErrorS(err, "Healthz server failed") logger.Error(err, "Healthz server failed")
if errCh != nil { if errCh != nil {
errCh <- fmt.Errorf("healthz server failed: %v", err) errCh <- fmt.Errorf("healthz server failed: %v", err)
// if in hardfail mode, never retry again // if in hardfail mode, never retry again
@ -812,7 +820,7 @@ func serveHealthz(hz *healthcheck.ProxierHealthServer, errCh chan error) {
<-blockCh <-blockCh
} }
} else { } else {
klog.ErrorS(nil, "Healthz server returned without error") logger.Error(nil, "Healthz server returned without error")
} }
} }
go wait.Until(fn, 5*time.Second, wait.NeverStop) go wait.Until(fn, 5*time.Second, wait.NeverStop)
@ -862,16 +870,16 @@ func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enabl
// TODO: At the moment, Run() cannot return a nil error, otherwise its caller will never exit. Update callers of Run to handle nil errors. // TODO: At the moment, Run() cannot return a nil error, otherwise its caller will never exit. Update callers of Run to handle nil errors.
func (s *ProxyServer) Run() error { func (s *ProxyServer) Run() error {
// To help debugging, immediately log version // To help debugging, immediately log version
klog.InfoS("Version info", "version", version.Get()) s.logger.Info("Version info", "version", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK")) s.logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// TODO(vmarmol): Use container config for this. // TODO(vmarmol): Use container config for this.
var oomAdjuster *oom.OOMAdjuster var oomAdjuster *oom.OOMAdjuster
if s.Config.OOMScoreAdj != nil { if s.Config.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster() oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.Config.OOMScoreAdj)); err != nil { if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.Config.OOMScoreAdj)); err != nil {
klog.V(2).InfoS("Failed to apply OOMScore", "err", err) s.logger.V(2).Info("Failed to apply OOMScore", "err", err)
} }
} }
@ -889,7 +897,7 @@ func (s *ProxyServer) Run() error {
} }
// Start up a healthz server if requested // Start up a healthz server if requested
serveHealthz(s.HealthzServer, healthzErrCh) serveHealthz(s.logger, s.HealthzServer, healthzErrCh)
// Start up a metrics server if requested // Start up a metrics server if requested
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh) serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
@ -983,7 +991,7 @@ func (s *ProxyServer) birthCry() {
// 1. if bindAddress is not 0.0.0.0 or ::, then it is used as the primary IP. // 1. if bindAddress is not 0.0.0.0 or ::, then it is used as the primary IP.
// 2. if rawNodeIPs is not empty, then its address(es) is/are used // 2. if rawNodeIPs is not empty, then its address(es) is/are used
// 3. otherwise the node IPs are 127.0.0.1 and ::1 // 3. otherwise the node IPs are 127.0.0.1 and ::1
func detectNodeIPs(rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) { func detectNodeIPs(logger klog.Logger, rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) {
primaryFamily := v1.IPv4Protocol primaryFamily := v1.IPv4Protocol
nodeIPs := map[v1.IPFamily]net.IP{ nodeIPs := map[v1.IPFamily]net.IP{
v1.IPv4Protocol: net.IPv4(127, 0, 0, 1), v1.IPv4Protocol: net.IPv4(127, 0, 0, 1),
@ -1017,14 +1025,14 @@ func detectNodeIPs(rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1
} }
if nodeIPs[primaryFamily].IsLoopback() { if nodeIPs[primaryFamily].IsLoopback() {
klog.InfoS("Can't determine this node's IP, assuming loopback; if this is incorrect, please set the --bind-address flag") logger.Info("Can't determine this node's IP, assuming loopback; if this is incorrect, please set the --bind-address flag")
} }
return primaryFamily, nodeIPs return primaryFamily, nodeIPs
} }
// getNodeIP returns IPs for the node with the provided name. If // getNodeIP returns IPs for the node with the provided name. If
// required, it will wait for the node to be created. // required, it will wait for the node to be created.
func getNodeIPs(client clientset.Interface, name string) []net.IP { func getNodeIPs(logger klog.Logger, client clientset.Interface, name string) []net.IP {
var nodeIPs []net.IP var nodeIPs []net.IP
backoff := wait.Backoff{ backoff := wait.Backoff{
Steps: 6, Steps: 6,
@ -1036,18 +1044,18 @@ func getNodeIPs(client clientset.Interface, name string) []net.IP {
err := wait.ExponentialBackoff(backoff, func() (bool, error) { err := wait.ExponentialBackoff(backoff, func() (bool, error) {
node, err := client.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) node, err := client.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to retrieve node info") logger.Error(err, "Failed to retrieve node info")
return false, nil return false, nil
} }
nodeIPs, err = utilnode.GetNodeHostIPs(node) nodeIPs, err = utilnode.GetNodeHostIPs(node)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to retrieve node IPs") logger.Error(err, "Failed to retrieve node IPs")
return false, nil return false, nil
} }
return true, nil return true, nil
}) })
if err == nil { if err == nil {
klog.InfoS("Successfully retrieved node IP(s)", "IPs", nodeIPs) logger.Info("Successfully retrieved node IP(s)", "IPs", nodeIPs)
} }
return nodeIPs return nodeIPs
} }

View File

@ -69,15 +69,15 @@ var timeoutForNodePodCIDR = 5 * time.Minute
// config file, to apply platform-specific default values to config. // config file, to apply platform-specific default values to config.
func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfiguration) { func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfiguration) {
if config.Mode == "" { if config.Mode == "" {
klog.InfoS("Using iptables proxy") o.logger.Info("Using iptables proxy")
config.Mode = proxyconfigapi.ProxyModeIPTables config.Mode = proxyconfigapi.ProxyModeIPTables
} }
if config.DetectLocalMode == "" { if config.DetectLocalMode == "" {
klog.V(4).InfoS("Defaulting detect-local-mode", "localModeClusterCIDR", string(proxyconfigapi.LocalModeClusterCIDR)) o.logger.V(4).Info("Defaulting detect-local-mode", "localModeClusterCIDR", string(proxyconfigapi.LocalModeClusterCIDR))
config.DetectLocalMode = proxyconfigapi.LocalModeClusterCIDR config.DetectLocalMode = proxyconfigapi.LocalModeClusterCIDR
} }
klog.V(2).InfoS("DetectLocalMode", "localMode", string(config.DetectLocalMode)) o.logger.V(2).Info("DetectLocalMode", "localMode", string(config.DetectLocalMode))
} }
// platformSetup is called after setting up the ProxyServer, but before creating the // platformSetup is called after setting up the ProxyServer, but before creating the
@ -85,13 +85,13 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
// platform-specific setup. // platform-specific setup.
func (s *ProxyServer) platformSetup() error { func (s *ProxyServer) platformSetup() error {
if s.Config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR { if s.Config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname) s.logger.Info("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname)
node, err := waitForPodCIDR(s.Client, s.Hostname) node, err := waitForPodCIDR(s.Client, s.Hostname)
if err != nil { if err != nil {
return err return err
} }
s.podCIDRs = node.Spec.PodCIDRs s.podCIDRs = node.Spec.PodCIDRs
klog.InfoS("NodeInfo", "podCIDRs", node.Spec.PodCIDRs) s.logger.Info("NodeInfo", "podCIDRs", node.Spec.PodCIDRs)
} }
err := s.setupConntrack() err := s.setupConntrack()
@ -141,9 +141,9 @@ func (s *ProxyServer) platformCheckSupported() (ipv4Supported, ipv6Supported, du
if !ipv4Supported && !ipv6Supported { if !ipv4Supported && !ipv6Supported {
err = fmt.Errorf("iptables is not available on this host") err = fmt.Errorf("iptables is not available on this host")
} else if !ipv4Supported { } else if !ipv4Supported {
klog.InfoS("No iptables support for family", "ipFamily", v1.IPv4Protocol) s.logger.Info("No iptables support for family", "ipFamily", v1.IPv4Protocol)
} else if !ipv6Supported { } else if !ipv6Supported {
klog.InfoS("No iptables support for family", "ipFamily", v1.IPv6Protocol) s.logger.Info("No iptables support for family", "ipFamily", v1.IPv6Protocol)
} }
} else { } else {
// Assume support for both families. // Assume support for both families.
@ -165,12 +165,12 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
var err error var err error
if config.Mode == proxyconfigapi.ProxyModeIPTables { if config.Mode == proxyconfigapi.ProxyModeIPTables {
klog.InfoS("Using iptables Proxier") s.logger.Info("Using iptables Proxier")
if dualStack { if dualStack {
ipt, _ := getIPTables(s.PrimaryIPFamily) ipt, _ := getIPTables(s.PrimaryIPFamily)
localDetectors, err = getDualStackLocalDetectorTuple(config.DetectLocalMode, config, s.podCIDRs) localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
@ -196,7 +196,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
} else { } else {
// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support). // Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
_, iptInterface := getIPTables(s.PrimaryIPFamily) _, iptInterface := getIPTables(s.PrimaryIPFamily)
localDetector, err = getLocalDetector(s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs) localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
@ -233,12 +233,12 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
return nil, fmt.Errorf("can't use the IPVS proxier: %v", err) return nil, fmt.Errorf("can't use the IPVS proxier: %v", err)
} }
klog.InfoS("Using ipvs Proxier") s.logger.Info("Using ipvs Proxier")
if dualStack { if dualStack {
ipt, _ := getIPTables(s.PrimaryIPFamily) ipt, _ := getIPTables(s.PrimaryIPFamily)
// Always ordered to match []ipt // Always ordered to match []ipt
localDetectors, err = getDualStackLocalDetectorTuple(config.DetectLocalMode, config, s.podCIDRs) localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
@ -269,7 +269,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
) )
} else { } else {
_, iptInterface := getIPTables(s.PrimaryIPFamily) _, iptInterface := getIPTables(s.PrimaryIPFamily)
localDetector, err = getLocalDetector(s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs) localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
@ -304,10 +304,10 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
} else if config.Mode == proxyconfigapi.ProxyModeNFTables { } else if config.Mode == proxyconfigapi.ProxyModeNFTables {
klog.InfoS("Using nftables Proxier") s.logger.Info("Using nftables Proxier")
if dualStack { if dualStack {
localDetectors, err = getDualStackLocalDetectorTuple(config.DetectLocalMode, config, s.podCIDRs) localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
@ -329,7 +329,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
) )
} else { } else {
// Create a single-stack proxier if and only if the node does not support dual-stack // Create a single-stack proxier if and only if the node does not support dual-stack
localDetector, err = getLocalDetector(s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs) localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
@ -361,9 +361,11 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
} }
func (s *ProxyServer) setupConntrack() error { func (s *ProxyServer) setupConntrack() error {
ct := &realConntracker{} ct := &realConntracker{
logger: s.logger,
}
max, err := getConntrackMax(s.Config.Conntrack) max, err := getConntrackMax(s.logger, s.Config.Conntrack)
if err != nil { if err != nil {
return err return err
} }
@ -423,7 +425,7 @@ func (s *ProxyServer) setupConntrack() error {
return nil return nil
} }
func getConntrackMax(config proxyconfigapi.KubeProxyConntrackConfiguration) (int, error) { func getConntrackMax(logger klog.Logger, config proxyconfigapi.KubeProxyConntrackConfiguration) (int, error) {
if config.MaxPerCore != nil && *config.MaxPerCore > 0 { if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
floor := 0 floor := 0
if config.Min != nil { if config.Min != nil {
@ -431,10 +433,10 @@ func getConntrackMax(config proxyconfigapi.KubeProxyConntrackConfiguration) (int
} }
scaled := int(*config.MaxPerCore) * detectNumCPU() scaled := int(*config.MaxPerCore) * detectNumCPU()
if scaled > floor { if scaled > floor {
klog.V(3).InfoS("GetConntrackMax: using scaled conntrack-max-per-core") logger.V(3).Info("GetConntrackMax: using scaled conntrack-max-per-core")
return scaled, nil return scaled, nil
} }
klog.V(3).InfoS("GetConntrackMax: using conntrack-min") logger.V(3).Info("GetConntrackMax: using conntrack-min")
return floor, nil return floor, nil
} }
return 0, nil return 0, nil
@ -493,14 +495,14 @@ func detectNumCPU() int {
return numCPU return numCPU
} }
func getLocalDetector(ipFamily v1.IPFamily, mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, nodePodCIDRs []string) (proxyutiliptables.LocalTrafficDetector, error) { func getLocalDetector(logger klog.Logger, ipFamily v1.IPFamily, mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, nodePodCIDRs []string) (proxyutiliptables.LocalTrafficDetector, error) {
switch mode { switch mode {
case proxyconfigapi.LocalModeClusterCIDR: case proxyconfigapi.LocalModeClusterCIDR:
// LocalModeClusterCIDR is the default if --detect-local-mode wasn't passed, // LocalModeClusterCIDR is the default if --detect-local-mode wasn't passed,
// but --cluster-cidr is optional. // but --cluster-cidr is optional.
clusterCIDRs := strings.TrimSpace(config.ClusterCIDR) clusterCIDRs := strings.TrimSpace(config.ClusterCIDR)
if len(clusterCIDRs) == 0 { if len(clusterCIDRs) == 0 {
klog.InfoS("Detect-local-mode set to ClusterCIDR, but no cluster CIDR defined") logger.Info("Detect-local-mode set to ClusterCIDR, but no cluster CIDR defined")
break break
} }
@ -509,7 +511,7 @@ func getLocalDetector(ipFamily v1.IPFamily, mode proxyconfigapi.LocalMode, confi
return proxyutiliptables.NewDetectLocalByCIDR(cidrsByFamily[ipFamily][0]) return proxyutiliptables.NewDetectLocalByCIDR(cidrsByFamily[ipFamily][0])
} }
klog.InfoS("Detect-local-mode set to ClusterCIDR, but no cluster CIDR for family", "ipFamily", ipFamily) logger.Info("Detect-local-mode set to ClusterCIDR, but no cluster CIDR for family", "ipFamily", ipFamily)
case proxyconfigapi.LocalModeNodeCIDR: case proxyconfigapi.LocalModeNodeCIDR:
cidrsByFamily := proxyutil.MapCIDRsByIPFamily(nodePodCIDRs) cidrsByFamily := proxyutil.MapCIDRsByIPFamily(nodePodCIDRs)
@ -517,7 +519,7 @@ func getLocalDetector(ipFamily v1.IPFamily, mode proxyconfigapi.LocalMode, confi
return proxyutiliptables.NewDetectLocalByCIDR(cidrsByFamily[ipFamily][0]) return proxyutiliptables.NewDetectLocalByCIDR(cidrsByFamily[ipFamily][0])
} }
klog.InfoS("Detect-local-mode set to NodeCIDR, but no PodCIDR defined at node for family", "ipFamily", ipFamily) logger.Info("Detect-local-mode set to NodeCIDR, but no PodCIDR defined at node for family", "ipFamily", ipFamily)
case proxyconfigapi.LocalModeBridgeInterface: case proxyconfigapi.LocalModeBridgeInterface:
return proxyutiliptables.NewDetectLocalByBridgeInterface(config.DetectLocal.BridgeInterface) return proxyutiliptables.NewDetectLocalByBridgeInterface(config.DetectLocal.BridgeInterface)
@ -526,19 +528,19 @@ func getLocalDetector(ipFamily v1.IPFamily, mode proxyconfigapi.LocalMode, confi
return proxyutiliptables.NewDetectLocalByInterfaceNamePrefix(config.DetectLocal.InterfaceNamePrefix) return proxyutiliptables.NewDetectLocalByInterfaceNamePrefix(config.DetectLocal.InterfaceNamePrefix)
} }
klog.InfoS("Defaulting to no-op detect-local") logger.Info("Defaulting to no-op detect-local")
return proxyutiliptables.NewNoOpLocalDetector(), nil return proxyutiliptables.NewNoOpLocalDetector(), nil
} }
func getDualStackLocalDetectorTuple(mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, nodePodCIDRs []string) ([2]proxyutiliptables.LocalTrafficDetector, error) { func getDualStackLocalDetectorTuple(logger klog.Logger, mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, nodePodCIDRs []string) ([2]proxyutiliptables.LocalTrafficDetector, error) {
var localDetectors [2]proxyutiliptables.LocalTrafficDetector var localDetectors [2]proxyutiliptables.LocalTrafficDetector
var err error var err error
localDetectors[0], err = getLocalDetector(v1.IPv4Protocol, mode, config, nodePodCIDRs) localDetectors[0], err = getLocalDetector(logger, v1.IPv4Protocol, mode, config, nodePodCIDRs)
if err != nil { if err != nil {
return localDetectors, err return localDetectors, err
} }
localDetectors[1], err = getLocalDetector(v1.IPv6Protocol, mode, config, nodePodCIDRs) localDetectors[1], err = getLocalDetector(logger, v1.IPv6Protocol, mode, config, nodePodCIDRs)
if err != nil { if err != nil {
return localDetectors, err return localDetectors, err
} }

View File

@ -39,6 +39,7 @@ import (
clientgotesting "k8s.io/client-go/testing" clientgotesting "k8s.io/client-go/testing"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
"k8s.io/kubernetes/test/utils/ktesting"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
) )
@ -274,7 +275,8 @@ func Test_getLocalDetector(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
t.Run(c.name, func(t *testing.T) { t.Run(c.name, func(t *testing.T) {
r, err := getLocalDetector(c.family, c.mode, c.config, c.nodePodCIDRs) logger, _ := ktesting.NewTestContext(t)
r, err := getLocalDetector(logger, c.family, c.mode, c.config, c.nodePodCIDRs)
if c.errExpected { if c.errExpected {
if err == nil { if err == nil {
t.Errorf("Expected error, but succeeded with %v", r) t.Errorf("Expected error, but succeeded with %v", r)
@ -421,7 +423,8 @@ func Test_getDualStackLocalDetectorTuple(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
t.Run(c.name, func(t *testing.T) { t.Run(c.name, func(t *testing.T) {
r, err := getDualStackLocalDetectorTuple(c.mode, c.config, c.nodePodCIDRs) logger, _ := ktesting.NewTestContext(t)
r, err := getDualStackLocalDetectorTuple(logger, c.mode, c.config, c.nodePodCIDRs)
if c.errExpected { if c.errExpected {
if err == nil { if err == nil {
t.Errorf("Expected error, but succeeded with %q", r) t.Errorf("Expected error, but succeeded with %q", r)
@ -675,7 +678,8 @@ func TestGetConntrackMax(t *testing.T) {
Min: ptr.To(tc.min), Min: ptr.To(tc.min),
MaxPerCore: ptr.To(tc.maxPerCore), MaxPerCore: ptr.To(tc.maxPerCore),
} }
x, e := getConntrackMax(cfg) logger, _ := ktesting.NewTestContext(t)
x, e := getConntrackMax(logger, cfg)
if e != nil { if e != nil {
if tc.err == "" { if tc.err == "" {
t.Errorf("[%d] unexpected error: %v", i, e) t.Errorf("[%d] unexpected error: %v", i, e)

View File

@ -38,6 +38,7 @@ import (
componentbaseconfig "k8s.io/component-base/config" componentbaseconfig "k8s.io/component-base/config"
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/test/utils/ktesting"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
) )
@ -646,7 +647,8 @@ func Test_getNodeIPs(t *testing.T) {
nodeName := fmt.Sprintf("node%d", i+1) nodeName := fmt.Sprintf("node%d", i+1)
expectIP := fmt.Sprintf("192.168.0.%d", i+1) expectIP := fmt.Sprintf("192.168.0.%d", i+1)
go func() { go func() {
ips := getNodeIPs(client, nodeName) logger, _ := ktesting.NewTestContext(t)
ips := getNodeIPs(logger, client, nodeName)
if len(ips) == 0 { if len(ips) == 0 {
ch <- fmt.Errorf("expected IP %s for %s but got nil", expectIP, nodeName) ch <- fmt.Errorf("expected IP %s for %s but got nil", expectIP, nodeName)
} else if ips[0].String() != expectIP { } else if ips[0].String() != expectIP {
@ -825,7 +827,8 @@ func Test_detectNodeIPs(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
t.Run(c.name, func(t *testing.T) { t.Run(c.name, func(t *testing.T) {
primaryFamily, ips := detectNodeIPs(c.rawNodeIPs, c.bindAddress) logger, _ := ktesting.NewTestContext(t)
primaryFamily, ips := detectNodeIPs(logger, c.rawNodeIPs, c.bindAddress)
if primaryFamily != c.expectedFamily { if primaryFamily != c.expectedFamily {
t.Errorf("Expected family %q got %q", c.expectedFamily, primaryFamily) t.Errorf("Expected family %q got %q", c.expectedFamily, primaryFamily)
} }

View File

@ -116,7 +116,6 @@ linters-settings: # please keep this alphabetized
-structured .* -structured .*
# Now enable it again for migrated packages. # Now enable it again for migrated packages.
structured k8s.io/kubernetes/cmd/kube-proxy/.*
structured k8s.io/kubernetes/cmd/kubelet/.* structured k8s.io/kubernetes/cmd/kubelet/.*
structured k8s.io/kubernetes/pkg/kubelet/.* structured k8s.io/kubernetes/pkg/kubelet/.*
structured k8s.io/kubernetes/pkg/proxy/.* structured k8s.io/kubernetes/pkg/proxy/.*
@ -131,6 +130,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/client-go/tools/events/.* contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.* contextual k8s.io/client-go/tools/record/.*
contextual k8s.io/dynamic-resource-allocation/.* contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.* contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.* contextual k8s.io/kubernetes/pkg/scheduler/.*

View File

@ -163,7 +163,6 @@ linters-settings: # please keep this alphabetized
-structured .* -structured .*
# Now enable it again for migrated packages. # Now enable it again for migrated packages.
structured k8s.io/kubernetes/cmd/kube-proxy/.*
structured k8s.io/kubernetes/cmd/kubelet/.* structured k8s.io/kubernetes/cmd/kubelet/.*
structured k8s.io/kubernetes/pkg/kubelet/.* structured k8s.io/kubernetes/pkg/kubelet/.*
structured k8s.io/kubernetes/pkg/proxy/.* structured k8s.io/kubernetes/pkg/proxy/.*
@ -178,6 +177,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/client-go/tools/events/.* contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.* contextual k8s.io/client-go/tools/record/.*
contextual k8s.io/dynamic-resource-allocation/.* contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.* contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.* contextual k8s.io/kubernetes/pkg/scheduler/.*

View File

@ -166,7 +166,6 @@ linters-settings: # please keep this alphabetized
-structured .* -structured .*
# Now enable it again for migrated packages. # Now enable it again for migrated packages.
structured k8s.io/kubernetes/cmd/kube-proxy/.*
structured k8s.io/kubernetes/cmd/kubelet/.* structured k8s.io/kubernetes/cmd/kubelet/.*
structured k8s.io/kubernetes/pkg/kubelet/.* structured k8s.io/kubernetes/pkg/kubelet/.*
structured k8s.io/kubernetes/pkg/proxy/.* structured k8s.io/kubernetes/pkg/proxy/.*
@ -181,6 +180,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/client-go/tools/events/.* contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.* contextual k8s.io/client-go/tools/record/.*
contextual k8s.io/dynamic-resource-allocation/.* contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.* contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.* contextual k8s.io/kubernetes/pkg/scheduler/.*

View File

@ -14,7 +14,6 @@
-structured .* -structured .*
# Now enable it again for migrated packages. # Now enable it again for migrated packages.
structured k8s.io/kubernetes/cmd/kube-proxy/.*
structured k8s.io/kubernetes/cmd/kubelet/.* structured k8s.io/kubernetes/cmd/kubelet/.*
structured k8s.io/kubernetes/pkg/kubelet/.* structured k8s.io/kubernetes/pkg/kubelet/.*
structured k8s.io/kubernetes/pkg/proxy/.* structured k8s.io/kubernetes/pkg/proxy/.*
@ -29,6 +28,7 @@ contextual k8s.io/client-go/metadata/.*
contextual k8s.io/client-go/tools/events/.* contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.* contextual k8s.io/client-go/tools/record/.*
contextual k8s.io/dynamic-resource-allocation/.* contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.* contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.* contextual k8s.io/kubernetes/pkg/scheduler/.*