From 4b07f80d204462c0ee639d67126d5eedce1a2ec6 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 4 Dec 2018 23:35:34 -0600 Subject: [PATCH 1/6] proxy: consolidate ServicesHandler/EndpointsHandler into ProxyProvider Proxies should be able to cleanly figure out when endpoints have been synced, so make all ProxyProviders also implement EndpointsHandler and pass those through to loadbalancers when required. --- cmd/kube-proxy/app/server.go | 6 ++-- cmd/kube-proxy/app/server_others.go | 26 +++----------- cmd/kube-proxy/app/server_windows.go | 48 +++++++++----------------- pkg/kubemark/BUILD | 1 - pkg/kubemark/hollow_proxy.go | 35 +++++++------------ pkg/proxy/BUILD | 1 + pkg/proxy/types.go | 4 +++ pkg/proxy/userspace/BUILD | 1 + pkg/proxy/userspace/loadbalancer.go | 3 ++ pkg/proxy/userspace/proxier.go | 16 +++++++++ pkg/proxy/winuserspace/BUILD | 1 + pkg/proxy/winuserspace/loadbalancer.go | 3 ++ pkg/proxy/winuserspace/proxier.go | 16 +++++++++ 13 files changed, 80 insertions(+), 81 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index c91f93244fc..e1a4b7d015c 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -501,8 +501,6 @@ type ProxyServer struct { OOMScoreAdj *int32 ResourceContainer string ConfigSyncPeriod time.Duration - ServiceEventHandler config.ServiceHandler - EndpointsEventHandler config.EndpointsHandler HealthzServer *healthcheck.HealthzServer } @@ -657,11 +655,11 @@ func (s *ProxyServer) Run() error { // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod) - serviceConfig.RegisterEventHandler(s.ServiceEventHandler) + serviceConfig.RegisterEventHandler(s.Proxier) go serviceConfig.Run(wait.NeverStop) endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod) - endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler) + endpointsConfig.RegisterEventHandler(s.Proxier) go endpointsConfig.Run(wait.NeverStop) // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 84d5ea0d07d..0f0832c9bc4 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/ipvs" @@ -135,8 +134,6 @@ func newProxyServer( } var proxier proxy.ProxyProvider - var serviceEventHandler proxyconfig.ServiceHandler - var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{}) nodeIP := net.ParseIP(config.BindAddress) @@ -151,7 +148,7 @@ func newProxyServer( } // TODO this has side effects that should only happen when Run() is invoked. 
- proxierIPTables, err := iptables.NewProxier( + proxier, err = iptables.NewProxier( iptInterface, utilsysctl.New(), execer, @@ -170,12 +167,9 @@ func newProxyServer( return nil, fmt.Errorf("unable to create proxier: %v", err) } metrics.RegisterMetrics() - proxier = proxierIPTables - serviceEventHandler = proxierIPTables - endpointsEventHandler = proxierIPTables } else if proxyMode == proxyModeIPVS { klog.V(0).Info("Using ipvs Proxier.") - proxierIPVS, err := ipvs.NewProxier( + proxier, err = ipvs.NewProxier( iptInterface, ipvsInterface, ipsetInterface, @@ -199,20 +193,12 @@ func newProxyServer( return nil, fmt.Errorf("unable to create proxier: %v", err) } metrics.RegisterMetrics() - proxier = proxierIPVS - serviceEventHandler = proxierIPVS - endpointsEventHandler = proxierIPVS } else { klog.V(0).Info("Using userspace Proxier.") - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := userspace.NewLoadBalancerRR() - // set EndpointsConfigHandler to our loadBalancer - endpointsEventHandler = loadBalancer // TODO this has side effects that should only happen when Run() is invoked. - proxierUserspace, err := userspace.NewProxier( - loadBalancer, + proxier, err = userspace.NewProxier( + userspace.NewLoadBalancerRR(), net.ParseIP(config.BindAddress), iptInterface, execer, @@ -225,8 +211,6 @@ func newProxyServer( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - serviceEventHandler = proxierUserspace - proxier = proxierUserspace } iptInterface.AddReloadFunc(proxier.Sync) @@ -250,8 +234,6 @@ func newProxyServer( OOMScoreAdj: config.OOMScoreAdj, ResourceContainer: config.ResourceContainer, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, - ServiceEventHandler: serviceEventHandler, - EndpointsEventHandler: endpointsEventHandler, HealthzServer: healthzServer, }, nil } diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go index 596796cbbe1..97bce3828f1 100644 --- a/cmd/kube-proxy/app/server_windows.go +++ b/cmd/kube-proxy/app/server_windows.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/winkernel" "k8s.io/kubernetes/pkg/proxy/winuserspace" @@ -94,13 +93,11 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi } var proxier proxy.ProxyProvider - var serviceEventHandler proxyconfig.ServiceHandler - var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), winkernel.WindowsKernelCompatTester{}) if proxyMode == proxyModeKernelspace { klog.V(0).Info("Using Kernelspace Proxier.") - proxierKernelspace, err := winkernel.NewProxier( + proxier, err = winkernel.NewProxier( config.IPTables.SyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration, config.IPTables.MasqueradeAll, @@ -115,23 +112,14 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierKernelspace - endpointsEventHandler = proxierKernelspace - serviceEventHandler = proxierKernelspace } else { klog.V(0).Info("Using userspace Proxier.") execer := exec.New() var netshInterface utilnetsh.Interface netshInterface = utilnetsh.New(execer) - // This is a proxy.LoadBalancer which 
NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := winuserspace.NewLoadBalancerRR() - - // set EndpointsConfigHandler to our loadBalancer - endpointsEventHandler = loadBalancer - proxierUserspace, err := winuserspace.NewProxier( - loadBalancer, + proxier, err = winuserspace.NewProxier( + winuserspace.NewLoadBalancerRR(), net.ParseIP(config.BindAddress), netshInterface, *utilnet.ParsePortRangeOrDie(config.PortRange), @@ -142,26 +130,22 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierUserspace - serviceEventHandler = proxierUserspace } return &ProxyServer{ - Client: client, - EventClient: eventClient, - Proxier: proxier, - Broadcaster: eventBroadcaster, - Recorder: recorder, - ProxyMode: proxyMode, - NodeRef: nodeRef, - MetricsBindAddress: config.MetricsBindAddress, - EnableProfiling: config.EnableProfiling, - OOMScoreAdj: config.OOMScoreAdj, - ResourceContainer: config.ResourceContainer, - ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, - ServiceEventHandler: serviceEventHandler, - EndpointsEventHandler: endpointsEventHandler, - HealthzServer: healthzServer, + Client: client, + EventClient: eventClient, + Proxier: proxier, + Broadcaster: eventBroadcaster, + Recorder: recorder, + ProxyMode: proxyMode, + NodeRef: nodeRef, + MetricsBindAddress: config.MetricsBindAddress, + EnableProfiling: config.EnableProfiling, + OOMScoreAdj: config.OOMScoreAdj, + ResourceContainer: config.ResourceContainer, + ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, + HealthzServer: healthzServer, }, nil } diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index ced440d7bc6..1b689f13157 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -26,7 +26,6 @@ go_library( "//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/proxy:go_default_library", - "//pkg/proxy/config:go_default_library", "//pkg/proxy/iptables:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/mount:go_default_library", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 5e4a7ec8897..600a97b295b 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -27,7 +27,6 @@ import ( "k8s.io/client-go/tools/record" proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app" "k8s.io/kubernetes/pkg/proxy" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -72,13 +71,12 @@ func NewHollowProxyOrDie( ) (*HollowProxy, error) { // Create proxier and service/endpoint handlers. var proxier proxy.ProxyProvider - var serviceHandler proxyconfig.ServiceHandler - var endpointsHandler proxyconfig.EndpointsHandler + var err error if useRealProxier { // Real proxier with fake iptables, sysctl, etc underneath it. //var err error - proxierIPTables, err := iptables.NewProxier( + proxier, err = iptables.NewProxier( iptInterface, sysctl, execer, @@ -96,13 +94,8 @@ func NewHollowProxyOrDie( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierIPTables - serviceHandler = proxierIPTables - endpointsHandler = proxierIPTables } else { proxier = &FakeProxier{} - serviceHandler = &FakeProxier{} - endpointsHandler = &FakeProxier{} } // Create a Hollow Proxy instance. 
@@ -114,19 +107,17 @@ func NewHollowProxyOrDie( } return &HollowProxy{ ProxyServer: &proxyapp.ProxyServer{ - Client: client, - EventClient: eventClient, - IptInterface: iptInterface, - Proxier: proxier, - Broadcaster: broadcaster, - Recorder: recorder, - ProxyMode: "fake", - NodeRef: nodeRef, - OOMScoreAdj: utilpointer.Int32Ptr(0), - ResourceContainer: "", - ConfigSyncPeriod: 30 * time.Second, - ServiceEventHandler: serviceHandler, - EndpointsEventHandler: endpointsHandler, + Client: client, + EventClient: eventClient, + IptInterface: iptInterface, + Proxier: proxier, + Broadcaster: broadcaster, + Recorder: recorder, + ProxyMode: "fake", + NodeRef: nodeRef, + OOMScoreAdj: utilpointer.Int32Ptr(0), + ResourceContainer: "", + ConfigSyncPeriod: 30 * time.Second, }, }, nil } diff --git a/pkg/proxy/BUILD b/pkg/proxy/BUILD index a7969c25c86..b43afc322c3 100644 --- a/pkg/proxy/BUILD +++ b/pkg/proxy/BUILD @@ -17,6 +17,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/proxy", deps = [ "//pkg/api/v1/service:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index b7d8b1654a9..410603db195 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -21,10 +21,14 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/proxy/config" ) // ProxyProvider is the interface provided by proxier implementations. type ProxyProvider interface { + config.EndpointsHandler + config.ServiceHandler + // Sync immediately synchronizes the ProxyProvider's current state to proxy rules. Sync() // SyncLoop runs periodic work. 
diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index b7887c85b5d..b378147969f 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -21,6 +21,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go index f0e5bcaa694..0b853477851 100644 --- a/pkg/proxy/userspace/loadbalancer.go +++ b/pkg/proxy/userspace/loadbalancer.go @@ -19,6 +19,7 @@ package userspace import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "net" ) @@ -31,4 +32,6 @@ type LoadBalancer interface { DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) ServiceHasEndpoints(service proxy.ServicePortName) bool + + proxyconfig.EndpointsHandler } diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 661092b1b2b..52f8586b656 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -526,6 +526,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { } +func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsAdd(endpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsDelete(endpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.loadBalancer.OnEndpointsSynced() +} + func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool { if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { return false diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index 3925dc96d72..8bad75c386b 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -19,6 +19,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/util/ipconfig:go_default_library", "//pkg/util/netsh:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/proxy/winuserspace/loadbalancer.go b/pkg/proxy/winuserspace/loadbalancer.go index e7cb770e930..d96a2951f48 100644 --- a/pkg/proxy/winuserspace/loadbalancer.go +++ b/pkg/proxy/winuserspace/loadbalancer.go @@ -19,6 +19,7 @@ package winuserspace import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "net" ) @@ -30,4 +31,6 @@ type LoadBalancer interface { NewService(service proxy.ServicePortName, sessionAffinityType v1.ServiceAffinity, stickyMaxAgeMinutes int) error DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) + + proxyconfig.EndpointsHandler } diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 4b5a218cfc0..91cbd6337c8 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -444,6 +444,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) 
OnServiceSynced() { } +func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsAdd(endpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsDelete(endpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.loadBalancer.OnEndpointsSynced() +} + func sameConfig(info *serviceInfo, service *v1.Service, protocol v1.Protocol, listenPort int) bool { return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity } From ddab79a233fc159633ae710d50f406f06c69681a Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Sun, 31 Mar 2019 14:41:11 -0500 Subject: [PATCH 2/6] proxy/userspace: add proxy shutdown function and use in testcases If a testcase does time out and 'go test' prints the call stack, make sure everything from previous tests is cleaned up so the call stack is easier to understand. --- pkg/proxy/userspace/proxier.go | 27 ++++++++++++++++++++++++--- pkg/proxy/userspace/proxier_test.go | 15 +++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 52f8586b656..0f57ce81bdf 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -109,6 +109,8 @@ type Proxier struct { proxyPorts PortAllocator makeProxySocket ProxySocketFunc exec utilexec.Interface + + stopChan chan struct{} } // assert Proxier is a ProxyProvider @@ -216,6 +218,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables proxyPorts: proxyPorts, makeProxySocket: makeProxySocket, exec: exec, + stopChan: make(chan struct{}), }, nil } @@ -287,6 +290,20 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) { return encounteredError } +// shutdown closes all service port proxies and returns from the proxy's +// sync loop. Used from testcases. 
+func (proxier *Proxier) shutdown() { + defer proxier.cleanupStaleStickySessions() + + proxier.mu.Lock() + defer proxier.mu.Unlock() + + for serviceName, info := range proxier.serviceMap { + proxier.stopProxyInternal(serviceName, info) + } + close(proxier.stopChan) +} + // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { if err := iptablesInit(proxier.iptables); err != nil { @@ -301,9 +318,13 @@ func (proxier *Proxier) SyncLoop() { t := time.NewTicker(proxier.syncPeriod) defer t.Stop() for { - <-t.C - klog.V(6).Infof("Periodic sync") - proxier.Sync() + select { + case <-t.C: + klog.V(6).Infof("Periodic sync") + proxier.Sync() + case <-proxier.stopChan: + return + } } } diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index cee7eec411b..9488c1bb213 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -243,6 +243,7 @@ func TestTCPProxy(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -270,6 +271,7 @@ func TestUDPProxy(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -297,6 +299,7 @@ func TestUDPProxyTimeout(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -336,6 +339,7 @@ func TestMultiPortProxy(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) if err != nil { @@ -365,6 +369,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() p.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, @@ -428,6 +433,7 @@ func TestTCPProxyStop(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -472,6 +478,7 @@ func TestUDPProxyStop(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -510,6 +517,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -554,6 +562,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -599,6 +608,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -661,6 +671,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -722,6 +733,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -770,6 +782,7 @@ func 
TestUDPProxyUpdatePort(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -815,6 +828,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -868,6 +882,7 @@ func TestProxyUpdatePortal(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { From 04b943ce382531d572f6f70da7b1973aec1d4b09 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Sun, 31 Mar 2019 15:09:12 -0500 Subject: [PATCH 3/6] proxy/userspace: track initial service/endpoints sync We'll use this shortly to prevent premature syncing before all initial endpoints and services have been received from the apiserver. --- pkg/proxy/userspace/proxier.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 0f57ce81bdf..d50cd31a6c7 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -109,6 +109,12 @@ type Proxier struct { proxyPorts PortAllocator makeProxySocket ProxySocketFunc exec utilexec.Interface + // endpointsSynced and servicesSynced are set to 1 when the corresponding + // objects are synced after startup. This is used to avoid updating iptables + // with some partial data after kube-proxy restart. + endpointsSynced int32 + servicesSynced int32 + initialized int32 stopChan chan struct{} } @@ -304,6 +310,10 @@ func (proxier *Proxier) shutdown() { close(proxier.stopChan) } +func (proxier *Proxier) isInitialized() bool { + return atomic.LoadInt32(&proxier.initialized) > 0 +} + // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { if err := iptablesInit(proxier.iptables); err != nil { @@ -545,6 +555,14 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { } func (proxier *Proxier) OnServiceSynced() { + klog.V(2).Infof("userspace OnServiceSynced") + + // Mark services as initialized and (if endpoints are already + // initialized) the entire proxy as initialized + atomic.StoreInt32(&proxier.servicesSynced, 1) + if atomic.LoadInt32(&proxier.endpointsSynced) > 0 { + atomic.StoreInt32(&proxier.initialized, 1) + } } func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { @@ -560,7 +578,15 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { } func (proxier *Proxier) OnEndpointsSynced() { + klog.V(2).Infof("userspace OnEndpointsSynced") proxier.loadBalancer.OnEndpointsSynced() + + // Mark endpoints as initialized and (if services are already + // initialized) the entire proxy as initialized + atomic.StoreInt32(&proxier.endpointsSynced, 1) + if atomic.LoadInt32(&proxier.servicesSynced) > 0 { + atomic.StoreInt32(&proxier.initialized, 1) + } } func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool { From cf7225f56180a0e44922f59e870c4378f2f72d59 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Sun, 31 Mar 2019 15:20:29 -0500 Subject: [PATCH 4/6] proxy/userspace: replace IsServiceIPSet() with ShouldSkipService() Keeps things consistent with iptables/IPVS proxies. Proxies don't handle ServiceTypeExternalName even if the ClusterIP is set. 
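
For reference, the check the proxy now delegates to behaves roughly like the sketch below. This is only an illustration of the semantics described above (headless/empty ClusterIP and ExternalName services are skipped); shouldSkipService here is a hypothetical stand-in, not the actual pkg/proxy/util helper.

// Sketch only; assumes k8s.io/api/core/v1 and k8s.io/apimachinery/pkg/types.
func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
	// Headless services (ClusterIP "None") and services without a ClusterIP
	// are not proxied.
	if service.Spec.ClusterIP == v1.ClusterIPNone || service.Spec.ClusterIP == "" {
		return true
	}
	// ExternalName services are not proxied even when a ClusterIP is set.
	if service.Spec.Type == v1.ServiceTypeExternalName {
		return true
	}
	return false
}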
---
 pkg/proxy/userspace/proxier.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go
index d50cd31a6c7..698ae65a9b3 100644
--- a/pkg/proxy/userspace/proxier.go
+++ b/pkg/proxy/userspace/proxier.go
@@ -435,7 +435,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
 		return nil
 	}
 	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if !helper.IsServiceIPSet(service) {
+	if utilproxy.ShouldSkipService(svcName, service) {
 		klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
 		return nil
 	}
@@ -500,7 +500,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
 		return
 	}
 	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if !helper.IsServiceIPSet(service) {
+	if utilproxy.ShouldSkipService(svcName, service) {
 		klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
 		return
 	}

From 8cf0076e238a47322c69eaed05b7b805d839e6e7 Mon Sep 17 00:00:00 2001
From: Dan Williams
Date: Fri, 29 Mar 2019 09:25:21 -0500
Subject: [PATCH 5/6] proxy/userspace: respect minSyncPeriod and simplify locking

The userspace proxy has no rate limiting, so with many services it hammers
iptables every time a service or endpoint change occurs. Instead, accumulate
changed services in a map and process them all in a single batched sync rather
than on every event. This also keeps long-running processing out of the
OnService* call chain, which would otherwise block other handlers attached to
the proxy's parent ServiceConfig object for long periods of time.

Locking can also be simplified: the only accesses to the proxy's serviceMap now
happen from syncProxyRules(), so lock once there (as the other proxies do)
instead of in many separate functions.
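
The batching pattern is roughly the following sketch (simplified; field and helper names here are illustrative, not the exact ones added by this patch):

// Event handlers only record the change; syncProxyRules() consumes the whole
// batch under one lock. Assumes a Proxier with changesLock, pendingChanges and
// a rate-limited syncRunner; sketch only.
func (p *Proxier) recordServiceChange(previous, current *v1.Service) {
	svc := current
	if svc == nil {
		svc = previous
	}
	name := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}

	p.changesLock.Lock()
	change, ok := p.pendingChanges[name]
	if !ok {
		// Keep the oldest known state so a later sync can unmerge correctly.
		change = &serviceChange{previous: previous}
		p.pendingChanges[name] = change
	}
	// Always keep the newest state; an add followed by a delete collapses away.
	change.current = current
	if reflect.DeepEqual(change.previous, change.current) {
		delete(p.pendingChanges, name)
	}
	p.changesLock.Unlock()

	p.syncRunner.Run() // ask the bounded-frequency runner to sync soon
}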
https://bugzilla.redhat.com/show_bug.cgi?id=1590589 https://bugzilla.redhat.com/show_bug.cgi?id=1689690 --- pkg/proxy/userspace/BUILD | 2 + pkg/proxy/userspace/proxier.go | 171 ++++++++++++----- pkg/proxy/userspace/proxier_test.go | 277 +++++++++++++++++++++++----- 3 files changed, 357 insertions(+), 93 deletions(-) diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index b378147969f..87e3da69e96 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -23,6 +23,7 @@ go_library( "//pkg/proxy:go_default_library", "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/util/async:go_default_library", "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/slice:go_default_library", @@ -86,6 +87,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", ], diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 698ae65a9b3..8db24f0da9b 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -19,6 +19,7 @@ package userspace import ( "fmt" "net" + "reflect" "strconv" "strings" "sync" @@ -35,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/proxy" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/conntrack" "k8s.io/kubernetes/pkg/util/iptables" utilexec "k8s.io/utils/exec" @@ -91,6 +93,19 @@ func logTimeout(err error) bool { // ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port type ProxySocketFunc func(protocol v1.Protocol, ip net.IP, port int) (ProxySocket, error) +const numBurstSyncs int = 2 + +type serviceChange struct { + current *v1.Service + previous *v1.Service +} + +// Interface for async runner; abstracted for testing +type asyncRunnerInterface interface { + Run() + Loop(<-chan struct{}) +} + // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { @@ -98,7 +113,7 @@ type Proxier struct { mu sync.Mutex // protects serviceMap serviceMap map[proxy.ServicePortName]*ServiceInfo syncPeriod time.Duration - minSyncPeriod time.Duration // unused atm, but plumbed through + minSyncPeriod time.Duration udpIdleTimeout time.Duration portMapMutex sync.Mutex portMap map[portMapKey]*portMapValue @@ -115,6 +130,10 @@ type Proxier struct { endpointsSynced int32 servicesSynced int32 initialized int32 + // protects serviceChanges + serviceChangesLock sync.Mutex + serviceChanges map[types.NamespacedName]*serviceChange // map of service changes + syncRunner asyncRunnerInterface // governs calls to syncProxyRules stopChan chan struct{} } @@ -210,12 +229,12 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables if err := iptablesFlush(iptables); err != nil { return nil, fmt.Errorf("failed to flush iptables: %v", err) } - return &Proxier{ - loadBalancer: loadBalancer, - serviceMap: make(map[proxy.ServicePortName]*ServiceInfo), - portMap: make(map[portMapKey]*portMapValue), - syncPeriod: syncPeriod, - // plumbed through if needed, not used atm. 
+ proxier := &Proxier{ + loadBalancer: loadBalancer, + serviceMap: make(map[proxy.ServicePortName]*ServiceInfo), + serviceChanges: make(map[types.NamespacedName]*serviceChange), + portMap: make(map[portMapKey]*portMapValue), + syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, udpIdleTimeout: udpIdleTimeout, listenIP: listenIP, @@ -225,7 +244,10 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables makeProxySocket: makeProxySocket, exec: exec, stopChan: make(chan struct{}), - }, nil + } + klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, numBurstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs) + return proxier, nil } // CleanupLeftovers removes all iptables rules and chains created by the Proxier @@ -299,14 +321,13 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) { // shutdown closes all service port proxies and returns from the proxy's // sync loop. Used from testcases. func (proxier *Proxier) shutdown() { - defer proxier.cleanupStaleStickySessions() - proxier.mu.Lock() defer proxier.mu.Unlock() for serviceName, info := range proxier.serviceMap { - proxier.stopProxyInternal(serviceName, info) + proxier.stopProxy(serviceName, info) } + proxier.cleanupStaleStickySessions() close(proxier.stopChan) } @@ -314,34 +335,52 @@ func (proxier *Proxier) isInitialized() bool { return atomic.LoadInt32(&proxier.initialized) > 0 } -// Sync is called to immediately synchronize the proxier state to iptables +// Sync is called to synchronize the proxier state to iptables as soon as possible. func (proxier *Proxier) Sync() { + proxier.syncRunner.Run() +} + +func (proxier *Proxier) syncProxyRules() { + start := time.Now() + defer func() { + klog.V(2).Infof("userspace syncProxyRules took %v", time.Since(start)) + }() + + // don't sync rules till we've received services and endpoints + if !proxier.isInitialized() { + klog.V(2).Info("Not syncing userspace proxy until Services and Endpoints have been received from master") + return + } + if err := iptablesInit(proxier.iptables); err != nil { klog.Errorf("Failed to ensure iptables: %v", err) } + + proxier.serviceChangesLock.Lock() + changes := proxier.serviceChanges + proxier.serviceChanges = make(map[types.NamespacedName]*serviceChange) + proxier.serviceChangesLock.Unlock() + + proxier.mu.Lock() + defer proxier.mu.Unlock() + + klog.V(2).Infof("userspace proxy: processing %d service events", len(changes)) + for _, change := range changes { + existingPorts := proxier.mergeService(change.current) + proxier.unmergeService(change.previous, existingPorts) + } + proxier.ensurePortals() proxier.cleanupStaleStickySessions() } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { - t := time.NewTicker(proxier.syncPeriod) - defer t.Stop() - for { - select { - case <-t.C: - klog.V(6).Infof("Periodic sync") - proxier.Sync() - case <-proxier.stopChan: - return - } - } + proxier.syncRunner.Loop(proxier.stopChan) } // Ensure that portals exist for all services. func (proxier *Proxier) ensurePortals() { - proxier.mu.Lock() - defer proxier.mu.Unlock() // NB: This does not remove rules that should not be present. 
for name, info := range proxier.serviceMap { err := proxier.openPortal(name, info) @@ -353,22 +392,12 @@ func (proxier *Proxier) ensurePortals() { // clean up any stale sticky session records in the hash map. func (proxier *Proxier) cleanupStaleStickySessions() { - proxier.mu.Lock() - defer proxier.mu.Unlock() for name := range proxier.serviceMap { proxier.loadBalancer.CleanupStaleStickySessions(name) } } -// This assumes proxier.mu is not locked. func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error { - proxier.mu.Lock() - defer proxier.mu.Unlock() - return proxier.stopProxyInternal(service, info) -} - -// This assumes proxier.mu is locked. -func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *ServiceInfo) error { delete(proxier.serviceMap, service) info.setAlive(false) err := info.socket.Close() @@ -384,16 +413,18 @@ func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceI return info, ok } -func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *ServiceInfo) { +// addServiceOnPort lockes the proxy before calling addServiceOnPortInternal. +// Used from testcases. +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { proxier.mu.Lock() defer proxier.mu.Unlock() - proxier.serviceMap[service] = info + return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout) } -// addServiceOnPort starts listening for a new service, returning the ServiceInfo. +// addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. -func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { +func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -417,7 +448,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol socket: sock, sessionAffinityType: v1.ServiceAffinityNone, // default } - proxier.setServiceInfo(service, si) + proxier.serviceMap[service] = si klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) go func(service proxy.ServicePortName, proxier *Proxier) { @@ -444,7 +475,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String { servicePort := &service.Spec.Ports[i] serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} existingPorts.Insert(servicePort.Name) - info, exists := proxier.getServiceInfo(serviceName) + info, exists := proxier.serviceMap[serviceName] // TODO: check health of the socket? What if ProxyLoop exited? if exists && sameConfig(info, service, servicePort) { // Nothing changed. 
@@ -467,7 +498,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String { serviceIP := net.ParseIP(service.Spec.ClusterIP) klog.V(1).Infof("Adding new service %q at %s/%s", serviceName, net.JoinHostPort(serviceIP.String(), strconv.Itoa(int(servicePort.Port))), servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) + info, err = proxier.addServiceOnPortInternal(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) if err != nil { klog.Errorf("Failed to start proxy for %q: %v", serviceName, err) continue @@ -504,10 +535,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) return } - staleUDPServices := sets.NewString() - proxier.mu.Lock() - defer proxier.mu.Unlock() for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] if existingPorts.Has(servicePort.Name) { @@ -529,7 +557,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S if err := proxier.closePortal(serviceName, info); err != nil { klog.Errorf("Failed to close portal for %q: %v", serviceName, err) } - if err := proxier.stopProxyInternal(serviceName, info); err != nil { + if err := proxier.stopProxy(serviceName, info); err != nil { klog.Errorf("Failed to stop service %q: %v", serviceName, err) } proxier.loadBalancer.DeleteService(serviceName) @@ -541,17 +569,50 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S } } +func (proxier *Proxier) serviceChange(previous, current *v1.Service, detail string) { + var svcName types.NamespacedName + if current != nil { + svcName = types.NamespacedName{Namespace: current.Namespace, Name: current.Name} + } else { + svcName = types.NamespacedName{Namespace: previous.Namespace, Name: previous.Name} + } + klog.V(4).Infof("userspace proxy: %s for %s", detail, svcName) + + proxier.serviceChangesLock.Lock() + defer proxier.serviceChangesLock.Unlock() + + change, exists := proxier.serviceChanges[svcName] + if !exists { + // change.previous is only set for new changes. We must keep + // the oldest service info (or nil) because correct unmerging + // depends on the next update/del after a merge, not subsequent + // updates. 
+ change = &serviceChange{previous: previous} + proxier.serviceChanges[svcName] = change + } + + // Always use the most current service (or nil) as change.current + change.current = current + + if reflect.DeepEqual(change.previous, change.current) { + // collapsed change had no effect + delete(proxier.serviceChanges, svcName) + } else if proxier.isInitialized() { + // change will have an effect, ask the proxy to sync + proxier.syncRunner.Run() + } +} + func (proxier *Proxier) OnServiceAdd(service *v1.Service) { - _ = proxier.mergeService(service) + proxier.serviceChange(nil, service, "OnServiceAdd") } func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { - existingPorts := proxier.mergeService(service) - proxier.unmergeService(oldService, existingPorts) + proxier.serviceChange(oldService, service, "OnServiceUpdate") } func (proxier *Proxier) OnServiceDelete(service *v1.Service) { - proxier.unmergeService(service, sets.NewString()) + proxier.serviceChange(service, nil, "OnServiceDelete") } func (proxier *Proxier) OnServiceSynced() { @@ -563,6 +624,11 @@ func (proxier *Proxier) OnServiceSynced() { if atomic.LoadInt32(&proxier.endpointsSynced) > 0 { atomic.StoreInt32(&proxier.initialized, 1) } + + // Must sync from a goroutine to avoid blocking the + // service event handler on startup with large numbers + // of initial objects + go proxier.syncProxyRules() } func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { @@ -587,6 +653,11 @@ func (proxier *Proxier) OnEndpointsSynced() { if atomic.LoadInt32(&proxier.servicesSynced) > 0 { atomic.StoreInt32(&proxier.initialized, 1) } + + // Must sync from a goroutine to avoid blocking the + // service event handler on startup with large numbers + // of initial objects + go proxier.syncProxyRules() } func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool { diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 9488c1bb213..e76400d114a 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -24,6 +24,7 @@ import ( "net/http/httptest" "net/url" "os" + "reflect" "strconv" "sync/atomic" "testing" @@ -33,6 +34,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/proxy" ipttest "k8s.io/kubernetes/pkg/util/iptables/testing" "k8s.io/utils/exec" @@ -86,6 +88,16 @@ func waitForClosedPortUDP(p *Proxier, proxyPort int) error { return fmt.Errorf("port %d still open", proxyPort) } +func waitForServiceInfo(p *Proxier, service proxy.ServicePortName) (*ServiceInfo, bool) { + var svcInfo *ServiceInfo + var exists bool + wait.PollImmediate(50*time.Millisecond, 3*time.Second, func() (bool, error) { + svcInfo, exists = p.getServiceInfo(service) + return exists, nil + }) + return svcInfo, exists +} + // udpEchoServer is a simple echo server in UDP, intended for testing the proxy. 
type udpEchoServer struct { net.PacketConn @@ -225,6 +237,15 @@ func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time t.Errorf("expected %d ProxyClients live, got %d", want, got) } +func startProxier(p *Proxier, t *testing.T) { + go func() { + p.SyncLoop() + }() + waitForNumProxyLoops(t, p, 0) + p.OnServiceSynced() + p.OnEndpointsSynced() +} + func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} @@ -242,7 +263,7 @@ func TestTCPProxy(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) @@ -270,7 +291,7 @@ func TestUDPProxy(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) @@ -298,7 +319,7 @@ func TestUDPProxyTimeout(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) @@ -338,7 +359,7 @@ func TestMultiPortProxy(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) @@ -368,7 +389,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() p.OnServiceAdd(&v1.Service{ @@ -384,7 +405,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) { }}}, }) waitForNumProxyLoops(t, p, 2) - svcInfo, exists := p.getServiceInfo(serviceP) + svcInfo, exists := waitForServiceInfo(p, serviceP) if !exists { t.Fatalf("can't find serviceInfo for %s", serviceP) } @@ -392,7 +413,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) { t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo) } - svcInfo, exists = p.getServiceInfo(serviceQ) + svcInfo, exists = waitForServiceInfo(p, serviceQ) if !exists { t.Fatalf("can't find serviceInfo for %s", serviceQ) } @@ -408,7 +429,9 @@ func TestMultiPortOnServiceAdd(t *testing.T) { // Helper: Stops the proxy for the named service. 
func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { - info, found := proxier.getServiceInfo(service) + proxier.mu.Lock() + defer proxier.mu.Unlock() + info, found := proxier.serviceMap[service] if !found { return fmt.Errorf("unknown service: %s", service) } @@ -432,7 +455,7 @@ func TestTCPProxyStop(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) @@ -477,7 +500,7 @@ func TestUDPProxyStop(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) @@ -501,9 +524,9 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + ObjectMeta: metav1.ObjectMeta{Namespace: servicePortName.Namespace, Name: servicePortName.Name}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, @@ -516,29 +539,22 @@ func TestTCPProxyUpdateDelete(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - conn.Close() - waitForNumProxyLoops(t, p, 1) - - p.OnServiceDelete(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: servicePortName.Name, Namespace: servicePortName.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: 9997, Protocol: "TCP", }}}, - }) - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + } + + p.OnServiceAdd(service) + waitForNumProxyLoops(t, p, 1) + p.OnServiceDelete(service) + if err := waitForClosedPortTCP(p, int(service.Spec.Ports[0].Port)); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) @@ -561,7 +577,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) @@ -607,7 +623,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) @@ -644,7 +660,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { Protocol: "TCP", }}}, }) - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo for %s", service) } @@ -670,7 +686,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 
0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) @@ -707,7 +723,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { Protocol: "UDP", }}}, }) - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -732,7 +748,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) @@ -754,7 +770,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -781,7 +797,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) @@ -802,7 +818,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -827,7 +843,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) @@ -853,7 +869,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -881,7 +897,7 @@ func TestProxyUpdatePortal(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) @@ -909,7 +925,16 @@ func TestProxyUpdatePortal(t *testing.T) { }}}, } p.OnServiceUpdate(svcv0, svcv1) - _, exists := p.getServiceInfo(service) + + // Wait for the service to be removed because it had an empty ClusterIP + var exists bool + for i := 0; i < 50; i++ { + _, exists = p.getServiceInfo(service) + if !exists { + break + } + time.Sleep(50 * time.Millisecond) + } if exists { t.Fatalf("service with empty ClusterIP should not be included in the proxy") } @@ -938,7 +963,7 @@ func TestProxyUpdatePortal(t *testing.T) { } p.OnServiceUpdate(svcv2, svcv3) lb.OnEndpointsAdd(endpoint) - svcInfo, exists = p.getServiceInfo(service) + svcInfo, exists = waitForServiceInfo(p, service) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy") } @@ -946,6 +971,172 @@ func TestProxyUpdatePortal(t *testing.T) { waitForNumProxyLoops(t, p, 1) } +type fakeRunner struct{} + +// assert fakeAsyncRunner is a ProxyProvider +var _ asyncRunnerInterface = &fakeRunner{} + +func (f fakeRunner) Run() { +} + +func (f fakeRunner) Loop(stop <-chan struct{}) { +} + +func TestOnServiceAddChangeMap(t *testing.T) { + fexec := makeFakeExec() + + // Use long minSyncPeriod so we can test that immediate syncs work + p, err := createProxier(NewLoadBalancerRR(), net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, 
time.Minute, time.Minute, udpIdleTimeoutForTest, newProxySocket) + if err != nil { + t.Fatal(err) + } + + // Fake out sync runner + p.syncRunner = fakeRunner{} + + serviceMeta := metav1.ObjectMeta{Namespace: "testnamespace", Name: "testname"} + service := &v1.Service{ + ObjectMeta: serviceMeta, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 99, + Protocol: "TCP", + }}}, + } + + serviceUpdate := &v1.Service{ + ObjectMeta: serviceMeta, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.5", Ports: []v1.ServicePort{{ + Name: "p", + Port: 100, + Protocol: "TCP", + }}}, + } + + serviceUpdate2 := &v1.Service{ + ObjectMeta: serviceMeta, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.6", Ports: []v1.ServicePort{{ + Name: "p", + Port: 101, + Protocol: "TCP", + }}}, + } + + type onServiceTest struct { + detail string + changes []serviceChange + expectedChange *serviceChange + } + + tests := []onServiceTest{ + { + detail: "add", + changes: []serviceChange{ + {current: service}, + }, + expectedChange: &serviceChange{ + current: service, + }, + }, + { + detail: "add+update=add", + changes: []serviceChange{ + {current: service}, + { + previous: service, + current: serviceUpdate, + }, + }, + expectedChange: &serviceChange{ + current: serviceUpdate, + }, + }, + { + detail: "add+del=none", + changes: []serviceChange{ + {current: service}, + {previous: service}, + }, + }, + { + detail: "update+update=update", + changes: []serviceChange{ + { + previous: service, + current: serviceUpdate, + }, + { + previous: serviceUpdate, + current: serviceUpdate2, + }, + }, + expectedChange: &serviceChange{ + previous: service, + current: serviceUpdate2, + }, + }, + { + detail: "update+del=del", + changes: []serviceChange{ + { + previous: service, + current: serviceUpdate, + }, + {previous: serviceUpdate}, + }, + // change collapsing always keeps the oldest service + // info since correct unmerging depends on the least + // recent update, not the most current. 
+ expectedChange: &serviceChange{ + previous: service, + }, + }, + { + detail: "del+add=update", + changes: []serviceChange{ + {previous: service}, + {current: serviceUpdate}, + }, + expectedChange: &serviceChange{ + previous: service, + current: serviceUpdate, + }, + }, + } + + for _, test := range tests { + for _, change := range test.changes { + p.serviceChange(change.previous, change.current, test.detail) + } + + if test.expectedChange != nil { + if len(p.serviceChanges) != 1 { + t.Fatalf("[%s] expected 1 service change but found %d", test.detail, len(p.serviceChanges)) + } + expectedService := test.expectedChange.current + if expectedService == nil { + expectedService = test.expectedChange.previous + } + svcName := types.NamespacedName{Namespace: expectedService.Namespace, Name: expectedService.Name} + + change, ok := p.serviceChanges[svcName] + if !ok { + t.Fatalf("[%s] did not find service change for %v", test.detail, svcName) + } + if !reflect.DeepEqual(change.previous, test.expectedChange.previous) { + t.Fatalf("[%s] change previous service and expected previous service don't match\nchange: %+v\nexp: %+v", test.detail, change.previous, test.expectedChange.previous) + } + if !reflect.DeepEqual(change.current, test.expectedChange.current) { + t.Fatalf("[%s] change current service and expected current service don't match\nchange: %+v\nexp: %+v", test.detail, change.current, test.expectedChange.current) + } + } else { + if len(p.serviceChanges) != 0 { + t.Fatalf("[%s] expected no service changes but found %d", test.detail, len(p.serviceChanges)) + } + } + } +} + func makeFakeExec() *fakeexec.FakeExec { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ From cc2b31a2f38487941125309ed56cd071e56cfac0 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Sun, 31 Mar 2019 15:30:02 -0500 Subject: [PATCH 6/6] proxy/userspace: consolidate portal and proxy cleanup --- pkg/proxy/userspace/proxier.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 8db24f0da9b..ae55842b302 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -461,6 +461,16 @@ func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName, return si, nil } +func (proxier *Proxier) cleanupPortalAndProxy(serviceName proxy.ServicePortName, info *ServiceInfo) error { + if err := proxier.closePortal(serviceName, info); err != nil { + return fmt.Errorf("Failed to close portal for %q: %v", serviceName, err) + } + if err := proxier.stopProxy(serviceName, info); err != nil { + return fmt.Errorf("Failed to stop service %q: %v", serviceName, err) + } + return nil +} + func (proxier *Proxier) mergeService(service *v1.Service) sets.String { if service == nil { return nil @@ -483,11 +493,8 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String { } if exists { klog.V(4).Infof("Something changed for service %q: stopping it", serviceName) - if err := proxier.closePortal(serviceName, info); err != nil { - klog.Errorf("Failed to close portal for %q: %v", serviceName, err) - } - if err := proxier.stopProxy(serviceName, info); err != nil { - klog.Errorf("Failed to stop service %q: %v", serviceName, err) + if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil { + klog.Error(err) } } proxyPort, err := proxier.proxyPorts.AllocateNext() @@ -554,11 +561,8 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts 
sets.S staleUDPServices.Insert(proxier.serviceMap[serviceName].portal.ip.String()) } - if err := proxier.closePortal(serviceName, info); err != nil { - klog.Errorf("Failed to close portal for %q: %v", serviceName, err) - } - if err := proxier.stopProxy(serviceName, info); err != nil { - klog.Errorf("Failed to stop service %q: %v", serviceName, err) + if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil { + klog.Error(err) } proxier.loadBalancer.DeleteService(serviceName) }