diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go
index 55696e147fb..96b4f42487f 100644
--- a/pkg/proxy/metrics/metrics.go
+++ b/pkg/proxy/metrics/metrics.go
@@ -316,6 +316,8 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) {
         legacyregistry.MustRegister(IPTablesRestoreFailuresTotal)

     case kubeproxyconfig.ProxyModeNFTables:
+        legacyregistry.MustRegister(SyncFullProxyRulesLatency)
+        legacyregistry.MustRegister(SyncPartialProxyRulesLatency)
         legacyregistry.MustRegister(NFTablesSyncFailuresTotal)
         legacyregistry.MustRegister(NFTablesCleanupFailuresTotal)

diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go
index 6ca1070855a..c770be081dd 100644
--- a/pkg/proxy/nftables/proxier.go
+++ b/pkg/proxy/nftables/proxier.go
@@ -162,6 +162,7 @@ type Proxier struct {
     // updating nftables with some partial data after kube-proxy restart.
     endpointSlicesSynced bool
     servicesSynced       bool
+    needFullSync         bool
     initialized          int32
     syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
     syncPeriod           time.Duration
@@ -194,6 +195,13 @@ type Proxier struct {
     serviceCIDRs string

     logger klog.Logger
+
+    clusterIPs          *nftElementStorage
+    serviceIPs          *nftElementStorage
+    firewallIPs         *nftElementStorage
+    noEndpointServices  *nftElementStorage
+    noEndpointNodePorts *nftElementStorage
+    serviceNodePorts    *nftElementStorage
 }

 // Proxier implements proxy.Provider
@@ -243,6 +251,7 @@ func NewProxier(ctx context.Context,
         serviceChanges:       proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
         endpointsMap:         make(proxy.EndpointsMap),
         endpointsChanges:     proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
+        needFullSync:         true,
         syncPeriod:           syncPeriod,
         nftables:             nft,
         masqueradeAll:        masqueradeAll,
@@ -258,11 +267,18 @@ func NewProxier(ctx context.Context,
         networkInterfacer:    proxyutil.RealNetwork{},
         staleChains:          make(map[string]time.Time),
         logger:               logger,
+        clusterIPs:           newNFTElementStorage("set", clusterIPsSet),
+        serviceIPs:           newNFTElementStorage("map", serviceIPsMap),
+        firewallIPs:          newNFTElementStorage("map", firewallIPsMap),
+        noEndpointServices:   newNFTElementStorage("map", noEndpointServicesMap),
+        noEndpointNodePorts:  newNFTElementStorage("map", noEndpointNodePortsMap),
+        serviceNodePorts:     newNFTElementStorage("map", serviceNodePortsMap),
     }

     burstSyncs := 2
     logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
-    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
+    // We need to pass *some* maxInterval to NewBoundedFrequencyRunner. time.Hour is arbitrary.
+    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)

     return proxier, nil
 }
@@ -421,17 +437,22 @@ var nftablesJumpChains = []nftablesJumpChain{
 // ensureChain adds commands to tx to ensure that chain exists and doesn't contain
 // anything from before this transaction (using createdChains to ensure that we don't
 // Flush a chain more than once and lose *new* rules as well.)
-func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string]) {
+// If skipCreation is true, chain will not be added to the transaction, but will be added to createdChains
+// for proper cleanup at the end of the sync iteration.
+func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string], skipCreation bool) {
     if createdChains.Has(chain) {
         return
     }
+    createdChains.Insert(chain)
+    if skipCreation {
+        return
+    }
     tx.Add(&knftables.Chain{
         Name: chain,
     })
     tx.Flush(&knftables.Chain{
         Name: chain,
     })
-    createdChains.Insert(chain)
 }

 func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
@@ -477,7 +498,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
     // Create and flush ordinary chains and add rules jumping to them
     createdChains := sets.New[string]()
     for _, c := range nftablesJumpChains {
-        ensureChain(c.dstChain, tx, createdChains)
+        ensureChain(c.dstChain, tx, createdChains, false)
         tx.Add(&knftables.Rule{
             Chain: c.srcChain,
             Rule: knftables.Concat(
@@ -489,7 +510,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {

     // Ensure all of our other "top-level" chains exist
     for _, chain := range []string{servicesChain, clusterIPsCheckChain, masqueradingChain, markMasqChain} {
-        ensureChain(chain, tx, createdChains)
+        ensureChain(chain, tx, createdChains, false)
     }

     // Add the rules in the mark-for-masquerade and masquerading chains
@@ -639,7 +660,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
         Comment: ptr.To("destinations that are subject to LoadBalancerSourceRanges"),
     })

-    ensureChain(firewallCheckChain, tx, createdChains)
+    ensureChain(firewallCheckChain, tx, createdChains, false)
     tx.Add(&knftables.Rule{
         Chain: firewallCheckChain,
         Rule: knftables.Concat(
@@ -686,6 +707,14 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
             ),
         })
     }
+
+    // flush containers
+    proxier.clusterIPs.reset(tx)
+    proxier.serviceIPs.reset(tx)
+    proxier.firewallIPs.reset(tx)
+    proxier.noEndpointServices.reset(tx)
+    proxier.noEndpointNodePorts.reset(tx)
+    proxier.serviceNodePorts.reset(tx)
 }

 // CleanupLeftovers removes all nftables rules and chains created by the Proxier
@@ -830,6 +859,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
     for k, v := range node.Labels {
         proxier.nodeLabels[k] = v
     }
+    proxier.needFullSync = true
     proxier.mu.Unlock()
     proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)

@@ -854,6 +884,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
     for k, v := range node.Labels {
         proxier.nodeLabels[k] = v
     }
+    proxier.needFullSync = true
     proxier.mu.Unlock()
     proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)

@@ -871,6 +902,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {

     proxier.mu.Lock()
     proxier.nodeLabels = nil
+    proxier.needFullSync = true
     proxier.mu.Unlock()

     proxier.Sync()
@@ -1015,6 +1047,96 @@ func isAffinitySetName(set string) bool {
     return strings.HasPrefix(set, servicePortEndpointAffinityNamePrefix)
 }

+// nftElementStorage is an internal representation of nftables map or set.
+type nftElementStorage struct {
+    elements      map[string]string
+    leftoverKeys  sets.Set[string]
+    containerType string
+    containerName string
+}
+
+// joinNFTSlice converts nft element key or value (type []string) to string to store in the nftElementStorage.
+// The separator is the same as the one used by nft commands, so we know that the parsing is going to be unambiguous.
+func joinNFTSlice(k []string) string {
+    return strings.Join(k, " . ")
+}
+
+// splitNFTSlice converts nftElementStorage key or value string representation back to slice.
+func splitNFTSlice(k string) []string {
+    return strings.Split(k, " . ")
+}
+
+// newNFTElementStorage creates an empty nftElementStorage.
+// nftElementStorage.reset() must be called before the first usage.
+func newNFTElementStorage(containerType, containerName string) *nftElementStorage {
+    c := &nftElementStorage{
+        elements:      make(map[string]string),
+        leftoverKeys:  sets.New[string](),
+        containerType: containerType,
+        containerName: containerName,
+    }
+    return c
+}
+
+// reset clears the internal state and flushes the nftables map/set.
+func (s *nftElementStorage) reset(tx *knftables.Transaction) {
+    clear(s.elements)
+    if s.containerType == "set" {
+        tx.Flush(&knftables.Set{
+            Name: s.containerName,
+        })
+    } else {
+        tx.Flush(&knftables.Map{
+            Name: s.containerName,
+        })
+    }
+    s.resetLeftoverKeys()
+}
+
+// resetLeftoverKeys is only called internally by nftElementStorage methods.
+func (s *nftElementStorage) resetLeftoverKeys() {
+    clear(s.leftoverKeys)
+    for key := range s.elements {
+        s.leftoverKeys.Insert(key)
+    }
+}
+
+// ensureElem adds elem to the transaction if elem is not present in the container, and updates internal
+// leftoverKeys set to track unused elements.
+func (s *nftElementStorage) ensureElem(tx *knftables.Transaction, elem *knftables.Element) {
+    newKey := joinNFTSlice(elem.Key)
+    newValue := joinNFTSlice(elem.Value)
+    existingValue, exists := s.elements[newKey]
+    if exists {
+        if existingValue != newValue {
+            // value is different, delete and re-add
+            tx.Delete(elem)
+            tx.Add(elem)
+            s.elements[newKey] = newValue
+        }
+        delete(s.leftoverKeys, newKey)
+    } else {
+        tx.Add(elem)
+        s.elements[newKey] = newValue
+    }
+}
+
+func (s *nftElementStorage) cleanupLeftoverKeys(tx *knftables.Transaction) {
+    for key := range s.leftoverKeys {
+        e := &knftables.Element{
+            Key: splitNFTSlice(key),
+        }
+        if s.containerType == "set" {
+            e.Set = s.containerName
+        } else {
+            e.Map = s.containerName
+        }
+        tx.Delete(e)
+        delete(s.elements, key)
+    }
+    s.resetLeftoverKeys()
+}
+
 // This is where all of the nftables calls happen.
 // This assumes proxier.mu is NOT held
 func (proxier *Proxier) syncProxyRules() {
@@ -1031,10 +1153,19 @@ func (proxier *Proxier) syncProxyRules() {
     // Below this point we will not return until we try to write the nftables rules.
     //

+    // The value of proxier.needFullSync may change before the defer funcs run, so
+    // we need to keep track of whether it was set at the *start* of the sync.
+    tryPartialSync := !proxier.needFullSync
+
     // Keep track of how long syncs take.
     start := time.Now()
     defer func() {
         metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+        if tryPartialSync {
+            metrics.SyncPartialProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+        } else {
+            metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+        }
         proxier.logger.V(2).Info("SyncProxyRules complete", "elapsed", time.Since(start))
     }()

@@ -1048,6 +1179,10 @@ func (proxier *Proxier) syncProxyRules() {
         if !success {
             proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod)
             proxier.syncRunner.RetryAfter(proxier.syncPeriod)
+            // proxier.serviceChanges and proxier.endpointChanges have already
+            // been flushed, so we've lost the state needed to be able to do
+            // a partial sync.
+            proxier.needFullSync = true
         }
     }()

@@ -1081,7 +1216,9 @@ func (proxier *Proxier) syncProxyRules() {

     // Now start the actual syncing transaction
     tx := proxier.nftables.NewTransaction()
-    proxier.setupNFTables(tx)
+    if !tryPartialSync {
+        proxier.setupNFTables(tx)
+    }

     // We need to use, eg, "ip daddr" for IPv4 but "ip6 daddr" for IPv6
     ipX := "ip"
@@ -1091,26 +1228,6 @@ func (proxier *Proxier) syncProxyRules() {
         ipvX_addr = "ipv6_addr"
     }

-    // We currently fully-rebuild our sets and maps on each resync
-    tx.Flush(&knftables.Set{
-        Name: clusterIPsSet,
-    })
-    tx.Flush(&knftables.Map{
-        Name: firewallIPsMap,
-    })
-    tx.Flush(&knftables.Map{
-        Name: noEndpointServicesMap,
-    })
-    tx.Flush(&knftables.Map{
-        Name: noEndpointNodePortsMap,
-    })
-    tx.Flush(&knftables.Map{
-        Name: serviceIPsMap,
-    })
-    tx.Flush(&knftables.Map{
-        Name: serviceNodePortsMap,
-    })
-
     // Accumulate service/endpoint chains and affinity sets to keep.
     activeChains := sets.New[string]()
     activeAffinitySets := sets.New[string]()
@@ -1134,6 +1251,7 @@ func (proxier *Proxier) syncProxyRules() {
             proxier.logger.Error(nil, "Failed to cast serviceInfo", "serviceName", svcName)
             continue
         }
+
         protocol := strings.ToLower(string(svcInfo.Protocol()))
         svcPortNameString := svcInfo.nameString

@@ -1144,10 +1262,20 @@
         allEndpoints := proxier.endpointsMap[svcName]
         clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)

+        // skipServiceUpdate is used for all service-related chains and their elements.
+        // If no changes were done to the service or its endpoints, these objects may be skipped.
+        skipServiceUpdate := tryPartialSync &&
+            !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) &&
+            !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName)
+
         // Note the endpoint chains that will be used
         for _, ep := range allLocallyReachableEndpoints {
             if epInfo, ok := ep.(*endpointInfo); ok {
-                ensureChain(epInfo.chainName, tx, activeChains)
+                ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate)
+                // Note the affinity sets that will be used
+                if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
+                    activeAffinitySets.Insert(epInfo.affinitySetName)
+                }
             }
         }

@@ -1155,14 +1283,14 @@
         clusterPolicyChain := svcInfo.clusterPolicyChainName
         usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
         if usesClusterPolicyChain {
-            ensureChain(clusterPolicyChain, tx, activeChains)
+            ensureChain(clusterPolicyChain, tx, activeChains, skipServiceUpdate)
         }

         // localPolicyChain contains the endpoints used with "Local" traffic policy
         localPolicyChain := svcInfo.localPolicyChainName
         usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
         if usesLocalPolicyChain {
-            ensureChain(localPolicyChain, tx, activeChains)
+            ensureChain(localPolicyChain, tx, activeChains, skipServiceUpdate)
         }

         // internalPolicyChain is the chain containing the endpoints for
@@ -1204,7 +1332,7 @@
         // are no externally-usable endpoints.
         usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
         if usesExternalTrafficChain {
-            ensureChain(externalTrafficChain, tx, activeChains)
+            ensureChain(externalTrafficChain, tx, activeChains, skipServiceUpdate)
         }

         var internalTrafficFilterVerdict, externalTrafficFilterVerdict string
@@ -1235,12 +1363,12 @@
         }

         // Capture the clusterIP.
-        tx.Add(&knftables.Element{
+        proxier.clusterIPs.ensureElem(tx, &knftables.Element{
             Set: clusterIPsSet,
             Key: []string{svcInfo.ClusterIP().String()},
         })
         if hasInternalEndpoints {
-            tx.Add(&knftables.Element{
+            proxier.serviceIPs.ensureElem(tx, &knftables.Element{
                 Map: serviceIPsMap,
                 Key: []string{
                     svcInfo.ClusterIP().String(),
@@ -1253,7 +1381,7 @@
             })
         } else {
             // No endpoints.
-            tx.Add(&knftables.Element{
+            proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
                 Map: noEndpointServicesMap,
                 Key: []string{
                     svcInfo.ClusterIP().String(),
@@ -1272,7 +1400,7 @@
             if hasEndpoints {
                 // Send traffic bound for external IPs to the "external
                 // destinations" chain.
-                tx.Add(&knftables.Element{
+                proxier.serviceIPs.ensureElem(tx, &knftables.Element{
                     Map: serviceIPsMap,
                     Key: []string{
                         externalIP.String(),
@@ -1288,7 +1416,7 @@
                 // Either no endpoints at all (REJECT) or no endpoints for
                 // external traffic (DROP anything that didn't get
                 // short-circuited by the EXT chain.)
-                tx.Add(&knftables.Element{
+                proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
                     Map: noEndpointServicesMap,
                     Key: []string{
                         externalIP.String(),
@@ -1306,41 +1434,13 @@
         usesFWChain := len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
         fwChain := svcInfo.firewallChainName
         if usesFWChain {
-            ensureChain(fwChain, tx, activeChains)
-            var sources []string
-            allowFromNode := false
-            for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
-                if len(sources) > 0 {
-                    sources = append(sources, ",")
-                }
-                sources = append(sources, cidr.String())
-                if cidr.Contains(proxier.nodeIP) {
-                    allowFromNode = true
-                }
-            }
-            // For VIP-like LBs, the VIP is often added as a local
-            // address (via an IP route rule). In that case, a request
-            // from a node to the VIP will not hit the loadbalancer but
-            // will loop back with the source IP set to the VIP. We
-            // need the following rules to allow requests from this node.
-            if allowFromNode {
-                for _, lbip := range svcInfo.LoadBalancerVIPs() {
-                    sources = append(sources, ",", lbip.String())
-                }
-            }
-            tx.Add(&knftables.Rule{
-                Chain: fwChain,
-                Rule: knftables.Concat(
-                    ipX, "saddr", "!=", "{", sources, "}",
-                    "drop",
-                ),
-            })
+            ensureChain(fwChain, tx, activeChains, skipServiceUpdate)
         }

         // Capture load-balancer ingress.
         for _, lbip := range svcInfo.LoadBalancerVIPs() {
             if hasEndpoints {
-                tx.Add(&knftables.Element{
+                proxier.serviceIPs.ensureElem(tx, &knftables.Element{
                     Map: serviceIPsMap,
                     Key: []string{
                         lbip.String(),
@@ -1354,7 +1454,7 @@
             }

             if usesFWChain {
-                tx.Add(&knftables.Element{
+                proxier.firewallIPs.ensureElem(tx, &knftables.Element{
                     Map: firewallIPsMap,
                     Key: []string{
                         lbip.String(),
@@ -1372,7 +1472,7 @@
             // external traffic (DROP anything that didn't get short-circuited
             // by the EXT chain.)
             for _, lbip := range svcInfo.LoadBalancerVIPs() {
-                tx.Add(&knftables.Element{
+                proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
                     Map: noEndpointServicesMap,
                     Key: []string{
                         lbip.String(),
@@ -1393,7 +1493,7 @@
             // Jump to the external destination chain. For better or for
             // worse, nodeports are not subject to loadBalancerSourceRanges,
             // and we can't change that.
-            tx.Add(&knftables.Element{
+            proxier.serviceNodePorts.ensureElem(tx, &knftables.Element{
                 Map: serviceNodePortsMap,
                 Key: []string{
                     protocol,
@@ -1408,7 +1508,7 @@
             // Either no endpoints at all (REJECT) or no endpoints for
             // external traffic (DROP anything that didn't get
             // short-circuited by the EXT chain.)
-            tx.Add(&knftables.Element{
+            proxier.noEndpointNodePorts.ensureElem(tx, &knftables.Element{
                 Map: noEndpointNodePortsMap,
                 Key: []string{
                     protocol,
@@ -1422,6 +1522,12 @@
             }
         }

+        // All the following operations are service-chain related and may be skipped if no svc or endpoint
+        // changes are required.
+        if skipServiceUpdate {
+            continue
+        }
+
         // Set up internal traffic handling.
         if hasInternalEndpoints {
             if proxier.masqueradeAll {
@@ -1520,6 +1626,37 @@
             }
         }

+        if usesFWChain {
+            var sources []string
+            allowFromNode := false
+            for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
+                if len(sources) > 0 {
+                    sources = append(sources, ",")
+                }
+                sources = append(sources, cidr.String())
+                if cidr.Contains(proxier.nodeIP) {
+                    allowFromNode = true
+                }
+            }
+            // For VIP-like LBs, the VIP is often added as a local
+            // address (via an IP route rule). In that case, a request
+            // from a node to the VIP will not hit the loadbalancer but
+            // will loop back with the source IP set to the VIP. We
+            // need the following rules to allow requests from this node.
+            if allowFromNode {
+                for _, lbip := range svcInfo.LoadBalancerVIPs() {
+                    sources = append(sources, ",", lbip.String())
+                }
+            }
+            tx.Add(&knftables.Rule{
+                Chain: fwChain,
+                Rule: knftables.Concat(
+                    ipX, "saddr", "!=", "{", sources, "}",
+                    "drop",
+                ),
+            })
+        }
+
         if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
             // Generate the per-endpoint affinity sets
             for _, ep := range allLocallyReachableEndpoints {
@@ -1553,7 +1690,6 @@
                     },
                     Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second),
                 })
-                activeAffinitySets.Insert(epInfo.affinitySetName)
             }
         }

@@ -1648,6 +1784,13 @@
         proxier.logger.Error(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
     }

+    proxier.clusterIPs.cleanupLeftoverKeys(tx)
+    proxier.serviceIPs.cleanupLeftoverKeys(tx)
+    proxier.firewallIPs.cleanupLeftoverKeys(tx)
+    proxier.noEndpointServices.cleanupLeftoverKeys(tx)
+    proxier.noEndpointNodePorts.cleanupLeftoverKeys(tx)
+    proxier.serviceNodePorts.cleanupLeftoverKeys(tx)
+
     // Sync rules.
     proxier.logger.V(2).Info("Reloading service nftables data",
         "numServices", len(proxier.svcPortMap),
@@ -1669,6 +1812,7 @@
         return
     }
     success = true
+    proxier.needFullSync = false

     for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
         for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go
index 937fcaaf1f0..64762ca1f29 100644
--- a/pkg/proxy/nftables/proxier_test.go
+++ b/pkg/proxy/nftables/proxier_test.go
@@ -119,6 +119,7 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
         serviceChanges:       proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
         endpointsMap:         make(proxy.EndpointsMap),
         endpointsChanges:     proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil),
+        needFullSync:         true,
         nftables:             nft,
         masqueradeMark:       "0x4000",
         conntrack:            conntrack.NewFake(),
@@ -130,6 +131,12 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
         networkInterfacer:    networkInterfacer,
         staleChains:          make(map[string]time.Time),
         serviceCIDRs:         serviceCIDRs,
+        clusterIPs:           newNFTElementStorage("set", clusterIPsSet),
+        serviceIPs:           newNFTElementStorage("map", serviceIPsMap),
+        firewallIPs:          newNFTElementStorage("map", firewallIPsMap),
+        noEndpointServices:   newNFTElementStorage("map", noEndpointServicesMap),
+        noEndpointNodePorts:  newNFTElementStorage("map", noEndpointNodePortsMap),
+        serviceNodePorts:     newNFTElementStorage("map", serviceNodePortsMap),
     }
     p.setInitialized(true)
     p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
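
Reviewer note: the core of this patch is the bookkeeping done by nftElementStorage, so that a partial sync only writes elements that actually changed and deletes anything that was not re-asserted. The standalone sketch below models that lifecycle (reset on a full sync, ensureElem per service, cleanupLeftoverKeys at the end of the sync). The elementTracker and fakeTx types and the sample element values are hypothetical stand-ins, not code from this patch or from knftables; it is a simplified illustration under those assumptions.

package main

import (
    "fmt"
    "strings"
)

// fakeTx is a hypothetical stand-in for *knftables.Transaction: it only records
// the operations a sync would emit, so the example is self-contained.
type fakeTx struct{ ops []string }

func (tx *fakeTx) add(key, value string)  { tx.ops = append(tx.ops, "add "+key+" : "+value) }
func (tx *fakeTx) remove(key string)      { tx.ops = append(tx.ops, "delete "+key) }
func (tx *fakeTx) flush(container string) { tx.ops = append(tx.ops, "flush "+container) }

// elementTracker mirrors the bookkeeping of nftElementStorage: elements holds the
// key/value pairs last written to nftables, leftoverKeys holds keys that have not
// been re-asserted during the current sync.
type elementTracker struct {
    name         string
    elements     map[string]string
    leftoverKeys map[string]bool
}

func newElementTracker(name string) *elementTracker {
    return &elementTracker{
        name:         name,
        elements:     map[string]string{},
        leftoverKeys: map[string]bool{},
    }
}

// reset corresponds to a full sync: flush the container and forget all state.
func (t *elementTracker) reset(tx *fakeTx) {
    tx.flush(t.name)
    clear(t.elements)
    clear(t.leftoverKeys)
}

// ensureElem emits operations only when the element is new or its value changed,
// and marks the key as still in use.
func (t *elementTracker) ensureElem(tx *fakeTx, key, value string) {
    if old, ok := t.elements[key]; ok {
        if old != value {
            tx.remove(key)
            tx.add(key, value)
            t.elements[key] = value
        }
        delete(t.leftoverKeys, key)
        return
    }
    tx.add(key, value)
    t.elements[key] = value
}

// cleanupLeftoverKeys deletes elements that no service asserted during this sync,
// then rebuilds leftoverKeys for the next iteration.
func (t *elementTracker) cleanupLeftoverKeys(tx *fakeTx) {
    for key := range t.leftoverKeys {
        tx.remove(key)
        delete(t.elements, key)
    }
    for key := range t.elements {
        t.leftoverKeys[key] = true
    }
}

func main() {
    serviceIPs := newElementTracker("service-ips")

    // Full sync: the container is flushed and every element is re-added.
    tx := &fakeTx{}
    serviceIPs.reset(tx)
    serviceIPs.ensureElem(tx, "10.96.0.10 . tcp . 80", "goto svc-A")
    serviceIPs.ensureElem(tx, "10.96.0.20 . tcp . 443", "goto svc-B")
    serviceIPs.cleanupLeftoverKeys(tx)
    fmt.Println("full sync ops:\n  " + strings.Join(tx.ops, "\n  "))

    // Partial sync: svc-A is unchanged (no ops emitted for it); svc-B was
    // deleted, so its element shows up as a leftover and is removed.
    tx = &fakeTx{}
    serviceIPs.ensureElem(tx, "10.96.0.10 . tcp . 80", "goto svc-A")
    serviceIPs.cleanupLeftoverKeys(tx)
    fmt.Println("partial sync ops:\n  " + strings.Join(tx.ops, "\n  "))
}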