From 57d35d5acbc2af278fb81e9941cf9c1cd339df8b Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 5 May 2017 10:56:03 +0200 Subject: [PATCH] Switch winuserspace proxy to be event based for services --- cmd/kube-proxy/app/server.go | 2 +- pkg/proxy/winuserspace/proxier.go | 187 +++++++++++++++---------- pkg/proxy/winuserspace/proxier_test.go | 131 +++++++++++------ 3 files changed, 198 insertions(+), 122 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 7f308783604..a5c14390777 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -455,7 +455,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - serviceHandler = proxierUserspace + serviceEventHandler = proxierUserspace proxier = proxierUserspace } else { // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index b331530e763..8bce3d639f9 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -315,86 +315,78 @@ func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[ return listenIPPortMap } -// OnServiceUpdate manages the active set of service proxies. -// Active service proxies are reinitialized if found in the update set or -// shutdown if missing from the update set. -func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { - glog.V(4).Infof("Received update notice: %+v", services) - activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set - for _, service := range services { - // if ClusterIP is "None" or empty, skip proxying - if !helper.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) - continue +func (proxier *Proxier) mergeService(service *api.Service) map[ServicePortPortalName]bool { + if service == nil { + return nil + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return nil + } + existingPortPortals := make(map[ServicePortPortalName]bool) + + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + // create a slice of all the source IPs to use for service port portals + listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort)) + protocol := servicePort.Protocol + + for listenIP, listenPort := range listenIPPortMap { + servicePortPortalName := ServicePortPortalName{ + NamespacedName: svcName, + Port: servicePort.Name, + PortalIPName: listenIP, + } + existingPortPortals[servicePortPortalName] = true + info, exists := proxier.getServiceInfo(servicePortPortalName) + if exists && sameConfig(info, service, protocol, listenPort) { + // Nothing changed. + continue + } + if exists { + glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName) + if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil { + glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err) + } + } + glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol) + info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout) + if err != nil { + glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err) + continue + } + info.sessionAffinityType = service.Spec.SessionAffinity + glog.V(10).Infof("info: %#v", info) } - - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - // create a slice of all the source IPs to use for service port portals - listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort)) - protocol := servicePort.Protocol - - for listenIP, listenPort := range listenIPPortMap { - servicePortPortalName := ServicePortPortalName{ - NamespacedName: types.NamespacedName{ - Namespace: service.Namespace, - Name: service.Name, - }, - Port: servicePort.Name, - PortalIPName: listenIP, - } - activeServicePortPortals[servicePortPortalName] = true - info, exists := proxier.getServiceInfo(servicePortPortalName) - if exists && sameConfig(info, service, protocol, listenPort) { - // Nothing changed. - continue - } - if exists { - glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName) - if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil { - glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err) - } - } - glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol) - info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout) - if err != nil { - glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err) - continue - } - info.sessionAffinityType = service.Spec.SessionAffinity - glog.V(10).Infof("info: %#v", info) - } - if len(listenIPPortMap) > 0 { - // only one loadbalancer per service port portal - servicePortName := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{ - Namespace: service.Namespace, - Name: service.Name, - }, - Port: servicePort.Name, - } - proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes) + if len(listenIPPortMap) > 0 { + // only one loadbalancer per service port portal + servicePortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: service.Namespace, + Name: service.Name, + }, + Port: servicePort.Name, } + proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes) } } - for name, info := range proxier.serviceMap { - if !activeServicePortPortals[name] { - glog.V(1).Infof("Stopping service %q", name) + return existingPortPortals +} - if err := proxier.closeServicePortPortal(name, info); err != nil { - glog.Errorf("Failed to close service port portal %q: %v", name, err) - } - } +func (proxier *Proxier) unmergeService(service *api.Service, existingPortPortals map[ServicePortPortalName]bool) { + if service == nil { + return + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return } - proxier.mu.Lock() - defer proxier.mu.Unlock() - - // servicePortNameMap tracks all service port portals with the same name/port. - // A value of true means there is one or more service port portals with name/port pair. servicePortNameMap := make(map[proxy.ServicePortName]bool) - for name := range proxier.serviceMap { + for name := range existingPortPortals { servicePortName := proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: name.Namespace, @@ -402,17 +394,60 @@ func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { }, Port: name.Port, } - servicePortNameMap[servicePortName] = servicePortNameMap[servicePortName] || activeServicePortPortals[name] + servicePortNameMap[servicePortName] = true } - // Only delete load balancer if all listen ips per name/port show inactive. - for name := range servicePortNameMap { - if !servicePortNameMap[name] { - proxier.loadBalancer.DeleteService(name) + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + // create a slice of all the source IPs to use for service port portals + listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort)) + + for listenIP := range listenIPPortMap { + servicePortPortalName := ServicePortPortalName{ + NamespacedName: svcName, + Port: servicePort.Name, + PortalIPName: listenIP, + } + if existingPortPortals[servicePortPortalName] { + continue + } + + glog.V(1).Infof("Stopping service %q", servicePortPortalName) + info, exists := proxier.getServiceInfo(servicePortPortalName) + if !exists { + glog.Errorf("Service %q is being removed but doesn't exist", servicePortPortalName) + continue + } + + if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil { + glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err) + } + } + + // Only delete load balancer if all listen ips per name/port show inactive. + if !servicePortNameMap[serviceName] { + proxier.loadBalancer.DeleteService(serviceName) } } } +func (proxier *Proxier) OnServiceAdd(service *api.Service) { + _ = proxier.mergeService(service) +} + +func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { + existingPortPortals := proxier.mergeService(service) + proxier.unmergeService(oldService, existingPortPortals) +} + +func (proxier *Proxier) OnServiceDelete(service *api.Service) { + proxier.unmergeService(service, map[ServicePortPortalName]bool{}) +} + +func (proxier *Proxier) OnServiceSynced() { +} + func sameConfig(info *serviceInfo, service *api.Service, protocol api.Protocol, listenPort int) bool { return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity } diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index 9334880a1d0..fadc5e38bc2 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -341,7 +341,7 @@ func TestMultiPortProxy(t *testing.T) { waitForNumProxyLoops(t, p, 2) } -func TestMultiPortOnServiceUpdate(t *testing.T) { +func TestMultiPortOnServiceAdd(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} @@ -354,7 +354,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{ Name: "p", @@ -365,7 +365,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { Port: 0, Protocol: "UDP", }}}, - }}) + }) waitForNumProxyLoops(t, p, 2) servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP} @@ -515,7 +515,14 @@ func TestTCPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "TCP", + }}}, + }) if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -552,7 +559,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "UDP", + }}}, + }) if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -590,7 +604,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "TCP", + }}}, + }) if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -598,14 +619,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Protocol: "TCP", }}}, - }}) + }) svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) @@ -645,7 +666,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "UDP", + }}}, + }) if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -653,14 +681,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Protocol: "UDP", }}}, - }}) + }) svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) @@ -695,14 +723,14 @@ func TestTCPProxyUpdatePort(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: 0, Protocol: "TCP", }}}, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) @@ -742,14 +770,14 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: 0, Protocol: "UDP", }}}, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) @@ -788,7 +816,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{ @@ -799,7 +827,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { ClusterIP: svcInfo.portal.ip, ExternalIPs: []string{"0.0.0.0"}, }, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) @@ -841,40 +869,53 @@ func TestProxyUpdatePortal(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ - Name: "p", - Port: int32(svcInfo.portal.port), - Protocol: "TCP", - }}}, - }}) - _, exists := p.getServiceInfo(servicePortPortalName) - if exists { - t.Fatalf("service with empty ClusterIP should not be included in the proxy") - } - - p.OnServiceUpdate([]*api.Service{{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ - Name: "p", - Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), - Protocol: "TCP", - }}}, - }}) - _, exists = p.getServiceInfo(servicePortPortalName) - if exists { - t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") - } - - p.OnServiceUpdate([]*api.Service{{ + svcv0 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: int32(svcInfo.portal.port), Protocol: "TCP", }}}, - }}) + } + + svcv1 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.portal.port), + Protocol: "TCP", + }}}, + } + + p.OnServiceUpdate(svcv0, svcv1) + _, exists := p.getServiceInfo(servicePortPortalName) + if exists { + t.Fatalf("service with empty ClusterIP should not be included in the proxy") + } + + svcv2 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv1, svcv2) + _, exists = p.getServiceInfo(servicePortPortalName) + if exists { + t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") + } + + svcv3 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.portal.port), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv2, svcv3) lb.OnEndpointsAdd(endpoint) svcInfo, exists = p.getServiceInfo(servicePortPortalName) if !exists {