From 1bcddb07477b59c50c1e5f062de0f7a4f52b7325 Mon Sep 17 00:00:00 2001 From: elweb9858 Date: Thu, 16 Jul 2020 17:06:36 -0700 Subject: [PATCH] Implementing ExternalTrafficPolicy: local in winkernel kube-proxy via DSR --- pkg/proxy/winkernel/proxier.go | 21 +++++--- pkg/proxy/winkernel/proxier_test.go | 76 +++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 6 deletions(-) diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 64dc752e93c..80eee1b5187 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -118,6 +118,7 @@ type serviceInfo struct { remoteEndpoint *endpointsInfo hns HostNetworkService preserveDIP bool + localTrafficDSR bool } type hnsNetworkInfo struct { @@ -350,9 +351,11 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort { info := &serviceInfo{BaseServiceInfo: baseInfo} preserveDIP := service.Annotations["preserve-destination"] == "true" + localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal err := hcn.DSRSupported() if err != nil { preserveDIP = false + localTrafficDSR = false } // targetPort is zero if it is specified as a name in port.TargetPort. // Its real value would be got later from endpoints. @@ -364,6 +367,7 @@ func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service info.preserveDIP = preserveDIP info.targetPort = targetPort info.hns = proxier.hns + info.localTrafficDSR = localTrafficDSR for _, eip := range service.Spec.ExternalIPs { info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip}) @@ -1157,12 +1161,12 @@ func (proxier *Proxier) syncProxyRules() { // If the preserve-destination service annotation is present, we will disable routing mesh for NodePort. // This means that health services can use Node Port without falsely getting results from a different node. nodePortEndpoints := hnsEndpoints - if svcInfo.preserveDIP { + if svcInfo.preserveDIP || svcInfo.localTrafficDSR { nodePortEndpoints = hnsLocalEndpoints } hnsLoadBalancer, err := hns.getLoadBalancer( nodePortEndpoints, - loadBalancerFlags{localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, + loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, "", Enum(svcInfo.Protocol()), @@ -1180,10 +1184,15 @@ func (proxier *Proxier) syncProxyRules() { // Create a Load Balancer Policy for each external IP for _, externalIP := range svcInfo.externalIPs { + // Disable routing mesh if ExternalTrafficPolicy is set to local + externalIPEndpoints := hnsEndpoints + if svcInfo.localTrafficDSR { + externalIPEndpoints = hnsLocalEndpoints + } // Try loading existing policies, if already available hnsLoadBalancer, err = hns.getLoadBalancer( - hnsEndpoints, - loadBalancerFlags{sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, + externalIPEndpoints, + loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, externalIP.ip, Enum(svcInfo.Protocol()), @@ -1201,12 +1210,12 @@ func (proxier *Proxier) syncProxyRules() { for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs { // Try loading existing policies, if already available lbIngressEndpoints := hnsEndpoints - if svcInfo.preserveDIP { + if svcInfo.preserveDIP || svcInfo.localTrafficDSR { lbIngressEndpoints = hnsLocalEndpoints } hnsLoadBalancer, err := hns.getLoadBalancer( lbIngressEndpoints, - loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, + loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, lbIngressIP.ip, Enum(svcInfo.Protocol()), diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index a24c49fb07b..a21f48a8644 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -47,6 +47,7 @@ type fakeHNS struct{} func newFakeHNS() *fakeHNS { return &fakeHNS{} } + func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) { var remoteSubnets []*remoteSubnetInfo rs := &remoteSubnetInfo{ @@ -63,9 +64,11 @@ func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) { remoteSubnets: remoteSubnets, }, nil } + func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) { return nil, nil } + func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) { _, ipNet, _ := net.ParseCIDR(destinationPrefix) @@ -81,6 +84,7 @@ func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpo return nil, nil } + func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) { return &endpointsInfo{ ip: ep.ip, @@ -90,17 +94,21 @@ func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpo hns: hns, }, nil } + func (hns fakeHNS) deleteEndpoint(hnsID string) error { return nil } + func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { return &loadBalancerInfo{ hnsID: guid, }, nil } + func (hns fakeHNS) deleteLoadBalancer(hnsID string) error { return nil } + func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string, endpointSliceEnabled bool) *Proxier { sourceVip := "192.168.1.2" hnsNetworkInfo := &hnsNetworkInfo{ @@ -187,6 +195,7 @@ func TestCreateServiceVip(t *testing.T) { } } } + func TestCreateRemoteEndpointOverlay(t *testing.T) { syncPeriod := 30 * time.Second proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false) @@ -251,6 +260,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) { t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount) } } + func TestCreateRemoteEndpointL2Bridge(t *testing.T) { syncPeriod := 30 * time.Second proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false) @@ -695,6 +705,69 @@ func TestCreateLoadBalancer(t *testing.T) { } } + +func TestCreateDsrLoadBalancer(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false) + if proxier == nil { + t.Error() + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + makeEndpointsMap(proxier, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + svc := proxier.serviceMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + if svcInfo.hnsID != guid { + t.Errorf("%v does not match %v", svcInfo.hnsID, guid) + } + if svcInfo.localTrafficDSR != true { + t.Errorf("Failed to create DSR loadbalancer with local traffic policy") + } + } +} + func TestEndpointSlice(t *testing.T) { syncPeriod := 30 * time.Second proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", true) @@ -767,6 +840,7 @@ func TestEndpointSlice(t *testing.T) { } } } + func TestNoopEndpointSlice(t *testing.T) { p := Proxier{} p.OnEndpointSliceAdd(&discovery.EndpointSlice{}) @@ -799,6 +873,7 @@ func TestFindRemoteSubnetProviderAddress(t *testing.T) { func makeNSN(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } + func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) { for i := range allServices { proxier.OnServiceAdd(allServices[i]) @@ -817,6 +892,7 @@ func deleteServices(proxier *Proxier, allServices ...*v1.Service) { defer proxier.mu.Unlock() proxier.servicesSynced = true } + func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service { svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{