From 3948f16ff4060955b7f25d26d261b99589963ade Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 13 Aug 2019 08:29:39 -0400 Subject: [PATCH] Add iptables.Monitor, use it from kubelet and kube-proxy Kubelet and kube-proxy both had loops to ensure that their iptables rules didn't get deleted, by repeatedly recreating them. But on systems with lots of iptables rules (i.e., thousands of services), this can be very slow (and thus might end up holding the iptables lock for several seconds, blocking other operations, etc.). The specific threat that they need to worry about is firewall-management commands that flush *all* dynamic iptables rules. So add a new iptables.Monitor() function that handles this by creating iptables-flush canaries and only triggering a full rule reload after noticing that someone has deleted those chains. --- .../network/hostport/fake_iptables.go | 4 + pkg/kubelet/kubelet.go | 4 +- pkg/kubelet/kubelet_network_linux.go | 9 + pkg/kubelet/kubelet_network_others.go | 2 +- pkg/proxy/iptables/proxier.go | 8 +- pkg/util/iptables/BUILD | 4 +- pkg/util/iptables/iptables.go | 78 +++++ pkg/util/iptables/monitor_test.go | 268 ++++++++++++++++++ pkg/util/iptables/testing/fake.go | 4 + 9 files changed, 376 insertions(+), 5 deletions(-) create mode 100644 pkg/util/iptables/monitor_test.go diff --git a/pkg/kubelet/dockershim/network/hostport/fake_iptables.go b/pkg/kubelet/dockershim/network/hostport/fake_iptables.go index f345c8f7f3e..28db1b65a27 100644 --- a/pkg/kubelet/dockershim/network/hostport/fake_iptables.go +++ b/pkg/kubelet/dockershim/network/hostport/fake_iptables.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "strings" + "time" "k8s.io/apimachinery/pkg/util/sets" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -335,6 +336,9 @@ func (f *fakeIPTables) RestoreAll(data []byte, flush utiliptables.FlushFlag, cou return f.restore("", data, flush) } +func (f *fakeIPTables) Monitor(canary utiliptables.Chain, tables []utiliptables.Table, reloadFunc func(), 
interval time.Duration, stopCh <-chan struct{}) { +} + func (f *fakeIPTables) isBuiltinChain(tableName utiliptables.Table, chainName utiliptables.Chain) bool { if builtinChains, ok := f.builtinChains[string(tableName)]; ok && builtinChains.Has(string(chainName)) { return true diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 99c28354546..994e502b794 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1427,9 +1427,9 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) - // Start loop to sync iptables util rules + // Set up iptables util rules if kl.makeIPTablesUtilChains { - go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop) + kl.initNetworkUtil() } // Start a goroutine responsible for killing pods (that are not properly diff --git a/pkg/kubelet/kubelet_network_linux.go b/pkg/kubelet/kubelet_network_linux.go index 1c9ad46b989..f3b82c94142 100644 --- a/pkg/kubelet/kubelet_network_linux.go +++ b/pkg/kubelet/kubelet_network_linux.go @@ -20,11 +20,20 @@ package kubelet import ( "fmt" + "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" utiliptables "k8s.io/kubernetes/pkg/util/iptables" ) +func (kl *Kubelet) initNetworkUtil() { + kl.syncNetworkUtil() + go kl.iptClient.Monitor(utiliptables.Chain("KUBE-KUBELET-CANARY"), + []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, + kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop) +} + // syncNetworkUtil ensures the network utility are present on host. // Network util includes: // 1. In nat table, KUBE-MARK-DROP rule to mark connections for dropping diff --git a/pkg/kubelet/kubelet_network_others.go b/pkg/kubelet/kubelet_network_others.go index 53267bfc5f6..bf0a3ba52fc 100644 --- a/pkg/kubelet/kubelet_network_others.go +++ b/pkg/kubelet/kubelet_network_others.go @@ -19,4 +19,4 @@ limitations under the License. package kubelet // Do nothing. 
-func (kl *Kubelet) syncNetworkUtil() {} +func (kl *Kubelet) initNetworkUtil() {} diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 5d0afa016ed..92287f9a550 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -323,7 +323,13 @@ func NewProxier(ipt utiliptables.Interface, } burstSyncs := 2 klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) - proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + // We pass syncPeriod to ipt.Monitor, which will call us only if it needs to. + // We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though. + // time.Hour is arbitrary. + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs) + go ipt.Monitor(utiliptables.Chain("KUBE-PROXY-CANARY"), + []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, + proxier.syncProxyRules, syncPeriod, wait.NeverStop) return proxier, nil } diff --git a/pkg/util/iptables/BUILD b/pkg/util/iptables/BUILD index bf03082c5e0..15241a00298 100644 --- a/pkg/util/iptables/BUILD +++ b/pkg/util/iptables/BUILD @@ -19,13 +19,13 @@ go_library( deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/trace:go_default_library", ] + select({ "@io_bazel_rules_go//go/platform:linux": [ "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/golang.org/x/sys/unix:go_default_library", ], "//conditions:default": [], @@ 
-36,6 +36,7 @@ go_test( name = "go_default_test", srcs = [ "iptables_test.go", + "monitor_test.go", "save_restore_test.go", ], embed = [":go_default_library"], @@ -43,6 +44,7 @@ go_test( "@io_bazel_rules_go//go/platform:linux": [ "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", ], diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go index da4c7d7ec1a..a060ed4d469 100644 --- a/pkg/util/iptables/iptables.go +++ b/pkg/util/iptables/iptables.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" utilversion "k8s.io/apimachinery/pkg/util/version" + utilwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" utilexec "k8s.io/utils/exec" utiltrace "k8s.io/utils/trace" @@ -63,6 +64,17 @@ type Interface interface { Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error // RestoreAll is the same as Restore except that no table is specified. RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error + // Monitor detects when the given iptables tables have been flushed by an external + // tool (e.g. a firewall reload) by creating canary chains and polling to see if + // they have been deleted. (Specifically, it polls tables[0] every interval until + // the canary has been deleted from there, then waits a short additional time for + // the canaries to be deleted from the remaining tables as well. You can optimize + // the polling by listing a relatively empty table in tables[0]). When a flush is + // detected, this calls the reloadFunc so the caller can reload their own iptables + // rules. If it is unable to create the canary chains (either initially or after + // a reload) it will log an error and stop monitoring. 
+ // (This function should be called from a goroutine.) + Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) // HasRandomFully reveals whether `-j MASQUERADE` takes the // `--random-fully` option. This is helpful to work around a // Linux kernel bug that sometimes causes multiple flows to get @@ -480,12 +492,78 @@ func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) { return false, fmt.Errorf("error checking rule: %v: %s", err, out) } +const ( + // Max time we wait for an iptables flush to complete after we notice it has started + iptablesFlushTimeout = 5 * time.Second + // How often we poll while waiting for an iptables flush to complete + iptablesFlushPollTime = 100 * time.Millisecond +) + +// Monitor is part of Interface +func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) { + for { + for _, table := range tables { + if _, err := runner.EnsureChain(table, canary); err != nil { + klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err) + return + } + } + + // Poll until stopCh is closed or iptables is flushed + err := utilwait.PollUntil(interval, func() (bool, error) { + if runner.chainExists(tables[0], canary) { + return false, nil + } + klog.V(2).Infof("iptables canary %s/%s deleted", string(tables[0]), string(canary)) + + // Wait for the other canaries to be deleted too before returning + // so we don't start reloading too soon. 
+ err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) { + for i := 1; i < len(tables); i++ { + if runner.chainExists(tables[i], canary) { + return false, nil + } + } + return true, nil + }) + if err != nil { + klog.Warning("Inconsistent iptables state detected.") + } + return true, nil + }, stopCh) + + if err != nil { + // stopCh was closed + for _, table := range tables { + _ = runner.DeleteChain(table, canary) + } + return + } + + klog.V(2).Infof("Reloading after iptables flush") + reloadFunc() + } +} + +// chainExists is used internally by Monitor; none of the public Interface methods can be +// used to distinguish "chain exists" from "chain does not exist" with no side effects +func (runner *runner) chainExists(table Table, chain Chain) bool { + fullArgs := makeFullArgs(table, chain) + + runner.mu.Lock() + defer runner.mu.Unlock() + + _, err := runner.run(opListChain, fullArgs) + return err == nil +} + type operation string const ( opCreateChain operation = "-N" opFlushChain operation = "-F" opDeleteChain operation = "-X" + opListChain operation = "-L" opAppendRule operation = "-A" opCheckRule operation = "-C" opDeleteRule operation = "-D" diff --git a/pkg/util/iptables/monitor_test.go b/pkg/util/iptables/monitor_test.go new file mode 100644 index 00000000000..0a0cc875d6a --- /dev/null +++ b/pkg/util/iptables/monitor_test.go @@ -0,0 +1,268 @@ +// +build linux + +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +import ( + "context" + "fmt" + "io" + "sync" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + utilwait "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/exec" +) + +// We can't use the normal FakeExec because we don't know precisely how many times the +// Monitor thread will do its checks, and we don't know precisely how its iptables calls +// will interleave with the main thread's. So we use our own fake Exec implementation that +// implements a minimal iptables interface. This will need updates as iptables.runner +// changes its use of Exec. +type monitorFakeExec struct { + sync.Mutex + + tables map[string]sets.String +} + +func newMonitorFakeExec() *monitorFakeExec { + tables := make(map[string]sets.String) + tables["mangle"] = sets.NewString() + tables["filter"] = sets.NewString() + tables["nat"] = sets.NewString() + return &monitorFakeExec{tables: tables} +} + +func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd { + return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args} +} + +func (mfe *monitorFakeExec) CommandContext(ctx context.Context, cmd string, args ...string) exec.Cmd { + return mfe.Command(cmd, args...) 
+} + +func (mfe *monitorFakeExec) LookPath(file string) (string, error) { + return file, nil +} + +type monitorFakeCmd struct { + mfe *monitorFakeExec + cmd string + args []string +} + +func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) { + if mfc.cmd == cmdIPTablesRestore { + // Only used for "iptables-restore --version", and the result doesn't matter + return []byte{}, nil + } else if mfc.cmd != cmdIPTables { + panic("bad command " + mfc.cmd) + } + + if len(mfc.args) == 1 && mfc.args[0] == "--version" { + return []byte("iptables v1.6.2"), nil + } + + if len(mfc.args) != 6 || mfc.args[0] != WaitString || mfc.args[1] != WaitSecondsValue || mfc.args[4] != "-t" { + panic(fmt.Sprintf("bad args %#v", mfc.args)) + } + op := operation(mfc.args[2]) + chainName := mfc.args[3] + tableName := mfc.args[5] + + mfc.mfe.Lock() + defer mfc.mfe.Unlock() + + table := mfc.mfe.tables[tableName] + if table == nil { + return []byte{}, fmt.Errorf("no such table %q", tableName) + } + + switch op { + case opCreateChain: + if !table.Has(chainName) { + table.Insert(chainName) + } + return []byte{}, nil + case opListChain: + if table.Has(chainName) { + return []byte{}, nil + } else { + return []byte{}, fmt.Errorf("no such chain %q", chainName) + } + case opDeleteChain: + table.Delete(chainName) + return []byte{}, nil + default: + panic("should not be reached") + } +} + +func (mfc *monitorFakeCmd) SetStdin(in io.Reader) { + // Used by getIPTablesRestoreVersionString(), can be ignored +} + +func (mfc *monitorFakeCmd) Run() error { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) Output() ([]byte, error) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) SetDir(dir string) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) SetStdout(out io.Writer) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) SetStderr(out io.Writer) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) SetEnv(env []string) { + panic("should 
not be reached") +} +func (mfc *monitorFakeCmd) StdoutPipe() (io.ReadCloser, error) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) StderrPipe() (io.ReadCloser, error) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) Start() error { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) Wait() error { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) Stop() { + panic("should not be reached") +} + +func TestIPTablesMonitor(t *testing.T) { + mfe := newMonitorFakeExec() + ipt := New(mfe, ProtocolIpv4) + + var reloads uint32 + stopCh := make(chan struct{}) + + canary := Chain("MONITOR-TEST-CANARY") + tables := []Table{TableMangle, TableFilter, TableNAT} + go ipt.Monitor(canary, tables, func() { + if !ensureNoChains(mfe) { + t.Errorf("reload called while canaries still exist") + } + atomic.AddUint32(&reloads, 1) + }, 100*time.Millisecond, stopCh) + + // Monitor should create canary chains quickly + if err := waitForChains(mfe, canary, tables); err != nil { + t.Errorf("failed to create iptables canaries: %v", err) + } + + if err := waitForReloads(&reloads, 0); err != nil { + t.Errorf("got unexpected reloads: %v", err) + } + + // If we delete all of the chains, it should reload + ipt.DeleteChain(TableMangle, canary) + ipt.DeleteChain(TableFilter, canary) + ipt.DeleteChain(TableNAT, canary) + + if err := waitForReloads(&reloads, 1); err != nil { + t.Errorf("got unexpected number of reloads after flush: %v", err) + } + if err := waitForChains(mfe, canary, tables); err != nil { + t.Errorf("failed to create iptables canaries: %v", err) + } + + // If we delete two chains, it should not reload yet + ipt.DeleteChain(TableMangle, canary) + ipt.DeleteChain(TableFilter, canary) + + if err := waitForNoReload(&reloads, 1); err != nil { + t.Errorf("got unexpected number of reloads after partial flush: %v", err) + } + + // If we delete the last chain, it should reload now + ipt.DeleteChain(TableNAT, canary) + + if err := 
waitForReloads(&reloads, 2); err != nil { + t.Errorf("got unexpected number of reloads after slow flush: %v", err) + } + if err := waitForChains(mfe, canary, tables); err != nil { + t.Errorf("failed to create iptables canaries: %v", err) + } + + // If we close the stop channel, it should stop running + close(stopCh) + + if err := waitForNoReload(&reloads, 2); err != nil { + t.Errorf("got unexpected number of reloads after partial flush: %v", err) + } + if !ensureNoChains(mfe) { + t.Errorf("canaries still exist after stopping monitor") + } +} + +func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error { + return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { + mfe.Lock() + defer mfe.Unlock() + + for _, table := range tables { + if !mfe.tables[string(table)].Has(string(canary)) { + return false, nil + } + } + return true, nil + }) +} + +func ensureNoChains(mfe *monitorFakeExec) bool { + mfe.Lock() + defer mfe.Unlock() + return mfe.tables["mangle"].Len() == 0 && + mfe.tables["filter"].Len() == 0 && + mfe.tables["nat"].Len() == 0 +} + +func waitForReloads(reloads *uint32, expected uint32) error { + if atomic.LoadUint32(reloads) < expected { + utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { + return atomic.LoadUint32(reloads) >= expected, nil + }) + } + got := atomic.LoadUint32(reloads) + if got != expected { + return fmt.Errorf("expected %d, got %d", expected, got) + } + return nil +} + +func waitForNoReload(reloads *uint32, expected uint32) error { + utilwait.PollImmediate(50*time.Millisecond, 250*time.Millisecond, func() (bool, error) { + return atomic.LoadUint32(reloads) > expected, nil + }) + + got := atomic.LoadUint32(reloads) + if got != expected { + return fmt.Errorf("expected %d, got %d", expected, got) + } + return nil +} diff --git a/pkg/util/iptables/testing/fake.go b/pkg/util/iptables/testing/fake.go index 67309166784..c95d9810a65 100644 --- 
a/pkg/util/iptables/testing/fake.go +++ b/pkg/util/iptables/testing/fake.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "strings" + "time" "k8s.io/kubernetes/pkg/util/iptables" ) @@ -99,6 +100,9 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter return nil } +func (f *FakeIPTables) Monitor(canary iptables.Chain, tables []iptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) { +} + func getToken(line, separator string) string { tokens := strings.Split(line, separator) if len(tokens) == 2 {