diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go
index f31e2905449..b12ecc9b956 100644
--- a/pkg/proxy/nftables/proxier.go
+++ b/pkg/proxy/nftables/proxier.go
@@ -163,6 +163,7 @@ type Proxier struct {
     // updating nftables with some partial data after kube-proxy restart.
     endpointSlicesSynced bool
     servicesSynced       bool
+    syncedOnce           bool
     lastFullSync         time.Time
     needFullSync         bool
     initialized          int32
@@ -712,13 +713,13 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
         })
     }
 
-    // flush containers
-    proxier.clusterIPs.reset(tx)
-    proxier.serviceIPs.reset(tx)
-    proxier.firewallIPs.reset(tx)
-    proxier.noEndpointServices.reset(tx)
-    proxier.noEndpointNodePorts.reset(tx)
-    proxier.serviceNodePorts.reset(tx)
+    // read or flush containers
+    proxier.clusterIPs.readOrReset(tx, proxier.nftables, proxier.logger)
+    proxier.serviceIPs.readOrReset(tx, proxier.nftables, proxier.logger)
+    proxier.firewallIPs.readOrReset(tx, proxier.nftables, proxier.logger)
+    proxier.noEndpointServices.readOrReset(tx, proxier.nftables, proxier.logger)
+    proxier.noEndpointNodePorts.readOrReset(tx, proxier.nftables, proxier.logger)
+    proxier.serviceNodePorts.readOrReset(tx, proxier.nftables, proxier.logger)
 }
 
 // CleanupLeftovers removes all nftables rules and chains created by the Proxier
@@ -1082,19 +1083,30 @@ func newNFTElementStorage(containerType, containerName string) *nftElementStorag
     return c
 }
 
-// reset clears the internal state and flushes the nftables map/set.
-func (s *nftElementStorage) reset(tx *knftables.Transaction) {
+// readOrReset updates the existing elements from the nftables map/set.
+// If reading fails, it clears the internal state and flushes the nftables map/set.
+func (s *nftElementStorage) readOrReset(tx *knftables.Transaction, nftables knftables.Interface, logger klog.Logger) {
     clear(s.elements)
-    if s.containerType == "set" {
-        tx.Flush(&knftables.Set{
-            Name: s.containerName,
-        })
-    } else {
-        tx.Flush(&knftables.Map{
-            Name: s.containerName,
-        })
+    defer s.resetLeftoverKeys()
+    elems, err := nftables.ListElements(context.TODO(), s.containerType, s.containerName)
+    if err != nil && !knftables.IsNotFound(err) {
+        if s.containerType == "set" {
+            tx.Flush(&knftables.Set{
+                Name: s.containerName,
+            })
+        } else {
+            tx.Flush(&knftables.Map{
+                Name: s.containerName,
+            })
+        }
+        logger.Error(err, "Failed to list nftables elements", "containerName", s.containerName, "containerType", s.containerType)
+        return
+    }
+    for _, elem := range elems {
+        newKey := joinNFTSlice(elem.Key)
+        newValue := joinNFTSlice(elem.Value)
+        s.elements[newKey] = newValue
     }
-    s.resetLeftoverKeys()
 }
 
 // resetLeftoverKeys is only called internally by nftElementStorage methods.
@@ -1178,6 +1190,7 @@ func (proxier *Proxier) syncProxyRules() {
     doFullSync := proxier.needFullSync || (time.Since(proxier.lastFullSync) > proxyutil.FullSyncPeriod)
 
     defer func() {
+        proxier.syncedOnce = true
         metrics.SyncProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
         if !doFullSync {
             metrics.SyncPartialProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
@@ -1252,6 +1265,26 @@ func (proxier *Proxier) syncProxyRules() {
         ipvX_addr = "ipv6_addr"
     }
 
+    var existingChains sets.Set[string]
+    existingChainsList, err := proxier.nftables.List(context.TODO(), "chain")
+    if err == nil {
+        existingChains = sets.New(existingChainsList...)
+    } else {
+        proxier.logger.Error(err, "Failed to list existing chains")
+    }
+    var existingAffinitySets sets.Set[string]
+    existingSets, err := proxier.nftables.List(context.TODO(), "sets")
+    if err == nil {
+        existingAffinitySets = sets.New[string]()
+        for _, set := range existingSets {
+            if isAffinitySetName(set) {
+                existingAffinitySets.Insert(set)
+            }
+        }
+    } else {
+        proxier.logger.Error(err, "Failed to list existing sets")
+    }
+
     // Accumulate service/endpoint chains and affinity sets to keep.
     activeChains := sets.New[string]()
     activeAffinitySets := sets.New[string]()
@@ -1295,7 +1328,8 @@ func (proxier *Proxier) syncProxyRules() {
         // Note the endpoint chains that will be used
         for _, ep := range allLocallyReachableEndpoints {
             if epInfo, ok := ep.(*endpointInfo); ok {
-                ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate)
+                ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate ||
+                    proxier.epChainSkipUpdate(existingChains, existingAffinitySets, svcInfo, epInfo))
                 // Note the affinity sets that will be used
                 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
                     activeAffinitySets.Insert(epInfo.affinitySetName)
@@ -1737,6 +1771,10 @@ func (proxier *Proxier) syncProxyRules() {
                 continue
             }
 
+            if proxier.epChainSkipUpdate(existingChains, existingAffinitySets, svcInfo, epInfo) {
+                // If the EP chain is already up to date, we can skip it.
+                continue
+            }
             endpointChain := epInfo.chainName
 
             // Handle traffic that loops back to the originator with SNAT.
@@ -1776,36 +1814,26 @@ func (proxier *Proxier) syncProxyRules() {
     // short amount of time later that the chain is now unreferenced. So we flush them
     // now, and record the time that they become stale in staleChains so they can be
    // deleted later.
-    existingChains, err := proxier.nftables.List(context.TODO(), "chains")
-    if err == nil {
-        for _, chain := range existingChains {
-            if isServiceChainName(chain) {
-                if !activeChains.Has(chain) {
-                    tx.Flush(&knftables.Chain{
-                        Name: chain,
-                    })
-                    proxier.staleChains[chain] = start
-                } else {
-                    delete(proxier.staleChains, chain)
-                }
+    for chain := range existingChains {
+        if isServiceChainName(chain) {
+            if !activeChains.Has(chain) {
+                tx.Flush(&knftables.Chain{
+                    Name: chain,
+                })
+                proxier.staleChains[chain] = start
+            } else {
+                delete(proxier.staleChains, chain)
             }
         }
-    } else if !knftables.IsNotFound(err) {
-        proxier.logger.Error(err, "Failed to list nftables chains: stale chains will not be deleted")
     }
 
     // OTOH, we can immediately delete any stale affinity sets
-    existingSets, err := proxier.nftables.List(context.TODO(), "sets")
-    if err == nil {
-        for _, set := range existingSets {
-            if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
-                tx.Delete(&knftables.Set{
-                    Name: set,
-                })
-            }
+    for set := range existingAffinitySets {
+        if !activeAffinitySets.Has(set) {
+            tx.Delete(&knftables.Set{
+                Name: set,
+            })
         }
-    } else if !knftables.IsNotFound(err) {
-        proxier.logger.Error(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
     }
 
     proxier.clusterIPs.cleanupLeftoverKeys(tx)
@@ -1871,6 +1899,30 @@ func (proxier *Proxier) syncProxyRules() {
     }
 }
 
+// epChainSkipUpdate returns true if the EP chain doesn't need to be updated.
+func (proxier *Proxier) epChainSkipUpdate(existingChains, existingAffinitySets sets.Set[string], svcInfo *servicePortInfo, epInfo *endpointInfo) bool {
+    if proxier.syncedOnce {
+        // We only skip updating EP chains during the first sync after a restart, to speed up kube-proxy startup.
+        return false
+    }
+    if existingChains == nil || existingAffinitySets == nil {
+        // listing existing objects failed, can't skip updating
+        return false
+    }
+    // EP chain can have up to 3 rules:
+    // - loopback masquerade rule
+    //   - includes the endpoint IP
+    // - affinity rule when session affinity is set to ClientIP
+    //   - includes the affinity set name
+    // - DNAT rule
+    //   - includes the endpoint IP + port
+    // The EP chain name includes the endpoint IP + port => the loopback and DNAT rules are pre-defined by the chain name.
+    // When session affinity is set to ClientIP, the affinity set is created for local endpoints.
+    // Therefore, we can check that session affinity hasn't changed by checking whether the affinity set exists.
+    wantAffinitySet := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
+    return existingChains.Has(epInfo.chainName) && wantAffinitySet == existingAffinitySets.Has(epInfo.affinitySetName)
+}
+
 func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
     // First write session affinity rules, if applicable.
     if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go
index f770cca7591..c42e597a498 100644
--- a/pkg/proxy/nftables/proxier_test.go
+++ b/pkg/proxy/nftables/proxier_test.go
@@ -150,6 +150,15 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
 
 var baseRules = dedent.Dedent(`
     add table ip kube-proxy { comment "rules for kube-proxy" ; }
+    add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
+    add set ip kube-proxy nodeport-ips { type ipv4_addr ; comment "IPs that accept NodePort traffic" ; }
+
+    add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; }
+    add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; }
+    add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; }
+    add map ip kube-proxy service-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "ClusterIP, ExternalIP and LoadBalancer IP traffic" ; }
+    add map ip kube-proxy service-nodeports { type inet_proto . inet_service : verdict ; comment "NodePort traffic" ; }
+
     add chain ip kube-proxy cluster-ips-check
     add chain ip kube-proxy filter-prerouting { type filter hook prerouting priority -110 ; }
     add chain ip kube-proxy filter-forward { type filter hook forward priority -110 ; }
@@ -189,16 +198,9 @@ var baseRules = dedent.Dedent(`
     add rule ip kube-proxy reject-chain reject
     add rule ip kube-proxy services ip daddr . meta l4proto . th dport vmap @service-ips
     add rule ip kube-proxy services ip daddr @nodeport-ips meta l4proto . th dport vmap @service-nodeports
-    add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
-    add set ip kube-proxy nodeport-ips { type ipv4_addr ; comment "IPs that accept NodePort traffic" ; }
+
     add element ip kube-proxy nodeport-ips { 192.168.0.2 }
     add rule ip kube-proxy service-endpoints-check ip daddr . meta l4proto . th dport vmap @no-endpoint-services
-
-    add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; }
-    add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; }
-    add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; }
-    add map ip kube-proxy service-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "ClusterIP, ExternalIP and LoadBalancer IP traffic" ; }
-    add map ip kube-proxy service-nodeports { type inet_proto . inet_service : verdict ; comment "NodePort traffic" ; }
 `)
 
 // TestOverallNFTablesRules creates a variety of services and verifies that the generated
@@ -4321,6 +4323,142 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
     }
 }
 
+func TestSyncProxyRulesStartup(t *testing.T) {
+    nft, fp := NewFakeProxier(v1.IPv4Protocol)
+    fp.syncProxyRules()
+    // measure the number of ops required for the initial sync
+    setupOps := nft.LastTransaction.NumOperations()
+
+    // now create a new proxier and start from scratch
+    nft, fp = NewFakeProxier(v1.IPv4Protocol)
+
+    // put a part of the desired state into nftables
+    err := nft.ParseDump(baseRules + dedent.Dedent(`
+        add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
+        add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
+        add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080
+        add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080
+
+        add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+        add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 }
+        add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade
+        add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80
+
+        add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr 172.30.0.42 tcp dport 8080 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+        add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 }
+        add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr 10.0.2.1 jump mark-for-masquerade
+        add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to 10.0.2.1:8080
+
+        add element ip kube-proxy cluster-ips { 172.30.0.41 }
+        add element ip kube-proxy cluster-ips { 172.30.0.42 }
+        add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
+        add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 }
+    `))
+
+    if err != nil {
+        t.Errorf("nft.ParseDump failed: %v", err)
+    }
+
+    // Create initial state, which differs from the loaded nftables state:
+    // - svc1 has a second endpoint
+    // - svc3 is added
+    makeServiceMap(fp,
+        makeTestService("ns1", "svc1", func(svc *v1.Service) {
+            svc.Spec.Type = v1.ServiceTypeClusterIP
+            svc.Spec.ClusterIP = "172.30.0.41"
+            svc.Spec.Ports = []v1.ServicePort{{
+                Name:     "p80",
+                Port:     80,
+                Protocol: v1.ProtocolTCP,
+            }}
+        }),
+        makeTestService("ns2", "svc2", func(svc *v1.Service) {
+            svc.Spec.Type = v1.ServiceTypeClusterIP
+            svc.Spec.ClusterIP = "172.30.0.42"
+            svc.Spec.Ports = []v1.ServicePort{{
+                Name:     "p8080",
+                Port:     8080,
+                Protocol: v1.ProtocolTCP,
+            }}
+        }),
+        makeTestService("ns3", "svc3", func(svc *v1.Service) {
+            svc.Spec.Type = v1.ServiceTypeClusterIP
+            svc.Spec.ClusterIP = "172.30.0.43"
+            svc.Spec.Ports = []v1.ServicePort{{
+                Name:     "p80",
+                Port:     80,
+                Protocol: v1.ProtocolTCP,
+            }}
+        }),
+    )
+
+    populateEndpointSlices(fp,
+        makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
+            eps.AddressType = discovery.AddressTypeIPv4
+            eps.Endpoints = []discovery.Endpoint{
+                {Addresses: []string{"10.0.1.1"}},
+                {Addresses: []string{"10.0.1.2"}},
+            }
+            eps.Ports = []discovery.EndpointPort{{
+                Name:     ptr.To("p80"),
+                Port:     ptr.To[int32](80),
+                Protocol: ptr.To(v1.ProtocolTCP),
+            }}
+        }),
+        makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) {
+            eps.AddressType = discovery.AddressTypeIPv4
+            eps.Endpoints = []discovery.Endpoint{{
+                Addresses: []string{"10.0.2.1"},
+            }}
+            eps.Ports = []discovery.EndpointPort{{
+                Name:     ptr.To("p8080"),
+                Port:     ptr.To[int32](8080),
+                Protocol: ptr.To(v1.ProtocolTCP),
+            }}
+        }),
+    )
+
+    fp.syncProxyRules()
+
+    expected := baseRules + dedent.Dedent(`
+        add element ip kube-proxy cluster-ips { 172.30.0.41 }
+        add element ip kube-proxy cluster-ips { 172.30.0.42 }
+        add element ip kube-proxy cluster-ips { 172.30.0.43 }
+        add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
+        add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 }
+        add element ip kube-proxy no-endpoint-services { 172.30.0.43 . tcp . 80 comment "ns3/svc3:p80" : goto reject-chain }
+
+        add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
+        add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+        add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 , 1 : goto endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 }
+        add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
+        add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade
+        add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80
+        add chain ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80
+        add rule ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 ip saddr 10.0.1.2 jump mark-for-masquerade
+        add rule ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 meta l4proto tcp dnat to 10.0.1.2:80
+
+        add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080
+        add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr 172.30.0.42 tcp dport 8080 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+        add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 }
+        add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080
+        add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr 10.0.2.1 jump mark-for-masquerade
+        add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to 10.0.2.1:8080
+    `)
+    assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+    // initial transaction consists of:
+    // 1. nft setup, total ops = setupOps
+    // 2. services setup (should skip adding existing set/map elements and endpoint chains+rules)
+    //    - add the svc3 IP to the cluster-ips set and to the no-endpoint-services map = 2 ops
+    //    - add+flush 2 service chains + 2 rules each = 8 ops
+    //    - add+flush svc1's new endpoint chain + 2 rules = 4 ops
+    //    total: 14 ops
+    if nft.LastTransaction.NumOperations() != setupOps+14 {
+        fmt.Println(nft.LastTransaction)
+        t.Errorf("Expected %v transaction operations, got %d", setupOps+14, nft.LastTransaction.NumOperations())
+    }
+}
+
 func TestNoEndpointsMetric(t *testing.T) {
     type endpoint struct {
         ip string