From 2f250435fd34151bf4c00fab86f9ea3ff29c12d1 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 11 Apr 2017 09:59:31 +0200 Subject: [PATCH] Don't rebuild endpoints map in iptables kube-proxy all the time. --- pkg/proxy/iptables/BUILD | 1 + pkg/proxy/iptables/proxier.go | 216 ++++--- pkg/proxy/iptables/proxier_test.go | 891 ++++++++++++++++++----------- 3 files changed, 701 insertions(+), 407 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 87305ca4fdd..0a3de1effa8 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -51,6 +51,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", ], ) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index bd547b5aa60..74c155b4005 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -195,24 +195,44 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se return info } -type endpointsMap map[types.NamespacedName]*api.Endpoints +type endpointsChange struct { + previous *api.Endpoints + current *api.Endpoints +} +type endpointsChangeMap map[types.NamespacedName]*endpointsChange type serviceMap map[types.NamespacedName]*api.Service type proxyServiceMap map[proxy.ServicePortName]*serviceInfo -type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo +type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo + +func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { + for svcPort := range other { + em[svcPort] = other[svcPort] + } +} + +func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { + for svcPort := range other { + delete(em, svcPort) + } +} // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxyServiceMap - endpointsMap proxyEndpointMap - portsMap map[localPort]closeable - // allServices and allEndpoints should never be modified by proxier - the + endpointsMap proxyEndpointsMap + // endpointsChanges contains all changes to endpoints that happened since + // last syncProxyRules call. For a single object, changes are accumulated, + // i.e. previous is state from before all of them, current is state after + // applying all of those. + endpointsChanges endpointsChangeMap + portsMap map[localPort]closeable + // allServices should never be modified by proxier - the // pointers are shared with higher layers of kube-proxy. They are guaranteed // to not be modified in the meantime, but also require to be not modified // by Proxier. - allEndpoints endpointsMap - allServices serviceMap + allServices serviceMap // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. This is used to avoid updating iptables @@ -330,24 +350,24 @@ func NewProxier(ipt utiliptables.Interface, } return &Proxier{ - serviceMap: make(proxyServiceMap), - endpointsMap: make(proxyEndpointMap), - portsMap: make(map[localPort]closeable), - allEndpoints: make(endpointsMap), - allServices: make(serviceMap), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - throttle: throttle, - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, + serviceMap: make(proxyServiceMap), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: make(endpointsChangeMap), + portsMap: make(map[localPort]closeable), + allServices: make(serviceMap), + syncPeriod: syncPeriod, + minSyncPeriod: minSyncPeriod, + throttle: throttle, + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, }, nil } @@ -566,16 +586,32 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { proxier.mu.Lock() defer proxier.mu.Unlock() - proxier.allEndpoints[namespacedName] = endpoints + + change, exists := proxier.endpointsChanges[namespacedName] + if !exists { + change = &endpointsChange{} + change.previous = nil + proxier.endpointsChanges[namespacedName] = change + } + change.current = endpoints + proxier.syncProxyRules(syncReasonEndpoints) } -func (proxier *Proxier) OnEndpointsUpdate(_, endpoints *api.Endpoints) { +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} proxier.mu.Lock() defer proxier.mu.Unlock() - proxier.allEndpoints[namespacedName] = endpoints + + change, exists := proxier.endpointsChanges[namespacedName] + if !exists { + change = &endpointsChange{} + change.previous = oldEndpoints + proxier.endpointsChanges[namespacedName] = change + } + change.current = endpoints + proxier.syncProxyRules(syncReasonEndpoints) } @@ -584,7 +620,15 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { proxier.mu.Lock() defer proxier.mu.Unlock() - delete(proxier.allEndpoints, namespacedName) + + change, exists := proxier.endpointsChanges[namespacedName] + if !exists { + change = &endpointsChange{} + change.previous = endpoints + proxier.endpointsChanges[namespacedName] = change + } + change.current = nil + proxier.syncProxyRules(syncReasonEndpoints) } @@ -595,45 +639,65 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.syncProxyRules(syncReasonEndpoints) } -// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. -func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { - - // return values - newMap = make(proxyEndpointMap) - hcEndpoints = make(map[types.NamespacedName]int) +// is updated by this function (based on the given changes). +// map is cleared after applying them. +func updateEndpointsMap( + endpointsMap proxyEndpointsMap, + changes *endpointsChangeMap, + hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { + syncRequired = false staleSet = make(map[endpointServicePair]bool) - - // Update endpoints for services. - for _, endpoints := range allEndpoints { - accumulateEndpointsMap(endpoints, hostname, &newMap) + for _, change := range *changes { + oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname) + newEndpointsMap := endpointsToEndpointsMap(change.current, hostname) + if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) { + endpointsMap.unmerge(oldEndpointsMap) + endpointsMap.merge(newEndpointsMap) + detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet) + syncRequired = true + } } - // Check stale connections against endpoints missing from the update. - // TODO: we should really only mark a connection stale if the proto was UDP - // and the (ip, port, proto) was removed from the endpoints. - for svcPort, epList := range curMap { + *changes = make(endpointsChangeMap) + + if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { + return + } + + // TODO: If this will appear to be computationally expensive, consider + // computing this incrementally similarly to endpointsMap. + hcEndpoints = make(map[types.NamespacedName]int) + localIPs := getLocalIPs(endpointsMap) + for nsn, ips := range localIPs { + hcEndpoints[nsn] = len(ips) + } + + return syncRequired, hcEndpoints, staleSet +} + +// are modified by this function with detected stale +// connections. +func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) { + for svcPort, epList := range oldEndpointsMap { for _, ep := range epList { stale := true - for i := range newMap[svcPort] { - if *newMap[svcPort][i] == *ep { + for i := range newEndpointsMap[svcPort] { + if *newEndpointsMap[svcPort][i] == *ep { stale = false break } } if stale { glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint) - staleSet[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true + staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true } } } +} - if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { - return - } - - // accumulate local IPs per service, ignoring ports - localIPs := map[types.NamespacedName]sets.String{} - for svcPort := range newMap { - for _, ep := range newMap[svcPort] { +func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { + localIPs := make(map[types.NamespacedName]sets.String) + for svcPort := range endpointsMap { + for _, ep := range endpointsMap[svcPort] { if ep.isLocal { nsn := svcPort.NamespacedName if localIPs[nsn] == nil { @@ -644,25 +708,19 @@ func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, ho } } } - // produce a count per service - for nsn, ips := range localIPs { - hcEndpoints[nsn] = len(ips) - } - - return newMap, hcEndpoints, staleSet + return localIPs } -// Gather information about all the endpoint state for a given api.Endpoints. -// This can not report complete info on stale connections because it has limited -// scope - it only knows one Endpoints, but sees the whole current map. That -// cleanup has to be done above. +// Translates single Endpoints object to proxyEndpointsMap. +// This function is used for incremental updated of endpointsMap. // // NOTE: endpoints object should NOT be modified. -// -// TODO: this could be simplified: -// - the test for this is overlapped by the test for buildNewEndpointsMap -// - naming is poor and responsibilities are muddled -func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) { +func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap { + if endpoints == nil { + return nil + } + + endpointsMap := make(proxyEndpointsMap) // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. for i := range endpoints.Subsets { @@ -687,17 +745,18 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoi endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), isLocal: addr.NodeName != nil && *addr.NodeName == hostname, } - (*newEndpoints)[svcPort] = append((*newEndpoints)[svcPort], epInfo) + endpointsMap[svcPort] = append(endpointsMap[svcPort], epInfo) } if glog.V(3) { newEPList := []string{} - for _, ep := range (*newEndpoints)[svcPort] { + for _, ep := range endpointsMap[svcPort] { newEPList = append(newEPList, ep.endpoint) } glog.Infof("Setting endpoints for %q to %+v", svcPort, newEPList) } } } + return endpointsMap } // portProtoHash takes the ServicePortName and protocol for a service @@ -784,7 +843,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start)) }() // don't sync rules till we've received services and endpoints - if !proxier.endpointsSynced || proxier.allServices == nil { + if !proxier.endpointsSynced || !proxier.servicesSynced { glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } @@ -798,11 +857,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { return } - // Figure out the new endpoints we need to activate. - newEndpoints, hcEndpoints, staleEndpoints := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname) + endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap( + proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) // If this was called because of an endpoints update, but nothing actionable has changed, skip it. - if reason == syncReasonEndpoints && reflect.DeepEqual(newEndpoints, proxier.endpointsMap) { + if reason == syncReasonEndpoints && !endpointsSyncRequired { glog.V(3).Infof("Skipping iptables sync because nothing changed") return } @@ -1157,7 +1216,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // table doesn't currently have the same per-service structure that // the nat table does, so we just stick this into the kube-services // chain. - if len(newEndpoints[svcName]) == 0 { + if len(proxier.endpointsMap[svcName]) == 0 { writeLine(filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -1170,7 +1229,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } // If the service has no endpoints then reject packets. - if len(newEndpoints[svcName]) == 0 { + if len(proxier.endpointsMap[svcName]) == 0 { writeLine(filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -1189,7 +1248,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // These two slices parallel each other - keep in sync endpoints := make([]*endpointsInfo, 0) endpointChains := make([]utiliptables.Chain, 0) - for _, ep := range newEndpoints[svcName] { + for _, ep := range proxier.endpointsMap[svcName] { endpoints = append(endpoints, ep) endpointChain := servicePortEndpointChainName(svcNameString, protocol, ep.endpoint) endpointChains = append(endpointChains, endpointChain) @@ -1379,7 +1438,6 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // Finish housekeeping. proxier.serviceMap = newServices - proxier.endpointsMap = newEndpoints // TODO: these and clearUDPConntrackForPort() could be made more consistent. utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List()) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 72f7c4a6ab2..a419f1c6d60 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/proxy" @@ -383,17 +384,18 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. return &Proxier{ - exec: &exec.FakeExec{}, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), - iptables: ipt, - clusterCIDR: "10.0.0.0/24", - allEndpoints: make(endpointsMap), - allServices: make(serviceMap), - endpointsSynced: true, - hostname: testHostname, - portsMap: make(map[localPort]closeable), - portMapper: &fakePortOpener{[]*localPort{}}, - healthChecker: newFakeHealthChecker(), + exec: &exec.FakeExec{}, + serviceMap: make(proxyServiceMap), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: make(endpointsChangeMap), + iptables: ipt, + clusterCIDR: "10.0.0.0/24", + allServices: make(serviceMap), + servicesSynced: true, + hostname: testHostname, + portsMap: make(map[localPort]closeable), + portMapper: &fakePortOpener{[]*localPort{}}, + healthChecker: newFakeHealthChecker(), } } @@ -577,6 +579,7 @@ func TestClusterIPReject(t *testing.T) { }} }), ) + makeEndpointsMap(fp) fp.syncProxyRules(syncReasonForce) svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) @@ -612,7 +615,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { ) epIP := "10.180.0.1" - fp.allEndpoints = makeEndpointsMap( + makeEndpointsMap(fp, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -676,7 +679,7 @@ func TestLoadBalancer(t *testing.T) { ) epIP := "10.180.0.1" - fp.allEndpoints = makeEndpointsMap( + makeEndpointsMap(fp, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -733,7 +736,7 @@ func TestNodePort(t *testing.T) { ) epIP := "10.180.0.1" - fp.allEndpoints = makeEndpointsMap( + makeEndpointsMap(fp, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -781,6 +784,7 @@ func TestNodePortReject(t *testing.T) { }} }), ) + makeEndpointsMap(fp) fp.syncProxyRules(syncReasonForce) @@ -827,7 +831,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { epIP2 := "10.180.2.1" epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort) epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort) - fp.allEndpoints = makeEndpointsMap( + makeEndpointsMap(fp, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -918,7 +922,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable epIP2 := "10.180.2.1" epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort) epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort) - fp.allEndpoints = makeEndpointsMap( + makeEndpointsMap(fp, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1210,8 +1214,86 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { } } +func Test_getLocalIPs(t *testing.T) { + testCases := []struct { + endpointsMap map[proxy.ServicePortName][]*endpointsInfo + expected map[types.NamespacedName]sets.String + }{{ + // Case[0]: nothing + endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{}, + expected: map[types.NamespacedName]sets.String{}, + }, { + // Case[1]: unnamed port + endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expected: map[types.NamespacedName]sets.String{}, + }, { + // Case[2]: unnamed port local + endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", true}, + }, + }, + expected: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("1.1.1.1"), + }, + }, { + // Case[3]: named local and non-local ports for the same IP. + endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", true}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", true}, + }, + }, + expected: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("1.1.1.2"), + }, + }, { + // Case[4]: named local and non-local ports for different IPs. + endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns2", "ep2", "p22"): { + {"2.2.2.2:22", true}, + {"2.2.2.22:22", true}, + }, + makeServicePortName("ns2", "ep2", "p23"): { + {"2.2.2.3:23", true}, + }, + makeServicePortName("ns4", "ep4", "p44"): { + {"4.4.4.4:44", true}, + {"4.4.4.5:44", false}, + }, + makeServicePortName("ns4", "ep4", "p45"): { + {"4.4.4.6:45", true}, + }, + }, + expected: map[types.NamespacedName]sets.String{ + types.NamespacedName{Namespace: "ns2", Name: "ep2"}: sets.NewString("2.2.2.2", "2.2.2.22", "2.2.2.3"), + types.NamespacedName{Namespace: "ns4", Name: "ep4"}: sets.NewString("4.4.4.4", "4.4.4.6"), + }, + }} + + for tci, tc := range testCases { + // outputs + localIPs := getLocalIPs(tc.endpointsMap) + + if !reflect.DeepEqual(localIPs, tc.expected) { + t.Errorf("[%d] expected %#v, got %#v", tci, tc.expected, localIPs) + } + } +} + // This is a coarse test, but it offers some modicum of confidence as the code is evolved. -func Test_accumulateEndpointsMap(t *testing.T) { +func Test_endpointsToEndpointsMap(t *testing.T) { testCases := []struct { newEndpoints *api.Endpoints expected map[proxy.ServicePortName][]*endpointsInfo @@ -1376,8 +1458,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { for tci, tc := range testCases { // outputs - newEndpoints := make(proxyEndpointMap) - accumulateEndpointsMap(tc.newEndpoints, "host", &newEndpoints) + newEndpoints := endpointsToEndpointsMap(tc.newEndpoints, "host") if len(newEndpoints) != len(tc.expected) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expected), len(newEndpoints), spew.Sdump(newEndpoints)) @@ -1407,13 +1488,14 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap return ept } -func makeEndpointsMap(allEndpoints ...*api.Endpoints) endpointsMap { - result := make(endpointsMap) - for _, endpoints := range allEndpoints { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - result[namespacedName] = endpoints +func makeEndpointsMap(proxier *Proxier, allEndpoints ...*api.Endpoints) { + for i := range allEndpoints { + proxier.OnEndpointsAdd(allEndpoints[i]) } - return result + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.endpointsSynced = true } func makeNSN(namespace, name string) types.NamespacedName { @@ -1436,36 +1518,350 @@ func makeServiceMap(allServices ...*api.Service) serviceMap { return result } -func Test_buildNewEndpointsMap(t *testing.T) { +func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.ServicePortName][]*endpointsInfo) { + if len(newMap) != len(expected) { + t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) + } + for x := range expected { + if len(newMap[x]) != len(expected[x]) { + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) + } else { + for i := range expected[x] { + if *(newMap[x][i]) != *(expected[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newMap[x][i]) + } + } + } + } +} + +func Test_updateEndpointsMap(t *testing.T) { var nodeName = "host" + unnamedPort := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }} + } + unnamedPortLocal := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }} + } + namedPortLocal := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + } + namedPort := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + } + namedPortRenamed := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11-2", + Port: 11, + }}, + }} + } + namedPortRenumbered := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 22, + }}, + }} + } + namedPortsLocalNoLocal := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.2", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }} + } + multipleSubsets := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p12", + Port: 12, + }}, + }} + } + multipleSubsetsWithLocal := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.2", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p12", + Port: 12, + }}, + }} + } + multipleSubsetsMultiplePortsLocal := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.3", + }}, + Ports: []api.EndpointPort{{ + Name: "p13", + Port: 13, + }}, + }} + } + multipleSubsetsIPsPorts1 := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.2", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.3", + }, { + IP: "1.1.1.4", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p13", + Port: 13, + }, { + Name: "p14", + Port: 14, + }}, + }} + } + multipleSubsetsIPsPorts2 := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "2.2.2.1", + }, { + IP: "2.2.2.2", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p21", + Port: 21, + }, { + Name: "p22", + Port: 22, + }}, + }} + } + complexBefore1 := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + } + complexBefore2 := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "2.2.2.2", + NodeName: &nodeName, + }, { + IP: "2.2.2.22", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p22", + Port: 22, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "2.2.2.3", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p23", + Port: 23, + }}, + }} + } + complexBefore4 := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "4.4.4.4", + NodeName: &nodeName, + }, { + IP: "4.4.4.5", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p44", + Port: 44, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "4.4.4.6", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p45", + Port: 45, + }}, + }} + } + complexAfter1 := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.11", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p12", + Port: 12, + }, { + Name: "p122", + Port: 122, + }}, + }} + } + complexAfter3 := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "3.3.3.3", + }}, + Ports: []api.EndpointPort{{ + Name: "p33", + Port: 33, + }}, + }} + } + complexAfter4 := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "4.4.4.4", + NodeName: &nodeName, + }}, + Ports: []api.EndpointPort{{ + Name: "p44", + Port: 44, + }}, + }} + } + testCases := []struct { - newEndpoints map[types.NamespacedName]*api.Endpoints + // previousEndpoints and currentEndpoints are used to call appropriate + // handlers OnEndpoints* (based on whether corresponding values are nil + // or non-nil) and must be of equal length. + previousEndpoints []*api.Endpoints + currentEndpoints []*api.Endpoints oldEndpoints map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedStale []endpointServicePair expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing - newEndpoints: map[types.NamespacedName]*api.Endpoints{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedStale: []endpointServicePair{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[1]: no change, unnamed port - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Port: 11, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", unnamedPort), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", unnamedPort), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", false}, @@ -1480,20 +1876,12 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[2]: no change, named port, local - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPortLocal), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPortLocal), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", true}, @@ -1510,27 +1898,12 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[3]: no change, multiple subsets - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }}, - }, { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.2", - }}, - Ports: []api.EndpointPort{{ - Name: "p12", - Port: 12, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", multipleSubsets), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", multipleSubsets), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1551,31 +1924,12 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[4]: no change, multiple subsets, multiple ports, local - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }, { - Name: "p12", - Port: 12, - }}, - }, { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.3", - }}, - Ports: []api.EndpointPort{{ - Name: "p13", - Port: 13, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", true}, @@ -1604,56 +1958,14 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[5]: no change, multiple endpoints, subsets, IPs, and ports - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "1.1.1.2", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }, { - Name: "p12", - Port: 12, - }}, - }, { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.3", - }, { - IP: "1.1.1.4", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Name: "p13", - Port: 13, - }, { - Name: "p14", - Port: 14, - }}, - }} - }), - makeTestEndpoints("ns2", "ep2", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "2.2.2.1", - }, { - IP: "2.2.2.2", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Name: "p21", - Port: 21, - }, { - Name: "p22", - Port: 22, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1), + makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1), + makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1713,20 +2025,13 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[6]: add an Endpoints - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Port: 11, - }}, - }} - }), - ), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ }, + previousEndpoints: []*api.Endpoints{ + nil, + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", unnamedPortLocal), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", true}, @@ -1738,7 +2043,12 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[7]: remove an Endpoints - newEndpoints: makeEndpointsMap(), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", unnamedPortLocal), + }, + currentEndpoints: []*api.Endpoints{ + nil, + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", true}, @@ -1752,25 +2062,12 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[8]: add an IP and port - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "1.1.1.2", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }, { - Name: "p12", - Port: 12, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPort), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1792,27 +2089,20 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[9]: remove an IP and port - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPort), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, - {"1.1.1.2:11", false}, + {"1.1.1.2:11", true}, }, makeServicePortName("ns1", "ep1", "p12"): { {"1.1.1.1:12", false}, - {"1.1.1.2:12", false}, + {"1.1.1.2:12", true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ @@ -1833,28 +2123,12 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[10]: add a subset - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }}, - }, { - Addresses: []api.EndpointAddress{{ - IP: "2.2.2.2", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Name: "p22", - Port: 22, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPort), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1864,8 +2138,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, }, - makeServicePortName("ns1", "ep1", "p22"): { - {"2.2.2.2:22", true}, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.2:12", true}, }, }, expectedStale: []endpointServicePair{}, @@ -1874,25 +2148,18 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, { // Case[11]: remove a subset - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", multipleSubsets), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPort), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, }, - makeServicePortName("ns1", "ep1", "p22"): { - {"2.2.2.2:22", false}, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.2:12", false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ @@ -1901,25 +2168,18 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{{ - endpoint: "2.2.2.2:22", - servicePortName: makeServicePortName("ns1", "ep1", "p22"), + endpoint: "1.1.1.2:12", + servicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[12]: rename a port - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p11-2", - Port: 11, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPort), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPortRenamed), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1937,19 +2197,12 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[13]: renumber a port - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 22, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPort), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", namedPortRenumbered), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -1967,55 +2220,18 @@ func Test_buildNewEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[14]: complex add and remove - newEndpoints: makeEndpointsMap( - makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "1.1.1.11", - }}, - Ports: []api.EndpointPort{{ - Name: "p11", - Port: 11, - }}, - }, { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.2", - }}, - Ports: []api.EndpointPort{{ - Name: "p12", - Port: 12, - }, { - Name: "p122", - Port: 122, - }}, - }} - }), - makeTestEndpoints("ns3", "ep3", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "3.3.3.3", - }}, - Ports: []api.EndpointPort{{ - Name: "p33", - Port: 33, - }}, - }} - }), - makeTestEndpoints("ns4", "ep4", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{ - IP: "4.4.4.4", - NodeName: &nodeName, - }}, - Ports: []api.EndpointPort{{ - Name: "p44", - Port: 44, - }}, - }} - }), - ), + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", complexBefore1), + makeTestEndpoints("ns2", "ep2", complexBefore2), + nil, + makeTestEndpoints("ns4", "ep4", complexBefore4), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", complexAfter1), + nil, + makeTestEndpoints("ns3", "ep3", complexAfter3), + makeTestEndpoints("ns4", "ep4", complexAfter4), + }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { {"1.1.1.1:11", false}, @@ -2075,21 +2291,40 @@ func Test_buildNewEndpointsMap(t *testing.T) { }} for tci, tc := range testCases { - newMap, hcEndpoints, stale := buildNewEndpointsMap(tc.newEndpoints, tc.oldEndpoints, nodeName) - if len(newMap) != len(tc.expectedResult) { - t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap) - } - for x := range tc.expectedResult { - if len(newMap[x]) != len(tc.expectedResult[x]) { - t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedResult[x]), x, len(newMap[x])) - } else { - for i := range tc.expectedResult[x] { - if *(newMap[x][i]) != *(tc.expectedResult[x][i]) { - t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedResult[x][i], newMap[x][i]) - } - } + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt) + fp.hostname = nodeName + + // First check that after adding all previous versions of endpoints, + // the fp.oldEndpoints is as we expect. + for i := range tc.previousEndpoints { + if tc.previousEndpoints[i] != nil { + fp.OnEndpointsAdd(tc.previousEndpoints[i]) } } + updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints) + + // Now let's call appropriate handlers to get to state we want to be. + if len(tc.previousEndpoints) != len(tc.currentEndpoints) { + t.Fatalf("[%d] different lengths of previous and current endpoints", tci) + continue + } + + for i := range tc.previousEndpoints { + prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i] + switch { + case prev == nil: + fp.OnEndpointsAdd(curr) + case curr == nil: + fp.OnEndpointsDelete(prev) + default: + fp.OnEndpointsUpdate(prev, curr) + } + } + _, hcEndpoints, stale := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + newMap := fp.endpointsMap + compareEndpointsMaps(t, tci, newMap, tc.expectedResult) if len(stale) != len(tc.expectedStale) { t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(stale), stale) }