From ca8c27c480fce142f19f5d6d82c69c87020239d6 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Fri, 5 Jan 2024 17:54:51 +0800 Subject: [PATCH] kube-proxy: do not delete previously stale but currently active chains In some cases a chain could change from stale to active, but once it's added to staleChains it would always be deleted once. When the proxier tries to delete a previously stale but currently active chain, it would fail and lead to errors, though it won't cause real problem thanks to kernel's validation. The commit removes a chain from staleChains if it becomes active. Signed-off-by: Quan Tian --- pkg/proxy/nftables/proxier.go | 14 +++-- pkg/proxy/nftables/proxier_test.go | 83 ++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 5 deletions(-) diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 1ebba166724..b5f85747365 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -1525,11 +1525,15 @@ func (proxier *Proxier) syncProxyRules() { existingChains, err := proxier.nftables.List(context.TODO(), "chains") if err == nil { for _, chain := range existingChains { - if isServiceChainName(chain) && !activeChains.Has(chain) { - tx.Flush(&knftables.Chain{ - Name: chain, - }) - proxier.staleChains[chain] = start + if isServiceChainName(chain) { + if !activeChains.Has(chain) { + tx.Flush(&knftables.Chain{ + Name: chain, + }) + proxier.staleChains[chain] = start + } else { + delete(proxier.staleChains, chain) + } } } } else if !knftables.IsNotFound(err) { diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index 10f8c11f9e0..e699e12b3f8 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -35,6 +35,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/testutil" @@ -4649,6 +4650,88 @@ func TestSyncProxyRulesRepeated(t *testing.T) { `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + // Empty a service's endpoints; its chains will be flushed, but not immediately deleted. + eps3update3 := eps3update2.DeepCopy() + eps3update3.Endpoints = []discovery.Endpoint{} + fp.OnEndpointSliceUpdate(eps3update2, eps3update3) + fp.syncProxyRules() + expected = baseRules + dedent.Dedent(` + add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } + add element ip kube-proxy no-endpoint-services { 172.30.0.43 . tcp . 80 comment "ns3/svc3:p80" : goto reject-chain } + add element ip kube-proxy service-ips { 172.30.0.44 . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 } + + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 + add chain ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 + + add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr 172.30.0.44 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } + add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr 10.0.4.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80 + `) + assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + expectedStaleChains := sets.NewString("service-4AT6LBPK-ns3/svc3/tcp/p80", "endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80", "endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80") + gotStaleChains := sets.StringKeySet(fp.staleChains) + if !expectedStaleChains.Equal(gotStaleChains) { + t.Errorf("expected stale chains %v, got %v", expectedStaleChains, gotStaleChains) + } + // Restore endpoints to non-empty immediately; its chains will be restored, and deleted from staleChains. + fp.OnEndpointSliceUpdate(eps3update3, eps3update2) + fp.syncProxyRules() + expected = baseRules + dedent.Dedent(` + add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 } + add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 } + add element ip kube-proxy service-ips { 172.30.0.44 . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 } + + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 , 1 : goto endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 } + add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 + add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 ip saddr 10.0.3.2 jump mark-for-masquerade + add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 meta l4proto tcp dnat to 10.0.3.2:80 + add chain ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 + add rule ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 ip saddr 10.0.3.3 jump mark-for-masquerade + add rule ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 meta l4proto tcp dnat to 10.0.3.3:80 + + add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr 172.30.0.44 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } + add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr 10.0.4.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80 + `) + assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + if len(fp.staleChains) != 0 { + t.Errorf("unexpected stale chains: %v", fp.staleChains) + } + + // Empty a service's endpoints and restore it after stale chains age. + // - its chains will be flushed, but not immediately deleted in the first sync. + // - its chains will be deleted first, then recreated in the second sync. + fp.OnEndpointSliceUpdate(eps3update2, eps3update3) + fp.syncProxyRules() + ageStaleChains() + fp.OnEndpointSliceUpdate(eps3update3, eps3update2) + fp.syncProxyRules() + // The second change counteracts the first one, so same expected rules as last time + assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + // Sync with no new changes, so same expected rules as last time fp.syncProxyRules() assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())