From ef2628d764675d87932da0a793b97d97bfc33efe Mon Sep 17 00:00:00 2001
From: Prince Pereira
Date: Wed, 27 Mar 2024 21:45:18 -0700
Subject: [PATCH] Add support for ModifyLoadbalancer in Windows kube-proxy.

---
 pkg/proxy/winkernel/hcnutils.go              |  17 +-
 pkg/proxy/winkernel/hns.go                   | 108 ++++++-
 pkg/proxy/winkernel/proxier.go               | 292 ++++++++++++++-----
 pkg/proxy/winkernel/proxier_test.go          | 286 ++++++++++++++++++
 pkg/proxy/winkernel/testing/hcnutils_mock.go |   9 +
 5 files changed, 628 insertions(+), 84 deletions(-)

diff --git a/pkg/proxy/winkernel/hcnutils.go b/pkg/proxy/winkernel/hcnutils.go
index ec6cf81651d..8dbebf460c5 100644
--- a/pkg/proxy/winkernel/hcnutils.go
+++ b/pkg/proxy/winkernel/hcnutils.go
@@ -20,7 +20,6 @@ limitations under the License.
 package winkernel
 
 import (
-	"github.com/Microsoft/hcsshim"
 	"github.com/Microsoft/hcsshim/hcn"
 	"k8s.io/klog/v2"
 )
@@ -41,6 +40,7 @@ type HcnService interface {
 	ListLoadBalancers() ([]hcn.HostComputeLoadBalancer, error)
 	GetLoadBalancerByID(loadBalancerId string) (*hcn.HostComputeLoadBalancer, error)
 	CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) (*hcn.HostComputeLoadBalancer, error)
+	UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsID string) (*hcn.HostComputeLoadBalancer, error)
 	DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error
 	// Features functions
 	GetSupportedFeatures() hcn.SupportedFeatures
@@ -104,6 +104,10 @@ func (hcnObj hcnImpl) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc
 	return loadBalancer.Create()
 }
 
+func (hcnObj hcnImpl) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsID string) (*hcn.HostComputeLoadBalancer, error) {
+	return loadBalancer.Update(hnsID)
+}
+
 func (hcnObj hcnImpl) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error {
 	return loadBalancer.Delete()
 }
@@ -121,15 +125,16 @@ func (hcnObj hcnImpl) DsrSupported() error {
 }
 
 func (hcnObj hcnImpl) DeleteAllHnsLoadBalancerPolicy() {
-	plists, err := hcsshim.HNSListPolicyListRequest()
+	lbs, err := hcnObj.ListLoadBalancers()
 	if err != nil {
+		klog.V(2).ErrorS(err, "Failed to list existing loadbalancers for deletion")
 		return
 	}
-	for _, plist := range plists {
-		klog.V(3).InfoS("Remove policy", "policies", plist)
-		_, err = plist.Delete()
+	klog.V(3).InfoS("Deleting all existing loadbalancers", "lbCount", len(lbs))
+	for _, lb := range lbs {
+		err = lb.Delete()
 		if err != nil {
-			klog.ErrorS(err, "Failed to delete policy list")
+			klog.V(2).ErrorS(err, "Error deleting existing loadbalancer", "lb", lb)
 		}
 	}
 }
diff --git a/pkg/proxy/winkernel/hns.go b/pkg/proxy/winkernel/hns.go
index b11eb7cacd4..d2cb92a67e8 100644
--- a/pkg/proxy/winkernel/hns.go
+++ b/pkg/proxy/winkernel/hns.go
@@ -40,6 +40,7 @@ type HostNetworkService interface {
 	deleteEndpoint(hnsID string) error
 	getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
 	getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error)
+	updateLoadBalancer(hnsID string, sourceVip, vip string, endpoints []endpointInfo, flags loadBalancerFlags, protocol, internalPort, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
 	deleteLoadBalancer(hnsID string) error
 }
 
@@ -54,6 +55,33 @@ var (
 	LoadBalancerPortMappingFlagsVipExternalIP hcn.LoadBalancerPortMappingFlags = 16
 )
 
+func 
getLoadBalancerPolicyFlags(flags loadBalancerFlags) (lbPortMappingFlags hcn.LoadBalancerPortMappingFlags, lbFlags hcn.LoadBalancerFlags) {
+	lbPortMappingFlags = hcn.LoadBalancerPortMappingFlagsNone
+	if flags.isILB {
+		lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB
+	}
+	if flags.useMUX {
+		lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux
+	}
+	if flags.preserveDIP {
+		lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP
+	}
+	if flags.localRoutedVIP {
+		lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP
+	}
+	if flags.isVipExternalIP {
+		lbPortMappingFlags |= LoadBalancerPortMappingFlagsVipExternalIP
+	}
+	lbFlags = hcn.LoadBalancerFlagsNone
+	if flags.isDSR {
+		lbFlags |= hcn.LoadBalancerFlagsDSR
+	}
+	if flags.isIPv6 {
+		lbFlags |= LoadBalancerFlagsIPv6
+	}
+	return
+}
+
 func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) {
 	hnsnetwork, err := hns.hcn.GetNetworkByName(name)
 	if err != nil {
@@ -406,6 +434,84 @@ func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags
 	return lbInfo, err
 }
 
+func (hns hns) updateLoadBalancer(hnsID string,
+	sourceVip,
+	vip string,
+	endpoints []endpointInfo,
+	flags loadBalancerFlags,
+	protocol,
+	internalPort,
+	externalPort uint16,
+	previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
+	klog.V(3).InfoS("Updating existing loadbalancer", "hnsLbID", hnsID, "endpointCount", len(endpoints), "vip", vip, "sourceVip", sourceVip, "internalPort", internalPort, "externalPort", externalPort)
+
+	var id loadBalancerIdentifier
+	vips := []string{}
+	// Compute hash from backends (endpoint IDs)
+	hash, err := hashEndpoints(endpoints)
+	if err != nil {
+		klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
+		return nil, err
+	}
+	if len(vip) > 0 {
+		id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}
+		vips = append(vips, vip)
+	} else {
+		id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}
+	}
+
+	if lb, found := previousLoadBalancers[id]; found {
+		klog.V(1).InfoS("Found cached Hns loadbalancer policy resource", "policies", lb)
+		return lb, nil
+	}
+
+	lbPortMappingFlags, lbFlags := getLoadBalancerPolicyFlags(flags)
+
+	lbDistributionType := hcn.LoadBalancerDistributionNone
+
+	if flags.sessionAffinity {
+		lbDistributionType = hcn.LoadBalancerDistributionSourceIP
+	}
+
+	loadBalancer := &hcn.HostComputeLoadBalancer{
+		SourceVIP: sourceVip,
+		PortMappings: []hcn.LoadBalancerPortMapping{
+			{
+				Protocol:         uint32(protocol),
+				InternalPort:     internalPort,
+				ExternalPort:     externalPort,
+				DistributionType: lbDistributionType,
+				Flags:            lbPortMappingFlags,
+			},
+		},
+		FrontendVIPs: vips,
+		SchemaVersion: hcn.SchemaVersion{
+			Major: 2,
+			Minor: 0,
+		},
+		Flags: lbFlags,
+	}
+
+	for _, ep := range endpoints {
+		loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID)
+	}
+
+	lb, err := hns.hcn.UpdateLoadBalancer(loadBalancer, hnsID)
+
+	if err != nil {
+		klog.V(2).ErrorS(err, "Error updating existing loadbalancer", "hnsLbID", hnsID, "endpoints", endpoints)
+		return nil, err
+	}
+
+	klog.V(1).InfoS("Updated loadbalancer successfully", "loadBalancer", lb)
+	lbInfo := &loadBalancerInfo{
+		hnsID: lb.Id,
+	}
+	// Add to map of load balancers
+	previousLoadBalancers[id] = lbInfo
+	return lbInfo, err
+}
+
 func (hns hns) 
deleteLoadBalancer(hnsID string) error {
 	lb, err := hns.hcn.GetLoadBalancerByID(hnsID)
 	if err != nil {
@@ -440,7 +546,7 @@ func hashEndpoints[T string | endpointInfo](endpoints []T) (hash [20]byte, err e
 		case endpointInfo:
 			id = strings.ToUpper(x.hnsID)
 		case string:
-			id = x
+			id = strings.ToUpper(x)
 		}
 		if len(id) > 0 {
 			// We XOR the hashes of endpoints, since they are an unordered set.
diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go
index a1f9bade5f2..694637e1c7f 100644
--- a/pkg/proxy/winkernel/proxier.go
+++ b/pkg/proxy/winkernel/proxier.go
@@ -155,6 +155,7 @@ const (
 func newHostNetworkService(hcnImpl HcnService) (HostNetworkService, hcn.SupportedFeatures) {
 	var h HostNetworkService
 	supportedFeatures := hcnImpl.GetSupportedFeatures()
+	klog.V(3).InfoS("HNS Supported features", "hnsSupportedFeatures", supportedFeatures)
 	if supportedFeatures.Api.V2 {
 		h = hns{
 			hcn: hcnImpl,
@@ -344,19 +345,37 @@ func conjureMac(macPrefix string, ip net.IP) string {
 	return "02-11-22-33-44-55"
 }
 
+// updateTerminatedEndpoints keeps track of all terminated remote endpoints.
+// Endpoints from the old endpoints map are added and endpoints from the new endpoints map are removed.
+// This way, only the entries that are present in the old endpoints map and not in the new one remain.
+func (proxier *Proxier) updateTerminatedEndpoints(eps []proxy.Endpoint, isOldEndpointsMap bool) {
+	for _, ep := range eps {
+		if !ep.IsLocal() {
+			if isOldEndpointsMap {
+				proxier.terminatedEndpoints[ep.IP()] = true
+			} else {
+				delete(proxier.terminatedEndpoints, ep.IP())
+			}
+		}
+	}
+}
+
 func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) {
 	// This will optimize remote endpoint and loadbalancer deletion based on the annotation
 	var svcPortMap = make(map[proxy.ServicePortName]bool)
+	clear(proxier.terminatedEndpoints)
 	var logLevel klog.Level = 5
 	for svcPortName, eps := range oldEndpointsMap {
 		logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps)
 		svcPortMap[svcPortName] = true
+		proxier.updateTerminatedEndpoints(eps, true)
 		proxier.onEndpointsMapChange(&svcPortName, false)
 	}
 
 	for svcPortName, eps := range newEndpointsMap {
 		logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps)
 		// redundantCleanup true means cleanup is called second time on the same svcPort
+		proxier.updateTerminatedEndpoints(eps, false)
 		redundantCleanup := svcPortMap[svcPortName]
 		proxier.onEndpointsMapChange(&svcPortName, redundantCleanup)
 	}
@@ -615,6 +634,7 @@ type Proxier struct {
 	forwardHealthCheckVip bool
 	rootHnsEndpointName   string
 	mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which are pending delete in last iteration
 	terminatedEndpoints   map[string]bool // This maintains entries of endpoints which are terminated. 
Key is the remote endpoint's IP address.
 }
 
 type localPort struct {
@@ -773,6 +793,7 @@ func NewProxier(
 		rootHnsEndpointName:   config.RootHnsEndpointName,
 		forwardHealthCheckVip: config.ForwardHealthCheckVip,
 		mapStaleLoadbalancers: make(map[string]bool),
+		terminatedEndpoints:   make(map[string]bool),
 	}
 
 	serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
@@ -1085,6 +1106,25 @@ func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[s
 	queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
 }
 
+func (proxier *Proxier) requiresUpdateLoadbalancer(lbHnsID string, endpointCount int) bool {
+	return proxier.supportedFeatures.ModifyLoadbalancer && lbHnsID != "" && endpointCount > 0
+}
+
+// handleUpdateLoadbalancerFailure handles the error returned by updateLoadBalancer. If the error indicates that the
+// feature is not implemented, it sets supportedFeatures.ModifyLoadbalancer to false. Returning true means the caller
+// should skip the current iteration.
+func (proxier *Proxier) handleUpdateLoadbalancerFailure(err error, hnsID, svcIP string, endpointCount int) (skipIteration bool) {
+	if err != nil {
+		if hcn.IsNotImplemented(err) {
+			klog.InfoS("Update loadbalancer policies is not implemented", "hnsID", hnsID, "svcIP", svcIP, "endpointCount", endpointCount)
+			proxier.supportedFeatures.ModifyLoadbalancer = false
+		} else {
+			klog.ErrorS(err, "Update loadbalancer policy failed", "hnsID", hnsID, "svcIP", svcIP, "endpointCount", endpointCount)
+			skipIteration = true
+		}
+	}
+	return skipIteration
+}
+
 // This is where all of the hns save/restore calls happen.
 // assumes proxier.mu is held
 func (proxier *Proxier) syncProxyRules() {
@@ -1368,7 +1408,7 @@
 
 		if len(svcInfo.hnsID) > 0 {
 			// This should not happen
-			klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
+			klog.InfoS("Load Balancer already exists.", "hnsID", svcInfo.hnsID)
 		}
 
 		// In ETP:Cluster, if all endpoints are under termination,
@@ -1396,7 +1436,6 @@
 		}
 
 		endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing
-		proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers)
 
 		// clusterIPEndpoints is the endpoint list used for creating ClusterIP loadbalancer. 
clusterIPEndpoints := hnsEndpoints @@ -1405,30 +1444,50 @@ func (proxier *Proxier) syncProxyRules() { clusterIPEndpoints = hnsLocalEndpoints } - if len(clusterIPEndpoints) > 0 { - - // If all endpoints are terminating, then no need to create Cluster IP LoadBalancer - // Cluster IP LoadBalancer creation - hnsLoadBalancer, err := hns.getLoadBalancer( - clusterIPEndpoints, - loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP}, + if proxier.requiresUpdateLoadbalancer(svcInfo.hnsID, len(clusterIPEndpoints)) { + hnsLoadBalancer, err = hns.updateLoadBalancer( + svcInfo.hnsID, sourceVip, svcInfo.ClusterIP().String(), + clusterIPEndpoints, + loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP}, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), queriedLoadBalancers, ) - if err != nil { - klog.ErrorS(err, "Policy creation failed") + if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, svcInfo.hnsID, svcInfo.ClusterIP().String(), len(clusterIPEndpoints)); skipIteration { continue } + } - svcInfo.hnsID = hnsLoadBalancer.hnsID - klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID) + if !proxier.requiresUpdateLoadbalancer(svcInfo.hnsID, len(clusterIPEndpoints)) { + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, svcInfo.ClusterIP().String(), Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers) + if len(clusterIPEndpoints) > 0 { - } else { - klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating) + // If all endpoints are terminating, then no need to create Cluster IP LoadBalancer + // Cluster IP LoadBalancer creation + hnsLoadBalancer, err := hns.getLoadBalancer( + clusterIPEndpoints, + loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP}, + sourceVip, + svcInfo.ClusterIP().String(), + Enum(svcInfo.Protocol()), + uint16(svcInfo.targetPort), + uint16(svcInfo.Port()), + queriedLoadBalancers, + ) + if err != nil { + klog.ErrorS(err, "ClusterIP policy creation failed") + continue + } + + svcInfo.hnsID = hnsLoadBalancer.hnsID + klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID) + + } else { + klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. 
Reason: all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
+			}
+		}
 
 		// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
@@ -1440,29 +1499,48 @@ func (proxier *Proxier) syncProxyRules() {
 				nodePortEndpoints = hnsLocalEndpoints
 			}
 
-			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers)
-
-			if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
-				// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
-				hnsLoadBalancer, err := hns.getLoadBalancer(
-					nodePortEndpoints,
-					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
+			if proxier.requiresUpdateLoadbalancer(svcInfo.nodePorthnsID, len(nodePortEndpoints)) && endpointsAvailableForLB {
+				hnsLoadBalancer, err = hns.updateLoadBalancer(
+					svcInfo.nodePorthnsID,
 					sourceVip,
 					"",
+					nodePortEndpoints,
+					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 					Enum(svcInfo.Protocol()),
 					uint16(svcInfo.targetPort),
 					uint16(svcInfo.NodePort()),
 					queriedLoadBalancers,
 				)
-				if err != nil {
-					klog.ErrorS(err, "Policy creation failed")
+				if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, svcInfo.nodePorthnsID, sourceVip, len(nodePortEndpoints)); skipIteration {
 					continue
 				}
+			}
 
-				svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
-				klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
-			} else {
-				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
+			if !proxier.requiresUpdateLoadbalancer(svcInfo.nodePorthnsID, len(nodePortEndpoints)) {
+				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, "", Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.NodePort()), nodePortEndpoints, queriedLoadBalancers)
+
+				if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
+					// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
+					hnsLoadBalancer, err := hns.getLoadBalancer(
+						nodePortEndpoints,
+						loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
+						sourceVip,
+						"",
+						Enum(svcInfo.Protocol()),
+						uint16(svcInfo.targetPort),
+						uint16(svcInfo.NodePort()),
+						queriedLoadBalancers,
+					)
+					if err != nil {
+						klog.ErrorS(err, "NodePort policy creation failed")
+						continue
+					}
+
+					svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
+					klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
+				} else {
+					klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", 
allEndpointsTerminating)
+				}
+			}
 		}
@@ -1474,29 +1552,48 @@ func (proxier *Proxier) syncProxyRules() {
 				externalIPEndpoints = hnsLocalEndpoints
 			}
 
-			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)
-
-			if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
-				// If all endpoints are in terminating stage, then no need to External IP LoadBalancer
-				// Try loading existing policies, if already available
-				hnsLoadBalancer, err = hns.getLoadBalancer(
-					externalIPEndpoints,
-					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
+			if proxier.requiresUpdateLoadbalancer(externalIP.hnsID, len(externalIPEndpoints)) && endpointsAvailableForLB {
+				hnsLoadBalancer, err = hns.updateLoadBalancer(
+					externalIP.hnsID,
 					sourceVip,
 					externalIP.ip,
+					externalIPEndpoints,
+					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 					Enum(svcInfo.Protocol()),
 					uint16(svcInfo.targetPort),
 					uint16(svcInfo.Port()),
 					queriedLoadBalancers,
 				)
-				if err != nil {
-					klog.ErrorS(err, "Policy creation failed")
+				if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, externalIP.hnsID, externalIP.ip, len(externalIPEndpoints)); skipIteration {
 					continue
 				}
-				externalIP.hnsID = hnsLoadBalancer.hnsID
-				klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
-			} else {
-				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
+			}
+
+			if !proxier.requiresUpdateLoadbalancer(externalIP.hnsID, len(externalIPEndpoints)) {
+				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, externalIP.ip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)
+
+				if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
+					// If all endpoints are in terminating stage, then no need to create External IP LoadBalancer
+					// Try loading existing policies, if already available
+					hnsLoadBalancer, err = hns.getLoadBalancer(
+						externalIPEndpoints,
+						loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
+						sourceVip,
+						externalIP.ip,
+						Enum(svcInfo.Protocol()),
+						uint16(svcInfo.targetPort),
+						uint16(svcInfo.Port()),
+						queriedLoadBalancers,
+					)
+					if err != nil {
+						klog.ErrorS(err, "ExternalIP policy creation failed")
+						continue
+					}
+					externalIP.hnsID = hnsLoadBalancer.hnsID
+					klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
+				} else {
+					klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
+				}
 			}
 		}
 		// Create a Load Balancer Policy for each loadbalancer ingress
@@ -1507,27 +1604,46 @@ func (proxier *Proxier) syncProxyRules() {
 				lbIngressEndpoints = hnsLocalEndpoints
 			}
 
-			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), 
uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers) - - if len(lbIngressEndpoints) > 0 { - hnsLoadBalancer, err := hns.getLoadBalancer( - lbIngressEndpoints, - loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, + if proxier.requiresUpdateLoadbalancer(lbIngressIP.hnsID, len(lbIngressEndpoints)) { + hnsLoadBalancer, err = hns.updateLoadBalancer( + lbIngressIP.hnsID, sourceVip, lbIngressIP.ip, + lbIngressEndpoints, + loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), queriedLoadBalancers, ) - if err != nil { - klog.ErrorS(err, "Policy creation failed") + if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, lbIngressIP.hnsID, lbIngressIP.ip, len(lbIngressEndpoints)); skipIteration { continue } - lbIngressIP.hnsID = hnsLoadBalancer.hnsID - klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP) - } else { - klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP) + } + + if !proxier.requiresUpdateLoadbalancer(lbIngressIP.hnsID, len(lbIngressEndpoints)) { + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, lbIngressIP.ip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers) + + if len(lbIngressEndpoints) > 0 { + hnsLoadBalancer, err := hns.getLoadBalancer( + lbIngressEndpoints, + loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, + sourceVip, + lbIngressIP.ip, + Enum(svcInfo.Protocol()), + uint16(svcInfo.targetPort), + uint16(svcInfo.Port()), + queriedLoadBalancers, + ) + if err != nil { + klog.ErrorS(err, "IngressIP policy creation failed") + continue + } + lbIngressIP.hnsID = hnsLoadBalancer.hnsID + klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP) + } else { + klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP) + } } if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB { @@ -1537,24 +1653,45 @@ func (proxier *Proxier) syncProxyRules() { nodeport = svcInfo.HealthCheckNodePort() } - proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers) + gwEndpoints := []endpointInfo{*gatewayHnsendpoint} - hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer( - []endpointInfo{*gatewayHnsendpoint}, - loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, - sourceVip, - lbIngressIP.ip, - Enum(svcInfo.Protocol()), - uint16(nodeport), - uint16(nodeport), - queriedLoadBalancers, - ) - if err != nil { - klog.ErrorS(err, 
"Policy creation failed") - continue + if proxier.requiresUpdateLoadbalancer(lbIngressIP.healthCheckHnsID, len(gwEndpoints)) { + hnsLoadBalancer, err = hns.updateLoadBalancer( + lbIngressIP.healthCheckHnsID, + sourceVip, + lbIngressIP.ip, + gwEndpoints, + loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, + Enum(svcInfo.Protocol()), + uint16(nodeport), + uint16(nodeport), + queriedLoadBalancers, + ) + if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, lbIngressIP.healthCheckHnsID, lbIngressIP.ip, 1); skipIteration { + continue + } + } + + if !proxier.requiresUpdateLoadbalancer(lbIngressIP.healthCheckHnsID, len(gwEndpoints)) { + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, lbIngressIP.ip, Enum(svcInfo.Protocol()), uint16(nodeport), uint16(nodeport), gwEndpoints, queriedLoadBalancers) + + hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer( + gwEndpoints, + loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, + sourceVip, + lbIngressIP.ip, + Enum(svcInfo.Protocol()), + uint16(nodeport), + uint16(nodeport), + queriedLoadBalancers, + ) + if err != nil { + klog.ErrorS(err, "Healthcheck loadbalancer policy creation failed") + continue + } + lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID + klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP) } - lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID - klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP) } else { klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating) } @@ -1586,11 +1723,12 @@ func (proxier *Proxier) syncProxyRules() { } // remove stale endpoint refcount entries - for hnsID, referenceCount := range proxier.endPointsRefCount { - if *referenceCount <= 0 { - klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID) - proxier.hns.deleteEndpoint(hnsID) - delete(proxier.endPointsRefCount, hnsID) + for epIP := range proxier.terminatedEndpoints { + if epToDelete := queriedEndpoints[epIP]; epToDelete != nil && epToDelete.hnsID != "" { + if refCount := proxier.endPointsRefCount.getRefCount(epToDelete.hnsID); refCount == nil || *refCount == 0 { + klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", epToDelete.hnsID) + proxier.hns.deleteEndpoint(epToDelete.hnsID) + } } } // This will cleanup stale load balancers which are pending delete @@ -1600,7 +1738,7 @@ func (proxier *Proxier) syncProxyRules() { // deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not. // If it is needed, the function will delete the existing loadbalancer and return true, else false. 
-func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { +func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, vip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { if !winProxyOptimization || *lbHnsID == "" { // Loadbalancer delete not needed @@ -1609,7 +1747,7 @@ func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winPr lbID, lbIdErr := findLoadBalancerID( endpoints, - sourceVip, + vip, protocol, intPort, extPort, diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 964ca542835..56ffc4fe532 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -118,6 +118,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostn endPointsRefCount: make(endPointsReferenceCountMap), forwardHealthCheckVip: true, mapStaleLoadbalancers: make(map[string]bool), + terminatedEndpoints: make(map[string]bool), } serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange) @@ -682,6 +683,291 @@ func TestCreateLoadBalancer(t *testing.T) { } } +func TestUpdateLoadBalancerWhenSupported(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + if proxier == nil { + t.Error() + } + + proxier.supportedFeatures.ModifyLoadbalancer = true + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + populateEndpointSlices(proxier, + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + svc := proxier.svcPortMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + if svcInfo.hnsID != loadbalancerGuid1 { + t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) + } + } + + proxier.setInitialized(false) + + proxier.OnEndpointSliceUpdate( + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: 
ptr.To(v1.ProtocolTCP),
+			}}
+		}),
+		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
+			eps.AddressType = discovery.AddressTypeIPv4
+			eps.Endpoints = []discovery.Endpoint{{
+				Addresses: []string{epPaAddress},
+			}}
+			eps.Ports = []discovery.EndpointPort{{
+				Name:     ptr.To(svcPortName.Port),
+				Port:     ptr.To(int32(svcPort)),
+				Protocol: ptr.To(v1.ProtocolTCP),
+			}}
+		}))
+
+	proxier.mu.Lock()
+	proxier.endpointSlicesSynced = true
+	proxier.mu.Unlock()
+
+	proxier.setInitialized(true)
+
+	epObj, err := proxier.hcn.GetEndpointByID("EPID-3")
+	if err != nil || epObj == nil {
+		t.Errorf("Failed to fetch endpoint: EPID-3")
+	}
+
+	proxier.syncProxyRules()
+
+	// The endpoint should be deleted as it is not present in the new endpoint slice
+	epObj, err = proxier.hcn.GetEndpointByID("EPID-3")
+	if err == nil || epObj != nil {
+		t.Errorf("Endpoint EPID-3 should have been deleted")
+	}
+
+	ep := proxier.endpointsMap[svcPortName][0]
+	epInfo, ok := ep.(*endpointInfo)
+
+	epObj, err = proxier.hcn.GetEndpointByID("EPID-5")
+	if err != nil || epObj == nil {
+		t.Errorf("Failed to fetch endpoint: EPID-5")
+	}
+
+	if !ok {
+		t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
+
+	} else {
+		if epInfo.hnsID != "EPID-5" {
+			t.Errorf("%v does not match %v", epInfo.hnsID, "EPID-5")
+		}
+	}
+
+	if *epInfo.refCount != 1 {
+		t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount)
+	}
+
+	if *proxier.endPointsRefCount["EPID-5"] != *epInfo.refCount {
+		t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount["EPID-5"], *epInfo.refCount)
+	}
+
+	svc = proxier.svcPortMap[svcPortName]
+	svcInfo, ok = svc.(*serviceInfo)
+	if !ok {
+		t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
+
+	} else {
+		// Loadbalancer id should not change after the update
+		if svcInfo.hnsID != loadbalancerGuid1 {
+			t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1)
+		}
+	}
+
+}
+
+func TestUpdateLoadBalancerWhenUnsupported(t *testing.T) {
+	syncPeriod := 30 * time.Second
+	proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
+	if proxier == nil {
+		t.Error()
+	}
+
+	// The value is false by default; it is set to false explicitly here for the readability of the test case
+	proxier.supportedFeatures.ModifyLoadbalancer = false
+
+	svcIP := "10.20.30.41"
+	svcPort := 80
+	svcNodePort := 3001
+	svcPortName := proxy.ServicePortName{
+		NamespacedName: makeNSN("ns1", "svc1"),
+		Port:           "p80",
+		Protocol:       v1.ProtocolTCP,
+	}
+
+	makeServiceMap(proxier,
+		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
+			svc.Spec.Type = "NodePort"
+			svc.Spec.ClusterIP = svcIP
+			svc.Spec.Ports = []v1.ServicePort{{
+				Name:     svcPortName.Port,
+				Port:     int32(svcPort),
+				Protocol: v1.ProtocolTCP,
+				NodePort: int32(svcNodePort),
+			}}
+		}),
+	)
+	populateEndpointSlices(proxier,
+		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
+			eps.AddressType = discovery.AddressTypeIPv4
+			eps.Endpoints = []discovery.Endpoint{{
+				Addresses: []string{epIpAddressRemote},
+			}}
+			eps.Ports = []discovery.EndpointPort{{
+				Name:     ptr.To(svcPortName.Port),
+				Port:     ptr.To(int32(svcPort)),
+				Protocol: ptr.To(v1.ProtocolTCP),
+			}}
+		}),
+	)
+
+	proxier.setInitialized(true)
+	proxier.syncProxyRules()
+
+	svc := proxier.svcPortMap[svcPortName]
+	svcInfo, ok := svc.(*serviceInfo)
+	if !ok {
+		t.Errorf("Failed to cast serviceInfo 
%q", svcPortName.String()) + + } else { + if svcInfo.hnsID != loadbalancerGuid1 { + t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) + } + } + + proxier.setInitialized(false) + + proxier.OnEndpointSliceUpdate( + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + }), + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epPaAddress}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + })) + + proxier.mu.Lock() + proxier.endpointSlicesSynced = true + proxier.mu.Unlock() + + proxier.setInitialized(true) + + epObj, err := proxier.hcn.GetEndpointByID("EPID-3") + if err != nil || epObj == nil { + t.Errorf("Failed to fetch endpoint: EPID-3") + } + + proxier.syncProxyRules() + + // The endpoint should be deleted as it is not present in the new endpoint slice + epObj, err = proxier.hcn.GetEndpointByID("EPID-3") + if err == nil || epObj != nil { + t.Errorf("Failed to fetch endpoint: EPID-3") + } + + ep := proxier.endpointsMap[svcPortName][0] + epInfo, ok := ep.(*endpointInfo) + + epObj, err = proxier.hcn.GetEndpointByID("EPID-5") + if err != nil || epObj == nil { + t.Errorf("Failed to fetch endpoint: EPID-5") + } + + if !ok { + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) + + } else { + if epInfo.hnsID != "EPID-5" { + t.Errorf("%v does not match %v", epInfo.hnsID, "EPID-5") + } + } + + if *epInfo.refCount != 1 { + t.Errorf("Incorrect refcount. 
Current value: %v", *epInfo.refCount) + } + + if *proxier.endPointsRefCount["EPID-5"] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[endpointGuid1], *epInfo.refCount) + } + + svc = proxier.svcPortMap[svcPortName] + svcInfo, ok = svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + // Loadbalancer id should change after the update + if svcInfo.hnsID != "LBID-3" { + t.Errorf("%v does not match %v", svcInfo.hnsID, "LBID-3") + } + } + +} + func TestCreateDsrLoadBalancer(t *testing.T) { syncPeriod := 30 * time.Second proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) diff --git a/pkg/proxy/winkernel/testing/hcnutils_mock.go b/pkg/proxy/winkernel/testing/hcnutils_mock.go index 319f2e10c47..a626efb274f 100644 --- a/pkg/proxy/winkernel/testing/hcnutils_mock.go +++ b/pkg/proxy/winkernel/testing/hcnutils_mock.go @@ -183,6 +183,15 @@ func (hcnObj HcnMock) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc return loadBalancer, nil } +func (hcnObj HcnMock) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsLbID string) (*hcn.HostComputeLoadBalancer, error) { + if _, ok := loadbalancerMap[hnsLbID]; !ok { + return nil, fmt.Errorf("LoadBalancer id %s Not Present", loadBalancer.Id) + } + loadBalancer.Id = hnsLbID + loadbalancerMap[hnsLbID] = loadBalancer + return loadBalancer, nil +} + func (hcnObj HcnMock) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error { if _, ok := loadbalancerMap[loadBalancer.Id]; !ok { return hcn.LoadBalancerNotFoundError{LoadBalancerId: loadBalancer.Id}