Fix for preferred dualstack and required dualstack in winkernel proxier.

This commit is contained in:
Prince Pereira
2025-12-16 18:55:07 +00:00
parent 8825a37c13
commit 6d23c3f540
5 changed files with 299 additions and 44 deletions

View File

@@ -202,7 +202,7 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointI
}
for _, endpoint := range endpoints {
equal := false
if endpoint.IpConfigurations != nil && len(endpoint.IpConfigurations) > 0 {
if len(endpoint.IpConfigurations) > 0 {
equal = endpoint.IpConfigurations[0].IpAddress == ip
if !equal && len(endpoint.IpConfigurations) > 1 {
@@ -276,11 +276,13 @@ func (hns hns) createEndpoint(ep *endpointInfo, networkName string) (*endpointIn
if err != nil {
return nil, err
}
klog.V(3).InfoS("Created remote endpoint resource", "hnsID", createdEndpoint.Id)
} else {
createdEndpoint, err = hns.hcn.CreateEndpoint(hnsNetwork, hnsEndpoint)
if err != nil {
return nil, err
}
klog.V(3).InfoS("Created local endpoint resource", "hnsID", createdEndpoint.Id)
}
return &endpointInfo{
ip: createdEndpoint.IpConfigurations[0].IpAddress,
@@ -304,17 +306,14 @@ func (hns hns) deleteEndpoint(hnsID string) error {
}
// findLoadBalancerID will construct a id from the provided loadbalancer fields
func findLoadBalancerID(endpoints []endpointInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) {
func findLoadBalancerID(endpoints []endpointInfo, vip string, protocol, internalPort, externalPort uint16, isIpv6 bool) (loadBalancerIdentifier, error) {
// Compute hash from backends (endpoint IDs)
hash, err := hashEndpoints(endpoints)
if err != nil {
klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
return loadBalancerIdentifier{}, err
}
if len(vip) > 0 {
return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}, nil
}
return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}, nil
return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash, isIPv6: isIpv6}, nil
}
func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
@@ -325,6 +324,7 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
}
loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
for _, lb := range lbs {
isIPv6 := (lb.Flags & LoadBalancerFlagsIPv6) == LoadBalancerFlagsIPv6
portMap := lb.PortMappings[0]
// Compute hash from backends (endpoint IDs)
hash, err := hashEndpoints(lb.HostComputeEndpoints)
@@ -334,9 +334,9 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
}
if len(lb.FrontendVIPs) == 0 {
// Leave VIP uninitialized
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsHash: hash}
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsHash: hash, isIPv6: isIPv6}
} else {
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsHash: hash}
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsHash: hash, isIPv6: isIPv6}
}
loadBalancers[id] = &loadBalancerInfo{
hnsID: lb.Id,
@@ -355,6 +355,7 @@ func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags
protocol,
internalPort,
externalPort,
flags.isIPv6,
)
if lbIdErr != nil {
@@ -461,10 +462,10 @@ func (hns hns) updateLoadBalancer(hnsID string,
return nil, err
}
if len(vip) > 0 {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash, isIPv6: flags.isIPv6}
vips = append(vips, vip)
} else {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash, isIPv6: flags.isIPv6}
}
if lb, found := previousLoadBalancers[id]; found {
@@ -531,8 +532,13 @@ func (hns hns) deleteLoadBalancer(hnsID string) error {
// There is a bug in Windows Server 2019, that can cause the delete call to fail sometimes. We retry one more time.
// TODO: The logic in syncProxyRules should be rewritten in the future to better stage and handle a call like this failing using the policyApplied fields.
klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource. Attempting one more time...", "loadBalancer", lb)
return hns.hcn.DeleteLoadBalancer(lb)
err = hns.hcn.DeleteLoadBalancer(lb)
}
if err != nil {
klog.V(2).ErrorS(err, "Error deleting Hns loadbalancer policy resource again.", "hnsID", hnsID)
return err
}
klog.V(3).InfoS("Deleted Hns loadbalancer policy resource", "hnsID", hnsID)
return err
}

View File

@@ -30,21 +30,24 @@ import (
)
const (
sourceVip = "192.168.1.2"
serviceVip = "11.0.0.1"
addressPrefix = "192.168.1.0/24"
gatewayAddress = "192.168.1.1"
epMacAddress = "00-11-22-33-44-55"
epIpAddress = "192.168.1.3"
epIpv6Address = "192::3"
epIpAddressB = "192.168.1.4"
epIpAddressRemote = "192.168.2.3"
epIpAddressLocal1 = "192.168.4.4"
epIpAddressLocal2 = "192.168.4.5"
epPaAddress = "10.0.0.3"
protocol = 6
internalPort = 80
externalPort = 32440
sourceVip = "192.168.1.2"
serviceVip = "11.0.0.1"
addressPrefix = "192.168.1.0/24"
gatewayAddress = "192.168.1.1"
epMacAddress = "00-11-22-33-44-55"
epIpAddress = "192.168.1.3"
epIpv6Address = "192::3"
epIpAddressB = "192.168.1.4"
epIpAddressLocal = "192.168.5.3"
epIpAddressLocalv6 = "192::5:3"
epIpAddressRemote = "192.168.2.3"
epIpAddressRemotev6 = "192::2:3"
epIpAddressLocal1 = "192.168.4.4"
epIpAddressLocal2 = "192.168.4.5"
epPaAddress = "10.0.0.3"
protocol = 6
internalPort = 80
externalPort = 32440
)
func TestGetNetworkByName(t *testing.T) {

View File

@@ -120,6 +120,7 @@ type loadBalancerIdentifier struct {
externalPort uint16
vip string
endpointsHash [20]byte
isIPv6 bool // isIPv6 flag makes the identifier unique per IP family. In a dualstack etp:local service, nodeport lb will be configured with empty vip, same port and protocol. Since ids of only localendpoints (both ipv4 and v6 ips present in same object) are considered for ep hash, endpointsHash will also be same.
}
func (info loadBalancerIdentifier) String() string {
@@ -604,15 +605,14 @@ func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service
info.winProxyOptimization = winProxyOptimization
klog.V(3).InfoS("Flags enabled for service", "service", service.Name, "localTrafficDSR", localTrafficDSR, "internalTrafficLocal", internalTrafficLocal, "preserveDIP", preserveDIP, "winProxyOptimization", winProxyOptimization)
for _, eip := range service.Spec.ExternalIPs {
info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
for _, eip := range info.ExternalIPs() {
info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip.String()})
}
for _, ingress := range service.Status.LoadBalancer.Ingress {
if netutils.ParseIPSloppy(ingress.IP) != nil {
info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.IP})
}
for _, ingress := range info.LoadBalancerVIPs() {
info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.String()})
}
return info
}
@@ -1271,7 +1271,9 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
}
}
klog.V(3).InfoS("Syncing Policies")
klog.V(3).InfoS("Syncing Policies", "proxierFamily", proxier.ipFamily, "serviceCount", len(proxier.svcPortMap), "endpointCount", len(proxier.endpointsMap), "queriedEndpointsCount", len(queriedEndpoints), "queriedLoadBalancersCount", len(queriedLoadBalancers))
defer klog.V(3).InfoS("Syncing Policies complete", "proxierFamily", proxier.ipFamily)
// Program HNS by adding corresponding policies for each service.
for svcName, svc := range proxier.svcPortMap {
@@ -1545,12 +1547,12 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "ClusterIP policy creation failed")
klog.ErrorS(err, "ClusterIP policy creation failed", "clusterIP", svcInfo.ClusterIP(), "serviceName", svcName, "proxierIPFamily", proxier.ipFamily)
continue
}
svcInfo.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID, "serviceName", svcName, "proxierIPFamily", proxier.ipFamily)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
@@ -1599,16 +1601,18 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Nodeport policy creation failed")
klog.ErrorS(err, "Nodeport policy creation failed", "nodeport", svcInfo.NodePort(), "sourceVip", sourceVip, "proxierIPFamily", proxier.ipFamily, "nodeIP", proxier.nodeIP.String())
continue
}
svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID, "sourceVip", sourceVip, "proxierIPFamily", proxier.ipFamily, "nodeIP", proxier.nodeIP.String())
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
}
}
} else {
klog.V(3).InfoS("Skipped configuring NodePort for service", "serviceName", svcName, "nodePort", svcInfo.NodePort(), "proxierIPFamily", proxier.ipFamily)
}
// Create a Load Balancer Policy for each external IP
@@ -1653,7 +1657,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "ExternalIP policy creation failed")
klog.ErrorS(err, "ExternalIP policy creation failed", "externalIP", externalIP.ip)
continue
}
externalIP.hnsID = hnsLoadBalancer.hnsID
@@ -1663,6 +1667,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
}
}
}
// Create a Load Balancer Policy for each loadbalancer ingress
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
// Try loading existing policies, if already available
@@ -1703,11 +1708,11 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "IngressIP policy creation failed")
klog.ErrorS(err, "IngressIP policy creation failed", "lbIngressIP", lbIngressIP.ip, "serviceName", svcName, "proxierIPFamily", proxier.ipFamily)
continue
}
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIPInfo", lbIngressIP)
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIPInfo", lbIngressIP, "hnsID", hnsLoadBalancer.hnsID, "serviceName", svcName, "proxierIPFamily", proxier.ipFamily)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIPInfo", lbIngressIP)
}
@@ -1753,7 +1758,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Healthcheck loadbalancer policy creation failed")
klog.ErrorS(err, "Healthcheck loadbalancer policy creation failed", "lbIngressIP", lbIngressIP.ip, "serviceName", svcName, "proxierIPFamily", proxier.ipFamily)
continue
}
lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
@@ -1817,6 +1822,7 @@ func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winPr
protocol,
intPort,
extPort,
proxier.ipFamily == v1.IPv6Protocol,
)
if lbIdErr != nil {
@@ -1832,10 +1838,13 @@ func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winPr
}
func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool {
klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID)
klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID, "proxierIPFamily", proxier.ipFamily)
if err := hns.deleteLoadBalancer(*lbHnsID); err != nil {
klog.ErrorS(err, "LoadBalancer deletion failed. Adding to stale loadbalancers.", "hnsID", *lbHnsID)
// This will be cleanup by cleanupStaleLoadbalancer fnction.
proxier.mapStaleLoadbalancers[*lbHnsID] = true
} else {
klog.V(3).InfoS("Hns LoadBalancer resource deleted for loadBalancer resources", "lbHnsID", *lbHnsID)
}
*lbHnsID = ""
return true

View File

@@ -57,15 +57,21 @@ const (
guid = "123ABC"
networkId = "123ABC"
endpointGuid1 = "EPID-1"
emptyLbID = ""
loadbalancerGuid1 = "LBID-1"
loadbalancerGuid2 = "LBID-2"
endpointLocal1 = "EP-LOCAL-1"
endpointLocal2 = "EP-LOCAL-2"
endpointGw = "EP-GW"
epIpAddressGw = "192.168.2.1"
epIpAddressGwv6 = "192::1"
epMacAddressGw = "00-11-22-33-44-66"
)
var (
loadbalancerGuids = []string{emptyLbID, "LBID-1", "LBID-2", "LBID-3", "LBID-4", "LBID-5", "LBID-6", "LBID-7", "LBID-8"}
)
func newHnsNetwork(networkInfo *hnsNetworkInfo) *hcn.HostComputeNetwork {
var policies []hcn.NetworkPolicy
for _, remoteSubnet := range networkInfo.remoteSubnets {
@@ -92,9 +98,19 @@ func newHnsNetwork(networkInfo *hnsNetworkInfo) *hcn.HostComputeNetwork {
return network
}
func NewFakeProxier(t *testing.T, nodeName string, nodeIP net.IP, networkType string, enableDSR bool) *Proxier {
func NewFakeProxier(t *testing.T, nodeName string, nodeIP net.IP, networkType string, enableDSR bool, ipFamily ...v1.IPFamily) *Proxier {
sourceVip := "192.168.1.2"
// Default to IPv4 if no ipFamily is provided
family := v1.IPv4Protocol
if len(ipFamily) > 0 {
family = ipFamily[0]
}
if family == v1.IPv6Protocol {
sourceVip = "192::1:2"
}
// enable `WinDSR` feature gate
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.WinDSR, true)
@@ -108,7 +124,7 @@ func NewFakeProxier(t *testing.T, nodeName string, nodeIP net.IP, networkType st
hcnMock := getHcnMock(networkType)
proxier, _ := newProxierInternal(
v1.IPv4Protocol,
family,
nodeName,
nodeIP,
healthcheck.NewFakeServiceHealthServer(),
@@ -1715,6 +1731,203 @@ func TestWinDSRWithOverlayEnabled(t *testing.T) {
}
}
func testDualStackService(t *testing.T, enableDsr bool, dualStackFamilyPolicy v1.IPFamilyPolicy, trafficPolicy v1.ServiceExternalTrafficPolicy) {
baseFlag := hcn.LoadBalancerFlagsNone
if trafficPolicy == v1.ServiceExternalTrafficPolicyLocal {
baseFlag = hcn.LoadBalancerFlagsDSR
}
v4Proxier := NewFakeProxier(t, testNodeName, netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, enableDsr)
if v4Proxier == nil {
t.Error()
}
v6Proxier := NewFakeProxier(t, testNodeName, netutils.ParseIPSloppy("10::1"), NETWORK_TYPE_OVERLAY, enableDsr, v1.IPv6Protocol)
if v6Proxier == nil {
t.Error()
}
v6Proxier.hcn = v4Proxier.hcn // Share the same HCN mock to simulate dual stack behavior
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
v4ClusterIP, v6ClusterIP := "10.20.30.41", "fd00::21"
v4LbIP, v6LbIP := "11.21.31.41", "fd00::1"
testService := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
svc.Spec.ClusterIP = v4ClusterIP
svc.Spec.ClusterIPs = []string{v4ClusterIP, v6ClusterIP}
svc.Spec.IPFamilyPolicy = &dualStackFamilyPolicy
svc.Spec.IPFamilies = []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol}
svc.Spec.ExternalTrafficPolicy = trafficPolicy
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{
{IP: v4LbIP},
{IP: v6LbIP},
}
})
v4Endpoints := []discovery.Endpoint{{
Addresses: []string{epIpAddressLocal},
NodeName: ptr.To(testNodeName),
}}
v6Endpoints := []discovery.Endpoint{{
Addresses: []string{epIpAddressLocalv6},
NodeName: ptr.To(testNodeName),
}}
if trafficPolicy == v1.ServiceExternalTrafficPolicyCluster {
v4Endpoints = append(v4Endpoints, discovery.Endpoint{
Addresses: []string{epIpAddressRemote},
})
v6Endpoints = append(v6Endpoints, discovery.Endpoint{
Addresses: []string{epIpAddressRemotev6},
})
}
testEpSliceV4 := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = v4Endpoints
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolTCP),
}}
})
testEpSliceV6 := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv6
eps.Endpoints = v6Endpoints
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolTCP),
}}
})
makeServiceMap(v4Proxier, testService)
makeServiceMap(v6Proxier, testService)
populateEndpointSlices(v4Proxier, testEpSliceV4)
populateEndpointSlices(v6Proxier, testEpSliceV6)
hcnMock := (v4Proxier.hcn).(*fakehcn.HcnMock)
v4Proxier.rootHnsEndpointName = endpointGw
v6Proxier.rootHnsEndpointName = endpointGw
hcnMock.PopulateQueriedEndpoints(endpointLocal1, guid, epIpAddressLocal, macAddress, prefixLen)
hcnMock.PopulateQueriedEndpoints(endpointLocal1, guid, epIpAddressLocalv6, macAddress, prefixLen)
hcnMock.PopulateQueriedEndpoints(endpointGw, guid, epIpAddressGw, epMacAddressGw, prefixLen)
hcnMock.PopulateQueriedEndpoints(endpointGw, guid, epIpAddressGwv6, epMacAddressGw, prefixLen)
v4Proxier.setInitialized(true)
v4Proxier.syncProxyRules()
v4Svc := v4Proxier.svcPortMap[svcPortName]
v4SvcInfo, ok := v4Svc.(*serviceInfo)
assert.True(t, ok, "Failed to cast serviceInfo %q", svcPortName.String())
assert.Equal(t, loadbalancerGuids[1], v4SvcInfo.hnsID, "The Hns Loadbalancer Id %v does not match %v. ServicePortName %q", v4SvcInfo.hnsID, loadbalancerGuids[1], svcPortName.String())
assert.Equal(t, v4LbIP, v4SvcInfo.loadBalancerIngressIPs[0].ip, "LoadBalancer Ingress IP 0 should be %s", v4LbIP)
assert.Equal(t, loadbalancerGuids[2], v4SvcInfo.nodePorthnsID, "The Hns NodePort Loadbalancer Id %v does not match %v. ServicePortName %q", v4SvcInfo.nodePorthnsID, loadbalancerGuids[2], svcPortName.String())
assert.Equal(t, 1, len(v4SvcInfo.loadBalancerIngressIPs), "Expected 1 LoadBalancer Ingress IP, found %d", len(v4SvcInfo.loadBalancerIngressIPs))
assert.Equal(t, loadbalancerGuids[3], v4SvcInfo.loadBalancerIngressIPs[0].hnsID, "The Hns Loadbalancer Ingress Id %v does not match %v. ServicePortName %q", v4SvcInfo.loadBalancerIngressIPs[0].hnsID, loadbalancerGuids[3], svcPortName.String())
assert.Equal(t, loadbalancerGuids[4], v4SvcInfo.loadBalancerIngressIPs[0].healthCheckHnsID, "The Hns Loadbalancer HealthCheck Id %v does not match %v. ServicePortName %q", v4SvcInfo.loadBalancerIngressIPs[0].healthCheckHnsID, loadbalancerGuids[4], svcPortName.String())
// Validating ClusterIP Loadbalancer
clusterIPLb, err := v4Proxier.hcn.GetLoadBalancerByID(v4SvcInfo.hnsID)
assert.Equal(t, nil, err, "Failed to fetch loadbalancer: %v", err)
assert.NotEqual(t, nil, clusterIPLb, "Loadbalancer object should not be nil")
assert.Equal(t, baseFlag, clusterIPLb.Flags, "Loadbalancer Flags should be %v for IPv4 stack", baseFlag)
// Validating NodePort Loadbalancer
nodePortLb, err := v4Proxier.hcn.GetLoadBalancerByID(v4SvcInfo.nodePorthnsID)
assert.Equal(t, nil, err, "Failed to fetch NodePort loadbalancer: %v", err)
assert.NotEqual(t, nil, nodePortLb, "NodePort Loadbalancer object should not be nil")
assert.Equal(t, baseFlag, nodePortLb.Flags, "NodePort Loadbalancer Flags should be %v for IPv4 stack", baseFlag)
// Validating V4 IngressIP Loadbalancer
ingressLb, err := v4Proxier.hcn.GetLoadBalancerByID(v4SvcInfo.loadBalancerIngressIPs[0].hnsID)
assert.Equal(t, nil, err, "Failed to fetch IngressIP loadbalancer: %v", err)
assert.NotEqual(t, nil, ingressLb, "IngressIP Loadbalancer object should not be nil")
assert.Equal(t, baseFlag, ingressLb.Flags, "IngressIP Loadbalancer Flags should be %v for IPv4 stack", baseFlag)
if trafficPolicy == v1.ServiceExternalTrafficPolicyLocal {
assert.Equal(t, 1, len(clusterIPLb.HostComputeEndpoints), "Expected 1 endpoint for local traffic policy, found %d", len(clusterIPLb.HostComputeEndpoints))
assert.Equal(t, 1, len(nodePortLb.HostComputeEndpoints), "Expected 1 endpoint for local traffic policy, found %d", len(nodePortLb.HostComputeEndpoints))
assert.Equal(t, 1, len(ingressLb.HostComputeEndpoints), "Expected 1 endpoint for local traffic policy, found %d", len(ingressLb.HostComputeEndpoints))
} else {
assert.Equal(t, 2, len(clusterIPLb.HostComputeEndpoints), "Expected 2 endpoints for cluster traffic policy, found %d", len(clusterIPLb.HostComputeEndpoints))
assert.Equal(t, 2, len(nodePortLb.HostComputeEndpoints), "Expected 2 endpoints for cluster traffic policy, found %d", len(nodePortLb.HostComputeEndpoints))
assert.Equal(t, 2, len(ingressLb.HostComputeEndpoints), "Expected 2 endpoints for cluster traffic policy, found %d", len(ingressLb.HostComputeEndpoints))
}
// Initiating v6 tests.
v6Proxier.setInitialized(true)
v6Proxier.syncProxyRules()
v6Svc := v6Proxier.svcPortMap[svcPortName]
v6SvcInfo, ok := v6Svc.(*serviceInfo)
assert.True(t, ok, "Failed to cast serviceInfo %q", svcPortName.String())
assert.Equal(t, loadbalancerGuids[5], v6SvcInfo.hnsID, "The Hns Loadbalancer Id %v does not match %v. ServicePortName %q", v6SvcInfo.hnsID, loadbalancerGuids[5], svcPortName.String())
assert.Equal(t, loadbalancerGuids[6], v6SvcInfo.nodePorthnsID, "The Hns NodePort Loadbalancer Id %v does not match %v. ServicePortName %q", v6SvcInfo.nodePorthnsID, loadbalancerGuids[6], svcPortName.String())
assert.Equal(t, 1, len(v6SvcInfo.loadBalancerIngressIPs), "Expected 1 LoadBalancer Ingress IP, found %d", len(v6SvcInfo.loadBalancerIngressIPs))
assert.Equal(t, v6LbIP, v6SvcInfo.loadBalancerIngressIPs[0].ip, "LoadBalancer Ingress IP 0 should be %s", v6LbIP)
assert.Equal(t, loadbalancerGuids[7], v6SvcInfo.loadBalancerIngressIPs[0].hnsID, "The Hns Loadbalancer Ingress Id %v does not match %v for IPv6 LB. ServicePortName %q", v6SvcInfo.loadBalancerIngressIPs[0].hnsID, loadbalancerGuids[7], svcPortName.String())
assert.Equal(t, loadbalancerGuids[8], v6SvcInfo.loadBalancerIngressIPs[0].healthCheckHnsID, "The Hns Loadbalancer HealthCheck Id %v does not match %v. ServicePortName %q", v6SvcInfo.loadBalancerIngressIPs[0].healthCheckHnsID, loadbalancerGuids[8], svcPortName.String())
// Validating ClusterIP Loadbalancer
clusterIPLb, err = v6Proxier.hcn.GetLoadBalancerByID(v6SvcInfo.hnsID)
assert.Equal(t, nil, err, "Failed to fetch loadbalancer: %v", err)
assert.NotEqual(t, nil, clusterIPLb, "Loadbalancer object should not be nil")
assert.Equal(t, baseFlag|hcn.LoadBalancerFlagsIPv6, clusterIPLb.Flags, "Loadbalancer Flags should be %v for IPv6 stack", baseFlag|hcn.LoadBalancerFlagsIPv6)
// Validating V6 IngressIP Loadbalancer
ingressLb, err = v6Proxier.hcn.GetLoadBalancerByID(v6SvcInfo.loadBalancerIngressIPs[0].hnsID)
assert.Equal(t, nil, err, "Failed to fetch IngressIP loadbalancer: %v", err)
assert.NotEqual(t, nil, ingressLb, "IngressIP Loadbalancer object should not be nil")
assert.Equal(t, baseFlag|hcn.LoadBalancerFlagsIPv6, ingressLb.Flags, "IngressIP Loadbalancer Flags should be %v for IPv6 stack", baseFlag|hcn.LoadBalancerFlagsIPv6)
eps, _ := hcnMock.ListEndpoints()
if trafficPolicy == v1.ServiceExternalTrafficPolicyLocal {
assert.Equal(t, 10, len(eps), "Expected 10 endpoints for local traffic policy, found %d", len(eps))
assert.Equal(t, 1, len(clusterIPLb.HostComputeEndpoints), "Expected 1 endpoint for local traffic policy, found %d", len(clusterIPLb.HostComputeEndpoints))
assert.Equal(t, 1, len(nodePortLb.HostComputeEndpoints), "Expected 1 endpoint for local traffic policy, found %d", len(nodePortLb.HostComputeEndpoints))
assert.Equal(t, 1, len(ingressLb.HostComputeEndpoints), "Expected 1 endpoint for local traffic policy, found %d", len(ingressLb.HostComputeEndpoints))
} else {
assert.Equal(t, 14, len(eps), "Expected 14 endpoints for cluster traffic policy, found %d", len(eps))
assert.Equal(t, 2, len(clusterIPLb.HostComputeEndpoints), "Expected 2 endpoints for cluster traffic policy, found %d", len(clusterIPLb.HostComputeEndpoints))
assert.Equal(t, 2, len(nodePortLb.HostComputeEndpoints), "Expected 2 endpoints for cluster traffic policy, found %d", len(nodePortLb.HostComputeEndpoints))
assert.Equal(t, 2, len(ingressLb.HostComputeEndpoints), "Expected 2 endpoints for cluster traffic policy, found %d", len(ingressLb.HostComputeEndpoints))
}
}
func TestETPLocalIngressIPServiceWithPreferDualStack(t *testing.T) {
testDualStackService(t, true, v1.IPFamilyPolicyPreferDualStack, v1.ServiceExternalTrafficPolicyLocal)
}
func TestETPLocalIngressIPServiceWithRequireDualStack(t *testing.T) {
testDualStackService(t, true, v1.IPFamilyPolicyRequireDualStack, v1.ServiceExternalTrafficPolicyLocal)
}
func TestETPClusterIngressIPServiceWithPreferDualStack(t *testing.T) {
testDualStackService(t, false, v1.IPFamilyPolicyPreferDualStack, v1.ServiceExternalTrafficPolicyCluster)
}
func TestETPClusterIngressIPServiceWithRequireDualStack(t *testing.T) {
testDualStackService(t, false, v1.IPFamilyPolicyRequireDualStack, v1.ServiceExternalTrafficPolicyCluster)
}
func makeNSN(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name}
}
@@ -1766,6 +1979,10 @@ func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.En
for i := range allEndpointSlices {
proxier.OnEndpointSliceAdd(allEndpointSlices[i])
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointSlicesSynced = true
}
func makeTestEndpointSlice(namespace, name string, sliceNum int, epsFunc func(*discovery.EndpointSlice)) *discovery.EndpointSlice {

View File

@@ -70,6 +70,15 @@ func NewHcnMock(hnsNetwork *hcn.HostComputeNetwork) *HcnMock {
}
func (hcnObj HcnMock) PopulateQueriedEndpoints(epId, hnsId, ipAddress, mac string, prefixLen uint8) {
if endpoint, ok := endpointMap[epId]; ok {
endpoint.IpConfigurations = append(endpoint.IpConfigurations, hcn.IpConfig{
IpAddress: ipAddress,
PrefixLength: prefixLen,
})
return
}
endpoint := &hcn.HostComputeEndpoint{
Id: epId,
Name: epId,
@@ -179,6 +188,17 @@ func (hcnObj HcnMock) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc
if _, ok := loadbalancerMap[loadBalancer.Id]; ok {
return nil, fmt.Errorf("LoadBalancer id %s Already Present", loadBalancer.Id)
}
for _, lb := range loadbalancerMap {
portMappingMatched := lb.PortMappings != nil && lb.PortMappings[0].ExternalPort == loadBalancer.PortMappings[0].ExternalPort && lb.PortMappings[0].Protocol == loadBalancer.PortMappings[0].Protocol
portMappingMatched = portMappingMatched && (lb.PortMappings[0].InternalPort == loadBalancer.PortMappings[0].InternalPort)
portMappingMatched = portMappingMatched && len(lb.FrontendVIPs) != 0 && len(loadBalancer.FrontendVIPs) != 0 && lb.FrontendVIPs[0] == loadBalancer.FrontendVIPs[0]
if portMappingMatched && ((lb.Flags & hcn.LoadBalancerFlagsIPv6) == (loadBalancer.Flags & hcn.LoadBalancerFlagsIPv6)) {
return nil, fmt.Errorf("The specified port already exists.")
}
if portMappingMatched && lb.PortMappings[0].Flags&hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP == 0 && lb.FrontendVIPs[0] == loadBalancer.FrontendVIPs[0] {
return nil, fmt.Errorf("The specified port already exists.")
}
}
loadBalancer.Id = hcnObj.generateLoadbalancerGuid()
loadbalancerMap[loadBalancer.Id] = loadBalancer
return loadBalancer, nil