mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #113776 from princepereira/ppereira-kubeproxy-kep1669
Windows Kube-Proxy implementation of ProxyTerminatingEndpoints feature
This commit is contained in:
commit
5b54d48357
@ -244,6 +244,20 @@ func (hns hns) deleteEndpoint(hnsID string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// findLoadBalancerID will construct a id from the provided loadbalancer fields
|
||||
func findLoadBalancerID(endpoints []endpointsInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) {
|
||||
// Compute hash from backends (endpoint IDs)
|
||||
hash, err := hashEndpoints(endpoints)
|
||||
if err != nil {
|
||||
klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
|
||||
return loadBalancerIdentifier{}, err
|
||||
}
|
||||
if len(vip) > 0 {
|
||||
return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}, nil
|
||||
}
|
||||
return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}, nil
|
||||
}
|
||||
|
||||
func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
|
||||
lbs, err := hcn.ListLoadBalancers()
|
||||
var id loadBalancerIdentifier
|
||||
@ -396,7 +410,7 @@ func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err
|
||||
for _, ep := range endpoints {
|
||||
switch x := any(ep).(type) {
|
||||
case endpointsInfo:
|
||||
id = x.hnsID
|
||||
id = strings.ToUpper(x.hnsID)
|
||||
case string:
|
||||
id = x
|
||||
}
|
||||
|
@ -128,6 +128,7 @@ type serviceInfo struct {
|
||||
hns HostNetworkService
|
||||
preserveDIP bool
|
||||
localTrafficDSR bool
|
||||
winProxyOptimization bool
|
||||
}
|
||||
|
||||
type hnsNetworkInfo struct {
|
||||
@ -144,7 +145,12 @@ type remoteSubnetInfo struct {
|
||||
drMacAddress string
|
||||
}
|
||||
|
||||
const NETWORK_TYPE_OVERLAY = "overlay"
|
||||
const (
|
||||
NETWORK_TYPE_OVERLAY = "overlay"
|
||||
// MAX_COUNT_STALE_LOADBALANCERS is the maximum number of stale loadbalancers which cleanedup in single syncproxyrules.
|
||||
// If there are more stale loadbalancers to clean, it will go to next iteration of syncproxyrules.
|
||||
MAX_COUNT_STALE_LOADBALANCERS = 20
|
||||
)
|
||||
|
||||
func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) {
|
||||
var h HostNetworkService
|
||||
@ -157,6 +163,44 @@ func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) {
|
||||
return h, supportedFeatures
|
||||
}
|
||||
|
||||
// logFormattedEndpoints will log all endpoints and its states which are taking part in endpointmap change.
|
||||
// This mostly for debugging purpose and verbosity is set to 5.
|
||||
func logFormattedEndpoints(logMsg string, logLevel klog.Level, svcPortName proxy.ServicePortName, eps []proxy.Endpoint) {
|
||||
if klog.V(logLevel).Enabled() {
|
||||
var epInfo string
|
||||
for _, v := range eps {
|
||||
epInfo = epInfo + fmt.Sprintf("\n %s={Ready:%v,Serving:%v,Terminating:%v,IsRemote:%v}", v.String(), v.IsReady(), v.IsServing(), v.IsTerminating(), !v.GetIsLocal())
|
||||
}
|
||||
klog.V(logLevel).InfoS(logMsg, "svcPortName", svcPortName, "endpoints", epInfo)
|
||||
}
|
||||
}
|
||||
|
||||
// This will cleanup stale load balancers which are pending delete
|
||||
// in last iteration. This function will act like a self healing of stale
|
||||
// loadbalancer entries.
|
||||
func (proxier *Proxier) cleanupStaleLoadbalancers() {
|
||||
i := 0
|
||||
countStaleLB := len(proxier.mapStaleLoadbalancers)
|
||||
if countStaleLB == 0 {
|
||||
return
|
||||
}
|
||||
klog.V(3).InfoS("Cleanup of stale loadbalancers triggered", "LB Count", countStaleLB)
|
||||
for lbID := range proxier.mapStaleLoadbalancers {
|
||||
i++
|
||||
if err := proxier.hns.deleteLoadBalancer(lbID); err == nil {
|
||||
delete(proxier.mapStaleLoadbalancers, lbID)
|
||||
}
|
||||
if i == MAX_COUNT_STALE_LOADBALANCERS {
|
||||
// The remaining stale loadbalancers will be cleaned up in next iteration
|
||||
break
|
||||
}
|
||||
}
|
||||
countStaleLB = len(proxier.mapStaleLoadbalancers)
|
||||
if countStaleLB > 0 {
|
||||
klog.V(3).InfoS("Stale loadbalancers still remaining", "LB Count", countStaleLB, "stale_lb_ids", proxier.mapStaleLoadbalancers)
|
||||
}
|
||||
}
|
||||
|
||||
func getNetworkName(hnsNetworkName string) (string, error) {
|
||||
if len(hnsNetworkName) == 0 {
|
||||
klog.V(3).InfoS("Flag --network-name not set, checking environment variable")
|
||||
@ -313,16 +357,24 @@ func conjureMac(macPrefix string, ip net.IP) string {
|
||||
}
|
||||
|
||||
func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) {
|
||||
for svcPortName := range oldEndpointsMap {
|
||||
proxier.onEndpointsMapChange(&svcPortName)
|
||||
// This will optimize remote endpoint and loadbalancer deletion based on the annotation
|
||||
var svcPortMap = make(map[proxy.ServicePortName]bool)
|
||||
var logLevel klog.Level = 5
|
||||
for svcPortName, eps := range oldEndpointsMap {
|
||||
logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps)
|
||||
svcPortMap[svcPortName] = true
|
||||
proxier.onEndpointsMapChange(&svcPortName, false)
|
||||
}
|
||||
|
||||
for svcPortName := range newEndpointsMap {
|
||||
proxier.onEndpointsMapChange(&svcPortName)
|
||||
for svcPortName, eps := range newEndpointsMap {
|
||||
logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps)
|
||||
// redundantCleanup true means cleanup is called second time on the same svcPort
|
||||
redundantCleanup := svcPortMap[svcPortName]
|
||||
proxier.onEndpointsMapChange(&svcPortName, redundantCleanup)
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) {
|
||||
func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName, redundantCleanup bool) {
|
||||
|
||||
svc, exists := proxier.svcPortMap[*svcPortName]
|
||||
|
||||
@ -334,8 +386,15 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName)
|
||||
return
|
||||
}
|
||||
|
||||
if svcInfo.winProxyOptimization && redundantCleanup {
|
||||
// This is a second cleanup call.
|
||||
// Second cleanup on the same svcPort will be ignored if the
|
||||
// winProxyOptimization is Enabled
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(3).InfoS("Endpoints are modified. Service is stale", "servicePortName", svcPortName)
|
||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName])
|
||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, true)
|
||||
} else {
|
||||
// If no service exists, just cleanup the remote endpoints
|
||||
klog.V(3).InfoS("Endpoints are orphaned, cleaning up")
|
||||
@ -382,7 +441,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
|
||||
}
|
||||
|
||||
klog.V(3).InfoS("Updating existing service port", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP(), "port", svcInfo.Port(), "protocol", svcInfo.Protocol())
|
||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName])
|
||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, false)
|
||||
}
|
||||
}
|
||||
|
||||
@ -427,6 +486,13 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string,
|
||||
return ep, err
|
||||
}
|
||||
|
||||
func (ep *endpointsInfo) DecrementRefCount() {
|
||||
klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointsInfo", ep)
|
||||
if !ep.GetIsLocal() && ep.refCount != nil && *ep.refCount > 0 {
|
||||
*ep.refCount--
|
||||
}
|
||||
}
|
||||
|
||||
func (ep *endpointsInfo) Cleanup() {
|
||||
klog.V(3).InfoS("Endpoint cleanup", "endpointsInfo", ep)
|
||||
if !ep.GetIsLocal() && ep.refCount != nil {
|
||||
@ -462,6 +528,8 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16
|
||||
func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
|
||||
info := &serviceInfo{BaseServicePortInfo: bsvcPortInfo}
|
||||
preserveDIP := service.Annotations["preserve-destination"] == "true"
|
||||
// Annotation introduced to enable optimized loadbalancing
|
||||
winProxyOptimization := !(strings.ToUpper(service.Annotations["winProxyOptimization"]) == "DISABLED")
|
||||
localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal
|
||||
err := hcn.DSRSupported()
|
||||
if err != nil {
|
||||
@ -479,6 +547,9 @@ func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service
|
||||
info.targetPort = targetPort
|
||||
info.hns = proxier.hns
|
||||
info.localTrafficDSR = localTrafficDSR
|
||||
info.winProxyOptimization = winProxyOptimization
|
||||
|
||||
klog.V(3).InfoS("Flags enabled for service", "service", service.Name, "localTrafficDSR", localTrafficDSR, "preserveDIP", preserveDIP, "winProxyOptimization", winProxyOptimization)
|
||||
|
||||
for _, eip := range service.Spec.ExternalIPs {
|
||||
info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
|
||||
@ -562,6 +633,7 @@ type Proxier struct {
|
||||
|
||||
forwardHealthCheckVip bool
|
||||
rootHnsEndpointName string
|
||||
mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which are pending delete in last iteration
|
||||
}
|
||||
|
||||
type localPort struct {
|
||||
@ -720,6 +792,7 @@ func NewProxier(
|
||||
healthzPort: healthzPort,
|
||||
rootHnsEndpointName: config.RootHnsEndpointName,
|
||||
forwardHealthCheckVip: config.ForwardHealthCheckVip,
|
||||
mapStaleLoadbalancers: make(map[string]bool),
|
||||
}
|
||||
|
||||
ipFamily := v1.IPv4Protocol
|
||||
@ -781,17 +854,27 @@ func CleanupLeftovers() (encounteredError bool) {
|
||||
return encounteredError
|
||||
}
|
||||
|
||||
func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
|
||||
func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapStaleLoadbalancers map[string]bool, isEndpointChange bool) {
|
||||
klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo)
|
||||
// if it's an endpoint change and winProxyOptimization annotation enable, skip lb deletion and remoteEndpoint deletion
|
||||
winProxyOptimization := isEndpointChange && svcInfo.winProxyOptimization
|
||||
if winProxyOptimization {
|
||||
klog.V(3).InfoS("Skipped loadbalancer deletion.", "hnsID", svcInfo.hnsID, "nodePorthnsID", svcInfo.nodePorthnsID, "winProxyOptimization", svcInfo.winProxyOptimization, "isEndpointChange", isEndpointChange)
|
||||
} else {
|
||||
// Skip the svcInfo.policyApplied check to remove all the policies
|
||||
svcInfo.deleteLoadBalancerPolicy()
|
||||
svcInfo.deleteLoadBalancerPolicy(mapStaleLoadbalancers)
|
||||
}
|
||||
// Cleanup Endpoints references
|
||||
for _, ep := range endpoints {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
if ok {
|
||||
if winProxyOptimization {
|
||||
epInfo.DecrementRefCount()
|
||||
} else {
|
||||
epInfo.Cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
if svcInfo.remoteEndpoint != nil {
|
||||
svcInfo.remoteEndpoint.Cleanup()
|
||||
}
|
||||
@ -799,10 +882,11 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
|
||||
svcInfo.policyApplied = false
|
||||
}
|
||||
|
||||
func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() {
|
||||
func (svcInfo *serviceInfo) deleteLoadBalancerPolicy(mapStaleLoadbalancer map[string]bool) {
|
||||
// Remove the Hns Policy corresponding to this service
|
||||
hns := svcInfo.hns
|
||||
if err := hns.deleteLoadBalancer(svcInfo.hnsID); err != nil {
|
||||
mapStaleLoadbalancer[svcInfo.hnsID] = true
|
||||
klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource.", "hnsID", svcInfo.hnsID, "ClusterIP", svcInfo.ClusterIP())
|
||||
} else {
|
||||
// On successful delete, remove hnsId
|
||||
@ -810,6 +894,7 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() {
|
||||
}
|
||||
|
||||
if err := hns.deleteLoadBalancer(svcInfo.nodePorthnsID); err != nil {
|
||||
mapStaleLoadbalancer[svcInfo.nodePorthnsID] = true
|
||||
klog.V(1).ErrorS(err, "Error deleting Hns NodePort policy resource.", "hnsID", svcInfo.nodePorthnsID, "NodePort", svcInfo.NodePort())
|
||||
} else {
|
||||
// On successful delete, remove hnsId
|
||||
@ -817,6 +902,7 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() {
|
||||
}
|
||||
|
||||
for _, externalIP := range svcInfo.externalIPs {
|
||||
mapStaleLoadbalancer[externalIP.hnsID] = true
|
||||
if err := hns.deleteLoadBalancer(externalIP.hnsID); err != nil {
|
||||
klog.V(1).ErrorS(err, "Error deleting Hns ExternalIP policy resource.", "hnsID", externalIP.hnsID, "IP", externalIP.ip)
|
||||
} else {
|
||||
@ -825,7 +911,9 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() {
|
||||
}
|
||||
}
|
||||
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
||||
klog.V(3).InfoS("Loadbalancer Hns LoadBalancer delete triggered for loadBalancer Ingress resources in cleanup", "lbIngressIP", lbIngressIP)
|
||||
if err := hns.deleteLoadBalancer(lbIngressIP.hnsID); err != nil {
|
||||
mapStaleLoadbalancer[lbIngressIP.hnsID] = true
|
||||
klog.V(1).ErrorS(err, "Error deleting Hns IngressIP policy resource.", "hnsID", lbIngressIP.hnsID, "IP", lbIngressIP.ip)
|
||||
} else {
|
||||
// On successful delete, remove hnsId
|
||||
@ -834,6 +922,7 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() {
|
||||
|
||||
if lbIngressIP.healthCheckHnsID != "" {
|
||||
if err := hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID); err != nil {
|
||||
mapStaleLoadbalancer[lbIngressIP.healthCheckHnsID] = true
|
||||
klog.V(1).ErrorS(err, "Error deleting Hns IngressIP HealthCheck policy resource.", "hnsID", lbIngressIP.healthCheckHnsID, "IP", lbIngressIP.ip)
|
||||
} else {
|
||||
// On successful delete, remove hnsId
|
||||
@ -993,7 +1082,7 @@ func (proxier *Proxier) cleanupAllPolicies() {
|
||||
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
|
||||
continue
|
||||
}
|
||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName])
|
||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName], proxier.mapStaleLoadbalancers, false)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1010,6 +1099,56 @@ func isNetworkNotFoundError(err error) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// isAllEndpointsTerminating function will return true if all the endpoints are terminating.
|
||||
// If atleast one is not terminating, then return false
|
||||
func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
|
||||
for _, epInfo := range proxier.endpointsMap[svcName] {
|
||||
ep, ok := epInfo.(*endpointsInfo)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if isLocalTrafficDSR && !ep.GetIsLocal() {
|
||||
// KEP-1669: Ignore remote endpoints when the ExternalTrafficPolicy is Local (DSR Mode)
|
||||
continue
|
||||
}
|
||||
// If Readiness Probe fails and pod is not under delete, then
|
||||
// the state of the endpoint will be - Ready:False, Serving:False, Terminating:False
|
||||
if !ep.IsReady() && !ep.IsTerminating() {
|
||||
// Ready:false, Terminating:False, ignore
|
||||
continue
|
||||
}
|
||||
if !ep.IsTerminating() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// isAllEndpointsNonServing function will return true if all the endpoints are non serving.
|
||||
// If atleast one is serving, then return false
|
||||
func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
|
||||
for _, epInfo := range proxier.endpointsMap[svcName] {
|
||||
ep, ok := epInfo.(*endpointsInfo)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if isLocalTrafficDSR && !ep.GetIsLocal() {
|
||||
continue
|
||||
}
|
||||
if ep.IsServing() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details
|
||||
func updateQueriedEndpoints(newHnsEndpoint *endpointsInfo, queriedEndpoints map[string]*endpointsInfo) {
|
||||
// store newly created endpoints in queriedEndpoints
|
||||
queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
|
||||
queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
|
||||
}
|
||||
|
||||
// This is where all of the hns save/restore calls happen.
|
||||
// assumes proxier.mu is held
|
||||
func (proxier *Proxier) syncProxyRules() {
|
||||
@ -1126,9 +1265,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
|
||||
*newHnsEndpoint.refCount++
|
||||
svcInfo.remoteEndpoint = newHnsEndpoint
|
||||
// store newly created endpoints in queriedEndpoints
|
||||
queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
|
||||
queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
|
||||
updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1138,6 +1275,19 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Create Remote endpoints for every endpoint, corresponding to the service
|
||||
containsPublicIP := false
|
||||
containsNodeIP := false
|
||||
var allEndpointsTerminating, allEndpointsNonServing bool
|
||||
someEndpointsServing := true
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ProxyTerminatingEndpoints) && len(svcInfo.loadBalancerIngressIPs) > 0 {
|
||||
// Check should be done only if comes under the feature gate or enabled
|
||||
// The check should be done only if Spec.Type == Loadbalancer.
|
||||
allEndpointsTerminating = proxier.isAllEndpointsTerminating(svcName, svcInfo.localTrafficDSR)
|
||||
allEndpointsNonServing = proxier.isAllEndpointsNonServing(svcName, svcInfo.localTrafficDSR)
|
||||
someEndpointsServing = !allEndpointsNonServing
|
||||
klog.V(4).InfoS("Terminating status checked for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "allEndpointsTerminating", allEndpointsTerminating, "allEndpointsNonServing", allEndpointsNonServing, "localTrafficDSR", svcInfo.localTrafficDSR)
|
||||
} else {
|
||||
klog.V(4).InfoS("Skipped terminating status check for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "proxyEndpointsFeatureGateEnabled", utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ProxyTerminatingEndpoints), "ingressLBCount", len(svcInfo.loadBalancerIngressIPs))
|
||||
}
|
||||
|
||||
for _, epInfo := range proxier.endpointsMap[svcName] {
|
||||
ep, ok := epInfo.(*endpointsInfo)
|
||||
@ -1146,9 +1296,19 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
continue
|
||||
}
|
||||
|
||||
if !ep.IsReady() {
|
||||
if someEndpointsServing {
|
||||
|
||||
if !allEndpointsTerminating && !ep.IsReady() {
|
||||
klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is either not ready or all not all endpoints are terminating", "EpIP", ep.ip, " EpPort", ep.port, "allEndpointsTerminating", allEndpointsTerminating, "IsEpReady", ep.IsReady())
|
||||
continue
|
||||
}
|
||||
if !ep.IsServing() {
|
||||
klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is not serving", "EpIP", ep.ip, " EpPort", ep.port, "IsEpServing", ep.IsServing())
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var newHnsEndpoint *endpointsInfo
|
||||
hnsNetworkName := proxier.network.name
|
||||
var err error
|
||||
@ -1206,6 +1366,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
klog.ErrorS(err, "Remote endpoint creation failed", "endpointsInfo", hnsEndpoint)
|
||||
continue
|
||||
}
|
||||
updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
|
||||
} else {
|
||||
|
||||
hnsEndpoint := &endpointsInfo{
|
||||
@ -1219,6 +1380,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
klog.ErrorS(err, "Remote endpoint creation failed")
|
||||
continue
|
||||
}
|
||||
updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
|
||||
}
|
||||
}
|
||||
// For Overlay networks 'SourceVIP' on an Load balancer Policy can either be chosen as
|
||||
@ -1267,7 +1429,14 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
|
||||
}
|
||||
|
||||
// In ETP:Cluster, if all endpoints are under termination,
|
||||
// it will have serving and terminating, else only ready and serving
|
||||
if len(hnsEndpoints) == 0 {
|
||||
if svcInfo.winProxyOptimization {
|
||||
// Deleting loadbalancers when there are no endpoints to serve.
|
||||
klog.V(3).InfoS("Cleanup existing ", "endpointsInfo", hnsEndpoints, "serviceName", svcName)
|
||||
svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers)
|
||||
}
|
||||
klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName)
|
||||
continue
|
||||
}
|
||||
@ -1284,6 +1453,12 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
klog.InfoS("Session Affinity is not supported on this version of Windows")
|
||||
}
|
||||
|
||||
endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing
|
||||
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers)
|
||||
|
||||
if endpointsAvailableForLB {
|
||||
// If all endpoints are terminating, then no need to create Cluster IP LoadBalancer
|
||||
// Cluster IP LoadBalancer creation
|
||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||
hnsEndpoints,
|
||||
loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP},
|
||||
@ -1302,6 +1477,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
svcInfo.hnsID = hnsLoadBalancer.hnsID
|
||||
klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
|
||||
|
||||
} else {
|
||||
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
|
||||
}
|
||||
|
||||
// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
|
||||
if svcInfo.NodePort() > 0 {
|
||||
// If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
|
||||
@ -1311,7 +1490,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
nodePortEndpoints = hnsLocalEndpoints
|
||||
}
|
||||
|
||||
if len(nodePortEndpoints) > 0 {
|
||||
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers)
|
||||
|
||||
if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
|
||||
// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
|
||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||
nodePortEndpoints,
|
||||
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
|
||||
@ -1330,7 +1512,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
|
||||
klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
|
||||
} else {
|
||||
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
|
||||
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1342,7 +1524,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
externalIPEndpoints = hnsLocalEndpoints
|
||||
}
|
||||
|
||||
if len(externalIPEndpoints) > 0 {
|
||||
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)
|
||||
|
||||
if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
|
||||
// If all endpoints are in terminating stage, then no need to External IP LoadBalancer
|
||||
// Try loading existing policies, if already available
|
||||
hnsLoadBalancer, err = hns.getLoadBalancer(
|
||||
externalIPEndpoints,
|
||||
@ -1361,7 +1546,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
externalIP.hnsID = hnsLoadBalancer.hnsID
|
||||
klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
|
||||
} else {
|
||||
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
|
||||
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
|
||||
}
|
||||
}
|
||||
// Create a Load Balancer Policy for each loadbalancer ingress
|
||||
@ -1372,6 +1557,8 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
lbIngressEndpoints = hnsLocalEndpoints
|
||||
}
|
||||
|
||||
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers)
|
||||
|
||||
if len(lbIngressEndpoints) > 0 {
|
||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||
lbIngressEndpoints,
|
||||
@ -1393,11 +1580,15 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
|
||||
}
|
||||
|
||||
if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil {
|
||||
if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB {
|
||||
// Avoid creating health check loadbalancer if all the endpoints are terminating
|
||||
nodeport := proxier.healthzPort
|
||||
if svcInfo.HealthCheckNodePort() != 0 {
|
||||
nodeport = svcInfo.HealthCheckNodePort()
|
||||
}
|
||||
|
||||
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointsInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
|
||||
|
||||
hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
|
||||
[]endpointsInfo{*gatewayHnsendpoint},
|
||||
loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
|
||||
@ -1414,6 +1605,8 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
|
||||
klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
|
||||
} else {
|
||||
klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating)
|
||||
}
|
||||
}
|
||||
svcInfo.policyApplied = true
|
||||
@ -1445,7 +1638,51 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// remove stale endpoint refcount entries
|
||||
for hnsID, referenceCount := range proxier.endPointsRefCount {
|
||||
if *referenceCount <= 0 {
|
||||
klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID)
|
||||
proxier.hns.deleteEndpoint(hnsID)
|
||||
delete(proxier.endPointsRefCount, hnsID)
|
||||
}
|
||||
}
|
||||
// This will cleanup stale load balancers which are pending delete
|
||||
// in last iteration
|
||||
proxier.cleanupStaleLoadbalancers()
|
||||
}
|
||||
|
||||
// deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not.
|
||||
// If it is needed, the function will delete the existing loadbalancer and return true, else false.
|
||||
func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointsInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
|
||||
|
||||
if !winProxyOptimization || *lbHnsID == "" {
|
||||
// Loadbalancer delete not needed
|
||||
return false
|
||||
}
|
||||
|
||||
lbID, lbIdErr := findLoadBalancerID(
|
||||
endpoints,
|
||||
sourceVip,
|
||||
protocol,
|
||||
intPort,
|
||||
extPort,
|
||||
)
|
||||
|
||||
if lbIdErr != nil {
|
||||
return proxier.deleteLoadBalancer(hns, lbHnsID)
|
||||
}
|
||||
|
||||
if _, ok := queriedLoadBalancers[lbID]; ok {
|
||||
// The existing loadbalancer in the system is same as what we try to delete and recreate. So we skip deleting.
|
||||
return false
|
||||
}
|
||||
|
||||
return proxier.deleteLoadBalancer(hns, lbHnsID)
|
||||
}
|
||||
|
||||
func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool {
|
||||
klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID)
|
||||
if err := hns.deleteLoadBalancer(*lbHnsID); err != nil {
|
||||
// This will be cleanup by cleanupStaleLoadbalancer fnction.
|
||||
proxier.mapStaleLoadbalancers[*lbHnsID] = true
|
||||
}
|
||||
*lbHnsID = ""
|
||||
return true
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user