Mirror of https://github.com/k3s-io/kubernetes.git
[kube-proxy:nftables] Add partialSync mode to only transact changed objects

Change the order of operations so that the current iteration stops early when
no changes to a service's chains are needed. Bump the forced full resync (the
sync runner's maxInterval) to 1 hour.

In a test kind cluster, creating 10K services with 2 endpoints each takes
~25min before the fix and ~9min after. Maximum memory usage during creation is
~650MiB and ~260MiB, respectively. Another important metric is the time it
takes to create 1 new service when 10K services already exist: it used to take
~8min before the fix; with partialSync it takes ~141ms.

Signed-off-by: Nadia Pinaeva <n.m.pinaeva@gmail.com>
This commit is contained in: commit 3ccf5b8a55 (parent dc13e42f56)
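The core idea of partialSync is to stop flush-and-rebuild of the nftables sets/maps on every sync and instead cache what is programmed, transacting only the delta. Here is a minimal, self-contained sketch of that idea (names are illustrative only; the commit's real implementation is the nftElementStorage type in the diff below):

```go
package main

import "fmt"

// op stands in for one operation in an nftables transaction.
type op struct {
	verb, key, value string
}

// elementCache plays the role of the commit's nftElementStorage: it remembers
// what is currently programmed so a sync can emit only the difference.
type elementCache struct {
	elements map[string]string // key -> value currently programmed
}

// sync diffs desired against the cache and returns only the needed operations,
// updating the cache along the way.
func (c *elementCache) sync(desired map[string]string) []op {
	var ops []op
	for k, v := range desired {
		old, ok := c.elements[k]
		switch {
		case !ok:
			ops = append(ops, op{"add", k, v})
		case old != v:
			// changed value: delete and re-add, as the real ensureElem does
			ops = append(ops, op{"delete", k, old}, op{"add", k, v})
		}
		c.elements[k] = v
	}
	for k, v := range c.elements {
		if _, ok := desired[k]; !ok {
			ops = append(ops, op{"delete", k, v})
			delete(c.elements, k)
		}
	}
	return ops
}

func main() {
	c := &elementCache{elements: map[string]string{}}
	fmt.Println(c.sync(map[string]string{"10.96.0.10 . tcp . 53": "goto svc-A"})) // first sync: one add
	fmt.Println(c.sync(map[string]string{"10.96.0.10 . tcp . 53": "goto svc-A"})) // unchanged: no ops
	fmt.Println(c.sync(map[string]string{"10.96.0.11 . tcp . 80": "goto svc-B"})) // one add, one delete
}
```

The diff below applies this pattern to six sets/maps and additionally skips per-service chain updates when the change trackers report no updates for that service.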
@@ -316,6 +316,8 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) {
 		legacyregistry.MustRegister(IPTablesRestoreFailuresTotal)

 	case kubeproxyconfig.ProxyModeNFTables:
+		legacyregistry.MustRegister(SyncFullProxyRulesLatency)
+		legacyregistry.MustRegister(SyncPartialProxyRulesLatency)
 		legacyregistry.MustRegister(NFTablesSyncFailuresTotal)
 		legacyregistry.MustRegister(NFTablesCleanupFailuresTotal)
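SyncFullProxyRulesLatency and SyncPartialProxyRulesLatency are new histograms whose definitions are not part of this hunk. A sketch of what they plausibly look like, using the component-base metrics API; only the Go variable names are confirmed by the registration above, while the subsystem constant, metric names, help text, and buckets are assumptions:

```go
// Hypothetical reconstruction of the new histogram definitions (sketch only).
package metrics

import "k8s.io/component-base/metrics"

const kubeProxySubsystem = "kubeproxy" // assumed to exist in this package

var (
	// SyncFullProxyRulesLatency would observe only full resyncs...
	SyncFullProxyRulesLatency = metrics.NewHistogram(
		&metrics.HistogramOpts{
			Subsystem:      kubeProxySubsystem,
			Name:           "sync_full_proxy_rules_duration_seconds",
			Help:           "SyncProxyRules latency in seconds for full resyncs",
			Buckets:        metrics.ExponentialBuckets(0.001, 2, 15),
			StabilityLevel: metrics.ALPHA,
		},
	)

	// ...and SyncPartialProxyRulesLatency only partial resyncs.
	SyncPartialProxyRulesLatency = metrics.NewHistogram(
		&metrics.HistogramOpts{
			Subsystem:      kubeProxySubsystem,
			Name:           "sync_partial_proxy_rules_duration_seconds",
			Help:           "SyncProxyRules latency in seconds for partial resyncs",
			Buckets:        metrics.ExponentialBuckets(0.001, 2, 15),
			StabilityLevel: metrics.ALPHA,
		},
	)
)
```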
@@ -162,6 +162,7 @@ type Proxier struct {
 	// updating nftables with some partial data after kube-proxy restart.
 	endpointSlicesSynced bool
 	servicesSynced       bool
+	needFullSync         bool
 	initialized          int32
 	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
 	syncPeriod           time.Duration

@@ -194,6 +195,13 @@ type Proxier struct {
 	serviceCIDRs string

 	logger klog.Logger
+
+	clusterIPs          *nftElementStorage
+	serviceIPs          *nftElementStorage
+	firewallIPs         *nftElementStorage
+	noEndpointServices  *nftElementStorage
+	noEndpointNodePorts *nftElementStorage
+	serviceNodePorts    *nftElementStorage
 }

 // Proxier implements proxy.Provider

@@ -243,6 +251,7 @@ func NewProxier(ctx context.Context,
 		serviceChanges:   proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
 		endpointsMap:     make(proxy.EndpointsMap),
 		endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
+		needFullSync:     true,
 		syncPeriod:       syncPeriod,
 		nftables:         nft,
 		masqueradeAll:    masqueradeAll,

@@ -258,11 +267,18 @@ func NewProxier(ctx context.Context,
 		networkInterfacer:   proxyutil.RealNetwork{},
 		staleChains:         make(map[string]time.Time),
 		logger:              logger,
+		clusterIPs:          newNFTElementStorage("set", clusterIPsSet),
+		serviceIPs:          newNFTElementStorage("map", serviceIPsMap),
+		firewallIPs:         newNFTElementStorage("map", firewallIPsMap),
+		noEndpointServices:  newNFTElementStorage("map", noEndpointServicesMap),
+		noEndpointNodePorts: newNFTElementStorage("map", noEndpointNodePortsMap),
+		serviceNodePorts:    newNFTElementStorage("map", serviceNodePortsMap),
 	}

 	burstSyncs := 2
 	logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
-	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
+	// We need to pass *some* maxInterval to NewBoundedFrequencyRunner. time.Hour is arbitrary.
+	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)

 	return proxier, nil
 }
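The runner's maxInterval drops from the configured syncPeriod to a fixed time.Hour: with partial syncs handling incremental changes cheaply, the periodic full replay of the ruleset only needs to act as a safety net. A minimal usage sketch of the BoundedFrequencyRunner from k8s.io/kubernetes/pkg/util/async, with the same parameter roles as in the hunk above (the one-second minSyncPeriod is just an example value):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/async"
)

func main() {
	syncProxyRules := func() { fmt.Println("synced at", time.Now().Format(time.StampMilli)) }

	// At most one run per minSyncPeriod (bursts of 2 allowed), and at least
	// one run per maxInterval (time.Hour) even if nobody calls Run().
	runner := async.NewBoundedFrequencyRunner("sync-runner", syncProxyRules, time.Second, time.Hour, 2)

	stop := make(chan struct{})
	go runner.Loop(stop) // the proxier starts this from its SyncLoop()

	runner.Run() // request a sync; extra requests within minSyncPeriod coalesce
	runner.Run()
	time.Sleep(3 * time.Second)
	close(stop)
}
```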
@@ -421,17 +437,22 @@ var nftablesJumpChains = []nftablesJumpChain{
 // ensureChain adds commands to tx to ensure that chain exists and doesn't contain
 // anything from before this transaction (using createdChains to ensure that we don't
 // Flush a chain more than once and lose *new* rules as well.)
-func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string]) {
+// If skipCreation is true, the chain will not be added to the transaction, but will still be
+// added to createdChains for proper cleanup at the end of the sync iteration.
+func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string], skipCreation bool) {
 	if createdChains.Has(chain) {
 		return
 	}
+	createdChains.Insert(chain)
+	if skipCreation {
+		return
+	}
 	tx.Add(&knftables.Chain{
 		Name: chain,
 	})
 	tx.Flush(&knftables.Chain{
 		Name: chain,
 	})
-	createdChains.Insert(chain)
 }

 func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
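The important subtlety in the new ensureChain is that a chain skipped for creation is still recorded in createdChains: during a partial sync the chains of an untouched service must still count as "active" so the end-of-sync stale-chain cleanup does not delete them. A self-contained sketch of that control flow, with the transaction replaced by a string recorder and made-up chain names:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// txOps stands in for a knftables.Transaction; we only record operation names.
type txOps []string

// ensureChainSketch mirrors the patched ensureChain: the chain is always
// recorded as active, but Add/Flush are skipped for chains belonging to
// services that don't need an update this sync.
func ensureChainSketch(chain string, tx *txOps, createdChains sets.Set[string], skipCreation bool) {
	if createdChains.Has(chain) {
		return
	}
	createdChains.Insert(chain)
	if skipCreation {
		return
	}
	*tx = append(*tx, "add "+chain, "flush "+chain)
}

func main() {
	tx := txOps{}
	active := sets.New[string]()

	ensureChainSketch("service-chain-for-changed-svc", &tx, active, false)
	ensureChainSketch("service-chain-for-unchanged-svc", &tx, active, true)

	fmt.Println("tx:", tx)                    // only the changed service's chain is touched
	fmt.Println("active:", sets.List(active)) // both chains survive stale-chain cleanup
}
```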
@@ -477,7 +498,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
 	// Create and flush ordinary chains and add rules jumping to them
 	createdChains := sets.New[string]()
 	for _, c := range nftablesJumpChains {
-		ensureChain(c.dstChain, tx, createdChains)
+		ensureChain(c.dstChain, tx, createdChains, false)
 		tx.Add(&knftables.Rule{
 			Chain: c.srcChain,
 			Rule: knftables.Concat(

@@ -489,7 +510,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {

 	// Ensure all of our other "top-level" chains exist
 	for _, chain := range []string{servicesChain, clusterIPsCheckChain, masqueradingChain, markMasqChain} {
-		ensureChain(chain, tx, createdChains)
+		ensureChain(chain, tx, createdChains, false)
 	}

 	// Add the rules in the mark-for-masquerade and masquerading chains

@@ -639,7 +660,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
 		Comment: ptr.To("destinations that are subject to LoadBalancerSourceRanges"),
 	})

-	ensureChain(firewallCheckChain, tx, createdChains)
+	ensureChain(firewallCheckChain, tx, createdChains, false)
 	tx.Add(&knftables.Rule{
 		Chain: firewallCheckChain,
 		Rule: knftables.Concat(

@@ -686,6 +707,14 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
 			),
 		})
 	}
+
+	// flush containers
+	proxier.clusterIPs.reset(tx)
+	proxier.serviceIPs.reset(tx)
+	proxier.firewallIPs.reset(tx)
+	proxier.noEndpointServices.reset(tx)
+	proxier.noEndpointNodePorts.reset(tx)
+	proxier.serviceNodePorts.reset(tx)
 }

 // CleanupLeftovers removes all nftables rules and chains created by the Proxier

@@ -830,6 +859,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
 	for k, v := range node.Labels {
 		proxier.nodeLabels[k] = v
 	}
+	proxier.needFullSync = true
 	proxier.mu.Unlock()
 	proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)

@@ -854,6 +884,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
 	for k, v := range node.Labels {
 		proxier.nodeLabels[k] = v
 	}
+	proxier.needFullSync = true
 	proxier.mu.Unlock()
 	proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)

@@ -871,6 +902,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {

 	proxier.mu.Lock()
 	proxier.nodeLabels = nil
+	proxier.needFullSync = true
 	proxier.mu.Unlock()

 	proxier.Sync()
@@ -1015,6 +1047,96 @@ func isAffinitySetName(set string) bool {
 	return strings.HasPrefix(set, servicePortEndpointAffinityNamePrefix)
 }

+// nftElementStorage is an internal representation of an nftables map or set.
+type nftElementStorage struct {
+	elements      map[string]string
+	leftoverKeys  sets.Set[string]
+	containerType string
+	containerName string
+}
+
+// joinNFTSlice converts an nft element key or value (type []string) to a string to store in
+// the nftElementStorage. The separator is the same as the one used by nft commands, so we
+// know that the parsing is going to be unambiguous.
+func joinNFTSlice(k []string) string {
+	return strings.Join(k, " . ")
+}
+
+// splitNFTSlice converts an nftElementStorage key or value string representation back to a slice.
+func splitNFTSlice(k string) []string {
+	return strings.Split(k, " . ")
+}
+
+// newNFTElementStorage creates an empty nftElementStorage.
+// nftElementStorage.reset() must be called before the first usage.
+func newNFTElementStorage(containerType, containerName string) *nftElementStorage {
+	c := &nftElementStorage{
+		elements:      make(map[string]string),
+		leftoverKeys:  sets.New[string](),
+		containerType: containerType,
+		containerName: containerName,
+	}
+	return c
+}
+
+// reset clears the internal state and flushes the nftables map/set.
+func (s *nftElementStorage) reset(tx *knftables.Transaction) {
+	clear(s.elements)
+	if s.containerType == "set" {
+		tx.Flush(&knftables.Set{
+			Name: s.containerName,
+		})
+	} else {
+		tx.Flush(&knftables.Map{
+			Name: s.containerName,
+		})
+	}
+	s.resetLeftoverKeys()
+}
+
+// resetLeftoverKeys is only called internally by nftElementStorage methods.
+func (s *nftElementStorage) resetLeftoverKeys() {
+	clear(s.leftoverKeys)
+	for key := range s.elements {
+		s.leftoverKeys.Insert(key)
+	}
+}
+
+// ensureElem adds elem to the transaction if elem is not present in the container, and updates
+// the internal leftoverKeys set to track unused elements.
+func (s *nftElementStorage) ensureElem(tx *knftables.Transaction, elem *knftables.Element) {
+	newKey := joinNFTSlice(elem.Key)
+	newValue := joinNFTSlice(elem.Value)
+	existingValue, exists := s.elements[newKey]
+	if exists {
+		if existingValue != newValue {
+			// value is different, delete and re-add
+			tx.Delete(elem)
+			tx.Add(elem)
+			s.elements[newKey] = newValue
+		}
+		delete(s.leftoverKeys, newKey)
+	} else {
+		tx.Add(elem)
+		s.elements[newKey] = newValue
+	}
+}
+
+func (s *nftElementStorage) cleanupLeftoverKeys(tx *knftables.Transaction) {
+	for key := range s.leftoverKeys {
+		e := &knftables.Element{
+			Key: splitNFTSlice(key),
+		}
+		if s.containerType == "set" {
+			e.Set = s.containerName
+		} else {
+			e.Map = s.containerName
+		}
+		tx.Delete(e)
+		delete(s.elements, key)
+	}
+	s.resetLeftoverKeys()
+}
+
 // This is where all of the nftables calls happen.
 // This assumes proxier.mu is NOT held
 func (proxier *Proxier) syncProxyRules() {
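Putting the pieces together, the per-sync lifecycle of an nftElementStorage is: ensureElem for every desired element, then cleanupLeftoverKeys to delete whatever was not re-asserted. A self-contained sketch of that lifecycle with a string-recording stand-in for *knftables.Transaction (the " . " separator matches joinNFTSlice/splitNFTSlice above; keys and values are made up):

```go
package main

import (
	"fmt"
	"strings"
)

// fakeTx records operations instead of building a real *knftables.Transaction.
type fakeTx struct{ ops []string }

// elemStorage is a self-contained re-creation of nftElementStorage's lifecycle.
type elemStorage struct {
	elements     map[string]string
	leftoverKeys map[string]bool
}

func newElemStorage() *elemStorage {
	return &elemStorage{elements: map[string]string{}, leftoverKeys: map[string]bool{}}
}

// ensureElem transacts the element only when it is new or its value changed,
// and unmarks its key so cleanupLeftoverKeys will keep it.
func (s *elemStorage) ensureElem(tx *fakeTx, key, value []string) {
	k := strings.Join(key, " . ") // same separator as joinNFTSlice
	v := strings.Join(value, " . ")
	if old, ok := s.elements[k]; ok {
		if old != v {
			tx.ops = append(tx.ops, "delete "+k, "add "+k+" : "+v)
			s.elements[k] = v
		}
		delete(s.leftoverKeys, k)
		return
	}
	tx.ops = append(tx.ops, "add "+k+" : "+v)
	s.elements[k] = v
}

// cleanupLeftoverKeys deletes every element no ensureElem call re-asserted
// this sync, then re-arms leftoverKeys with all surviving keys.
func (s *elemStorage) cleanupLeftoverKeys(tx *fakeTx) {
	for k := range s.leftoverKeys {
		tx.ops = append(tx.ops, "delete "+k)
		delete(s.elements, k)
	}
	clear(s.leftoverKeys)
	for k := range s.elements {
		s.leftoverKeys[k] = true
	}
}

func main() {
	s := newElemStorage()

	tx1 := &fakeTx{}
	s.ensureElem(tx1, []string{"10.96.0.10", "tcp", "53"}, []string{"goto", "svc-A"})
	s.cleanupLeftoverKeys(tx1)
	fmt.Println(tx1.ops) // [add 10.96.0.10 . tcp . 53 : goto . svc-A]

	tx2 := &fakeTx{}
	s.ensureElem(tx2, []string{"10.96.0.11", "tcp", "80"}, []string{"goto", "svc-B"})
	s.cleanupLeftoverKeys(tx2) // the old element was not re-asserted -> deleted
	fmt.Println(tx2.ops)       // [add 10.96.0.11 . tcp . 80 : goto . svc-B  delete 10.96.0.10 . tcp . 53]
}
```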
@@ -1031,10 +1153,19 @@ func (proxier *Proxier) syncProxyRules() {
 	// Below this point we will not return until we try to write the nftables rules.
 	//

+	// The value of proxier.needFullSync may change before the defer funcs run, so
+	// we need to keep track of whether it was set at the *start* of the sync.
+	tryPartialSync := !proxier.needFullSync
+
 	// Keep track of how long syncs take.
 	start := time.Now()
 	defer func() {
 		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+		if tryPartialSync {
+			metrics.SyncPartialProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+		} else {
+			metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+		}
 		proxier.logger.V(2).Info("SyncProxyRules complete", "elapsed", time.Since(start))
 	}()

@@ -1048,6 +1179,10 @@ func (proxier *Proxier) syncProxyRules() {
 		if !success {
 			proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod)
 			proxier.syncRunner.RetryAfter(proxier.syncPeriod)
+			// proxier.serviceChanges and proxier.endpointChanges have already
+			// been flushed, so we've lost the state needed to be able to do
+			// a partial sync.
+			proxier.needFullSync = true
 		}
 	}()

@@ -1081,7 +1216,9 @@ func (proxier *Proxier) syncProxyRules() {

 	// Now start the actual syncing transaction
 	tx := proxier.nftables.NewTransaction()
-	proxier.setupNFTables(tx)
+	if !tryPartialSync {
+		proxier.setupNFTables(tx)
+	}

 	// We need to use, eg, "ip daddr" for IPv4 but "ip6 daddr" for IPv6
 	ipX := "ip"

@@ -1091,26 +1228,6 @@ func (proxier *Proxier) syncProxyRules() {
 		ipvX_addr = "ipv6_addr"
 	}

-	// We currently fully-rebuild our sets and maps on each resync
-	tx.Flush(&knftables.Set{
-		Name: clusterIPsSet,
-	})
-	tx.Flush(&knftables.Map{
-		Name: firewallIPsMap,
-	})
-	tx.Flush(&knftables.Map{
-		Name: noEndpointServicesMap,
-	})
-	tx.Flush(&knftables.Map{
-		Name: noEndpointNodePortsMap,
-	})
-	tx.Flush(&knftables.Map{
-		Name: serviceIPsMap,
-	})
-	tx.Flush(&knftables.Map{
-		Name: serviceNodePortsMap,
-	})
-
 	// Accumulate service/endpoint chains and affinity sets to keep.
 	activeChains := sets.New[string]()
 	activeAffinitySets := sets.New[string]()

@@ -1134,6 +1251,7 @@ func (proxier *Proxier) syncProxyRules() {
 			proxier.logger.Error(nil, "Failed to cast serviceInfo", "serviceName", svcName)
 			continue
 		}
+
 		protocol := strings.ToLower(string(svcInfo.Protocol()))
 		svcPortNameString := svcInfo.nameString

@@ -1144,10 +1262,20 @@ func (proxier *Proxier) syncProxyRules() {
 		allEndpoints := proxier.endpointsMap[svcName]
 		clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)

+		// skipServiceUpdate is used for all service-related chains and their elements.
+		// If no changes were done to the service or its endpoints, these objects may be skipped.
+		skipServiceUpdate := tryPartialSync &&
+			!serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) &&
+			!endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName)
+
 		// Note the endpoint chains that will be used
 		for _, ep := range allLocallyReachableEndpoints {
 			if epInfo, ok := ep.(*endpointInfo); ok {
-				ensureChain(epInfo.chainName, tx, activeChains)
+				ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate)
+				// Note the affinity sets that will be used
+				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
+					activeAffinitySets.Insert(epInfo.affinitySetName)
+				}
 			}
 		}
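skipServiceUpdate is the gate for the whole partial-sync fast path: a service's chains and elements are rewritten only if this is a full sync or the change trackers flagged the service. A sketch of that decision with stand-in data (the real UpdatedServices sets come from serviceUpdateResult and endpointUpdateResult; the service names are made up):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Stand-ins for serviceUpdateResult.UpdatedServices and
	// endpointUpdateResult.UpdatedServices from the change trackers.
	updatedServices := sets.New(types.NamespacedName{Namespace: "ns", Name: "svc-a"})
	updatedEndpoints := sets.New[types.NamespacedName]()

	tryPartialSync := true // i.e., !proxier.needFullSync at the start of the sync

	for _, svc := range []types.NamespacedName{
		{Namespace: "ns", Name: "svc-a"},
		{Namespace: "ns", Name: "svc-b"},
	} {
		skipServiceUpdate := tryPartialSync &&
			!updatedServices.Has(svc) &&
			!updatedEndpoints.Has(svc)
		fmt.Printf("%s: skip=%v\n", svc, skipServiceUpdate)
		// ns/svc-a: skip=false (it changed); ns/svc-b: skip=true (untouched this sync)
	}
}
```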
@@ -1155,14 +1283,14 @@ func (proxier *Proxier) syncProxyRules() {
 		clusterPolicyChain := svcInfo.clusterPolicyChainName
 		usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
 		if usesClusterPolicyChain {
-			ensureChain(clusterPolicyChain, tx, activeChains)
+			ensureChain(clusterPolicyChain, tx, activeChains, skipServiceUpdate)
 		}

 		// localPolicyChain contains the endpoints used with "Local" traffic policy
 		localPolicyChain := svcInfo.localPolicyChainName
 		usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
 		if usesLocalPolicyChain {
-			ensureChain(localPolicyChain, tx, activeChains)
+			ensureChain(localPolicyChain, tx, activeChains, skipServiceUpdate)
 		}

 		// internalPolicyChain is the chain containing the endpoints for

@@ -1204,7 +1332,7 @@ func (proxier *Proxier) syncProxyRules() {
 		// are no externally-usable endpoints.
 		usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
 		if usesExternalTrafficChain {
-			ensureChain(externalTrafficChain, tx, activeChains)
+			ensureChain(externalTrafficChain, tx, activeChains, skipServiceUpdate)
 		}

 		var internalTrafficFilterVerdict, externalTrafficFilterVerdict string

@@ -1235,12 +1363,12 @@ func (proxier *Proxier) syncProxyRules() {
 		}

 		// Capture the clusterIP.
-		tx.Add(&knftables.Element{
+		proxier.clusterIPs.ensureElem(tx, &knftables.Element{
 			Set: clusterIPsSet,
 			Key: []string{svcInfo.ClusterIP().String()},
 		})
 		if hasInternalEndpoints {
-			tx.Add(&knftables.Element{
+			proxier.serviceIPs.ensureElem(tx, &knftables.Element{
 				Map: serviceIPsMap,
 				Key: []string{
 					svcInfo.ClusterIP().String(),

@@ -1253,7 +1381,7 @@ func (proxier *Proxier) syncProxyRules() {
 			})
 		} else {
 			// No endpoints.
-			tx.Add(&knftables.Element{
+			proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
 				Map: noEndpointServicesMap,
 				Key: []string{
 					svcInfo.ClusterIP().String(),

@@ -1272,7 +1400,7 @@ func (proxier *Proxier) syncProxyRules() {
 			if hasEndpoints {
 				// Send traffic bound for external IPs to the "external
 				// destinations" chain.
-				tx.Add(&knftables.Element{
+				proxier.serviceIPs.ensureElem(tx, &knftables.Element{
 					Map: serviceIPsMap,
 					Key: []string{
 						externalIP.String(),

@@ -1288,7 +1416,7 @@ func (proxier *Proxier) syncProxyRules() {
 				// Either no endpoints at all (REJECT) or no endpoints for
 				// external traffic (DROP anything that didn't get
 				// short-circuited by the EXT chain.)
-				tx.Add(&knftables.Element{
+				proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
 					Map: noEndpointServicesMap,
 					Key: []string{
 						externalIP.String(),
@@ -1306,41 +1434,13 @@ func (proxier *Proxier) syncProxyRules() {
 		usesFWChain := len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
 		fwChain := svcInfo.firewallChainName
 		if usesFWChain {
-			ensureChain(fwChain, tx, activeChains)
-			var sources []string
-			allowFromNode := false
-			for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
-				if len(sources) > 0 {
-					sources = append(sources, ",")
-				}
-				sources = append(sources, cidr.String())
-				if cidr.Contains(proxier.nodeIP) {
-					allowFromNode = true
-				}
-			}
-			// For VIP-like LBs, the VIP is often added as a local
-			// address (via an IP route rule). In that case, a request
-			// from a node to the VIP will not hit the loadbalancer but
-			// will loop back with the source IP set to the VIP. We
-			// need the following rules to allow requests from this node.
-			if allowFromNode {
-				for _, lbip := range svcInfo.LoadBalancerVIPs() {
-					sources = append(sources, ",", lbip.String())
-				}
-			}
-			tx.Add(&knftables.Rule{
-				Chain: fwChain,
-				Rule: knftables.Concat(
-					ipX, "saddr", "!=", "{", sources, "}",
-					"drop",
-				),
-			})
+			ensureChain(fwChain, tx, activeChains, skipServiceUpdate)
 		}

 		// Capture load-balancer ingress.
 		for _, lbip := range svcInfo.LoadBalancerVIPs() {
 			if hasEndpoints {
-				tx.Add(&knftables.Element{
+				proxier.serviceIPs.ensureElem(tx, &knftables.Element{
 					Map: serviceIPsMap,
 					Key: []string{
 						lbip.String(),

@@ -1354,7 +1454,7 @@ func (proxier *Proxier) syncProxyRules() {
 			}

 			if usesFWChain {
-				tx.Add(&knftables.Element{
+				proxier.firewallIPs.ensureElem(tx, &knftables.Element{
 					Map: firewallIPsMap,
 					Key: []string{
 						lbip.String(),

@@ -1372,7 +1472,7 @@ func (proxier *Proxier) syncProxyRules() {
 			// external traffic (DROP anything that didn't get short-circuited
 			// by the EXT chain.)
 			for _, lbip := range svcInfo.LoadBalancerVIPs() {
-				tx.Add(&knftables.Element{
+				proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
 					Map: noEndpointServicesMap,
 					Key: []string{
 						lbip.String(),

@@ -1393,7 +1493,7 @@ func (proxier *Proxier) syncProxyRules() {
 			// Jump to the external destination chain. For better or for
 			// worse, nodeports are not subject to loadBalancerSourceRanges,
 			// and we can't change that.
-			tx.Add(&knftables.Element{
+			proxier.serviceNodePorts.ensureElem(tx, &knftables.Element{
 				Map: serviceNodePortsMap,
 				Key: []string{
 					protocol,

@@ -1408,7 +1508,7 @@ func (proxier *Proxier) syncProxyRules() {
 			// Either no endpoints at all (REJECT) or no endpoints for
 			// external traffic (DROP anything that didn't get
 			// short-circuited by the EXT chain.)
-			tx.Add(&knftables.Element{
+			proxier.noEndpointNodePorts.ensureElem(tx, &knftables.Element{
 				Map: noEndpointNodePortsMap,
 				Key: []string{
 					protocol,

@@ -1422,6 +1522,12 @@ func (proxier *Proxier) syncProxyRules() {
 			}
 		}

+		// All the following operations are service-chain related and may be skipped if no svc or endpoint
+		// changes are required.
+		if skipServiceUpdate {
+			continue
+		}
+
 		// Set up internal traffic handling.
 		if hasInternalEndpoints {
 			if proxier.masqueradeAll {
@@ -1520,6 +1626,37 @@ func (proxier *Proxier) syncProxyRules() {
 			}
 		}

+		if usesFWChain {
+			var sources []string
+			allowFromNode := false
+			for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
+				if len(sources) > 0 {
+					sources = append(sources, ",")
+				}
+				sources = append(sources, cidr.String())
+				if cidr.Contains(proxier.nodeIP) {
+					allowFromNode = true
+				}
+			}
+			// For VIP-like LBs, the VIP is often added as a local
+			// address (via an IP route rule). In that case, a request
+			// from a node to the VIP will not hit the loadbalancer but
+			// will loop back with the source IP set to the VIP. We
+			// need the following rules to allow requests from this node.
+			if allowFromNode {
+				for _, lbip := range svcInfo.LoadBalancerVIPs() {
+					sources = append(sources, ",", lbip.String())
+				}
+			}
+			tx.Add(&knftables.Rule{
+				Chain: fwChain,
+				Rule: knftables.Concat(
+					ipX, "saddr", "!=", "{", sources, "}",
+					"drop",
+				),
+			})
+		}
+
 		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
 			// Generate the per-endpoint affinity sets
 			for _, ep := range allLocallyReachableEndpoints {

@@ -1553,7 +1690,6 @@ func (proxier *Proxier) syncProxyRules() {
 					},
 					Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second),
 				})
-				activeAffinitySets.Insert(epInfo.affinitySetName)
 			}
 		}

@@ -1648,6 +1784,13 @@ func (proxier *Proxier) syncProxyRules() {
 		proxier.logger.Error(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
 	}

+	proxier.clusterIPs.cleanupLeftoverKeys(tx)
+	proxier.serviceIPs.cleanupLeftoverKeys(tx)
+	proxier.firewallIPs.cleanupLeftoverKeys(tx)
+	proxier.noEndpointServices.cleanupLeftoverKeys(tx)
+	proxier.noEndpointNodePorts.cleanupLeftoverKeys(tx)
+	proxier.serviceNodePorts.cleanupLeftoverKeys(tx)
+
 	// Sync rules.
 	proxier.logger.V(2).Info("Reloading service nftables data",
 		"numServices", len(proxier.svcPortMap),

@@ -1669,6 +1812,7 @@ func (proxier *Proxier) syncProxyRules() {
 		return
 	}
 	success = true
+	proxier.needFullSync = false

 	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
 		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
@@ -119,6 +119,7 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
 		serviceChanges:   proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
 		endpointsMap:     make(proxy.EndpointsMap),
 		endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil),
+		needFullSync:     true,
 		nftables:         nft,
 		masqueradeMark:   "0x4000",
 		conntrack:        conntrack.NewFake(),

@@ -130,6 +131,12 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
 		networkInterfacer:   networkInterfacer,
 		staleChains:         make(map[string]time.Time),
 		serviceCIDRs:        serviceCIDRs,
+		clusterIPs:          newNFTElementStorage("set", clusterIPsSet),
+		serviceIPs:          newNFTElementStorage("map", serviceIPsMap),
+		firewallIPs:         newNFTElementStorage("map", firewallIPsMap),
+		noEndpointServices:  newNFTElementStorage("map", noEndpointServicesMap),
+		noEndpointNodePorts: newNFTElementStorage("map", noEndpointNodePortsMap),
+		serviceNodePorts:    newNFTElementStorage("map", serviceNodePortsMap),
 	}
 	p.setInitialized(true)
 	p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)