From eae2b8e9bac5307b12872b6e0a8acf58a9f58292 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 19 Dec 2016 17:08:24 -0600 Subject: [PATCH 1/4] proxy/iptables: split out service map creation and add testcases --- pkg/proxy/iptables/BUILD | 2 + pkg/proxy/iptables/proxier.go | 98 +++++++++++------ pkg/proxy/iptables/proxier_test.go | 170 +++++++++++++++++++++++++++++ 3 files changed, 237 insertions(+), 33 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index c964a07c18a..03fab42888d 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -40,8 +40,10 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/api/service:go_default_library", "//pkg/proxy:go_default_library", "//pkg/util/exec:go_default_library", + "//pkg/util/intstr:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//vendor:k8s.io/apimachinery/pkg/types", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 6996c6729c5..89634d461c5 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -160,11 +160,13 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo { } } +type proxyServiceMap map[proxy.ServicePortName]*serviceInfo + // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. 
type Proxier struct { mu sync.Mutex // protects the following fields - serviceMap map[proxy.ServicePortName]*serviceInfo + serviceMap proxyServiceMap endpointsMap map[proxy.ServicePortName][]*endpointsInfo portsMap map[localPort]closeable haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event @@ -278,7 +280,7 @@ func NewProxier(ipt utiliptables.Interface, } return &Proxier{ - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + serviceMap: make(proxyServiceMap), endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo), portsMap: make(map[localPort]closeable), syncPeriod: syncPeriod, @@ -382,7 +384,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { return encounteredError } -func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { +func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { if info.protocol != port.Protocol || info.port != int(port.Port) || info.nodePort != int(port.NodePort) { return false } @@ -438,16 +440,22 @@ func (proxier *Proxier) SyncLoop() { } } -// OnServiceUpdate tracks the active set of service proxies. -// They will be synchronized using syncProxyRules() -func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { - start := time.Now() - defer func() { - glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices)) - }() - proxier.mu.Lock() - defer proxier.mu.Unlock() - proxier.haveReceivedServiceUpdate = true +type healthCheckPort struct { + namespace types.NamespacedName + nodeport int +} + +// Accepts a list of Services and the existing service map. Returns the new +// service map, a list of healthcheck ports to add to or remove from the health +// checking listener service, and a set of stale UDP services. 
+func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) { + healthCheckAdd := make([]healthCheckPort, 0) + healthCheckDel := make([]healthCheckPort, 0) + + newServiceMap := make(proxyServiceMap) + for key, value := range oldServiceMap { + newServiceMap[key] = value + } activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set @@ -472,15 +480,15 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { Port: servicePort.Name, } activeServices[serviceName] = true - info, exists := proxier.serviceMap[serviceName] - if exists && proxier.sameConfig(info, service, servicePort) { - // Nothing changed. - continue - } + info, exists := newServiceMap[serviceName] if exists { + if sameConfig(info, service, servicePort) { + // Nothing changed. + continue + } // Something changed. glog.V(3).Infof("Something changed for service %q: removing it", serviceName) - delete(proxier.serviceMap, serviceName) + delete(newServiceMap, serviceName) } serviceIP := net.ParseIP(service.Spec.ClusterIP) glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) @@ -501,17 +509,13 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { glog.Errorf("Service does not contain necessary annotation %v", apiservice.BetaAnnotationHealthCheckNodePort) } else { - glog.V(4).Infof("Adding health check for %+v, port %v", serviceName.NamespacedName, p) info.healthCheckNodePort = int(p) - // Turn on healthcheck responder to listen on the health check nodePort - healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort) + healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort}) } } else { - glog.V(4).Infof("Deleting health check for %+v", serviceName.NamespacedName) - // Delete healthcheck responders, if any, previously listening for 
this service - healthcheck.DeleteServiceListener(serviceName.NamespacedName, 0) + healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0}) } - proxier.serviceMap[serviceName] = info + newServiceMap[serviceName] = info glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info)) } @@ -519,24 +523,52 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { staleUDPServices := sets.NewString() // Remove serviceports missing from the update. - for name, info := range proxier.serviceMap { + for name, info := range newServiceMap { if !activeServices[name] { glog.V(1).Infof("Removing service %q", name) if info.protocol == api.ProtocolUDP { staleUDPServices.Insert(info.clusterIP.String()) } - delete(proxier.serviceMap, name) + delete(newServiceMap, name) if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { - // Remove ServiceListener health check nodePorts from the health checker - // TODO - Stats - glog.V(4).Infof("Deleting health check for %+v, port %v", name.NamespacedName, info.healthCheckNodePort) - healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort) + healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort}) } } } + + return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices +} + +// OnServiceUpdate tracks the active set of service proxies. 
+// They will be synchronized using syncProxyRules() +func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { + start := time.Now() + defer func() { + glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices)) + }() + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.haveReceivedServiceUpdate = true + + newServiceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap) + proxier.serviceMap = newServiceMap + + for _, hc := range hcAdd { + glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport) + // Turn on healthcheck responder to listen on the health check nodePort + // FIXME: handle failures from adding the service + healthcheck.AddServiceListener(hc.namespace, hc.nodeport) + } + for _, hc := range hcDel { + // Remove ServiceListener health check nodePorts from the health checker + // TODO - Stats + glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport) + // FIXME: handle failures from deleting the service + healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport) + } + proxier.syncProxyRules() proxier.deleteServiceConnections(staleUDPServices.List()) - } // Generate a list of ip strings from the list of endpoint infos diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index bf0e39bf063..5d4612f9bb0 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -25,8 +25,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/intstr" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" ) @@ -883,4 +885,172 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable } } +func makeTestService(namespace, name string, svcFunc 
func(*api.Service)) api.Service { + svc := api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: api.ServiceSpec{}, + Status: api.ServiceStatus{}, + } + svcFunc(&svc) + return svc +} + +func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort { + svcPort := api.ServicePort{ + Name: name, + Protocol: protocol, + Port: port, + NodePort: nodeport, + TargetPort: intstr.FromInt(targetPort), + } + return append(array, svcPort) +} + +func TestBuildServiceMapAddRemove(t *testing.T) { + services := []api.Service{ + makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeClusterIP + svc.Spec.ClusterIP = "172.16.55.4" + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0) + }), + makeTestService("somewhere-else", "node-port", func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeNodePort + svc.Spec.ClusterIP = "172.16.55.10" + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0) + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0) + }), + makeTestService("somewhere", "load-balancer", func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeLoadBalancer + svc.Spec.ClusterIP = "172.16.55.11" + svc.Spec.LoadBalancerIP = "5.6.7.8" + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000) + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001) + svc.Status.LoadBalancer = api.LoadBalancerStatus{ + Ingress: []api.LoadBalancerIngress{ + {IP: "10.1.2.4"}, + }, + } + }), + makeTestService("somewhere", "only-local-load-balancer", func(svc *api.Service) { + svc.ObjectMeta.Annotations = map[string]string{ + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, + 
service.BetaAnnotationHealthCheckNodePort: "345", + } + svc.Spec.Type = api.ServiceTypeLoadBalancer + svc.Spec.ClusterIP = "172.16.55.12" + svc.Spec.LoadBalancerIP = "5.6.7.8" + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002) + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003) + svc.Status.LoadBalancer = api.LoadBalancerStatus{ + Ingress: []api.LoadBalancerIngress{ + {IP: "10.1.2.3"}, + }, + } + }), + } + + serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(services, make(proxyServiceMap)) + if len(serviceMap) != 8 { + t.Errorf("expected service map length 8, got %v", serviceMap) + } + + // The only-local-loadbalancer ones get added + if len(hcAdd) != 2 { + t.Errorf("expected healthcheck add length 2, got %v", hcAdd) + } else { + for _, hc := range hcAdd { + if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" { + t.Errorf("unexpected healthcheck listener added: %v", hc) + } + } + } + + // All the rest get deleted + if len(hcDel) != 6 { + t.Errorf("expected healthcheck del length 6, got %v", hcDel) + } else { + for _, hc := range hcDel { + if hc.namespace.Namespace == "somewhere" && hc.namespace.Name == "only-local-load-balancer" { + t.Errorf("unexpected healthcheck listener deleted: %v", hc) + } + } + } + + if len(staleUDPServices) != 0 { + // Services only added, so nothing stale yet + t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + } + + // Remove some stuff + services = []api.Service{services[0]} + services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} + serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(services, serviceMap) + if len(serviceMap) != 1 { + t.Errorf("expected service map length 1, got %v", serviceMap) + } + + if len(hcAdd) != 0 { + t.Errorf("expected healthcheck add length 0, got %v", hcAdd) + } + + // The only OnlyLocal annotation was removed above, so we expect a delete now.
+ // FIXME: Since the BetaAnnotationHealthCheckNodePort is the same for all + // ServicePorts, we'll get one delete per ServicePort, even though they all + // contain the same information + if len(hcDel) != 2 { + t.Errorf("expected healthcheck del length 2, got %v", hcDel) + } else { + for _, hc := range hcDel { + if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" { + t.Errorf("unexpected healthcheck listener deleted: %v", hc) + } + } + } + + // All services but one were deleted. While you'd expect only the ClusterIPs + // from the three deleted services here, we still have the ClusterIP for + // the not-deleted service, because one of its ServicePorts was deleted. + expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} + if len(staleUDPServices) != len(expectedStaleUDPServices) { + t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), staleUDPServices.List()) + } + for _, ip := range expectedStaleUDPServices { + if !staleUDPServices.Has(ip) { + t.Errorf("expected stale UDP service %s", ip) + } + } +} + +func TestBuildServiceMapServiceHeadless(t *testing.T) { + services := []api.Service{ + makeTestService("somewhere-else", "headless", func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeClusterIP + svc.Spec.ClusterIP = api.ClusterIPNone + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) + }), + } + + // Headless service should be ignored + serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(services, make(proxyServiceMap)) + if len(serviceMap) != 0 { + t.Errorf("expected service map length 0, got %d", len(serviceMap)) + } + + // No proxied services, so no healthchecks + if len(hcAdd) != 0 { + t.Errorf("expected healthcheck add length 0, got %d", len(hcAdd)) + } + if len(hcDel) != 0 { + t.Errorf("expected healthcheck del length 0, got %d", len(hcDel)) + } + + if len(staleUDPServices) != 0 { + 
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. From 433f6830f805c2cad59b07addb48fcb1db07e407 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 19 Dec 2016 17:12:32 -0600 Subject: [PATCH 2/4] proxy/iptables: don't proxy ExternalName services The API docs say: // ServiceTypeExternalName means a service consists of only a reference to // an external name that kubedns or equivalent will return as a CNAME // record, with no exposing or proxying of any pods involved. which implies that ExternalName services should be ignored for proxy purposes. --- pkg/proxy/iptables/proxier.go | 5 +++++ pkg/proxy/iptables/proxier_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 89634d461c5..56643a2cb2d 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -471,6 +471,11 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) ( glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) continue } + // Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied + if service.Spec.Type == api.ServiceTypeExternalName { + glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName) + continue + } for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 5d4612f9bb0..3e1fb4b8b15 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1053,4 +1053,30 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { } } +func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { + services := []api.Service{ + makeTestService("somewhere-else", "external-name", func(svc *api.Service) { + 
svc.Spec.Type = api.ServiceTypeExternalName + svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored + svc.Spec.ExternalName = "foo2.bar.com" + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0) + }), + } + + serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(services, make(proxyServiceMap)) + if len(serviceMap) != 0 { + t.Errorf("expected service map length 0, got %v", serviceMap) + } + // No proxied services, so no healthchecks + if len(hcAdd) != 0 { + t.Errorf("expected healthcheck add length 0, got %v", hcAdd) + } + if len(hcDel) != 0 { + t.Errorf("expected healthcheck del length 0, got %v", hcDel) + } + if len(staleUDPServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices) + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. From 6aa784e6f29ae58a8c19442316ba8cf8227369de Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 19 Dec 2016 17:15:49 -0600 Subject: [PATCH 3/4] proxy/iptables: don't sync proxy rules if services map didn't change --- pkg/proxy/iptables/proxier.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 56643a2cb2d..87a3c9cc612 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -556,8 +556,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { proxier.haveReceivedServiceUpdate = true newServiceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap) - proxier.serviceMap = newServiceMap - for _, hc := range hcAdd { glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport) // Turn on healthcheck responder to listen on the health check nodePort @@ -572,7 +570,13 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport) } - 
proxier.syncProxyRules() + if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) { + proxier.serviceMap = newServiceMap + proxier.syncProxyRules() + } else { + glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed") + } + proxier.deleteServiceConnections(staleUDPServices.List()) } From 59076391403821201de2f226e24bd34d797b7aad Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 20 Dec 2016 12:24:43 -0600 Subject: [PATCH 4/4] proxy/iptables: clean up service map creation Instead of copying the map, like OnServicesUpdate() used to do and which was copied into buildServiceMap() to preserve semantics while creating testcases, start with a new empty map and do deletion checking later. --- pkg/proxy/iptables/proxier.go | 138 ++++++++++------------------- pkg/proxy/iptables/proxier_test.go | 92 +++++++++++++++++++ 2 files changed, 141 insertions(+), 89 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 87a3c9cc612..1bb7bb2f2ef 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -153,11 +153,35 @@ type endpointsInfo struct { } // returns a new serviceInfo struct -func newServiceInfo(service proxy.ServicePortName) *serviceInfo { - return &serviceInfo{ - sessionAffinityType: api.ServiceAffinityNone, // default - stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. 
+func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { + onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort) + info := &serviceInfo{ + clusterIP: net.ParseIP(service.Spec.ClusterIP), + port: int(port.Port), + protocol: port.Protocol, + nodePort: int(port.NodePort), + // Deep-copy in case the service instance changes + loadBalancerStatus: *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer), + sessionAffinityType: service.Spec.SessionAffinity, + stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. + externalIPs: make([]string, len(service.Spec.ExternalIPs)), + loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), + onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, } + copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) + copy(info.externalIPs, service.Spec.ExternalIPs) + + if info.onlyNodeLocalEndpoints { + p := apiservice.GetServiceHealthCheckNodePort(service) + if p == 0 { + glog.Errorf("Service does not contain necessary annotation %v", + apiservice.BetaAnnotationHealthCheckNodePort) + } else { + info.healthCheckNodePort = int(p) + } + } + + return info } type proxyServiceMap map[proxy.ServicePortName]*serviceInfo @@ -384,44 +408,6 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { return encounteredError } -func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { - if info.protocol != port.Protocol || info.port != int(port.Port) || info.nodePort != int(port.NodePort) { - return false - } - if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) { - return false - } - if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) { - return false - } - if 
!api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) { - return false - } - if info.sessionAffinityType != service.Spec.SessionAffinity { - return false - } - onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() - if info.onlyNodeLocalEndpoints != onlyNodeLocalEndpoints { - return false - } - if !reflect.DeepEqual(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) { - return false - } - return true -} - -func ipsEqual(lhs, rhs []string) bool { - if len(lhs) != len(rhs) { - return false - } - for i := range lhs { - if lhs[i] != rhs[i] { - return false - } - } - return true -} - // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { proxier.mu.Lock() @@ -449,16 +435,10 @@ type healthCheckPort struct { // service map, a list of healthcheck ports to add to or remove from the health // checking listener service, and a set of stale UDP services. func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) { + newServiceMap := make(proxyServiceMap) healthCheckAdd := make([]healthCheckPort, 0) healthCheckDel := make([]healthCheckPort, 0) - newServiceMap := make(proxyServiceMap) - for key, value := range oldServiceMap { - newServiceMap[key] = value - } - - activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set - for i := range allServices { service := &allServices[i] svcName := types.NamespacedName{ @@ -484,57 +464,37 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) ( NamespacedName: svcName, Port: servicePort.Name, } - activeServices[serviceName] = true - info, exists := newServiceMap[serviceName] - if exists { - if sameConfig(info, service, servicePort) { - // Nothing changed. - continue - } - // Something changed. 
- glog.V(3).Infof("Something changed for service %q: removing it", serviceName) - delete(newServiceMap, serviceName) - } - serviceIP := net.ParseIP(service.Spec.ClusterIP) - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - info = newServiceInfo(serviceName) - info.clusterIP = serviceIP - info.port = int(servicePort.Port) - info.protocol = servicePort.Protocol - info.nodePort = int(servicePort.NodePort) - info.externalIPs = service.Spec.ExternalIPs - // Deep-copy in case the service instance changes - info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) - info.sessionAffinityType = service.Spec.SessionAffinity - info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges - info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort) - if info.onlyNodeLocalEndpoints { - p := apiservice.GetServiceHealthCheckNodePort(service) - if p == 0 { - glog.Errorf("Service does not contain necessary annotation %v", - apiservice.BetaAnnotationHealthCheckNodePort) - } else { - info.healthCheckNodePort = int(p) - healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort}) - } - } else { - healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0}) - } - newServiceMap[serviceName] = info + info := newServiceInfo(serviceName, servicePort, service) + oldInfo, exists := oldServiceMap[serviceName] + equal := reflect.DeepEqual(info, oldInfo) + if !exists { + glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) + } else if !equal { + glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) + } + 
+ if !exists || !equal { + if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { + healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort}) + } else { + healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0}) + } + } + + newServiceMap[serviceName] = info glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info)) } } staleUDPServices := sets.NewString() // Remove serviceports missing from the update. - for name, info := range newServiceMap { - if !activeServices[name] { + for name, info := range oldServiceMap { + if _, exists := newServiceMap[name]; !exists { glog.V(1).Infof("Removing service %q", name) if info.protocol == api.ProtocolUDP { staleUDPServices.Insert(info.clusterIP.String()) } - delete(newServiceMap, name) if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort}) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 3e1fb4b8b15..dd4b7065e5b 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1079,4 +1079,96 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { } } +func TestBuildServiceMapServiceUpdate(t *testing.T) { + first := []api.Service{ + makeTestService("somewhere", "some-service", func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeClusterIP + svc.Spec.ClusterIP = "172.16.55.4" + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0) + }), + } + + second := []api.Service{ + makeTestService("somewhere", "some-service", func(svc *api.Service) { + svc.ObjectMeta.Annotations = map[string]string{ + service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, + 
service.BetaAnnotationHealthCheckNodePort: "345", + } + svc.Spec.Type = api.ServiceTypeLoadBalancer + svc.Spec.ClusterIP = "172.16.55.4" + svc.Spec.LoadBalancerIP = "5.6.7.8" + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002) + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003) + svc.Status.LoadBalancer = api.LoadBalancerStatus{ + Ingress: []api.LoadBalancerIngress{ + {IP: "10.1.2.3"}, + }, + } + }), + } + + serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(first, make(proxyServiceMap)) + if len(serviceMap) != 2 { + t.Errorf("expected service map length 2, got %v", serviceMap) + } + if len(hcAdd) != 0 { + t.Errorf("expected healthcheck add length 0, got %v", hcAdd) + } + if len(hcDel) != 2 { + t.Errorf("expected healthcheck del length 2, got %v", hcDel) + } + if len(staleUDPServices) != 0 { + // Services only added, so nothing stale yet + t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + } + + // Change service to load-balancer + serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(second, serviceMap) + if len(serviceMap) != 2 { + t.Errorf("expected service map length 2, got %v", serviceMap) + } + if len(hcAdd) != 2 { + t.Errorf("expected healthcheck add length 2, got %v", hcAdd) + } + if len(hcDel) != 0 { + t.Errorf("expected healthcheck del length 0, got %v", hcDel) + } + if len(staleUDPServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) + } + + // No change; make sure the service map stays the same and there are + // no health-check changes + serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(second, serviceMap) + if len(serviceMap) != 2 { + t.Errorf("expected service map length 2, got %v", serviceMap) + } + if len(hcAdd) != 0 { + t.Errorf("expected healthcheck add length 0, got %v", hcAdd) + } + if len(hcDel) != 0 { + t.Errorf("expected healthcheck del length 0, got %v", hcDel) + } + 
if len(staleUDPServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) + } + + // And back to ClusterIP + serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(first, serviceMap) + if len(serviceMap) != 2 { + t.Errorf("expected service map length 2, got %v", serviceMap) + } + if len(hcAdd) != 0 { + t.Errorf("expected healthcheck add length 0, got %v", hcAdd) + } + if len(hcDel) != 2 { + t.Errorf("expected healthcheck del length 2, got %v", hcDel) + } + if len(staleUDPServices) != 0 { + // Services were only modified, not removed, so no stale UDP entries + t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.