From 8dd4cbe88b1b60dfdc8c051a11617b6f9dc77cfb Mon Sep 17 00:00:00 2001 From: m1093782566 Date: Sat, 3 Feb 2018 17:51:17 +0800 Subject: [PATCH] ipvs part changes --- pkg/proxy/ipvs/proxier.go | 445 +++++++---------------------- pkg/proxy/ipvs/proxier_test.go | 498 ++++++++------------------------- 2 files changed, 222 insertions(+), 721 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index e86d1019a79..e567f915175 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -24,7 +24,6 @@ import ( "bytes" "fmt" "net" - "reflect" "strconv" "strings" "sync" @@ -109,12 +108,12 @@ type Proxier struct { // services that happened since last syncProxyRules call. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges endpointsChangeMap - serviceChanges serviceChangeMap + endpointsChanges *proxy.EndpointChangeTracker + serviceChanges *proxy.ServiceChangeTracker mu sync.Mutex // protects the following fields - serviceMap proxyServiceMap - endpointsMap proxyEndpointsMap + serviceMap proxy.ServiceMap + endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. This is used to avoid updating ipvs rules @@ -302,10 +301,10 @@ func NewProxier(ipt utiliptables.Interface, proxier := &Proxier{ portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, iptables: ipt, @@ -343,8 +342,6 @@ func NewProxier(ipt utiliptables.Interface, return proxier, nil } -type proxyServiceMap map[proxy.ServicePortName]*serviceInfo - // internal struct for string service information type serviceInfo struct { clusterIP net.IP @@ -362,37 +359,8 @@ type serviceInfo struct { serviceNameString string } -// is updated by this function (based on the given changes). -// map is cleared after applying them. -func updateServiceMap( - serviceMap proxyServiceMap, - changes *serviceChangeMap) (result updateServiceMapResult) { - result.staleServices = sets.NewString() - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - existingPorts := serviceMap.merge(change.current) - serviceMap.unmerge(change.previous, existingPorts, result.staleServices) - } - changes.items = make(map[types.NamespacedName]*serviceChange) - }() - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to serviceMap. - result.hcServices = make(map[types.NamespacedName]uint16) - for svcPortName, info := range serviceMap { - if info.healthCheckNodePort != 0 { - result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) - } - } - - return result -} - -// returns a new serviceInfo struct -func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { +// returns a new proxy.ServicePort which abstracts a serviceInfo +func newServiceInfo(port *api.ServicePort, service *api.Service) proxy.ServicePort { onlyNodeLocalEndpoints := false if apiservice.RequestsOnlyLocalTraffic(service) { onlyNodeLocalEndpoints = true @@ -418,10 +386,13 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) copy(info.externalIPs, service.Spec.ExternalIPs) + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} + if apiservice.NeedsHealthCheck(service) { p := service.Spec.HealthCheckNodePort if p == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String()) + glog.Errorf("Service %q has no healthcheck nodeport", svcName.String()) } else { info.healthCheckNodePort = int(p) } @@ -433,103 +404,32 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se return info } -func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { - existingPorts := sets.NewString() - for svcPortName, info := range other { - existingPorts.Insert(svcPortName.Port) - _, exists := (*sm)[svcPortName] - if !exists { - glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - } else { - glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - } - (*sm)[svcPortName] = info - } - return existingPorts +// ClusterIP is part of ServicePort interface. +func (info *serviceInfo) ClusterIP() string { + return info.clusterIP.String() } -func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) { - for svcPortName := range other { - if existingPorts.Has(svcPortName.Port) { - continue - } - info, exists := (*sm)[svcPortName] - if exists { - glog.V(1).Infof("Removing service port %q", svcPortName) - if info.protocol == api.ProtocolUDP { - staleServices.Insert(info.clusterIP.String()) - } - delete(*sm, svcPortName) - } else { - glog.Errorf("Service port %q removed, but doesn't exists", svcPortName) - } - } +// Port is part of ServicePort interface. +func (info *serviceInfo) Port() int { + return info.port } -type serviceChangeMap struct { - lock sync.Mutex - items map[types.NamespacedName]*serviceChange +// Protocol is part of ServicePort interface. +func (info *serviceInfo) Protocol() api.Protocol { + return info.protocol } -type serviceChange struct { - previous proxyServiceMap - current proxyServiceMap +// String is part of ServicePort interface. +func (info *serviceInfo) String() string { + return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol) } -type updateEndpointMapResult struct { - hcEndpoints map[types.NamespacedName]int - staleEndpoints map[endpointServicePair]bool - staleServiceNames map[proxy.ServicePortName]bool +// HealthCheckNodePort is part of ServicePort interface. +func (info *serviceInfo) HealthCheckNodePort() int { + return info.healthCheckNodePort } -type updateServiceMapResult struct { - hcServices map[types.NamespacedName]uint16 - staleServices sets.String -} - -func newServiceChangeMap() serviceChangeMap { - return serviceChangeMap{ - items: make(map[types.NamespacedName]*serviceChange), - } -} - -func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool { - scm.lock.Lock() - defer scm.lock.Unlock() - - change, exists := scm.items[*namespacedName] - if !exists { - change = &serviceChange{} - change.previous = serviceToServiceMap(previous) - scm.items[*namespacedName] = change - } - change.current = serviceToServiceMap(current) - if reflect.DeepEqual(change.previous, change.current) { - delete(scm.items, *namespacedName) - } - return len(scm.items) > 0 -} - -// Translates single Service object to proxyServiceMap. -// -// NOTE: service object should NOT be modified. -func serviceToServiceMap(service *api.Service) proxyServiceMap { - if service == nil { - return nil - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if utilproxy.ShouldSkipService(svcName, service) { - return nil - } - - serviceMap := make(proxyServiceMap) - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service) - } - return serviceMap -} +var _ proxy.ServicePort = &serviceInfo{} // internal struct for endpoints information type endpointsInfo struct { @@ -537,12 +437,26 @@ type endpointsInfo struct { isLocal bool } -func (e *endpointsInfo) String() string { - return fmt.Sprintf("%v", *e) +// returns a new proxy.Endpoint which abstracts a endpointsInfo +func newEndpointsInfo(IP string, port int, isLocal bool) proxy.Endpoint { + return &endpointsInfo{ + endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), + isLocal: isLocal, + } } -// IPPart returns just the IP part of the endpoint. -func (e *endpointsInfo) IPPart() string { +// IsLocal is part of proxy.Endpoint interface. +func (e *endpointsInfo) IsLocal() bool { + return e.isLocal +} + +// String is part of proxy.Endpoint interface. +func (e *endpointsInfo) String() string { + return fmt.Sprintf("%v", e.endpoint) +} + +// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoints interface. +func (e *endpointsInfo) IP() string { return utilproxy.IPPart(e.endpoint) } @@ -551,164 +465,18 @@ func (e *endpointsInfo) PortPart() (int, error) { return utilproxy.PortPart(e.endpoint) } -type endpointServicePair struct { - endpoint string - servicePortName proxy.ServicePortName +// Equal is part of proxy.Endpoint interface. +func (e *endpointsInfo) Equal(other proxy.Endpoint) bool { + o, ok := other.(*endpointsInfo) + if !ok { + glog.Errorf("Failed to cast endpointsInfo") + return false + } + return e.endpoint == o.endpoint && + e.isLocal == o.isLocal } -type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo - -type endpointsChange struct { - previous proxyEndpointsMap - current proxyEndpointsMap -} - -type endpointsChangeMap struct { - lock sync.Mutex - hostname string - items map[types.NamespacedName]*endpointsChange -} - -// and are modified by this function with detected stale connections. -func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { - for svcPortName, epList := range oldEndpointsMap { - for _, ep := range epList { - stale := true - for i := range newEndpointsMap[svcPortName] { - if *newEndpointsMap[svcPortName][i] == *ep { - stale = false - break - } - } - if stale { - glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint) - staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true - } - } - } - - for svcPortName, epList := range newEndpointsMap { - // For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service. - if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 { - staleServiceNames[svcPortName] = true - } - } -} - -// is updated by this function (based on the given changes). -// map is cleared after applying them. -func updateEndpointsMap( - endpointsMap proxyEndpointsMap, - changes *endpointsChangeMap, - hostname string) (result updateEndpointMapResult) { - result.staleEndpoints = make(map[endpointServicePair]bool) - result.staleServiceNames = make(map[proxy.ServicePortName]bool) - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - endpointsMap.unmerge(change.previous) - endpointsMap.merge(change.current) - detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames) - } - changes.items = make(map[types.NamespacedName]*endpointsChange) - }() - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to endpointsMap. - result.hcEndpoints = make(map[types.NamespacedName]int) - localIPs := getLocalIPs(endpointsMap) - for nsn, ips := range localIPs { - result.hcEndpoints[nsn] = len(ips) - } - - return result -} - -// Translates single Endpoints object to proxyEndpointsMap. -// This function is used for incremental updated of endpointsMap. -// -// NOTE: endpoints object should NOT be modified. -func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap { - if endpoints == nil { - return nil - } - - endpointsMap := make(proxyEndpointsMap) - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - if port.Port == 0 { - glog.Warningf("ignoring invalid endpoint port %s", port.Name) - continue - } - svcPort := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, - Port: port.Name, - } - for i := range ss.Addresses { - addr := &ss.Addresses[i] - if addr.IP == "" { - glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) - continue - } - epInfo := &endpointsInfo{ - endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), - isLocal: addr.NodeName != nil && *addr.NodeName == hostname, - } - endpointsMap[svcPort] = append(endpointsMap[svcPort], epInfo) - } - if glog.V(3) { - newEPList := []string{} - for _, ep := range endpointsMap[svcPort] { - newEPList = append(newEPList, ep.endpoint) - } - glog.Infof("Setting endpoints for %q to %+v", svcPort, newEPList) - } - } - } - return endpointsMap -} - -func newEndpointsChangeMap(hostname string) endpointsChangeMap { - return endpointsChangeMap{ - hostname: hostname, - items: make(map[types.NamespacedName]*endpointsChange), - } -} - -func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool { - ecm.lock.Lock() - defer ecm.lock.Unlock() - - change, exists := ecm.items[*namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = endpointsToEndpointsMap(previous, ecm.hostname) - ecm.items[*namespacedName] = change - } - change.current = endpointsToEndpointsMap(current, ecm.hostname) - if reflect.DeepEqual(change.previous, change.current) { - delete(ecm.items, *namespacedName) - } - return len(ecm.items) > 0 -} - -func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { - for svcPort := range other { - em[svcPort] = other[svcPort] - } -} - -func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { - for svcPort := range other { - delete(em, svcPort) - } -} +var _ proxy.Endpoint = &endpointsInfo{} // KernelHandler can handle the current installed kernel modules. type KernelHandler interface { @@ -891,24 +659,21 @@ func (proxier *Proxier) isInitialized() bool { // OnServiceAdd is called whenever creation of new service object is observed. func (proxier *Proxier) OnServiceAdd(service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { + if proxier.serviceChanges.Update(nil, service, newServiceInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } // OnServiceUpdate is called whenever modification of an existing service object is observed. func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { + if proxier.serviceChanges.Update(oldService, service, newServiceInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } // OnServiceDelete is called whenever deletion of an existing service object is observed. func (proxier *Proxier) OnServiceDelete(service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { + if proxier.serviceChanges.Update(service, nil, newServiceInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -926,24 +691,21 @@ func (proxier *Proxier) OnServiceSynced() { // OnEndpointsAdd is called whenever creation of new endpoints object is observed. func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(nil, endpoints, newEndpointsInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(oldEndpoints, endpoints, newEndpointsInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } // OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed. func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(endpoints, nil, newEndpointsInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -977,17 +739,15 @@ func (proxier *Proxier) syncProxyRules() { // We assume that if this was called, we really want to sync them, // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. - serviceUpdateResult := updateServiceMap( - proxier.serviceMap, &proxier.serviceChanges) - endpointUpdateResult := updateEndpointsMap( - proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) + serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges) + endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges) - staleServices := serviceUpdateResult.staleServices + staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap - for svcPortName := range endpointUpdateResult.staleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { - glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) - staleServices.Insert(svcInfo.clusterIP.String()) + for _, svcPortName := range endpointUpdateResult.StaleServiceNames { + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == api.ProtocolUDP { + glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP()) + staleServices.Insert(svcInfo.ClusterIP()) } } @@ -1090,15 +850,25 @@ func (proxier *Proxier) syncProxyRules() { } // Build IPVS rules for each service. - for svcName, svcInfo := range proxier.serviceMap { + for svcName, svc := range proxier.serviceMap { + svcInfo, ok := svc.(*serviceInfo) + if !ok { + glog.Errorf("Failed to cast serviceInfo %q", svcName.String()) + continue + } protocol := strings.ToLower(string(svcInfo.protocol)) // Precompute svcNameString; with many services the many calls // to ServicePortName.String() show up in CPU profiles. svcNameString := svcName.String() // Handle traffic that loops back to the originator with SNAT. - for _, ep := range proxier.endpointsMap[svcName] { - epIP := ep.IPPart() + for _, e := range proxier.endpointsMap[svcName] { + ep, ok := e.(*endpointsInfo) + if !ok { + glog.Errorf("Failed to cast endpointsInfo %q", e.String()) + continue + } + epIP := ep.IP() epPort, err := ep.PortPart() // Error parsing this endpoint has been logged. Skip to next endpoint. if epIP == "" || err != nil { @@ -1532,10 +1302,10 @@ func (proxier *Proxier) syncProxyRules() { // Update healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the healthChecker // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil { + if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { glog.Errorf("Error syncing healtcheck services: %v", err) } - if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { + if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { glog.Errorf("Error syncing healthcheck endpoints: %v", err) } @@ -1546,19 +1316,19 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } } - proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints) + proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held -func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) { - for epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { - endpointIP := utilproxy.IPPart(epSvcPair.endpoint) - err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.clusterIP.String(), endpointIP) +func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { + for _, epSvcPair := range connectionMap { + if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP { + endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) + err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP) if err != nil { - glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.servicePortName.String(), err) + glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } } } @@ -1619,8 +1389,13 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } for _, eps := range proxier.endpointsMap[svcPortName] { - if !onlyNodeLocalEndpoints || onlyNodeLocalEndpoints && eps.isLocal { - newEndpoints.Insert(eps.endpoint) + epInfo, ok := eps.(*endpointsInfo) + if !ok { + glog.Errorf("Failed to cast endpointsInfo") + continue + } + if !onlyNodeLocalEndpoints || onlyNodeLocalEndpoints && epInfo.isLocal { + newEndpoints.Insert(epInfo.endpoint) } } @@ -1759,26 +1534,6 @@ func writeLine(buf *bytes.Buffer, words ...string) { } } -func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { - localIPs := make(map[types.NamespacedName]sets.String) - for svcPortName := range endpointsMap { - for _, ep := range endpointsMap[svcPortName] { - if ep.isLocal { - // If the endpoint has a bad format, utilproxy.IPPart() will log an - // error and ep.IPPart() will return a null string. - if ip := ep.IPPart(); ip != "" { - nsn := svcPortName.NamespacedName - if localIPs[nsn] == nil { - localIPs[nsn] = sets.NewString() - } - localIPs[nsn].Insert(ip) - } - } - } - } - return localIPs -} - // listenPortOpener opens ports by calling bind() and listen(). type listenPortOpener struct{} diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 88f7df51897..fb242566015 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -40,8 +40,6 @@ import ( iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" ipvstest "k8s.io/kubernetes/pkg/util/ipvs/testing" - - "github.com/davecgh/go-spew/spew" ) const testHostname = "test-hostname" @@ -121,10 +119,10 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u } return &Proxier{ exec: fexec, - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(testHostname), + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname), iptables: ipt, ipvs: ipvs, ipset: ipset, @@ -997,24 +995,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) { for i := range services { fp.OnServiceAdd(services[i]) } - result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 8 { t.Errorf("expected service map length 8, got %v", fp.serviceMap) } // The only-local-loadbalancer ones get added - if len(result.hcServices) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts) } else { nsn := makeNSN("somewhere", "only-local-load-balancer") - if port, found := result.hcServices[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.hcServices) + if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts) } } - if len(result.staleServices) != 0 { + if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } // Remove some stuff @@ -1030,24 +1028,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) - result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.serviceMap) } - if len(result.hcServices) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts) } // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} - if len(result.staleServices) != len(expectedStaleUDPServices) { - t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.staleServices.List()) + if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) { + t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.List()) } for _, ip := range expectedStaleUDPServices { - if !result.staleServices.Has(ip) { + if !result.UDPStaleClusterIP.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } @@ -1072,18 +1070,18 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { ) // Headless service should be ignored - result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) } // No proxied services, so no healthchecks - if len(result.hcServices) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(result.hcServices)) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts)) } - if len(result.staleServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) + if len(result.UDPStaleClusterIP) != 0 { + t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } } @@ -1102,16 +1100,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), ) - result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.serviceMap) } // No proxied services, so no healthchecks - if len(result.hcServices) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.staleServices) + if len(result.UDPStaleClusterIP) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) } } @@ -1144,57 +1142,57 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.OnServiceAdd(servicev1) - result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(result.hcServices) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { + if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) - result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(result.hcServices) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List()) + if len(result.UDPStaleClusterIP) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) } // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) - result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(result.hcServices) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List()) + if len(result.UDPStaleClusterIP) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) } // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) - result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(result.hcServices) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { + if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } } @@ -1570,14 +1568,14 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints []*api.Endpoints oldEndpoints map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo - expectedStaleEndpoints []endpointServicePair + expectedStaleEndpoints []proxy.ServiceEndpoint expectedStaleServiceNames map[proxy.ServicePortName]bool expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { @@ -1598,7 +1596,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { @@ -1619,7 +1617,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, @@ -1648,7 +1646,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { @@ -1681,7 +1679,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.3:13", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, @@ -1748,7 +1746,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "2.2.2.2:22", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, @@ -1768,7 +1766,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", ""): true, }, @@ -1789,9 +1787,9 @@ func Test_updateEndpointsMap(t *testing.T) { }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.1:11", - servicePortName: makeServicePortName("ns1", "ep1", ""), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.1:11", + ServicePortName: makeServicePortName("ns1", "ep1", ""), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -1818,7 +1816,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12"): true, }, @@ -1848,15 +1846,15 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.2:11", - servicePortName: makeServicePortName("ns1", "ep1", "p11"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.2:11", + ServicePortName: makeServicePortName("ns1", "ep1", "p11"), }, { - endpoint: "1.1.1.1:12", - servicePortName: makeServicePortName("ns1", "ep1", "p12"), + Endpoint: "1.1.1.1:12", + ServicePortName: makeServicePortName("ns1", "ep1", "p12"), }, { - endpoint: "1.1.1.2:12", - servicePortName: makeServicePortName("ns1", "ep1", "p12"), + Endpoint: "1.1.1.2:12", + ServicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -1881,7 +1879,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12"): true, }, @@ -1909,9 +1907,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.2:12", - servicePortName: makeServicePortName("ns1", "ep1", "p12"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.2:12", + ServicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -1933,9 +1931,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.1:11", - servicePortName: makeServicePortName("ns1", "ep1", "p11"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.1:11", + ServicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2"): true, @@ -1959,9 +1957,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:22", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.1:11", - servicePortName: makeServicePortName("ns1", "ep1", "p11"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.1:11", + ServicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -2016,21 +2014,21 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "4.4.4.4:44", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "2.2.2.2:22", - servicePortName: makeServicePortName("ns2", "ep2", "p22"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "2.2.2.2:22", + ServicePortName: makeServicePortName("ns2", "ep2", "p22"), }, { - endpoint: "2.2.2.22:22", - servicePortName: makeServicePortName("ns2", "ep2", "p22"), + Endpoint: "2.2.2.22:22", + ServicePortName: makeServicePortName("ns2", "ep2", "p22"), }, { - endpoint: "2.2.2.3:23", - servicePortName: makeServicePortName("ns2", "ep2", "p23"), + Endpoint: "2.2.2.3:23", + ServicePortName: makeServicePortName("ns2", "ep2", "p23"), }, { - endpoint: "4.4.4.5:44", - servicePortName: makeServicePortName("ns4", "ep4", "p44"), + Endpoint: "4.4.4.5:44", + ServicePortName: makeServicePortName("ns4", "ep4", "p44"), }, { - endpoint: "4.4.4.6:45", - servicePortName: makeServicePortName("ns4", "ep4", "p45"), + Endpoint: "4.4.4.6:45", + ServicePortName: makeServicePortName("ns4", "ep4", "p45"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12"): true, @@ -2054,7 +2052,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", ""): true, }, @@ -2076,7 +2074,7 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsAdd(tc.previousEndpoints[i]) } } - updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints) // Now let's call appropriate handlers to get to state we want to be. @@ -2096,313 +2094,61 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsUpdate(prev, curr) } } - result := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) - if len(result.staleEndpoints) != len(tc.expectedStaleEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.staleEndpoints), result.staleEndpoints) + if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { + t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints) } for _, x := range tc.expectedStaleEndpoints { - if result.staleEndpoints[x] != true { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.staleEndpoints) + found := false + for _, stale := range result.StaleEndpoints { + if stale == x { + found = true + break + } + } + if !found { + t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints) } } - if len(result.staleServiceNames) != len(tc.expectedStaleServiceNames) { - t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.staleServiceNames), result.staleServiceNames) + if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) { + t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames) } for svcName := range tc.expectedStaleServiceNames { - if result.staleServiceNames[svcName] != true { - t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.staleServiceNames) + found := false + for _, stale := range result.StaleServiceNames { + if stale == svcName { + found = true + break + } + } + if !found { + t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) } } - if !reflect.DeepEqual(result.hcEndpoints, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.hcEndpoints) + if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) { + t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize) } } } -func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.ServicePortName][]*endpointsInfo) { +func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) { if len(newMap) != len(expected) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) } for x := range expected { if len(newMap[x]) != len(expected[x]) { - t.Errorf("[%d] expected %d Endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) } else { for i := range expected[x] { - if *(newMap[x][i]) != *(expected[x][i]) { - t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newMap[x][i]) + newEp, ok := newMap[x][i].(*endpointsInfo) + if !ok { + t.Errorf("Failed to cast endpointsInfo") + continue } - } - } - } -} - -func Test_getLocalIPs(t *testing.T) { - testCases := []struct { - endpointsMap map[proxy.ServicePortName][]*endpointsInfo - expected map[types.NamespacedName]sets.String - }{{ - // Case[0]: nothing - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{}, - expected: map[types.NamespacedName]sets.String{}, - }, { - // Case[1]: unnamed port - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, - }, - }, - expected: map[types.NamespacedName]sets.String{}, - }, { - // Case[2]: unnamed port local - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", true}, - }, - }, - expected: map[types.NamespacedName]sets.String{ - {Namespace: "ns1", Name: "ep1"}: sets.NewString("1.1.1.1"), - }, - }, { - // Case[3]: named local and non-local ports for the same IP. - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, - {"1.1.1.2:11", true}, - }, - makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, - {"1.1.1.2:12", true}, - }, - }, - expected: map[types.NamespacedName]sets.String{ - {Namespace: "ns1", Name: "ep1"}: sets.NewString("1.1.1.2"), - }, - }, { - // Case[4]: named local and non-local ports for different IPs. - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, - }, - makeServicePortName("ns2", "ep2", "p22"): { - {"2.2.2.2:22", true}, - {"2.2.2.22:22", true}, - }, - makeServicePortName("ns2", "ep2", "p23"): { - {"2.2.2.3:23", true}, - }, - makeServicePortName("ns4", "ep4", "p44"): { - {"4.4.4.4:44", true}, - {"4.4.4.5:44", false}, - }, - makeServicePortName("ns4", "ep4", "p45"): { - {"4.4.4.6:45", true}, - }, - }, - expected: map[types.NamespacedName]sets.String{ - {Namespace: "ns2", Name: "ep2"}: sets.NewString("2.2.2.2", "2.2.2.22", "2.2.2.3"), - {Namespace: "ns4", Name: "ep4"}: sets.NewString("4.4.4.4", "4.4.4.6"), - }, - }, { - // Case[5]: named port local and bad endpoints IP - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "bad ip:11", isLocal: true}, - }, - }, - expected: map[types.NamespacedName]sets.String{}, - }} - - for tci, tc := range testCases { - // outputs - localIPs := getLocalIPs(tc.endpointsMap) - - if !reflect.DeepEqual(localIPs, tc.expected) { - t.Errorf("[%d] expected %#v, got %#v", tci, tc.expected, localIPs) - } - } -} - -// This is a coarse test, but it offers some modicum of confidence as the code is evolved. -func Test_endpointsToEndpointsMap(t *testing.T) { - testCases := []struct { - newEndpoints *api.Endpoints - expected map[proxy.ServicePortName][]*endpointsInfo - }{{ - // Case[0]: nothing - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - expected: map[proxy.ServicePortName][]*endpointsInfo{}, - }, { - // Case[1]: no changes, unnamed port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "", - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, - }, - }, - }, { - // Case[2]: no changes, named port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "port", - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "port"): { - {"1.1.1.1:11", false}, - }, - }, - }, { - // Case[3]: new port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, - }, - }, - }, { - // Case[4]: remove port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - expected: map[proxy.ServicePortName][]*endpointsInfo{}, - }, { - // Case[5]: new IP and port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "2.2.2.2", - }}, - Ports: []api.EndpointPort{{ - Name: "p1", - Port: 11, - }, { - Name: "p2", - Port: 22, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {"1.1.1.1:11", false}, - {"2.2.2.2:11", false}, - }, - makeServicePortName("ns1", "ep1", "p2"): { - {"1.1.1.1:22", false}, - {"2.2.2.2:22", false}, - }, - }, - }, { - // Case[6]: remove IP and port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p1", - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {"1.1.1.1:11", false}, - }, - }, - }, { - // Case[7]: rename port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p2", - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p2"): { - {"1.1.1.1:11", false}, - }, - }, - }, { - // Case[8]: renumber port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p1", - Port: 22, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {"1.1.1.1:22", false}, - }, - }, - }} - - for tci, tc := range testCases { - // outputs - newEndpoints := endpointsToEndpointsMap(tc.newEndpoints, "host") - - if len(newEndpoints) != len(tc.expected) { - t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expected), len(newEndpoints), spew.Sdump(newEndpoints)) - } - for x := range tc.expected { - if len(newEndpoints[x]) != len(tc.expected[x]) { - t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expected[x]), x, len(newEndpoints[x])) - } else { - for i := range newEndpoints[x] { - if *(newEndpoints[x][i]) != *(tc.expected[x][i]) { - t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expected[x][i], *(newEndpoints[x][i])) - } + if *newEp != *(expected[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp) } } }