Merge pull request #83402 from danwinship/iptables-monitor-timeout

iptables.Monitor: don't be fooled by "could not get lock" errors

Commit deaf22430c
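Background for the change: Monitor detects an iptables flush by listing its canary chain, but before this commit any listing error was taken to mean the canary was gone. iptables exits with status 4 ("resource problem") when it cannot take the xtables lock, so lock contention could be mistaken for a flush and trigger a spurious reload. A minimal standalone sketch (not from the commit; the chain name and the "-w 5" timeout are illustrative) of observing that exit status through k8s.io/utils/exec:

package main

import (
	"fmt"

	utilexec "k8s.io/utils/exec"
)

func main() {
	// Try to list a (possibly nonexistent) chain; "-w 5" tells iptables to
	// wait up to 5 seconds for the xtables lock before giving up.
	out, err := utilexec.New().Command(
		"iptables", "-w", "5", "-t", "nat", "-L", "EXAMPLE-CANARY",
	).CombinedOutput()
	if err == nil {
		fmt.Printf("chain exists:\n%s", out)
		return
	}
	if ee, ok := err.(utilexec.ExitError); ok {
		// Status 4 ("resource problem", e.g. a lock timeout) means iptables
		// never ran the request; status 1 means it ran and failed, e.g.
		// because the chain does not exist.
		fmt.Printf("iptables exited with status %d: %v\n", ee.ExitStatus(), err)
	}
}

Exit status 1, by contrast, means iptables ran and the chain genuinely does not exist, which is the one case Monitor does want to treat as a flush.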
--- a/pkg/util/iptables/iptables.go
+++ b/pkg/util/iptables/iptables.go
@@ -502,16 +502,22 @@ const (
 // Monitor is part of Interface
 func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
 	for {
+		_ = utilwait.PollImmediateUntil(interval, func() (bool, error) {
 			for _, table := range tables {
 				if _, err := runner.EnsureChain(table, canary); err != nil {
 					klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err)
-					return
+					return false, nil
 				}
 			}
+			return true, nil
+		}, stopCh)

 		// Poll until stopCh is closed or iptables is flushed
 		err := utilwait.PollUntil(interval, func() (bool, error) {
-			if runner.chainExists(tables[0], canary) {
+			if exists, err := runner.chainExists(tables[0], canary); exists {
+				return false, nil
+			} else if isResourceError(err) {
+				klog.Warningf("Could not check for iptables canary %s/%s: %v", string(tables[0]), string(canary), err)
 				return false, nil
 			}
 			klog.V(2).Infof("iptables canary %s/%s deleted", string(tables[0]), string(canary))
@@ -520,7 +526,7 @@ func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
 		// so we don't start reloading too soon.
 		err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) {
 			for i := 1; i < len(tables); i++ {
-				if runner.chainExists(tables[i], canary) {
+				if exists, err := runner.chainExists(tables[i], canary); exists || isResourceError(err) {
 					return false, nil
 				}
 			}
@@ -547,14 +553,14 @@ func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {

 // chainExists is used internally by Monitor; none of the public Interface methods can be
 // used to distinguish "chain exists" from "chain does not exist" with no side effects
-func (runner *runner) chainExists(table Table, chain Chain) bool {
+func (runner *runner) chainExists(table Table, chain Chain) (bool, error) {
 	fullArgs := makeFullArgs(table, chain)

 	runner.mu.Lock()
 	defer runner.mu.Unlock()

 	_, err := runner.run(opListChain, fullArgs)
-	return err == nil
+	return err == nil, err
 }

 type operation string
@@ -691,3 +697,16 @@ func IsNotFoundError(err error) bool {
 	}
 	return false
 }
+
+const iptablesStatusResourceProblem = 4
+
+// isResourceError returns true if the error indicates that iptables ran into a "resource
+// problem" and was unable to attempt the request. In particular, this will be true if it
+// times out trying to get the iptables lock.
+func isResourceError(err error) bool {
+	if ee, isExitError := err.(utilexec.ExitError); isExitError {
+		return ee.ExitStatus() == iptablesStatusResourceProblem
+	} else {
+		return false
+	}
+}
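To make the classification concrete, a hypothetical fragment (it assumes access to the package-private isResourceError, and the error texts are illustrative) using the same CodeExitError type from k8s.io/utils/exec that the test fake below constructs:

lockErr := utilexec.CodeExitError{Code: 4, Err: fmt.Errorf("could not get xtables.lock")}
goneErr := utilexec.CodeExitError{Code: 1, Err: fmt.Errorf("No chain/target/match by that name")}

fmt.Println(isResourceError(lockErr)) // true: iptables never ran; retry, don't reload
fmt.Println(isResourceError(goneErr)) // false: iptables ran and failed; the chain really is gone
fmt.Println(isResourceError(nil))     // false: nil does not implement utilexec.ExitError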
--- a/pkg/util/iptables/monitor_test.go
+++ b/pkg/util/iptables/monitor_test.go
@@ -41,6 +41,9 @@ type monitorFakeExec struct {
 	sync.Mutex

 	tables map[string]sets.String
+
+	block      bool
+	wasBlocked bool
 }

 func newMonitorFakeExec() *monitorFakeExec {
@@ -51,6 +54,22 @@ func newMonitorFakeExec() *monitorFakeExec {
 	return &monitorFakeExec{tables: tables}
 }

+func (mfe *monitorFakeExec) blockIPTables(block bool) {
+	mfe.Lock()
+	defer mfe.Unlock()
+
+	mfe.block = block
+}
+
+func (mfe *monitorFakeExec) getWasBlocked() bool {
+	mfe.Lock()
+	defer mfe.Unlock()
+
+	wasBlocked := mfe.wasBlocked
+	mfe.wasBlocked = false
+	return wasBlocked
+}
+
 func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd {
 	return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args}
 }
@@ -96,6 +115,12 @@ func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) {
 		return []byte{}, fmt.Errorf("no such table %q", tableName)
 	}

+	// For ease-of-testing reasons, blockIPTables blocks create and list, but not delete
+	if mfc.mfe.block && op != opDeleteChain {
+		mfc.mfe.wasBlocked = true
+		return []byte{}, exec.CodeExitError{Code: 4, Err: fmt.Errorf("could not get xtables.lock, etc")}
+	}
+
 	switch op {
 	case opCreateChain:
 		if !table.Has(chainName) {
@@ -199,8 +224,17 @@ func TestIPTablesMonitor(t *testing.T) {
 		t.Errorf("got unexpected number of reloads after partial flush: %v", err)
 	}

-	// If we delete the last chain, it should reload now
+	// Now ensure that "iptables -L" will get an error about the xtables.lock, and
+	// delete the last chain. The monitor should not reload, because it can't actually
+	// tell if the chain was deleted or not.
+	mfe.blockIPTables(true)
 	ipt.DeleteChain(TableNAT, canary)
+	if err := waitForBlocked(mfe); err != nil {
+		t.Errorf("failed waiting for monitor to be blocked from monitoring: %v", err)
+	}
+
+	// After unblocking the monitor, it should now reload
+	mfe.blockIPTables(false)

 	if err := waitForReloads(&reloads, 2); err != nil {
 		t.Errorf("got unexpected number of reloads after slow flush: %v", err)
@@ -213,11 +247,40 @@ func TestIPTablesMonitor(t *testing.T) {
 	close(stopCh)

 	if err := waitForNoReload(&reloads, 2); err != nil {
-		t.Errorf("got unexpected number of reloads after partial flush: %v", err)
+		t.Errorf("got unexpected number of reloads after stop: %v", err)
 	}
 	if !ensureNoChains(mfe) {
 		t.Errorf("canaries still exist after stopping monitor")
 	}
+
+	// If we create a new monitor while the iptables lock is held, it will
+	// retry creating canaries until it succeeds
+
+	stopCh = make(chan struct{})
+	_ = mfe.getWasBlocked()
+	mfe.blockIPTables(true)
+	go ipt.Monitor(canary, tables, func() {
+		if !ensureNoChains(mfe) {
+			t.Errorf("reload called while canaries still exist")
+		}
+		atomic.AddUint32(&reloads, 1)
+	}, 100*time.Millisecond, stopCh)
+
+	// Monitor should not have created canaries yet
+	if !ensureNoChains(mfe) {
+		t.Errorf("canary created while iptables blocked")
+	}
+
+	if err := waitForBlocked(mfe); err != nil {
+		t.Errorf("failed waiting for monitor to fail creating canaries: %v", err)
+	}
+
+	mfe.blockIPTables(false)
+	if err := waitForChains(mfe, canary, tables); err != nil {
+		t.Errorf("failed to create iptables canaries: %v", err)
+	}
+
+	close(stopCh)
 }

 func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error {
@@ -266,3 +329,10 @@ func waitForNoReload(reloads *uint32, expected uint32) error {
 	}
 	return nil
 }
+
+func waitForBlocked(mfe *monitorFakeExec) error {
+	return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
+		blocked := mfe.getWasBlocked()
+		return blocked, nil
+	})
+}
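For orientation, a sketch of how a caller (kube-proxy-style) might wire Monitor up after this change. The canary chain name, table list, reload callback, and interval are illustrative, and the import path and ProtocolIpv4 constant are assumed from this era of the tree, not taken from the commit:

package main

import (
	"time"

	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
	utilexec "k8s.io/utils/exec"
)

func main() {
	ipt := utiliptables.New(utilexec.New(), utiliptables.ProtocolIpv4)
	stopCh := make(chan struct{})

	// Monitor creates the canary chain in each table, then polls the first
	// table until the canary disappears (i.e. something flushed iptables).
	// With this change, a failed check due to lock contention is retried
	// instead of being treated as a flush.
	go ipt.Monitor(
		utiliptables.Chain("EXAMPLE-CANARY"),
		[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
		func() {
			// Re-create our iptables rules here; Monitor re-creates the
			// canaries itself after calling this.
		},
		2*time.Second,
		stopCh,
	)

	// ... do real work; close(stopCh) to stop monitoring.
	select {}
}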