Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 19:56:01 +00:00)
Merge pull request #44019 from thockin/proxy-defer-on-update-events-2
Automatic merge from submit-queue (batch tested with PRs 44019, 42225). Move On*Update handling into sync function.
This commit is contained in: commit 94836a52f7
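In rough outline, the change deletes the map rebuilding, healthcheck syncing, and stale-connection cleanup from the OnServiceUpdate/OnEndpointsUpdate handlers and performs all of it inside syncProxyRules, keyed by a syncReason so that a no-op update can be skipped. A condensed sketch of the resulting flow (not the literal code; locking, healthchecks, and conntrack cleanup are elided, and the hunks below are authoritative):

    func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
        // Record the latest desired state; all real work is deferred to the sync.
        proxier.allServices = allServices
        proxier.syncProxyRules(syncReasonServices)
    }

    func (proxier *Proxier) syncProxyRules(reason syncReason) {
        // Rebuild the desired service/endpoint maps from the last stored inputs.
        newServices, _, _ := buildNewServiceMap(proxier.allServices, proxier.serviceMap)
        if reason == syncReasonServices && reflect.DeepEqual(newServices, proxier.serviceMap) {
            return // a services update that changed nothing actionable: skip the iptables rewrite
        }
        newEndpoints, _, _ := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)
        if reason == syncReasonEndpoints && reflect.DeepEqual(newEndpoints, proxier.endpointsMap) {
            return
        }
        // ... program iptables from newServices/newEndpoints, then commit the new maps ...
        proxier.serviceMap = newServices
        proxier.endpointsMap = newEndpoints
    }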
@@ -154,9 +154,18 @@ type endpointsInfo struct {
   isLocal bool
 }
 
+func (e *endpointsInfo) String() string {
+  return fmt.Sprintf("%v", *e)
+}
+
 // returns a new serviceInfo struct
 func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
-  onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
+  onlyNodeLocalEndpoints := false
+  if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) &&
+    (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort) &&
+    apiservice.NeedsHealthCheck(service) {
+    onlyNodeLocalEndpoints = true
+  }
   info := &serviceInfo{
     clusterIP: net.ParseIP(service.Spec.ClusterIP),
     port:      int(port.Port),
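The restructured condition is behaviorally identical to the old one-liner: onlyNodeLocalEndpoints is set only when the ExternalTrafficLocalOnly feature gate is enabled, the service type is LoadBalancer or NodePort, and apiservice.NeedsHealthCheck reports true for the service. As a rough illustration (my own example, not part of the commit; it mirrors the OnlyLocal test fixtures later in this diff, which mark the service with the beta external-traffic annotation):

    // Assuming the feature gate is on, a Service shaped like this should come out
    // of newServiceInfo with onlyNodeLocalEndpoints == true.
    svc := &api.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "svc1",
            Namespace: "ns1",
            Annotations: map[string]string{
                service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
            },
        },
        Spec: api.ServiceSpec{
            Type:      api.ServiceTypeLoadBalancer,
            ClusterIP: "10.20.30.41",
            Ports:     []api.ServicePort{{Name: "p80", Port: 80, Protocol: api.ProtocolTCP}},
        },
    }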
@@ -519,22 +528,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
     glog.V(2).Info("Received first Services update")
   }
   proxier.allServices = allServices
-
-  newServiceMap, hcPorts, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap)
-
-  // update healthcheck ports
-  if err := proxier.healthChecker.SyncServices(hcPorts); err != nil {
-    glog.Errorf("Error syncing healthcheck ports: %v", err)
-  }
-
-  if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) {
-    proxier.serviceMap = newServiceMap
-    proxier.syncProxyRules(syncReasonServices)
-  } else {
-    glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed")
-  }
-
-  utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
+  proxier.syncProxyRules(syncReasonServices)
 }
 
 // OnEndpointsUpdate takes in a slice of updated endpoints.
@@ -545,23 +539,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
     glog.V(2).Info("Received first Endpoints update")
   }
   proxier.allEndpoints = allEndpoints
-
-  // TODO: once service has made this same transform, move this into proxier.syncProxyRules()
-  newMap, hcEndpoints, staleConnections := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)
-
-  // update healthcheck endpoints
-  if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
-    glog.Errorf("Error syncing healthcheck endpoints: %v", err)
-  }
-
-  if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) {
-    proxier.endpointsMap = newMap
-    proxier.syncProxyRules(syncReasonEndpoints)
-  } else {
-    glog.V(4).Infof("Skipping proxy iptables rule sync on endpoint update because nothing changed")
-  }
-
-  proxier.deleteEndpointConnections(staleConnections)
+  proxier.syncProxyRules(syncReasonEndpoints)
 }
 
 // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
@@ -574,7 +552,7 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
 
   // Update endpoints for services.
   for i := range allEndpoints {
-    accumulateEndpointsMap(allEndpoints[i], hostname, curMap, &newMap)
+    accumulateEndpointsMap(allEndpoints[i], hostname, &newMap)
   }
   // Check stale connections against endpoints missing from the update.
   // TODO: we should really only mark a connection stale if the proto was UDP
@@ -632,10 +610,7 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
 // - hostPortInfo and endpointsInfo overlap too much
 // - the test for this is overlapped by the test for buildNewEndpointsMap
 // - naming is poor and responsibilities are muddled
-func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
-  curEndpoints proxyEndpointMap,
-  newEndpoints *proxyEndpointMap) {
-
+func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) {
   // We need to build a map of portname -> all ip:ports for that
   // portname. Explode Endpoints.Subsets[*] into this structure.
   for i := range endpoints.Subsets {
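With the narrower signature, callers no longer thread the current map through; the calling pattern (as used in buildNewEndpointsMap above and in Test_accumulateEndpointsMap at the end of this diff) reduces to:

    // Build a fresh map and fold each Endpoints object into it.
    newMap := make(proxyEndpointMap)
    for i := range allEndpoints {
        accumulateEndpointsMap(allEndpoints[i], hostname, &newMap)
    }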
@@ -761,6 +736,25 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
     glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
     return
   }
+
+  // Figure out the new services we need to activate.
+  newServices, hcServices, staleServices := buildNewServiceMap(proxier.allServices, proxier.serviceMap)
+
+  // If this was called because of a services update, but nothing actionable has changed, skip it.
+  if reason == syncReasonServices && reflect.DeepEqual(newServices, proxier.serviceMap) {
+    glog.V(3).Infof("Skipping iptables sync because nothing changed")
+    return
+  }
+
+  // Figure out the new endpoints we need to activate.
+  newEndpoints, hcEndpoints, staleEndpoints := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)
+
+  // If this was called because of an endpoints update, but nothing actionable has changed, skip it.
+  if reason == syncReasonEndpoints && reflect.DeepEqual(newEndpoints, proxier.endpointsMap) {
+    glog.V(3).Infof("Skipping iptables sync because nothing changed")
+    return
+  }
+
   glog.V(3).Infof("Syncing iptables rules")
 
   // Create and link the kube services chain.
@@ -891,7 +885,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
   replacementPortsMap := map[localPort]closeable{}
 
   // Build rules for each service.
-  for svcName, svcInfo := range proxier.serviceMap {
+  for svcName, svcInfo := range newServices {
     protocol := strings.ToLower(string(svcInfo.protocol))
 
     // Create the per-service chain, retaining counters if possible.
@@ -1082,7 +1076,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
         continue
       }
       if lp.protocol == "udp" {
-        proxier.clearUdpConntrackForPort(lp.port)
+        proxier.clearUDPConntrackForPort(lp.port)
       }
       replacementPortsMap[lp] = socket
     } // We're holding the port, so it's OK to install iptables rules.
@@ -1108,7 +1102,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
     // table doesn't currently have the same per-service structure that
     // the nat table does, so we just stick this into the kube-services
     // chain.
-    if len(proxier.endpointsMap[svcName]) == 0 {
+    if len(newEndpoints[svcName]) == 0 {
       writeLine(filterRules,
         "-A", string(kubeServicesChain),
         "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
@@ -1121,7 +1115,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
     }
 
     // If the service has no endpoints then reject packets.
-    if len(proxier.endpointsMap[svcName]) == 0 {
+    if len(newEndpoints[svcName]) == 0 {
       writeLine(filterRules,
         "-A", string(kubeServicesChain),
         "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
@@ -1140,7 +1134,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
     // These two slices parallel each other - keep in sync
     endpoints := make([]*endpointsInfo, 0)
     endpointChains := make([]utiliptables.Chain, 0)
-    for _, ep := range proxier.endpointsMap[svcName] {
+    for _, ep := range newEndpoints[svcName] {
       endpoints = append(endpoints, ep)
       endpointChain := servicePortEndpointChainName(svcName, protocol, ep.endpoint)
       endpointChains = append(endpointChains, endpointChain)
@@ -1317,6 +1311,24 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
     }
   }
   proxier.portsMap = replacementPortsMap
+
+  // Update healthchecks. The endpoints list might include services that are
+  // not "OnlyLocal", but the services list will not, and the healthChecker
+  // will just drop those endpoints.
+  if err := proxier.healthChecker.SyncServices(hcServices); err != nil {
+    glog.Errorf("Error syncing healthcheck services: %v", err)
+  }
+  if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
+    glog.Errorf("Error syncing healthcheck endpoints: %v", err)
+  }
+
+  // Finish housekeeping.
+  proxier.serviceMap = newServices
+  proxier.endpointsMap = newEndpoints
+
+  // TODO: these and clearUDPConntrackForPort() could be made more consistent.
+  utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
+  proxier.deleteEndpointConnections(staleEndpoints)
 }
 
 // Clear UDP conntrack for port or all conntrack entries when port equal zero.
@@ -1324,7 +1336,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 // The solution is clearing the conntrack. Known issues:
 // https://github.com/docker/docker/issues/8795
 // https://github.com/kubernetes/kubernetes/issues/31983
-func (proxier *Proxier) clearUdpConntrackForPort(port int) {
+func (proxier *Proxier) clearUDPConntrackForPort(port int) {
   glog.V(2).Infof("Deleting conntrack entries for udp connections")
   if port > 0 {
     err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
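The rename to clearUDPConntrackForPort is purely stylistic; the helper still deletes UDP conntrack entries through utilproxy.ExecConntrackTool. Assuming the standard conntrack-tools binary, the call shown in the hunk above is equivalent to running "conntrack -D -p udp --dport <port>" on the node:

    // Delete any established UDP conntrack entries destined for this port, so
    // stale NAT state cannot black-hole traffic to newly programmed endpoints.
    err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))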
@@ -551,7 +551,7 @@ func hasDNAT(rules []iptablestest.Rule, endpoint string) bool {
 
 func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
   for _, r := range rules {
-    t.Logf("%v", r)
+    t.Logf("%q", r)
   }
   t.Errorf("%v", msg)
 }
@@ -559,56 +559,80 @@ func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
 func TestClusterIPReject(t *testing.T) {
   ipt := iptablestest.NewFake()
   fp := NewFakeProxier(ipt)
-  svcName := "svc1"
-  svcIP := net.IPv4(10, 20, 30, 41)
+  svcIP := "10.20.30.41"
+  svcPort := 80
+  svcPortName := proxy.ServicePortName{
+    NamespacedName: makeNSN("ns1", "svc1"),
+    Port:           "p80",
+  }
 
-  svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
-  fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false)
+  fp.allServices = []*api.Service{
+    makeTestService(svcPortName.Namespace, svcPortName.Namespace, func(svc *api.Service) {
+      svc.Spec.ClusterIP = svcIP
+      svc.Spec.Ports = []api.ServicePort{{
+        Name:     svcPortName.Port,
+        Port:     int32(svcPort),
+        Protocol: api.ProtocolTCP,
+      }}
+    }),
+  }
   fp.syncProxyRules(syncReasonForce)
 
-  svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
+  svcChain := string(servicePortChainName(svcPortName, strings.ToLower(string(api.ProtocolTCP))))
   svcRules := ipt.GetRules(svcChain)
   if len(svcRules) != 0 {
-    errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcName), svcRules, t)
+    errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcPortName), svcRules, t)
   }
   kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
-  if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), 80) {
-    errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcName), kubeSvcRules, t)
+  if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcPort) {
+    errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
   }
 }
 
 func TestClusterIPEndpointsJump(t *testing.T) {
   ipt := iptablestest.NewFake()
   fp := NewFakeProxier(ipt)
-  svcName := "svc1"
-  svcIP := net.IPv4(10, 20, 30, 41)
+  svcIP := "10.20.30.41"
+  svcPort := 80
+  svcPortName := proxy.ServicePortName{
+    NamespacedName: makeNSN("ns1", "svc1"),
+    Port:           "p80",
+  }
 
-  svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
-  fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true)
-  ip := "10.180.0.1"
-  port := 80
-  ep := fmt.Sprintf("%s:%d", ip, port)
-  allEndpoints := []*api.Endpoints{
-    makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
+  fp.allServices = []*api.Service{
+    makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
+      svc.Spec.ClusterIP = svcIP
+      svc.Spec.Ports = []api.ServicePort{{
+        Name:     svcPortName.Port,
+        Port:     int32(svcPort),
+        Protocol: api.ProtocolTCP,
+      }}
+    }),
+  }
+
+  epIP := "10.180.0.1"
+  fp.allEndpoints = []*api.Endpoints{
+    makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
       ept.Subsets = []api.EndpointSubset{{
         Addresses: []api.EndpointAddress{{
-          IP: ip,
+          IP: epIP,
        }},
        Ports: []api.EndpointPort{{
-          Name: "p80",
-          Port: int32(port),
+          Name: svcPortName.Port,
+          Port: int32(svcPort),
        }},
      }}
    }),
  }
 
-  fp.OnEndpointsUpdate(allEndpoints)
+  fp.syncProxyRules(syncReasonForce)
 
-  svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
-  epChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), ep))
+  epStr := fmt.Sprintf("%s:%d", epIP, svcPort)
+  svcChain := string(servicePortChainName(svcPortName, strings.ToLower(string(api.ProtocolTCP))))
+  epChain := string(servicePortEndpointChainName(svcPortName, strings.ToLower(string(api.ProtocolTCP)), epStr))
 
   kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
-  if !hasJump(kubeSvcRules, svcChain, svcIP.String(), 80) {
+  if !hasJump(kubeSvcRules, svcChain, svcIP, svcPort) {
     errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t)
   }
 
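The remaining hunks apply the same conversion to the rest of the proxier unit tests: instead of hand-assembling fp.serviceMap entries, each test now builds real api.Service and api.Endpoints objects with the makeTestService/makeTestEndpoints helpers, stores them on the fake proxier, and forces a full rule sync. A condensed sketch of the recurring setup (values follow the tests in this diff):

    ipt := iptablestest.NewFake()
    fp := NewFakeProxier(ipt)
    fp.allServices = []*api.Service{
        makeTestService("ns1", "svc1", func(svc *api.Service) {
            svc.Spec.ClusterIP = "10.20.30.41"
            svc.Spec.Ports = []api.ServicePort{{Name: "p80", Port: 80, Protocol: api.ProtocolTCP}}
        }),
    }
    fp.allEndpoints = []*api.Endpoints{
        makeTestEndpoints("ns1", "svc1", func(ept *api.Endpoints) {
            ept.Subsets = []api.EndpointSubset{{
                Addresses: []api.EndpointAddress{{IP: "10.180.0.1"}},
                Ports:     []api.EndpointPort{{Name: "p80", Port: 80}},
            }}
        }),
    }
    fp.syncProxyRules(syncReasonForce)
    // Assertions then read the fake iptables state back, e.g.:
    kubeSvcRules := ipt.GetRules(string(kubeServicesChain))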
@@ -617,40 +641,49 @@ func TestClusterIPEndpointsJump(t *testing.T) {
     errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t)
   }
   epRules := ipt.GetRules(epChain)
-  if !hasDNAT(epRules, ep) {
-    errorf(fmt.Sprintf("Endpoint chain %v lacks DNAT to %v", epChain, ep), epRules, t)
+  if !hasDNAT(epRules, epStr) {
+    errorf(fmt.Sprintf("Endpoint chain %v lacks DNAT to %v", epChain, epStr), epRules, t)
   }
 }
 
-func typeLoadBalancer(svcInfo *serviceInfo) *serviceInfo {
-  svcInfo.nodePort = 3001
-  svcInfo.loadBalancerStatus = api.LoadBalancerStatus{
-    Ingress: []api.LoadBalancerIngress{{IP: "1.2.3.4"}},
-  }
-  return svcInfo
-}
-
 func TestLoadBalancer(t *testing.T) {
   ipt := iptablestest.NewFake()
   fp := NewFakeProxier(ipt)
-  svcName := "svc1"
-  svcIP := net.IPv4(10, 20, 30, 41)
+  svcIP := "10.20.30.41"
+  svcPort := 80
+  svcNodePort := 3001
+  svcLBIP := "1.2.3.4"
+  svcPortName := proxy.ServicePortName{
+    NamespacedName: makeNSN("ns1", "svc1"),
+    Port:           "p80",
+  }
 
-  svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
-  svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false)
-  fp.serviceMap[svc] = typeLoadBalancer(svcInfo)
+  fp.allServices = []*api.Service{
+    makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
+      svc.Spec.Type = "LoadBalancer"
+      svc.Spec.ClusterIP = svcIP
+      svc.Spec.Ports = []api.ServicePort{{
+        Name:     svcPortName.Port,
+        Port:     int32(svcPort),
+        Protocol: api.ProtocolTCP,
+        NodePort: int32(svcNodePort),
+      }}
+      svc.Status.LoadBalancer.Ingress = []api.LoadBalancerIngress{{
+        IP: svcLBIP,
+      }}
+    }),
+  }
 
-  ip := "10.180.0.1"
-  port := 80
+  epIP := "10.180.0.1"
   fp.allEndpoints = []*api.Endpoints{
-    makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
+    makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
       ept.Subsets = []api.EndpointSubset{{
         Addresses: []api.EndpointAddress{{
-          IP: ip,
+          IP: epIP,
        }},
        Ports: []api.EndpointPort{{
-          Name: "p80",
-          Port: int32(port),
+          Name: svcPortName.Port,
+          Port: int32(svcPort),
        }},
      }}
    }),
@@ -659,12 +692,12 @@ func TestLoadBalancer(t *testing.T) {
   fp.syncProxyRules(syncReasonForce)
 
   proto := strings.ToLower(string(api.ProtocolTCP))
-  fwChain := string(serviceFirewallChainName(svc, proto))
-  svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
-  //lbChain := string(serviceLBChainName(svc, proto))
+  fwChain := string(serviceFirewallChainName(svcPortName, proto))
+  svcChain := string(servicePortChainName(svcPortName, proto))
+  //lbChain := string(serviceLBChainName(svcPortName, proto))
 
   kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
-  if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, 80) {
+  if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
     errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
   }
 
@@ -677,25 +710,37 @@ func TestLoadBalancer(t *testing.T) {
 func TestNodePort(t *testing.T) {
   ipt := iptablestest.NewFake()
   fp := NewFakeProxier(ipt)
-  svcName := "svc1"
-  svcIP := net.IPv4(10, 20, 30, 41)
+  svcIP := "10.20.30.41"
+  svcPort := 80
+  svcNodePort := 3001
+  svcPortName := proxy.ServicePortName{
+    NamespacedName: makeNSN("ns1", "svc1"),
+    Port:           "p80",
+  }
 
-  svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
-  svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false)
-  svcInfo.nodePort = 3001
-  fp.serviceMap[svc] = svcInfo
+  fp.allServices = []*api.Service{
+    makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
+      svc.Spec.Type = "NodePort"
+      svc.Spec.ClusterIP = svcIP
+      svc.Spec.Ports = []api.ServicePort{{
+        Name:     svcPortName.Port,
+        Port:     int32(svcPort),
+        Protocol: api.ProtocolTCP,
+        NodePort: int32(svcNodePort),
+      }}
+    }),
+  }
 
-  ip := "10.180.0.1"
-  port := 80
+  epIP := "10.180.0.1"
   fp.allEndpoints = []*api.Endpoints{
-    makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
+    makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
       ept.Subsets = []api.EndpointSubset{{
         Addresses: []api.EndpointAddress{{
-          IP: ip,
+          IP: epIP,
        }},
        Ports: []api.EndpointPort{{
-          Name: "p80",
-          Port: int32(port),
+          Name: svcPortName.Port,
+          Port: int32(svcPort),
        }},
      }}
    }),
@@ -704,10 +749,10 @@ func TestNodePort(t *testing.T) {
   fp.syncProxyRules(syncReasonForce)
 
   proto := strings.ToLower(string(api.ProtocolTCP))
-  svcChain := string(servicePortChainName(svc, strings.ToLower(proto)))
+  svcChain := string(servicePortChainName(svcPortName, proto))
 
   kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
-  if !hasJump(kubeNodePortRules, svcChain, "", svcInfo.nodePort) {
+  if !hasJump(kubeNodePortRules, svcChain, "", svcNodePort) {
     errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t)
   }
 }
@@ -715,19 +760,32 @@ func TestNodePort(t *testing.T) {
 func TestNodePortReject(t *testing.T) {
   ipt := iptablestest.NewFake()
   fp := NewFakeProxier(ipt)
-  svcName := "svc1"
-  svcIP := net.IPv4(10, 20, 30, 41)
+  svcIP := "10.20.30.41"
+  svcPort := 80
+  svcNodePort := 3001
+  svcPortName := proxy.ServicePortName{
+    NamespacedName: makeNSN("ns1", "svc1"),
+    Port:           "p80",
+  }
 
-  svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
-  svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false)
-  svcInfo.nodePort = 3001
-  fp.serviceMap[svc] = svcInfo
+  fp.allServices = []*api.Service{
+    makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
+      svc.Spec.Type = "NodePort"
+      svc.Spec.ClusterIP = svcIP
+      svc.Spec.Ports = []api.ServicePort{{
+        Name:     svcPortName.Port,
+        Port:     int32(svcPort),
+        Protocol: api.ProtocolTCP,
+        NodePort: int32(svcNodePort),
+      }}
+    }),
+  }
 
   fp.syncProxyRules(syncReasonForce)
 
   kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
-  if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), 3001) {
-    errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcName), kubeSvcRules, t)
+  if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) {
+    errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
   }
 }
 
@@ -738,47 +796,65 @@ func strPtr(s string) *string {
 func TestOnlyLocalLoadBalancing(t *testing.T) {
   ipt := iptablestest.NewFake()
   fp := NewFakeProxier(ipt)
-  svcName := "svc1"
-  svcIP := net.IPv4(10, 20, 30, 41)
+  svcIP := "10.20.30.41"
+  svcPort := 80
+  svcNodePort := 3001
+  svcLBIP := "1.2.3.4"
+  svcPortName := proxy.ServicePortName{
+    NamespacedName: makeNSN("ns1", "svc1"),
+    Port:           "p80",
+  }
 
-  svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
-  svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true)
-  fp.serviceMap[svc] = typeLoadBalancer(svcInfo)
+  fp.allServices = []*api.Service{
+    makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
+      svc.Spec.Type = "LoadBalancer"
+      svc.Spec.ClusterIP = svcIP
+      svc.Spec.Ports = []api.ServicePort{{
+        Name:     svcPortName.Port,
+        Port:     int32(svcPort),
+        Protocol: api.ProtocolTCP,
+        NodePort: int32(svcNodePort),
+      }}
+      svc.Status.LoadBalancer.Ingress = []api.LoadBalancerIngress{{
+        IP: svcLBIP,
+      }}
+      svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal
+    }),
+  }
 
-  ip1 := "10.180.0.1"
-  ip2 := "10.180.2.1"
-  port := 80
-  nonLocalEp := fmt.Sprintf("%s:%d", ip1, port)
-  localEp := fmt.Sprintf("%s:%d", ip2, port)
-  allEndpoints := []*api.Endpoints{
-    makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
+  epIP1 := "10.180.0.1"
+  epIP2 := "10.180.2.1"
+  epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
+  epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
+  fp.allEndpoints = []*api.Endpoints{
+    makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
      ept.Subsets = []api.EndpointSubset{{
        Addresses: []api.EndpointAddress{{
-          IP:       ip1,
+          IP:       epIP1,
          NodeName: nil,
        }, {
-          IP:       ip2,
+          IP:       epIP2,
          NodeName: strPtr(testHostname),
        }},
        Ports: []api.EndpointPort{{
-          Name: "p80",
-          Port: int32(port),
+          Name: svcPortName.Port,
+          Port: int32(svcPort),
        }},
      }}
    }),
  }
 
-  fp.OnEndpointsUpdate(allEndpoints)
+  fp.syncProxyRules(syncReasonForce)
 
   proto := strings.ToLower(string(api.ProtocolTCP))
-  fwChain := string(serviceFirewallChainName(svc, proto))
-  lbChain := string(serviceLBChainName(svc, proto))
+  fwChain := string(serviceFirewallChainName(svcPortName, proto))
+  lbChain := string(serviceLBChainName(svcPortName, proto))
 
-  nonLocalEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), nonLocalEp))
-  localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp))
+  nonLocalEpChain := string(servicePortEndpointChainName(svcPortName, strings.ToLower(string(api.ProtocolTCP)), epStrLocal))
+  localEpChain := string(servicePortEndpointChainName(svcPortName, strings.ToLower(string(api.ProtocolTCP)), epStrNonLocal))
 
   kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
-  if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, 0) {
+  if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
     errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
   }
 
@@ -792,10 +868,10 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
 
   lbRules := ipt.GetRules(lbChain)
   if hasJump(lbRules, nonLocalEpChain, "", 0) {
-    errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t)
+    errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
   }
   if !hasJump(lbRules, localEpChain, "", 0) {
-    errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, localEp), lbRules, t)
+    errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t)
   }
 }
 
@@ -815,54 +891,67 @@ func TestOnlyLocalNodePorts(t *testing.T) {
 
 func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTables) {
   shouldLBTOSVCRuleExist := len(fp.clusterCIDR) > 0
-  svcName := "svc1"
-  svcIP := net.IPv4(10, 20, 30, 41)
+  svcIP := "10.20.30.41"
+  svcPort := 80
+  svcNodePort := 3001
+  svcPortName := proxy.ServicePortName{
+    NamespacedName: makeNSN("ns1", "svc1"),
+    Port:           "p80",
+  }
 
-  svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
-  svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true)
-  svcInfo.nodePort = 3001
-  fp.serviceMap[svc] = svcInfo
+  fp.allServices = []*api.Service{
+    makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
+      svc.Spec.Type = "NodePort"
+      svc.Spec.ClusterIP = svcIP
+      svc.Spec.Ports = []api.ServicePort{{
+        Name:     svcPortName.Port,
+        Port:     int32(svcPort),
+        Protocol: api.ProtocolTCP,
+        NodePort: int32(svcNodePort),
+      }}
+      svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal
+    }),
+  }
 
-  ip1 := "10.180.0.1"
-  ip2 := "10.180.2.1"
-  port := 80
-  nonLocalEp := fmt.Sprintf("%s:%d", ip1, port)
-  localEp := fmt.Sprintf("%s:%d", ip2, port)
-  allEndpoints := []*api.Endpoints{
-    makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
+  epIP1 := "10.180.0.1"
+  epIP2 := "10.180.2.1"
+  epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
+  epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
+  fp.allEndpoints = []*api.Endpoints{
+    makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
      ept.Subsets = []api.EndpointSubset{{
        Addresses: []api.EndpointAddress{{
-          IP:       ip1,
+          IP:       epIP1,
          NodeName: nil,
        }, {
-          IP:       ip2,
+          IP:       epIP2,
          NodeName: strPtr(testHostname),
        }},
        Ports: []api.EndpointPort{{
-          Name: "p80",
-          Port: int32(port),
+          Name: svcPortName.Port,
+          Port: int32(svcPort),
        }},
      }}
    }),
  }
 
-  fp.OnEndpointsUpdate(allEndpoints)
+  fp.syncProxyRules(syncReasonForce)
 
   proto := strings.ToLower(string(api.ProtocolTCP))
-  lbChain := string(serviceLBChainName(svc, proto))
+  lbChain := string(serviceLBChainName(svcPortName, proto))
 
-  nonLocalEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), nonLocalEp))
-  localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp))
+  nonLocalEpChain := string(servicePortEndpointChainName(svcPortName, proto, epStrLocal))
+  localEpChain := string(servicePortEndpointChainName(svcPortName, proto, epStrNonLocal))
 
   kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
-  if !hasJump(kubeNodePortRules, lbChain, "", svcInfo.nodePort) {
+  if !hasJump(kubeNodePortRules, lbChain, "", svcNodePort) {
     errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t)
   }
 
-  svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
+  svcChain := string(servicePortChainName(svcPortName, proto))
   lbRules := ipt.GetRules(lbChain)
   if hasJump(lbRules, nonLocalEpChain, "", 0) {
-    errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t)
+    errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
   }
   if hasJump(lbRules, svcChain, "", 0) != shouldLBTOSVCRuleExist {
     prefix := "Did not find "
@@ -872,15 +961,16 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
     errorf(fmt.Sprintf("%s jump from lb chain %v to svc %v", prefix, lbChain, svcChain), lbRules, t)
   }
   if !hasJump(lbRules, localEpChain, "", 0) {
-    errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t)
+    errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrLocal), lbRules, t)
   }
 }
 
 func makeTestService(namespace, name string, svcFunc func(*api.Service)) *api.Service {
   svc := &api.Service{
     ObjectMeta: metav1.ObjectMeta{
-      Name:      name,
-      Namespace: namespace,
+      Name:        name,
+      Namespace:   namespace,
+      Annotations: map[string]string{},
     },
     Spec:   api.ServiceSpec{},
     Status: api.ServiceStatus{},
@@ -1122,13 +1212,11 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 func Test_accumulateEndpointsMap(t *testing.T) {
   testCases := []struct {
     newEndpoints *api.Endpoints
-    oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
-    expectedNew  map[proxy.ServicePortName][]*endpointsInfo
+    expected     map[proxy.ServicePortName][]*endpointsInfo
   }{{
     // Case[0]: nothing
     newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}),
-    oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
-    expectedNew:  map[proxy.ServicePortName][]*endpointsInfo{},
+    expected:     map[proxy.ServicePortName][]*endpointsInfo{},
   }, {
     // Case[1]: no changes, unnamed port
    newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
@@ -1144,12 +1232,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
       },
     }
   }),
-  oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
-    makeServicePortName("ns1", "ep1", ""): {
-      {"1.1.1.1:11", false},
-    },
-  },
-  expectedNew: map[proxy.ServicePortName][]*endpointsInfo{
+  expected: map[proxy.ServicePortName][]*endpointsInfo{
     makeServicePortName("ns1", "ep1", ""): {
       {"1.1.1.1:11", false},
     },
@@ -1169,12 +1252,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
       },
     }
   }),
-  oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
-    makeServicePortName("ns1", "ep1", "port"): {
-      {"1.1.1.1:11", false},
-    },
-  },
-  expectedNew: map[proxy.ServicePortName][]*endpointsInfo{
+  expected: map[proxy.ServicePortName][]*endpointsInfo{
     makeServicePortName("ns1", "ep1", "port"): {
      {"1.1.1.1:11", false},
    },
@@ -1193,10 +1271,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
       },
     }
   }),
-  oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
-    makeServicePortName("ns1", "ep1", ""): {},
-  },
-  expectedNew: map[proxy.ServicePortName][]*endpointsInfo{
+  expected: map[proxy.ServicePortName][]*endpointsInfo{
     makeServicePortName("ns1", "ep1", ""): {
       {"1.1.1.1:11", false},
     },
@@ -1204,12 +1279,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
 }, {
   // Case[4]: remove port
   newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}),
-  oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
-    makeServicePortName("ns1", "ep1", ""): {
-      {"1.1.1.1:11", false},
-    },
-  },
-  expectedNew: map[proxy.ServicePortName][]*endpointsInfo{},
+  expected: map[proxy.ServicePortName][]*endpointsInfo{},
 }, {
   // Case[5]: new IP and port
   newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
@@ -1230,12 +1300,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
       },
     }
   }),
-  oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
-    makeServicePortName("ns1", "ep1", "p1"): {
-      {"1.1.1.1:11", false},
-    },
-  },
-  expectedNew: map[proxy.ServicePortName][]*endpointsInfo{
+  expected: map[proxy.ServicePortName][]*endpointsInfo{
     makeServicePortName("ns1", "ep1", "p1"): {
       {"1.1.1.1:11", false},
       {"2.2.2.2:11", false},
@@ -1260,17 +1325,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
       },
     }
   }),
-  oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
-    makeServicePortName("ns1", "ep1", "p1"): {
-      {"1.1.1.1:11", false},
-      {"2.2.2.2:11", false},
-    },
-    makeServicePortName("ns1", "ep1", "p2"): {
-      {"1.1.1.1:22", false},
-      {"2.2.2.2:22", false},
-    },
-  },
-  expectedNew: map[proxy.ServicePortName][]*endpointsInfo{
+  expected: map[proxy.ServicePortName][]*endpointsInfo{
     makeServicePortName("ns1", "ep1", "p1"): {
       {"1.1.1.1:11", false},
     },
@@ -1290,12 +1345,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
       },
     }
   }),
-  oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
-    makeServicePortName("ns1", "ep1", "p1"): {
-      {"1.1.1.1:11", false},
-    },
-  },
-  expectedNew: map[proxy.ServicePortName][]*endpointsInfo{
+  expected: map[proxy.ServicePortName][]*endpointsInfo{
     makeServicePortName("ns1", "ep1", "p2"): {
       {"1.1.1.1:11", false},
     },
@@ -1315,12 +1365,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
       },
     }
   }),
-  oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
-    makeServicePortName("ns1", "ep1", "p1"): {
-      {"1.1.1.1:11", false},
-    },
-  },
-  expectedNew: map[proxy.ServicePortName][]*endpointsInfo{
+  expected: map[proxy.ServicePortName][]*endpointsInfo{
     makeServicePortName("ns1", "ep1", "p1"): {
       {"1.1.1.1:22", false},
     },
@@ -1330,18 +1375,18 @@ func Test_accumulateEndpointsMap(t *testing.T) {
   for tci, tc := range testCases {
     // outputs
     newEndpoints := make(proxyEndpointMap)
-    accumulateEndpointsMap(tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints)
+    accumulateEndpointsMap(tc.newEndpoints, "host", &newEndpoints)
 
-    if len(newEndpoints) != len(tc.expectedNew) {
-      t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints))
+    if len(newEndpoints) != len(tc.expected) {
+      t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expected), len(newEndpoints), spew.Sdump(newEndpoints))
     }
-    for x := range tc.expectedNew {
-      if len(newEndpoints[x]) != len(tc.expectedNew[x]) {
-        t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedNew[x]), x, len(newEndpoints[x]))
+    for x := range tc.expected {
+      if len(newEndpoints[x]) != len(tc.expected[x]) {
+        t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expected[x]), x, len(newEndpoints[x]))
       } else {
         for i := range newEndpoints[x] {
-          if *(newEndpoints[x][i]) != *(tc.expectedNew[x][i]) {
-            t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedNew[x][i], *(newEndpoints[x][i]))
+          if *(newEndpoints[x][i]) != *(tc.expected[x][i]) {
+            t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expected[x][i], *(newEndpoints[x][i]))
           }
         }
       }
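With the oldEndpoints input gone, each Test_accumulateEndpointsMap case is now just an input Endpoints object plus the map expected to come out of it; for example, Case[1] above reduces to:

    {
        // Case[1]: no changes, unnamed port
        newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
            // one subset with address 1.1.1.1 and an unnamed port 11
        }),
        expected: map[proxy.ServicePortName][]*endpointsInfo{
            makeServicePortName("ns1", "ep1", ""): {
                {"1.1.1.1:11", false},
            },
        },
    },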