diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go
index 55696e147fb..96b4f42487f 100644
--- a/pkg/proxy/metrics/metrics.go
+++ b/pkg/proxy/metrics/metrics.go
@@ -316,6 +316,8 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) {
 		legacyregistry.MustRegister(IPTablesRestoreFailuresTotal)
 	case kubeproxyconfig.ProxyModeNFTables:
+		legacyregistry.MustRegister(SyncFullProxyRulesLatency)
+		legacyregistry.MustRegister(SyncPartialProxyRulesLatency)
 		legacyregistry.MustRegister(NFTablesSyncFailuresTotal)
 		legacyregistry.MustRegister(NFTablesCleanupFailuresTotal)
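Note for reviewers: this hunk only registers the two new histograms; their definitions live in the unshown portion of metrics.go. A minimal sketch of what they presumably look like, assuming they mirror the existing SyncProxyRulesLatency conventions (the names, help strings, and buckets below are assumptions, not taken from this patch):

```go
package metrics

import "k8s.io/component-base/metrics"

// Assumed definitions mirroring SyncProxyRulesLatency; the real ones live in
// the unshown portion of pkg/proxy/metrics/metrics.go.
var (
	SyncFullProxyRulesLatency = metrics.NewHistogram(
		&metrics.HistogramOpts{
			Subsystem:      kubeProxySubsystem, // "kubeproxy", defined elsewhere in this package
			Name:           "sync_full_proxy_rules_duration_seconds",
			Help:           "SyncProxyRules latency in seconds for full resyncs",
			Buckets:        metrics.ExponentialBuckets(0.001, 2, 15),
			StabilityLevel: metrics.ALPHA,
		},
	)
	SyncPartialProxyRulesLatency = metrics.NewHistogram(
		&metrics.HistogramOpts{
			Subsystem:      kubeProxySubsystem,
			Name:           "sync_partial_proxy_rules_duration_seconds",
			Help:           "SyncProxyRules latency in seconds for partial resyncs",
			Buckets:        metrics.ExponentialBuckets(0.001, 2, 15),
			StabilityLevel: metrics.ALPHA,
		},
	)
)
```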
diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go
index c4b9aa803fa..c770be081dd 100644
--- a/pkg/proxy/nftables/proxier.go
+++ b/pkg/proxy/nftables/proxier.go
@@ -162,6 +162,7 @@ type Proxier struct {
 	// updating nftables with some partial data after kube-proxy restart.
 	endpointSlicesSynced bool
 	servicesSynced       bool
+	needFullSync         bool
 	initialized          int32
 	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
 	syncPeriod           time.Duration
@@ -194,6 +195,13 @@ type Proxier struct {
 	serviceCIDRs string

 	logger klog.Logger
+
+	clusterIPs          *nftElementStorage
+	serviceIPs          *nftElementStorage
+	firewallIPs         *nftElementStorage
+	noEndpointServices  *nftElementStorage
+	noEndpointNodePorts *nftElementStorage
+	serviceNodePorts    *nftElementStorage
 }

 // Proxier implements proxy.Provider
@@ -243,6 +251,7 @@ func NewProxier(ctx context.Context,
 		serviceChanges:       proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
 		endpointsMap:         make(proxy.EndpointsMap),
 		endpointsChanges:     proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
+		needFullSync:         true,
 		syncPeriod:           syncPeriod,
 		nftables:             nft,
 		masqueradeAll:        masqueradeAll,
@@ -258,11 +267,18 @@ func NewProxier(ctx context.Context,
 		networkInterfacer:    proxyutil.RealNetwork{},
 		staleChains:          make(map[string]time.Time),
 		logger:               logger,
+		clusterIPs:           newNFTElementStorage("set", clusterIPsSet),
+		serviceIPs:           newNFTElementStorage("map", serviceIPsMap),
+		firewallIPs:          newNFTElementStorage("map", firewallIPsMap),
+		noEndpointServices:   newNFTElementStorage("map", noEndpointServicesMap),
+		noEndpointNodePorts:  newNFTElementStorage("map", noEndpointNodePortsMap),
+		serviceNodePorts:     newNFTElementStorage("map", serviceNodePortsMap),
 	}

 	burstSyncs := 2
 	logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
-	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
+	// We need to pass *some* maxInterval to NewBoundedFrequencyRunner. time.Hour is arbitrary.
+	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)

 	return proxier, nil
 }

@@ -421,17 +437,22 @@ var nftablesJumpChains = []nftablesJumpChain{
 // ensureChain adds commands to tx to ensure that chain exists and doesn't contain
 // anything from before this transaction (using createdChains to ensure that we don't
 // Flush a chain more than once and lose *new* rules as well.)
-func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string]) {
+// If skipCreation is true, chain will not be added to the transaction, but will be added to the createdChains
+// for proper cleanup at the end of the sync iteration.
+func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string], skipCreation bool) {
 	if createdChains.Has(chain) {
 		return
 	}
+	createdChains.Insert(chain)
+	if skipCreation {
+		return
+	}
 	tx.Add(&knftables.Chain{
 		Name: chain,
 	})
 	tx.Flush(&knftables.Chain{
 		Name: chain,
 	})
-	createdChains.Insert(chain)
 }

 func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
@@ -477,7 +498,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
 	// Create and flush ordinary chains and add rules jumping to them
 	createdChains := sets.New[string]()
 	for _, c := range nftablesJumpChains {
-		ensureChain(c.dstChain, tx, createdChains)
+		ensureChain(c.dstChain, tx, createdChains, false)
 		tx.Add(&knftables.Rule{
 			Chain: c.srcChain,
 			Rule: knftables.Concat(
@@ -489,7 +510,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {

 	// Ensure all of our other "top-level" chains exist
 	for _, chain := range []string{servicesChain, clusterIPsCheckChain, masqueradingChain, markMasqChain} {
-		ensureChain(chain, tx, createdChains)
+		ensureChain(chain, tx, createdChains, false)
 	}

 	// Add the rules in the mark-for-masquerade and masquerading chains
@@ -639,7 +660,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
 			Comment: ptr.To("destinations that are subject to LoadBalancerSourceRanges"),
 		})

-	ensureChain(firewallCheckChain, tx, createdChains)
+	ensureChain(firewallCheckChain, tx, createdChains, false)
 	tx.Add(&knftables.Rule{
 		Chain: firewallCheckChain,
 		Rule: knftables.Concat(
@@ -686,6 +707,14 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
 			),
 		})
 	}
+
+	// flush containers
+	proxier.clusterIPs.reset(tx)
+	proxier.serviceIPs.reset(tx)
+	proxier.firewallIPs.reset(tx)
+	proxier.noEndpointServices.reset(tx)
+	proxier.noEndpointNodePorts.reset(tx)
+	proxier.serviceNodePorts.reset(tx)
 }

 // CleanupLeftovers removes all nftables rules and chains created by the Proxier
@@ -830,6 +859,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
 	for k, v := range node.Labels {
 		proxier.nodeLabels[k] = v
 	}
+	proxier.needFullSync = true
 	proxier.mu.Unlock()
 	proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)

@@ -854,6 +884,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
 	for k, v := range node.Labels {
 		proxier.nodeLabels[k] = v
 	}
+	proxier.needFullSync = true
 	proxier.mu.Unlock()
 	proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)

@@ -871,6 +902,7 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {

 	proxier.mu.Lock()
 	proxier.nodeLabels = nil
+	proxier.needFullSync = true
 	proxier.mu.Unlock()

 	proxier.Sync()
@@ -1015,6 +1047,96 @@ func isAffinitySetName(set string) bool {
 	return strings.HasPrefix(set, servicePortEndpointAffinityNamePrefix)
 }

+// nftElementStorage is an internal representation of nftables map or set.
+type nftElementStorage struct {
+	elements      map[string]string
+	leftoverKeys  sets.Set[string]
+	containerType string
+	containerName string
+}
+
+// joinNFTSlice converts nft element key or value (type []string) to string to store in the nftElementStorage.
+// The separator is the same as the one used by nft commands, so we know that the parsing is going to be unambiguous.
+func joinNFTSlice(k []string) string {
+	return strings.Join(k, " . ")
+}
+
+// splitNFTSlice converts nftElementStorage key or value string representation back to slice.
+func splitNFTSlice(k string) []string {
+	return strings.Split(k, " . ")
+}
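The " . " separator chosen here matches how nft itself prints concatenated keys, so joining and splitting round-trip losslessly. A self-contained illustration (the sample key is typical of the service-ips map used below):

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// Copies of the two helpers above, for a standalone demo.
func joinNFTSlice(k []string) string  { return strings.Join(k, " . ") }
func splitNFTSlice(k string) []string { return strings.Split(k, " . ") }

func main() {
	key := []string{"172.30.0.41", "tcp", "80"} // e.g. a service-ips map key
	s := joinNFTSlice(key)
	fmt.Println(s)                                   // 172.30.0.41 . tcp . 80
	fmt.Println(slices.Equal(splitNFTSlice(s), key)) // true
}
```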
") +} + +// newNFTElementStorage creates an empty nftElementStorage. +// nftElementStorage.reset() must be called before the first usage. +func newNFTElementStorage(containerType, containerName string) *nftElementStorage { + c := &nftElementStorage{ + elements: make(map[string]string), + leftoverKeys: sets.New[string](), + containerType: containerType, + containerName: containerName, + } + return c +} + +// reset clears the internal state and flushes the nftables map/set. +func (s *nftElementStorage) reset(tx *knftables.Transaction) { + clear(s.elements) + if s.containerType == "set" { + tx.Flush(&knftables.Set{ + Name: s.containerName, + }) + } else { + tx.Flush(&knftables.Map{ + Name: s.containerName, + }) + } + s.resetLeftoverKeys() +} + +// resetLeftoverKeys is only called internally by nftElementStorage methods. +func (s *nftElementStorage) resetLeftoverKeys() { + clear(s.leftoverKeys) + for key := range s.elements { + s.leftoverKeys.Insert(key) + } +} + +// ensureElem adds elem to the transaction if elem is not present in the container, and updates internal +// leftoverKeys set to track unused elements. +func (s *nftElementStorage) ensureElem(tx *knftables.Transaction, elem *knftables.Element) { + newKey := joinNFTSlice(elem.Key) + newValue := joinNFTSlice(elem.Value) + existingValue, exists := s.elements[newKey] + if exists { + if existingValue != newValue { + // value is different, delete and re-add + tx.Delete(elem) + tx.Add(elem) + s.elements[newKey] = newValue + } + delete(s.leftoverKeys, newKey) + } else { + tx.Add(elem) + s.elements[newKey] = newValue + } +} + +func (s *nftElementStorage) cleanupLeftoverKeys(tx *knftables.Transaction) { + for key := range s.leftoverKeys { + e := &knftables.Element{ + Key: splitNFTSlice(key), + } + if s.containerType == "set" { + e.Set = s.containerName + } else { + e.Map = s.containerName + } + tx.Delete(e) + delete(s.elements, key) + } + s.resetLeftoverKeys() +} + // This is where all of the nftables calls happen. // This assumes proxier.mu is NOT held func (proxier *Proxier) syncProxyRules() { @@ -1031,10 +1153,19 @@ func (proxier *Proxier) syncProxyRules() { // Below this point we will not return until we try to write the nftables rules. // + // The value of proxier.needFullSync may change before the defer funcs run, so + // we need to keep track of whether it was set at the *start* of the sync. + tryPartialSync := !proxier.needFullSync + // Keep track of how long syncs take. start := time.Now() defer func() { metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start)) + if tryPartialSync { + metrics.SyncPartialProxyRulesLatency.Observe(metrics.SinceInSeconds(start)) + } else { + metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start)) + } proxier.logger.V(2).Info("SyncProxyRules complete", "elapsed", time.Since(start)) }() @@ -1048,6 +1179,10 @@ func (proxier *Proxier) syncProxyRules() { if !success { proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod) proxier.syncRunner.RetryAfter(proxier.syncPeriod) + // proxier.serviceChanges and proxier.endpointChanges have already + // been flushed, so we've lost the state needed to be able to do + // a partial sync. 
@@ -1031,10 +1153,19 @@ func (proxier *Proxier) syncProxyRules() {
 	// Below this point we will not return until we try to write the nftables rules.
 	//

+	// The value of proxier.needFullSync may change before the defer funcs run, so
+	// we need to keep track of whether it was set at the *start* of the sync.
+	tryPartialSync := !proxier.needFullSync
+
 	// Keep track of how long syncs take.
 	start := time.Now()
 	defer func() {
 		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+		if tryPartialSync {
+			metrics.SyncPartialProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+		} else {
+			metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
+		}
 		proxier.logger.V(2).Info("SyncProxyRules complete", "elapsed", time.Since(start))
 	}()

@@ -1048,6 +1179,10 @@ func (proxier *Proxier) syncProxyRules() {
 		if !success {
 			proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod)
 			proxier.syncRunner.RetryAfter(proxier.syncPeriod)
+			// proxier.serviceChanges and proxier.endpointChanges have already
+			// been flushed, so we've lost the state needed to be able to do
+			// a partial sync.
+			proxier.needFullSync = true
 		}
 	}()

@@ -1081,7 +1216,9 @@ func (proxier *Proxier) syncProxyRules() {

 	// Now start the actual syncing transaction
 	tx := proxier.nftables.NewTransaction()
-	proxier.setupNFTables(tx)
+	if !tryPartialSync {
+		proxier.setupNFTables(tx)
+	}

 	// We need to use, eg, "ip daddr" for IPv4 but "ip6 daddr" for IPv6
 	ipX := "ip"
@@ -1091,26 +1228,6 @@ func (proxier *Proxier) syncProxyRules() {
 		ipvX_addr = "ipv6_addr"
 	}

-	// We currently fully-rebuild our sets and maps on each resync
-	tx.Flush(&knftables.Set{
-		Name: clusterIPsSet,
-	})
-	tx.Flush(&knftables.Map{
-		Name: firewallIPsMap,
-	})
-	tx.Flush(&knftables.Map{
-		Name: noEndpointServicesMap,
-	})
-	tx.Flush(&knftables.Map{
-		Name: noEndpointNodePortsMap,
-	})
-	tx.Flush(&knftables.Map{
-		Name: serviceIPsMap,
-	})
-	tx.Flush(&knftables.Map{
-		Name: serviceNodePortsMap,
-	})
-
 	// Accumulate service/endpoint chains and affinity sets to keep.
 	activeChains := sets.New[string]()
 	activeAffinitySets := sets.New[string]()
@@ -1134,6 +1251,7 @@ func (proxier *Proxier) syncProxyRules() {
 			proxier.logger.Error(nil, "Failed to cast serviceInfo", "serviceName", svcName)
 			continue
 		}
+
 		protocol := strings.ToLower(string(svcInfo.Protocol()))
 		svcPortNameString := svcInfo.nameString

@@ -1144,10 +1262,20 @@ func (proxier *Proxier) syncProxyRules() {
 		allEndpoints := proxier.endpointsMap[svcName]
 		clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)

+		// skipServiceUpdate is used for all service-related chains and their elements.
+		// If no changes were done to the service or its endpoints, these objects may be skipped.
+		skipServiceUpdate := tryPartialSync &&
+			!serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) &&
+			!endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName)
+
 		// Note the endpoint chains that will be used
 		for _, ep := range allLocallyReachableEndpoints {
 			if epInfo, ok := ep.(*endpointInfo); ok {
-				ensureChain(epInfo.chainName, tx, activeChains)
+				ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate)
+				// Note the affinity sets that will be used
+				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
+					activeAffinitySets.Insert(epInfo.affinitySetName)
+				}
 			}
 		}
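A service is skipped only when a partial sync is possible at all and neither the service object nor its EndpointSlices changed since the last successful sync. A toy sketch of that decision (upstream the two sets are types.NamespacedName sets returned by the change trackers; plain string maps are used here for brevity):

```go
package main

import "fmt"

func skipServiceUpdate(tryPartialSync bool, updatedServices, updatedEndpoints map[string]bool, svc string) bool {
	return tryPartialSync && !updatedServices[svc] && !updatedEndpoints[svc]
}

func main() {
	updatedServices := map[string]bool{"ns1/svc1": true}
	updatedEndpoints := map[string]bool{"ns2/svc2": true}
	for _, svc := range []string{"ns1/svc1", "ns2/svc2", "ns3/svc3"} {
		fmt.Printf("%s skip=%v\n", svc, skipServiceUpdate(true, updatedServices, updatedEndpoints, svc))
	}
	// Only ns3/svc3 is skipped; with tryPartialSync=false nothing is skipped.
}
```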
@@ -1155,14 +1283,14 @@ func (proxier *Proxier) syncProxyRules() {
 		clusterPolicyChain := svcInfo.clusterPolicyChainName
 		usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
 		if usesClusterPolicyChain {
-			ensureChain(clusterPolicyChain, tx, activeChains)
+			ensureChain(clusterPolicyChain, tx, activeChains, skipServiceUpdate)
 		}

 		// localPolicyChain contains the endpoints used with "Local" traffic policy
 		localPolicyChain := svcInfo.localPolicyChainName
 		usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
 		if usesLocalPolicyChain {
-			ensureChain(localPolicyChain, tx, activeChains)
+			ensureChain(localPolicyChain, tx, activeChains, skipServiceUpdate)
 		}

 		// internalPolicyChain is the chain containing the endpoints for
@@ -1204,7 +1332,7 @@ func (proxier *Proxier) syncProxyRules() {
 		// are no externally-usable endpoints.
 		usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
 		if usesExternalTrafficChain {
-			ensureChain(externalTrafficChain, tx, activeChains)
+			ensureChain(externalTrafficChain, tx, activeChains, skipServiceUpdate)
 		}

 		var internalTrafficFilterVerdict, externalTrafficFilterVerdict string
@@ -1235,12 +1363,12 @@ func (proxier *Proxier) syncProxyRules() {
 		}

 		// Capture the clusterIP.
-		tx.Add(&knftables.Element{
+		proxier.clusterIPs.ensureElem(tx, &knftables.Element{
 			Set: clusterIPsSet,
 			Key: []string{svcInfo.ClusterIP().String()},
 		})
 		if hasInternalEndpoints {
-			tx.Add(&knftables.Element{
+			proxier.serviceIPs.ensureElem(tx, &knftables.Element{
 				Map: serviceIPsMap,
 				Key: []string{
 					svcInfo.ClusterIP().String(),
@@ -1253,7 +1381,7 @@ func (proxier *Proxier) syncProxyRules() {
 			})
 		} else {
 			// No endpoints.
-			tx.Add(&knftables.Element{
+			proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
 				Map: noEndpointServicesMap,
 				Key: []string{
 					svcInfo.ClusterIP().String(),
@@ -1272,7 +1400,7 @@ func (proxier *Proxier) syncProxyRules() {
 			if hasEndpoints {
 				// Send traffic bound for external IPs to the "external
 				// destinations" chain.
-				tx.Add(&knftables.Element{
+				proxier.serviceIPs.ensureElem(tx, &knftables.Element{
 					Map: serviceIPsMap,
 					Key: []string{
 						externalIP.String(),
@@ -1288,7 +1416,7 @@ func (proxier *Proxier) syncProxyRules() {
 				// Either no endpoints at all (REJECT) or no endpoints for
 				// external traffic (DROP anything that didn't get
 				// short-circuited by the EXT chain.)
-				tx.Add(&knftables.Element{
+				proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
 					Map: noEndpointServicesMap,
 					Key: []string{
 						externalIP.String(),
@@ -1306,41 +1434,13 @@ func (proxier *Proxier) syncProxyRules() {
 		usesFWChain := len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
 		fwChain := svcInfo.firewallChainName
 		if usesFWChain {
-			ensureChain(fwChain, tx, activeChains)
-			var sources []string
-			allowFromNode := false
-			for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
-				if len(sources) > 0 {
-					sources = append(sources, ",")
-				}
-				sources = append(sources, cidr.String())
-				if cidr.Contains(proxier.nodeIP) {
-					allowFromNode = true
-				}
-			}
-			// For VIP-like LBs, the VIP is often added as a local
-			// address (via an IP route rule). In that case, a request
-			// from a node to the VIP will not hit the loadbalancer but
-			// will loop back with the source IP set to the VIP. We
-			// need the following rules to allow requests from this node.
-			if allowFromNode {
-				for _, lbip := range svcInfo.LoadBalancerVIPs() {
-					sources = append(sources, ",", lbip.String())
-				}
-			}
-			tx.Add(&knftables.Rule{
-				Chain: fwChain,
-				Rule: knftables.Concat(
-					ipX, "saddr", "!=", "{", sources, "}",
-					"drop",
-				),
-			})
+			ensureChain(fwChain, tx, activeChains, skipServiceUpdate)
 		}

 		// Capture load-balancer ingress.
 		for _, lbip := range svcInfo.LoadBalancerVIPs() {
 			if hasEndpoints {
-				tx.Add(&knftables.Element{
+				proxier.serviceIPs.ensureElem(tx, &knftables.Element{
 					Map: serviceIPsMap,
 					Key: []string{
 						lbip.String(),
@@ -1354,7 +1454,7 @@ func (proxier *Proxier) syncProxyRules() {
 			}

 			if usesFWChain {
-				tx.Add(&knftables.Element{
+				proxier.firewallIPs.ensureElem(tx, &knftables.Element{
 					Map: firewallIPsMap,
 					Key: []string{
 						lbip.String(),
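For reference, an element passed to ensureElem corresponds one-to-one to an `add element` line in the nft dumps asserted by the tests below. A runnable sketch against the knftables fake (the chain name hash and comment are invented for the example, and this assumes the fake accepts elements the same way the proxier tests rely on):

```go
package main

import (
	"context"
	"fmt"

	"k8s.io/utils/ptr"
	"sigs.k8s.io/knftables"
)

func main() {
	nft := knftables.NewFake(knftables.IPv4Family, "kube-proxy")
	tx := nft.NewTransaction()
	tx.Add(&knftables.Table{})
	tx.Add(&knftables.Chain{Name: "service-ULMVA6XW-ns1/svc1/tcp/p80"}) // invented name
	tx.Add(&knftables.Map{
		Name: "service-ips",
		Type: "ipv4_addr . inet_proto . inet_service : verdict",
	})
	// The same shape of element the proxier hands to serviceIPs.ensureElem.
	tx.Add(&knftables.Element{
		Map:     "service-ips",
		Key:     []string{"172.30.0.41", "tcp", "80"},
		Value:   []string{"goto service-ULMVA6XW-ns1/svc1/tcp/p80"},
		Comment: ptr.To("ns1/svc1:p80"),
	})
	if err := nft.Run(context.Background(), tx); err != nil {
		panic(err)
	}
	fmt.Println(nft.Dump())
	// The dump contains a line like:
	//   add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 comment "ns1/svc1:p80" : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
}
```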
@@ -1372,7 +1472,7 @@ func (proxier *Proxier) syncProxyRules() {
 		// external traffic (DROP anything that didn't get short-circuited
 		// by the EXT chain.)
 		for _, lbip := range svcInfo.LoadBalancerVIPs() {
-			tx.Add(&knftables.Element{
+			proxier.noEndpointServices.ensureElem(tx, &knftables.Element{
 				Map: noEndpointServicesMap,
 				Key: []string{
 					lbip.String(),
@@ -1391,9 +1491,9 @@ func (proxier *Proxier) syncProxyRules() {
 		if svcInfo.NodePort() != 0 {
 			if hasEndpoints {
 				// Jump to the external destination chain. For better or for
-				// worse, nodeports are not subect to loadBalancerSourceRanges,
+				// worse, nodeports are not subject to loadBalancerSourceRanges,
 				// and we can't change that.
-				tx.Add(&knftables.Element{
+				proxier.serviceNodePorts.ensureElem(tx, &knftables.Element{
 					Map: serviceNodePortsMap,
 					Key: []string{
 						protocol,
@@ -1408,7 +1508,7 @@ func (proxier *Proxier) syncProxyRules() {
 				// Either no endpoints at all (REJECT) or no endpoints for
 				// external traffic (DROP anything that didn't get
 				// short-circuited by the EXT chain.)
-				tx.Add(&knftables.Element{
+				proxier.noEndpointNodePorts.ensureElem(tx, &knftables.Element{
 					Map: noEndpointNodePortsMap,
 					Key: []string{
 						protocol,
@@ -1422,6 +1522,12 @@ func (proxier *Proxier) syncProxyRules() {
 			}
 		}

+		// All the following operations are service-chain related and may be skipped if no svc or endpoint
+		// changes are required.
+		if skipServiceUpdate {
+			continue
+		}
+
 		// Set up internal traffic handling.
 		if hasInternalEndpoints {
 			if proxier.masqueradeAll {
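Note the ordering here: the set/map elements above are ensured even for services that will hit this `continue`. That is deliberate: cleanupLeftoverKeys deletes every element that was not re-ensured during the sync, so a skipped-but-still-existing service must still touch its elements or they would be swept away. A minimal model of that invariant (not the real types):

```go
package main

import "fmt"

func main() {
	elements := map[string]string{"svcA-key": "goto chain-A", "svcB-key": "goto chain-B"}
	leftover := map[string]bool{"svcA-key": true, "svcB-key": true}

	// svcA is unchanged and its chain updates are skipped, but its element
	// is still "ensured", i.e. removed from the leftover set:
	delete(leftover, "svcA-key")
	// svcB was deleted from the apiserver, so nothing touches its element.

	for k := range leftover {
		delete(elements, k) // cleanupLeftoverKeys would tx.Delete svcB-key only
	}
	fmt.Println(elements) // map[svcA-key:goto chain-A]
}
```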
@@ -1520,6 +1626,37 @@ func (proxier *Proxier) syncProxyRules() {
 			}
 		}

+		if usesFWChain {
+			var sources []string
+			allowFromNode := false
+			for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
+				if len(sources) > 0 {
+					sources = append(sources, ",")
+				}
+				sources = append(sources, cidr.String())
+				if cidr.Contains(proxier.nodeIP) {
+					allowFromNode = true
+				}
+			}
+			// For VIP-like LBs, the VIP is often added as a local
+			// address (via an IP route rule). In that case, a request
+			// from a node to the VIP will not hit the loadbalancer but
+			// will loop back with the source IP set to the VIP. We
+			// need the following rules to allow requests from this node.
+			if allowFromNode {
+				for _, lbip := range svcInfo.LoadBalancerVIPs() {
+					sources = append(sources, ",", lbip.String())
+				}
+			}
+			tx.Add(&knftables.Rule{
+				Chain: fwChain,
+				Rule: knftables.Concat(
+					ipX, "saddr", "!=", "{", sources, "}",
+					"drop",
+				),
+			})
+		}
+
 		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
 			// Generate the per-endpoint affinity sets
 			for _, ep := range allLocallyReachableEndpoints {
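The block above is the same firewall-rule construction that was removed from the element-handling section earlier in this patch, now relocated behind the skipServiceUpdate check. How the sources token list comes together (self-contained, using the same append pattern; the sample CIDRs and VIP are made up):

```go
package main

import (
	"fmt"
	"net/netip"
)

func main() {
	nodeIP := netip.MustParseAddr("10.0.0.5")
	sourceRanges := []netip.Prefix{
		netip.MustParsePrefix("10.0.0.0/8"),
		netip.MustParsePrefix("192.168.0.0/16"),
	}
	lbVIPs := []string{"1.2.3.4"}

	var sources []string
	allowFromNode := false
	for _, cidr := range sourceRanges {
		if len(sources) > 0 {
			sources = append(sources, ",")
		}
		sources = append(sources, cidr.String())
		if cidr.Contains(nodeIP) {
			allowFromNode = true
		}
	}
	// One of the ranges covers the node IP, so the VIPs are whitelisted too.
	if allowFromNode {
		for _, vip := range lbVIPs {
			sources = append(sources, ",", vip)
		}
	}
	fmt.Println(sources)
	// knftables.Concat space-joins these tokens into the rule:
	//   ip saddr != { 10.0.0.0/8 , 192.168.0.0/16 , 1.2.3.4 } drop
}
```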
@@ -1553,20 +1690,19 @@ func (proxier *Proxier) syncProxyRules() {
 					},
 					Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second),
 				})
-				activeAffinitySets.Insert(epInfo.affinitySetName)
 			}
 		}

 		// If Cluster policy is in use, create the chain and create rules jumping
 		// from clusterPolicyChain to the clusterEndpoints
 		if usesClusterPolicyChain {
-			proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
+			proxier.writeServiceToEndpointRules(tx, svcInfo, clusterPolicyChain, clusterEndpoints)
 		}

 		// If Local policy is in use, create rules jumping from localPolicyChain
 		// to the localEndpoints
 		if usesLocalPolicyChain {
-			proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
+			proxier.writeServiceToEndpointRules(tx, svcInfo, localPolicyChain, localEndpoints)
 		}

 		// Generate the per-endpoint chains
@@ -1648,6 +1784,13 @@ func (proxier *Proxier) syncProxyRules() {
 		proxier.logger.Error(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
 	}

+	proxier.clusterIPs.cleanupLeftoverKeys(tx)
+	proxier.serviceIPs.cleanupLeftoverKeys(tx)
+	proxier.firewallIPs.cleanupLeftoverKeys(tx)
+	proxier.noEndpointServices.cleanupLeftoverKeys(tx)
+	proxier.noEndpointNodePorts.cleanupLeftoverKeys(tx)
+	proxier.serviceNodePorts.cleanupLeftoverKeys(tx)
+
 	// Sync rules.
 	proxier.logger.V(2).Info("Reloading service nftables data",
 		"numServices", len(proxier.svcPortMap),
@@ -1669,6 +1812,7 @@ func (proxier *Proxier) syncProxyRules() {
 		return
 	}
 	success = true
+	proxier.needFullSync = false

 	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
 		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
@@ -1699,7 +1843,7 @@ func (proxier *Proxier) syncProxyRules() {
 	conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
 }

-func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
+func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
 	// First write session affinity rules, if applicable.
 	if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
 		ipX := "ip"
diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go
index 937fcaaf1f0..1d774cad065 100644
--- a/pkg/proxy/nftables/proxier_test.go
+++ b/pkg/proxy/nftables/proxier_test.go
@@ -119,6 +119,7 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
 		serviceChanges:       proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
 		endpointsMap:         make(proxy.EndpointsMap),
 		endpointsChanges:     proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil),
+		needFullSync:         true,
 		nftables:             nft,
 		masqueradeMark:       "0x4000",
 		conntrack:            conntrack.NewFake(),
@@ -130,6 +131,12 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
 		networkInterfacer:    networkInterfacer,
 		staleChains:          make(map[string]time.Time),
 		serviceCIDRs:         serviceCIDRs,
+		clusterIPs:           newNFTElementStorage("set", clusterIPsSet),
+		serviceIPs:           newNFTElementStorage("map", serviceIPsMap),
+		firewallIPs:          newNFTElementStorage("map", firewallIPsMap),
+		noEndpointServices:   newNFTElementStorage("map", noEndpointServicesMap),
+		noEndpointNodePorts:  newNFTElementStorage("map", noEndpointNodePortsMap),
+		serviceNodePorts:     newNFTElementStorage("map", serviceNodePortsMap),
 	}
 	p.setInitialized(true)
 	p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
@@ -4082,6 +4089,12 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 		add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to 10.0.3.1:80
 		`)
 	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	// add 1 element to cluster-ips and service-ips = 2 operations
+	// add+flush 2 chains for service and endpoint, add 2 rules in each = 8 operations
+	// 10 operations total.
+	if nft.LastTransaction.NumOperations() != 10 {
+		t.Errorf("Expected 10 transaction operations, got %d", nft.LastTransaction.NumOperations())
+	}

 	// Delete a service; its chains will be flushed, but not immediately deleted.
 	fp.OnServiceDelete(svc2)
@@ -4110,6 +4123,12 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 		add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to 10.0.3.1:80
 		`)
 	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	// delete 1 element from cluster-ips and service-ips = 2 operations
+	// flush 2 chains for service and endpoint = 2 operations
+	// 4 operations total.
+	if nft.LastTransaction.NumOperations() != 4 {
+		t.Errorf("Expected 4 transaction operations, got %d", nft.LastTransaction.NumOperations())
+	}

 	// Fake the passage of time and confirm that the stale chains get deleted.
 	ageStaleChains()
@@ -4135,6 +4154,10 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 		add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to 10.0.3.1:80
 		`)
 	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	// delete stale chains happens in a separate transaction, nothing else changed => last transaction will have 0 operations.
+	if nft.LastTransaction.NumOperations() != 0 {
+		t.Errorf("Expected 0 transaction operations, got %d", nft.LastTransaction.NumOperations())
+	}

 	// Add a service, sync, then add its endpoints.
 	makeServiceMap(fp,
@@ -4173,6 +4196,10 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 		add element ip kube-proxy no-endpoint-services { 172.30.0.44 . tcp . 80 comment "ns4/svc4:p80" : goto reject-chain }
 		`)
 	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	// add 1 element to cluster-ips and no-endpoint-services = 2 operations
+	if nft.LastTransaction.NumOperations() != 2 {
+		t.Errorf("Expected 2 transaction operations, got %d", nft.LastTransaction.NumOperations())
+	}

 	populateEndpointSlices(fp,
 		makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) {
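The operation counts asserted in these tests follow directly from the transaction contents: each tx.Add/Flush/Delete is one operation, so a rewritten service or endpoint chain costs add+flush plus one operation per rule. A quick standalone check of that arithmetic (assuming knftables counts operations this way, which is what these assertions rely on):

```go
package main

import (
	"fmt"

	"sigs.k8s.io/knftables"
)

func main() {
	nft := knftables.NewFake(knftables.IPv4Family, "kube-proxy")
	tx := nft.NewTransaction()
	tx.Add(&knftables.Chain{Name: "svc-chain"})                  // 1 op
	tx.Flush(&knftables.Chain{Name: "svc-chain"})                // 1 op
	tx.Add(&knftables.Rule{Chain: "svc-chain", Rule: "counter"}) // 1 op
	tx.Add(&knftables.Rule{Chain: "svc-chain", Rule: "counter"}) // 1 op
	fmt.Println(tx.NumOperations())                              // 4
}
```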
80 comment "ns4/svc4:p80" : goto reject-chain } `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + // add 1 element to cluster-ips and no-endpoint-services = 2 operations + if nft.LastTransaction.NumOperations() != 2 { + t.Errorf("Expected 2 trasaction operations, got %d", nft.LastTransaction.NumOperations()) + } populateEndpointSlices(fp, makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) { @@ -4218,6 +4245,11 @@ func TestSyncProxyRulesRepeated(t *testing.T) { add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + // add 1 element to service-ips, remove 1 element from no-endpoint-services = 2 operations + // add+flush 2 chains for service and endpoint, add 2 rules in each = 8 operations + if nft.LastTransaction.NumOperations() != 10 { + t.Errorf("Expected 10 trasaction operations, got %d", nft.LastTransaction.NumOperations()) + } // Change an endpoint of an existing service. eps3update := eps3.DeepCopy() @@ -4257,6 +4289,11 @@ func TestSyncProxyRulesRepeated(t *testing.T) { add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + // add+flush 2 chains for service and endpoint, add 2 rules in each = 8 operations + // flush old endpoint chain = 1 operation + if nft.LastTransaction.NumOperations() != 9 { + t.Errorf("Expected 9 trasaction operations, got %d", nft.LastTransaction.NumOperations()) + } // (Ensure the old svc3 chain gets deleted in the next sync.) ageStaleChains() @@ -4300,6 +4337,10 @@ func TestSyncProxyRulesRepeated(t *testing.T) { add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + // add+flush 3 chains for 1 service and 2 endpoints, add 2 rules in each = 12 operations + if nft.LastTransaction.NumOperations() != 12 { + t.Errorf("Expected 12 trasaction operations, got %d", nft.LastTransaction.NumOperations()) + } // Empty a service's endpoints; its chains will be flushed, but not immediately deleted. 
@@ -4333,6 +4374,12 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 		add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80
 		`)
 	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	// remove 1 element from service-ips, add 1 element to no-endpoint-services = 2 operations
+	// flush 3 chains = 3 operations
+	if nft.LastTransaction.NumOperations() != 5 {
+		t.Errorf("Expected 5 transaction operations, got %d", nft.LastTransaction.NumOperations())
+	}
+
 	expectedStaleChains := sets.NewString("service-4AT6LBPK-ns3/svc3/tcp/p80", "endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80", "endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80")
 	gotStaleChains := sets.StringKeySet(fp.staleChains)
 	if !expectedStaleChains.Equal(gotStaleChains) {
@@ -4374,6 +4421,12 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 		add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80
 		`)
 	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	// remove 1 element from no-endpoint-services, add 1 element to service-ips = 2 operations
+	// add+flush 3 chains for 1 service and 2 endpoints, add 2 rules in each = 12 operations
+	if nft.LastTransaction.NumOperations() != 14 {
+		t.Errorf("Expected 14 transaction operations, got %d", nft.LastTransaction.NumOperations())
+	}
+
 	if len(fp.staleChains) != 0 {
 		t.Errorf("unexpected stale chains: %v", fp.staleChains)
 	}
@@ -4392,6 +4445,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 	// Sync with no new changes, so same expected rules as last time
 	fp.syncProxyRules()
 	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	if nft.LastTransaction.NumOperations() != 0 {
+		t.Errorf("Expected 0 transaction operations, got %d", nft.LastTransaction.NumOperations())
+	}
 }

 func TestNoEndpointsMetric(t *testing.T) {