mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
iptables.Monitor: don't be fooled by "could not get lock" errors
This commit is contained in:
parent
c2c821534b
commit
2f89c03c63
@ -502,16 +502,22 @@ const (
|
||||
// Monitor is part of Interface
|
||||
func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
|
||||
for {
|
||||
_ = utilwait.PollImmediateUntil(interval, func() (bool, error) {
|
||||
for _, table := range tables {
|
||||
if _, err := runner.EnsureChain(table, canary); err != nil {
|
||||
klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err)
|
||||
return
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}, stopCh)
|
||||
|
||||
// Poll until stopCh is closed or iptables is flushed
|
||||
err := utilwait.PollUntil(interval, func() (bool, error) {
|
||||
if runner.chainExists(tables[0], canary) {
|
||||
if exists, err := runner.chainExists(tables[0], canary); exists {
|
||||
return false, nil
|
||||
} else if isResourceError(err) {
|
||||
klog.Warningf("Could not check for iptables canary %s/%s: %v", string(tables[0]), string(canary), err)
|
||||
return false, nil
|
||||
}
|
||||
klog.V(2).Infof("iptables canary %s/%s deleted", string(tables[0]), string(canary))
|
||||
@ -520,7 +526,7 @@ func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), i
|
||||
// so we don't start reloading too soon.
|
||||
err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) {
|
||||
for i := 1; i < len(tables); i++ {
|
||||
if runner.chainExists(tables[i], canary) {
|
||||
if exists, err := runner.chainExists(tables[i], canary); exists || isResourceError(err) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
@ -547,14 +553,14 @@ func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), i
|
||||
|
||||
// chainExists is used internally by Monitor; none of the public Interface methods can be
|
||||
// used to distinguish "chain exists" from "chain does not exist" with no side effects
|
||||
func (runner *runner) chainExists(table Table, chain Chain) bool {
|
||||
func (runner *runner) chainExists(table Table, chain Chain) (bool, error) {
|
||||
fullArgs := makeFullArgs(table, chain)
|
||||
|
||||
runner.mu.Lock()
|
||||
defer runner.mu.Unlock()
|
||||
|
||||
_, err := runner.run(opListChain, fullArgs)
|
||||
return err == nil
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
type operation string
|
||||
@ -691,3 +697,16 @@ func IsNotFoundError(err error) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
const iptablesStatusResourceProblem = 4
|
||||
|
||||
// isResourceError returns true if the error indicates that iptables ran into a "resource
|
||||
// problem" and was unable to attempt the request. In particular, this will be true if it
|
||||
// times out trying to get the iptables lock.
|
||||
func isResourceError(err error) bool {
|
||||
if ee, isExitError := err.(utilexec.ExitError); isExitError {
|
||||
return ee.ExitStatus() == iptablesStatusResourceProblem
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -41,6 +41,9 @@ type monitorFakeExec struct {
|
||||
sync.Mutex
|
||||
|
||||
tables map[string]sets.String
|
||||
|
||||
block bool
|
||||
wasBlocked bool
|
||||
}
|
||||
|
||||
func newMonitorFakeExec() *monitorFakeExec {
|
||||
@ -51,6 +54,22 @@ func newMonitorFakeExec() *monitorFakeExec {
|
||||
return &monitorFakeExec{tables: tables}
|
||||
}
|
||||
|
||||
func (mfe *monitorFakeExec) blockIPTables(block bool) {
|
||||
mfe.Lock()
|
||||
defer mfe.Unlock()
|
||||
|
||||
mfe.block = block
|
||||
}
|
||||
|
||||
func (mfe *monitorFakeExec) getWasBlocked() bool {
|
||||
mfe.Lock()
|
||||
defer mfe.Unlock()
|
||||
|
||||
wasBlocked := mfe.wasBlocked
|
||||
mfe.wasBlocked = false
|
||||
return wasBlocked
|
||||
}
|
||||
|
||||
func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd {
|
||||
return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args}
|
||||
}
|
||||
@ -96,6 +115,12 @@ func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) {
|
||||
return []byte{}, fmt.Errorf("no such table %q", tableName)
|
||||
}
|
||||
|
||||
// For ease-of-testing reasons, blockIPTables blocks create and list, but not delete
|
||||
if mfc.mfe.block && op != opDeleteChain {
|
||||
mfc.mfe.wasBlocked = true
|
||||
return []byte{}, exec.CodeExitError{Code: 4, Err: fmt.Errorf("could not get xtables.lock, etc")}
|
||||
}
|
||||
|
||||
switch op {
|
||||
case opCreateChain:
|
||||
if !table.Has(chainName) {
|
||||
@ -199,8 +224,17 @@ func TestIPTablesMonitor(t *testing.T) {
|
||||
t.Errorf("got unexpected number of reloads after partial flush: %v", err)
|
||||
}
|
||||
|
||||
// If we delete the last chain, it should reload now
|
||||
// Now ensure that "iptables -L" will get an error about the xtables.lock, and
|
||||
// delete the last chain. The monitor should not reload, because it can't actually
|
||||
// tell if the chain was deleted or not.
|
||||
mfe.blockIPTables(true)
|
||||
ipt.DeleteChain(TableNAT, canary)
|
||||
if err := waitForBlocked(mfe); err != nil {
|
||||
t.Errorf("failed waiting for monitor to be blocked from monitoring: %v", err)
|
||||
}
|
||||
|
||||
// After unblocking the monitor, it should now reload
|
||||
mfe.blockIPTables(false)
|
||||
|
||||
if err := waitForReloads(&reloads, 2); err != nil {
|
||||
t.Errorf("got unexpected number of reloads after slow flush: %v", err)
|
||||
@ -213,11 +247,40 @@ func TestIPTablesMonitor(t *testing.T) {
|
||||
close(stopCh)
|
||||
|
||||
if err := waitForNoReload(&reloads, 2); err != nil {
|
||||
t.Errorf("got unexpected number of reloads after partial flush: %v", err)
|
||||
t.Errorf("got unexpected number of reloads after stop: %v", err)
|
||||
}
|
||||
if !ensureNoChains(mfe) {
|
||||
t.Errorf("canaries still exist after stopping monitor")
|
||||
}
|
||||
|
||||
// If we create a new monitor while the iptables lock is held, it will
|
||||
// retry creating canaries until it succeeds
|
||||
|
||||
stopCh = make(chan struct{})
|
||||
_ = mfe.getWasBlocked()
|
||||
mfe.blockIPTables(true)
|
||||
go ipt.Monitor(canary, tables, func() {
|
||||
if !ensureNoChains(mfe) {
|
||||
t.Errorf("reload called while canaries still exist")
|
||||
}
|
||||
atomic.AddUint32(&reloads, 1)
|
||||
}, 100*time.Millisecond, stopCh)
|
||||
|
||||
// Monitor should not have created canaries yet
|
||||
if !ensureNoChains(mfe) {
|
||||
t.Errorf("canary created while iptables blocked")
|
||||
}
|
||||
|
||||
if err := waitForBlocked(mfe); err != nil {
|
||||
t.Errorf("failed waiting for monitor to fail creating canaries: %v", err)
|
||||
}
|
||||
|
||||
mfe.blockIPTables(false)
|
||||
if err := waitForChains(mfe, canary, tables); err != nil {
|
||||
t.Errorf("failed to create iptables canaries: %v", err)
|
||||
}
|
||||
|
||||
close(stopCh)
|
||||
}
|
||||
|
||||
func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error {
|
||||
@ -266,3 +329,10 @@ func waitForNoReload(reloads *uint32, expected uint32) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func waitForBlocked(mfe *monitorFakeExec) error {
|
||||
return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
|
||||
blocked := mfe.getWasBlocked()
|
||||
return blocked, nil
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user