Merge pull request #122111 from danwinship/proxy-chain-creation-cleanup

proxy chain creation cleanup
This commit is contained in:
Kubernetes Prow Robot 2023-12-14 06:17:40 +01:00 committed by GitHub
commit b54e719509
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 73 deletions

View File

@ -421,7 +421,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
natChains.Write("*nat") natChains.Write("*nat")
// Start with chains we know we need to remove. // Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} {
if _, found := existingNATChains[chain]; found { if existingNATChains.Has(chain) {
chainString := string(chain) chainString := string(chain)
natChains.Write(utiliptables.MakeChainLine(chain)) // flush natChains.Write(utiliptables.MakeChainLine(chain)) // flush
natRules.Write("-X", chainString) // delete natRules.Write("-X", chainString) // delete
@ -457,7 +457,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
filterRules := proxyutil.NewLineBuffer() filterRules := proxyutil.NewLineBuffer()
filterChains.Write("*filter") filterChains.Write("*filter")
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
if _, found := existingFilterChains[chain]; found { if existingFilterChains.Has(chain) {
chainString := string(chain) chainString := string(chain)
filterChains.Write(utiliptables.MakeChainLine(chain)) filterChains.Write(utiliptables.MakeChainLine(chain))
filterRules.Write("-X", chainString) filterRules.Write("-X", chainString)
@ -926,7 +926,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Accumulate NAT chains to keep. // Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set activeNATChains := sets.New[utiliptables.Chain]()
// To avoid growing this slice, we arbitrarily set its size to 64, // To avoid growing this slice, we arbitrarily set its size to 64,
// there is never more than that many arguments for a single line. // there is never more than that many arguments for a single line.
@ -964,26 +964,13 @@ func (proxier *Proxier) syncProxyRules() {
allEndpoints := proxier.endpointsMap[svcName] allEndpoints := proxier.endpointsMap[svcName]
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
// Note the endpoint chains that will be used
for _, ep := range allLocallyReachableEndpoints {
if epInfo, ok := ep.(*endpointInfo); ok {
activeNATChains[epInfo.ChainName] = true
}
}
// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy // clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
clusterPolicyChain := svcInfo.clusterPolicyChainName clusterPolicyChain := svcInfo.clusterPolicyChainName
usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints() usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
if usesClusterPolicyChain {
activeNATChains[clusterPolicyChain] = true
}
// localPolicyChain contains the endpoints used with "Local" traffic policy // localPolicyChain contains the endpoints used with "Local" traffic policy
localPolicyChain := svcInfo.localPolicyChainName localPolicyChain := svcInfo.localPolicyChainName
usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints() usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
if usesLocalPolicyChain {
activeNATChains[localPolicyChain] = true
}
// internalPolicyChain is the chain containing the endpoints for // internalPolicyChain is the chain containing the endpoints for
// "internal" (ClusterIP) traffic. internalTrafficChain is the chain that // "internal" (ClusterIP) traffic. internalTrafficChain is the chain that
@ -1023,9 +1010,6 @@ func (proxier *Proxier) syncProxyRules() {
// because we need the local-traffic-short-circuiting rules even when there // because we need the local-traffic-short-circuiting rules even when there
// are no externally-usable endpoints. // are no externally-usable endpoints.
usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible() usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
if usesExternalTrafficChain {
activeNATChains[externalTrafficChain] = true
}
// Traffic to LoadBalancer IPs can go directly to externalTrafficChain // Traffic to LoadBalancer IPs can go directly to externalTrafficChain
// unless LoadBalancerSourceRanges is in use in which case we will // unless LoadBalancerSourceRanges is in use in which case we will
@ -1034,7 +1018,6 @@ func (proxier *Proxier) syncProxyRules() {
fwChain := svcInfo.firewallChainName fwChain := svcInfo.firewallChainName
usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0 usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
if usesFWChain { if usesFWChain {
activeNATChains[fwChain] = true
loadBalancerTrafficChain = fwChain loadBalancerTrafficChain = fwChain
} }
@ -1203,10 +1186,9 @@ func (proxier *Proxier) syncProxyRules() {
} }
// If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync // If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync
// then we can omit them from the restore input. (We have already marked // then we can omit them from the restore input. However, we have to still
// them in activeNATChains, so they won't get deleted.) However, we have // figure out how many chains we _would_ have written, to make the metrics
// to still figure out how many chains we _would_ have written to make the // come out right, so we just compute them and throw them away.
// metrics come out right, so we just compute them and throw them away.
if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) { if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) {
natChains = skippedNatChains natChains = skippedNatChains
natRules = skippedNatRules natRules = skippedNatRules
@ -1245,6 +1227,7 @@ func (proxier *Proxier) syncProxyRules() {
// then jump to externalPolicyChain. // then jump to externalPolicyChain.
if usesExternalTrafficChain { if usesExternalTrafficChain {
natChains.Write(utiliptables.MakeChainLine(externalTrafficChain)) natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))
activeNATChains.Insert(externalTrafficChain)
if !svcInfo.ExternalPolicyLocal() { if !svcInfo.ExternalPolicyLocal() {
// If we are using non-local endpoints we need to masquerade, // If we are using non-local endpoints we need to masquerade,
@ -1299,6 +1282,7 @@ func (proxier *Proxier) syncProxyRules() {
// Set up firewall chain, if needed // Set up firewall chain, if needed
if usesFWChain { if usesFWChain {
natChains.Write(utiliptables.MakeChainLine(fwChain)) natChains.Write(utiliptables.MakeChainLine(fwChain))
activeNATChains.Insert(fwChain)
// The service firewall rules are created based on the // The service firewall rules are created based on the
// loadBalancerSourceRanges field. This only works for VIP-like // loadBalancerSourceRanges field. This only works for VIP-like
@ -1347,6 +1331,7 @@ func (proxier *Proxier) syncProxyRules() {
// from clusterPolicyChain to the clusterEndpoints // from clusterPolicyChain to the clusterEndpoints
if usesClusterPolicyChain { if usesClusterPolicyChain {
natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain)) natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
activeNATChains.Insert(clusterPolicyChain)
proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args) proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
} }
@ -1354,6 +1339,7 @@ func (proxier *Proxier) syncProxyRules() {
// from localPolicyChain to the localEndpoints // from localPolicyChain to the localEndpoints
if usesLocalPolicyChain { if usesLocalPolicyChain {
natChains.Write(utiliptables.MakeChainLine(localPolicyChain)) natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
activeNATChains.Insert(localPolicyChain)
proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args) proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args)
} }
@ -1369,7 +1355,7 @@ func (proxier *Proxier) syncProxyRules() {
// Create the endpoint chain // Create the endpoint chain
natChains.Write(utiliptables.MakeChainLine(endpointChain)) natChains.Write(utiliptables.MakeChainLine(endpointChain))
activeNATChains[endpointChain] = true activeNATChains.Insert(endpointChain)
args = append(args[:0], "-A", string(endpointChain)) args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcPortNameString) args = proxier.appendServiceCommentLocked(args, svcPortNameString)
@ -1394,26 +1380,21 @@ func (proxier *Proxier) syncProxyRules() {
// active rules, so they're harmless other than taking up memory.) // active rules, so they're harmless other than taking up memory.)
deletedChains := 0 deletedChains := 0
if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod { if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod {
var existingNATChains map[utiliptables.Chain]struct{}
proxier.iptablesData.Reset() proxier.iptablesData.Reset()
if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil { if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil {
existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes()) existingNATChains := utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
for chain := range existingNATChains.Difference(activeNATChains) {
for chain := range existingNATChains { chainString := string(chain)
if !activeNATChains[chain] { if !isServiceChainName(chainString) {
chainString := string(chain) // Ignore chains that aren't ours.
if !isServiceChainName(chainString) { continue
// Ignore chains that aren't ours.
continue
}
// We must (as per iptables) write a chain-line
// for it, which has the nice effect of flushing
// the chain. Then we can remove the chain.
proxier.natChains.Write(utiliptables.MakeChainLine(chain))
proxier.natRules.Write("-X", chainString)
deletedChains++
} }
// We must (as per iptables) write a chain-line
// for it, which has the nice effect of flushing
// the chain. Then we can remove the chain.
proxier.natChains.Write(utiliptables.MakeChainLine(chain))
proxier.natRules.Write("-X", chainString)
deletedChains++
} }
proxier.lastIPTablesCleanup = time.Now() proxier.lastIPTablesCleanup = time.Now()
} else { } else {

View File

@ -19,6 +19,8 @@ package iptables
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"k8s.io/apimachinery/pkg/util/sets"
) )
// MakeChainLine return an iptables-save/restore formatted chain line given a Chain // MakeChainLine return an iptables-save/restore formatted chain line given a Chain
@ -27,10 +29,10 @@ func MakeChainLine(chain Chain) string {
} }
// GetChainsFromTable parses iptables-save data to find the chains that are defined. It // GetChainsFromTable parses iptables-save data to find the chains that are defined. It
// assumes that save contains a single table's data, and returns a map with keys for every // assumes that save contains a single table's data, and returns a set with keys for every
// chain defined in that table. // chain defined in that table.
func GetChainsFromTable(save []byte) map[Chain]struct{} { func GetChainsFromTable(save []byte) sets.Set[Chain] {
chainsMap := make(map[Chain]struct{}) chainsSet := sets.New[Chain]()
for { for {
i := bytes.Index(save, []byte("\n:")) i := bytes.Index(save, []byte("\n:"))
@ -45,8 +47,8 @@ func GetChainsFromTable(save []byte) map[Chain]struct{} {
break break
} }
chain := Chain(save[:end]) chain := Chain(save[:end])
chainsMap[chain] = struct{}{} chainsSet.Insert(chain)
save = save[end:] save = save[end:]
} }
return chainsMap return chainsSet
} }

View File

@ -20,19 +20,19 @@ import (
"testing" "testing"
"github.com/lithammer/dedent" "github.com/lithammer/dedent"
"k8s.io/apimachinery/pkg/util/sets"
) )
func checkChains(t *testing.T, save []byte, expected map[Chain]struct{}) { func checkChains(t *testing.T, save []byte, expected sets.Set[Chain]) {
chains := GetChainsFromTable(save) chains := GetChainsFromTable(save)
for chain := range expected { missing := expected.Difference(chains)
if _, exists := chains[chain]; !exists { if len(missing) != 0 {
t.Errorf("GetChainsFromTable expected chain not present: %s", chain) t.Errorf("GetChainsFromTable expected chains not present: %v", missing.UnsortedList())
}
} }
for chain := range chains { extra := chains.Difference(expected)
if _, exists := expected[chain]; !exists { if len(extra) != 0 {
t.Errorf("GetChainsFromTable chain unexpectedly present: %s", chain) t.Errorf("GetChainsFromTable expected chains unexpectedly present: %v", extra.UnsortedList())
}
} }
} }
@ -77,22 +77,23 @@ func TestGetChainsFromTable(t *testing.T) {
-A KUBE-SVC-6666666666666666 -m comment --comment "kube-system/kube-dns:dns" -j KUBE-SVC-1111111111111111 -A KUBE-SVC-6666666666666666 -m comment --comment "kube-system/kube-dns:dns" -j KUBE-SVC-1111111111111111
COMMIT COMMIT
`) `)
expected := map[Chain]struct{}{
ChainPrerouting: {}, expected := sets.New(
Chain("INPUT"): {}, ChainPrerouting,
Chain("OUTPUT"): {}, Chain("INPUT"),
ChainPostrouting: {}, Chain("OUTPUT"),
Chain("DOCKER"): {}, ChainPostrouting,
Chain("KUBE-NODEPORT-CONTAINER"): {}, Chain("DOCKER"),
Chain("KUBE-NODEPORT-HOST"): {}, Chain("KUBE-NODEPORT-CONTAINER"),
Chain("KUBE-PORTALS-CONTAINER"): {}, Chain("KUBE-NODEPORT-HOST"),
Chain("KUBE-PORTALS-HOST"): {}, Chain("KUBE-PORTALS-CONTAINER"),
Chain("KUBE-SVC-1111111111111111"): {}, Chain("KUBE-PORTALS-HOST"),
Chain("KUBE-SVC-2222222222222222"): {}, Chain("KUBE-SVC-1111111111111111"),
Chain("KUBE-SVC-3333333333333333"): {}, Chain("KUBE-SVC-2222222222222222"),
Chain("KUBE-SVC-4444444444444444"): {}, Chain("KUBE-SVC-3333333333333333"),
Chain("KUBE-SVC-5555555555555555"): {}, Chain("KUBE-SVC-4444444444444444"),
Chain("KUBE-SVC-6666666666666666"): {}, Chain("KUBE-SVC-5555555555555555"),
} Chain("KUBE-SVC-6666666666666666"),
)
checkChains(t, []byte(iptablesSave), expected) checkChains(t, []byte(iptablesSave), expected)
} }