proxy/iptables: Add metric for partial sync failures, add test

Dan Winship 2022-08-15 17:07:12 -04:00
parent ab326d2f4e
commit 818de5a545
5 changed files with 187 additions and 8 deletions

View File

@@ -846,6 +846,9 @@ func (proxier *Proxier) syncProxyRules() {
if !success {
klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
if tryPartialSync {
metrics.IptablesPartialRestoreFailuresTotal.Inc()
}
// proxier.serviceChanges and proxier.endpointChanges have already
// been flushed, so we've lost the state needed to be able to do
// a partial sync.
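A minimal test-style sketch of the contract this hunk establishes (illustrative only, not code from this commit; the fake proxier fp, the testing.T value t, and the failed-partial-sync setup are assumed from TestSyncProxyRulesRepeated below): a failed partial restore should bump the new counter exactly once and force the next sync to be a full one.

// Sketch only: fp is a fake proxier whose next sync attempts a partial
// restore that is expected to fail (see the dangling-jump setup in the
// test below).
before, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
if err != nil {
	t.Fatalf("Could not get partial restore failures metric: %v", err)
}

fp.syncProxyRules() // the partial iptables-restore fails here

after, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
if err != nil {
	t.Fatalf("Could not get partial restore failures metric: %v", err)
}
if after != before+1 {
	t.Errorf("expected exactly one new partial restore failure, got %v -> %v", before, after)
}
if !fp.needFullSync {
	t.Errorf("a failed partial restore should force a full sync on retry")
}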

View File

@@ -7684,6 +7684,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
metrics.RegisterMetrics()
// Create initial state
var svc2 *v1.Service
@@ -7878,8 +7879,10 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
// Add a service, sync, then add its endpoints. (The first sync will be a no-op other
// than adding the REJECT rule. The second sync will create the new service.)
var svc4 *v1.Service
makeServiceMap(fp,
makeTestService("ns4", "svc4", func(svc *v1.Service) {
svc4 = svc
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.30.0.44"
svc.Spec.Ports = []v1.ServicePort{{
@@ -8083,6 +8086,102 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
// Now force a partial resync error and ensure that it recovers correctly
if fp.needFullSync {
t.Fatalf("Proxier unexpectedly already needs a full sync?")
}
prFailures, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
if err != nil {
t.Fatalf("Could not get partial restore failures metric: %v", err)
}
if prFailures != 0.0 {
t.Errorf("Already did a partial resync? Something failed earlier!")
}
// Add a rule jumping from svc3's service chain to svc4's endpoint, then try to
// delete svc4. This will fail because the partial resync won't rewrite svc3's
// rules and so the partial restore would leave a dangling jump from there to
// svc4's endpoint. The proxier will then queue a full resync in response to the
// partial resync failure, and the full resync will succeed (since it will rewrite
// svc3's rules as well).
//
// This is an absurd scenario, but it has to be; partial resync failures are
// supposed to be impossible; if we knew of any non-absurd scenario that would
// cause such a failure, then that would be a bug and we would fix it.
if _, err := fp.iptables.ChainExists(utiliptables.TableNAT, utiliptables.Chain("KUBE-SEP-AYCN5HPXMIRJNJXU")); err != nil {
t.Fatalf("svc4's endpoint chain unexpected already does not exist!")
}
if _, err := fp.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.Chain("KUBE-SVC-X27LE4BHSL4DOUIK"), "-j", "KUBE-SEP-AYCN5HPXMIRJNJXU"); err != nil {
t.Fatalf("Could not add bad iptables rule: %v", err)
}
fp.OnServiceDelete(svc4)
fp.syncProxyRules()
if _, err := fp.iptables.ChainExists(utiliptables.TableNAT, utiliptables.Chain("KUBE-SEP-AYCN5HPXMIRJNJXU")); err != nil {
t.Errorf("svc4's endpoint chain was successfully deleted despite dangling references!")
}
if !fp.needFullSync {
t.Errorf("Proxier did not fail on previous partial resync?")
}
updatedPRFailures, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
if err != nil {
t.Errorf("Could not get partial restore failures metric: %v", err)
}
if updatedPRFailures != prFailures+1.0 {
t.Errorf("Partial restore failures metric was not incremented after failed partial resync (expected %.02f, got %.02f)", prFailures+1.0, updatedPRFailures)
}
// On retry we should do a full resync, which should succeed (and delete svc4)
fp.syncProxyRules()
expected = dedent.Dedent(`
*filter
:KUBE-NODEPORTS - [0:0]
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-NODEPORTS - [0:0]
:KUBE-SERVICES - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SEP-AYCN5HPXMIRJNJXU - [0:0]
:KUBE-SEP-DKCFIS26GWF2WLWC - [0:0]
:KUBE-SEP-JVVZVJ7BSEPPRNBS - [0:0]
:KUBE-SEP-SNQ3ZNILQDEJNDQO - [0:0]
:KUBE-SVC-4SW47YFZTEDKD3PK - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -s 10.0.3.2 -j KUBE-MARK-MASQ
-A KUBE-SEP-DKCFIS26GWF2WLWC -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.2:80
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -s 10.0.3.3 -j KUBE-MARK-MASQ
-A KUBE-SEP-JVVZVJ7BSEPPRNBS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.3.3:80
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-SNQ3ZNILQDEJNDQO -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.2:80" -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-DKCFIS26GWF2WLWC
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 -> 10.0.3.3:80" -j KUBE-SEP-JVVZVJ7BSEPPRNBS
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 -> 10.0.1.1:80" -j KUBE-SEP-SNQ3ZNILQDEJNDQO
-X KUBE-SEP-AYCN5HPXMIRJNJXU
-X KUBE-SVC-4SW47YFZTEDKD3PK
COMMIT
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
}
func TestNoEndpointsMetric(t *testing.T) {

View File

@@ -126,6 +126,17 @@ var (
},
)
// IptablesPartialRestoreFailuresTotal is the number of iptables *partial* restore
// failures (resulting in a fall back to a full restore) that the proxy has seen.
IptablesPartialRestoreFailuresTotal = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: kubeProxySubsystem,
Name: "sync_proxy_rules_iptables_partial_restore_failures_total",
Help: "Cumulative proxy iptables partial restore failures",
StabilityLevel: metrics.ALPHA,
},
)
// IptablesRulesTotal is the number of iptables rules that the iptables proxy installs.
IptablesRulesTotal = metrics.NewGaugeVec(
&metrics.GaugeOpts{
@@ -177,6 +188,7 @@ func RegisterMetrics() {
legacyregistry.MustRegister(ServiceChangesTotal)
legacyregistry.MustRegister(IptablesRulesTotal)
legacyregistry.MustRegister(IptablesRestoreFailuresTotal)
legacyregistry.MustRegister(IptablesPartialRestoreFailuresTotal)
legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp)
legacyregistry.MustRegister(SyncProxyRulesNoLocalEndpointsTotal)
})
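Given the CounterOpts above, and assuming kubeProxySubsystem resolves to "kubeproxy" (the constant is not shown in this hunk), the counter is scraped as kubeproxy_sync_proxy_rules_iptables_partial_restore_failures_total. A self-contained sketch (not part of this commit) of reading it in a test with the same component-base helper the proxier test above uses:

package metrics_test

import (
	"testing"

	"k8s.io/component-base/metrics/testutil"
	"k8s.io/kubernetes/pkg/proxy/metrics"
)

// Illustrative sketch: register the proxy metrics, increment the counter,
// and check that the change is visible through testutil.GetCounterMetricValue,
// which reads the counter's current value directly from the metric.
func TestPartialRestoreFailuresCounter(t *testing.T) {
	metrics.RegisterMetrics()

	before, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
	if err != nil {
		t.Fatalf("Could not get partial restore failures metric: %v", err)
	}

	metrics.IptablesPartialRestoreFailuresTotal.Inc()

	after, err := testutil.GetCounterMetricValue(metrics.IptablesPartialRestoreFailuresTotal)
	if err != nil {
		t.Fatalf("Could not get partial restore failures metric: %v", err)
	}
	if after != before+1 {
		t.Errorf("expected counter to increase by 1, got %v -> %v", before, after)
	}
}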

View File

@@ -22,6 +22,7 @@ import (
"strings"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/iptables"
)
@@ -217,16 +218,22 @@ func (f *FakeIPTables) SaveInto(table iptables.Table, buffer *bytes.Buffer) erro
return f.saveTable(table, buffer)
}
func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error {
// This is not a complete list but it's enough to pass the unit tests
var builtinTargets = sets.NewString("ACCEPT", "DROP", "RETURN", "REJECT", "DNAT", "SNAT", "MASQUERADE", "MARK")
func (f *FakeIPTables) restoreTable(newDump *IPTablesDump, newTable *Table, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error {
oldTable, err := f.Dump.GetTable(newTable.Name)
if err != nil {
return err
}
backupChains := make([]Chain, len(oldTable.Chains))
copy(backupChains, oldTable.Chains)
// Update internal state
if flush == iptables.FlushTables {
oldTable.Chains = make([]Chain, 0, len(newTable.Chains))
}
for _, newChain := range newTable.Chains {
oldChain, _ := f.Dump.GetChain(newTable.Name, newChain.Name)
switch {
@@ -235,7 +242,6 @@ func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, c
case oldChain == nil && !newChain.Deleted:
oldTable.Chains = append(oldTable.Chains, newChain)
case oldChain != nil && newChain.Deleted:
// FIXME: should make sure chain is not referenced from other jumps
_ = f.DeleteChain(newTable.Name, newChain.Name)
case oldChain != nil && !newChain.Deleted:
// replace old data with new
@@ -246,6 +252,35 @@ func (f *FakeIPTables) restoreTable(newTable *Table, flush iptables.FlushFlag, c
}
}
}
// Now check that all old/new jumps are valid
for _, chain := range oldTable.Chains {
for _, rule := range chain.Rules {
if rule.Jump == nil {
continue
}
if builtinTargets.Has(rule.Jump.Value) {
continue
}
jumpedChain, _ := f.Dump.GetChain(oldTable.Name, iptables.Chain(rule.Jump.Value))
if jumpedChain == nil {
newChain, _ := newDump.GetChain(oldTable.Name, iptables.Chain(rule.Jump.Value))
if newChain != nil {
// rule is an old rule that jumped to a chain which
// was deleted by newDump.
oldTable.Chains = backupChains
return fmt.Errorf("deleted chain %q is referenced by existing rules", newChain.Name)
} else {
// rule is a new rule that jumped to a chain that was
// neither created nor pre-existing
oldTable.Chains = backupChains
return fmt.Errorf("rule %q jumps to a non-existent chain", rule.Raw)
}
}
}
}
return nil
}
@@ -261,7 +296,7 @@ func (f *FakeIPTables) Restore(table iptables.Table, data []byte, flush iptables
return err
}
return f.restoreTable(newTable, flush, counters)
return f.restoreTable(dump, newTable, flush, counters)
}
// RestoreAll is part of iptables.Interface
@@ -272,7 +307,7 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter
}
for i := range dump.Tables {
err = f.restoreTable(&dump.Tables[i], flush, counters)
err = f.restoreTable(dump, &dump.Tables[i], flush, counters)
if err != nil {
return err
}
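A short usage sketch of the stricter restore behavior (illustrative only; fake is assumed to be a populated FakeIPTables with a nat table and t a testing.T, as in the fake_test.go changes below; KUBE-MASQ-EXAMPLE is a hypothetical chain name): jumps to targets in builtinTargets, such as MASQUERADE, are not treated as references to user-defined chains, so a restore like this is accepted, whereas the same rule jumping to an undeclared KUBE- chain would be rejected with a "non-existent chain" error.

// Illustrative only: the -j MASQUERADE jump is skipped by the new validation
// because MASQUERADE is in builtinTargets, so this restore succeeds even
// though no chain named "MASQUERADE" exists in the dump.
rules := dedent.Dedent(strings.Trim(`
	*nat
	:KUBE-MASQ-EXAMPLE - [0:0]
	-A KUBE-MASQ-EXAMPLE -d 5.6.7.8 -j MASQUERADE
	COMMIT
	`, "\n"))
if err := fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.NoRestoreCounters); err != nil {
	t.Fatalf("Restore with a built-in jump target unexpectedly failed: %v", err)
}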

View File

@@ -170,10 +170,12 @@ func TestFakeIPTables(t *testing.T) {
*nat
:KUBE-RESTORED - [0:0]
:KUBE-MISC-CHAIN - [0:0]
:KUBE-MISC-TWO - [0:0]
:KUBE-EMPTY - [0:0]
-A KUBE-RESTORED -m comment --comment "restored chain" -j ACCEPT
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
-A KUBE-MISC-TWO -j ACCEPT
COMMIT
`, "\n"))
err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.NoRestoreCounters)
@@ -196,11 +198,13 @@ func TestFakeIPTables(t *testing.T) {
:KUBE-TEST - [0:0]
:KUBE-RESTORED - [0:0]
:KUBE-MISC-CHAIN - [0:0]
:KUBE-MISC-TWO - [0:0]
:KUBE-EMPTY - [0:0]
-A KUBE-TEST -j ACCEPT
-A KUBE-RESTORED -m comment --comment "restored chain" -j ACCEPT
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
-A KUBE-MISC-TWO -j ACCEPT
COMMIT
*filter
:INPUT - [0:0]
@@ -214,6 +218,30 @@ func TestFakeIPTables(t *testing.T) {
t.Fatalf("bad post-restore dump. expected:\n%s\n\ngot:\n%s\n", expected, buf.Bytes())
}
// Trying to use Restore to delete a chain that another chain jumps to will fail
rules = dedent.Dedent(strings.Trim(`
*nat
:KUBE-MISC-TWO - [0:0]
-X KUBE-MISC-TWO
COMMIT
`, "\n"))
err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.RestoreCounters)
if err == nil || !strings.Contains(err.Error(), "referenced by existing rules") {
t.Fatalf("Expected 'referenced by existing rules' error from Restore, got %v", err)
}
// Trying to use Restore to add a jump to a non-existent chain will fail
rules = dedent.Dedent(strings.Trim(`
*nat
:KUBE-MISC-TWO - [0:0]
-A KUBE-MISC-TWO -j KUBE-MISC-THREE
COMMIT
`, "\n"))
err = fake.Restore(iptables.TableNAT, []byte(rules), iptables.NoFlushTables, iptables.RestoreCounters)
if err == nil || !strings.Contains(err.Error(), "non-existent chain") {
t.Fatalf("Expected 'non-existent chain' error from Restore, got %v", err)
}
// more Restore; empty out one chain and delete another, but also update its counters
rules = dedent.Dedent(strings.Trim(`
*nat
@@ -240,9 +268,11 @@ func TestFakeIPTables(t *testing.T) {
:POSTROUTING - [0:0]
:KUBE-TEST - [99:9999]
:KUBE-MISC-CHAIN - [0:0]
:KUBE-MISC-TWO - [0:0]
:KUBE-EMPTY - [0:0]
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j DROP
-A KUBE-MISC-CHAIN -s 1.2.3.4 -j KUBE-MISC-TWO
-A KUBE-MISC-CHAIN -d 5.6.7.8 -j MASQUERADE
-A KUBE-MISC-TWO -j ACCEPT
COMMIT
*filter
:INPUT - [0:0]