diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 48b61d636dc..ee688ff870c 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1237,7 +1237,11 @@ func (proxier *Proxier) syncProxyRules() { activeBindAddrs[serv.Address.String()] = true // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. - if err := proxier.syncEndpoint(svcName, false, svcInfo.NodeLocalInternal(), serv); err != nil { + internalNodeLocal := false + if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() { + internalNodeLocal = true + } + if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String()) } } else { @@ -1319,9 +1323,7 @@ func (proxier *Proxier) syncProxyRules() { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true - onlyNodeLocalEndpoints := svcInfo.NodeLocalExternal() - onlyNodeLocalEndpointsForInternal := svcInfo.NodeLocalInternal() - if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, onlyNodeLocalEndpointsForInternal, serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String()) } } else { @@ -1422,7 +1424,7 @@ func (proxier *Proxier) syncProxyRules() { if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv) } } else { @@ -1590,7 +1592,7 @@ func (proxier *Proxier) syncProxyRules() { // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv) } } else { @@ -2041,7 +2043,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, return nil } -func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, onlyNodeLocalEndpointsForInternal bool, vs *utilipvs.VirtualServer) error { +func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error { appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs) if err != nil { klog.Errorf("Failed to get IPVS service, error: %v", err) @@ -2053,8 +2055,13 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode // curEndpoints represents IPVS destinations listed from current system. curEndpoints := sets.NewString() - // newEndpoints represents Endpoints watched from API Server. - newEndpoints := sets.NewString() + // readyEndpoints represents Endpoints watched from API Server. + readyEndpoints := sets.NewString() + // localReadyEndpoints represents local endpoints that are ready and NOT terminating. + localReadyEndpoints := sets.NewString() + // localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating. + // Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic. + localReadyTerminatingEndpoints := sets.NewString() curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer) if err != nil { @@ -2079,15 +2086,28 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } for _, epInfo := range endpoints { - if !epInfo.IsReady() { - continue + if epInfo.IsReady() { + readyEndpoints.Insert(epInfo.String()) } - if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() { - continue + if onlyNodeLocalEndpoints && epInfo.GetIsLocal() { + if epInfo.IsReady() { + localReadyEndpoints.Insert(epInfo.String()) + } else if epInfo.IsServing() && epInfo.IsTerminating() { + localReadyTerminatingEndpoints.Insert(epInfo.String()) + } } + } - newEndpoints.Insert(epInfo.String()) + newEndpoints := readyEndpoints + if onlyNodeLocalEndpoints { + newEndpoints = localReadyEndpoints + + if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { + if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 { + newEndpoints = localReadyTerminatingEndpoints + } + } } // Create new endpoints