From 08ce5805769526e5cba3d555cd981048f87639c0 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 13 Mar 2023 10:57:19 -0400 Subject: [PATCH 1/2] Add ProxyServer.platformSetup Move the Linux-specific conntrack setup code into a new "platformSetup" rather than trying to fit it into the generic setup code. Also move metrics registration there. --- cmd/kube-proxy/app/server.go | 61 ++-------------------- cmd/kube-proxy/app/server_others.go | 64 ++++++++++++++++++++++-- cmd/kube-proxy/app/server_others_test.go | 48 ++++++++++++++++++ cmd/kube-proxy/app/server_test.go | 48 ------------------ cmd/kube-proxy/app/server_windows.go | 12 ++--- 5 files changed, 119 insertions(+), 114 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index b9048aa3aed..44be8944d03 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -514,7 +514,6 @@ type ProxyServer struct { Client clientset.Interface Broadcaster events.EventBroadcaster Recorder events.EventRecorder - Conntracker Conntracker // if nil, ignored NodeRef *v1.ObjectReference HealthzServer healthcheck.ProxierHealthUpdater @@ -652,45 +651,10 @@ func (s *ProxyServer) Run() error { // Start up a metrics server if requested serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh) - // Tune conntrack, if requested - // Conntracker is always nil for windows - if s.Conntracker != nil { - max, err := getConntrackMax(s.Config.Conntrack) - if err != nil { - return err - } - if max > 0 { - err := s.Conntracker.SetMax(max) - if err != nil { - if err != errReadOnlySysFS { - return err - } - // errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000), - // the only remediation we know is to restart the docker daemon. - // Here we'll send an node event with specific reason and message, the - // administrator should decide whether and how to handle this issue, - // whether to drain the node and restart docker. Occurs in other container runtimes - // as well. - // TODO(random-liu): Remove this when the docker bug is fixed. - const message = "CRI error: /sys is read-only: " + - "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)" - s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, err.Error(), "StartKubeProxy", message) - } - } - - if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 { - timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second) - if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil { - return err - } - } - - if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 { - timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second) - if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil { - return err - } - } + // Do platform-specific setup + err := s.platformSetup() + if err != nil { + return err } noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil) @@ -758,23 +722,6 @@ func (s *ProxyServer) birthCry() { s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "") } -func getConntrackMax(config kubeproxyconfig.KubeProxyConntrackConfiguration) (int, error) { - if config.MaxPerCore != nil && *config.MaxPerCore > 0 { - floor := 0 - if config.Min != nil { - floor = int(*config.Min) - } - scaled := int(*config.MaxPerCore) * detectNumCPU() - if scaled > floor { - klog.V(3).InfoS("GetConntrackMax: using scaled conntrack-max-per-core") - return scaled, nil - } - klog.V(3).InfoS("GetConntrackMax: using conntrack-min") - return floor, nil - } - return 0, nil -} - // detectNodeIP returns the nodeIP used by the proxier // The order of precedence is: // 1. config.bindAddress if bindAddress is not 0.0.0.0 or :: diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index f78ab70792b..7959f9569f6 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -247,7 +247,6 @@ func newProxyServer( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxymetrics.RegisterMetrics() } else if config.Mode == proxyconfigapi.ProxyModeIPVS { kernelHandler := ipvs.NewLinuxKernelHandler() ipsetInterface = utilipset.New(execer) @@ -329,7 +328,6 @@ func newProxyServer( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxymetrics.RegisterMetrics() } return &ProxyServer{ @@ -338,12 +336,72 @@ func newProxyServer( Proxier: proxier, Broadcaster: eventBroadcaster, Recorder: recorder, - Conntracker: &realConntracker{}, NodeRef: nodeRef, HealthzServer: healthzServer, }, nil } +func (s *ProxyServer) platformSetup() error { + ct := &realConntracker{} + + max, err := getConntrackMax(s.Config.Conntrack) + if err != nil { + return err + } + if max > 0 { + err := ct.SetMax(max) + if err != nil { + if err != errReadOnlySysFS { + return err + } + // errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000), + // the only remediation we know is to restart the docker daemon. + // Here we'll send an node event with specific reason and message, the + // administrator should decide whether and how to handle this issue, + // whether to drain the node and restart docker. Occurs in other container runtimes + // as well. + // TODO(random-liu): Remove this when the docker bug is fixed. + const message = "CRI error: /sys is read-only: " + + "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)" + s.Recorder.Eventf(s.NodeRef, nil, v1.EventTypeWarning, err.Error(), "StartKubeProxy", message) + } + } + + if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 { + timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second) + if err := ct.SetTCPEstablishedTimeout(timeout); err != nil { + return err + } + } + + if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 { + timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second) + if err := ct.SetTCPCloseWaitTimeout(timeout); err != nil { + return err + } + } + + proxymetrics.RegisterMetrics() + return nil +} + +func getConntrackMax(config proxyconfigapi.KubeProxyConntrackConfiguration) (int, error) { + if config.MaxPerCore != nil && *config.MaxPerCore > 0 { + floor := 0 + if config.Min != nil { + floor = int(*config.Min) + } + scaled := int(*config.MaxPerCore) * detectNumCPU() + if scaled > floor { + klog.V(3).InfoS("GetConntrackMax: using scaled conntrack-max-per-core") + return scaled, nil + } + klog.V(3).InfoS("GetConntrackMax: using conntrack-min") + return floor, nil + } + return 0, nil +} + func waitForPodCIDR(client clientset.Interface, nodeName string) (*v1.Node, error) { // since allocators can assign the podCIDR after the node registers, we do a watch here to wait // for podCIDR to be assigned, instead of assuming that the Get() on startup will have it. diff --git a/cmd/kube-proxy/app/server_others_test.go b/cmd/kube-proxy/app/server_others_test.go index e6836138c2f..e595ba282ef 100644 --- a/cmd/kube-proxy/app/server_others_test.go +++ b/cmd/kube-proxy/app/server_others_test.go @@ -25,6 +25,7 @@ import ( "os" "path/filepath" "reflect" + goruntime "runtime" "strings" "testing" "time" @@ -34,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" netutils "k8s.io/utils/net" + "k8s.io/utils/pointer" clientsetfake "k8s.io/client-go/kubernetes/fake" clientgotesting "k8s.io/client-go/testing" @@ -733,3 +735,49 @@ func Test_waitForPodCIDR(t *testing.T) { t.Errorf("waitForPodCIDR() got %v expected to be %v ", got.Spec.PodCIDRs, expected) } } + +func TestGetConntrackMax(t *testing.T) { + ncores := goruntime.NumCPU() + testCases := []struct { + min int32 + maxPerCore int32 + expected int + err string + }{ + { + expected: 0, + }, + { + maxPerCore: 67890, // use this if Max is 0 + min: 1, // avoid 0 default + expected: 67890 * ncores, + }, + { + maxPerCore: 1, // ensure that Min is considered + min: 123456, + expected: 123456, + }, + { + maxPerCore: 0, // leave system setting + min: 123456, + expected: 0, + }, + } + + for i, tc := range testCases { + cfg := proxyconfigapi.KubeProxyConntrackConfiguration{ + Min: pointer.Int32(tc.min), + MaxPerCore: pointer.Int32(tc.maxPerCore), + } + x, e := getConntrackMax(cfg) + if e != nil { + if tc.err == "" { + t.Errorf("[%d] unexpected error: %v", i, e) + } else if !strings.Contains(e.Error(), tc.err) { + t.Errorf("[%d] expected an error containing %q: %v", i, tc.err, e) + } + } else if x != tc.expected { + t.Errorf("[%d] expected %d, got %d", i, tc.expected, x) + } + } +} diff --git a/cmd/kube-proxy/app/server_test.go b/cmd/kube-proxy/app/server_test.go index b3cb9422cf1..96b1e59872d 100644 --- a/cmd/kube-proxy/app/server_test.go +++ b/cmd/kube-proxy/app/server_test.go @@ -20,8 +20,6 @@ import ( "errors" "fmt" "reflect" - "runtime" - "strings" "testing" "time" @@ -36,52 +34,6 @@ import ( kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" ) -func TestGetConntrackMax(t *testing.T) { - ncores := runtime.NumCPU() - testCases := []struct { - min int32 - maxPerCore int32 - expected int - err string - }{ - { - expected: 0, - }, - { - maxPerCore: 67890, // use this if Max is 0 - min: 1, // avoid 0 default - expected: 67890 * ncores, - }, - { - maxPerCore: 1, // ensure that Min is considered - min: 123456, - expected: 123456, - }, - { - maxPerCore: 0, // leave system setting - min: 123456, - expected: 0, - }, - } - - for i, tc := range testCases { - cfg := kubeproxyconfig.KubeProxyConntrackConfiguration{ - Min: pointer.Int32(tc.min), - MaxPerCore: pointer.Int32(tc.maxPerCore), - } - x, e := getConntrackMax(cfg) - if e != nil { - if tc.err == "" { - t.Errorf("[%d] unexpected error: %v", i, e) - } else if !strings.Contains(e.Error(), tc.err) { - t.Errorf("[%d] expected an error containing %q: %v", i, tc.err, e) - } - } else if x != tc.expected { - t.Errorf("[%d] expected %d, got %d", i, tc.expected, x) - } - } -} - // TestLoadConfig tests proper operation of loadConfig() func TestLoadConfig(t *testing.T) { diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go index 43ae62600e9..58befa1b2f2 100644 --- a/cmd/kube-proxy/app/server_windows.go +++ b/cmd/kube-proxy/app/server_windows.go @@ -25,7 +25,6 @@ import ( "errors" "fmt" "net" - goruntime "runtime" "strconv" // Enable pprof HTTP handlers. @@ -105,6 +104,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string } var proxier proxy.Provider + dualStackMode := getDualStackMode(config.Winkernel.NetworkName, winkernel.DualStackCompatTester{}) if dualStackMode { klog.InfoS("Creating dualStackProxier for Windows kernel.") @@ -136,7 +136,6 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - winkernel.RegisterMetrics() return &ProxyServer{ Config: config, @@ -149,12 +148,13 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string }, nil } -func getDualStackMode(networkname string, compatTester winkernel.StackCompatTester) bool { - return compatTester.DualStackCompatible(networkname) +func (s *ProxyServer) platformSetup() error { + winkernel.RegisterMetrics() + return nil } -func detectNumCPU() int { - return goruntime.NumCPU() +func getDualStackMode(networkname string, compatTester winkernel.StackCompatTester) bool { + return compatTester.DualStackCompatible(networkname) } // cleanupAndExit cleans up after a previous proxy run From 6232ac734a0c0bdb614e46e7c447f779fec39b9c Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sat, 11 Mar 2023 17:25:13 -0500 Subject: [PATCH 2/2] Merge duplicated Linux/Windows kube-proxy setup code --- cmd/kube-proxy/app/server.go | 56 +++++++++++++- cmd/kube-proxy/app/server_others.go | 109 ++++++--------------------- cmd/kube-proxy/app/server_windows.go | 75 +++--------------- 3 files changed, 88 insertions(+), 152 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 44be8944d03..78a9639f2d5 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -40,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/healthz" @@ -57,11 +58,13 @@ import ( "k8s.io/component-base/configz" "k8s.io/component-base/logs" logsapi "k8s.io/component-base/logs/api/v1" + "k8s.io/component-base/metrics" metricsfeatures "k8s.io/component-base/metrics/features" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/prometheus/slis" "k8s.io/component-base/version" "k8s.io/component-base/version/verflag" + nodeutil "k8s.io/component-helpers/node/util" "k8s.io/klog/v2" "k8s.io/kube-proxy/config/v1alpha1" api "k8s.io/kubernetes/pkg/apis/core" @@ -319,7 +322,7 @@ func (o *Options) Run() error { return cleanupAndExit() } - proxyServer, err := NewProxyServer(o) + proxyServer, err := newProxyServer(o.config, o.master) if err != nil { return err } @@ -516,10 +519,61 @@ type ProxyServer struct { Recorder events.EventRecorder NodeRef *v1.ObjectReference HealthzServer healthcheck.ProxierHealthUpdater + Hostname string + NodeIP net.IP Proxier proxy.Provider } +// newProxyServer creates a ProxyServer based on the given config +func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master string) (*ProxyServer, error) { + s := &ProxyServer{Config: config} + + cz, err := configz.New(kubeproxyconfig.GroupName) + if err != nil { + return nil, fmt.Errorf("unable to register configz: %s", err) + } + cz.Set(config) + + if len(config.ShowHiddenMetricsForVersion) > 0 { + metrics.SetShowHidden() + } + + s.Hostname, err = nodeutil.GetHostname(config.HostnameOverride) + if err != nil { + return nil, err + } + + s.Client, err = createClient(config.ClientConnection, master) + if err != nil { + return nil, err + } + + s.NodeIP = detectNodeIP(s.Client, s.Hostname, config.BindAddress) + klog.InfoS("Detected node IP", "address", s.NodeIP.String()) + + s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()}) + s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy") + + s.NodeRef = &v1.ObjectReference{ + Kind: "Node", + Name: s.Hostname, + UID: types.UID(s.Hostname), + Namespace: "", + } + + if len(config.HealthzBindAddress) > 0 { + s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef) + } + + s.Proxier, err = s.createProxier(config) + if err != nil { + return nil, err + } + + return s, nil +} + // createClient creates a kube client from the given config and masterOverride. // TODO remove masterOverride when CLI flags are removed. func createClient(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) { diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 7959f9569f6..5380a8faaaf 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -35,23 +35,16 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/events" "k8s.io/apimachinery/pkg/fields" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" toolswatch "k8s.io/client-go/tools/watch" - "k8s.io/component-base/configz" - "k8s.io/component-base/metrics" - nodeutil "k8s.io/component-helpers/node/util" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - "k8s.io/kubernetes/pkg/proxy/apis/config/scheme" - "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/ipvs" proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics" @@ -83,63 +76,15 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur klog.V(2).InfoS("DetectLocalMode", "localMode", string(config.DetectLocalMode)) } -// NewProxyServer returns a new ProxyServer. -func NewProxyServer(o *Options) (*ProxyServer, error) { - return newProxyServer(o.config, o.master) -} - -func newProxyServer( - config *proxyconfigapi.KubeProxyConfiguration, - master string) (*ProxyServer, error) { - - if c, err := configz.New(proxyconfigapi.GroupName); err == nil { - c.Set(config) - } else { - return nil, fmt.Errorf("unable to register configz: %s", err) - } - - var ipvsInterface utilipvs.Interface - var ipsetInterface utilipset.Interface - - if len(config.ShowHiddenMetricsForVersion) > 0 { - metrics.SetShowHidden() - } - - hostname, err := nodeutil.GetHostname(config.HostnameOverride) - if err != nil { - return nil, err - } - - client, err := createClient(config.ClientConnection, master) - if err != nil { - return nil, err - } - - nodeIP := detectNodeIP(client, hostname, config.BindAddress) - klog.InfoS("Detected node IP", "address", nodeIP.String()) - - // Create event recorder - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy") - - nodeRef := &v1.ObjectReference{ - Kind: "Node", - Name: hostname, - UID: types.UID(hostname), - Namespace: "", - } - - var healthzServer healthcheck.ProxierHealthUpdater - if len(config.HealthzBindAddress) > 0 { - healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef) - } - +// createProxier creates the proxy.Provider +func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration) (proxy.Provider, error) { var proxier proxy.Provider + var err error var nodeInfo *v1.Node if config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR { - klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", hostname) - nodeInfo, err = waitForPodCIDR(client, hostname) + klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname) + nodeInfo, err = waitForPodCIDR(s.Client, s.Hostname) if err != nil { return nil, err } @@ -148,7 +93,7 @@ func newProxyServer( primaryFamily := v1.IPv4Protocol primaryProtocol := utiliptables.ProtocolIPv4 - if netutils.IsIPv6(nodeIP) { + if netutils.IsIPv6(s.NodeIP) { primaryFamily = v1.IPv6Protocol primaryProtocol = utiliptables.ProtocolIPv6 } @@ -210,10 +155,10 @@ func newProxyServer( *config.IPTables.LocalhostNodePorts, int(*config.IPTables.MasqueradeBit), localDetectors, - hostname, + s.Hostname, nodeIPTuple(config.BindAddress), - recorder, - healthzServer, + s.Recorder, + s.HealthzServer, nodePortAddresses, ) } else { @@ -236,10 +181,10 @@ func newProxyServer( *config.IPTables.LocalhostNodePorts, int(*config.IPTables.MasqueradeBit), localDetector, - hostname, - nodeIP, - recorder, - healthzServer, + s.Hostname, + s.NodeIP, + s.Recorder, + s.HealthzServer, nodePortAddresses, ) } @@ -249,8 +194,8 @@ func newProxyServer( } } else if config.Mode == proxyconfigapi.ProxyModeIPVS { kernelHandler := ipvs.NewLinuxKernelHandler() - ipsetInterface = utilipset.New(execer) - ipvsInterface = utilipvs.New() + ipsetInterface := utilipset.New(execer) + ipvsInterface := utilipvs.New() if err := ipvs.CanUseIPVSProxier(ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil { return nil, fmt.Errorf("can't use the IPVS proxier: %v", err) } @@ -284,10 +229,10 @@ func newProxyServer( config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit), localDetectors, - hostname, + s.Hostname, nodeIPs, - recorder, - healthzServer, + s.Recorder, + s.HealthzServer, config.IPVS.Scheduler, nodePortAddresses, kernelHandler, @@ -316,10 +261,10 @@ func newProxyServer( config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit), localDetector, - hostname, - nodeIP, - recorder, - healthzServer, + s.Hostname, + s.NodeIP, + s.Recorder, + s.HealthzServer, config.IPVS.Scheduler, nodePortAddresses, kernelHandler, @@ -330,15 +275,7 @@ func newProxyServer( } } - return &ProxyServer{ - Config: config, - Client: client, - Proxier: proxier, - Broadcaster: eventBroadcaster, - Recorder: recorder, - NodeRef: nodeRef, - HealthzServer: healthzServer, - }, nil + return proxier, nil } func (s *ProxyServer) platformSetup() error { diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go index 58befa1b2f2..3cb813bfd28 100644 --- a/cmd/kube-proxy/app/server_windows.go +++ b/cmd/kube-proxy/app/server_windows.go @@ -30,17 +30,9 @@ import ( // Enable pprof HTTP handlers. _ "net/http/pprof" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/events" - "k8s.io/component-base/configz" - "k8s.io/component-base/metrics" - nodeutil "k8s.io/component-helpers/node/util" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme" - "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/winkernel" ) @@ -50,49 +42,10 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur } } -// NewProxyServer returns a new ProxyServer. -func NewProxyServer(o *Options) (*ProxyServer, error) { - return newProxyServer(o.config, o.master) -} - -func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string) (*ProxyServer, error) { - if c, err := configz.New(proxyconfigapi.GroupName); err == nil { - c.Set(config) - } else { - return nil, fmt.Errorf("unable to register configz: %s", err) - } - - if len(config.ShowHiddenMetricsForVersion) > 0 { - metrics.SetShowHidden() - } - - client, err := createClient(config.ClientConnection, master) - if err != nil { - return nil, err - } - - // Create event recorder - hostname, err := nodeutil.GetHostname(config.HostnameOverride) - if err != nil { - return nil, err - } - nodeIP := detectNodeIP(client, hostname, config.BindAddress) - klog.InfoS("Detected node IP", "IP", nodeIP.String()) - - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) - recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy") - - nodeRef := &v1.ObjectReference{ - Kind: "Node", - Name: hostname, - UID: types.UID(hostname), - Namespace: "", - } - - var healthzServer healthcheck.ProxierHealthUpdater +// createProxier creates the proxy.Provider +func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration) (proxy.Provider, error) { var healthzPort int if len(config.HealthzBindAddress) > 0 { - healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef) _, port, _ := net.SplitHostPort(config.HealthzBindAddress) healthzPort, _ = strconv.Atoi(port) } @@ -113,10 +66,10 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string config.IPTables.SyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration, config.ClusterCIDR, - hostname, + s.Hostname, nodeIPTuple(config.BindAddress), - recorder, - healthzServer, + s.Recorder, + s.HealthzServer, config.Winkernel, healthzPort, ) @@ -125,10 +78,10 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string config.IPTables.SyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration, config.ClusterCIDR, - hostname, - nodeIP, - recorder, - healthzServer, + s.Hostname, + s.NodeIP, + s.Recorder, + s.HealthzServer, config.Winkernel, healthzPort, ) @@ -137,15 +90,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string return nil, fmt.Errorf("unable to create proxier: %v", err) } - return &ProxyServer{ - Config: config, - Client: client, - Proxier: proxier, - Broadcaster: eventBroadcaster, - Recorder: recorder, - NodeRef: nodeRef, - HealthzServer: healthzServer, - }, nil + return proxier, nil } func (s *ProxyServer) platformSetup() error {