Remove duplicated config fields from ProxyServer

Rather than duplicating some of the KubeProxyConfiguration into
ProxyServer, just store the KubeProxyConfiguration itself so later
code can reference it directly.

For the fields that get platform-specific defaults (Mode,
DetectLocalMode), fill the defaults directly into the
KubeProxyConfiguration rather than keeping the original there and the
defaulted version in the ProxyServer.
This commit is contained in:
Dan Winship 2023-03-12 17:37:50 -04:00
parent 9d4f10f5d2
commit 258c4c4251
5 changed files with 131 additions and 113 deletions

View File

@ -452,6 +452,8 @@ func (o *Options) ApplyDefaults(in *kubeproxyconfig.KubeProxyConfiguration) (*ku
out := internal.(*kubeproxyconfig.KubeProxyConfiguration)
o.platformApplyDefaults(out)
return out, nil
}
@ -522,21 +524,16 @@ with the apiserver API to configure the proxy.`,
// ProxyServer represents all the parameters required to start the Kubernetes proxy server. All
// fields are required.
type ProxyServer struct {
Client clientset.Interface
Proxier proxy.Provider
Broadcaster events.EventBroadcaster
Recorder events.EventRecorder
ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
Conntracker Conntracker // if nil, ignored
ProxyMode kubeproxyconfig.ProxyMode
NodeRef *v1.ObjectReference
MetricsBindAddress string
BindAddressHardFail bool
EnableProfiling bool
OOMScoreAdj *int32
ConfigSyncPeriod time.Duration
HealthzServer healthcheck.ProxierHealthUpdater
localDetectorMode kubeproxyconfig.LocalMode
Config *kubeproxyconfig.KubeProxyConfiguration
Client clientset.Interface
Broadcaster events.EventBroadcaster
Recorder events.EventRecorder
Conntracker Conntracker // if nil, ignored
NodeRef *v1.ObjectReference
HealthzServer healthcheck.ProxierHealthUpdater
Proxier proxy.Provider
}
// createClient creates a kube client from the given config and masterOverride.
@ -645,9 +642,9 @@ func (s *ProxyServer) Run() error {
// TODO(vmarmol): Use container config for this.
var oomAdjuster *oom.OOMAdjuster
if s.OOMScoreAdj != nil {
if s.Config.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.Config.OOMScoreAdj)); err != nil {
klog.V(2).InfoS("Failed to apply OOMScore", "err", err)
}
}
@ -660,7 +657,7 @@ func (s *ProxyServer) Run() error {
// TODO(thockin): make it possible for healthz and metrics to be on the same port.
var errCh chan error
if s.BindAddressHardFail {
if s.Config.BindAddressHardFail {
errCh = make(chan error)
}
@ -668,12 +665,12 @@ func (s *ProxyServer) Run() error {
serveHealthz(s.HealthzServer, errCh)
// Start up a metrics server if requested
serveMetrics(s.MetricsBindAddress, s.ProxyMode, s.EnableProfiling, errCh)
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh)
// Tune conntrack, if requested
// Conntracker is always nil for windows
if s.Conntracker != nil {
max, err := getConntrackMax(s.ConntrackConfiguration)
max, err := getConntrackMax(s.Config.Conntrack)
if err != nil {
return err
}
@ -696,15 +693,15 @@ func (s *ProxyServer) Run() error {
}
}
if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 {
timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
return err
}
}
if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
return err
}
@ -725,7 +722,7 @@ func (s *ProxyServer) Run() error {
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
// Make informers that filter out objects that want a non-default service proxy.
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.Config.ConfigSyncPeriod.Duration,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
}))
@ -734,11 +731,11 @@ func (s *ProxyServer) Run() error {
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
@ -747,13 +744,13 @@ func (s *ProxyServer) Run() error {
informerFactory.Start(wait.NeverStop)
// Make an informer that selects for our nodename.
currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.Config.ConfigSyncPeriod.Duration,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
}))
nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration)
// https://issues.k8s.io/111321
if s.localDetectorMode == kubeproxyconfig.LocalModeNodeCIDR {
if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
nodeConfig.RegisterEventHandler(&proxy.NodePodCIDRHandler{})
}
nodeConfig.RegisterEventHandler(s.Proxier)

View File

@ -70,6 +70,19 @@ import (
// node after it is registered.
var timeoutForNodePodCIDR = 5 * time.Minute
func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfiguration) {
if config.Mode == "" {
klog.InfoS("Using iptables proxy")
config.Mode = proxyconfigapi.ProxyModeIPTables
}
if config.DetectLocalMode == "" {
klog.V(4).InfoS("Defaulting detect-local-mode", "localModeClusterCIDR", string(proxyconfigapi.LocalModeClusterCIDR))
config.DetectLocalMode = proxyconfigapi.LocalModeClusterCIDR
}
klog.V(2).InfoS("DetectLocalMode", "localMode", string(config.DetectLocalMode))
}
// NewProxyServer returns a new ProxyServer.
func NewProxyServer(o *Options) (*ProxyServer, error) {
return newProxyServer(o.config, o.master)
@ -123,11 +136,8 @@ func newProxyServer(
var proxier proxy.Provider
proxyMode := getProxyMode(config.Mode)
detectLocalMode := getDetectLocalMode(config)
var nodeInfo *v1.Node
if detectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
if config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", hostname)
nodeInfo, err = waitForPodCIDR(client, hostname)
if err != nil {
@ -136,8 +146,6 @@ func newProxyServer(
klog.InfoS("NodeInfo", "podCIDR", nodeInfo.Spec.PodCIDR, "podCIDRs", nodeInfo.Spec.PodCIDRs)
}
klog.V(2).InfoS("DetectLocalMode", "localMode", string(detectLocalMode))
primaryFamily := v1.IPv4Protocol
primaryProtocol := utiliptables.ProtocolIPv4
if netutils.IsIPv6(nodeIP) {
@ -178,7 +186,7 @@ func newProxyServer(
}
}
if proxyMode == proxyconfigapi.ProxyModeIPTables {
if config.Mode == proxyconfigapi.ProxyModeIPTables {
klog.InfoS("Using iptables Proxier")
if dualStack {
@ -186,7 +194,7 @@ func newProxyServer(
klog.InfoS("Creating dualStackProxier for iptables")
// Always ordered to match []ipt
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
localDetectors, err = getDualStackLocalDetectorTuple(config.DetectLocalMode, config, ipt, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
@ -211,7 +219,7 @@ func newProxyServer(
} else {
// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
var localDetector proxyutiliptables.LocalTrafficDetector
localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
localDetector, err = getLocalDetector(config.DetectLocalMode, config, iptInterface, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
@ -240,7 +248,7 @@ func newProxyServer(
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxymetrics.RegisterMetrics()
} else if proxyMode == proxyconfigapi.ProxyModeIPVS {
} else if config.Mode == proxyconfigapi.ProxyModeIPVS {
kernelHandler := ipvs.NewLinuxKernelHandler()
ipsetInterface = utilipset.New(execer)
ipvsInterface = utilipvs.New()
@ -256,7 +264,7 @@ func newProxyServer(
// Always ordered to match []ipt
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
localDetectors, err = getDualStackLocalDetectorTuple(config.DetectLocalMode, config, ipt, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
@ -287,7 +295,7 @@ func newProxyServer(
)
} else {
var localDetector proxyutiliptables.LocalTrafficDetector
localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
localDetector, err = getLocalDetector(config.DetectLocalMode, config, iptInterface, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
@ -325,21 +333,14 @@ func newProxyServer(
}
return &ProxyServer{
Client: client,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
ConntrackConfiguration: config.Conntrack,
Conntracker: &realConntracker{},
ProxyMode: proxyMode,
NodeRef: nodeRef,
MetricsBindAddress: config.MetricsBindAddress,
BindAddressHardFail: config.BindAddressHardFail,
EnableProfiling: config.EnableProfiling,
OOMScoreAdj: config.OOMScoreAdj,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
HealthzServer: healthzServer,
localDetectorMode: detectLocalMode,
Config: config,
Client: client,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
Conntracker: &realConntracker{},
NodeRef: nodeRef,
HealthzServer: healthzServer,
}, nil
}
@ -396,14 +397,6 @@ func detectNumCPU() int {
return numCPU
}
func getDetectLocalMode(config *proxyconfigapi.KubeProxyConfiguration) proxyconfigapi.LocalMode {
if config.DetectLocalMode == "" {
klog.V(4).InfoS("Defaulting detect-local-mode", "localModeClusterCIDR", string(proxyconfigapi.LocalModeClusterCIDR))
return proxyconfigapi.LocalModeClusterCIDR
}
return config.DetectLocalMode
}
func getLocalDetector(mode proxyconfigapi.LocalMode, config *proxyconfigapi.KubeProxyConfiguration, ipt utiliptables.Interface, nodeInfo *v1.Node) (proxyutiliptables.LocalTrafficDetector, error) {
switch mode {
case proxyconfigapi.LocalModeClusterCIDR:
@ -521,15 +514,6 @@ func cidrTuple(cidrList string) [2]string {
return cidrs
}
func getProxyMode(proxyMode proxyconfigapi.ProxyMode) proxyconfigapi.ProxyMode {
if proxyMode == "" {
klog.InfoS("Using iptables proxy")
return proxyconfigapi.ProxyModeIPTables
} else {
return proxyMode
}
}
// cleanupAndExit remove iptables rules and ipset/ipvs rules
func cleanupAndExit() error {
execer := exec.New()

View File

@ -44,34 +44,66 @@ import (
utiliptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
)
func Test_getDetectLocalMode(t *testing.T) {
cases := []struct {
detectLocal string
expected proxyconfigapi.LocalMode
func Test_platformApplyDefaults(t *testing.T) {
testCases := []struct {
name string
mode proxyconfigapi.ProxyMode
expectedMode proxyconfigapi.ProxyMode
detectLocal proxyconfigapi.LocalMode
expectedDetectLocal proxyconfigapi.LocalMode
}{
{
detectLocal: "",
expected: proxyconfigapi.LocalModeClusterCIDR,
name: "defaults",
mode: "",
expectedMode: proxyconfigapi.ProxyModeIPTables,
detectLocal: "",
expectedDetectLocal: proxyconfigapi.LocalModeClusterCIDR,
},
{
detectLocal: string(proxyconfigapi.LocalModeClusterCIDR),
expected: proxyconfigapi.LocalModeClusterCIDR,
name: "explicit",
mode: proxyconfigapi.ProxyModeIPTables,
expectedMode: proxyconfigapi.ProxyModeIPTables,
detectLocal: proxyconfigapi.LocalModeClusterCIDR,
expectedDetectLocal: proxyconfigapi.LocalModeClusterCIDR,
},
{
detectLocal: string(proxyconfigapi.LocalModeInterfaceNamePrefix),
expected: proxyconfigapi.LocalModeInterfaceNamePrefix,
name: "override mode",
mode: "ipvs",
expectedMode: proxyconfigapi.ProxyModeIPVS,
detectLocal: "",
expectedDetectLocal: proxyconfigapi.LocalModeClusterCIDR,
},
{
detectLocal: string(proxyconfigapi.LocalModeBridgeInterface),
expected: proxyconfigapi.LocalModeBridgeInterface,
name: "override detect-local",
mode: "",
expectedMode: proxyconfigapi.ProxyModeIPTables,
detectLocal: "NodeCIDR",
expectedDetectLocal: proxyconfigapi.LocalModeNodeCIDR,
},
{
name: "override both",
mode: "ipvs",
expectedMode: proxyconfigapi.ProxyModeIPVS,
detectLocal: "NodeCIDR",
expectedDetectLocal: proxyconfigapi.LocalModeNodeCIDR,
},
}
for i, c := range cases {
proxyConfig := &proxyconfigapi.KubeProxyConfiguration{DetectLocalMode: proxyconfigapi.LocalMode(c.detectLocal)}
r := getDetectLocalMode(proxyConfig)
if r != c.expected {
t.Errorf("Case[%d] Expected %q got %q", i, c.expected, r)
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
options := NewOptions()
config := &proxyconfigapi.KubeProxyConfiguration{
Mode: tc.mode,
DetectLocalMode: tc.detectLocal,
}
options.platformApplyDefaults(config)
if config.Mode != tc.expectedMode {
t.Fatalf("expected mode: %s, but got: %s", tc.expectedMode, config.Mode)
}
if config.DetectLocalMode != tc.expectedDetectLocal {
t.Fatalf("expected detect-local: %s, but got: %s", tc.expectedDetectLocal, config.DetectLocalMode)
}
})
}
}

View File

@ -45,6 +45,12 @@ import (
"k8s.io/kubernetes/pkg/proxy/winkernel"
)
func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfiguration) {
if config.Mode == "" {
config.Mode = proxyconfigapi.ProxyModeKernelspace
}
}
// NewProxyServer returns a new ProxyServer.
func NewProxyServer(o *Options) (*ProxyServer, error) {
return newProxyServer(o.config, o.master)
@ -99,7 +105,6 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
}
var proxier proxy.Provider
proxyMode := proxyconfigapi.ProxyModeKernelspace
dualStackMode := getDualStackMode(config.Winkernel.NetworkName, winkernel.DualStackCompatTester{})
if dualStackMode {
klog.InfoS("Creating dualStackProxier for Windows kernel.")
@ -134,18 +139,13 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
winkernel.RegisterMetrics()
return &ProxyServer{
Client: client,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
ProxyMode: proxyMode,
NodeRef: nodeRef,
MetricsBindAddress: config.MetricsBindAddress,
BindAddressHardFail: config.BindAddressHardFail,
EnableProfiling: config.EnableProfiling,
OOMScoreAdj: config.OOMScoreAdj,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
HealthzServer: healthzServer,
Config: config,
Client: client,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
NodeRef: nodeRef,
HealthzServer: healthzServer,
}, nil
}

View File

@ -22,6 +22,7 @@ import (
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -29,6 +30,7 @@ import (
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app"
"k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/iptables"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
@ -124,14 +126,17 @@ func NewHollowProxyOrDie(
}
return &HollowProxy{
ProxyServer: &proxyapp.ProxyServer{
Client: client,
Proxier: proxier,
Broadcaster: broadcaster,
Recorder: recorder,
ProxyMode: "fake",
NodeRef: nodeRef,
OOMScoreAdj: utilpointer.Int32Ptr(0),
ConfigSyncPeriod: 30 * time.Second,
Config: &proxyconfigapi.KubeProxyConfiguration{
Mode: proxyconfigapi.ProxyMode("fake"),
ConfigSyncPeriod: metav1.Duration{Duration: 30 * time.Second},
OOMScoreAdj: utilpointer.Int32Ptr(0),
},
Client: client,
Proxier: proxier,
Broadcaster: broadcaster,
Recorder: recorder,
NodeRef: nodeRef,
},
}, nil
}