mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Implementing ExternalTrafficPolicy: local in winkernel kube-proxy via DSR
This commit is contained in:
parent
1968e96165
commit
1bcddb0747
@ -118,6 +118,7 @@ type serviceInfo struct {
|
|||||||
remoteEndpoint *endpointsInfo
|
remoteEndpoint *endpointsInfo
|
||||||
hns HostNetworkService
|
hns HostNetworkService
|
||||||
preserveDIP bool
|
preserveDIP bool
|
||||||
|
localTrafficDSR bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type hnsNetworkInfo struct {
|
type hnsNetworkInfo struct {
|
||||||
@ -350,9 +351,11 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16
|
|||||||
func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
|
func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
|
||||||
info := &serviceInfo{BaseServiceInfo: baseInfo}
|
info := &serviceInfo{BaseServiceInfo: baseInfo}
|
||||||
preserveDIP := service.Annotations["preserve-destination"] == "true"
|
preserveDIP := service.Annotations["preserve-destination"] == "true"
|
||||||
|
localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal
|
||||||
err := hcn.DSRSupported()
|
err := hcn.DSRSupported()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
preserveDIP = false
|
preserveDIP = false
|
||||||
|
localTrafficDSR = false
|
||||||
}
|
}
|
||||||
// targetPort is zero if it is specified as a name in port.TargetPort.
|
// targetPort is zero if it is specified as a name in port.TargetPort.
|
||||||
// Its real value would be got later from endpoints.
|
// Its real value would be got later from endpoints.
|
||||||
@ -364,6 +367,7 @@ func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service
|
|||||||
info.preserveDIP = preserveDIP
|
info.preserveDIP = preserveDIP
|
||||||
info.targetPort = targetPort
|
info.targetPort = targetPort
|
||||||
info.hns = proxier.hns
|
info.hns = proxier.hns
|
||||||
|
info.localTrafficDSR = localTrafficDSR
|
||||||
|
|
||||||
for _, eip := range service.Spec.ExternalIPs {
|
for _, eip := range service.Spec.ExternalIPs {
|
||||||
info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
|
info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
|
||||||
@ -1157,12 +1161,12 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
// If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
|
// If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
|
||||||
// This means that health services can use Node Port without falsely getting results from a different node.
|
// This means that health services can use Node Port without falsely getting results from a different node.
|
||||||
nodePortEndpoints := hnsEndpoints
|
nodePortEndpoints := hnsEndpoints
|
||||||
if svcInfo.preserveDIP {
|
if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
|
||||||
nodePortEndpoints = hnsLocalEndpoints
|
nodePortEndpoints = hnsLocalEndpoints
|
||||||
}
|
}
|
||||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||||
nodePortEndpoints,
|
nodePortEndpoints,
|
||||||
loadBalancerFlags{localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
|
loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
|
||||||
sourceVip,
|
sourceVip,
|
||||||
"",
|
"",
|
||||||
Enum(svcInfo.Protocol()),
|
Enum(svcInfo.Protocol()),
|
||||||
@ -1180,10 +1184,15 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
// Create a Load Balancer Policy for each external IP
|
// Create a Load Balancer Policy for each external IP
|
||||||
for _, externalIP := range svcInfo.externalIPs {
|
for _, externalIP := range svcInfo.externalIPs {
|
||||||
|
// Disable routing mesh if ExternalTrafficPolicy is set to local
|
||||||
|
externalIPEndpoints := hnsEndpoints
|
||||||
|
if svcInfo.localTrafficDSR {
|
||||||
|
externalIPEndpoints = hnsLocalEndpoints
|
||||||
|
}
|
||||||
// Try loading existing policies, if already available
|
// Try loading existing policies, if already available
|
||||||
hnsLoadBalancer, err = hns.getLoadBalancer(
|
hnsLoadBalancer, err = hns.getLoadBalancer(
|
||||||
hnsEndpoints,
|
externalIPEndpoints,
|
||||||
loadBalancerFlags{sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
|
loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
|
||||||
sourceVip,
|
sourceVip,
|
||||||
externalIP.ip,
|
externalIP.ip,
|
||||||
Enum(svcInfo.Protocol()),
|
Enum(svcInfo.Protocol()),
|
||||||
@ -1201,12 +1210,12 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
||||||
// Try loading existing policies, if already available
|
// Try loading existing policies, if already available
|
||||||
lbIngressEndpoints := hnsEndpoints
|
lbIngressEndpoints := hnsEndpoints
|
||||||
if svcInfo.preserveDIP {
|
if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
|
||||||
lbIngressEndpoints = hnsLocalEndpoints
|
lbIngressEndpoints = hnsLocalEndpoints
|
||||||
}
|
}
|
||||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||||
lbIngressEndpoints,
|
lbIngressEndpoints,
|
||||||
loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
|
loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
|
||||||
sourceVip,
|
sourceVip,
|
||||||
lbIngressIP.ip,
|
lbIngressIP.ip,
|
||||||
Enum(svcInfo.Protocol()),
|
Enum(svcInfo.Protocol()),
|
||||||
|
@ -47,6 +47,7 @@ type fakeHNS struct{}
|
|||||||
func newFakeHNS() *fakeHNS {
|
func newFakeHNS() *fakeHNS {
|
||||||
return &fakeHNS{}
|
return &fakeHNS{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) {
|
func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) {
|
||||||
var remoteSubnets []*remoteSubnetInfo
|
var remoteSubnets []*remoteSubnetInfo
|
||||||
rs := &remoteSubnetInfo{
|
rs := &remoteSubnetInfo{
|
||||||
@ -63,9 +64,11 @@ func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) {
|
|||||||
remoteSubnets: remoteSubnets,
|
remoteSubnets: remoteSubnets,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) {
|
func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
|
func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
|
||||||
_, ipNet, _ := net.ParseCIDR(destinationPrefix)
|
_, ipNet, _ := net.ParseCIDR(destinationPrefix)
|
||||||
|
|
||||||
@ -81,6 +84,7 @@ func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpo
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
|
func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
|
||||||
return &endpointsInfo{
|
return &endpointsInfo{
|
||||||
ip: ep.ip,
|
ip: ep.ip,
|
||||||
@ -90,17 +94,21 @@ func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpo
|
|||||||
hns: hns,
|
hns: hns,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) deleteEndpoint(hnsID string) error {
|
func (hns fakeHNS) deleteEndpoint(hnsID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
||||||
return &loadBalancerInfo{
|
return &loadBalancerInfo{
|
||||||
hnsID: guid,
|
hnsID: guid,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
|
func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string, endpointSliceEnabled bool) *Proxier {
|
func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string, endpointSliceEnabled bool) *Proxier {
|
||||||
sourceVip := "192.168.1.2"
|
sourceVip := "192.168.1.2"
|
||||||
hnsNetworkInfo := &hnsNetworkInfo{
|
hnsNetworkInfo := &hnsNetworkInfo{
|
||||||
@ -187,6 +195,7 @@ func TestCreateServiceVip(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreateRemoteEndpointOverlay(t *testing.T) {
|
func TestCreateRemoteEndpointOverlay(t *testing.T) {
|
||||||
syncPeriod := 30 * time.Second
|
syncPeriod := 30 * time.Second
|
||||||
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
|
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
|
||||||
@ -251,6 +260,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
|
|||||||
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
|
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
|
func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
|
||||||
syncPeriod := 30 * time.Second
|
syncPeriod := 30 * time.Second
|
||||||
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
|
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
|
||||||
@ -695,6 +705,69 @@ func TestCreateLoadBalancer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCreateDsrLoadBalancer(t *testing.T) {
|
||||||
|
syncPeriod := 30 * time.Second
|
||||||
|
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
|
||||||
|
if proxier == nil {
|
||||||
|
t.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
svcIP := "10.20.30.41"
|
||||||
|
svcPort := 80
|
||||||
|
svcNodePort := 3001
|
||||||
|
svcPortName := proxy.ServicePortName{
|
||||||
|
NamespacedName: makeNSN("ns1", "svc1"),
|
||||||
|
Port: "p80",
|
||||||
|
Protocol: v1.ProtocolTCP,
|
||||||
|
}
|
||||||
|
|
||||||
|
makeServiceMap(proxier,
|
||||||
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
|
||||||
|
svc.Spec.Type = "NodePort"
|
||||||
|
svc.Spec.ClusterIP = svcIP
|
||||||
|
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
|
||||||
|
svc.Spec.Ports = []v1.ServicePort{{
|
||||||
|
Name: svcPortName.Port,
|
||||||
|
Port: int32(svcPort),
|
||||||
|
Protocol: v1.ProtocolTCP,
|
||||||
|
NodePort: int32(svcNodePort),
|
||||||
|
}}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
makeEndpointsMap(proxier,
|
||||||
|
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
|
||||||
|
ept.Subsets = []v1.EndpointSubset{{
|
||||||
|
Addresses: []v1.EndpointAddress{{
|
||||||
|
IP: epIpAddressRemote,
|
||||||
|
}},
|
||||||
|
Ports: []v1.EndpointPort{{
|
||||||
|
Name: svcPortName.Port,
|
||||||
|
Port: int32(svcPort),
|
||||||
|
Protocol: v1.ProtocolTCP,
|
||||||
|
}},
|
||||||
|
}}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
proxier.setInitialized(true)
|
||||||
|
proxier.syncProxyRules()
|
||||||
|
|
||||||
|
svc := proxier.serviceMap[svcPortName]
|
||||||
|
svcInfo, ok := svc.(*serviceInfo)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if svcInfo.hnsID != guid {
|
||||||
|
t.Errorf("%v does not match %v", svcInfo.hnsID, guid)
|
||||||
|
}
|
||||||
|
if svcInfo.localTrafficDSR != true {
|
||||||
|
t.Errorf("Failed to create DSR loadbalancer with local traffic policy")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestEndpointSlice(t *testing.T) {
|
func TestEndpointSlice(t *testing.T) {
|
||||||
syncPeriod := 30 * time.Second
|
syncPeriod := 30 * time.Second
|
||||||
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", true)
|
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", true)
|
||||||
@ -767,6 +840,7 @@ func TestEndpointSlice(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNoopEndpointSlice(t *testing.T) {
|
func TestNoopEndpointSlice(t *testing.T) {
|
||||||
p := Proxier{}
|
p := Proxier{}
|
||||||
p.OnEndpointSliceAdd(&discovery.EndpointSlice{})
|
p.OnEndpointSliceAdd(&discovery.EndpointSlice{})
|
||||||
@ -799,6 +873,7 @@ func TestFindRemoteSubnetProviderAddress(t *testing.T) {
|
|||||||
func makeNSN(namespace, name string) types.NamespacedName {
|
func makeNSN(namespace, name string) types.NamespacedName {
|
||||||
return types.NamespacedName{Namespace: namespace, Name: name}
|
return types.NamespacedName{Namespace: namespace, Name: name}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
|
func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
|
||||||
for i := range allServices {
|
for i := range allServices {
|
||||||
proxier.OnServiceAdd(allServices[i])
|
proxier.OnServiceAdd(allServices[i])
|
||||||
@ -817,6 +892,7 @@ func deleteServices(proxier *Proxier, allServices ...*v1.Service) {
|
|||||||
defer proxier.mu.Unlock()
|
defer proxier.mu.Unlock()
|
||||||
proxier.servicesSynced = true
|
proxier.servicesSynced = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
|
func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
|
||||||
svc := &v1.Service{
|
svc := &v1.Service{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Loading…
Reference in New Issue
Block a user