From 967fd5aaf0ff780cf37b1609dfacaa757b60ad44 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Fri, 16 Aug 2019 11:08:36 -0400 Subject: [PATCH 1/3] e2e: test that both kube-proxy and kubelet recover after iptables flush --- test/e2e/network/networking.go | 88 ++++++++++++++++++++++++++++++++++ test/e2e/network/service.go | 12 ----- 2 files changed, 88 insertions(+), 12 deletions(-) diff --git a/test/e2e/network/networking.go b/test/e2e/network/networking.go index 55b1d2498da..1279672e32d 100644 --- a/test/e2e/network/networking.go +++ b/test/e2e/network/networking.go @@ -19,10 +19,14 @@ package network import ( "fmt" "net/http" + "strings" "k8s.io/apimachinery/pkg/util/sets" + utilwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/test/e2e/framework" + e2eservice "k8s.io/kubernetes/test/e2e/framework/service" + e2essh "k8s.io/kubernetes/test/e2e/framework/ssh" "github.com/onsi/ginkgo" ) @@ -235,4 +239,88 @@ var _ = SIGDescribe("Networking", func() { } }) }) + + ginkgo.It("should recreate its iptables rules if they are deleted [Disruptive]", func() { + framework.SkipUnlessProviderIs(framework.ProvidersWithSSH...) 
+ framework.SkipUnlessSSHKeyPresent() + + hosts, err := e2essh.NodeSSHHosts(f.ClientSet) + framework.ExpectNoError(err, "failed to find external/internal IPs for every node") + if len(hosts) == 0 { + framework.Failf("No ssh-able nodes") + } + host := hosts[0] + + ns := f.Namespace.Name + numPods, servicePort := 3, defaultServeHostnameServicePort + svc := "iptables-flush-test" + + defer func() { + framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc)) + }() + podNames, svcIP, err := e2eservice.StartServeHostnameService(f.ClientSet, getServeHostnameService(svc), ns, numPods) + framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc, ns) + + // Ideally we want to reload the system firewall, but we don't necessarily + // know how to do that on this system ("firewall-cmd --reload"? "systemctl + // restart iptables"?). So instead we just manually delete all "KUBE-" + // chains. + + ginkgo.By("dumping iptables rules on a node") + result, err := e2essh.SSH("sudo iptables-save", host, framework.TestContext.Provider) + if err != nil || result.Code != 0 { + e2essh.LogResult(result) + framework.Failf("couldn't dump iptable rules: %v", err) + } + + // All the commands that delete rules have to come before all the commands + // that delete chains, since the chains can't be deleted while there are + // still rules referencing them. 
+ var deleteRuleCmds, deleteChainCmds []string + table := "" + for _, line := range strings.Split(result.Stdout, "\n") { + if strings.HasPrefix(line, "*") { + table = line[1:] + } else if table == "" { + continue + } + + // Delete jumps from non-KUBE chains to KUBE chains + if !strings.HasPrefix(line, "-A KUBE-") && strings.Contains(line, "-j KUBE-") { + deleteRuleCmds = append(deleteRuleCmds, fmt.Sprintf("sudo iptables -t %s -D %s || true", table, line[3:])) + } + // Flush and delete all KUBE chains + if strings.HasPrefix(line, ":KUBE-") { + chain := strings.Split(line, " ")[0][1:] + deleteRuleCmds = append(deleteRuleCmds, fmt.Sprintf("sudo iptables -t %s -F %s || true", table, chain)) + deleteChainCmds = append(deleteChainCmds, fmt.Sprintf("sudo iptables -t %s -X %s || true", table, chain)) + } + } + cmd := strings.Join(append(deleteRuleCmds, deleteChainCmds...), "\n") + + ginkgo.By("deleting all KUBE-* iptables chains") + result, err = e2essh.SSH(cmd, host, framework.TestContext.Provider) + if err != nil || result.Code != 0 { + e2essh.LogResult(result) + framework.Failf("couldn't delete iptable rules: %v", err) + } + + ginkgo.By("verifying that kube-proxy rules are eventually recreated") + framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(f.ClientSet, ns, host, podNames, svcIP, servicePort)) + + ginkgo.By("verifying that kubelet rules are eventually recreated") + err = utilwait.PollImmediate(framework.Poll, framework.RestartNodeReadyAgainTimeout, func() (bool, error) { + result, err = e2essh.SSH("sudo iptables-save -t nat", host, framework.TestContext.Provider) + if err != nil || result.Code != 0 { + e2essh.LogResult(result) + return false, err + } + + if strings.Contains(result.Stdout, "\n-A KUBE-MARK-DROP ") { + return true, nil + } + return false, nil + }) + framework.ExpectNoError(err, "kubelet did not recreate its iptables rules") + }) }) diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 83dd61e055f..1fd882c1760 
100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -476,18 +476,6 @@ var _ = SIGDescribe("Services", func() { } framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort)) framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort)) - - ginkgo.By("Removing iptable rules") - result, err := e2essh.SSH(` - sudo iptables -t nat -F KUBE-SERVICES || true; - sudo iptables -t nat -F KUBE-PORTALS-HOST || true; - sudo iptables -t nat -F KUBE-PORTALS-CONTAINER || true`, host, framework.TestContext.Provider) - if err != nil || result.Code != 0 { - e2essh.LogResult(result) - framework.Failf("couldn't remove iptable rules: %v", err) - } - framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort)) - framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort)) }) ginkgo.It("should work after restarting apiserver [Disruptive]", func() { From b6c3d5416ac6adf0b555fb67b96554e00228f0d7 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 12 Aug 2019 15:03:29 -0400 Subject: [PATCH 2/3] Drop iptables firewalld monitoring support The firewalld monitoring code was not well tested (and not easily testable), would never be triggered on most platforms, and was only being taken advantage of from one place (kube-proxy), which didn't need it anyway since it already has its own resync loop. Since the firewalld monitoring was the only consumer of pkg/util/dbus, we can also now delete that. 
--- cmd/kube-proxy/app/BUILD | 10 - cmd/kube-proxy/app/server_others.go | 11 +- cmd/kubeadm/.import-restrictions | 1 - go.mod | 2 +- pkg/kubelet/BUILD | 1 - .../network/hostport/fake_iptables.go | 6 - pkg/kubelet/dockershim/network/kubenet/BUILD | 1 - .../network/kubenet/kubenet_linux.go | 6 +- pkg/kubelet/kubelet.go | 3 +- pkg/util/BUILD | 1 - pkg/util/dbus/BUILD | 38 --- pkg/util/dbus/dbus.go | 133 ---------- pkg/util/dbus/dbus_test.go | 243 ------------------ pkg/util/dbus/doc.go | 18 -- pkg/util/dbus/fake_dbus.go | 140 ---------- pkg/util/iptables/BUILD | 3 - pkg/util/iptables/iptables.go | 111 +------- pkg/util/iptables/iptables_test.go | 184 ++----------- pkg/util/iptables/testing/fake.go | 3 - test/test_owners.csv | 1 - 20 files changed, 34 insertions(+), 882 deletions(-) delete mode 100644 pkg/util/dbus/BUILD delete mode 100644 pkg/util/dbus/dbus.go delete mode 100644 pkg/util/dbus/dbus_test.go delete mode 100644 pkg/util/dbus/doc.go delete mode 100644 pkg/util/dbus/fake_dbus.go diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index f81a1dee004..c1a719f9159 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -75,7 +75,6 @@ go_library( ] + select({ "@io_bazel_rules_go//go/platform:android": [ "//pkg/proxy/metrics:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -83,7 +82,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:darwin": [ "//pkg/proxy/metrics:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -91,7 +89,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:dragonfly": [ "//pkg/proxy/metrics:go_default_library", - 
"//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -99,7 +96,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:freebsd": [ "//pkg/proxy/metrics:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -107,7 +103,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:linux": [ "//pkg/proxy/metrics:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -115,7 +110,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:nacl": [ "//pkg/proxy/metrics:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -123,7 +117,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:netbsd": [ "//pkg/proxy/metrics:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -131,7 +124,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:openbsd": [ "//pkg/proxy/metrics:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -139,7 +131,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:plan9": [ "//pkg/proxy/metrics:go_default_library", - 
"//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -147,7 +138,6 @@ go_library( ], "@io_bazel_rules_go//go/platform:solaris": [ "//pkg/proxy/metrics:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 933c9cc0c4a..af17e7ec1af 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -42,7 +42,6 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/util/configz" - utildbus "k8s.io/kubernetes/pkg/util/dbus" utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" @@ -84,13 +83,11 @@ func newProxyServer( var ipvsInterface utilipvs.Interface var kernelHandler ipvs.KernelHandler var ipsetInterface utilipset.Interface - var dbus utildbus.Interface // Create a iptables utils. 
execer := exec.New() - dbus = utildbus.New() - iptInterface = utiliptables.New(execer, dbus, protocol) + iptInterface = utiliptables.New(execer, protocol) kernelHandler = ipvs.NewLinuxKernelHandler() ipsetInterface = utilipset.New(execer) canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface) @@ -181,10 +178,10 @@ func newProxyServer( var ipt [2]utiliptables.Interface if iptInterface.IsIpv6() { ipt[1] = iptInterface - ipt[0] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4) + ipt[0] = utiliptables.New(execer, utiliptables.ProtocolIpv4) } else { ipt[0] = iptInterface - ipt[1] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6) + ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIpv6) } proxier, err = ipvs.NewDualStackProxier( @@ -253,8 +250,6 @@ func newProxyServer( } } - iptInterface.AddReloadFunc(proxier.Sync) - return &ProxyServer{ Client: client, EventClient: eventClient, diff --git a/cmd/kubeadm/.import-restrictions b/cmd/kubeadm/.import-restrictions index 47a50dfc7a1..e9319a5006e 100644 --- a/cmd/kubeadm/.import-restrictions +++ b/cmd/kubeadm/.import-restrictions @@ -75,7 +75,6 @@ "k8s.io/kubernetes/pkg/serviceaccount", "k8s.io/kubernetes/pkg/util/async", "k8s.io/kubernetes/pkg/util/conntrack", - "k8s.io/kubernetes/pkg/util/dbus", "k8s.io/kubernetes/pkg/util/hash", "k8s.io/kubernetes/pkg/util/iptables", "k8s.io/kubernetes/pkg/util/metrics", diff --git a/go.mod b/go.mod index 7b03c6cd562..a1baeb0c455 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( github.com/go-openapi/strfmt v0.19.0 github.com/go-openapi/validate v0.19.2 github.com/go-ozzo/ozzo-validation v3.5.0+incompatible // indirect - github.com/godbus/dbus v4.1.0+incompatible + github.com/godbus/dbus v4.1.0+incompatible // indirect github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/mock v1.2.0 diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 
4aa528ec1fd..c55edf9277d 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -100,7 +100,6 @@ go_library( "//pkg/scheduler/api:go_default_library", "//pkg/security/apparmor:go_default_library", "//pkg/security/podsecuritypolicy/sysctl:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/util/node:go_default_library", diff --git a/pkg/kubelet/dockershim/network/hostport/fake_iptables.go b/pkg/kubelet/dockershim/network/hostport/fake_iptables.go index 3b7dc695116..f345c8f7f3e 100644 --- a/pkg/kubelet/dockershim/network/hostport/fake_iptables.go +++ b/pkg/kubelet/dockershim/network/hostport/fake_iptables.go @@ -335,12 +335,6 @@ func (f *fakeIPTables) RestoreAll(data []byte, flush utiliptables.FlushFlag, cou return f.restore("", data, flush) } -func (f *fakeIPTables) AddReloadFunc(reloadFunc func()) { -} - -func (f *fakeIPTables) Destroy() { -} - func (f *fakeIPTables) isBuiltinChain(tableName utiliptables.Table, chainName utiliptables.Chain) bool { if builtinChains, ok := f.builtinChains[string(tableName)]; ok && builtinChains.Has(string(chainName)) { return true diff --git a/pkg/kubelet/dockershim/network/kubenet/BUILD b/pkg/kubelet/dockershim/network/kubenet/BUILD index f42ee784d77..4211f74c371 100644 --- a/pkg/kubelet/dockershim/network/kubenet/BUILD +++ b/pkg/kubelet/dockershim/network/kubenet/BUILD @@ -42,7 +42,6 @@ go_library( "//pkg/kubelet/dockershim/network:go_default_library", "//pkg/kubelet/dockershim/network/hostport:go_default_library", "//pkg/util/bandwidth:go_default_library", - "//pkg/util/dbus:go_default_library", "//pkg/util/ebtables:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/sysctl:go_default_library", diff --git a/pkg/kubelet/dockershim/network/kubenet/kubenet_linux.go b/pkg/kubelet/dockershim/network/kubenet/kubenet_linux.go index 8e8dc4accac..df6145a50bb 100644 --- 
a/pkg/kubelet/dockershim/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/dockershim/network/kubenet/kubenet_linux.go @@ -41,7 +41,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/dockershim/network" "k8s.io/kubernetes/pkg/kubelet/dockershim/network/hostport" "k8s.io/kubernetes/pkg/util/bandwidth" - utildbus "k8s.io/kubernetes/pkg/util/dbus" utilebtables "k8s.io/kubernetes/pkg/util/ebtables" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -125,9 +124,8 @@ type kubenetNetworkPlugin struct { func NewPlugin(networkPluginDirs []string, cacheDir string) network.NetworkPlugin { execer := utilexec.New() - dbus := utildbus.New() - iptInterface := utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4) - iptInterfacev6 := utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6) + iptInterface := utiliptables.New(execer, utiliptables.ProtocolIpv4) + iptInterfacev6 := utiliptables.New(execer, utiliptables.ProtocolIpv6) return &kubenetNetworkPlugin{ podIPs: make(map[kubecontainer.ContainerID]utilsets.String), execer: utilexec.New(), diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c2acd358e59..99c28354546 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -109,7 +109,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/security/apparmor" sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl" - utildbus "k8s.io/kubernetes/pkg/util/dbus" utilipt "k8s.io/kubernetes/pkg/util/iptables" "k8s.io/kubernetes/pkg/util/mount" nodeutil "k8s.io/kubernetes/pkg/util/node" @@ -537,7 +536,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nodeIPValidator: validateNodeIP, clock: clock.RealClock{}, enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach, - iptClient: utilipt.New(utilexec.New(), utildbus.New(), protocol), + iptClient: utilipt.New(utilexec.New(), protocol), makeIPTablesUtilChains: 
kubeCfg.MakeIPTablesUtilChains, iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit), iptablesDropBit: int(kubeCfg.IPTablesDropBit), diff --git a/pkg/util/BUILD b/pkg/util/BUILD index 576010282c5..9739e887daa 100644 --- a/pkg/util/BUILD +++ b/pkg/util/BUILD @@ -17,7 +17,6 @@ filegroup( "//pkg/util/configz:all-srcs", "//pkg/util/conntrack:all-srcs", "//pkg/util/coverage:all-srcs", - "//pkg/util/dbus:all-srcs", "//pkg/util/ebtables:all-srcs", "//pkg/util/env:all-srcs", "//pkg/util/filesystem:all-srcs", diff --git a/pkg/util/dbus/BUILD b/pkg/util/dbus/BUILD deleted file mode 100644 index 6b91c8b6c3e..00000000000 --- a/pkg/util/dbus/BUILD +++ /dev/null @@ -1,38 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = [ - "dbus.go", - "doc.go", - "fake_dbus.go", - ], - importpath = "k8s.io/kubernetes/pkg/util/dbus", - deps = ["//vendor/github.com/godbus/dbus:go_default_library"], -) - -go_test( - name = "go_default_test", - srcs = ["dbus_test.go"], - embed = [":go_default_library"], - deps = ["//vendor/github.com/godbus/dbus:go_default_library"], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/pkg/util/dbus/dbus.go b/pkg/util/dbus/dbus.go deleted file mode 100644 index 702d16e5dce..00000000000 --- a/pkg/util/dbus/dbus.go +++ /dev/null @@ -1,133 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dbus - -import ( - godbus "github.com/godbus/dbus" -) - -// Interface is an interface that presents a subset of the godbus/dbus API. Use this -// when you want to inject fakeable/mockable D-Bus behavior. -type Interface interface { - // SystemBus returns a connection to the system bus, connecting to it - // first if necessary - SystemBus() (Connection, error) - // SessionBus returns a connection to the session bus, connecting to it - // first if necessary - SessionBus() (Connection, error) -} - -// Connection represents a D-Bus connection -type Connection interface { - // Returns an Object representing the bus itself - BusObject() Object - - // Object creates a representation of a remote D-Bus object - Object(name, path string) Object - - // Signal registers or unregisters a channel to receive D-Bus signals - Signal(ch chan<- *godbus.Signal) -} - -// Object represents a remote D-Bus object -type Object interface { - // Call synchronously calls a D-Bus method - Call(method string, flags godbus.Flags, args ...interface{}) Call -} - -// Call represents a pending or completed D-Bus method call -type Call interface { - // Store returns a completed call's return values, or an error - Store(retvalues ...interface{}) error -} - -// Implements Interface in terms of actually talking to D-Bus -type dbusImpl struct { - systemBus *connImpl - sessionBus *connImpl -} - -// Implements Connection as a godbus.Conn -type connImpl struct { - conn *godbus.Conn -} - -// Implements Object as a godbus.Object -type objectImpl struct { - object godbus.BusObject -} - -// Implements 
Call as a godbus.Call -type callImpl struct { - call *godbus.Call -} - -// New returns a new Interface which will use godbus to talk to D-Bus -func New() Interface { - return &dbusImpl{} -} - -// SystemBus is part of Interface -func (db *dbusImpl) SystemBus() (Connection, error) { - if db.systemBus == nil { - bus, err := godbus.SystemBus() - if err != nil { - return nil, err - } - db.systemBus = &connImpl{bus} - } - - return db.systemBus, nil -} - -// SessionBus is part of Interface -func (db *dbusImpl) SessionBus() (Connection, error) { - if db.sessionBus == nil { - bus, err := godbus.SessionBus() - if err != nil { - return nil, err - } - db.sessionBus = &connImpl{bus} - } - - return db.sessionBus, nil -} - -// BusObject is part of the Connection interface -func (conn *connImpl) BusObject() Object { - return &objectImpl{conn.conn.BusObject()} -} - -// Object is part of the Connection interface -func (conn *connImpl) Object(name, path string) Object { - return &objectImpl{conn.conn.Object(name, godbus.ObjectPath(path))} -} - -// Signal is part of the Connection interface -func (conn *connImpl) Signal(ch chan<- *godbus.Signal) { - conn.conn.Signal(ch) -} - -// Call is part of the Object interface -func (obj *objectImpl) Call(method string, flags godbus.Flags, args ...interface{}) Call { - return &callImpl{obj.object.Call(method, flags, args...)} -} - -// Store is part of the Call interface -func (call *callImpl) Store(retvalues ...interface{}) error { - return call.call.Store(retvalues...) -} diff --git a/pkg/util/dbus/dbus_test.go b/pkg/util/dbus/dbus_test.go deleted file mode 100644 index 14e749991ca..00000000000 --- a/pkg/util/dbus/dbus_test.go +++ /dev/null @@ -1,243 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dbus - -import ( - "fmt" - "os" - "testing" - - godbus "github.com/godbus/dbus" -) - -const ( - DBusNameFlagDoNotQueue uint32 = 1 << (iota + 1) -) - -const ( - DBusRequestNameReplyPrimaryOwner uint32 = iota + 1 - DBusRequestNameReplyAlreadyOwner -) - -const ( - DBusReleaseNameReplyReleased uint32 = iota + 1 - DBusReleaseNameReplyNotOwner -) - -func doDBusTest(t *testing.T, dbus Interface, real bool) { - bus, err := dbus.SystemBus() - if err != nil { - if !real { - t.Errorf("dbus.SystemBus() failed with fake Interface") - } - t.Skipf("D-Bus is not running: %v", err) - } - busObj := bus.BusObject() - - id := "" - err = busObj.Call("org.freedesktop.DBus.GetId", 0).Store(&id) - if err != nil { - t.Errorf("expected success, got %v", err) - } - if len(id) == 0 { - t.Errorf("expected non-empty Id, got \"\"") - } - - // Switch to the session bus for the rest, since the system bus is more - // locked down (and thus harder to trick into emitting signals). 
- - bus, err = dbus.SessionBus() - if err != nil { - if !real { - t.Errorf("dbus.SystemBus() failed with fake Interface") - } - t.Skipf("D-Bus session bus is not available: %v", err) - } - busObj = bus.BusObject() - - name := fmt.Sprintf("io.kubernetes.dbus_test_%d", os.Getpid()) - owner := "" - err = busObj.Call("org.freedesktop.DBus.GetNameOwner", 0, name).Store(&owner) - if err == nil { - t.Errorf("expected '%s' to be un-owned, but found owner %s", name, owner) - } - dbuserr, ok := err.(godbus.Error) - if !ok { - t.Errorf("expected godbus.Error, but got %#v", err) - } - if dbuserr.Name != "org.freedesktop.DBus.Error.NameHasNoOwner" { - t.Errorf("expected NameHasNoOwner error but got %v", err) - } - - sigchan := make(chan *godbus.Signal, 10) - bus.Signal(sigchan) - - rule := fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", name) - err = busObj.Call("org.freedesktop.DBus.AddMatch", 0, rule).Store() - if err != nil { - t.Errorf("expected success, got %v", err) - } - - var ret uint32 - err = busObj.Call("org.freedesktop.DBus.RequestName", 0, name, DBusNameFlagDoNotQueue).Store(&ret) - if err != nil { - t.Errorf("expected success, got %v", err) - } - if ret != DBusRequestNameReplyPrimaryOwner { - t.Errorf("expected %v, got %v", DBusRequestNameReplyPrimaryOwner, ret) - } - - err = busObj.Call("org.freedesktop.DBus.GetNameOwner", 0, name).Store(&owner) - if err != nil { - t.Errorf("expected success, got %v", err) - } - - var changedSignal, acquiredSignal, lostSignal *godbus.Signal - - sig1 := <-sigchan - sig2 := <-sigchan - // We get two signals, but the order isn't guaranteed - if sig1.Name == "org.freedesktop.DBus.NameOwnerChanged" { - changedSignal = sig1 - acquiredSignal = sig2 - } else { - acquiredSignal = sig1 - changedSignal = sig2 - } - - if acquiredSignal.Sender != "org.freedesktop.DBus" || acquiredSignal.Name != "org.freedesktop.DBus.NameAcquired" { - 
t.Errorf("expected NameAcquired signal, got %v", acquiredSignal) - } - acquiredName := acquiredSignal.Body[0].(string) - if acquiredName != name { - t.Errorf("unexpected NameAcquired arguments: %v", acquiredSignal) - } - - if changedSignal.Sender != "org.freedesktop.DBus" || changedSignal.Name != "org.freedesktop.DBus.NameOwnerChanged" { - t.Errorf("expected NameOwnerChanged signal, got %v", changedSignal) - } - - changedName := changedSignal.Body[0].(string) - oldOwner := changedSignal.Body[1].(string) - newOwner := changedSignal.Body[2].(string) - if changedName != name || oldOwner != "" || newOwner != owner { - t.Errorf("unexpected NameOwnerChanged arguments: %v", changedSignal) - } - - err = busObj.Call("org.freedesktop.DBus.ReleaseName", 0, name).Store(&ret) - if err != nil { - t.Errorf("expected success, got %v", err) - } - if ret != DBusReleaseNameReplyReleased { - t.Errorf("expected %v, got %v", DBusReleaseNameReplyReleased, ret) - } - - sig1 = <-sigchan - sig2 = <-sigchan - if sig1.Name == "org.freedesktop.DBus.NameOwnerChanged" { - changedSignal = sig1 - lostSignal = sig2 - } else { - lostSignal = sig1 - changedSignal = sig2 - } - - if lostSignal.Sender != "org.freedesktop.DBus" || lostSignal.Name != "org.freedesktop.DBus.NameLost" { - t.Errorf("expected NameLost signal, got %v", lostSignal) - } - lostName := lostSignal.Body[0].(string) - if lostName != name { - t.Errorf("unexpected NameLost arguments: %v", lostSignal) - } - - if changedSignal.Sender != "org.freedesktop.DBus" || changedSignal.Name != "org.freedesktop.DBus.NameOwnerChanged" { - t.Errorf("expected NameOwnerChanged signal, got %v", changedSignal) - } - - changedName = changedSignal.Body[0].(string) - oldOwner = changedSignal.Body[1].(string) - newOwner = changedSignal.Body[2].(string) - if changedName != name || oldOwner != owner || newOwner != "" { - t.Errorf("unexpected NameOwnerChanged arguments: %v", changedSignal) - } - - if len(sigchan) != 0 { - t.Errorf("unexpected extra signals 
(%d)", len(sigchan)) - } - - // Unregister sigchan - bus.Signal(sigchan) -} - -func TestRealDBus(t *testing.T) { - dbus := New() - doDBusTest(t, dbus, true) -} - -func TestFakeDBus(t *testing.T) { - uniqueName := ":1.1" - ownedName := "" - - fakeSystem := NewFakeConnection() - fakeSystem.SetBusObject( - func(method string, args ...interface{}) ([]interface{}, error) { - if method == "org.freedesktop.DBus.GetId" { - return []interface{}{"foo"}, nil - } - return nil, fmt.Errorf("unexpected method call '%s'", method) - }, - ) - - fakeSession := NewFakeConnection() - fakeSession.SetBusObject( - func(method string, args ...interface{}) ([]interface{}, error) { - if method == "org.freedesktop.DBus.GetNameOwner" { - checkName := args[0].(string) - if checkName != ownedName { - return nil, godbus.Error{Name: "org.freedesktop.DBus.Error.NameHasNoOwner", Body: nil} - } - return []interface{}{uniqueName}, nil - } else if method == "org.freedesktop.DBus.RequestName" { - reqName := args[0].(string) - _ = args[1].(uint32) - if ownedName != "" { - return []interface{}{DBusRequestNameReplyAlreadyOwner}, nil - } - ownedName = reqName - fakeSession.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameAcquired", reqName) - fakeSession.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", reqName, "", uniqueName) - return []interface{}{DBusRequestNameReplyPrimaryOwner}, nil - } else if method == "org.freedesktop.DBus.ReleaseName" { - reqName := args[0].(string) - if reqName != ownedName { - return []interface{}{DBusReleaseNameReplyNotOwner}, nil - } - ownedName = "" - fakeSession.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", reqName, uniqueName, "") - fakeSession.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameLost", reqName) - return []interface{}{DBusReleaseNameReplyReleased}, nil - } else if method 
== "org.freedesktop.DBus.AddMatch" { - return nil, nil - } else { - return nil, fmt.Errorf("unexpected method call '%s'", method) - } - }, - ) - - dbus := NewFake(fakeSystem, fakeSession) - doDBusTest(t, dbus, false) -} diff --git a/pkg/util/dbus/doc.go b/pkg/util/dbus/doc.go deleted file mode 100644 index b07da628d98..00000000000 --- a/pkg/util/dbus/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package dbus provides an injectable interface and implementations for D-Bus communication -package dbus // import "k8s.io/kubernetes/pkg/util/dbus" diff --git a/pkg/util/dbus/fake_dbus.go b/pkg/util/dbus/fake_dbus.go deleted file mode 100644 index cceec4ca772..00000000000 --- a/pkg/util/dbus/fake_dbus.go +++ /dev/null @@ -1,140 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package dbus - -import ( - "fmt" - "sync" - - godbus "github.com/godbus/dbus" -) - -// Fake is a simple fake Interface type. -type Fake struct { - systemBus *FakeConnection - sessionBus *FakeConnection -} - -// FakeConnection represents a fake D-Bus connection -type FakeConnection struct { - lock sync.Mutex - busObject *fakeObject - objects map[string]*fakeObject - signalHandlers []chan<- *godbus.Signal -} - -// FakeHandler is used to handle fake D-Bus method calls -type FakeHandler func(method string, args ...interface{}) ([]interface{}, error) - -type fakeObject struct { - handler FakeHandler -} - -type fakeCall struct { - ret []interface{} - err error -} - -// NewFake returns a new Interface which will fake talking to D-Bus -func NewFake(systemBus *FakeConnection, sessionBus *FakeConnection) *Fake { - return &Fake{systemBus, sessionBus} -} - -// NewFakeConnection returns a FakeConnection Interface -func NewFakeConnection() *FakeConnection { - return &FakeConnection{ - objects: make(map[string]*fakeObject), - } -} - -// SystemBus is part of Interface -func (db *Fake) SystemBus() (Connection, error) { - if db.systemBus != nil { - return db.systemBus, nil - } - return nil, fmt.Errorf("DBus is not running") -} - -// SessionBus is part of Interface -func (db *Fake) SessionBus() (Connection, error) { - if db.sessionBus != nil { - return db.sessionBus, nil - } - return nil, fmt.Errorf("DBus is not running") -} - -// BusObject is part of the Connection interface -func (conn *FakeConnection) BusObject() Object { - return conn.busObject -} - -// Object is part of the Connection interface -func (conn *FakeConnection) Object(name, path string) Object { - return conn.objects[name+path] -} - -// Signal is part of the Connection interface -func (conn *FakeConnection) Signal(ch chan<- *godbus.Signal) { - conn.lock.Lock() - defer conn.lock.Unlock() - for i := range conn.signalHandlers { - if conn.signalHandlers[i] == ch { - conn.signalHandlers = 
append(conn.signalHandlers[:i], conn.signalHandlers[i+1:]...) - return - } - } - conn.signalHandlers = append(conn.signalHandlers, ch) -} - -// SetBusObject sets the handler for the BusObject of conn -func (conn *FakeConnection) SetBusObject(handler FakeHandler) { - conn.busObject = &fakeObject{handler} -} - -// AddObject adds a handler for the Object at name and path -func (conn *FakeConnection) AddObject(name, path string, handler FakeHandler) { - conn.objects[name+path] = &fakeObject{handler} -} - -// EmitSignal emits a signal on conn -func (conn *FakeConnection) EmitSignal(name, path, iface, signal string, args ...interface{}) { - conn.lock.Lock() - defer conn.lock.Unlock() - sig := &godbus.Signal{ - Sender: name, - Path: godbus.ObjectPath(path), - Name: iface + "." + signal, - Body: args, - } - for _, ch := range conn.signalHandlers { - ch <- sig - } -} - -// Call is part of the Object interface -func (obj *fakeObject) Call(method string, flags godbus.Flags, args ...interface{}) Call { - ret, err := obj.handler(method, args...) - return &fakeCall{ret, err} -} - -// Store is part of the Call interface -func (call *fakeCall) Store(retvalues ...interface{}) error { - if call.err != nil { - return call.err - } - return godbus.Store(call.ret, retvalues...) 
-} diff --git a/pkg/util/iptables/BUILD b/pkg/util/iptables/BUILD index a0d32ac7f78..bf03082c5e0 100644 --- a/pkg/util/iptables/BUILD +++ b/pkg/util/iptables/BUILD @@ -17,10 +17,8 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/util/iptables", deps = [ - "//pkg/util/dbus:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", - "//vendor/github.com/godbus/dbus:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/trace:go_default_library", @@ -43,7 +41,6 @@ go_test( embed = [":go_default_library"], deps = select({ "@io_bazel_rules_go//go/platform:linux": [ - "//pkg/util/dbus:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go index 50694cc4658..da4c7d7ec1a 100644 --- a/pkg/util/iptables/iptables.go +++ b/pkg/util/iptables/iptables.go @@ -25,11 +25,9 @@ import ( "sync" "time" - godbus "github.com/godbus/dbus" "k8s.io/apimachinery/pkg/util/sets" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/klog" - utildbus "k8s.io/kubernetes/pkg/util/dbus" utilexec "k8s.io/utils/exec" utiltrace "k8s.io/utils/trace" ) @@ -65,10 +63,6 @@ type Interface interface { Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error // RestoreAll is the same as Restore except that no table is specified. RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error - // AddReloadFunc adds a function to call on iptables reload - AddReloadFunc(reloadFunc func()) - // Destroy cleans up resources used by the Interface - Destroy() // HasRandomFully reveals whether `-j MASQUERADE` takes the // `--random-fully` option. 
This is helpful to work around a // Linux kernel bug that sometimes causes multiple flows to get @@ -143,22 +137,17 @@ const LockfilePath16x = "/run/xtables.lock" type runner struct { mu sync.Mutex exec utilexec.Interface - dbus utildbus.Interface protocol Protocol hasCheck bool - hasListener bool hasRandomFully bool waitFlag []string restoreWaitFlag []string lockfilePath string - - reloadFuncs []func() - signal chan *godbus.Signal } // newInternal returns a new Interface which will exec iptables, and allows the // caller to change the iptables-restore lockfile path -func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol, lockfilePath string) Interface { +func newInternal(exec utilexec.Interface, protocol Protocol, lockfilePath string) Interface { version, err := getIPTablesVersion(exec, protocol) if err != nil { klog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err) @@ -171,10 +160,8 @@ func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Prot runner := &runner{ exec: exec, - dbus: dbus, protocol: protocol, hasCheck: version.AtLeast(MinCheckVersion), - hasListener: false, hasRandomFully: version.AtLeast(RandomFullyMinVersion), waitFlag: getIPTablesWaitFlag(version), restoreWaitFlag: getIPTablesRestoreWaitFlag(version, exec, protocol), @@ -184,44 +171,8 @@ func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Prot } // New returns a new Interface which will exec iptables. -func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface { - return newInternal(exec, dbus, protocol, "") -} - -// Destroy is part of Interface. 
-func (runner *runner) Destroy() { - if runner.signal != nil { - runner.signal <- nil - } -} - -const ( - firewalldName = "org.fedoraproject.FirewallD1" - firewalldPath = "/org/fedoraproject/FirewallD1" - firewalldInterface = "org.fedoraproject.FirewallD1" -) - -// Connects to D-Bus and listens for FirewallD start/restart. (On non-FirewallD-using -// systems, this is effectively a no-op; we listen for the signals, but they will never be -// emitted, so reload() will never be called.) -func (runner *runner) connectToFirewallD() { - bus, err := runner.dbus.SystemBus() - if err != nil { - klog.V(1).Infof("Could not connect to D-Bus system bus: %s", err) - return - } - runner.hasListener = true - - rule := fmt.Sprintf("type='signal',sender='%s',path='%s',interface='%s',member='Reloaded'", firewalldName, firewalldPath, firewalldInterface) - bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule) - - rule = fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", firewalldName) - bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule) - - runner.signal = make(chan *godbus.Signal, 10) - bus.Signal(runner.signal) - - go runner.dbusSignalHandler(bus) +func New(exec utilexec.Interface, protocol Protocol) Interface { + return newInternal(exec, protocol, "") } // EnsureChain is part of Interface. 
@@ -620,62 +571,6 @@ func getIPTablesRestoreVersionString(exec utilexec.Interface, protocol Protocol) return match[1], nil } -// goroutine to listen for D-Bus signals -func (runner *runner) dbusSignalHandler(bus utildbus.Connection) { - firewalld := bus.Object(firewalldName, firewalldPath) - - for s := range runner.signal { - if s == nil { - // Unregister - bus.Signal(runner.signal) - return - } - - switch s.Name { - case "org.freedesktop.DBus.NameOwnerChanged": - name := s.Body[0].(string) - newOwner := s.Body[2].(string) - - if name != firewalldName || len(newOwner) == 0 { - continue - } - - // FirewallD startup (specifically the part where it deletes - // all existing iptables rules) may not yet be complete when - // we get this signal, so make a dummy request to it to - // synchronize. - firewalld.Call(firewalldInterface+".getDefaultZone", 0) - - runner.reload() - case firewalldInterface + ".Reloaded": - runner.reload() - } - } -} - -// AddReloadFunc is part of Interface -func (runner *runner) AddReloadFunc(reloadFunc func()) { - runner.mu.Lock() - defer runner.mu.Unlock() - - // We only need to listen to firewalld if there are Reload functions, so lazy - // initialize the listener. 
- if !runner.hasListener { - runner.connectToFirewallD() - } - - runner.reloadFuncs = append(runner.reloadFuncs, reloadFunc) -} - -// runs all reload funcs to re-sync iptables rules -func (runner *runner) reload() { - klog.V(1).Infof("reloading iptables rules") - - for _, f := range runner.reloadFuncs { - f() - } -} - func (runner *runner) HasRandomFully() bool { return runner.hasRandomFully } diff --git a/pkg/util/iptables/iptables_test.go b/pkg/util/iptables/iptables_test.go index fb34f017f02..cda4cda94c1 100644 --- a/pkg/util/iptables/iptables_test.go +++ b/pkg/util/iptables/iptables_test.go @@ -26,11 +26,9 @@ import ( "reflect" "strings" "testing" - "time" "k8s.io/apimachinery/pkg/util/sets" utilversion "k8s.io/apimachinery/pkg/util/version" - "k8s.io/kubernetes/pkg/util/dbus" "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" ) @@ -64,8 +62,7 @@ func testIPTablesVersionCmds(t *testing.T, protocol Protocol) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), protocol) - defer runner.Destroy() + _ = New(&fexec, protocol) // Check that proper iptables version command was used during runner instantiation if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll(iptablesCmd, "--version") { @@ -109,8 +106,7 @@ func testEnsureChain(t *testing.T, protocol Protocol) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), protocol) - defer runner.Destroy() + runner := New(&fexec, protocol) // Success. exists, err := runner.EnsureChain(TableNAT, Chain("FOOBAR")) if err != nil { @@ -167,8 +163,7 @@ func TestFlushChain(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) // Success. 
err := runner.FlushChain(TableNAT, Chain("FOOBAR")) if err != nil { @@ -205,8 +200,7 @@ func TestDeleteChain(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) // Success. err := runner.DeleteChain(TableNAT, Chain("FOOBAR")) if err != nil { @@ -242,8 +236,7 @@ func TestEnsureRuleAlreadyExists(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) @@ -279,8 +272,7 @@ func TestEnsureRuleNew(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) @@ -313,8 +305,7 @@ func TestEnsureRuleErrorChecking(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) _, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") @@ -344,8 +335,7 @@ func TestEnsureRuleErrorCreating(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) _, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") @@ -372,8 +362,7 @@ func TestDeleteRuleDoesNotExist(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) @@ -406,8 +395,7 @@ func TestDeleteRuleExists(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) @@ -437,8 +425,7 @@ func TestDeleteRuleErrorChecking(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") @@ -468,8 +455,7 @@ func TestDeleteRuleErrorDeleting(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") @@ -504,7 +490,7 @@ func TestGetIPTablesHasCheckCommand(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - ipt := New(&fexec, nil, ProtocolIpv4) + ipt := New(&fexec, ProtocolIpv4) runner := ipt.(*runner) if testCase.Expected != runner.hasCheck { t.Errorf("Expected result: %v, Got result: %v", testCase.Expected, runner.hasCheck) @@ -665,8 +651,7 @@ func TestWaitFlagUnavailable(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) err := runner.DeleteChain(TableNAT, Chain("FOOBAR")) if err != nil { t.Errorf("expected success, got %v", err) @@ -697,8 +682,7 @@ func TestWaitFlagOld(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) err := runner.DeleteChain(TableNAT, Chain("FOOBAR")) if err != nil { t.Errorf("expected success, got %v", err) @@ -732,8 +716,7 @@ func TestWaitFlagNew(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) - defer runner.Destroy() + runner := New(&fexec, ProtocolIpv4) err := runner.DeleteChain(TableNAT, Chain("FOOBAR")) if err != nil { t.Errorf("expected success, got %v", err) @@ -746,117 +729,6 @@ func TestWaitFlagNew(t *testing.T) { } } -func TestReload(t *testing.T) { - dbusConn := dbus.NewFakeConnection() - dbusConn.SetBusObject(func(method string, args ...interface{}) ([]interface{}, error) { return nil, nil }) - dbusConn.AddObject(firewalldName, firewalldPath, func(method string, args ...interface{}) ([]interface{}, error) { return nil, nil }) - fdbus := dbus.NewFake(dbusConn, nil) - - reloaded := make(chan bool, 2) - - fcmd := fakeexec.FakeCmd{ - CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ - // iptables version check - func() ([]byte, error) { return []byte("iptables v1.6.4"), nil }, - - // first reload - // EnsureChain - func() ([]byte, error) { return []byte{}, nil }, - // EnsureRule abc check - func() ([]byte, error) { return []byte{}, &fakeexec.FakeExitError{Status: 1} }, - // EnsureRule abc - func() ([]byte, error) { return []byte{}, nil }, - - // second reload - // EnsureChain - func() ([]byte, error) { return []byte{}, nil }, - // EnsureRule abc check - func() ([]byte, error) { return []byte{}, &fakeexec.FakeExitError{Status: 1} }, - // EnsureRule abc - func() ([]byte, error) { return []byte{}, nil }, - }, - } - fexec := fakeexec.FakeExec{ - CommandScript: []fakeexec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - }, - } - - runner := New(&fexec, fdbus, ProtocolIpv4) - defer runner.Destroy() - - runner.AddReloadFunc(func() { - exists, err := runner.EnsureChain(TableNAT, Chain("FOOBAR")) - if err != nil { - t.Errorf("expected success, got %v", err) - } - if exists { - t.Errorf("expected exists = false") - } - reloaded <- true - }) - - runner.AddReloadFunc(func() { - exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") - if err != nil { - t.Errorf("expected success, got %v", err) - } - if exists { - t.Errorf("expected exists = false") - } - reloaded <- true - }) - - dbusConn.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", firewalldName, "", ":1.1") - <-reloaded - <-reloaded - - if fcmd.CombinedOutputCalls != 4 { - t.Errorf("expected 4 CombinedOutput() calls total, got %d", fcmd.CombinedOutputCalls) - } - if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("iptables", "-t", "nat", "-N", "FOOBAR") { - t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[2]) - } - if !sets.NewString(fcmd.CombinedOutputLog[2]...).HasAll("iptables", "-t", "nat", "-C", "OUTPUT", "abc", "123") { - t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[3]) - } - if !sets.NewString(fcmd.CombinedOutputLog[3]...).HasAll("iptables", "-t", "nat", "-A", "OUTPUT", "abc", "123") { - t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[4]) - } - - go func() { time.Sleep(time.Second / 100); reloaded <- true }() - dbusConn.EmitSignal(firewalldName, firewalldPath, firewalldInterface, "DefaultZoneChanged", "public") - dbusConn.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", "io.k8s.Something", "", ":1.1") - <-reloaded - - if 
fcmd.CombinedOutputCalls != 4 { - t.Errorf("Incorrect signal caused a reload") - } - - dbusConn.EmitSignal(firewalldName, firewalldPath, firewalldInterface, "Reloaded") - <-reloaded - <-reloaded - - if fcmd.CombinedOutputCalls != 7 { - t.Errorf("expected 7 CombinedOutput() calls total, got %d", fcmd.CombinedOutputCalls) - } - if !sets.NewString(fcmd.CombinedOutputLog[4]...).HasAll("iptables", "-t", "nat", "-N", "FOOBAR") { - t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[5]) - } - if !sets.NewString(fcmd.CombinedOutputLog[5]...).HasAll("iptables", "-t", "nat", "-C", "OUTPUT", "abc", "123") { - t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[6]) - } - if !sets.NewString(fcmd.CombinedOutputLog[6]...).HasAll("iptables", "-t", "nat", "-A", "OUTPUT", "abc", "123") { - t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[7]) - } -} - func testSaveInto(t *testing.T, protocol Protocol) { version := " v1.9.22" iptablesCmd := iptablesCommand(protocol) @@ -890,8 +762,7 @@ COMMIT func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), protocol) - defer runner.Destroy() + runner := New(&fexec, protocol) buffer := bytes.NewBuffer(nil) // Success. @@ -960,8 +831,7 @@ func testRestore(t *testing.T, protocol Protocol) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), protocol) - defer runner.Destroy() + runner := New(&fexec, protocol) // both flags true err := runner.Restore(TableNAT, []byte{}, FlushTables, RestoreCounters) @@ -1043,9 +913,8 @@ func TestRestoreAll(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, }, } - runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) + runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath) defer os.Remove(TestLockfilePath) - defer runner.Destroy() err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) if err != nil { @@ -1085,9 +954,8 @@ func TestRestoreAllWait(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) + runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath) defer os.Remove(TestLockfilePath) - defer runner.Destroy() err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) if err != nil { @@ -1131,9 +999,8 @@ func TestRestoreAllWaitOldIptablesRestore(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) + runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath) defer os.Remove(TestLockfilePath) - defer runner.Destroy() err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) if err != nil { @@ -1178,9 +1045,8 @@ func TestRestoreAllGrabNewLock(t *testing.T) { }, } - runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) + runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath) defer os.Remove(TestLockfilePath) - defer runner.Destroy() // Grab the /run lock and ensure the RestoreAll fails runLock, err := os.OpenFile(TestLockfilePath, os.O_CREATE, 0600) @@ -1221,9 +1087,8 @@ func TestRestoreAllGrabOldLock(t *testing.T) { }, } - runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) + runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath) defer os.Remove(TestLockfilePath) - defer runner.Destroy() // Grab the abstract @xtables socket runLock, err := net.ListenUnix("unix", 
&net.UnixAddr{Name: "@xtables", Net: "unix"}) @@ -1262,9 +1127,8 @@ func TestRestoreAllWaitBackportedIptablesRestore(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) + runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath) defer os.Remove(TestLockfilePath) - defer runner.Destroy() err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) if err != nil { diff --git a/pkg/util/iptables/testing/fake.go b/pkg/util/iptables/testing/fake.go index 16382355915..67309166784 100644 --- a/pkg/util/iptables/testing/fake.go +++ b/pkg/util/iptables/testing/fake.go @@ -98,9 +98,6 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter f.Lines = data return nil } -func (*FakeIPTables) AddReloadFunc(reloadFunc func()) {} - -func (*FakeIPTables) Destroy() {} func getToken(line, separator string) string { tokens := strings.Split(line, separator) diff --git a/test/test_owners.csv b/test/test_owners.csv index 375e645e130..982f553fdb1 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -778,7 +778,6 @@ k8s.io/kubernetes/pkg/util/async,spxtr,1, k8s.io/kubernetes/pkg/util/bandwidth,thockin,1, k8s.io/kubernetes/pkg/util/config,jszczepkowski,1, k8s.io/kubernetes/pkg/util/configz,ixdy,1, -k8s.io/kubernetes/pkg/util/dbus,roberthbailey,1, k8s.io/kubernetes/pkg/util/env,asalkeld,0, k8s.io/kubernetes/pkg/util/goroutinemap,saad-ali,0, k8s.io/kubernetes/pkg/util/hash,timothysc,1, From 3948f16ff4060955b7f25d26d261b99589963ade Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 13 Aug 2019 08:29:39 -0400 Subject: [PATCH 3/3] Add iptables.Monitor, use it from kubelet and kube-proxy Kubelet and kube-proxy both had loops to ensure that their iptables rules didn't get deleted, by repeatedly recreating them. 
But on systems with lots of iptables rules (i.e., thousands of services), this can be very slow (and thus might end up holding the iptables lock for several seconds, blocking other operations, etc.). The specific threat that they need to worry about is firewall-management commands that flush *all* dynamic iptables rules. So add a new iptables.Monitor() function that handles this by creating iptables-flush canaries and only triggering a full rule reload after noticing that someone has deleted those chains. --- .../network/hostport/fake_iptables.go | 4 + pkg/kubelet/kubelet.go | 4 +- pkg/kubelet/kubelet_network_linux.go | 9 + pkg/kubelet/kubelet_network_others.go | 2 +- pkg/proxy/iptables/proxier.go | 8 +- pkg/util/iptables/BUILD | 4 +- pkg/util/iptables/iptables.go | 78 +++++ pkg/util/iptables/monitor_test.go | 268 ++++++++++++++++++ pkg/util/iptables/testing/fake.go | 4 + 9 files changed, 376 insertions(+), 5 deletions(-) create mode 100644 pkg/util/iptables/monitor_test.go diff --git a/pkg/kubelet/dockershim/network/hostport/fake_iptables.go b/pkg/kubelet/dockershim/network/hostport/fake_iptables.go index f345c8f7f3e..28db1b65a27 100644 --- a/pkg/kubelet/dockershim/network/hostport/fake_iptables.go +++ b/pkg/kubelet/dockershim/network/hostport/fake_iptables.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "strings" + "time" "k8s.io/apimachinery/pkg/util/sets" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -335,6 +336,9 @@ func (f *fakeIPTables) RestoreAll(data []byte, flush utiliptables.FlushFlag, cou return f.restore("", data, flush) } +func (f *fakeIPTables) Monitor(canary utiliptables.Chain, tables []utiliptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) { +} + func (f *fakeIPTables) isBuiltinChain(tableName utiliptables.Table, chainName utiliptables.Chain) bool { if builtinChains, ok := f.builtinChains[string(tableName)]; ok && builtinChains.Has(string(chainName)) { return true diff --git a/pkg/kubelet/kubelet.go
b/pkg/kubelet/kubelet.go index 99c28354546..994e502b794 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1427,9 +1427,9 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) - // Start loop to sync iptables util rules + // Set up iptables util rules if kl.makeIPTablesUtilChains { - go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop) + kl.initNetworkUtil() } // Start a goroutine responsible for killing pods (that are not properly diff --git a/pkg/kubelet/kubelet_network_linux.go b/pkg/kubelet/kubelet_network_linux.go index 1c9ad46b989..f3b82c94142 100644 --- a/pkg/kubelet/kubelet_network_linux.go +++ b/pkg/kubelet/kubelet_network_linux.go @@ -20,11 +20,20 @@ package kubelet import ( "fmt" + "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" utiliptables "k8s.io/kubernetes/pkg/util/iptables" ) +func (kl *Kubelet) initNetworkUtil() { + kl.syncNetworkUtil() + go kl.iptClient.Monitor(utiliptables.Chain("KUBE-KUBELET-CANARY"), + []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, + kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop) +} + // syncNetworkUtil ensures the network utility are present on host. // Network util includes: // 1. In nat table, KUBE-MARK-DROP rule to mark connections for dropping diff --git a/pkg/kubelet/kubelet_network_others.go b/pkg/kubelet/kubelet_network_others.go index 53267bfc5f6..bf0a3ba52fc 100644 --- a/pkg/kubelet/kubelet_network_others.go +++ b/pkg/kubelet/kubelet_network_others.go @@ -19,4 +19,4 @@ limitations under the License. package kubelet // Do nothing. 
-func (kl *Kubelet) syncNetworkUtil() {} +func (kl *Kubelet) initNetworkUtil() {} diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 5d0afa016ed..92287f9a550 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -323,7 +323,13 @@ func NewProxier(ipt utiliptables.Interface, } burstSyncs := 2 klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) - proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + // We pass syncPeriod to ipt.Monitor, which will call us only if it needs to. + // We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though. + // time.Hour is arbitrary. + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs) + go ipt.Monitor(utiliptables.Chain("KUBE-PROXY-CANARY"), + []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, + proxier.syncProxyRules, syncPeriod, wait.NeverStop) return proxier, nil } diff --git a/pkg/util/iptables/BUILD b/pkg/util/iptables/BUILD index bf03082c5e0..15241a00298 100644 --- a/pkg/util/iptables/BUILD +++ b/pkg/util/iptables/BUILD @@ -19,13 +19,13 @@ go_library( deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/trace:go_default_library", ] + select({ "@io_bazel_rules_go//go/platform:linux": [ "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/golang.org/x/sys/unix:go_default_library", ], "//conditions:default": [], @@ 
-36,6 +36,7 @@ go_test( name = "go_default_test", srcs = [ "iptables_test.go", + "monitor_test.go", "save_restore_test.go", ], embed = [":go_default_library"], @@ -43,6 +44,7 @@ go_test( "@io_bazel_rules_go//go/platform:linux": [ "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", ], diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go index da4c7d7ec1a..a060ed4d469 100644 --- a/pkg/util/iptables/iptables.go +++ b/pkg/util/iptables/iptables.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" utilversion "k8s.io/apimachinery/pkg/util/version" + utilwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" utilexec "k8s.io/utils/exec" utiltrace "k8s.io/utils/trace" @@ -63,6 +64,17 @@ type Interface interface { Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error // RestoreAll is the same as Restore except that no table is specified. RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error + // Monitor detects when the given iptables tables have been flushed by an external + // tool (e.g. a firewall reload) by creating canary chains and polling to see if + // they have been deleted. (Specifically, it polls tables[0] every interval until + // the canary has been deleted from there, then waits a short additional time for + // the canaries to be deleted from the remaining tables as well. You can optimize + // the polling by listing a relatively empty table in tables[0]). When a flush is + // detected, this calls the reloadFunc so the caller can reload their own iptables + // rules. If it is unable to create the canary chains (either initially or after + // a reload) it will log an error and stop monitoring. 
+ // (This function should be called from a goroutine.) + Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) // HasRandomFully reveals whether `-j MASQUERADE` takes the // `--random-fully` option. This is helpful to work around a // Linux kernel bug that sometimes causes multiple flows to get @@ -480,12 +492,78 @@ func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) { return false, fmt.Errorf("error checking rule: %v: %s", err, out) } +const ( + // Max time we wait for an iptables flush to complete after we notice it has started + iptablesFlushTimeout = 5 * time.Second + // How often we poll while waiting for an iptables flush to complete + iptablesFlushPollTime = 100 * time.Millisecond +) + +// Monitor is part of Interface +func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) { + for { + for _, table := range tables { + if _, err := runner.EnsureChain(table, canary); err != nil { + klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err) + return + } + } + + // Poll until stopCh is closed or iptables is flushed + err := utilwait.PollUntil(interval, func() (bool, error) { + if runner.chainExists(tables[0], canary) { + return false, nil + } + klog.V(2).Infof("iptables canary %s/%s deleted", string(tables[0]), string(canary)) + + // Wait for the other canaries to be deleted too before returning + // so we don't start reloading too soon. 
+ err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) { + for i := 1; i < len(tables); i++ { + if runner.chainExists(tables[i], canary) { + return false, nil + } + } + return true, nil + }) + if err != nil { + klog.Warning("Inconsistent iptables state detected.") + } + return true, nil + }, stopCh) + + if err != nil { + // stopCh was closed + for _, table := range tables { + _ = runner.DeleteChain(table, canary) + } + return + } + + klog.V(2).Infof("Reloading after iptables flush") + reloadFunc() + } +} + +// chainExists is used internally by Monitor; none of the public Interface methods can be +// used to distinguish "chain exists" from "chain does not exist" with no side effects +func (runner *runner) chainExists(table Table, chain Chain) bool { + fullArgs := makeFullArgs(table, chain) + + runner.mu.Lock() + defer runner.mu.Unlock() + + _, err := runner.run(opListChain, fullArgs) + return err == nil +} + type operation string const ( opCreateChain operation = "-N" opFlushChain operation = "-F" opDeleteChain operation = "-X" + opListChain operation = "-L" opAppendRule operation = "-A" opCheckRule operation = "-C" opDeleteRule operation = "-D" diff --git a/pkg/util/iptables/monitor_test.go b/pkg/util/iptables/monitor_test.go new file mode 100644 index 00000000000..0a0cc875d6a --- /dev/null +++ b/pkg/util/iptables/monitor_test.go @@ -0,0 +1,268 @@ +// +build linux + +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +import ( + "context" + "fmt" + "io" + "sync" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + utilwait "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/exec" +) + +// We can't use the normal FakeExec because we don't know precisely how many times the +// Monitor thread will do its checks, and we don't know precisely how its iptables calls +// will interleave with the main thread's. So we use our own fake Exec implementation that +// implements a minimal iptables interface. This will need updates as iptables.runner +// changes its use of Exec. +type monitorFakeExec struct { + sync.Mutex + + tables map[string]sets.String +} + +func newMonitorFakeExec() *monitorFakeExec { + tables := make(map[string]sets.String) + tables["mangle"] = sets.NewString() + tables["filter"] = sets.NewString() + tables["nat"] = sets.NewString() + return &monitorFakeExec{tables: tables} +} + +func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd { + return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args} +} + +func (mfe *monitorFakeExec) CommandContext(ctx context.Context, cmd string, args ...string) exec.Cmd { + return mfe.Command(cmd, args...) 
+} + +func (mfe *monitorFakeExec) LookPath(file string) (string, error) { + return file, nil +} + +type monitorFakeCmd struct { + mfe *monitorFakeExec + cmd string + args []string +} + +func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) { + if mfc.cmd == cmdIPTablesRestore { + // Only used for "iptables-restore --version", and the result doesn't matter + return []byte{}, nil + } else if mfc.cmd != cmdIPTables { + panic("bad command " + mfc.cmd) + } + + if len(mfc.args) == 1 && mfc.args[0] == "--version" { + return []byte("iptables v1.6.2"), nil + } + + if len(mfc.args) != 6 || mfc.args[0] != WaitString || mfc.args[1] != WaitSecondsValue || mfc.args[4] != "-t" { + panic(fmt.Sprintf("bad args %#v", mfc.args)) + } + op := operation(mfc.args[2]) + chainName := mfc.args[3] + tableName := mfc.args[5] + + mfc.mfe.Lock() + defer mfc.mfe.Unlock() + + table := mfc.mfe.tables[tableName] + if table == nil { + return []byte{}, fmt.Errorf("no such table %q", tableName) + } + + switch op { + case opCreateChain: + if !table.Has(chainName) { + table.Insert(chainName) + } + return []byte{}, nil + case opListChain: + if table.Has(chainName) { + return []byte{}, nil + } else { + return []byte{}, fmt.Errorf("no such chain %q", chainName) + } + case opDeleteChain: + table.Delete(chainName) + return []byte{}, nil + default: + panic("should not be reached") + } +} + +func (mfc *monitorFakeCmd) SetStdin(in io.Reader) { + // Used by getIPTablesRestoreVersionString(), can be ignored +} + +func (mfc *monitorFakeCmd) Run() error { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) Output() ([]byte, error) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) SetDir(dir string) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) SetStdout(out io.Writer) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) SetStderr(out io.Writer) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) SetEnv(env []string) { + panic("should 
not be reached") +} +func (mfc *monitorFakeCmd) StdoutPipe() (io.ReadCloser, error) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) StderrPipe() (io.ReadCloser, error) { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) Start() error { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) Wait() error { + panic("should not be reached") +} +func (mfc *monitorFakeCmd) Stop() { + panic("should not be reached") +} + +func TestIPTablesMonitor(t *testing.T) { + mfe := newMonitorFakeExec() + ipt := New(mfe, ProtocolIpv4) + + var reloads uint32 + stopCh := make(chan struct{}) + + canary := Chain("MONITOR-TEST-CANARY") + tables := []Table{TableMangle, TableFilter, TableNAT} + go ipt.Monitor(canary, tables, func() { + if !ensureNoChains(mfe) { + t.Errorf("reload called while canaries still exist") + } + atomic.AddUint32(&reloads, 1) + }, 100*time.Millisecond, stopCh) + + // Monitor should create canary chains quickly + if err := waitForChains(mfe, canary, tables); err != nil { + t.Errorf("failed to create iptables canaries: %v", err) + } + + if err := waitForReloads(&reloads, 0); err != nil { + t.Errorf("got unexpected reloads: %v", err) + } + + // If we delete all of the chains, it should reload + ipt.DeleteChain(TableMangle, canary) + ipt.DeleteChain(TableFilter, canary) + ipt.DeleteChain(TableNAT, canary) + + if err := waitForReloads(&reloads, 1); err != nil { + t.Errorf("got unexpected number of reloads after flush: %v", err) + } + if err := waitForChains(mfe, canary, tables); err != nil { + t.Errorf("failed to create iptables canaries: %v", err) + } + + // If we delete two chains, it should not reload yet + ipt.DeleteChain(TableMangle, canary) + ipt.DeleteChain(TableFilter, canary) + + if err := waitForNoReload(&reloads, 1); err != nil { + t.Errorf("got unexpected number of reloads after partial flush: %v", err) + } + + // If we delete the last chain, it should reload now + ipt.DeleteChain(TableNAT, canary) + + if err := 
waitForReloads(&reloads, 2); err != nil { + t.Errorf("got unexpected number of reloads after slow flush: %v", err) + } + if err := waitForChains(mfe, canary, tables); err != nil { + t.Errorf("failed to create iptables canaries: %v", err) + } + + // If we close the stop channel, it should stop running + close(stopCh) + + if err := waitForNoReload(&reloads, 2); err != nil { + t.Errorf("got unexpected number of reloads after partial flush: %v", err) + } + if !ensureNoChains(mfe) { + t.Errorf("canaries still exist after stopping monitor") + } +} + +func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error { + return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { + mfe.Lock() + defer mfe.Unlock() + + for _, table := range tables { + if !mfe.tables[string(table)].Has(string(canary)) { + return false, nil + } + } + return true, nil + }) +} + +func ensureNoChains(mfe *monitorFakeExec) bool { + mfe.Lock() + defer mfe.Unlock() + return mfe.tables["mangle"].Len() == 0 && + mfe.tables["filter"].Len() == 0 && + mfe.tables["nat"].Len() == 0 +} + +func waitForReloads(reloads *uint32, expected uint32) error { + if atomic.LoadUint32(reloads) < expected { + utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { + return atomic.LoadUint32(reloads) >= expected, nil + }) + } + got := atomic.LoadUint32(reloads) + if got != expected { + return fmt.Errorf("expected %d, got %d", expected, got) + } + return nil +} + +func waitForNoReload(reloads *uint32, expected uint32) error { + utilwait.PollImmediate(50*time.Millisecond, 250*time.Millisecond, func() (bool, error) { + return atomic.LoadUint32(reloads) > expected, nil + }) + + got := atomic.LoadUint32(reloads) + if got != expected { + return fmt.Errorf("expected %d, got %d", expected, got) + } + return nil +} diff --git a/pkg/util/iptables/testing/fake.go b/pkg/util/iptables/testing/fake.go index 67309166784..c95d9810a65 100644 --- 
a/pkg/util/iptables/testing/fake.go +++ b/pkg/util/iptables/testing/fake.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "strings" + "time" "k8s.io/kubernetes/pkg/util/iptables" ) @@ -99,6 +100,9 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter return nil } +func (f *FakeIPTables) Monitor(canary iptables.Chain, tables []iptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) { +} + func getToken(line, separator string) string { tokens := strings.Split(line, separator) if len(tokens) == 2 {