proxier/ipvs: fall back to ready terminating if no ready endpoint exists

Signed-off-by: Andrew Sy Kim <kim.andrewsy@gmail.com>
This commit is contained in:
Andrew Sy Kim 2021-03-10 21:35:54 -05:00
parent b54c0568d8
commit 9d4e24aa32

View File

@ -1237,7 +1237,11 @@ func (proxier *Proxier) syncProxyRules() {
activeBindAddrs[serv.Address.String()] = true activeBindAddrs[serv.Address.String()] = true
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode. // So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
if err := proxier.syncEndpoint(svcName, false, svcInfo.NodeLocalInternal(), serv); err != nil { internalNodeLocal := false
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
internalNodeLocal = true
}
if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String()) klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
} }
} else { } else {
@ -1319,9 +1323,7 @@ func (proxier *Proxier) syncProxyRules() {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true activeBindAddrs[serv.Address.String()] = true
onlyNodeLocalEndpoints := svcInfo.NodeLocalExternal() if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
onlyNodeLocalEndpointsForInternal := svcInfo.NodeLocalInternal()
if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, onlyNodeLocalEndpointsForInternal, serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String()) klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
} }
} else { } else {
@ -1422,7 +1424,7 @@ func (proxier *Proxier) syncProxyRules() {
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil { if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil { if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv) klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
} }
} else { } else {
@ -1590,7 +1592,7 @@ func (proxier *Proxier) syncProxyRules() {
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil { if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil { if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv) klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
} }
} else { } else {
@ -2041,7 +2043,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
return nil return nil
} }
func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, onlyNodeLocalEndpointsForInternal bool, vs *utilipvs.VirtualServer) error { func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs) appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
if err != nil { if err != nil {
klog.Errorf("Failed to get IPVS service, error: %v", err) klog.Errorf("Failed to get IPVS service, error: %v", err)
@ -2053,8 +2055,13 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
// curEndpoints represents IPVS destinations listed from current system. // curEndpoints represents IPVS destinations listed from current system.
curEndpoints := sets.NewString() curEndpoints := sets.NewString()
// newEndpoints represents Endpoints watched from API Server. // readyEndpoints represents Endpoints watched from API Server.
newEndpoints := sets.NewString() readyEndpoints := sets.NewString()
// localReadyEndpoints represents local endpoints that are ready and NOT terminating.
localReadyEndpoints := sets.NewString()
// localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating.
// Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic.
localReadyTerminatingEndpoints := sets.NewString()
curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer) curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
if err != nil { if err != nil {
@ -2079,15 +2086,28 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
} }
for _, epInfo := range endpoints { for _, epInfo := range endpoints {
if !epInfo.IsReady() { if epInfo.IsReady() {
continue readyEndpoints.Insert(epInfo.String())
} }
if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() { if onlyNodeLocalEndpoints && epInfo.GetIsLocal() {
continue if epInfo.IsReady() {
localReadyEndpoints.Insert(epInfo.String())
} else if epInfo.IsServing() && epInfo.IsTerminating() {
localReadyTerminatingEndpoints.Insert(epInfo.String())
}
}
} }
newEndpoints.Insert(epInfo.String()) newEndpoints := readyEndpoints
if onlyNodeLocalEndpoints {
newEndpoints = localReadyEndpoints
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 {
newEndpoints = localReadyTerminatingEndpoints
}
}
} }
// Create new endpoints // Create new endpoints