From 4b07f80d204462c0ee639d67126d5eedce1a2ec6 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 4 Dec 2018 23:35:34 -0600 Subject: [PATCH] proxy: consolidate ServicesHandler/EndpointsHandler into ProxyProvider Proxies should be able to cleanly figure out when endpoints have been synced, so make all ProxyProviders also implement EndpointsHandler and pass those through to loadbalancers when required. --- cmd/kube-proxy/app/server.go | 6 ++-- cmd/kube-proxy/app/server_others.go | 26 +++----------- cmd/kube-proxy/app/server_windows.go | 48 +++++++++----------------- pkg/kubemark/BUILD | 1 - pkg/kubemark/hollow_proxy.go | 35 +++++++------------ pkg/proxy/BUILD | 1 + pkg/proxy/types.go | 4 +++ pkg/proxy/userspace/BUILD | 1 + pkg/proxy/userspace/loadbalancer.go | 3 ++ pkg/proxy/userspace/proxier.go | 16 +++++++++ pkg/proxy/winuserspace/BUILD | 1 + pkg/proxy/winuserspace/loadbalancer.go | 3 ++ pkg/proxy/winuserspace/proxier.go | 16 +++++++++ 13 files changed, 80 insertions(+), 81 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index c91f93244fc..e1a4b7d015c 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -501,8 +501,6 @@ type ProxyServer struct { OOMScoreAdj *int32 ResourceContainer string ConfigSyncPeriod time.Duration - ServiceEventHandler config.ServiceHandler - EndpointsEventHandler config.EndpointsHandler HealthzServer *healthcheck.HealthzServer } @@ -657,11 +655,11 @@ func (s *ProxyServer) Run() error { // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod) - serviceConfig.RegisterEventHandler(s.ServiceEventHandler) + serviceConfig.RegisterEventHandler(s.Proxier) go serviceConfig.Run(wait.NeverStop) endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod) - endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler) + endpointsConfig.RegisterEventHandler(s.Proxier) go endpointsConfig.Run(wait.NeverStop) // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 84d5ea0d07d..0f0832c9bc4 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/ipvs" @@ -135,8 +134,6 @@ func newProxyServer( } var proxier proxy.ProxyProvider - var serviceEventHandler proxyconfig.ServiceHandler - var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{}) nodeIP := net.ParseIP(config.BindAddress) @@ -151,7 +148,7 @@ func newProxyServer( } // TODO this has side effects that should only happen when Run() is invoked. - proxierIPTables, err := iptables.NewProxier( + proxier, err = iptables.NewProxier( iptInterface, utilsysctl.New(), execer, @@ -170,12 +167,9 @@ func newProxyServer( return nil, fmt.Errorf("unable to create proxier: %v", err) } metrics.RegisterMetrics() - proxier = proxierIPTables - serviceEventHandler = proxierIPTables - endpointsEventHandler = proxierIPTables } else if proxyMode == proxyModeIPVS { klog.V(0).Info("Using ipvs Proxier.") - proxierIPVS, err := ipvs.NewProxier( + proxier, err = ipvs.NewProxier( iptInterface, ipvsInterface, ipsetInterface, @@ -199,20 +193,12 @@ func newProxyServer( return nil, fmt.Errorf("unable to create proxier: %v", err) } metrics.RegisterMetrics() - proxier = proxierIPVS - serviceEventHandler = proxierIPVS - endpointsEventHandler = proxierIPVS } else { klog.V(0).Info("Using userspace Proxier.") - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := userspace.NewLoadBalancerRR() - // set EndpointsConfigHandler to our loadBalancer - endpointsEventHandler = loadBalancer // TODO this has side effects that should only happen when Run() is invoked. - proxierUserspace, err := userspace.NewProxier( - loadBalancer, + proxier, err = userspace.NewProxier( + userspace.NewLoadBalancerRR(), net.ParseIP(config.BindAddress), iptInterface, execer, @@ -225,8 +211,6 @@ func newProxyServer( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - serviceEventHandler = proxierUserspace - proxier = proxierUserspace } iptInterface.AddReloadFunc(proxier.Sync) @@ -250,8 +234,6 @@ func newProxyServer( OOMScoreAdj: config.OOMScoreAdj, ResourceContainer: config.ResourceContainer, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, - ServiceEventHandler: serviceEventHandler, - EndpointsEventHandler: endpointsEventHandler, HealthzServer: healthzServer, }, nil } diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go index 596796cbbe1..97bce3828f1 100644 --- a/cmd/kube-proxy/app/server_windows.go +++ b/cmd/kube-proxy/app/server_windows.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/winkernel" "k8s.io/kubernetes/pkg/proxy/winuserspace" @@ -94,13 +93,11 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi } var proxier proxy.ProxyProvider - var serviceEventHandler proxyconfig.ServiceHandler - var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), winkernel.WindowsKernelCompatTester{}) if proxyMode == proxyModeKernelspace { klog.V(0).Info("Using Kernelspace Proxier.") - proxierKernelspace, err := winkernel.NewProxier( + proxier, err = winkernel.NewProxier( config.IPTables.SyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration, config.IPTables.MasqueradeAll, @@ -115,23 +112,14 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierKernelspace - endpointsEventHandler = proxierKernelspace - serviceEventHandler = proxierKernelspace } else { klog.V(0).Info("Using userspace Proxier.") execer := exec.New() var netshInterface utilnetsh.Interface netshInterface = utilnetsh.New(execer) - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := winuserspace.NewLoadBalancerRR() - - // set EndpointsConfigHandler to our loadBalancer - endpointsEventHandler = loadBalancer - proxierUserspace, err := winuserspace.NewProxier( - loadBalancer, + proxier, err = winuserspace.NewProxier( + winuserspace.NewLoadBalancerRR(), net.ParseIP(config.BindAddress), netshInterface, *utilnet.ParsePortRangeOrDie(config.PortRange), @@ -142,26 +130,22 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierUserspace - serviceEventHandler = proxierUserspace } return &ProxyServer{ - Client: client, - EventClient: eventClient, - Proxier: proxier, - Broadcaster: eventBroadcaster, - Recorder: recorder, - ProxyMode: proxyMode, - NodeRef: nodeRef, - MetricsBindAddress: config.MetricsBindAddress, - EnableProfiling: config.EnableProfiling, - OOMScoreAdj: config.OOMScoreAdj, - ResourceContainer: config.ResourceContainer, - ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, - ServiceEventHandler: serviceEventHandler, - EndpointsEventHandler: endpointsEventHandler, - HealthzServer: healthzServer, + Client: client, + EventClient: eventClient, + Proxier: proxier, + Broadcaster: eventBroadcaster, + Recorder: recorder, + ProxyMode: proxyMode, + NodeRef: nodeRef, + MetricsBindAddress: config.MetricsBindAddress, + EnableProfiling: config.EnableProfiling, + OOMScoreAdj: config.OOMScoreAdj, + ResourceContainer: config.ResourceContainer, + ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, + HealthzServer: healthzServer, }, nil } diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index ced440d7bc6..1b689f13157 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -26,7 +26,6 @@ go_library( "//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/proxy:go_default_library", - "//pkg/proxy/config:go_default_library", "//pkg/proxy/iptables:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/mount:go_default_library", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 5e4a7ec8897..600a97b295b 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -27,7 +27,6 @@ import ( "k8s.io/client-go/tools/record" proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app" "k8s.io/kubernetes/pkg/proxy" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -72,13 +71,12 @@ func NewHollowProxyOrDie( ) (*HollowProxy, error) { // Create proxier and service/endpoint handlers. var proxier proxy.ProxyProvider - var serviceHandler proxyconfig.ServiceHandler - var endpointsHandler proxyconfig.EndpointsHandler + var err error if useRealProxier { // Real proxier with fake iptables, sysctl, etc underneath it. //var err error - proxierIPTables, err := iptables.NewProxier( + proxier, err = iptables.NewProxier( iptInterface, sysctl, execer, @@ -96,13 +94,8 @@ func NewHollowProxyOrDie( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierIPTables - serviceHandler = proxierIPTables - endpointsHandler = proxierIPTables } else { proxier = &FakeProxier{} - serviceHandler = &FakeProxier{} - endpointsHandler = &FakeProxier{} } // Create a Hollow Proxy instance. @@ -114,19 +107,17 @@ func NewHollowProxyOrDie( } return &HollowProxy{ ProxyServer: &proxyapp.ProxyServer{ - Client: client, - EventClient: eventClient, - IptInterface: iptInterface, - Proxier: proxier, - Broadcaster: broadcaster, - Recorder: recorder, - ProxyMode: "fake", - NodeRef: nodeRef, - OOMScoreAdj: utilpointer.Int32Ptr(0), - ResourceContainer: "", - ConfigSyncPeriod: 30 * time.Second, - ServiceEventHandler: serviceHandler, - EndpointsEventHandler: endpointsHandler, + Client: client, + EventClient: eventClient, + IptInterface: iptInterface, + Proxier: proxier, + Broadcaster: broadcaster, + Recorder: recorder, + ProxyMode: "fake", + NodeRef: nodeRef, + OOMScoreAdj: utilpointer.Int32Ptr(0), + ResourceContainer: "", + ConfigSyncPeriod: 30 * time.Second, }, }, nil } diff --git a/pkg/proxy/BUILD b/pkg/proxy/BUILD index a7969c25c86..b43afc322c3 100644 --- a/pkg/proxy/BUILD +++ b/pkg/proxy/BUILD @@ -17,6 +17,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/proxy", deps = [ "//pkg/api/v1/service:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index b7d8b1654a9..410603db195 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -21,10 +21,14 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/proxy/config" ) // ProxyProvider is the interface provided by proxier implementations. type ProxyProvider interface { + config.EndpointsHandler + config.ServiceHandler + // Sync immediately synchronizes the ProxyProvider's current state to proxy rules. Sync() // SyncLoop runs periodic work. diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index b7887c85b5d..b378147969f 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -21,6 +21,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go index f0e5bcaa694..0b853477851 100644 --- a/pkg/proxy/userspace/loadbalancer.go +++ b/pkg/proxy/userspace/loadbalancer.go @@ -19,6 +19,7 @@ package userspace import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "net" ) @@ -31,4 +32,6 @@ type LoadBalancer interface { DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) ServiceHasEndpoints(service proxy.ServicePortName) bool + + proxyconfig.EndpointsHandler } diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 661092b1b2b..52f8586b656 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -526,6 +526,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { } +func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsAdd(endpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsDelete(endpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.loadBalancer.OnEndpointsSynced() +} + func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool { if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { return false diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index 3925dc96d72..8bad75c386b 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -19,6 +19,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/util/ipconfig:go_default_library", "//pkg/util/netsh:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/proxy/winuserspace/loadbalancer.go b/pkg/proxy/winuserspace/loadbalancer.go index e7cb770e930..d96a2951f48 100644 --- a/pkg/proxy/winuserspace/loadbalancer.go +++ b/pkg/proxy/winuserspace/loadbalancer.go @@ -19,6 +19,7 @@ package winuserspace import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "net" ) @@ -30,4 +31,6 @@ type LoadBalancer interface { NewService(service proxy.ServicePortName, sessionAffinityType v1.ServiceAffinity, stickyMaxAgeMinutes int) error DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) + + proxyconfig.EndpointsHandler } diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 4b5a218cfc0..91cbd6337c8 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -444,6 +444,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { } +func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsAdd(endpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsDelete(endpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.loadBalancer.OnEndpointsSynced() +} + func sameConfig(info *serviceInfo, service *v1.Service, protocol v1.Protocol, listenPort int) bool { return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity }