diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 0c5cc6891d1..821b14da8a3 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -20,7 +20,6 @@ go_library( "//pkg/proxy/healthcheck:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", - "//pkg/util/slice:go_default_library", "//pkg/util/sysctl:go_default_library", "//pkg/util/version:go_default_library", "//vendor:github.com/davecgh/go-spew/spew", @@ -46,9 +45,11 @@ go_test( "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", + "//vendor:github.com/davecgh/go-spew/spew", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", + "//vendor:k8s.io/apimachinery/pkg/util/sets", ], ) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 311ef58d04b..6208dc00128 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -48,7 +48,6 @@ import ( "k8s.io/kubernetes/pkg/proxy/healthcheck" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" - "k8s.io/kubernetes/pkg/util/slice" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilversion "k8s.io/kubernetes/pkg/util/version" ) @@ -150,8 +149,8 @@ type serviceInfo struct { // internal struct for endpoints information type endpointsInfo struct { - ip string - localEndpoint bool + endpoint string // TODO: should be an endpointString type + isLocal bool } // returns a new serviceInfo struct @@ -191,13 +190,13 @@ type proxyServiceMap map[proxy.ServicePortName]*serviceInfo // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { - mu sync.Mutex // protects the following fields - serviceMap proxyServiceMap - endpointsMap map[proxy.ServicePortName][]*endpointsInfo - portsMap map[localPort]closeable - haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event - haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event - throttle flowcontrol.RateLimiter + mu sync.Mutex // protects the following fields + serviceMap proxyServiceMap + endpointsMap map[proxy.ServicePortName][]*endpointsInfo + portsMap map[localPort]closeable + haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event + allEndpoints []api.Endpoints // nil until we have seen an OnEndpointsUpdate event + throttle flowcontrol.RateLimiter // These are effectively const and do not need the mutex to be held. syncPeriod time.Duration @@ -211,6 +210,7 @@ type Proxier struct { nodeIP net.IP portMapper portOpener recorder record.EventRecorder + healthChecker healthChecker } type localPort struct { @@ -242,6 +242,17 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return openLocalPort(lp) } +type healthChecker interface { + UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) +} + +// TODO: the healthcheck pkg should offer a type +type globalHealthChecker struct{} + +func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) { + healthcheck.UpdateEndpoints(serviceName, endpointUIDs) +} + // Proxier implements ProxyProvider var _ proxy.ProxyProvider = &Proxier{} @@ -295,6 +306,7 @@ func NewProxier(ipt utiliptables.Interface, glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") } + healthChecker := globalHealthChecker{} go healthcheck.Run() var throttle flowcontrol.RateLimiter @@ -321,6 +333,7 @@ func NewProxier(ipt utiliptables.Interface, nodeIP: nodeIP, portMapper: &listenPortOpener{}, recorder: recorder, + healthChecker: healthChecker, }, nil } @@ -542,15 +555,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { proxier.deleteServiceConnections(staleUDPServices.List()) } -// Generate a list of ip strings from the list of endpoint infos -func flattenEndpointsInfo(endPoints []*endpointsInfo) []string { - var endpointIPs []string - for _, ep := range endPoints { - endpointIPs = append(endpointIPs, ep.ip) - } - return endpointIPs -} - // Reconstruct the list of endpoint infos from the endpointIP list // Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos // from the full []hostPortInfo slice. @@ -562,7 +566,7 @@ func flattenEndpointsInfo(endPoints []*endpointsInfo) []string { // then output will be // // []endpointsInfo{ {"2.2.2.2:80", localEndpointOnly=} } -func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo { +func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo { lookupSet := sets.NewString() for _, ip := range endpointIPs { lookupSet.Insert(ip) @@ -571,7 +575,7 @@ func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpoint for _, hpp := range endPoints { key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)) if lookupSet.Has(key) { - filteredEndpoints = append(filteredEndpoints, &endpointsInfo{ip: key, localEndpoint: hpp.localEndpoint}) + filteredEndpoints = append(filteredEndpoints, &endpointsInfo{endpoint: key, isLocal: hpp.isLocal}) } } return filteredEndpoints @@ -579,92 +583,17 @@ func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpoint // OnEndpointsUpdate takes in a slice of updated endpoints. func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { - start := time.Now() - defer func() { - glog.V(4).Infof("OnEndpointsUpdate took %v for %d endpoints", time.Since(start), len(allEndpoints)) - }() - proxier.mu.Lock() defer proxier.mu.Unlock() - proxier.haveReceivedEndpointsUpdate = true - - activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set - staleConnections := make(map[endpointServicePair]bool) - svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) - newEndpointsMap := make(map[proxy.ServicePortName][]*endpointsInfo) - - // Update endpoints for services. - for i := range allEndpoints { - svcEndpoints := &allEndpoints[i] - - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortInfo{} - for i := range svcEndpoints.Subsets { - ss := &svcEndpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - var isLocalEndpoint bool - if addr.NodeName != nil { - isLocalEndpoint = *addr.NodeName == proxier.hostname - isLocalEndpoint = utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && isLocalEndpoint - } - hostPortObject := hostPortInfo{ - host: addr.IP, - port: int(port.Port), - localEndpoint: isLocalEndpoint, - } - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) - } - } - } - for portname := range portsToEndpoints { - svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} - svcPortToInfoMap[svcPort] = portsToEndpoints[portname] - curEndpoints := proxier.endpointsMap[svcPort] - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) - // Flatten the list of current endpoint infos to just a list of ips as strings - curEndpointIPs := flattenEndpointsInfo(curEndpoints) - if len(curEndpointIPs) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEndpoints) { - glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) - // Gather stale connections to removed endpoints - removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEndpoints) - for _, ep := range removedEndpoints { - staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true - } - } - // Once the set operations using the list of ips are complete, build the list of endpoint infos - newEndpointsMap[svcPort] = proxier.buildEndpointInfoList(portsToEndpoints[portname], newEndpoints) - activeEndpoints[svcPort] = true - } - } - // Check stale connections against endpoints missing from the update. - for svcPort := range proxier.endpointsMap { - if !activeEndpoints[svcPort] { - glog.V(2).Infof("Removing endpoints for %q", svcPort) - // record endpoints of unactive service to stale connections - for _, ep := range proxier.endpointsMap[svcPort] { - staleConnections[endpointServicePair{endpoint: ep.ip, servicePortName: svcPort}] = true - } - } + if proxier.allEndpoints == nil { + glog.V(2).Info("Received first Endpoints update") } + proxier.allEndpoints = allEndpoints - // Update service health check - allSvcPorts := make(map[proxy.ServicePortName]bool) - for svcPort := range proxier.endpointsMap { - allSvcPorts[svcPort] = true - } - for svcPort := range newEndpointsMap { - allSvcPorts[svcPort] = true - } - for svcPort := range allSvcPorts { - proxier.updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort]) - } - - if len(newEndpointsMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newEndpointsMap, proxier.endpointsMap) { - proxier.endpointsMap = newEndpointsMap + // TODO: once service has made this same transform, move this into proxier.syncProxyRules() + newMap, staleConnections := updateEndpoints(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker) + if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) { + proxier.endpointsMap = newMap proxier.syncProxyRules() } else { glog.V(4).Infof("Skipping proxy iptables rule sync on endpoint update because nothing changed") @@ -673,41 +602,127 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.deleteEndpointConnections(staleConnections) } +// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. +func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string, + healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, staleSet map[endpointServicePair]bool) { + + // return values + newMap = make(map[proxy.ServicePortName][]*endpointsInfo) + staleSet = make(map[endpointServicePair]bool) + + // local + svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) + + // Update endpoints for services. + for i := range allEndpoints { + accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap) + } + // Check stale connections against endpoints missing from the update. + // TODO: we should really only mark a connection stale if the proto was UDP + // and the (ip, port, proto) was removed from the endpoints. + for svcPort, epList := range curMap { + for _, ep := range epList { + stale := true + for i := range newMap[svcPort] { + if *newMap[svcPort][i] == *ep { + stale = false + break + } + } + if stale { + glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint) + staleSet[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true + } + } + } + + // Update service health check + allSvcPorts := make(map[proxy.ServicePortName]bool) + for svcPort := range curMap { + allSvcPorts[svcPort] = true + } + for svcPort := range newMap { + allSvcPorts[svcPort] = true + } + for svcPort := range allSvcPorts { + updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker) + } + + return newMap, staleSet +} + +// Gather information about all the endpoint state for a given api.Endpoints. +// This can not report complete info on stale connections because it has limited +// scope - it only knows one Endpoints, but sees the whole current map. That +// cleanup has to be done above. +// +// TODO: this could be simplified: +// - hostPortInfo and endpointsInfo overlap too much +// - the test for this is overlapped by the test for updateEndpoints +// - naming is poor and responsibilities are muddled +func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, + curEndpoints map[proxy.ServicePortName][]*endpointsInfo, + newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, + svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) { + + // We need to build a map of portname -> all ip:ports for that + // portname. Explode Endpoints.Subsets[*] into this structure. + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + svcPort := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, + Port: port.Name, + } + for i := range ss.Addresses { + addr := &ss.Addresses[i] + hostPortObject := hostPortInfo{ + host: addr.IP, + port: int(port.Port), + isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + } + (*svcPortToInfoMap)[svcPort] = append((*svcPortToInfoMap)[svcPort], hostPortObject) + } + } + } + // Decompose the lists of endpoints into details of what was changed for the caller. + for svcPort, hostPortInfos := range *svcPortToInfoMap { + newEPList := flattenValidEndpoints(hostPortInfos) + glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList) + // Once the set operations using the list of ips are complete, build the list of endpoint infos + (*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList) + } +} + // updateHealthCheckEntries - send the new set of local endpoints to the health checker -func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { +func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) { + if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { + return + } + // Use a set instead of a slice to provide deduplication endpoints := sets.NewString() for _, portInfo := range hostPorts { - if portInfo.localEndpoint { + if portInfo.isLocal { // kube-proxy health check only needs local endpoints endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) } } - healthcheck.UpdateEndpoints(name, endpoints) + healthChecker.UpdateEndpoints(name, endpoints) } // used in OnEndpointsUpdate type hostPortInfo struct { - host string - port int - localEndpoint bool + host string + port int + isLocal bool } func isValidEndpoint(hpp *hostPortInfo) bool { return hpp.host != "" && hpp.port > 0 } -// Tests whether two slices are equivalent. This sorts both slices in-place. -func slicesEquiv(lhs, rhs []string) bool { - if len(lhs) != len(rhs) { - return false - } - if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) { - return true - } - return false -} - func flattenValidEndpoints(endpoints []hostPortInfo) []string { // Convert Endpoint objects into strings for easier use later. var result []string @@ -762,11 +777,6 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) } -// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints -func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string { - return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List() -} - type endpointServicePair struct { endpoint string servicePortName proxy.ServicePortName @@ -832,7 +842,7 @@ func (proxier *Proxier) syncProxyRules() { glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() // don't sync rules till we've received services and endpoints - if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { + if proxier.allEndpoints == nil || !proxier.haveReceivedServiceUpdate { glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } @@ -881,6 +891,10 @@ func (proxier *Proxier) syncProxyRules() { } } + // + // Below this point we will not return until we try to write the iptables rules. + // + // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore existingFilterChains := make(map[utiliptables.Chain]string) @@ -1195,7 +1209,7 @@ func (proxier *Proxier) syncProxyRules() { endpointChains := make([]utiliptables.Chain, 0) for _, ep := range proxier.endpointsMap[svcName] { endpoints = append(endpoints, ep) - endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip) + endpointChain := servicePortEndpointChainName(svcName, protocol, ep.endpoint) endpointChains = append(endpointChains, endpointChain) // Create the endpoint chain, retaining counters if possible. @@ -1245,14 +1259,14 @@ func (proxier *Proxier) syncProxyRules() { } // Handle traffic that loops back to the originator with SNAT. writeLine(natRules, append(args, - "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]), + "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].endpoint, ":")[0]), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. - args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip) + args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint) writeLine(natRules, args...) } @@ -1266,7 +1280,7 @@ func (proxier *Proxier) syncProxyRules() { localEndpoints := make([]*endpointsInfo, 0) localEndpointChains := make([]utiliptables.Chain, 0) for i := range endpointChains { - if endpoints[i].localEndpoint { + if endpoints[i].isLocal { // These slices parallel each other; must be kept in sync localEndpoints = append(localEndpoints, endpoints[i]) localEndpointChains = append(localEndpointChains, endpointChains[i]) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 472432b8313..7fbc27c1d42 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -17,8 +17,11 @@ limitations under the License. package iptables import ( + "strconv" "testing" + "github.com/davecgh/go-spew/spew" + "fmt" "net" "strings" @@ -26,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/proxy" @@ -168,47 +172,6 @@ func TestGetChainLinesMultipleTables(t *testing.T) { checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) } -func TestGetRemovedEndpoints(t *testing.T) { - testCases := []struct { - currentEndpoints []string - newEndpoints []string - removedEndpoints []string - }{ - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{"10.0.2.3:80"}, - }, - { - currentEndpoints: []string{}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - newEndpoints: []string{}, - removedEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.2:443"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{"10.0.2.2:443"}, - }, - } - - for i := range testCases { - res := getRemovedEndpoints(testCases[i].currentEndpoints, testCases[i].newEndpoints) - if !slicesEquiv(res, testCases[i].removedEndpoints) { - t.Errorf("Expected: %v, but getRemovedEndpoints returned: %v", testCases[i].removedEndpoints, res) - } - } -} - func TestExecConntrackTool(t *testing.T) { fcmd := exec.FakeCmd{ CombinedOutputScript: []exec.FakeCombinedOutputAction{ @@ -289,8 +252,8 @@ func TestDeleteEndpointConnections(t *testing.T) { } serviceMap := make(map[proxy.ServicePortName]*serviceInfo) - svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: "80"} - svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: "80"} + svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: "p80"} + svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: "p80"} serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), 80, api.ProtocolUDP, false) serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), 80, api.ProtocolTCP, false) @@ -491,36 +454,43 @@ func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return nil, nil } +type fakeHealthChecker struct{} + +func (fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {} + +const testHostname = "test-hostname" + func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. return &Proxier{ - exec: &exec.FakeExec{}, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), - iptables: ipt, - endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo), - clusterCIDR: "10.0.0.0/24", - haveReceivedEndpointsUpdate: true, - haveReceivedServiceUpdate: true, - hostname: "test-hostname", - portsMap: make(map[localPort]closeable), - portMapper: &fakePortOpener{[]*localPort{}}, + exec: &exec.FakeExec{}, + serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + iptables: ipt, + clusterCIDR: "10.0.0.0/24", + allEndpoints: []api.Endpoints{}, + haveReceivedServiceUpdate: true, + hostname: testHostname, + portsMap: make(map[localPort]closeable), + portMapper: &fakePortOpener{[]*localPort{}}, + healthChecker: fakeHealthChecker{}, } } -func hasJump(rules []iptablestest.Rule, destChain, destIP, destPort string) bool { +func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool { + destPortStr := strconv.Itoa(destPort) match := false for _, r := range rules { if r[iptablestest.Jump] == destChain { match = true if destIP != "" { - if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPort) || r[iptablestest.DPort] == "") { + if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPortStr) || r[iptablestest.DPort] == "") { return true } match = false } - if destPort != "" { - if strings.Contains(r[iptablestest.DPort], destPort) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") { + if destPort != 0 { + if strings.Contains(r[iptablestest.DPort], destPortStr) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") { return true } match = false @@ -535,7 +505,7 @@ func TestHasJump(t *testing.T) { rules []iptablestest.Rule destChain string destIP string - destPort string + destPort int expected bool }{ "case 1": { @@ -546,7 +516,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "10.20.30.41", - destPort: "80", + destPort: 80, expected: true, }, "case 2": { @@ -557,7 +527,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "", - destPort: "3001", + destPort: 3001, expected: true, }, "case 3": { @@ -567,7 +537,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-XLB-GF53O3C2HZEXL2XN", destIP: "1.2.3.4", - destPort: "80", + destPort: 80, expected: true, }, "case 4": { @@ -577,7 +547,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-XLB-GF53O3C2HZEXL2XN", destIP: "1.2.3.4", - destPort: "8080", + destPort: 8080, expected: false, }, "case 5": { @@ -587,7 +557,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-XLB-GF53O3C2HZEXL2XN", destIP: "10.20.30.40", - destPort: "80", + destPort: 80, expected: false, }, "case 6": { @@ -599,7 +569,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "1.2.3.4", - destPort: "8080", + destPort: 8080, expected: true, }, "case 7": { @@ -610,7 +580,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "1.2.3.4", - destPort: "3001", + destPort: 3001, expected: true, }, "case 8": { @@ -621,7 +591,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "10.20.30.41", - destPort: "8080", + destPort: 8080, expected: true, }, "case 9": { @@ -630,7 +600,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-SEP-LWSOSDSHMKPJHHJV", destIP: "", - destPort: "", + destPort: 0, expected: true, }, "case 10": { @@ -639,7 +609,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-SEP-BAR", destIP: "", - destPort: "", + destPort: 0, expected: false, }, } @@ -673,7 +643,7 @@ func TestClusterIPReject(t *testing.T) { svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false) fp.syncProxyRules() @@ -683,7 +653,7 @@ func TestClusterIPReject(t *testing.T) { errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcName), svcRules, t) } kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), "80") { + if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), 80) { errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcName), kubeSvcRules, t) } } @@ -694,23 +664,37 @@ func TestClusterIPEndpointsJump(t *testing.T) { svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true) - ep := "10.180.0.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{ep, false}} + ip := "10.180.0.1" + port := 80 + ep := fmt.Sprintf("%s:%d", ip, port) + allEndpoints := []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip, + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } - fp.syncProxyRules() + fp.OnEndpointsUpdate(allEndpoints) svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP)))) epChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), ep)) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - if !hasJump(kubeSvcRules, svcChain, svcIP.String(), "80") { + if !hasJump(kubeSvcRules, svcChain, svcIP.String(), 80) { errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t) } svcRules := ipt.GetRules(svcChain) - if !hasJump(svcRules, epChain, "", "") { + if !hasJump(svcRules, epChain, "", 0) { errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t) } epRules := ipt.GetRules(epChain) @@ -733,12 +717,25 @@ func TestLoadBalancer(t *testing.T) { svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false) fp.serviceMap[svc] = typeLoadBalancer(svcInfo) - ep1 := "10.180.0.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{ep1, false}} + ip := "10.180.0.1" + port := 80 + fp.allEndpoints = []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip, + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } fp.syncProxyRules() @@ -748,12 +745,12 @@ func TestLoadBalancer(t *testing.T) { //lbChain := string(serviceLBChainName(svc, proto)) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, "80") { + if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, 80) { errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t) } fwRules := ipt.GetRules(fwChain) - if !hasJump(fwRules, svcChain, "", "") || !hasJump(fwRules, string(KubeMarkMasqChain), "", "") { + if !hasJump(fwRules, svcChain, "", 0) || !hasJump(fwRules, string(KubeMarkMasqChain), "", 0) { errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t) } } @@ -764,13 +761,26 @@ func TestNodePort(t *testing.T) { svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false) svcInfo.nodePort = 3001 fp.serviceMap[svc] = svcInfo - ep1 := "10.180.0.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{ep1, false}} + ip := "10.180.0.1" + port := 80 + fp.allEndpoints = []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip, + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } fp.syncProxyRules() @@ -778,26 +788,49 @@ func TestNodePort(t *testing.T) { svcChain := string(servicePortChainName(svc, strings.ToLower(proto))) kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain)) - if !hasJump(kubeNodePortRules, svcChain, "", fmt.Sprintf("%v", svcInfo.nodePort)) { + if !hasJump(kubeNodePortRules, svcChain, "", svcInfo.nodePort) { errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t) } } +func strPtr(s string) *string { + return &s +} + func TestOnlyLocalLoadBalancing(t *testing.T) { ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt) svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true) fp.serviceMap[svc] = typeLoadBalancer(svcInfo) - nonLocalEp := "10.180.0.1:80" - localEp := "10.180.2.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{nonLocalEp, false}, {localEp, true}} + ip1 := "10.180.0.1" + ip2 := "10.180.2.1" + port := 80 + nonLocalEp := fmt.Sprintf("%s:%d", ip1, port) + localEp := fmt.Sprintf("%s:%d", ip2, port) + allEndpoints := []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip1, + NodeName: nil, + }, { + IP: ip2, + NodeName: strPtr(testHostname), + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } - fp.syncProxyRules() + fp.OnEndpointsUpdate(allEndpoints) proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svc, proto)) @@ -807,24 +840,24 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp)) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, "") { + if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, 0) { errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t) } fwRules := ipt.GetRules(fwChain) - if !hasJump(fwRules, lbChain, "", "") { + if !hasJump(fwRules, lbChain, "", 0) { errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t) } - if hasJump(fwRules, string(KubeMarkMasqChain), "", "") { + if hasJump(fwRules, string(KubeMarkMasqChain), "", 0) { errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t) } lbRules := ipt.GetRules(lbChain) - if hasJump(lbRules, nonLocalEpChain, "", "") { + if hasJump(lbRules, nonLocalEpChain, "", 0) { errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t) } - if !hasJump(lbRules, localEpChain, "", "") { - errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t) + if !hasJump(lbRules, localEpChain, "", 0) { + errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, localEp), lbRules, t) } } @@ -847,16 +880,35 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true) svcInfo.nodePort = 3001 fp.serviceMap[svc] = svcInfo - nonLocalEp := "10.180.0.1:80" - localEp := "10.180.2.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{nonLocalEp, false}, {localEp, true}} + ip1 := "10.180.0.1" + ip2 := "10.180.2.1" + port := 80 + nonLocalEp := fmt.Sprintf("%s:%d", ip1, port) + localEp := fmt.Sprintf("%s:%d", ip2, port) + allEndpoints := []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip1, + NodeName: nil, + }, { + IP: ip2, + NodeName: strPtr(testHostname), + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } - fp.syncProxyRules() + fp.OnEndpointsUpdate(allEndpoints) proto := strings.ToLower(string(api.ProtocolTCP)) lbChain := string(serviceLBChainName(svc, proto)) @@ -865,23 +917,23 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp)) kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain)) - if !hasJump(kubeNodePortRules, lbChain, "", fmt.Sprintf("%v", svcInfo.nodePort)) { + if !hasJump(kubeNodePortRules, lbChain, "", svcInfo.nodePort) { errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t) } svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP)))) lbRules := ipt.GetRules(lbChain) - if hasJump(lbRules, nonLocalEpChain, "", "") { + if hasJump(lbRules, nonLocalEpChain, "", 0) { errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t) } - if hasJump(lbRules, svcChain, "", "") != shouldLBTOSVCRuleExist { + if hasJump(lbRules, svcChain, "", 0) != shouldLBTOSVCRuleExist { prefix := "Did not find " if !shouldLBTOSVCRuleExist { prefix = "Found " } errorf(fmt.Sprintf("%s jump from lb chain %v to svc %v", prefix, lbChain, svcChain), lbRules, t) } - if !hasJump(lbRules, localEpChain, "", "") { + if !hasJump(lbRules, localEpChain, "", 0) { errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t) } } @@ -1172,4 +1224,880 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { } } +// This is a coarse test, but it offers some modicum of confidence as the code is evolved. +func Test_accumulateEndpointsMap(t *testing.T) { + testCases := []struct { + newEndpoints api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedNew map[proxy.ServicePortName][]*endpointsInfo + }{{ + // Case[0]: nothing + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + }, { + // Case[1]: no changes, unnamed port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + }, { + // Case[2]: no changes, named port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "port", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "port"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "port"): { + {"1.1.1.1:11", false}, + }, + }, + }, { + // Case[3]: new port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): {}, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + }, { + // Case[4]: remove port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + }, { + // Case[5]: new IP and port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "2.2.2.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 11, + }, { + Name: "p2", + Port: 22, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + {"2.2.2.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + }, { + // Case[6]: remove IP and port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + {"2.2.2.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + }, { + // Case[7]: rename port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p2", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:11", false}, + }, + }, + }, { + // Case[8]: renumber port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 22, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:22", false}, + }, + }, + }} + + for tci, tc := range testCases { + // outputs + newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} + svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} + accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints, &svcPortToInfoMap) + + if len(newEndpoints) != len(tc.expectedNew) { + t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) + } + for x := range tc.expectedNew { + if len(newEndpoints[x]) != len(tc.expectedNew[x]) { + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedNew[x]), x, len(newEndpoints[x])) + } else { + for i := range newEndpoints[x] { + if *(newEndpoints[x][i]) != *(tc.expectedNew[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedNew[x][i], *(newEndpoints[x][i])) + } + } + } + } + } +} + +func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) api.Endpoints { + ept := api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + eptFunc(&ept) + return ept +} + +func makeServicePortName(ns, name, port string) proxy.ServicePortName { + return proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: ns, + Name: name, + }, + Port: port, + } +} + +func Test_updateEndpoints(t *testing.T) { + testCases := []struct { + newEndpoints []api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedResult map[proxy.ServicePortName][]*endpointsInfo + expectedStale []endpointServicePair + }{{ + // Case[0]: nothing + newEndpoints: []api.Endpoints{}, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{}, + }, { + // Case[1]: no change, unnamed port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[2]: no change, named port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[3]: no change, multiple subsets + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p12", + Port: 12, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.2:12", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.2:12", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[4]: no change, multiple subsets, multiple ports + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.3", + }}, + Ports: []api.EndpointPort{{ + Name: "p13", + Port: 13, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + }, + makeServicePortName("ns1", "ep1", "p13"): { + {"1.1.1.3:13", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + }, + makeServicePortName("ns1", "ep1", "p13"): { + {"1.1.1.3:13", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[5]: no change, multiple endpoints, subsets, IPs, and ports + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.3", + }, { + IP: "1.1.1.4", + }}, + Ports: []api.EndpointPort{{ + Name: "p13", + Port: 13, + }, { + Name: "p14", + Port: 14, + }}, + }} + }), + makeTestEndpoints("ns2", "ep2", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "2.2.2.1", + }, { + IP: "2.2.2.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p21", + Port: 21, + }, { + Name: "p22", + Port: 22, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", false}, + }, + makeServicePortName("ns1", "ep1", "p13"): { + {"1.1.1.3:13", false}, + {"1.1.1.4:13", false}, + }, + makeServicePortName("ns1", "ep1", "p14"): { + {"1.1.1.3:14", false}, + {"1.1.1.4:14", false}, + }, + makeServicePortName("ns2", "ep2", "p21"): { + {"2.2.2.1:21", false}, + {"2.2.2.2:21", false}, + }, + makeServicePortName("ns2", "ep2", "p22"): { + {"2.2.2.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", false}, + }, + makeServicePortName("ns1", "ep1", "p13"): { + {"1.1.1.3:13", false}, + {"1.1.1.4:13", false}, + }, + makeServicePortName("ns1", "ep1", "p14"): { + {"1.1.1.3:14", false}, + {"1.1.1.4:14", false}, + }, + makeServicePortName("ns2", "ep2", "p21"): { + {"2.2.2.1:21", false}, + {"2.2.2.2:21", false}, + }, + makeServicePortName("ns2", "ep2", "p22"): { + {"2.2.2.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[6]: add an Endpoints + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[7]: remove an Endpoints + newEndpoints: []api.Endpoints{ /* empty */ }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", ""), + }}, + }, { + // Case[8]: add an IP and port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[9]: remove an IP and port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.2:11", + servicePortName: makeServicePortName("ns1", "ep1", "p11"), + }, { + endpoint: "1.1.1.1:12", + servicePortName: makeServicePortName("ns1", "ep1", "p12"), + }, { + endpoint: "1.1.1.2:12", + servicePortName: makeServicePortName("ns1", "ep1", "p12"), + }}, + }, { + // Case[10]: add a subset + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "2.2.2.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p22", + Port: 22, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p22"): { + {"2.2.2.2:22", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[11]: remove a subset + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p22"): { + {"2.2.2.2:22", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "2.2.2.2:22", + servicePortName: makeServicePortName("ns1", "ep1", "p22"), + }}, + }, { + // Case[12]: rename a port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11-2", + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11-2"): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", "p11"), + }}, + }, { + // Case[13]: renumber a port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 22, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:22", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", "p11"), + }}, + }, { + // Case[14]: complex add and remove + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.11", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p12", + Port: 12, + }, { + Name: "p122", + Port: 122, + }}, + }} + }), + makeTestEndpoints("ns3", "ep3", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "3.3.3.3", + }}, + Ports: []api.EndpointPort{{ + Name: "p33", + Port: 33, + }}, + }} + }), + makeTestEndpoints("ns4", "ep4", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "4.4.4.4", + }}, + Ports: []api.EndpointPort{{ + Name: "p44", + Port: 44, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns2", "ep2", "p22"): { + {"2.2.2.2:22", false}, + {"2.2.2.22:22", false}, + }, + makeServicePortName("ns2", "ep2", "p23"): { + {"2.2.2.3:23", false}, + }, + makeServicePortName("ns4", "ep4", "p44"): { + {"4.4.4.4:44", false}, + {"4.4.4.5:44", false}, + }, + makeServicePortName("ns4", "ep4", "p45"): { + {"4.4.4.6:45", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.11:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.2:12", false}, + }, + makeServicePortName("ns1", "ep1", "p122"): { + {"1.1.1.2:122", false}, + }, + makeServicePortName("ns3", "ep3", "p33"): { + {"3.3.3.3:33", false}, + }, + makeServicePortName("ns4", "ep4", "p44"): { + {"4.4.4.4:44", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "2.2.2.2:22", + servicePortName: makeServicePortName("ns2", "ep2", "p22"), + }, { + endpoint: "2.2.2.22:22", + servicePortName: makeServicePortName("ns2", "ep2", "p22"), + }, { + endpoint: "2.2.2.3:23", + servicePortName: makeServicePortName("ns2", "ep2", "p23"), + }, { + endpoint: "4.4.4.5:44", + servicePortName: makeServicePortName("ns4", "ep4", "p44"), + }, { + endpoint: "4.4.4.6:45", + servicePortName: makeServicePortName("ns4", "ep4", "p45"), + }}, + }} + + for tci, tc := range testCases { + newMap, stale := updateEndpoints(tc.newEndpoints, tc.oldEndpoints, "host", fakeHealthChecker{}) + if len(newMap) != len(tc.expectedResult) { + t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap) + } + for x := range tc.expectedResult { + if len(newMap[x]) != len(tc.expectedResult[x]) { + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedResult[x]), x, len(newMap[x])) + } else { + for i := range tc.expectedResult[x] { + if *(newMap[x][i]) != *(tc.expectedResult[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedResult[x][i], newMap[x][i]) + } + } + } + } + if len(stale) != len(tc.expectedStale) { + t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(stale), stale) + } + for _, x := range tc.expectedStale { + if stale[x] != true { + t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, stale) + } + } + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.