[kube-proxy:nftables] Skip EP chain updates on startup.

Endpoint chain contents are fairly predictable from their names and the existing affinity sets, so endpoint chain updates can be skipped whenever we can be sure that the rules in the chain are still correct. Add a unit test to verify that the first transaction is optimized. Also change the baseRules ordering so that it is accepted by nft.ParseDump.

Signed-off-by: Nadia Pinaeva <npinaeva@redhat.com>
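For illustration, an endpoint chain's name encodes the endpoint IP and port, which fully determines its loopback-masquerade and DNAT rules. Quoting the unit test added below, the chain endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 is expected to contain exactly:

    add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade
    add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80

The only per-chain state not derivable from the name is session affinity, which is why the existing affinity sets are consulted as well.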
parent 7d5f3c5723
commit cc0faf086d
@@ -163,6 +163,7 @@ type Proxier struct {
 	// updating nftables with some partial data after kube-proxy restart.
 	endpointSlicesSynced bool
 	servicesSynced       bool
+	syncedOnce           bool
 	lastFullSync         time.Time
 	needFullSync         bool
 	initialized          int32
@@ -1189,6 +1190,7 @@ func (proxier *Proxier) syncProxyRules() {
 	doFullSync := proxier.needFullSync || (time.Since(proxier.lastFullSync) > proxyutil.FullSyncPeriod)

 	defer func() {
+		proxier.syncedOnce = true
 		metrics.SyncProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
 		if !doFullSync {
 			metrics.SyncPartialProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
@@ -1263,6 +1265,26 @@ func (proxier *Proxier) syncProxyRules() {
 		ipvX_addr = "ipv6_addr"
 	}

+	var existingChains sets.Set[string]
+	existingChainsList, err := proxier.nftables.List(context.TODO(), "chain")
+	if err == nil {
+		existingChains = sets.New(existingChainsList...)
+	} else {
+		proxier.logger.Error(err, "Failed to list existing chains")
+	}
+	var existingAffinitySets sets.Set[string]
+	existingSets, err := proxier.nftables.List(context.TODO(), "sets")
+	if err == nil {
+		existingAffinitySets = sets.New[string]()
+		for _, set := range existingSets {
+			if isAffinitySetName(set) {
+				existingAffinitySets.Insert(set)
+			}
+		}
+	} else {
+		proxier.logger.Error(err, "Failed to list existing sets")
+	}
+
 	// Accumulate service/endpoint chains and affinity sets to keep.
 	activeChains := sets.New[string]()
 	activeAffinitySets := sets.New[string]()
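As an aside, a minimal sketch (not the proxier code) of the list-then-index pattern used in this hunk, assuming sigs.k8s.io/knftables and k8s.io/apimachinery/pkg/util/sets; the fake backend stands in for real nftables:

    package main

    import (
        "context"
        "fmt"

        "k8s.io/apimachinery/pkg/util/sets"
        "sigs.k8s.io/knftables"
    )

    func main() {
        nft := knftables.NewFake(knftables.IPv4Family, "kube-proxy")
        // List returns the names of all objects of the given type in the table.
        chains, err := nft.List(context.TODO(), "chain")
        var existing sets.Set[string] // stays nil when listing fails
        if err == nil {
            existing = sets.New(chains...)
        }
        // A nil set is how the code above signals "can't skip updates".
        fmt.Println(existing.Has("endpoint-example"), err)
    }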
@@ -1306,7 +1328,8 @@ func (proxier *Proxier) syncProxyRules() {
 		// Note the endpoint chains that will be used
 		for _, ep := range allLocallyReachableEndpoints {
 			if epInfo, ok := ep.(*endpointInfo); ok {
-				ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate)
+				ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate ||
+					proxier.epChainSkipUpdate(existingChains, existingAffinitySets, svcInfo, epInfo))
 				// Note the affinity sets that will be used
 				if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
 					activeAffinitySets.Insert(epInfo.affinitySetName)
@@ -1748,6 +1771,10 @@ func (proxier *Proxier) syncProxyRules() {
 				continue
 			}

+			if proxier.epChainSkipUpdate(existingChains, existingAffinitySets, svcInfo, epInfo) {
+				// If the EP chain is already updated, we can skip it.
+				continue
+			}
 			endpointChain := epInfo.chainName

 			// Handle traffic that loops back to the originator with SNAT.
@@ -1787,36 +1814,26 @@ func (proxier *Proxier) syncProxyRules() {
 		// short amount of time later that the chain is now unreferenced. So we flush them
 		// now, and record the time that they become stale in staleChains so they can be
 		// deleted later.
-		existingChains, err := proxier.nftables.List(context.TODO(), "chains")
-		if err == nil {
-			for _, chain := range existingChains {
-				if isServiceChainName(chain) {
-					if !activeChains.Has(chain) {
-						tx.Flush(&knftables.Chain{
-							Name: chain,
-						})
-						proxier.staleChains[chain] = start
-					} else {
-						delete(proxier.staleChains, chain)
+		for chain := range existingChains {
+			if isServiceChainName(chain) {
+				if !activeChains.Has(chain) {
+					tx.Flush(&knftables.Chain{
+						Name: chain,
+					})
+					proxier.staleChains[chain] = start
+				} else {
+					delete(proxier.staleChains, chain)
 				}
 			}
-		} else if !knftables.IsNotFound(err) {
-			proxier.logger.Error(err, "Failed to list nftables chains: stale chains will not be deleted")
 		}

 		// OTOH, we can immediately delete any stale affinity sets
-		existingSets, err := proxier.nftables.List(context.TODO(), "sets")
-		if err == nil {
-			for _, set := range existingSets {
-				if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
-					tx.Delete(&knftables.Set{
-						Name: set,
-					})
-				}
+		for set := range existingAffinitySets {
+			if !activeAffinitySets.Has(set) {
+				tx.Delete(&knftables.Set{
+					Name: set,
+				})
 			}
-		} else if !knftables.IsNotFound(err) {
-			proxier.logger.Error(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
 		}

 		proxier.clusterIPs.cleanupLeftoverKeys(tx)
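For context, a compact sketch of the staleChains bookkeeping described in the comment above; the grace period is an illustrative value, not the proxier's actual constant:

    // chainsToDelete returns the chains that have been stale (flushed but
    // still present) long enough to be safely deleted from nftables.
    func chainsToDelete(staleChains map[string]time.Time, now time.Time) []string {
        const stalePeriod = time.Minute // illustrative grace period
        var toDelete []string
        for chain, since := range staleChains {
            if now.Sub(since) > stalePeriod {
                toDelete = append(toDelete, chain)
            }
        }
        return toDelete
    }

Note that this hunk also reuses the existingChains and existingAffinitySets listed at the top of the sync, instead of issuing a second pair of List calls as the old code did.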
@@ -1882,6 +1899,30 @@ func (proxier *Proxier) syncProxyRules() {
 	}
 }

+// epChainSkipUpdate returns true if the EP chain doesn't need to be updated.
+func (proxier *Proxier) epChainSkipUpdate(existingChains, existingAffinitySets sets.Set[string], svcInfo *servicePortInfo, epInfo *endpointInfo) bool {
+	if proxier.syncedOnce {
+		// We only skip updating EP chains during the first sync to speed up kube-proxy restart, otherwise return false.
+		return false
+	}
+	if existingChains == nil || existingAffinitySets == nil {
+		// listing existing objects failed, can't skip updating
+		return false
+	}
+	// EP chain can have up to 3 rules:
+	// - loopback masquerade rule
+	//   - includes the endpoint IP
+	// - affinity rule when session affinity is set to ClientIP
+	//   - includes the affinity set name
+	// - DNAT rule
+	//   - includes the endpoint IP + port
+	// The EP chain name includes the endpoint IP + port => the loopback and DNAT rules are pre-defined by the chain name.
+	// When session affinity is set to ClientIP, the affinity set is created for local endpoints.
+	// Therefore, we can check that session affinity hasn't changed by checking whether the affinity set exists.
+	wantAffinitySet := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
+	return existingChains.Has(epInfo.chainName) && wantAffinitySet == existingAffinitySets.Has(epInfo.affinitySetName)
+}
+
 func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
 	// First write session affinity rules, if applicable.
 	if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
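To make the predicate concrete, a worked example with illustrative values; on the first sync, an endpoint chain is skipped only when it already exists and the presence of its affinity set matches the service's session affinity setting:

    chainExists := true      // existingChains.Has(epInfo.chainName)
    wantAffinitySet := false // svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
    haveAffinitySet := false // existingAffinitySets.Has(epInfo.affinitySetName)
    skip := chainExists && wantAffinitySet == haveAffinitySet // true: leave the chain as-is

Had the service just switched to ClientIP affinity (wantAffinitySet true, haveAffinitySet false), skip would be false and the chain would be rewritten.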
@@ -150,6 +150,15 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
 var baseRules = dedent.Dedent(`
 	add table ip kube-proxy { comment "rules for kube-proxy" ; }

+	add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
+	add set ip kube-proxy nodeport-ips { type ipv4_addr ; comment "IPs that accept NodePort traffic" ; }
+
+	add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; }
+	add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; }
+	add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; }
+	add map ip kube-proxy service-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "ClusterIP, ExternalIP and LoadBalancer IP traffic" ; }
+	add map ip kube-proxy service-nodeports { type inet_proto . inet_service : verdict ; comment "NodePort traffic" ; }
+
 	add chain ip kube-proxy cluster-ips-check
 	add chain ip kube-proxy filter-prerouting { type filter hook prerouting priority -110 ; }
 	add chain ip kube-proxy filter-forward { type filter hook forward priority -110 ; }
@@ -189,16 +198,9 @@ var baseRules = dedent.Dedent(`
 	add rule ip kube-proxy reject-chain reject
 	add rule ip kube-proxy services ip daddr . meta l4proto . th dport vmap @service-ips
 	add rule ip kube-proxy services ip daddr @nodeport-ips meta l4proto . th dport vmap @service-nodeports
-	add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
-	add set ip kube-proxy nodeport-ips { type ipv4_addr ; comment "IPs that accept NodePort traffic" ; }

 	add element ip kube-proxy nodeport-ips { 192.168.0.2 }
 	add rule ip kube-proxy service-endpoints-check ip daddr . meta l4proto . th dport vmap @no-endpoint-services

-	add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; }
-	add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; }
-	add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; }
-	add map ip kube-proxy service-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "ClusterIP, ExternalIP and LoadBalancer IP traffic" ; }
-	add map ip kube-proxy service-nodeports { type inet_proto . inet_service : verdict ; comment "NodePort traffic" ; }
 `)

 // TestOverallNFTablesRules creates a variety of services and verifies that the generated
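The reordering matters presumably because nft.ParseDump processes the dump top-down, so sets and maps must already be declared before the elements and rules that reference them; the old ordering declared them last. A minimal sketch of loading a dump into the fake, assuming sigs.k8s.io/knftables and github.com/lithammer/dedent:

    nft := knftables.NewFake(knftables.IPv4Family, "kube-proxy")
    err := nft.ParseDump(dedent.Dedent(`
        add table ip kube-proxy { comment "rules for kube-proxy" ; }
        add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
        add element ip kube-proxy cluster-ips { 172.30.0.41 }
        `))
    // Moving the set declaration below its element should make ParseDump fail.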
@@ -4321,6 +4323,142 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 	}
 }

+func TestSyncProxyRulesStartup(t *testing.T) {
+	nft, fp := NewFakeProxier(v1.IPv4Protocol)
+	fp.syncProxyRules()
+	// measure the number of ops required for the initial sync
+	setupOps := nft.LastTransaction.NumOperations()
+
+	// now create a new proxier and start from scratch
+	nft, fp = NewFakeProxier(v1.IPv4Protocol)
+
+	// put part of the desired state into nftables
+	err := nft.ParseDump(baseRules + dedent.Dedent(`
+		add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
+		add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
+		add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080
+		add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080
+
+		add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+		add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 }
+		add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80
+
+		add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr 172.30.0.42 tcp dport 8080 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+		add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 }
+		add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr 10.0.2.1 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to 10.0.2.1:8080
+
+		add element ip kube-proxy cluster-ips { 172.30.0.41 }
+		add element ip kube-proxy cluster-ips { 172.30.0.42 }
+		add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
+		add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 }
+		`))
+	if err != nil {
+		t.Errorf("nft.ParseDump failed: %v", err)
+	}
+
+	// Create initial state, which differs from the loaded nftables state:
+	// - svc1 has a second endpoint
+	// - svc3 is added
+	makeServiceMap(fp,
+		makeTestService("ns1", "svc1", func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeClusterIP
+			svc.Spec.ClusterIP = "172.30.0.41"
+			svc.Spec.Ports = []v1.ServicePort{{
+				Name:     "p80",
+				Port:     80,
+				Protocol: v1.ProtocolTCP,
+			}}
+		}),
+		makeTestService("ns2", "svc2", func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeClusterIP
+			svc.Spec.ClusterIP = "172.30.0.42"
+			svc.Spec.Ports = []v1.ServicePort{{
+				Name:     "p8080",
+				Port:     8080,
+				Protocol: v1.ProtocolTCP,
+			}}
+		}),
+		makeTestService("ns3", "svc3", func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeClusterIP
+			svc.Spec.ClusterIP = "172.30.0.43"
+			svc.Spec.Ports = []v1.ServicePort{{
+				Name:     "p80",
+				Port:     80,
+				Protocol: v1.ProtocolTCP,
+			}}
+		}),
+	)
+
+	populateEndpointSlices(fp,
+		makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
+			eps.AddressType = discovery.AddressTypeIPv4
+			eps.Endpoints = []discovery.Endpoint{
+				{Addresses: []string{"10.0.1.1"}},
+				{Addresses: []string{"10.0.1.2"}},
+			}
+			eps.Ports = []discovery.EndpointPort{{
+				Name:     ptr.To("p80"),
+				Port:     ptr.To[int32](80),
+				Protocol: ptr.To(v1.ProtocolTCP),
+			}}
+		}),
+		makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) {
+			eps.AddressType = discovery.AddressTypeIPv4
+			eps.Endpoints = []discovery.Endpoint{{
+				Addresses: []string{"10.0.2.1"},
+			}}
+			eps.Ports = []discovery.EndpointPort{{
+				Name:     ptr.To("p8080"),
+				Port:     ptr.To[int32](8080),
+				Protocol: ptr.To(v1.ProtocolTCP),
+			}}
+		}),
+	)
+
+	fp.syncProxyRules()
+
+	expected := baseRules + dedent.Dedent(`
+		add element ip kube-proxy cluster-ips { 172.30.0.41 }
+		add element ip kube-proxy cluster-ips { 172.30.0.42 }
+		add element ip kube-proxy cluster-ips { 172.30.0.43 }
+		add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
+		add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 }
+		add element ip kube-proxy no-endpoint-services { 172.30.0.43 . tcp . 80 comment "ns3/svc3:p80" : goto reject-chain }
+
+		add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
+		add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+		add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 , 1 : goto endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 }
+		add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
+		add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80
+		add chain ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80
+		add rule ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 ip saddr 10.0.1.2 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 meta l4proto tcp dnat to 10.0.1.2:80
+
+		add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080
+		add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr 172.30.0.42 tcp dport 8080 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+		add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 }
+		add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080
+		add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr 10.0.2.1 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to 10.0.2.1:8080
+		`)
+	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	// The initial transaction consists of:
+	// 1. nft setup, total ops = setupOps
+	// 2. services setup (should skip adding existing set/map elements and endpoint chains+rules)
+	//    - add svc3 IP to the cluster-ips and no-endpoint-services sets = 2 ops
+	//    - add+flush 2 service chains + 2 rules each = 8 ops
+	//    - add+flush svc1 endpoint chain + 2 rules = 4 ops
+	//    total: 14 ops
+	if nft.LastTransaction.NumOperations() != setupOps+14 {
+		fmt.Println(nft.LastTransaction)
+		t.Errorf("Expected %v transaction operations, got %d", setupOps+14, nft.LastTransaction.NumOperations())
+	}
+}
+
 func TestNoEndpointsMetric(t *testing.T) {
 	type endpoint struct {
 		ip string
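As a closing usage note, the baseline-ops pattern in the new test generalizes: run one sync against a fresh fake, record NumOperations(), then assert that later transactions equal the baseline plus an expected delta. A minimal sketch, assuming sigs.k8s.io/knftables:

    nft := knftables.NewFake(knftables.IPv4Family, "kube-proxy")
    tx := nft.NewTransaction()
    tx.Add(&knftables.Table{}) // family/name are filled in from the fake itself
    if err := nft.Run(context.TODO(), tx); err == nil {
        baseline := nft.LastTransaction.NumOperations() // here: 1 op
        _ = baseline // later: expect baseline+delta after a partial first sync
    }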