mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 12:07:47 +00:00
Merge pull request #118017 from danwinship/kube-proxy-platform
merge duplicated linux/windows kube-proxy setup code
This commit is contained in:
commit
d8e9a7b33a
@ -40,6 +40,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/selection"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
@ -57,11 +58,13 @@ import (
|
||||
"k8s.io/component-base/configz"
|
||||
"k8s.io/component-base/logs"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
"k8s.io/component-base/metrics"
|
||||
metricsfeatures "k8s.io/component-base/metrics/features"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/component-base/metrics/prometheus/slis"
|
||||
"k8s.io/component-base/version"
|
||||
"k8s.io/component-base/version/verflag"
|
||||
nodeutil "k8s.io/component-helpers/node/util"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kube-proxy/config/v1alpha1"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
@ -319,7 +322,7 @@ func (o *Options) Run() error {
|
||||
return cleanupAndExit()
|
||||
}
|
||||
|
||||
proxyServer, err := NewProxyServer(o)
|
||||
proxyServer, err := newProxyServer(o.config, o.master)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -514,13 +517,63 @@ type ProxyServer struct {
|
||||
Client clientset.Interface
|
||||
Broadcaster events.EventBroadcaster
|
||||
Recorder events.EventRecorder
|
||||
Conntracker Conntracker // if nil, ignored
|
||||
NodeRef *v1.ObjectReference
|
||||
HealthzServer healthcheck.ProxierHealthUpdater
|
||||
Hostname string
|
||||
NodeIP net.IP
|
||||
|
||||
Proxier proxy.Provider
|
||||
}
|
||||
|
||||
// newProxyServer creates a ProxyServer based on the given config
|
||||
func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master string) (*ProxyServer, error) {
|
||||
s := &ProxyServer{Config: config}
|
||||
|
||||
cz, err := configz.New(kubeproxyconfig.GroupName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to register configz: %s", err)
|
||||
}
|
||||
cz.Set(config)
|
||||
|
||||
if len(config.ShowHiddenMetricsForVersion) > 0 {
|
||||
metrics.SetShowHidden()
|
||||
}
|
||||
|
||||
s.Hostname, err = nodeutil.GetHostname(config.HostnameOverride)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.Client, err = createClient(config.ClientConnection, master)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.NodeIP = detectNodeIP(s.Client, s.Hostname, config.BindAddress)
|
||||
klog.InfoS("Detected node IP", "address", s.NodeIP.String())
|
||||
|
||||
s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()})
|
||||
s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")
|
||||
|
||||
s.NodeRef = &v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: s.Hostname,
|
||||
UID: types.UID(s.Hostname),
|
||||
Namespace: "",
|
||||
}
|
||||
|
||||
if len(config.HealthzBindAddress) > 0 {
|
||||
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef)
|
||||
}
|
||||
|
||||
s.Proxier, err = s.createProxier(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// createClient creates a kube client from the given config and masterOverride.
|
||||
// TODO remove masterOverride when CLI flags are removed.
|
||||
func createClient(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
|
||||
@ -652,45 +705,10 @@ func (s *ProxyServer) Run() error {
|
||||
// Start up a metrics server if requested
|
||||
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh)
|
||||
|
||||
// Tune conntrack, if requested
|
||||
// Conntracker is always nil for windows
|
||||
if s.Conntracker != nil {
|
||||
max, err := getConntrackMax(s.Config.Conntrack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if max > 0 {
|
||||
err := s.Conntracker.SetMax(max)
|
||||
if err != nil {
|
||||
if err != errReadOnlySysFS {
|
||||
return err
|
||||
}
|
||||
// errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
|
||||
// the only remediation we know is to restart the docker daemon.
|
||||
// Here we'll send an node event with specific reason and message, the
|
||||
// administrator should decide whether and how to handle this issue,
|
||||
// whether to drain the node and restart docker. Occurs in other container runtimes
|
||||
// as well.
|
||||
// TODO(random-liu): Remove this when the docker bug is fixed.
|
||||
const message = "CRI error: /sys is read-only: " +
|
||||
"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
|
||||
s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, err.Error(), "StartKubeProxy", message)
|
||||
}
|
||||
}
|
||||
|
||||
if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 {
|
||||
timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second)
|
||||
if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 {
|
||||
timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second)
|
||||
if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Do platform-specific setup
|
||||
err := s.platformSetup()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
|
||||
@ -758,23 +776,6 @@ func (s *ProxyServer) birthCry() {
|
||||
s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "")
|
||||
}
|
||||
|
||||
func getConntrackMax(config kubeproxyconfig.KubeProxyConntrackConfiguration) (int, error) {
|
||||
if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
|
||||
floor := 0
|
||||
if config.Min != nil {
|
||||
floor = int(*config.Min)
|
||||
}
|
||||
scaled := int(*config.MaxPerCore) * detectNumCPU()
|
||||
if scaled > floor {
|
||||
klog.V(3).InfoS("GetConntrackMax: using scaled conntrack-max-per-core")
|
||||
return scaled, nil
|
||||
}
|
||||
klog.V(3).InfoS("GetConntrackMax: using conntrack-min")
|
||||
return floor, nil
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// detectNodeIP returns the nodeIP used by the proxier
|
||||
// The order of precedence is:
|
||||
// 1. config.bindAddress if bindAddress is not 0.0.0.0 or ::
|
||||
|
@ -35,23 +35,16 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/events"
|
||||
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
toolswatch "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/component-base/configz"
|
||||
"k8s.io/component-base/metrics"
|
||||
nodeutil "k8s.io/component-helpers/node/util"
|
||||
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||
"k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
"k8s.io/kubernetes/pkg/proxy/iptables"
|
||||
"k8s.io/kubernetes/pkg/proxy/ipvs"
|
||||
proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
@ -83,63 +76,15 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
|
||||
klog.V(2).InfoS("DetectLocalMode", "localMode", string(config.DetectLocalMode))
|
||||
}
|
||||
|
||||
// NewProxyServer returns a new ProxyServer.
|
||||
func NewProxyServer(o *Options) (*ProxyServer, error) {
|
||||
return newProxyServer(o.config, o.master)
|
||||
}
|
||||
|
||||
func newProxyServer(
|
||||
config *proxyconfigapi.KubeProxyConfiguration,
|
||||
master string) (*ProxyServer, error) {
|
||||
|
||||
if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
|
||||
c.Set(config)
|
||||
} else {
|
||||
return nil, fmt.Errorf("unable to register configz: %s", err)
|
||||
}
|
||||
|
||||
var ipvsInterface utilipvs.Interface
|
||||
var ipsetInterface utilipset.Interface
|
||||
|
||||
if len(config.ShowHiddenMetricsForVersion) > 0 {
|
||||
metrics.SetShowHidden()
|
||||
}
|
||||
|
||||
hostname, err := nodeutil.GetHostname(config.HostnameOverride)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := createClient(config.ClientConnection, master)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeIP := detectNodeIP(client, hostname, config.BindAddress)
|
||||
klog.InfoS("Detected node IP", "address", nodeIP.String())
|
||||
|
||||
// Create event recorder
|
||||
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy")
|
||||
|
||||
nodeRef := &v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: hostname,
|
||||
UID: types.UID(hostname),
|
||||
Namespace: "",
|
||||
}
|
||||
|
||||
var healthzServer healthcheck.ProxierHealthUpdater
|
||||
if len(config.HealthzBindAddress) > 0 {
|
||||
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
}
|
||||
|
||||
// createProxier creates the proxy.Provider
|
||||
func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration) (proxy.Provider, error) {
|
||||
var proxier proxy.Provider
|
||||
var err error
|
||||
|
||||
var nodeInfo *v1.Node
|
||||
if config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
|
||||
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", hostname)
|
||||
nodeInfo, err = waitForPodCIDR(client, hostname)
|
||||
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname)
|
||||
nodeInfo, err = waitForPodCIDR(s.Client, s.Hostname)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -148,7 +93,7 @@ func newProxyServer(
|
||||
|
||||
primaryFamily := v1.IPv4Protocol
|
||||
primaryProtocol := utiliptables.ProtocolIPv4
|
||||
if netutils.IsIPv6(nodeIP) {
|
||||
if netutils.IsIPv6(s.NodeIP) {
|
||||
primaryFamily = v1.IPv6Protocol
|
||||
primaryProtocol = utiliptables.ProtocolIPv6
|
||||
}
|
||||
@ -210,10 +155,10 @@ func newProxyServer(
|
||||
*config.IPTables.LocalhostNodePorts,
|
||||
int(*config.IPTables.MasqueradeBit),
|
||||
localDetectors,
|
||||
hostname,
|
||||
s.Hostname,
|
||||
nodeIPTuple(config.BindAddress),
|
||||
recorder,
|
||||
healthzServer,
|
||||
s.Recorder,
|
||||
s.HealthzServer,
|
||||
nodePortAddresses,
|
||||
)
|
||||
} else {
|
||||
@ -236,10 +181,10 @@ func newProxyServer(
|
||||
*config.IPTables.LocalhostNodePorts,
|
||||
int(*config.IPTables.MasqueradeBit),
|
||||
localDetector,
|
||||
hostname,
|
||||
nodeIP,
|
||||
recorder,
|
||||
healthzServer,
|
||||
s.Hostname,
|
||||
s.NodeIP,
|
||||
s.Recorder,
|
||||
s.HealthzServer,
|
||||
nodePortAddresses,
|
||||
)
|
||||
}
|
||||
@ -247,11 +192,10 @@ func newProxyServer(
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||
}
|
||||
proxymetrics.RegisterMetrics()
|
||||
} else if config.Mode == proxyconfigapi.ProxyModeIPVS {
|
||||
kernelHandler := ipvs.NewLinuxKernelHandler()
|
||||
ipsetInterface = utilipset.New(execer)
|
||||
ipvsInterface = utilipvs.New()
|
||||
ipsetInterface := utilipset.New(execer)
|
||||
ipvsInterface := utilipvs.New()
|
||||
if err := ipvs.CanUseIPVSProxier(ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil {
|
||||
return nil, fmt.Errorf("can't use the IPVS proxier: %v", err)
|
||||
}
|
||||
@ -285,10 +229,10 @@ func newProxyServer(
|
||||
config.IPTables.MasqueradeAll,
|
||||
int(*config.IPTables.MasqueradeBit),
|
||||
localDetectors,
|
||||
hostname,
|
||||
s.Hostname,
|
||||
nodeIPs,
|
||||
recorder,
|
||||
healthzServer,
|
||||
s.Recorder,
|
||||
s.HealthzServer,
|
||||
config.IPVS.Scheduler,
|
||||
nodePortAddresses,
|
||||
kernelHandler,
|
||||
@ -317,10 +261,10 @@ func newProxyServer(
|
||||
config.IPTables.MasqueradeAll,
|
||||
int(*config.IPTables.MasqueradeBit),
|
||||
localDetector,
|
||||
hostname,
|
||||
nodeIP,
|
||||
recorder,
|
||||
healthzServer,
|
||||
s.Hostname,
|
||||
s.NodeIP,
|
||||
s.Recorder,
|
||||
s.HealthzServer,
|
||||
config.IPVS.Scheduler,
|
||||
nodePortAddresses,
|
||||
kernelHandler,
|
||||
@ -329,19 +273,70 @@ func newProxyServer(
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||
}
|
||||
proxymetrics.RegisterMetrics()
|
||||
}
|
||||
|
||||
return &ProxyServer{
|
||||
Config: config,
|
||||
Client: client,
|
||||
Proxier: proxier,
|
||||
Broadcaster: eventBroadcaster,
|
||||
Recorder: recorder,
|
||||
Conntracker: &realConntracker{},
|
||||
NodeRef: nodeRef,
|
||||
HealthzServer: healthzServer,
|
||||
}, nil
|
||||
return proxier, nil
|
||||
}
|
||||
|
||||
func (s *ProxyServer) platformSetup() error {
|
||||
ct := &realConntracker{}
|
||||
|
||||
max, err := getConntrackMax(s.Config.Conntrack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if max > 0 {
|
||||
err := ct.SetMax(max)
|
||||
if err != nil {
|
||||
if err != errReadOnlySysFS {
|
||||
return err
|
||||
}
|
||||
// errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
|
||||
// the only remediation we know is to restart the docker daemon.
|
||||
// Here we'll send an node event with specific reason and message, the
|
||||
// administrator should decide whether and how to handle this issue,
|
||||
// whether to drain the node and restart docker. Occurs in other container runtimes
|
||||
// as well.
|
||||
// TODO(random-liu): Remove this when the docker bug is fixed.
|
||||
const message = "CRI error: /sys is read-only: " +
|
||||
"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
|
||||
s.Recorder.Eventf(s.NodeRef, nil, v1.EventTypeWarning, err.Error(), "StartKubeProxy", message)
|
||||
}
|
||||
}
|
||||
|
||||
if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 {
|
||||
timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second)
|
||||
if err := ct.SetTCPEstablishedTimeout(timeout); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 {
|
||||
timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second)
|
||||
if err := ct.SetTCPCloseWaitTimeout(timeout); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
proxymetrics.RegisterMetrics()
|
||||
return nil
|
||||
}
|
||||
|
||||
func getConntrackMax(config proxyconfigapi.KubeProxyConntrackConfiguration) (int, error) {
|
||||
if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
|
||||
floor := 0
|
||||
if config.Min != nil {
|
||||
floor = int(*config.Min)
|
||||
}
|
||||
scaled := int(*config.MaxPerCore) * detectNumCPU()
|
||||
if scaled > floor {
|
||||
klog.V(3).InfoS("GetConntrackMax: using scaled conntrack-max-per-core")
|
||||
return scaled, nil
|
||||
}
|
||||
klog.V(3).InfoS("GetConntrackMax: using conntrack-min")
|
||||
return floor, nil
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func waitForPodCIDR(client clientset.Interface, nodeName string) (*v1.Node, error) {
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -34,6 +35,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
netutils "k8s.io/utils/net"
|
||||
"k8s.io/utils/pointer"
|
||||
|
||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||
clientgotesting "k8s.io/client-go/testing"
|
||||
@ -733,3 +735,49 @@ func Test_waitForPodCIDR(t *testing.T) {
|
||||
t.Errorf("waitForPodCIDR() got %v expected to be %v ", got.Spec.PodCIDRs, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetConntrackMax(t *testing.T) {
|
||||
ncores := goruntime.NumCPU()
|
||||
testCases := []struct {
|
||||
min int32
|
||||
maxPerCore int32
|
||||
expected int
|
||||
err string
|
||||
}{
|
||||
{
|
||||
expected: 0,
|
||||
},
|
||||
{
|
||||
maxPerCore: 67890, // use this if Max is 0
|
||||
min: 1, // avoid 0 default
|
||||
expected: 67890 * ncores,
|
||||
},
|
||||
{
|
||||
maxPerCore: 1, // ensure that Min is considered
|
||||
min: 123456,
|
||||
expected: 123456,
|
||||
},
|
||||
{
|
||||
maxPerCore: 0, // leave system setting
|
||||
min: 123456,
|
||||
expected: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
cfg := proxyconfigapi.KubeProxyConntrackConfiguration{
|
||||
Min: pointer.Int32(tc.min),
|
||||
MaxPerCore: pointer.Int32(tc.maxPerCore),
|
||||
}
|
||||
x, e := getConntrackMax(cfg)
|
||||
if e != nil {
|
||||
if tc.err == "" {
|
||||
t.Errorf("[%d] unexpected error: %v", i, e)
|
||||
} else if !strings.Contains(e.Error(), tc.err) {
|
||||
t.Errorf("[%d] expected an error containing %q: %v", i, tc.err, e)
|
||||
}
|
||||
} else if x != tc.expected {
|
||||
t.Errorf("[%d] expected %d, got %d", i, tc.expected, x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,8 +20,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -36,52 +34,6 @@ import (
|
||||
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||
)
|
||||
|
||||
func TestGetConntrackMax(t *testing.T) {
|
||||
ncores := runtime.NumCPU()
|
||||
testCases := []struct {
|
||||
min int32
|
||||
maxPerCore int32
|
||||
expected int
|
||||
err string
|
||||
}{
|
||||
{
|
||||
expected: 0,
|
||||
},
|
||||
{
|
||||
maxPerCore: 67890, // use this if Max is 0
|
||||
min: 1, // avoid 0 default
|
||||
expected: 67890 * ncores,
|
||||
},
|
||||
{
|
||||
maxPerCore: 1, // ensure that Min is considered
|
||||
min: 123456,
|
||||
expected: 123456,
|
||||
},
|
||||
{
|
||||
maxPerCore: 0, // leave system setting
|
||||
min: 123456,
|
||||
expected: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
cfg := kubeproxyconfig.KubeProxyConntrackConfiguration{
|
||||
Min: pointer.Int32(tc.min),
|
||||
MaxPerCore: pointer.Int32(tc.maxPerCore),
|
||||
}
|
||||
x, e := getConntrackMax(cfg)
|
||||
if e != nil {
|
||||
if tc.err == "" {
|
||||
t.Errorf("[%d] unexpected error: %v", i, e)
|
||||
} else if !strings.Contains(e.Error(), tc.err) {
|
||||
t.Errorf("[%d] expected an error containing %q: %v", i, tc.err, e)
|
||||
}
|
||||
} else if x != tc.expected {
|
||||
t.Errorf("[%d] expected %d, got %d", i, tc.expected, x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadConfig tests proper operation of loadConfig()
|
||||
func TestLoadConfig(t *testing.T) {
|
||||
|
||||
|
@ -25,23 +25,14 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
goruntime "runtime"
|
||||
"strconv"
|
||||
|
||||
// Enable pprof HTTP handlers.
|
||||
_ "net/http/pprof"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/component-base/configz"
|
||||
"k8s.io/component-base/metrics"
|
||||
nodeutil "k8s.io/component-helpers/node/util"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
|
||||
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
"k8s.io/kubernetes/pkg/proxy/winkernel"
|
||||
)
|
||||
|
||||
@ -51,49 +42,10 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
|
||||
}
|
||||
}
|
||||
|
||||
// NewProxyServer returns a new ProxyServer.
|
||||
func NewProxyServer(o *Options) (*ProxyServer, error) {
|
||||
return newProxyServer(o.config, o.master)
|
||||
}
|
||||
|
||||
func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string) (*ProxyServer, error) {
|
||||
if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
|
||||
c.Set(config)
|
||||
} else {
|
||||
return nil, fmt.Errorf("unable to register configz: %s", err)
|
||||
}
|
||||
|
||||
if len(config.ShowHiddenMetricsForVersion) > 0 {
|
||||
metrics.SetShowHidden()
|
||||
}
|
||||
|
||||
client, err := createClient(config.ClientConnection, master)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create event recorder
|
||||
hostname, err := nodeutil.GetHostname(config.HostnameOverride)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeIP := detectNodeIP(client, hostname, config.BindAddress)
|
||||
klog.InfoS("Detected node IP", "IP", nodeIP.String())
|
||||
|
||||
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
|
||||
recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")
|
||||
|
||||
nodeRef := &v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: hostname,
|
||||
UID: types.UID(hostname),
|
||||
Namespace: "",
|
||||
}
|
||||
|
||||
var healthzServer healthcheck.ProxierHealthUpdater
|
||||
// createProxier creates the proxy.Provider
|
||||
func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration) (proxy.Provider, error) {
|
||||
var healthzPort int
|
||||
if len(config.HealthzBindAddress) > 0 {
|
||||
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
_, port, _ := net.SplitHostPort(config.HealthzBindAddress)
|
||||
healthzPort, _ = strconv.Atoi(port)
|
||||
}
|
||||
@ -105,6 +57,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
|
||||
}
|
||||
|
||||
var proxier proxy.Provider
|
||||
|
||||
dualStackMode := getDualStackMode(config.Winkernel.NetworkName, winkernel.DualStackCompatTester{})
|
||||
if dualStackMode {
|
||||
klog.InfoS("Creating dualStackProxier for Windows kernel.")
|
||||
@ -113,10 +66,10 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
|
||||
config.IPTables.SyncPeriod.Duration,
|
||||
config.IPTables.MinSyncPeriod.Duration,
|
||||
config.ClusterCIDR,
|
||||
hostname,
|
||||
s.Hostname,
|
||||
nodeIPTuple(config.BindAddress),
|
||||
recorder,
|
||||
healthzServer,
|
||||
s.Recorder,
|
||||
s.HealthzServer,
|
||||
config.Winkernel,
|
||||
healthzPort,
|
||||
)
|
||||
@ -125,10 +78,10 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
|
||||
config.IPTables.SyncPeriod.Duration,
|
||||
config.IPTables.MinSyncPeriod.Duration,
|
||||
config.ClusterCIDR,
|
||||
hostname,
|
||||
nodeIP,
|
||||
recorder,
|
||||
healthzServer,
|
||||
s.Hostname,
|
||||
s.NodeIP,
|
||||
s.Recorder,
|
||||
s.HealthzServer,
|
||||
config.Winkernel,
|
||||
healthzPort,
|
||||
)
|
||||
@ -136,27 +89,19 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||
}
|
||||
winkernel.RegisterMetrics()
|
||||
|
||||
return &ProxyServer{
|
||||
Config: config,
|
||||
Client: client,
|
||||
Proxier: proxier,
|
||||
Broadcaster: eventBroadcaster,
|
||||
Recorder: recorder,
|
||||
NodeRef: nodeRef,
|
||||
HealthzServer: healthzServer,
|
||||
}, nil
|
||||
return proxier, nil
|
||||
}
|
||||
|
||||
func (s *ProxyServer) platformSetup() error {
|
||||
winkernel.RegisterMetrics()
|
||||
return nil
|
||||
}
|
||||
|
||||
func getDualStackMode(networkname string, compatTester winkernel.StackCompatTester) bool {
|
||||
return compatTester.DualStackCompatible(networkname)
|
||||
}
|
||||
|
||||
func detectNumCPU() int {
|
||||
return goruntime.NumCPU()
|
||||
}
|
||||
|
||||
// cleanupAndExit cleans up after a previous proxy run
|
||||
func cleanupAndExit() error {
|
||||
return errors.New("--cleanup-and-exit is not implemented on Windows")
|
||||
|
Loading…
Reference in New Issue
Block a user