Simplify nftables/proxier.go by removing large-cluster mode

since things will be optimized differently in nftables
Dan Winship 2023-06-01 10:32:20 -04:00
parent a70653143e
commit 39a5af1d0a
2 changed files with 22 additions and 253 deletions


@@ -83,11 +83,6 @@ const (
// the anti-martian-packet rule. It should not be used for any other
// rules.
kubeletFirewallChain utiliptables.Chain = "KUBE-FIREWALL"
// largeClusterEndpointsThreshold is the number of endpoints at which
// we switch into "large cluster mode" and optimize for iptables
// performance over iptables debuggability
largeClusterEndpointsThreshold = 1000
)
const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
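For readers skimming the diff: the mode being deleted worked by counting every endpoint across all services at the start of syncProxyRules, flipping largeClusterMode once the total crossed this 1000-endpoint threshold, and then omitting the per-rule "-m comment" arguments via the appendServiceCommentLocked helper removed further down. A condensed, self-contained sketch of that behavior (simplified names and hypothetical endpoint counts, not code from this commit):

package main

import "fmt"

const largeClusterEndpointsThreshold = 1000

// appendServiceComment mirrors the removed appendServiceCommentLocked helper:
// in large-cluster mode the comment match is dropped to shrink the rule set.
func appendServiceComment(args []string, svcName string, largeClusterMode bool) []string {
    if largeClusterMode {
        return args
    }
    return append(args, "-m", "comment", "--comment", svcName)
}

func main() {
    // Hypothetical endpoint counts per service port.
    endpointsPerService := map[string]int{"ns1/svc1:p80": 600, "ns2/svc2:p8080": 600}

    totalEndpoints := 0
    for _, n := range endpointsPerService {
        totalEndpoints += n
    }
    largeClusterMode := totalEndpoints > largeClusterEndpointsThreshold

    args := appendServiceComment([]string{"-A", "KUBE-SEP-EXAMPLE"}, "ns1/svc1:p80", largeClusterMode)
    fmt.Println(largeClusterMode, args) // prints: true [-A KUBE-SEP-EXAMPLE]
}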
@@ -162,7 +157,6 @@ type Proxier struct {
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
syncPeriod time.Duration
lastIPTablesCleanup time.Time
// These are effectively const and do not need the mutex to be held.
iptables utiliptables.Interface
@@ -191,11 +185,6 @@ type Proxier struct {
natChains proxyutil.LineBuffer
natRules proxyutil.LineBuffer
// largeClusterMode is set at the beginning of syncProxyRules if we are
// going to end up outputting "lots" of iptables rules and so we need to
// optimize for performance over debuggability.
largeClusterMode bool
// localhostNodePorts indicates whether we allow NodePort services to be accessed
// via localhost.
localhostNodePorts bool
@@ -742,17 +731,6 @@ func isServiceChainName(chainString string) bool {
return false
}
// Assumes proxier.mu is held.
func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string) []string {
// Not printing these comments, can reduce size of iptables (in case of large
// number of endpoints) even by 40%+. So if total number of endpoint chains
// is large enough, we simply drop those comments.
if proxier.largeClusterMode {
return args
}
return append(args, "-m", "comment", "--comment", svcName)
}
// Called by the iptables.Monitor, and in response to topology changes; this calls
// syncProxyRules() and tells it to resync all services, regardless of whether the
// Service or Endpoints/EndpointSlice objects themselves have changed
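To make the "40%+" claim above concrete: the "-m comment --comment ..." match is a significant share of the repeated text in every endpoint rule. The test deleted at the bottom of this commit asserts both forms of the same KUBE-SEP DNAT rule, first with the comment (below the threshold) and then without it (in large-cluster mode):

-A KUBE-SEP-DKGQUZGBKLTPAR56 -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.0.0:80
-A KUBE-SEP-DKGQUZGBKLTPAR56 -m tcp -p tcp -j DNAT --to-destination 10.0.0.0:80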
@@ -940,7 +918,6 @@ func (proxier *Proxier) syncProxyRules() {
for svcName := range proxier.svcPortMap {
totalEndpoints += len(proxier.endpointsMap[svcName])
}
proxier.largeClusterMode = (totalEndpoints > largeClusterEndpointsThreshold)
// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
// metric.
@@ -1372,7 +1349,7 @@ func (proxier *Proxier) syncProxyRules() {
activeNATChains[endpointChain] = true
args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcPortNameString)
args = append(args, "-m", "comment", "--comment", svcPortNameString)
// Handle traffic that loops back to the originator with SNAT.
natRules.Write(
args,
@@ -1388,37 +1365,31 @@
         }
     }

-    // Delete chains no longer in use. Since "iptables-save" can take several seconds
-    // to run on hosts with lots of iptables rules, we don't bother to do this on
-    // every sync in large clusters. (Stale chains will not be referenced by any
-    // active rules, so they're harmless other than taking up memory.)
+    // Delete chains no longer in use.
     deletedChains := 0
-    if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod {
-        var existingNATChains map[utiliptables.Chain]struct{}
+    var existingNATChains map[utiliptables.Chain]struct{}

-        proxier.iptablesData.Reset()
-        if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil {
-            existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
+    proxier.iptablesData.Reset()
+    if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil {
+        existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())

-            for chain := range existingNATChains {
-                if !activeNATChains[chain] {
-                    chainString := string(chain)
-                    if !isServiceChainName(chainString) {
-                        // Ignore chains that aren't ours.
-                        continue
-                    }
-                    // We must (as per iptables) write a chain-line
-                    // for it, which has the nice effect of flushing
-                    // the chain. Then we can remove the chain.
-                    proxier.natChains.Write(utiliptables.MakeChainLine(chain))
-                    proxier.natRules.Write("-X", chainString)
-                    deletedChains++
-                }
-            }
-            proxier.lastIPTablesCleanup = time.Now()
-        } else {
-            klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
-        }
-    }
+        for chain := range existingNATChains {
+            if !activeNATChains[chain] {
+                chainString := string(chain)
+                if !isServiceChainName(chainString) {
+                    // Ignore chains that aren't ours.
+                    continue
+                }
+                // We must (as per iptables) write a chain-line
+                // for it, which has the nice effect of flushing
+                // the chain. Then we can remove the chain.
+                proxier.natChains.Write(utiliptables.MakeChainLine(chain))
+                proxier.natRules.Write("-X", chainString)
+                deletedChains++
+            }
+        }
+    } else {
+        klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
+    }

     // Finally, tail-call to the nodePorts chain. This needs to be after all
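The flush-then-delete sequence above happens entirely inside the iptables-restore input that the proxier builds: utiliptables.MakeChainLine emits a ":CHAIN - [0:0]" declaration line, which (as the code comment notes) flushes the chain, and the "-X" line that follows then deletes it. Roughly, the relevant fragment of the generated nat payload for one stale service chain looks like this (chain name taken from the deleted test below; all other rules omitted):

*nat
:KUBE-SVC-EBDQOQU5SJFXRIL3 - [0:0]
-X KUBE-SVC-EBDQOQU5SJFXRIL3
COMMIT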
@@ -1579,7 +1550,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
args = append(args[:0],
"-A", string(svcChain),
)
args = proxier.appendServiceCommentLocked(args, comment)
args = append(args, "-m", "comment", "--comment", comment)
args = append(args,
"-m", "recent", "--name", string(epInfo.ChainName),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
@@ -1599,7 +1570,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())
args = append(args[:0], "-A", string(svcChain))
args = proxier.appendServiceCommentLocked(args, comment)
args = append(args, "-m", "comment", "--comment", comment)
if i < (numEndpoints - 1) {
// Each rule is a probabilistic match.
args = append(args,
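The branch shown here implements random load balancing: for N endpoints, endpoint i (counting from 0) gets a rule with probability 1/(N-i), and the last endpoint gets an unconditional rule, so traffic ends up spread evenly overall. A schematic of the resulting rules for three endpoints (hypothetical chain names, comments omitted):

-A KUBE-SVC-XXXXXXXXXXXXXXXX -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-AAAAAAAAAAAAAAAA
-A KUBE-SVC-XXXXXXXXXXXXXXXX -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-BBBBBBBBBBBBBBBB
-A KUBE-SVC-XXXXXXXXXXXXXXXX -j KUBE-SEP-CCCCCCCCCCCCCCCC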


@@ -5943,208 +5943,6 @@ func TestInternalExternalMasquerade(t *testing.T) {
}
}
func countEndpointsAndComments(iptablesData string, matchEndpoint string) (string, int, int) {
var numEndpoints, numComments int
var matched string
for _, line := range strings.Split(iptablesData, "\n") {
if strings.HasPrefix(line, "-A KUBE-SEP-") && strings.Contains(line, "-j DNAT") {
numEndpoints++
if strings.Contains(line, "--comment") {
numComments++
}
if strings.Contains(line, matchEndpoint) {
matched = line
}
}
}
return matched, numEndpoints, numComments
}
func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.masqueradeAll = true
fp.syncPeriod = 30 * time.Second
makeServiceMap(fp,
makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.30.0.41"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: 80,
Protocol: v1.ProtocolTCP,
}}
}),
makeTestService("ns2", "svc2", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.30.0.42"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p8080",
Port: 8080,
Protocol: v1.ProtocolTCP,
}}
}),
makeTestService("ns3", "svc3", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.30.0.43"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p8081",
Port: 8081,
Protocol: v1.ProtocolTCP,
}}
}),
)
populateEndpointSlices(fp,
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = make([]discovery.Endpoint, largeClusterEndpointsThreshold/2-1)
for i := range eps.Endpoints {
eps.Endpoints[i].Addresses = []string{fmt.Sprintf("10.0.%d.%d", i%256, i/256)}
}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To("p80"),
Port: ptr.To[int32](80),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}),
makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = make([]discovery.Endpoint, largeClusterEndpointsThreshold/2-1)
for i := range eps.Endpoints {
eps.Endpoints[i].Addresses = []string{fmt.Sprintf("10.1.%d.%d", i%256, i/256)}
}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To("p8080"),
Port: ptr.To[int32](8080),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}),
)
fp.syncProxyRules()
expectedEndpoints := 2 * (largeClusterEndpointsThreshold/2 - 1)
firstEndpoint, numEndpoints, numComments := countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
assert.Equal(t, "-A KUBE-SEP-DKGQUZGBKLTPAR56 -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.0.0:80", firstEndpoint)
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints)
}
if numComments != numEndpoints {
t.Errorf("numComments (%d) != numEndpoints (%d) when numEndpoints < threshold (%d)", numComments, numEndpoints, largeClusterEndpointsThreshold)
}
fp.OnEndpointSliceAdd(makeTestEndpointSlice("ns3", "svc3", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"203.0.113.4"},
}, {
Addresses: []string{"203.0.113.8"},
}, {
Addresses: []string{"203.0.113.12"},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To("p8081"),
Port: ptr.To[int32](8081),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}))
fp.syncProxyRules()
firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "203.0.113.4")
assert.Equal(t, "-A KUBE-SEP-RUVVH7YV3PHQBDOS -m tcp -p tcp -j DNAT --to-destination 203.0.113.4:8081", firstEndpoint)
// syncProxyRules will only have output the endpoints for svc3, since the others
// didn't change (and syncProxyRules doesn't automatically do a full resync when you
// cross the largeClusterEndpointsThreshold).
if numEndpoints != 3 {
t.Errorf("Found wrong number of endpoints on partial resync: expected %d, got %d", 3, numEndpoints)
}
if numComments != 0 {
t.Errorf("numComments (%d) != 0 after partial resync when numEndpoints (%d) > threshold (%d)", numComments, expectedEndpoints+3, largeClusterEndpointsThreshold)
}
// Now force a full resync and confirm that it rewrites the older services with
// no comments as well.
fp.forceSyncProxyRules()
expectedEndpoints += 3
firstEndpoint, numEndpoints, numComments = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
assert.Equal(t, "-A KUBE-SEP-DKGQUZGBKLTPAR56 -m tcp -p tcp -j DNAT --to-destination 10.0.0.0:80", firstEndpoint)
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints)
}
if numComments != 0 {
t.Errorf("numComments (%d) != 0 when numEndpoints (%d) > threshold (%d)", numComments, numEndpoints, largeClusterEndpointsThreshold)
}
// Now test service deletion; we have to create another service to do this though,
// because if we deleted any of the existing services, we'd fall back out of large
// cluster mode.
svc4 := makeTestService("ns4", "svc4", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.30.0.44"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p8082",
Port: 8082,
Protocol: v1.ProtocolTCP,
}}
})
fp.OnServiceAdd(svc4)
fp.OnEndpointSliceAdd(makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.4.0.1"},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To("p8082"),
Port: ptr.To[int32](8082),
Protocol: ptr.To(v1.ProtocolTCP),
}}
}))
fp.syncProxyRules()
svc4Endpoint, numEndpoints, _ := countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "-A KUBE-SEP-SU5STNODRYEWJAUF -m tcp -p tcp -j DNAT --to-destination 10.4.0.1:8082", svc4Endpoint, "svc4 endpoint was not created")
// should only sync svc4
if numEndpoints != 1 {
t.Errorf("Found wrong number of endpoints after svc4 creation: expected %d, got %d", 1, numEndpoints)
}
// In large-cluster mode, if we delete a service, it will not re-sync its chains
// but it will not delete them immediately either.
fp.lastIPTablesCleanup = time.Now()
fp.OnServiceDelete(svc4)
fp.syncProxyRules()
svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!")
// should only sync svc4, and shouldn't output its endpoints
if numEndpoints != 0 {
t.Errorf("Found wrong number of endpoints after service deletion: expected %d, got %d", 0, numEndpoints)
}
assert.NotContains(t, fp.iptablesData.String(), "-X ", "iptables data unexpectedly contains chain deletions")
// But resyncing after a long-enough delay will delete the stale chains
fp.lastIPTablesCleanup = time.Now().Add(-fp.syncPeriod).Add(-1)
fp.syncProxyRules()
svc4Endpoint, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.4.0.1")
assert.Equal(t, "", svc4Endpoint, "svc4 endpoint was still created!")
if numEndpoints != 0 {
t.Errorf("Found wrong number of endpoints after delayed resync: expected %d, got %d", 0, numEndpoints)
}
assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SVC-EBDQOQU5SJFXRIL3", "iptables data does not contain chain deletion")
assert.Contains(t, fp.iptablesData.String(), "-X KUBE-SEP-SU5STNODRYEWJAUF", "iptables data does not contain endpoint deletions")
// force a full sync and count
fp.forceSyncProxyRules()
_, numEndpoints, _ = countEndpointsAndComments(fp.iptablesData.String(), "10.0.0.0")
if numEndpoints != expectedEndpoints {
t.Errorf("Found wrong number of endpoints: expected %d, got %d", expectedEndpoints, numEndpoints)
}
}
// Test calling syncProxyRules() multiple times with various changes
func TestSyncProxyRulesRepeated(t *testing.T) {
ipt := iptablestest.NewFake()