Merge duplicated Linux/Windows kube-proxy setup code

This commit is contained in:
Dan Winship 2023-03-11 17:25:13 -05:00
parent 08ce580576
commit 6232ac734a
3 changed files with 88 additions and 152 deletions

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
@ -57,11 +58,13 @@ import (
"k8s.io/component-base/configz" "k8s.io/component-base/configz"
"k8s.io/component-base/logs" "k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics"
metricsfeatures "k8s.io/component-base/metrics/features" metricsfeatures "k8s.io/component-base/metrics/features"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/prometheus/slis" "k8s.io/component-base/metrics/prometheus/slis"
"k8s.io/component-base/version" "k8s.io/component-base/version"
"k8s.io/component-base/version/verflag" "k8s.io/component-base/version/verflag"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kube-proxy/config/v1alpha1" "k8s.io/kube-proxy/config/v1alpha1"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
@ -319,7 +322,7 @@ func (o *Options) Run() error {
return cleanupAndExit() return cleanupAndExit()
} }
proxyServer, err := NewProxyServer(o) proxyServer, err := newProxyServer(o.config, o.master)
if err != nil { if err != nil {
return err return err
} }
@ -516,10 +519,61 @@ type ProxyServer struct {
Recorder events.EventRecorder Recorder events.EventRecorder
NodeRef *v1.ObjectReference NodeRef *v1.ObjectReference
HealthzServer healthcheck.ProxierHealthUpdater HealthzServer healthcheck.ProxierHealthUpdater
Hostname string
NodeIP net.IP
Proxier proxy.Provider Proxier proxy.Provider
} }
// newProxyServer creates a ProxyServer based on the given config
func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master string) (*ProxyServer, error) {
s := &ProxyServer{Config: config}
cz, err := configz.New(kubeproxyconfig.GroupName)
if err != nil {
return nil, fmt.Errorf("unable to register configz: %s", err)
}
cz.Set(config)
if len(config.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
s.Hostname, err = nodeutil.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}
s.Client, err = createClient(config.ClientConnection, master)
if err != nil {
return nil, err
}
s.NodeIP = detectNodeIP(s.Client, s.Hostname, config.BindAddress)
klog.InfoS("Detected node IP", "address", s.NodeIP.String())
s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()})
s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")
s.NodeRef = &v1.ObjectReference{
Kind: "Node",
Name: s.Hostname,
UID: types.UID(s.Hostname),
Namespace: "",
}
if len(config.HealthzBindAddress) > 0 {
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef)
}
s.Proxier, err = s.createProxier(config)
if err != nil {
return nil, err
}
return s, nil
}
// createClient creates a kube client from the given config and masterOverride. // createClient creates a kube client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed. // TODO remove masterOverride when CLI flags are removed.
func createClient(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) { func createClient(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {

View File

@ -35,23 +35,16 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
toolswatch "k8s.io/client-go/tools/watch" toolswatch "k8s.io/client-go/tools/watch"
"k8s.io/component-base/configz"
"k8s.io/component-base/metrics"
nodeutil "k8s.io/component-helpers/node/util"
utilsysctl "k8s.io/component-helpers/node/util/sysctl" utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/ipvs" "k8s.io/kubernetes/pkg/proxy/ipvs"
proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics" proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics"
@ -83,63 +76,15 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
klog.V(2).InfoS("DetectLocalMode", "localMode", string(config.DetectLocalMode)) klog.V(2).InfoS("DetectLocalMode", "localMode", string(config.DetectLocalMode))
} }
// NewProxyServer returns a new ProxyServer. // createProxier creates the proxy.Provider
func NewProxyServer(o *Options) (*ProxyServer, error) { func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration) (proxy.Provider, error) {
return newProxyServer(o.config, o.master)
}
func newProxyServer(
config *proxyconfigapi.KubeProxyConfiguration,
master string) (*ProxyServer, error) {
if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
c.Set(config)
} else {
return nil, fmt.Errorf("unable to register configz: %s", err)
}
var ipvsInterface utilipvs.Interface
var ipsetInterface utilipset.Interface
if len(config.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
hostname, err := nodeutil.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}
client, err := createClient(config.ClientConnection, master)
if err != nil {
return nil, err
}
nodeIP := detectNodeIP(client, hostname, config.BindAddress)
klog.InfoS("Detected node IP", "address", nodeIP.String())
// Create event recorder
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy")
nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}
var healthzServer healthcheck.ProxierHealthUpdater
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
}
var proxier proxy.Provider var proxier proxy.Provider
var err error
var nodeInfo *v1.Node var nodeInfo *v1.Node
if config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR { if config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", hostname) klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname)
nodeInfo, err = waitForPodCIDR(client, hostname) nodeInfo, err = waitForPodCIDR(s.Client, s.Hostname)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -148,7 +93,7 @@ func newProxyServer(
primaryFamily := v1.IPv4Protocol primaryFamily := v1.IPv4Protocol
primaryProtocol := utiliptables.ProtocolIPv4 primaryProtocol := utiliptables.ProtocolIPv4
if netutils.IsIPv6(nodeIP) { if netutils.IsIPv6(s.NodeIP) {
primaryFamily = v1.IPv6Protocol primaryFamily = v1.IPv6Protocol
primaryProtocol = utiliptables.ProtocolIPv6 primaryProtocol = utiliptables.ProtocolIPv6
} }
@ -210,10 +155,10 @@ func newProxyServer(
*config.IPTables.LocalhostNodePorts, *config.IPTables.LocalhostNodePorts,
int(*config.IPTables.MasqueradeBit), int(*config.IPTables.MasqueradeBit),
localDetectors, localDetectors,
hostname, s.Hostname,
nodeIPTuple(config.BindAddress), nodeIPTuple(config.BindAddress),
recorder, s.Recorder,
healthzServer, s.HealthzServer,
nodePortAddresses, nodePortAddresses,
) )
} else { } else {
@ -236,10 +181,10 @@ func newProxyServer(
*config.IPTables.LocalhostNodePorts, *config.IPTables.LocalhostNodePorts,
int(*config.IPTables.MasqueradeBit), int(*config.IPTables.MasqueradeBit),
localDetector, localDetector,
hostname, s.Hostname,
nodeIP, s.NodeIP,
recorder, s.Recorder,
healthzServer, s.HealthzServer,
nodePortAddresses, nodePortAddresses,
) )
} }
@ -249,8 +194,8 @@ func newProxyServer(
} }
} else if config.Mode == proxyconfigapi.ProxyModeIPVS { } else if config.Mode == proxyconfigapi.ProxyModeIPVS {
kernelHandler := ipvs.NewLinuxKernelHandler() kernelHandler := ipvs.NewLinuxKernelHandler()
ipsetInterface = utilipset.New(execer) ipsetInterface := utilipset.New(execer)
ipvsInterface = utilipvs.New() ipvsInterface := utilipvs.New()
if err := ipvs.CanUseIPVSProxier(ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil { if err := ipvs.CanUseIPVSProxier(ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil {
return nil, fmt.Errorf("can't use the IPVS proxier: %v", err) return nil, fmt.Errorf("can't use the IPVS proxier: %v", err)
} }
@ -284,10 +229,10 @@ func newProxyServer(
config.IPTables.MasqueradeAll, config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit), int(*config.IPTables.MasqueradeBit),
localDetectors, localDetectors,
hostname, s.Hostname,
nodeIPs, nodeIPs,
recorder, s.Recorder,
healthzServer, s.HealthzServer,
config.IPVS.Scheduler, config.IPVS.Scheduler,
nodePortAddresses, nodePortAddresses,
kernelHandler, kernelHandler,
@ -316,10 +261,10 @@ func newProxyServer(
config.IPTables.MasqueradeAll, config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit), int(*config.IPTables.MasqueradeBit),
localDetector, localDetector,
hostname, s.Hostname,
nodeIP, s.NodeIP,
recorder, s.Recorder,
healthzServer, s.HealthzServer,
config.IPVS.Scheduler, config.IPVS.Scheduler,
nodePortAddresses, nodePortAddresses,
kernelHandler, kernelHandler,
@ -330,15 +275,7 @@ func newProxyServer(
} }
} }
return &ProxyServer{ return proxier, nil
Config: config,
Client: client,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
NodeRef: nodeRef,
HealthzServer: healthzServer,
}, nil
} }
func (s *ProxyServer) platformSetup() error { func (s *ProxyServer) platformSetup() error {

View File

@ -30,17 +30,9 @@ import (
// Enable pprof HTTP handlers. // Enable pprof HTTP handlers.
_ "net/http/pprof" _ "net/http/pprof"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/events"
"k8s.io/component-base/configz"
"k8s.io/component-base/metrics"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/winkernel" "k8s.io/kubernetes/pkg/proxy/winkernel"
) )
@ -50,49 +42,10 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
} }
} }
// NewProxyServer returns a new ProxyServer. // createProxier creates the proxy.Provider
func NewProxyServer(o *Options) (*ProxyServer, error) { func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration) (proxy.Provider, error) {
return newProxyServer(o.config, o.master)
}
func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string) (*ProxyServer, error) {
if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
c.Set(config)
} else {
return nil, fmt.Errorf("unable to register configz: %s", err)
}
if len(config.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
client, err := createClient(config.ClientConnection, master)
if err != nil {
return nil, err
}
// Create event recorder
hostname, err := nodeutil.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}
nodeIP := detectNodeIP(client, hostname, config.BindAddress)
klog.InfoS("Detected node IP", "IP", nodeIP.String())
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")
nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}
var healthzServer healthcheck.ProxierHealthUpdater
var healthzPort int var healthzPort int
if len(config.HealthzBindAddress) > 0 { if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
_, port, _ := net.SplitHostPort(config.HealthzBindAddress) _, port, _ := net.SplitHostPort(config.HealthzBindAddress)
healthzPort, _ = strconv.Atoi(port) healthzPort, _ = strconv.Atoi(port)
} }
@ -113,10 +66,10 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
config.IPTables.SyncPeriod.Duration, config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration,
config.ClusterCIDR, config.ClusterCIDR,
hostname, s.Hostname,
nodeIPTuple(config.BindAddress), nodeIPTuple(config.BindAddress),
recorder, s.Recorder,
healthzServer, s.HealthzServer,
config.Winkernel, config.Winkernel,
healthzPort, healthzPort,
) )
@ -125,10 +78,10 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
config.IPTables.SyncPeriod.Duration, config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration,
config.ClusterCIDR, config.ClusterCIDR,
hostname, s.Hostname,
nodeIP, s.NodeIP,
recorder, s.Recorder,
healthzServer, s.HealthzServer,
config.Winkernel, config.Winkernel,
healthzPort, healthzPort,
) )
@ -137,15 +90,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
return &ProxyServer{ return proxier, nil
Config: config,
Client: client,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
NodeRef: nodeRef,
HealthzServer: healthzServer,
}, nil
} }
func (s *ProxyServer) platformSetup() error { func (s *ProxyServer) platformSetup() error {