diff --git a/pkg/kubelet/dockershim/network/hostport/fake_iptables.go b/pkg/kubelet/dockershim/network/hostport/fake_iptables.go
index f345c8f7f3e..28db1b65a27 100644
--- a/pkg/kubelet/dockershim/network/hostport/fake_iptables.go
+++ b/pkg/kubelet/dockershim/network/hostport/fake_iptables.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"net"
 	"strings"
+	"time"
 
 	"k8s.io/apimachinery/pkg/util/sets"
 	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@@ -335,6 +336,9 @@ func (f *fakeIPTables) RestoreAll(data []byte, flush utiliptables.FlushFlag, cou
 	return f.restore("", data, flush)
 }
 
+func (f *fakeIPTables) Monitor(canary utiliptables.Chain, tables []utiliptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
+}
+
 func (f *fakeIPTables) isBuiltinChain(tableName utiliptables.Table, chainName utiliptables.Chain) bool {
 	if builtinChains, ok := f.builtinChains[string(tableName)]; ok && builtinChains.Has(string(chainName)) {
 		return true
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 99c28354546..994e502b794 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1427,9 +1427,9 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 	}
 	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
 
-	// Start loop to sync iptables util rules
+	// Set up iptables util rules
 	if kl.makeIPTablesUtilChains {
-		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
+		kl.initNetworkUtil()
 	}
 
 	// Start a goroutine responsible for killing pods (that are not properly
diff --git a/pkg/kubelet/kubelet_network_linux.go b/pkg/kubelet/kubelet_network_linux.go
index 1c9ad46b989..f3b82c94142 100644
--- a/pkg/kubelet/kubelet_network_linux.go
+++ b/pkg/kubelet/kubelet_network_linux.go
@@ -20,11 +20,20 @@ package kubelet
 
 import (
 	"fmt"
+	"time"
 
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog"
 	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
 )
 
+func (kl *Kubelet) initNetworkUtil() {
+	kl.syncNetworkUtil()
+	go kl.iptClient.Monitor(utiliptables.Chain("KUBE-KUBELET-CANARY"),
+		[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
+		kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
+}
+
 // syncNetworkUtil ensures the network utility are present on host.
 // Network util includes:
 // 1. In nat table, KUBE-MARK-DROP rule to mark connections for dropping
diff --git a/pkg/kubelet/kubelet_network_others.go b/pkg/kubelet/kubelet_network_others.go
index 53267bfc5f6..bf0a3ba52fc 100644
--- a/pkg/kubelet/kubelet_network_others.go
+++ b/pkg/kubelet/kubelet_network_others.go
@@ -19,4 +19,4 @@ limitations under the License.
 package kubelet
 
 // Do nothing.
-func (kl *Kubelet) syncNetworkUtil() {}
+func (kl *Kubelet) initNetworkUtil() {}
diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go
index 5d0afa016ed..92287f9a550 100644
--- a/pkg/proxy/iptables/proxier.go
+++ b/pkg/proxy/iptables/proxier.go
@@ -323,7 +323,13 @@ func NewProxier(ipt utiliptables.Interface,
 	}
 	burstSyncs := 2
 	klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
-	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
+	// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
+	// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
+	// time.Hour is arbitrary.
+	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
+	go ipt.Monitor(utiliptables.Chain("KUBE-PROXY-CANARY"),
+		[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
+		proxier.syncProxyRules, syncPeriod, wait.NeverStop)
 
 	return proxier, nil
 }
diff --git a/pkg/util/iptables/BUILD b/pkg/util/iptables/BUILD
index bf03082c5e0..15241a00298 100644
--- a/pkg/util/iptables/BUILD
+++ b/pkg/util/iptables/BUILD
@@ -19,13 +19,13 @@ go_library(
     deps = [
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//vendor/k8s.io/klog:go_default_library",
        "//vendor/k8s.io/utils/exec:go_default_library",
        "//vendor/k8s.io/utils/trace:go_default_library",
    ] + select({
        "@io_bazel_rules_go//go/platform:linux": [
            "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
-            "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
            "//vendor/golang.org/x/sys/unix:go_default_library",
        ],
        "//conditions:default": [],
@@ -36,6 +36,7 @@ go_test(
    name = "go_default_test",
    srcs = [
        "iptables_test.go",
+        "monitor_test.go",
        "save_restore_test.go",
    ],
    embed = [":go_default_library"],
@@ -43,6 +44,7 @@ go_test(
        "@io_bazel_rules_go//go/platform:linux": [
            "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
            "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
+            "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
            "//vendor/k8s.io/utils/exec:go_default_library",
            "//vendor/k8s.io/utils/exec/testing:go_default_library",
        ],
diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go
index da4c7d7ec1a..a060ed4d469 100644
--- a/pkg/util/iptables/iptables.go
+++ b/pkg/util/iptables/iptables.go
@@ -27,6 +27,7 @@ import (
 
 	"k8s.io/apimachinery/pkg/util/sets"
 	utilversion "k8s.io/apimachinery/pkg/util/version"
+	utilwait "k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog"
 	utilexec "k8s.io/utils/exec"
 	utiltrace "k8s.io/utils/trace"
@@ -63,6 +64,17 @@ type Interface interface {
 	Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
 	// RestoreAll is the same as Restore except that no table is specified.
 	RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
+	// Monitor detects when the given iptables tables have been flushed by an external
+	// tool (e.g. a firewall reload) by creating canary chains and polling to see if
+	// they have been deleted. (Specifically, it polls tables[0] every interval until
+	// the canary has been deleted from there, then waits a short additional time for
+	// the canaries to be deleted from the remaining tables as well. You can optimize
+	// the polling by listing a relatively empty table in tables[0]). When a flush is
+	// detected, this calls the reloadFunc so the caller can reload their own iptables
+	// rules. If it is unable to create the canary chains (either initially or after
+	// a reload) it will log an error and stop monitoring.
+	// (This function should be called from a goroutine.)
+	Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{})
 	// HasRandomFully reveals whether `-j MASQUERADE` takes the
 	// `--random-fully` option.  This is helpful to work around a
 	// Linux kernel bug that sometimes causes multiple flows to get
@@ -480,12 +492,78 @@ func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) {
 	return false, fmt.Errorf("error checking rule: %v: %s", err, out)
 }
 
+const (
+	// Max time we wait for an iptables flush to complete after we notice it has started
+	iptablesFlushTimeout = 5 * time.Second
+	// How often we poll while waiting for an iptables flush to complete
+	iptablesFlushPollTime = 100 * time.Millisecond
+)
+
+// Monitor is part of Interface
+func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
+	for {
+		for _, table := range tables {
+			if _, err := runner.EnsureChain(table, canary); err != nil {
+				klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err)
+				return
+			}
+		}
+
+		// Poll until stopCh is closed or iptables is flushed
+		err := utilwait.PollUntil(interval, func() (bool, error) {
+			if runner.chainExists(tables[0], canary) {
+				return false, nil
+			}
+			klog.V(2).Infof("iptables canary %s/%s deleted", string(tables[0]), string(canary))
+
+			// Wait for the other canaries to be deleted too before returning
+			// so we don't start reloading too soon.
+			err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) {
+				for i := 1; i < len(tables); i++ {
+					if runner.chainExists(tables[i], canary) {
+						return false, nil
+					}
+				}
+				return true, nil
+			})
+			if err != nil {
+				klog.Warning("Inconsistent iptables state detected.")
+			}
+			return true, nil
+		}, stopCh)
+
+		if err != nil {
+			// stopCh was closed
+			for _, table := range tables {
+				_ = runner.DeleteChain(table, canary)
+			}
+			return
+		}
+
+		klog.V(2).Infof("Reloading after iptables flush")
+		reloadFunc()
+	}
+}
+
+// chainExists is used internally by Monitor; none of the public Interface methods can be
+// used to distinguish "chain exists" from "chain does not exist" with no side effects
+func (runner *runner) chainExists(table Table, chain Chain) bool {
+	fullArgs := makeFullArgs(table, chain)
+
+	runner.mu.Lock()
+	defer runner.mu.Unlock()
+
+	_, err := runner.run(opListChain, fullArgs)
+	return err == nil
+}
+
 type operation string
 
 const (
 	opCreateChain operation = "-N"
 	opFlushChain  operation = "-F"
 	opDeleteChain operation = "-X"
+	opListChain   operation = "-L"
 	opAppendRule  operation = "-A"
 	opCheckRule   operation = "-C"
 	opDeleteRule  operation = "-D"
diff --git a/pkg/util/iptables/monitor_test.go b/pkg/util/iptables/monitor_test.go
new file mode 100644
index 00000000000..0a0cc875d6a
--- /dev/null
+++ b/pkg/util/iptables/monitor_test.go
@@ -0,0 +1,268 @@
+// +build linux
+
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package iptables
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/sets"
+	utilwait "k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/utils/exec"
+)
+
+// We can't use the normal FakeExec because we don't know precisely how many times the
+// Monitor thread will do its checks, and we don't know precisely how its iptables calls
+// will interleave with the main thread's. So we use our own fake Exec implementation that
+// implements a minimal iptables interface. This will need updates as iptables.runner
+// changes its use of Exec.
+type monitorFakeExec struct {
+	sync.Mutex
+
+	tables map[string]sets.String
+}
+
+func newMonitorFakeExec() *monitorFakeExec {
+	tables := make(map[string]sets.String)
+	tables["mangle"] = sets.NewString()
+	tables["filter"] = sets.NewString()
+	tables["nat"] = sets.NewString()
+	return &monitorFakeExec{tables: tables}
+}
+
+func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd {
+	return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args}
+}
+
+func (mfe *monitorFakeExec) CommandContext(ctx context.Context, cmd string, args ...string) exec.Cmd {
+	return mfe.Command(cmd, args...)
+}
+
+func (mfe *monitorFakeExec) LookPath(file string) (string, error) {
+	return file, nil
+}
+
+type monitorFakeCmd struct {
+	mfe  *monitorFakeExec
+	cmd  string
+	args []string
+}
+
+func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) {
+	if mfc.cmd == cmdIPTablesRestore {
+		// Only used for "iptables-restore --version", and the result doesn't matter
+		return []byte{}, nil
+	} else if mfc.cmd != cmdIPTables {
+		panic("bad command " + mfc.cmd)
+	}
+
+	if len(mfc.args) == 1 && mfc.args[0] == "--version" {
+		return []byte("iptables v1.6.2"), nil
+	}
+
+	if len(mfc.args) != 6 || mfc.args[0] != WaitString || mfc.args[1] != WaitSecondsValue || mfc.args[4] != "-t" {
+		panic(fmt.Sprintf("bad args %#v", mfc.args))
+	}
+	op := operation(mfc.args[2])
+	chainName := mfc.args[3]
+	tableName := mfc.args[5]
+
+	mfc.mfe.Lock()
+	defer mfc.mfe.Unlock()
+
+	table := mfc.mfe.tables[tableName]
+	if table == nil {
+		return []byte{}, fmt.Errorf("no such table %q", tableName)
+	}
+
+	switch op {
+	case opCreateChain:
+		if !table.Has(chainName) {
+			table.Insert(chainName)
+		}
+		return []byte{}, nil
+	case opListChain:
+		if table.Has(chainName) {
+			return []byte{}, nil
+		} else {
+			return []byte{}, fmt.Errorf("no such chain %q", chainName)
+		}
+	case opDeleteChain:
+		table.Delete(chainName)
+		return []byte{}, nil
+	default:
+		panic("should not be reached")
+	}
+}
+
+func (mfc *monitorFakeCmd) SetStdin(in io.Reader) {
+	// Used by getIPTablesRestoreVersionString(), can be ignored
+}
+
+func (mfc *monitorFakeCmd) Run() error {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) Output() ([]byte, error) {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) SetDir(dir string) {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) SetStdout(out io.Writer) {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) SetStderr(out io.Writer) {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) SetEnv(env []string) {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) StdoutPipe() (io.ReadCloser, error) {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) StderrPipe() (io.ReadCloser, error) {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) Start() error {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) Wait() error {
+	panic("should not be reached")
+}
+func (mfc *monitorFakeCmd) Stop() {
+	panic("should not be reached")
+}
+
+func TestIPTablesMonitor(t *testing.T) {
+	mfe := newMonitorFakeExec()
+	ipt := New(mfe, ProtocolIpv4)
+
+	var reloads uint32
+	stopCh := make(chan struct{})
+
+	canary := Chain("MONITOR-TEST-CANARY")
+	tables := []Table{TableMangle, TableFilter, TableNAT}
+	go ipt.Monitor(canary, tables, func() {
+		if !ensureNoChains(mfe) {
+			t.Errorf("reload called while canaries still exist")
+		}
+		atomic.AddUint32(&reloads, 1)
+	}, 100*time.Millisecond, stopCh)
+
+	// Monitor should create canary chains quickly
+	if err := waitForChains(mfe, canary, tables); err != nil {
+		t.Errorf("failed to create iptables canaries: %v", err)
+	}
+
+	if err := waitForReloads(&reloads, 0); err != nil {
+		t.Errorf("got unexpected reloads: %v", err)
+	}
+
+	// If we delete all of the chains, it should reload
+	ipt.DeleteChain(TableMangle, canary)
+	ipt.DeleteChain(TableFilter, canary)
+	ipt.DeleteChain(TableNAT, canary)
+
+	if err := waitForReloads(&reloads, 1); err != nil {
+		t.Errorf("got unexpected number of reloads after flush: %v", err)
+	}
+	if err := waitForChains(mfe, canary, tables); err != nil {
+		t.Errorf("failed to create iptables canaries: %v", err)
+	}
+
+	// If we delete two chains, it should not reload yet
+	ipt.DeleteChain(TableMangle, canary)
+	ipt.DeleteChain(TableFilter, canary)
+
+	if err := waitForNoReload(&reloads, 1); err != nil {
+		t.Errorf("got unexpected number of reloads after partial flush: %v", err)
+	}
+
+	// If we delete the last chain, it should reload now
+	ipt.DeleteChain(TableNAT, canary)
+
+	if err := waitForReloads(&reloads, 2); err != nil {
+		t.Errorf("got unexpected number of reloads after slow flush: %v", err)
+	}
+	if err := waitForChains(mfe, canary, tables); err != nil {
+		t.Errorf("failed to create iptables canaries: %v", err)
+	}
+
+	// If we close the stop channel, it should stop running
+	close(stopCh)
+
+	if err := waitForNoReload(&reloads, 2); err != nil {
+		t.Errorf("got unexpected number of reloads after partial flush: %v", err)
+	}
+	if !ensureNoChains(mfe) {
+		t.Errorf("canaries still exist after stopping monitor")
+	}
+}
+
+func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error {
+	return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
+		mfe.Lock()
+		defer mfe.Unlock()
+
+		for _, table := range tables {
+			if !mfe.tables[string(table)].Has(string(canary)) {
+				return false, nil
+			}
+		}
+		return true, nil
+	})
+}
+
+func ensureNoChains(mfe *monitorFakeExec) bool {
+	mfe.Lock()
+	defer mfe.Unlock()
+	return mfe.tables["mangle"].Len() == 0 &&
+		mfe.tables["filter"].Len() == 0 &&
+		mfe.tables["nat"].Len() == 0
+}
+
+func waitForReloads(reloads *uint32, expected uint32) error {
+	if atomic.LoadUint32(reloads) < expected {
+		utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
+			return atomic.LoadUint32(reloads) >= expected, nil
+		})
+	}
+	got := atomic.LoadUint32(reloads)
+	if got != expected {
+		return fmt.Errorf("expected %d, got %d", expected, got)
+	}
+	return nil
+}
+
+func waitForNoReload(reloads *uint32, expected uint32) error {
+	utilwait.PollImmediate(50*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+		return atomic.LoadUint32(reloads) > expected, nil
+	})
+
+	got := atomic.LoadUint32(reloads)
+	if got != expected {
+		return fmt.Errorf("expected %d, got %d", expected, got)
+	}
+	return nil
+}
diff --git a/pkg/util/iptables/testing/fake.go b/pkg/util/iptables/testing/fake.go
index 67309166784..c95d9810a65 100644
--- a/pkg/util/iptables/testing/fake.go
+++ b/pkg/util/iptables/testing/fake.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"fmt"
 	"strings"
+	"time"
 
 	"k8s.io/kubernetes/pkg/util/iptables"
 )
@@ -99,6 +100,9 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter
 	return nil
 }
 
+func (f *FakeIPTables) Monitor(canary iptables.Chain, tables []iptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
+}
+
 func getToken(line, separator string) string {
 	tokens := strings.Split(line, separator)
 	if len(tokens) == 2 {
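
---

Reviewer note (not part of the patch): a minimal sketch of how a caller would wire up the new Monitor method, mirroring the kubelet and kube-proxy hunks above. The program, the EXAMPLE-CANARY chain name, and the resyncRules callback are hypothetical placeholders used only for illustration.

package main

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
	utilexec "k8s.io/utils/exec"
)

// resyncRules stands in for whatever function (re)installs this component's
// iptables rules; Monitor calls it each time it detects that the watched
// tables have been flushed.
func resyncRules() {
	// ... recreate chains and rules here ...
}

func main() {
	ipt := utiliptables.New(utilexec.New(), utiliptables.ProtocolIpv4)

	// Install the rules once up front, then let Monitor re-trigger resyncRules
	// whenever the canary chain disappears (e.g. after a firewalld reload).
	resyncRules()
	go ipt.Monitor(utiliptables.Chain("EXAMPLE-CANARY"),
		[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
		resyncRules, 1*time.Minute, wait.NeverStop)

	select {} // block forever; a real component would do its work here
}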