Add ProxyServer.platformSetup

Move the Linux-specific conntrack setup code into a new
"platformSetup" method rather than trying to fit it into the generic
setup code.

Also move metrics registration there.
Author: Dan Winship
Date:   2023-03-13 10:57:19 -04:00
Commit: 08ce580576
Parent: ce05a4f7fc

5 changed files with 119 additions and 114 deletions
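For orientation, here is a minimal standalone sketch of the pattern the commit adopts (illustrative only, not code from the commit: the real per-OS implementations live in separate Linux and Windows source files selected by build tags, and the fmt-based bodies below are placeholders):

package main

import "fmt"

type ProxyServer struct{}

// platformSetup stands in for the OS-specific hook: on Linux it would tune
// conntrack and register the iptables/ipvs proxier metrics, on Windows it
// would register the winkernel metrics.
func (s *ProxyServer) platformSetup() error {
    fmt.Println("platform-specific setup")
    return nil
}

// Run stays platform-independent and simply delegates the OS-specific work.
func (s *ProxyServer) Run() error {
    if err := s.platformSetup(); err != nil {
        return err
    }
    fmt.Println("generic setup continues")
    return nil
}

func main() {
    if err := (&ProxyServer{}).Run(); err != nil {
        fmt.Println("error:", err)
    }
}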


@@ -514,7 +514,6 @@ type ProxyServer struct {
     Client        clientset.Interface
     Broadcaster   events.EventBroadcaster
     Recorder      events.EventRecorder
-    Conntracker   Conntracker // if nil, ignored
     NodeRef       *v1.ObjectReference
     HealthzServer healthcheck.ProxierHealthUpdater
@@ -652,45 +651,10 @@ func (s *ProxyServer) Run() error {
     // Start up a metrics server if requested
     serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh)
 
-    // Tune conntrack, if requested
-    // Conntracker is always nil for windows
-    if s.Conntracker != nil {
-        max, err := getConntrackMax(s.Config.Conntrack)
-        if err != nil {
-            return err
-        }
-
-        if max > 0 {
-            err := s.Conntracker.SetMax(max)
-            if err != nil {
-                if err != errReadOnlySysFS {
-                    return err
-                }
-                // errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
-                // the only remediation we know is to restart the docker daemon.
-                // Here we'll send an node event with specific reason and message, the
-                // administrator should decide whether and how to handle this issue,
-                // whether to drain the node and restart docker. Occurs in other container runtimes
-                // as well.
-                // TODO(random-liu): Remove this when the docker bug is fixed.
-                const message = "CRI error: /sys is read-only: " +
-                    "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
-                s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, err.Error(), "StartKubeProxy", message)
-            }
-        }
-
-        if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 {
-            timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second)
-            if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
-                return err
-            }
-        }
-
-        if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 {
-            timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second)
-            if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
-                return err
-            }
-        }
-    }
+    // Do platform-specific setup
+    err := s.platformSetup()
+    if err != nil {
+        return err
+    }
 
     noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
@@ -758,23 +722,6 @@ func (s *ProxyServer) birthCry() {
     s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "")
 }
 
-func getConntrackMax(config kubeproxyconfig.KubeProxyConntrackConfiguration) (int, error) {
-    if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
-        floor := 0
-        if config.Min != nil {
-            floor = int(*config.Min)
-        }
-        scaled := int(*config.MaxPerCore) * detectNumCPU()
-        if scaled > floor {
-            klog.V(3).InfoS("GetConntrackMax: using scaled conntrack-max-per-core")
-            return scaled, nil
-        }
-        klog.V(3).InfoS("GetConntrackMax: using conntrack-min")
-        return floor, nil
-    }
-    return 0, nil
-}
-
 // detectNodeIP returns the nodeIP used by the proxier
 // The order of precedence is:
 //  1. config.bindAddress if bindAddress is not 0.0.0.0 or ::


@@ -247,7 +247,6 @@ func newProxyServer(
         if err != nil {
             return nil, fmt.Errorf("unable to create proxier: %v", err)
         }
-        proxymetrics.RegisterMetrics()
     } else if config.Mode == proxyconfigapi.ProxyModeIPVS {
         kernelHandler := ipvs.NewLinuxKernelHandler()
         ipsetInterface = utilipset.New(execer)
@@ -329,7 +328,6 @@ func newProxyServer(
         if err != nil {
             return nil, fmt.Errorf("unable to create proxier: %v", err)
         }
-        proxymetrics.RegisterMetrics()
     }
 
     return &ProxyServer{
@@ -338,12 +336,72 @@ func newProxyServer(
         Proxier:       proxier,
         Broadcaster:   eventBroadcaster,
         Recorder:      recorder,
-        Conntracker:   &realConntracker{},
         NodeRef:       nodeRef,
         HealthzServer: healthzServer,
     }, nil
 }
 
+func (s *ProxyServer) platformSetup() error {
+    ct := &realConntracker{}
+    max, err := getConntrackMax(s.Config.Conntrack)
+    if err != nil {
+        return err
+    }
+    if max > 0 {
+        err := ct.SetMax(max)
+        if err != nil {
+            if err != errReadOnlySysFS {
+                return err
+            }
+            // errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
+            // the only remediation we know is to restart the docker daemon.
+            // Here we'll send an node event with specific reason and message, the
+            // administrator should decide whether and how to handle this issue,
+            // whether to drain the node and restart docker. Occurs in other container runtimes
+            // as well.
+            // TODO(random-liu): Remove this when the docker bug is fixed.
+            const message = "CRI error: /sys is read-only: " +
+                "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
+            s.Recorder.Eventf(s.NodeRef, nil, v1.EventTypeWarning, err.Error(), "StartKubeProxy", message)
+        }
+    }
+
+    if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 {
+        timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second)
+        if err := ct.SetTCPEstablishedTimeout(timeout); err != nil {
+            return err
+        }
+    }
+
+    if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 {
+        timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second)
+        if err := ct.SetTCPCloseWaitTimeout(timeout); err != nil {
+            return err
+        }
+    }
+
+    proxymetrics.RegisterMetrics()
+    return nil
+}
+
+func getConntrackMax(config proxyconfigapi.KubeProxyConntrackConfiguration) (int, error) {
+    if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
+        floor := 0
+        if config.Min != nil {
+            floor = int(*config.Min)
+        }
+        scaled := int(*config.MaxPerCore) * detectNumCPU()
+        if scaled > floor {
+            klog.V(3).InfoS("GetConntrackMax: using scaled conntrack-max-per-core")
+            return scaled, nil
+        }
+        klog.V(3).InfoS("GetConntrackMax: using conntrack-min")
+        return floor, nil
+    }
+    return 0, nil
+}
+
 func waitForPodCIDR(client clientset.Interface, nodeName string) (*v1.Node, error) {
     // since allocators can assign the podCIDR after the node registers, we do a watch here to wait
     // for podCIDR to be assigned, instead of assuming that the Get() on startup will have it.


@@ -25,6 +25,7 @@ import (
     "os"
     "path/filepath"
     "reflect"
+    goruntime "runtime"
     "strings"
     "testing"
     "time"
@@ -34,6 +35,7 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/watch"
     netutils "k8s.io/utils/net"
+    "k8s.io/utils/pointer"
 
     clientsetfake "k8s.io/client-go/kubernetes/fake"
     clientgotesting "k8s.io/client-go/testing"
@@ -733,3 +735,49 @@ func Test_waitForPodCIDR(t *testing.T) {
         t.Errorf("waitForPodCIDR() got %v expected to be %v ", got.Spec.PodCIDRs, expected)
     }
 }
+
+func TestGetConntrackMax(t *testing.T) {
+    ncores := goruntime.NumCPU()
+    testCases := []struct {
+        min        int32
+        maxPerCore int32
+        expected   int
+        err        string
+    }{
+        {
+            expected: 0,
+        },
+        {
+            maxPerCore: 67890, // use this if Max is 0
+            min:        1,     // avoid 0 default
+            expected:   67890 * ncores,
+        },
+        {
+            maxPerCore: 1, // ensure that Min is considered
+            min:        123456,
+            expected:   123456,
+        },
+        {
+            maxPerCore: 0, // leave system setting
+            min:        123456,
+            expected:   0,
+        },
+    }
+
+    for i, tc := range testCases {
+        cfg := proxyconfigapi.KubeProxyConntrackConfiguration{
+            Min:        pointer.Int32(tc.min),
+            MaxPerCore: pointer.Int32(tc.maxPerCore),
+        }
+        x, e := getConntrackMax(cfg)
+        if e != nil {
+            if tc.err == "" {
+                t.Errorf("[%d] unexpected error: %v", i, e)
+            } else if !strings.Contains(e.Error(), tc.err) {
+                t.Errorf("[%d] expected an error containing %q: %v", i, tc.err, e)
+            }
+        } else if x != tc.expected {
+            t.Errorf("[%d] expected %d, got %d", i, tc.expected, x)
+        }
+    }
+}


@@ -20,8 +20,6 @@ import (
     "errors"
     "fmt"
     "reflect"
-    "runtime"
-    "strings"
     "testing"
     "time"
@@ -36,52 +34,6 @@ import (
     kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
 )
 
-func TestGetConntrackMax(t *testing.T) {
-    ncores := runtime.NumCPU()
-    testCases := []struct {
-        min        int32
-        maxPerCore int32
-        expected   int
-        err        string
-    }{
-        {
-            expected: 0,
-        },
-        {
-            maxPerCore: 67890, // use this if Max is 0
-            min:        1,     // avoid 0 default
-            expected:   67890 * ncores,
-        },
-        {
-            maxPerCore: 1, // ensure that Min is considered
-            min:        123456,
-            expected:   123456,
-        },
-        {
-            maxPerCore: 0, // leave system setting
-            min:        123456,
-            expected:   0,
-        },
-    }
-
-    for i, tc := range testCases {
-        cfg := kubeproxyconfig.KubeProxyConntrackConfiguration{
-            Min:        pointer.Int32(tc.min),
-            MaxPerCore: pointer.Int32(tc.maxPerCore),
-        }
-        x, e := getConntrackMax(cfg)
-        if e != nil {
-            if tc.err == "" {
-                t.Errorf("[%d] unexpected error: %v", i, e)
-            } else if !strings.Contains(e.Error(), tc.err) {
-                t.Errorf("[%d] expected an error containing %q: %v", i, tc.err, e)
-            }
-        } else if x != tc.expected {
-            t.Errorf("[%d] expected %d, got %d", i, tc.expected, x)
-        }
-    }
-}
-
 // TestLoadConfig tests proper operation of loadConfig()
 func TestLoadConfig(t *testing.T) {


@@ -25,7 +25,6 @@ import (
     "errors"
     "fmt"
     "net"
-    goruntime "runtime"
     "strconv"
 
     // Enable pprof HTTP handlers.
@@ -105,6 +104,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
     }
 
     var proxier proxy.Provider
+
     dualStackMode := getDualStackMode(config.Winkernel.NetworkName, winkernel.DualStackCompatTester{})
     if dualStackMode {
         klog.InfoS("Creating dualStackProxier for Windows kernel.")
@@ -136,7 +136,6 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
     if err != nil {
         return nil, fmt.Errorf("unable to create proxier: %v", err)
     }
-    winkernel.RegisterMetrics()
 
     return &ProxyServer{
         Config: config,
@@ -149,12 +148,13 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
     }, nil
 }
 
-func getDualStackMode(networkname string, compatTester winkernel.StackCompatTester) bool {
-    return compatTester.DualStackCompatible(networkname)
-}
-
-func detectNumCPU() int {
-    return goruntime.NumCPU()
+func (s *ProxyServer) platformSetup() error {
+    winkernel.RegisterMetrics()
+    return nil
+}
+
+func getDualStackMode(networkname string, compatTester winkernel.StackCompatTester) bool {
+    return compatTester.DualStackCompatible(networkname)
 }
 
 // cleanupAndExit cleans up after a previous proxy run