diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 0b753163a40..e42e31341e3 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -470,7 +470,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - serviceHandler = proxierUserspace + serviceEventHandler = proxierUserspace proxier = proxierUserspace } // Remove artifacts from the pure-iptables Proxier, if not on Windows. diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index ab62c9739cb..5032577b81f 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -395,97 +395,124 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol return si, nil } -// OnServiceUpdate manages the active set of service proxies. -// Active service proxies are reinitialized if found in the update set or -// shutdown if missing from the update set. -func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { - glog.V(4).Infof("Received update notice: %+v", services) - activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set - for _, service := range services { - // if ClusterIP is "None" or empty, skip proxying - if !helper.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) +func (proxier *Proxier) mergeService(service *api.Service) sets.String { + if service == nil { + return nil + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return nil + } + existingPorts := sets.NewString() + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + existingPorts.Insert(servicePort.Name) + info, exists := proxier.getServiceInfo(serviceName) + // TODO: check health of the socket? What if ProxyLoop exited? + if exists && sameConfig(info, service, servicePort) { + // Nothing changed. + continue + } + if exists { + glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) + if err := proxier.closePortal(serviceName, info); err != nil { + glog.Errorf("Failed to close portal for %q: %v", serviceName, err) + } + if err := proxier.stopProxy(serviceName, info); err != nil { + glog.Errorf("Failed to stop service %q: %v", serviceName, err) + } + } + proxyPort, err := proxier.proxyPorts.AllocateNext() + if err != nil { + glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err) continue } - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} - activeServices[serviceName] = true - serviceIP := net.ParseIP(service.Spec.ClusterIP) - info, exists := proxier.getServiceInfo(serviceName) - // TODO: check health of the socket? What if ProxyLoop exited? - if exists && sameConfig(info, service, servicePort) { - // Nothing changed. - continue - } - if exists { - glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) - err := proxier.closePortal(serviceName, info) - if err != nil { - glog.Errorf("Failed to close portal for %q: %v", serviceName, err) - } - err = proxier.stopProxy(serviceName, info) - if err != nil { - glog.Errorf("Failed to stop service %q: %v", serviceName, err) - } - } - - proxyPort, err := proxier.proxyPorts.AllocateNext() - if err != nil { - glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err) - continue - } - - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) - if err != nil { - glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) - continue - } - info.portal.ip = serviceIP - info.portal.port = int(servicePort.Port) - info.externalIPs = service.Spec.ExternalIPs - // Deep-copy in case the service instance changes - info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) - info.nodePort = int(servicePort.NodePort) - info.sessionAffinityType = service.Spec.SessionAffinity - glog.V(4).Infof("info: %#v", info) - - err = proxier.openPortal(serviceName, info) - if err != nil { - glog.Errorf("Failed to open portal for %q: %v", serviceName, err) - } - proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) + serviceIP := net.ParseIP(service.Spec.ClusterIP) + glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) + info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) + if err != nil { + glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) + continue } + info.portal.ip = serviceIP + info.portal.port = int(servicePort.Port) + info.externalIPs = service.Spec.ExternalIPs + // Deep-copy in case the service instance changes + info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) + info.nodePort = int(servicePort.NodePort) + info.sessionAffinityType = service.Spec.SessionAffinity + glog.V(4).Infof("info: %#v", info) + + if err := proxier.openPortal(serviceName, info); err != nil { + glog.Errorf("Failed to open portal for %q: %v", serviceName, err) + } + proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) + } + + return existingPorts +} + +func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets.String) { + if service == nil { + return + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return } staleUDPServices := sets.NewString() proxier.mu.Lock() defer proxier.mu.Unlock() - for name, info := range proxier.serviceMap { - if !activeServices[name] { - glog.V(1).Infof("Stopping service %q", name) - - if proxier.serviceMap[name].protocol == api.ProtocolUDP { - staleUDPServices.Insert(proxier.serviceMap[name].portal.ip.String()) - } - - err := proxier.closePortal(name, info) - if err != nil { - glog.Errorf("Failed to close portal for %q: %v", name, err) - } - err = proxier.stopProxyInternal(name, info) - if err != nil { - glog.Errorf("Failed to stop service %q: %v", name, err) - } - proxier.loadBalancer.DeleteService(name) + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + if existingPorts.Has(servicePort.Name) { + continue } - } + serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + glog.V(1).Infof("Stopping service %q", serviceName) + info, exists := proxier.serviceMap[serviceName] + if !exists { + glog.Errorf("Service %q is being removed but doesn't exist", serviceName) + continue + } + + if proxier.serviceMap[serviceName].protocol == api.ProtocolUDP { + staleUDPServices.Insert(proxier.serviceMap[serviceName].portal.ip.String()) + } + + if err := proxier.closePortal(serviceName, info); err != nil { + glog.Errorf("Failed to close portal for %q: %v", serviceName, err) + } + if err := proxier.stopProxyInternal(serviceName, info); err != nil { + glog.Errorf("Failed to stop service %q: %v", serviceName, err) + } + proxier.loadBalancer.DeleteService(serviceName) + } utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) } +func (proxier *Proxier) OnServiceAdd(service *api.Service) { + _ = proxier.mergeService(service) +} + +func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { + existingPorts := proxier.mergeService(service) + proxier.unmergeService(oldService, existingPorts) +} + +func (proxier *Proxier) OnServiceDelete(service *api.Service) { + proxier.unmergeService(service, sets.NewString()) +} + +func (proxier *Proxier) OnServiceSynced() { +} + func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool { if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { return false diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index a3ad8eb0f24..b497318a979 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -326,7 +326,7 @@ func TestMultiPortProxy(t *testing.T) { waitForNumProxyLoops(t, p, 2) } -func TestMultiPortOnServiceUpdate(t *testing.T) { +func TestMultiPortOnServiceAdd(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} @@ -340,7 +340,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -351,7 +351,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { Port: 81, Protocol: "UDP", }}}, - }}) + }) waitForNumProxyLoops(t, p, 2) svcInfo, exists := p.getServiceInfo(serviceP) if !exists { @@ -496,7 +496,14 @@ func TestTCPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + }) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -533,7 +540,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "UDP", + }}}, + }) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -571,7 +585,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + }) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -579,14 +600,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: int32(svcInfo.proxyPort), Protocol: "TCP", }}}, - }}) + }) svcInfo, exists := p.getServiceInfo(service) if !exists { t.Fatalf("can't find serviceInfo for %s", service) @@ -626,7 +647,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "UDP", + }}}, + }) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -634,14 +662,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: int32(svcInfo.proxyPort), Protocol: "UDP", }}}, - }}) + }) svcInfo, exists := p.getServiceInfo(service) if !exists { t.Fatalf("can't find serviceInfo") @@ -676,14 +704,14 @@ func TestTCPProxyUpdatePort(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: 99, Protocol: "TCP", }}}, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) @@ -723,14 +751,14 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: 99, Protocol: "UDP", }}}, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) @@ -769,7 +797,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{ @@ -780,7 +808,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { ClusterIP: svcInfo.portal.ip.String(), ExternalIPs: []string{"4.3.2.1"}, }, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) @@ -822,40 +850,52 @@ func TestProxyUpdatePortal(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, - }}) - _, exists := p.getServiceInfo(service) - if exists { - t.Fatalf("service with empty ClusterIP should not be included in the proxy") - } - - p.OnServiceUpdate([]*api.Service{{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, - }}) - _, exists = p.getServiceInfo(service) - if exists { - t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") - } - - p.OnServiceUpdate([]*api.Service{{ + svcv0 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: int32(svcInfo.proxyPort), Protocol: "TCP", }}}, - }}) + } + + svcv1 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv0, svcv1) + _, exists := p.getServiceInfo(service) + if exists { + t.Fatalf("service with empty ClusterIP should not be included in the proxy") + } + + svcv2 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv1, svcv2) + _, exists = p.getServiceInfo(service) + if exists { + t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") + } + + svcv3 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv2, svcv3) lb.OnEndpointsAdd(endpoint) svcInfo, exists = p.getServiceInfo(service) if !exists {