diff --git a/pkg/proxy/winkernel/hcnutils.go b/pkg/proxy/winkernel/hcnutils.go index afb513b3fed..5c86a113bc0 100644 --- a/pkg/proxy/winkernel/hcnutils.go +++ b/pkg/proxy/winkernel/hcnutils.go @@ -49,6 +49,7 @@ type HcnService interface { DsrSupported() error // Policy functions DeleteAllHnsLoadBalancerPolicy() + RemoteSubnetSupported() error } type hcnImpl struct{} @@ -139,3 +140,7 @@ func (hcnObj hcnImpl) DeleteAllHnsLoadBalancerPolicy() { } } } + +func (hcnObj hcnImpl) RemoteSubnetSupported() error { + return hcn.RemoteSubnetSupported() +} diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 57c4a908b6f..6279a92c0cb 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -59,6 +59,10 @@ type KernelCompatTester interface { IsCompatible() error } +type HostMacProvider interface { + GetHostMac(nodeIP net.IP) string +} + // CanUseWinKernelProxier returns true if we should use the Kernel Proxier // instead of the "classic" userspace Proxier. This is determined by checking // the windows kernel version and for the existence of kernel features. @@ -723,6 +727,41 @@ func NewProxier( } hcnImpl := newHcnImpl() + proxier, err := newProxierInternal( + ipFamily, + hostname, + nodeIP, + serviceHealthServer, + healthzServer, + healthzPort, + hcnImpl, + &localHostMacProvider{}, + config, + true, // waitForHNSOverlay + ) + if err != nil { + return nil, err + } + + burstSyncs := 2 + klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + return proxier, nil +} + +// allow internal testing of proxier +func newProxierInternal( + ipFamily v1.IPFamily, + hostname string, + nodeIP net.IP, + serviceHealthServer healthcheck.ServiceHealthServer, + healthzServer *healthcheck.ProxyHealthServer, + healthzPort int, + hcnImpl HcnService, + hostMacProvider HostMacProvider, + config config.KubeProxyWinkernelConfiguration, + waitForHNSOverlay bool, +) (*Proxier, error) { hns, supportedFeatures := newHostNetworkService(hcnImpl) hnsNetworkName, err := getNetworkName(config.NetworkName) if err != nil { @@ -741,7 +780,10 @@ func NewProxier( // Network could have been detected before Remote Subnet Routes are applied or ManagementIP is updated // Sleep and update the network to include new information if isOverlay(hnsNetworkInfo) { - time.Sleep(10 * time.Second) + if waitForHNSOverlay { + time.Sleep(10 * time.Second) + } + hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName) if err != nil { return nil, fmt.Errorf("could not find HNS network %s", hnsNetworkName) @@ -765,7 +807,7 @@ func NewProxier( if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinOverlay) { return nil, fmt.Errorf("WinOverlay feature gate not enabled") } - err = hcn.RemoteSubnetSupported() + err = hcnImpl.RemoteSubnetSupported() if err != nil { return nil, err } @@ -783,17 +825,8 @@ func NewProxier( } } - interfaces, _ := net.Interfaces() //TODO create interfaces - for _, inter := range interfaces { - addresses, _ := inter.Addrs() - for _, addr := range addresses { - addrIP, _, _ := netutils.ParseCIDRSloppy(addr.String()) - if addrIP.String() == nodeIP.String() { - klog.V(2).InfoS("Record Host MAC address", "addr", inter.HardwareAddr) - hostMac = inter.HardwareAddr.String() - } - } - } + hostMac = hostMacProvider.GetHostMac(nodeIP) + if len(hostMac) == 0 { return nil, fmt.Errorf("could not find host mac address for %s", nodeIP) } @@ -827,9 +860,6 @@ func NewProxier( proxier.endpointsChanges = endPointChangeTracker proxier.serviceChanges = serviceChanges - burstSyncs := 2 - klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) - proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) return proxier, nil } @@ -1785,3 +1815,21 @@ func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *stri *lbHnsID = "" return true } + +type localHostMacProvider struct{} + +func (r *localHostMacProvider) GetHostMac(nodeIP net.IP) string { + var hostMac string + interfaces, _ := net.Interfaces() + for _, inter := range interfaces { + addresses, _ := inter.Addrs() + for _, addr := range addresses { + addrIP, _, _ := netutils.ParseCIDRSloppy(addr.String()) + if addrIP.String() == nodeIP.String() { + klog.V(2).InfoS("Record Host MAC address", "addr", inter.HardwareAddr) + hostMac = inter.HardwareAddr.String() + } + } + } + return hostMac +} diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index e78a8003686..dcfcc0cd49d 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -25,7 +25,6 @@ import ( "net" "strings" "testing" - "time" "github.com/Microsoft/hnslib/hcn" @@ -34,7 +33,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/apis/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" fakehcn "k8s.io/kubernetes/pkg/proxy/winkernel/testing" netutils "k8s.io/utils/net" @@ -84,8 +87,38 @@ func newHnsNetwork(networkInfo *hnsNetworkInfo) *hcn.HostComputeNetwork { return network } -func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostname string, nodeIP net.IP, networkType string) *Proxier { +func NewFakeProxier(t *testing.T, hostname string, nodeIP net.IP, networkType string, enableDSR bool) *Proxier { sourceVip := "192.168.1.2" + + // enable `WinDSR` feature gate + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.WinDSR, true) + + config := config.KubeProxyWinkernelConfiguration{ + SourceVip: sourceVip, + EnableDSR: enableDSR, + NetworkName: testNetwork, + ForwardHealthCheckVip: true, + } + + hcnMock := getHcnMock(networkType) + + proxier, _ := newProxierInternal( + v1.IPv4Protocol, + hostname, + nodeIP, + healthcheck.NewFakeServiceHealthServer(), + nil, + 0, + hcnMock, + &testHostMacProvider{macAddress: macAddress}, + config, + false, + ) + + return proxier +} + +func getHcnMock(networkType string) *fakehcn.HcnMock { var remoteSubnets []*remoteSubnetInfo rs := &remoteSubnetInfo{ destinationPrefix: destinationPrefix, @@ -102,37 +135,11 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostn } hnsNetwork := newHnsNetwork(hnsNetworkInfo) hcnMock := fakehcn.NewHcnMock(hnsNetwork) - proxier := &Proxier{ - svcPortMap: make(proxy.ServicePortMap), - endpointsMap: make(proxy.EndpointsMap), - hostname: testHostName, - nodeIP: nodeIP, - serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), - network: *hnsNetworkInfo, - sourceVip: sourceVip, - hostMac: macAddress, - isDSR: false, - hns: &hns{ - hcn: hcnMock, - }, - hcn: hcnMock, - endPointsRefCount: make(endPointsReferenceCountMap), - forwardHealthCheckVip: true, - mapStaleLoadbalancers: make(map[string]bool), - terminatedEndpoints: make(map[string]bool), - } - - serviceChanges := proxy.NewServiceChangeTracker(v1.IPv4Protocol, proxier.newServiceInfo, proxier.serviceMapChange) - endpointChangeTracker := proxy.NewEndpointsChangeTracker(v1.IPv4Protocol, hostname, proxier.newEndpointInfo, proxier.endpointsMapChange) - proxier.endpointsChanges = endpointChangeTracker - proxier.serviceChanges = serviceChanges - - return proxier + return hcnMock } func TestCreateServiceVip(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) if proxier == nil { t.Error() } @@ -185,8 +192,7 @@ func TestCreateServiceVip(t *testing.T) { } func TestCreateRemoteEndpointOverlay(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, false) if proxier == nil { t.Error() } @@ -250,8 +256,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) { } func TestCreateRemoteEndpointL2Bridge(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge") + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge", false) if proxier == nil { t.Error() } @@ -311,9 +316,104 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) { t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[endpointGuid1], *epInfo.refCount) } } + +func TestDsrEndpointsAreCreatedCorrectly(t *testing.T) { + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) + if proxier == nil { + t.Fatal("Failed to create proxier") + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "ClusterIP" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + }} + }), + ) + + populateEndpointSlices(proxier, + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + ep := proxier.endpointsMap[svcPortName][0] + epInfo, ok := ep.(*endpointInfo) + if !ok { + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) + } + + if epInfo.hnsID == "" { + t.Errorf("Expected HNS ID to be set for endpoint %s, but got empty value", epIpAddressRemote) + } +} + +func TestDsrNotAppliedToClusterTrafficPolicy(t *testing.T) { + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) + if proxier == nil { + t.Fatal("Failed to create proxier") + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "ClusterIP" + svc.Spec.ClusterIP = svcIP + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + svc := proxier.svcPortMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + } + + if svcInfo.localTrafficDSR { + t.Errorf("Expected localTrafficDSR to be false for ExternalTrafficPolicy=Cluster, but got true") + } +} + func TestSharedRemoteEndpointDelete(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge") + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge", true) if proxier == nil { t.Error() } @@ -454,8 +554,7 @@ func TestSharedRemoteEndpointDelete(t *testing.T) { } } func TestSharedRemoteEndpointUpdate(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge") + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge", true) if proxier == nil { t.Error() } @@ -627,9 +726,9 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) { t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[endpointGuid1], *epInfo.refCount) } } -func TestCreateLoadBalancer(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + +func TestCreateLoadBalancerWithoutDSR(t *testing.T) { + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, false) if proxier == nil { t.Error() } @@ -677,16 +776,97 @@ func TestCreateLoadBalancer(t *testing.T) { if !ok { t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) - } else { - if svcInfo.hnsID != loadbalancerGuid1 { - t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) - } + } + + if svcInfo.hnsID != loadbalancerGuid1 { + t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) + } + + lb, err := proxier.hcn.GetLoadBalancerByID(svcInfo.hnsID) + if err != nil { + t.Errorf("Failed to fetch loadbalancer: %v", err) + } + + if lb == nil { + t.Errorf("Failed to fetch loadbalancer: %v", err) + } + + if lb.Flags != hcn.LoadBalancerFlagsNone { + t.Errorf("Incorrect loadbalancer flags. Current value: %v", lb.Flags) + } +} + +func TestCreateLoadBalancerWithDSR(t *testing.T) { + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) + if proxier == nil { + t.Error() + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + populateEndpointSlices(proxier, + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + svc := proxier.svcPortMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } + + if svcInfo.hnsID != loadbalancerGuid1 { + t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) + } + + lb, err := proxier.hcn.GetLoadBalancerByID(svcInfo.hnsID) + if err != nil { + t.Errorf("Failed to fetch loadbalancer: %v", err) + } + + if lb == nil { + t.Errorf("Failed to fetch loadbalancer: %v", err) + } + + if lb.Flags != hcn.LoadBalancerFlagsDSR { + t.Errorf("Incorrect loadbalancer flags. Current value: %v", lb.Flags) } } func TestUpdateLoadBalancerWhenSupported(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) if proxier == nil { t.Error() } @@ -827,8 +1007,7 @@ func TestUpdateLoadBalancerWhenSupported(t *testing.T) { } func TestUpdateLoadBalancerWhenUnsupported(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) if proxier == nil { t.Error() } @@ -970,8 +1149,7 @@ func TestUpdateLoadBalancerWhenUnsupported(t *testing.T) { } func TestCreateDsrLoadBalancer(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) if proxier == nil { t.Error() } @@ -1038,7 +1216,8 @@ func TestCreateDsrLoadBalancer(t *testing.T) { } if len(svcInfo.loadBalancerIngressIPs) == 0 { t.Errorf("svcInfo does not have any loadBalancerIngressIPs, %+v", svcInfo) - } else if svcInfo.loadBalancerIngressIPs[0].healthCheckHnsID != "LBID-4" { + } + if svcInfo.loadBalancerIngressIPs[0].healthCheckHnsID != "LBID-4" { t.Errorf("The Hns Loadbalancer HealthCheck Id %v does not match %v. ServicePortName %q", svcInfo.loadBalancerIngressIPs[0].healthCheckHnsID, loadbalancerGuid1, svcPortName.String()) } } @@ -1048,8 +1227,7 @@ func TestCreateDsrLoadBalancer(t *testing.T) { // syncproxyrules only creates ClusterIP Loadbalancer and no NodePort, External IP or IngressIP // loadbalancers will be created. func TestClusterIPLBInCreateDsrLoadBalancer(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, false) if proxier == nil { t.Error() @@ -1129,8 +1307,7 @@ func TestClusterIPLBInCreateDsrLoadBalancer(t *testing.T) { } func TestEndpointSlice(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) if proxier == nil { t.Error() } @@ -1210,8 +1387,7 @@ func TestNoopEndpointSlice(t *testing.T) { } func TestFindRemoteSubnetProviderAddress(t *testing.T) { - syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) if proxier == nil { t.Error() } @@ -1236,6 +1412,117 @@ func TestFindRemoteSubnetProviderAddress(t *testing.T) { } } +func TestWinDSRWithOverlayEnabled(t *testing.T) { + proxier := NewFakeProxier(t, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) + if proxier == nil { + t.Error("Failed to create proxier") + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "ClusterIP" + svc.Spec.ClusterIP = svcIP + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + svc := proxier.svcPortMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + } + + if !svcInfo.localTrafficDSR { + t.Errorf("Expected localTrafficDSR to be enabled but got false") + } +} + +func TestDSRFeatureGateValidation(t *testing.T) { + testCases := []struct { + name string + enableDSR bool + featureGate bool + expectFailure bool + }{ + { + name: "DSR enabled but feature gate disabled", + enableDSR: true, + featureGate: false, + expectFailure: true, + }, + { + name: "DSR enabled and feature gate enabled", + enableDSR: true, + featureGate: true, + expectFailure: false, + }, + { + name: "DSR disabled, feature gate does not matter", + enableDSR: false, + featureGate: false, + expectFailure: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Mock feature gate + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.WinDSR, tc.featureGate) + + config := config.KubeProxyWinkernelConfiguration{ + EnableDSR: tc.enableDSR, + NetworkName: testNetwork, + SourceVip: serviceVip, + } + + hostMacProvider := &testHostMacProvider{macAddress: macAddress} + + hcnMock := getHcnMock(NETWORK_TYPE_OVERLAY) + + _, err := newProxierInternal( + v1.IPv4Protocol, // ipFamily + testHostName, // hostname + netutils.ParseIPSloppy("192.168.1.1"), // nodeIP + nil, // serviceHealthServer (not needed in this unit test) + nil, // healthzServer (not needed in this unit test) + 0, // healthzPort + hcnMock, // hcnImpl + hostMacProvider, // hostMacProvider + config, // kube-proxy config + false, // waitForHNSOverlay + ) + + if tc.expectFailure { + if err == nil { + t.Errorf("Expected failure for case %q, but got success", tc.name) + } + } else { + if err != nil { + t.Errorf("Expected success for case %q, but got error: %v", tc.name, err) + } + } + }) + } +} + func makeNSN(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } @@ -1300,3 +1587,11 @@ func makeTestEndpointSlice(namespace, name string, sliceNum int, epsFunc func(*d epsFunc(eps) return eps } + +type testHostMacProvider struct { + macAddress string +} + +func (r *testHostMacProvider) GetHostMac(nodeIP net.IP) string { + return r.macAddress +} diff --git a/pkg/proxy/winkernel/testing/hcnutils_mock.go b/pkg/proxy/winkernel/testing/hcnutils_mock.go index 4b1b8e7cb9e..26843f2f004 100644 --- a/pkg/proxy/winkernel/testing/hcnutils_mock.go +++ b/pkg/proxy/winkernel/testing/hcnutils_mock.go @@ -62,6 +62,7 @@ func NewHcnMock(hnsNetwork *hcn.HostComputeNetwork) *HcnMock { V2: true, }, DSR: true, + RemoteSubnet: true, IPv6DualStack: true, }, network: hnsNetwork, @@ -223,3 +224,11 @@ func (hcnObj HcnMock) DeleteAllHnsLoadBalancerPolicy() { delete(loadbalancerMap, k) } } + +func (hcnObj HcnMock) RemoteSubnetSupported() error { + if hcnObj.supportedFeatures.RemoteSubnet { + return nil + } + + return errors.New("remote Subnet Not Supported") +}