diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go index a1424c590f9..a4766a793fb 100644 --- a/pkg/util/iptables/iptables.go +++ b/pkg/util/iptables/iptables.go @@ -502,16 +502,22 @@ const ( // Monitor is part of Interface func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) { for { - for _, table := range tables { - if _, err := runner.EnsureChain(table, canary); err != nil { - klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err) - return + _ = utilwait.PollImmediateUntil(interval, func() (bool, error) { + for _, table := range tables { + if _, err := runner.EnsureChain(table, canary); err != nil { + klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err) + return false, nil + } } - } + return true, nil + }, stopCh) // Poll until stopCh is closed or iptables is flushed err := utilwait.PollUntil(interval, func() (bool, error) { - if runner.chainExists(tables[0], canary) { + if exists, err := runner.chainExists(tables[0], canary); exists { + return false, nil + } else if isResourceError(err) { + klog.Warningf("Could not check for iptables canary %s/%s: %v", string(tables[0]), string(canary), err) return false, nil } klog.V(2).Infof("iptables canary %s/%s deleted", string(tables[0]), string(canary)) @@ -520,7 +526,7 @@ func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), i // so we don't start reloading too soon. err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) { for i := 1; i < len(tables); i++ { - if runner.chainExists(tables[i], canary) { + if exists, err := runner.chainExists(tables[i], canary); exists || isResourceError(err) { return false, nil } } @@ -547,14 +553,14 @@ func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), i // chainExists is used internally by Monitor; none of the public Interface methods can be // used to distinguish "chain exists" from "chain does not exist" with no side effects -func (runner *runner) chainExists(table Table, chain Chain) bool { +func (runner *runner) chainExists(table Table, chain Chain) (bool, error) { fullArgs := makeFullArgs(table, chain) runner.mu.Lock() defer runner.mu.Unlock() _, err := runner.run(opListChain, fullArgs) - return err == nil + return err == nil, err } type operation string @@ -691,3 +697,16 @@ func IsNotFoundError(err error) bool { } return false } + +const iptablesStatusResourceProblem = 4 + +// isResourceError returns true if the error indicates that iptables ran into a "resource +// problem" and was unable to attempt the request. In particular, this will be true if it +// times out trying to get the iptables lock. +func isResourceError(err error) bool { + if ee, isExitError := err.(utilexec.ExitError); isExitError { + return ee.ExitStatus() == iptablesStatusResourceProblem + } else { + return false + } +} diff --git a/pkg/util/iptables/monitor_test.go b/pkg/util/iptables/monitor_test.go index 0a0cc875d6a..7f4d4176dc8 100644 --- a/pkg/util/iptables/monitor_test.go +++ b/pkg/util/iptables/monitor_test.go @@ -41,6 +41,9 @@ type monitorFakeExec struct { sync.Mutex tables map[string]sets.String + + block bool + wasBlocked bool } func newMonitorFakeExec() *monitorFakeExec { @@ -51,6 +54,22 @@ func newMonitorFakeExec() *monitorFakeExec { return &monitorFakeExec{tables: tables} } +func (mfe *monitorFakeExec) blockIPTables(block bool) { + mfe.Lock() + defer mfe.Unlock() + + mfe.block = block +} + +func (mfe *monitorFakeExec) getWasBlocked() bool { + mfe.Lock() + defer mfe.Unlock() + + wasBlocked := mfe.wasBlocked + mfe.wasBlocked = false + return wasBlocked +} + func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd { return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args} } @@ -96,6 +115,12 @@ func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) { return []byte{}, fmt.Errorf("no such table %q", tableName) } + // For ease-of-testing reasons, blockIPTables blocks create and list, but not delete + if mfc.mfe.block && op != opDeleteChain { + mfc.mfe.wasBlocked = true + return []byte{}, exec.CodeExitError{Code: 4, Err: fmt.Errorf("could not get xtables.lock, etc")} + } + switch op { case opCreateChain: if !table.Has(chainName) { @@ -199,8 +224,17 @@ func TestIPTablesMonitor(t *testing.T) { t.Errorf("got unexpected number of reloads after partial flush: %v", err) } - // If we delete the last chain, it should reload now + // Now ensure that "iptables -L" will get an error about the xtables.lock, and + // delete the last chain. The monitor should not reload, because it can't actually + // tell if the chain was deleted or not. + mfe.blockIPTables(true) ipt.DeleteChain(TableNAT, canary) + if err := waitForBlocked(mfe); err != nil { + t.Errorf("failed waiting for monitor to be blocked from monitoring: %v", err) + } + + // After unblocking the monitor, it should now reload + mfe.blockIPTables(false) if err := waitForReloads(&reloads, 2); err != nil { t.Errorf("got unexpected number of reloads after slow flush: %v", err) @@ -213,11 +247,40 @@ func TestIPTablesMonitor(t *testing.T) { close(stopCh) if err := waitForNoReload(&reloads, 2); err != nil { - t.Errorf("got unexpected number of reloads after partial flush: %v", err) + t.Errorf("got unexpected number of reloads after stop: %v", err) } if !ensureNoChains(mfe) { t.Errorf("canaries still exist after stopping monitor") } + + // If we create a new monitor while the iptables lock is held, it will + // retry creating canaries until it succeeds + + stopCh = make(chan struct{}) + _ = mfe.getWasBlocked() + mfe.blockIPTables(true) + go ipt.Monitor(canary, tables, func() { + if !ensureNoChains(mfe) { + t.Errorf("reload called while canaries still exist") + } + atomic.AddUint32(&reloads, 1) + }, 100*time.Millisecond, stopCh) + + // Monitor should not have created canaries yet + if !ensureNoChains(mfe) { + t.Errorf("canary created while iptables blocked") + } + + if err := waitForBlocked(mfe); err != nil { + t.Errorf("failed waiting for monitor to fail creating canaries: %v", err) + } + + mfe.blockIPTables(false) + if err := waitForChains(mfe, canary, tables); err != nil { + t.Errorf("failed to create iptables canaries: %v", err) + } + + close(stopCh) } func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error { @@ -266,3 +329,10 @@ func waitForNoReload(reloads *uint32, expected uint32) error { } return nil } + +func waitForBlocked(mfe *monitorFakeExec) error { + return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { + blocked := mfe.getWasBlocked() + return blocked, nil + }) +}