Merge pull request #124092 from princepereira/ppereira-updatelbpolicy-master

Add ModifyLoadbalancer API support to the Windows kube-proxy: load balancer updates now use the HCN update API in place of the previous delete-and-create cycle.
Kubernetes Prow Robot, 2024-07-05 17:40:28 -07:00 (committed by GitHub)
commit 9039d71dd7
5 changed files with 628 additions and 84 deletions
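
At its core, the change replaces a delete-and-create cycle with an in-place modify. Below is a minimal sketch of the two call patterns, using only the hcsshim/hcn methods that appear in this diff (`Create`, `Update`, `Delete`); the helper names and package are hypothetical, not part of the commit:

```go
package winkernelsketch

import "github.com/Microsoft/hcsshim/hcn"

// Before this PR: changing a load balancer's endpoint set meant deleting the
// existing policy and creating a new one, which also changes its HNS ID.
func replaceLoadBalancer(old, desired *hcn.HostComputeLoadBalancer) (*hcn.HostComputeLoadBalancer, error) {
	if err := old.Delete(); err != nil {
		return nil, err
	}
	return desired.Create() // new policy object, new HNS ID
}

// After this PR: the existing policy is modified in place and keeps its HNS ID.
func modifyLoadBalancer(desired *hcn.HostComputeLoadBalancer, hnsID string) (*hcn.HostComputeLoadBalancer, error) {
	return desired.Update(hnsID)
}
```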

@@ -20,7 +20,6 @@ limitations under the License.
package winkernel
import (
"github.com/Microsoft/hcsshim"
"github.com/Microsoft/hcsshim/hcn"
"k8s.io/klog/v2"
)
@@ -41,6 +40,7 @@ type HcnService interface {
ListLoadBalancers() ([]hcn.HostComputeLoadBalancer, error)
GetLoadBalancerByID(loadBalancerId string) (*hcn.HostComputeLoadBalancer, error)
CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) (*hcn.HostComputeLoadBalancer, error)
UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsID string) (*hcn.HostComputeLoadBalancer, error)
DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error
// Features functions
GetSupportedFeatures() hcn.SupportedFeatures
@@ -104,6 +104,10 @@ func (hcnObj hcnImpl) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc
return loadBalancer.Create()
}
func (hcnObj hcnImpl) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsID string) (*hcn.HostComputeLoadBalancer, error) {
return loadBalancer.Update(hnsID)
}
func (hcnObj hcnImpl) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error {
return loadBalancer.Delete()
}
@@ -121,15 +125,16 @@ func (hcnObj hcnImpl) DsrSupported() error {
}
func (hcnObj hcnImpl) DeleteAllHnsLoadBalancerPolicy() {
plists, err := hcsshim.HNSListPolicyListRequest()
lbs, err := hcnObj.ListLoadBalancers()
if err != nil {
klog.V(2).ErrorS(err, "Deleting all existing loadbalancers failed.")
return
}
for _, plist := range plists {
klog.V(3).InfoS("Remove policy", "policies", plist)
_, err = plist.Delete()
klog.V(3).InfoS("Deleting all existing loadbalancers", "lbCount", len(lbs))
for _, lb := range lbs {
err = lb.Delete()
if err != nil {
klog.ErrorS(err, "Failed to delete policy list")
klog.V(2).ErrorS(err, "Error deleting existing loadbalancer", "lb", lb)
}
}
}

@@ -40,6 +40,7 @@ type HostNetworkService interface {
deleteEndpoint(hnsID string) error
getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error)
updateLoadBalancer(hnsID string, sourceVip, vip string, endpoints []endpointInfo, flags loadBalancerFlags, protocol, internalPort, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
deleteLoadBalancer(hnsID string) error
}
@@ -54,6 +55,33 @@ var (
LoadBalancerPortMappingFlagsVipExternalIP hcn.LoadBalancerPortMappingFlags = 16
)
func getLoadBalancerPolicyFlags(flags loadBalancerFlags) (lbPortMappingFlags hcn.LoadBalancerPortMappingFlags, lbFlags hcn.LoadBalancerFlags) {
lbPortMappingFlags = hcn.LoadBalancerPortMappingFlagsNone
if flags.isILB {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB
}
if flags.useMUX {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux
}
if flags.preserveDIP {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP
}
if flags.localRoutedVIP {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP
}
if flags.isVipExternalIP {
lbPortMappingFlags |= LoadBalancerPortMappingFlagsVipExternalIP
}
lbFlags = hcn.LoadBalancerFlagsNone
if flags.isDSR {
lbFlags |= hcn.LoadBalancerFlagsDSR
}
if flags.isIPv6 {
lbFlags |= LoadBalancerFlagsIPv6
}
return
}
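The helper ORs one HCN flag bit per boolean option. A brief usage sketch (not part of the commit), derivable from the code above:

```go
// For an internal (ILB) load balancer with DSR enabled:
flags := loadBalancerFlags{isILB: true, isDSR: true}
portFlags, lbFlags := getLoadBalancerPolicyFlags(flags)
// portFlags carries only the hcn.LoadBalancerPortMappingFlagsILB bit;
// lbFlags carries only the hcn.LoadBalancerFlagsDSR bit.
```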
func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) {
hnsnetwork, err := hns.hcn.GetNetworkByName(name)
if err != nil {
@@ -406,6 +434,84 @@ func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags
return lbInfo, err
}
func (hns hns) updateLoadBalancer(hnsID string,
sourceVip,
vip string,
endpoints []endpointInfo,
flags loadBalancerFlags,
protocol,
internalPort,
externalPort uint16,
previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
klog.V(3).InfoS("Updating existing loadbalancer called", "hnsLbID", hnsID, "endpointCount", len(endpoints), "vip", vip, "sourceVip", sourceVip, "internalPort", internalPort, "externalPort", externalPort)
var id loadBalancerIdentifier
vips := []string{}
// Compute hash from backends (endpoint IDs)
hash, err := hashEndpoints(endpoints)
if err != nil {
klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
return nil, err
}
if len(vip) > 0 {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}
vips = append(vips, vip)
} else {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}
}
if lb, found := previousLoadBalancers[id]; found {
klog.V(1).InfoS("Found cached Hns loadbalancer policy resource", "policies", lb)
return lb, nil
}
lbPortMappingFlags, lbFlags := getLoadBalancerPolicyFlags(flags)
lbDistributionType := hcn.LoadBalancerDistributionNone
if flags.sessionAffinity {
lbDistributionType = hcn.LoadBalancerDistributionSourceIP
}
loadBalancer := &hcn.HostComputeLoadBalancer{
SourceVIP: sourceVip,
PortMappings: []hcn.LoadBalancerPortMapping{
{
Protocol: uint32(protocol),
InternalPort: internalPort,
ExternalPort: externalPort,
DistributionType: lbDistributionType,
Flags: lbPortMappingFlags,
},
},
FrontendVIPs: vips,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
Flags: lbFlags,
}
for _, ep := range endpoints {
loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID)
}
lb, err := hns.hcn.UpdateLoadBalancer(loadBalancer, hnsID)
if err != nil {
klog.V(2).ErrorS(err, "Error updating existing loadbalancer", "hnsLbID", hnsID, "error", err, "endpoints", endpoints)
return nil, err
}
klog.V(1).InfoS("Update loadbalancer is successful", "loadBalancer", lb)
lbInfo := &loadBalancerInfo{
hnsID: lb.Id,
}
// Add to map of load balancers
previousLoadBalancers[id] = lbInfo
return lbInfo, err
}
func (hns hns) deleteLoadBalancer(hnsID string) error {
lb, err := hns.hcn.GetLoadBalancerByID(hnsID)
if err != nil {
@@ -440,7 +546,7 @@ func hashEndpoints[T string | endpointInfo](endpoints []T) (hash [20]byte, err e
case endpointInfo:
id = strings.ToUpper(x.hnsID)
case string:
id = x
id = strings.ToUpper(x)
}
if len(id) > 0 {
// We XOR the hashes of endpoints, since they are an unordered set.
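The case-normalization fix above matters precisely because the hash is an XOR of per-endpoint digests: both type cases must produce identical bytes for the same endpoint, regardless of casing or order. A self-contained sketch of the same scheme (SHA-1 is an assumption inferred from the `[20]byte` hash size; `xorHashEndpoints` is a hypothetical stand-in for `hashEndpoints`):

```go
package main

import (
	"crypto/sha1"
	"fmt"
	"strings"
)

// xorHashEndpoints hashes each case-normalized endpoint ID and XORs the
// digests, so the result is identical for any ordering (and casing) of the
// same endpoint set.
func xorHashEndpoints(ids []string) (hash [20]byte) {
	for _, id := range ids {
		h := sha1.Sum([]byte(strings.ToUpper(id)))
		for i := range hash {
			hash[i] ^= h[i]
		}
	}
	return
}

func main() {
	a := xorHashEndpoints([]string{"epid-1", "EPID-2"})
	b := xorHashEndpoints([]string{"EPID-2", "EPID-1"})
	fmt.Println(a == b) // true: order- and case-insensitive
}
```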

@@ -155,6 +155,7 @@ const (
func newHostNetworkService(hcnImpl HcnService) (HostNetworkService, hcn.SupportedFeatures) {
var h HostNetworkService
supportedFeatures := hcnImpl.GetSupportedFeatures()
klog.V(3).InfoS("HNS Supported features", "hnsSupportedFeatures", supportedFeatures)
if supportedFeatures.Api.V2 {
h = hns{
hcn: hcnImpl,
@@ -344,19 +345,37 @@ func conjureMac(macPrefix string, ip net.IP) string {
return "02-11-22-33-44-55"
}
// updateTerminatedEndpoints keeps track of all terminated endpoints.
// Endpoints from the old endpoints map are added, and endpoints present in the new endpoints map are removed.
// This leaves only the entries that exist in the old map but not in the new one, i.e. the terminated endpoints.
func (proxier *Proxier) updateTerminatedEndpoints(eps []proxy.Endpoint, isOldEndpointsMap bool) {
for _, ep := range eps {
if !ep.IsLocal() {
if isOldEndpointsMap {
proxier.terminatedEndpoints[ep.IP()] = true
} else {
delete(proxier.terminatedEndpoints, ep.IP())
}
}
}
}
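In effect, the two passes below compute a set difference over remote endpoint IPs. A worked sketch with hypothetical addresses:

```go
// Worked example (hypothetical IPs): two remote endpoints exist in the old
// map; only one survives into the new map.
terminated := map[string]bool{}
for _, ip := range []string{"10.244.1.5", "10.244.1.6"} { // old endpoints map
	terminated[ip] = true
}
for _, ip := range []string{"10.244.1.6"} { // new endpoints map
	delete(terminated, ip)
}
// terminated now holds only "10.244.1.5"; that endpoint's HNS object becomes
// a deletion candidate at the end of syncProxyRules.
```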
func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) {
// This will optimize remote endpoint and loadbalancer deletion based on the annotation
var svcPortMap = make(map[proxy.ServicePortName]bool)
clear(proxier.terminatedEndpoints)
var logLevel klog.Level = 5
for svcPortName, eps := range oldEndpointsMap {
logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps)
svcPortMap[svcPortName] = true
proxier.updateTerminatedEndpoints(eps, true)
proxier.onEndpointsMapChange(&svcPortName, false)
}
for svcPortName, eps := range newEndpointsMap {
logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps)
// redundantCleanup true means cleanup is being called a second time for the same svcPort
proxier.updateTerminatedEndpoints(eps, false)
redundantCleanup := svcPortMap[svcPortName]
proxier.onEndpointsMapChange(&svcPortName, redundantCleanup)
}
@@ -615,6 +634,7 @@ type Proxier struct {
forwardHealthCheckVip bool
rootHnsEndpointName string
mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which were pending delete in the last iteration
terminatedEndpoints map[string]bool // This maintains entries of endpoints which are terminated. Key is the endpoint IP address.
}
type localPort struct {
@@ -773,6 +793,7 @@ func NewProxier(
rootHnsEndpointName: config.RootHnsEndpointName,
forwardHealthCheckVip: config.ForwardHealthCheckVip,
mapStaleLoadbalancers: make(map[string]bool),
terminatedEndpoints: make(map[string]bool),
}
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
@@ -1085,6 +1106,25 @@ func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[s
queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
}
func (proxier *Proxier) requiresUpdateLoadbalancer(lbHnsID string, endpointCount int) bool {
return proxier.supportedFeatures.ModifyLoadbalancer && lbHnsID != "" && endpointCount > 0
}
// handleUpdateLoadbalancerFailure handles the error returned by updateLoadBalancer. If the error is due to an unsupported feature,
// it sets supportedFeatures.ModifyLoadbalancer to false. A return value of true means the caller should skip the current iteration.
func (proxier *Proxier) handleUpdateLoadbalancerFailure(err error, hnsID, svcIP string, endpointCount int) (skipIteration bool) {
if err != nil {
if hcn.IsNotImplemented(err) {
klog.InfoS("Update loadbalancer policies is not implemented", "hnsID", hnsID, "svcIP", svcIP, "endpointCount", endpointCount)
proxier.supportedFeatures.ModifyLoadbalancer = false
} else {
klog.ErrorS(err, "Update loadbalancer policy failed", "hnsID", hnsID, "svcIP", svcIP, "endpointCount", endpointCount)
skipIteration = true
}
}
return skipIteration
}
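Reconstructed from the syncProxyRules hunks below, the intended pattern is: try the in-place update first; a hard failure skips the service until the next sync, while hcn.IsNotImplemented flips ModifyLoadbalancer off so the same pass falls through to the legacy delete-and-create path. A self-contained model of that control flow (all names hypothetical, not the actual proxier code):

```go
package main

import (
	"errors"
	"fmt"
)

var errNotImplemented = errors.New("modify loadbalancer not implemented")

type fakeProxier struct{ modifySupported bool }

// syncOne models one service iteration: update in place when supported,
// otherwise (or after a not-implemented error) recreate from scratch.
func (p *fakeProxier) syncOne(update func() error, recreate func()) {
	if p.modifySupported {
		if err := update(); err != nil {
			if !errors.Is(err, errNotImplemented) {
				return // hard failure: skip this service, retry next sync
			}
			p.modifySupported = false // fall through to recreate below
		}
	}
	if !p.modifySupported {
		recreate()
	}
}

func main() {
	p := &fakeProxier{modifySupported: true}
	p.syncOne(
		func() error { return errNotImplemented },
		func() { fmt.Println("delete+create path") },
	)
	fmt.Println("modify still supported:", p.modifySupported) // false
}
```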
// This is where all of the hns save/restore calls happen.
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() {
@@ -1368,7 +1408,7 @@ func (proxier *Proxier) syncProxyRules() {
if len(svcInfo.hnsID) > 0 {
// This should not happen
klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
klog.InfoS("Load Balancer already exists.", "hnsID", svcInfo.hnsID)
}
// In ETP:Cluster, if all endpoints are under termination,
@@ -1396,7 +1436,6 @@
}
endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers)
// clusterIPEndpoints is the endpoint list used for creating ClusterIP loadbalancer.
clusterIPEndpoints := hnsEndpoints
@@ -1405,30 +1444,50 @@
clusterIPEndpoints = hnsLocalEndpoints
}
if len(clusterIPEndpoints) > 0 {
// If all endpoints are terminating, then no need to create Cluster IP LoadBalancer
// Cluster IP LoadBalancer creation
hnsLoadBalancer, err := hns.getLoadBalancer(
clusterIPEndpoints,
loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
if proxier.requiresUpdateLoadbalancer(svcInfo.hnsID, len(clusterIPEndpoints)) {
hnsLoadBalancer, err = hns.updateLoadBalancer(
svcInfo.hnsID,
sourceVip,
svcInfo.ClusterIP().String(),
clusterIPEndpoints,
loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, svcInfo.hnsID, svcInfo.ClusterIP().String(), len(clusterIPEndpoints)); skipIteration {
continue
}
}
svcInfo.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
if !proxier.requiresUpdateLoadbalancer(svcInfo.hnsID, len(clusterIPEndpoints)) {
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, svcInfo.ClusterIP().String(), Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers)
if len(clusterIPEndpoints) > 0 {
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
// If all endpoints are terminating, then no need to create Cluster IP LoadBalancer
// Cluster IP LoadBalancer creation
hnsLoadBalancer, err := hns.getLoadBalancer(
clusterIPEndpoints,
loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
sourceVip,
svcInfo.ClusterIP().String(),
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "ClusterIP policy creation failed")
continue
}
svcInfo.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
}
}
// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
@@ -1440,29 +1499,48 @@
nodePortEndpoints = hnsLocalEndpoints
}
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers)
if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
hnsLoadBalancer, err := hns.getLoadBalancer(
nodePortEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
if proxier.requiresUpdateLoadbalancer(svcInfo.nodePorthnsID, len(nodePortEndpoints)) && endpointsAvailableForLB {
hnsLoadBalancer, err = hns.updateLoadBalancer(
svcInfo.nodePorthnsID,
sourceVip,
"",
nodePortEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.NodePort()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, svcInfo.nodePorthnsID, sourceVip, len(nodePortEndpoints)); skipIteration {
continue
}
}
svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
if !proxier.requiresUpdateLoadbalancer(svcInfo.nodePorthnsID, len(nodePortEndpoints)) {
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, "", Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.NodePort()), nodePortEndpoints, queriedLoadBalancers)
if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
hnsLoadBalancer, err := hns.getLoadBalancer(
nodePortEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
sourceVip,
"",
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.NodePort()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Nodeport policy creation failed")
continue
}
svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
}
}
}
@@ -1474,29 +1552,48 @@
externalIPEndpoints = hnsLocalEndpoints
}
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)
if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
// If all endpoints are in terminating stage, then no need to create External IP LoadBalancer
// Try loading existing policies, if already available
hnsLoadBalancer, err = hns.getLoadBalancer(
externalIPEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
if proxier.requiresUpdateLoadbalancer(externalIP.hnsID, len(externalIPEndpoints)) && endpointsAvailableForLB {
hnsLoadBalancer, err = hns.updateLoadBalancer(
externalIP.hnsID,
sourceVip,
externalIP.ip,
externalIPEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, externalIP.hnsID, externalIP.ip, len(externalIPEndpoints)); skipIteration {
continue
}
externalIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
}
if !proxier.requiresUpdateLoadbalancer(externalIP.hnsID, len(externalIPEndpoints)) {
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, externalIP.ip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)
if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
// If all endpoints are in terminating stage, then no need to create External IP LoadBalancer
// Try loading existing policies, if already available
hnsLoadBalancer, err = hns.getLoadBalancer(
externalIPEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
sourceVip,
externalIP.ip,
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "ExternalIP policy creation failed")
continue
}
externalIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
}
}
}
// Create a Load Balancer Policy for each loadbalancer ingress
@@ -1507,27 +1604,46 @@
lbIngressEndpoints = hnsLocalEndpoints
}
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers)
if len(lbIngressEndpoints) > 0 {
hnsLoadBalancer, err := hns.getLoadBalancer(
lbIngressEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
if proxier.requiresUpdateLoadbalancer(lbIngressIP.hnsID, len(lbIngressEndpoints)) {
hnsLoadBalancer, err = hns.updateLoadBalancer(
lbIngressIP.hnsID,
sourceVip,
lbIngressIP.ip,
lbIngressEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, lbIngressIP.hnsID, lbIngressIP.ip, len(lbIngressEndpoints)); skipIteration {
continue
}
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
}
if !proxier.requiresUpdateLoadbalancer(lbIngressIP.hnsID, len(lbIngressEndpoints)) {
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, lbIngressIP.ip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers)
if len(lbIngressEndpoints) > 0 {
hnsLoadBalancer, err := hns.getLoadBalancer(
lbIngressEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
sourceVip,
lbIngressIP.ip,
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "IngressIP policy creation failed")
continue
}
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
}
}
if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB {
@@ -1537,24 +1653,45 @@
nodeport = svcInfo.HealthCheckNodePort()
}
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
gwEndpoints := []endpointInfo{*gatewayHnsendpoint}
hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
[]endpointInfo{*gatewayHnsendpoint},
loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
sourceVip,
lbIngressIP.ip,
Enum(svcInfo.Protocol()),
uint16(nodeport),
uint16(nodeport),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
continue
if proxier.requiresUpdateLoadbalancer(lbIngressIP.healthCheckHnsID, len(gwEndpoints)) {
hnsLoadBalancer, err = hns.updateLoadBalancer(
lbIngressIP.healthCheckHnsID,
sourceVip,
lbIngressIP.ip,
gwEndpoints,
loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
Enum(svcInfo.Protocol()),
uint16(nodeport),
uint16(nodeport),
queriedLoadBalancers,
)
if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, lbIngressIP.healthCheckHnsID, lbIngressIP.ip, 1); skipIteration {
continue
}
}
if !proxier.requiresUpdateLoadbalancer(lbIngressIP.healthCheckHnsID, len(gwEndpoints)) {
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, lbIngressIP.ip, Enum(svcInfo.Protocol()), uint16(nodeport), uint16(nodeport), gwEndpoints, queriedLoadBalancers)
hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
gwEndpoints,
loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
sourceVip,
lbIngressIP.ip,
Enum(svcInfo.Protocol()),
uint16(nodeport),
uint16(nodeport),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Healthcheck loadbalancer policy creation failed")
continue
}
lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
}
lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
} else {
klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating)
}
@@ -1586,11 +1723,12 @@
}
// remove stale endpoint refcount entries
for hnsID, referenceCount := range proxier.endPointsRefCount {
if *referenceCount <= 0 {
klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID)
proxier.hns.deleteEndpoint(hnsID)
delete(proxier.endPointsRefCount, hnsID)
for epIP := range proxier.terminatedEndpoints {
if epToDelete := queriedEndpoints[epIP]; epToDelete != nil && epToDelete.hnsID != "" {
if refCount := proxier.endPointsRefCount.getRefCount(epToDelete.hnsID); refCount == nil || *refCount == 0 {
klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", epToDelete.hnsID)
proxier.hns.deleteEndpoint(epToDelete.hnsID)
}
}
}
// This will clean up stale load balancers which are pending delete
@@ -1600,7 +1738,7 @@
// deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not.
// If it is needed, the function will delete the existing loadbalancer and return true, else false.
func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, vip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
if !winProxyOptimization || *lbHnsID == "" {
// Loadbalancer delete not needed
@@ -1609,7 +1747,7 @@ func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winPr
lbID, lbIdErr := findLoadBalancerID(
endpoints,
sourceVip,
vip,
protocol,
intPort,
extPort,

@@ -118,6 +118,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostn
endPointsRefCount: make(endPointsReferenceCountMap),
forwardHealthCheckVip: true,
mapStaleLoadbalancers: make(map[string]bool),
terminatedEndpoints: make(map[string]bool),
}
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange)
@@ -682,6 +683,291 @@ func TestCreateLoadBalancer(t *testing.T) {
}
}
func TestUpdateLoadBalancerWhenSupported(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil {
t.Fatal("NewFakeProxier returned nil")
}
proxier.supportedFeatures.ModifyLoadbalancer = true
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
}),
)
populateEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}),
)
proxier.setInitialized(true)
proxier.syncProxyRules()
svc := proxier.svcPortMap[svcPortName]
svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
} else {
if svcInfo.hnsID != loadbalancerGuid1 {
t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1)
}
}
proxier.setInitialized(false)
proxier.OnEndpointSliceUpdate(
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}),
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epPaAddress},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}))
proxier.mu.Lock()
proxier.endpointSlicesSynced = true
proxier.mu.Unlock()
proxier.setInitialized(true)
epObj, err := proxier.hcn.GetEndpointByID("EPID-3")
if err != nil || epObj == nil {
t.Errorf("Failed to fetch endpoint: EPID-3")
}
proxier.syncProxyRules()
// The endpoint should be deleted as it is not present in the new endpoint slice
epObj, err = proxier.hcn.GetEndpointByID("EPID-3")
if err == nil || epObj != nil {
t.Errorf("Failed to fetch endpoint: EPID-3")
}
ep := proxier.endpointsMap[svcPortName][0]
epInfo, ok := ep.(*endpointInfo)
epObj, err = proxier.hcn.GetEndpointByID("EPID-5")
if err != nil || epObj == nil {
t.Errorf("Failed to fetch endpoint: EPID-5")
}
if !ok {
t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
} else {
if epInfo.hnsID != "EPID-5" {
t.Errorf("%v does not match %v", epInfo.hnsID, "EPID-5")
}
}
if *epInfo.refCount != 1 {
t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount)
}
if *proxier.endPointsRefCount["EPID-5"] != *epInfo.refCount {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[endpointGuid1], *epInfo.refCount)
}
svc = proxier.svcPortMap[svcPortName]
svcInfo, ok = svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
} else {
// Loadbalancer id should not change after the update
if svcInfo.hnsID != loadbalancerGuid1 {
t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1)
}
}
}
func TestUpdateLoadBalancerWhenUnsupported(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil {
t.Fatal("NewFakeProxier returned nil")
}
// The value is false by default; it is set to false again here for the readability of the test case
proxier.supportedFeatures.ModifyLoadbalancer = false
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
}),
)
populateEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}),
)
proxier.setInitialized(true)
proxier.syncProxyRules()
svc := proxier.svcPortMap[svcPortName]
svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
} else {
if svcInfo.hnsID != loadbalancerGuid1 {
t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1)
}
}
proxier.setInitialized(false)
proxier.OnEndpointSliceUpdate(
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}),
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epPaAddress},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}))
proxier.mu.Lock()
proxier.endpointSlicesSynced = true
proxier.mu.Unlock()
proxier.setInitialized(true)
epObj, err := proxier.hcn.GetEndpointByID("EPID-3")
if err != nil || epObj == nil {
t.Errorf("Failed to fetch endpoint: EPID-3")
}
proxier.syncProxyRules()
// The endpoint should be deleted as it is not present in the new endpoint slice
epObj, err = proxier.hcn.GetEndpointByID("EPID-3")
if err == nil || epObj != nil {
t.Errorf("Failed to fetch endpoint: EPID-3")
}
ep := proxier.endpointsMap[svcPortName][0]
epInfo, ok := ep.(*endpointInfo)
epObj, err = proxier.hcn.GetEndpointByID("EPID-5")
if err != nil || epObj == nil {
t.Errorf("Failed to fetch endpoint: EPID-5")
}
if !ok {
t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
} else {
if epInfo.hnsID != "EPID-5" {
t.Errorf("%v does not match %v", epInfo.hnsID, "EPID-5")
}
}
if *epInfo.refCount != 1 {
t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount)
}
if *proxier.endPointsRefCount["EPID-5"] != *epInfo.refCount {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[endpointGuid1], *epInfo.refCount)
}
svc = proxier.svcPortMap[svcPortName]
svcInfo, ok = svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
} else {
// Loadbalancer id should change after the update
if svcInfo.hnsID != "LBID-3" {
t.Errorf("%v does not match %v", svcInfo.hnsID, "LBID-3")
}
}
}
func TestCreateDsrLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)

@@ -183,6 +183,15 @@ func (hcnObj HcnMock) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc
return loadBalancer, nil
}
func (hcnObj HcnMock) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsLbID string) (*hcn.HostComputeLoadBalancer, error) {
if _, ok := loadbalancerMap[hnsLbID]; !ok {
return nil, fmt.Errorf("loadbalancer id %s not present", hnsLbID)
}
loadBalancer.Id = hnsLbID
loadbalancerMap[hnsLbID] = loadBalancer
return loadBalancer, nil
}
func (hcnObj HcnMock) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error {
if _, ok := loadbalancerMap[loadBalancer.Id]; !ok {
return hcn.LoadBalancerNotFoundError{LoadBalancerId: loadBalancer.Id}