Merge pull request #81517 from danwinship/iptables-monitor

drop firewalld monitoring, add better iptables monitor
This commit is contained in:
Kubernetes Prow Robot 2019-09-17 10:58:02 -07:00 committed by GitHub
commit e7090e8f5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 495 additions and 896 deletions

View File

@ -75,7 +75,6 @@ go_library(
] + select({
"@io_bazel_rules_go//go/platform:android": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -83,7 +82,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:darwin": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -91,7 +89,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:dragonfly": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -99,7 +96,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:freebsd": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -107,7 +103,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:linux": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -115,7 +110,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:nacl": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -123,7 +117,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:netbsd": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -131,7 +124,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:openbsd": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -139,7 +131,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:plan9": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@ -147,7 +138,6 @@ go_library(
],
"@io_bazel_rules_go//go/platform:solaris": [
"//pkg/proxy/metrics:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",

View File

@ -42,7 +42,6 @@ import (
"k8s.io/kubernetes/pkg/proxy/metrics"
"k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/util/configz"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
@ -84,13 +83,11 @@ func newProxyServer(
var ipvsInterface utilipvs.Interface
var kernelHandler ipvs.KernelHandler
var ipsetInterface utilipset.Interface
var dbus utildbus.Interface
// Create a iptables utils.
execer := exec.New()
dbus = utildbus.New()
iptInterface = utiliptables.New(execer, dbus, protocol)
iptInterface = utiliptables.New(execer, protocol)
kernelHandler = ipvs.NewLinuxKernelHandler()
ipsetInterface = utilipset.New(execer)
canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
@ -181,10 +178,10 @@ func newProxyServer(
var ipt [2]utiliptables.Interface
if iptInterface.IsIpv6() {
ipt[1] = iptInterface
ipt[0] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4)
ipt[0] = utiliptables.New(execer, utiliptables.ProtocolIpv4)
} else {
ipt[0] = iptInterface
ipt[1] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6)
ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIpv6)
}
proxier, err = ipvs.NewDualStackProxier(
@ -253,8 +250,6 @@ func newProxyServer(
}
}
iptInterface.AddReloadFunc(proxier.Sync)
return &ProxyServer{
Client: client,
EventClient: eventClient,

View File

@ -75,7 +75,6 @@
"k8s.io/kubernetes/pkg/serviceaccount",
"k8s.io/kubernetes/pkg/util/async",
"k8s.io/kubernetes/pkg/util/conntrack",
"k8s.io/kubernetes/pkg/util/dbus",
"k8s.io/kubernetes/pkg/util/hash",
"k8s.io/kubernetes/pkg/util/iptables",
"k8s.io/kubernetes/pkg/util/metrics",

2
go.mod
View File

@ -61,7 +61,7 @@ require (
github.com/go-openapi/strfmt v0.19.0
github.com/go-openapi/validate v0.19.2
github.com/go-ozzo/ozzo-validation v3.5.0+incompatible // indirect
github.com/godbus/dbus v4.1.0+incompatible
github.com/godbus/dbus v4.1.0+incompatible // indirect
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/mock v1.2.0

View File

@ -100,7 +100,6 @@ go_library(
"//pkg/scheduler/api:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//pkg/security/podsecuritypolicy/sysctl:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/node:go_default_library",

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/sets"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@ -335,10 +336,7 @@ func (f *fakeIPTables) RestoreAll(data []byte, flush utiliptables.FlushFlag, cou
return f.restore("", data, flush)
}
func (f *fakeIPTables) AddReloadFunc(reloadFunc func()) {
}
func (f *fakeIPTables) Destroy() {
func (f *fakeIPTables) Monitor(canary utiliptables.Chain, tables []utiliptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
}
func (f *fakeIPTables) isBuiltinChain(tableName utiliptables.Table, chainName utiliptables.Chain) bool {

View File

@ -42,7 +42,6 @@ go_library(
"//pkg/kubelet/dockershim/network:go_default_library",
"//pkg/kubelet/dockershim/network/hostport:go_default_library",
"//pkg/util/bandwidth:go_default_library",
"//pkg/util/dbus:go_default_library",
"//pkg/util/ebtables:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/sysctl:go_default_library",

View File

@ -41,7 +41,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/dockershim/network"
"k8s.io/kubernetes/pkg/kubelet/dockershim/network/hostport"
"k8s.io/kubernetes/pkg/util/bandwidth"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilebtables "k8s.io/kubernetes/pkg/util/ebtables"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
@ -125,9 +124,8 @@ type kubenetNetworkPlugin struct {
func NewPlugin(networkPluginDirs []string, cacheDir string) network.NetworkPlugin {
execer := utilexec.New()
dbus := utildbus.New()
iptInterface := utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4)
iptInterfacev6 := utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6)
iptInterface := utiliptables.New(execer, utiliptables.ProtocolIpv4)
iptInterfacev6 := utiliptables.New(execer, utiliptables.ProtocolIpv6)
return &kubenetNetworkPlugin{
podIPs: make(map[kubecontainer.ContainerID]utilsets.String),
execer: utilexec.New(),

View File

@ -109,7 +109,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/security/apparmor"
sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilipt "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -537,7 +536,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeIPValidator: validateNodeIP,
clock: clock.RealClock{},
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
iptClient: utilipt.New(utilexec.New(), utildbus.New(), protocol),
iptClient: utilipt.New(utilexec.New(), protocol),
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
iptablesDropBit: int(kubeCfg.IPTablesDropBit),
@ -1428,9 +1427,9 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Start loop to sync iptables util rules
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
kl.initNetworkUtil()
}
// Start a goroutine responsible for killing pods (that are not properly

View File

@ -20,11 +20,20 @@ package kubelet
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
func (kl *Kubelet) initNetworkUtil() {
kl.syncNetworkUtil()
go kl.iptClient.Monitor(utiliptables.Chain("KUBE-KUBELET-CANARY"),
[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
}
// syncNetworkUtil ensures the network utility are present on host.
// Network util includes:
// 1. In nat table, KUBE-MARK-DROP rule to mark connections for dropping

View File

@ -19,4 +19,4 @@ limitations under the License.
package kubelet
// Do nothing.
func (kl *Kubelet) syncNetworkUtil() {}
func (kl *Kubelet) initNetworkUtil() {}

View File

@ -323,7 +323,13 @@ func NewProxier(ipt utiliptables.Interface,
}
burstSyncs := 2
klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
// time.Hour is arbitrary.
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
go ipt.Monitor(utiliptables.Chain("KUBE-PROXY-CANARY"),
[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
proxier.syncProxyRules, syncPeriod, wait.NeverStop)
return proxier, nil
}

View File

@ -17,7 +17,6 @@ filegroup(
"//pkg/util/configz:all-srcs",
"//pkg/util/conntrack:all-srcs",
"//pkg/util/coverage:all-srcs",
"//pkg/util/dbus:all-srcs",
"//pkg/util/ebtables:all-srcs",
"//pkg/util/env:all-srcs",
"//pkg/util/filesystem:all-srcs",

View File

@ -1,38 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"dbus.go",
"doc.go",
"fake_dbus.go",
],
importpath = "k8s.io/kubernetes/pkg/util/dbus",
deps = ["//vendor/github.com/godbus/dbus:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["dbus_test.go"],
embed = [":go_default_library"],
deps = ["//vendor/github.com/godbus/dbus:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,133 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dbus
import (
godbus "github.com/godbus/dbus"
)
// Interface is an interface that presents a subset of the godbus/dbus API. Use this
// when you want to inject fakeable/mockable D-Bus behavior.
type Interface interface {
// SystemBus returns a connection to the system bus, connecting to it
// first if necessary
SystemBus() (Connection, error)
// SessionBus returns a connection to the session bus, connecting to it
// first if necessary
SessionBus() (Connection, error)
}
// Connection represents a D-Bus connection
type Connection interface {
// Returns an Object representing the bus itself
BusObject() Object
// Object creates a representation of a remote D-Bus object
Object(name, path string) Object
// Signal registers or unregisters a channel to receive D-Bus signals
Signal(ch chan<- *godbus.Signal)
}
// Object represents a remote D-Bus object
type Object interface {
// Call synchronously calls a D-Bus method
Call(method string, flags godbus.Flags, args ...interface{}) Call
}
// Call represents a pending or completed D-Bus method call
type Call interface {
// Store returns a completed call's return values, or an error
Store(retvalues ...interface{}) error
}
// Implements Interface in terms of actually talking to D-Bus
type dbusImpl struct {
systemBus *connImpl
sessionBus *connImpl
}
// Implements Connection as a godbus.Conn
type connImpl struct {
conn *godbus.Conn
}
// Implements Object as a godbus.Object
type objectImpl struct {
object godbus.BusObject
}
// Implements Call as a godbus.Call
type callImpl struct {
call *godbus.Call
}
// New returns a new Interface which will use godbus to talk to D-Bus
func New() Interface {
return &dbusImpl{}
}
// SystemBus is part of Interface
func (db *dbusImpl) SystemBus() (Connection, error) {
if db.systemBus == nil {
bus, err := godbus.SystemBus()
if err != nil {
return nil, err
}
db.systemBus = &connImpl{bus}
}
return db.systemBus, nil
}
// SessionBus is part of Interface
func (db *dbusImpl) SessionBus() (Connection, error) {
if db.sessionBus == nil {
bus, err := godbus.SessionBus()
if err != nil {
return nil, err
}
db.sessionBus = &connImpl{bus}
}
return db.sessionBus, nil
}
// BusObject is part of the Connection interface
func (conn *connImpl) BusObject() Object {
return &objectImpl{conn.conn.BusObject()}
}
// Object is part of the Connection interface
func (conn *connImpl) Object(name, path string) Object {
return &objectImpl{conn.conn.Object(name, godbus.ObjectPath(path))}
}
// Signal is part of the Connection interface
func (conn *connImpl) Signal(ch chan<- *godbus.Signal) {
conn.conn.Signal(ch)
}
// Call is part of the Object interface
func (obj *objectImpl) Call(method string, flags godbus.Flags, args ...interface{}) Call {
return &callImpl{obj.object.Call(method, flags, args...)}
}
// Store is part of the Call interface
func (call *callImpl) Store(retvalues ...interface{}) error {
return call.call.Store(retvalues...)
}

View File

@ -1,243 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dbus
import (
"fmt"
"os"
"testing"
godbus "github.com/godbus/dbus"
)
const (
DBusNameFlagDoNotQueue uint32 = 1 << (iota + 1)
)
const (
DBusRequestNameReplyPrimaryOwner uint32 = iota + 1
DBusRequestNameReplyAlreadyOwner
)
const (
DBusReleaseNameReplyReleased uint32 = iota + 1
DBusReleaseNameReplyNotOwner
)
func doDBusTest(t *testing.T, dbus Interface, real bool) {
bus, err := dbus.SystemBus()
if err != nil {
if !real {
t.Errorf("dbus.SystemBus() failed with fake Interface")
}
t.Skipf("D-Bus is not running: %v", err)
}
busObj := bus.BusObject()
id := ""
err = busObj.Call("org.freedesktop.DBus.GetId", 0).Store(&id)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if len(id) == 0 {
t.Errorf("expected non-empty Id, got \"\"")
}
// Switch to the session bus for the rest, since the system bus is more
// locked down (and thus harder to trick into emitting signals).
bus, err = dbus.SessionBus()
if err != nil {
if !real {
t.Errorf("dbus.SystemBus() failed with fake Interface")
}
t.Skipf("D-Bus session bus is not available: %v", err)
}
busObj = bus.BusObject()
name := fmt.Sprintf("io.kubernetes.dbus_test_%d", os.Getpid())
owner := ""
err = busObj.Call("org.freedesktop.DBus.GetNameOwner", 0, name).Store(&owner)
if err == nil {
t.Errorf("expected '%s' to be un-owned, but found owner %s", name, owner)
}
dbuserr, ok := err.(godbus.Error)
if !ok {
t.Errorf("expected godbus.Error, but got %#v", err)
}
if dbuserr.Name != "org.freedesktop.DBus.Error.NameHasNoOwner" {
t.Errorf("expected NameHasNoOwner error but got %v", err)
}
sigchan := make(chan *godbus.Signal, 10)
bus.Signal(sigchan)
rule := fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", name)
err = busObj.Call("org.freedesktop.DBus.AddMatch", 0, rule).Store()
if err != nil {
t.Errorf("expected success, got %v", err)
}
var ret uint32
err = busObj.Call("org.freedesktop.DBus.RequestName", 0, name, DBusNameFlagDoNotQueue).Store(&ret)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if ret != DBusRequestNameReplyPrimaryOwner {
t.Errorf("expected %v, got %v", DBusRequestNameReplyPrimaryOwner, ret)
}
err = busObj.Call("org.freedesktop.DBus.GetNameOwner", 0, name).Store(&owner)
if err != nil {
t.Errorf("expected success, got %v", err)
}
var changedSignal, acquiredSignal, lostSignal *godbus.Signal
sig1 := <-sigchan
sig2 := <-sigchan
// We get two signals, but the order isn't guaranteed
if sig1.Name == "org.freedesktop.DBus.NameOwnerChanged" {
changedSignal = sig1
acquiredSignal = sig2
} else {
acquiredSignal = sig1
changedSignal = sig2
}
if acquiredSignal.Sender != "org.freedesktop.DBus" || acquiredSignal.Name != "org.freedesktop.DBus.NameAcquired" {
t.Errorf("expected NameAcquired signal, got %v", acquiredSignal)
}
acquiredName := acquiredSignal.Body[0].(string)
if acquiredName != name {
t.Errorf("unexpected NameAcquired arguments: %v", acquiredSignal)
}
if changedSignal.Sender != "org.freedesktop.DBus" || changedSignal.Name != "org.freedesktop.DBus.NameOwnerChanged" {
t.Errorf("expected NameOwnerChanged signal, got %v", changedSignal)
}
changedName := changedSignal.Body[0].(string)
oldOwner := changedSignal.Body[1].(string)
newOwner := changedSignal.Body[2].(string)
if changedName != name || oldOwner != "" || newOwner != owner {
t.Errorf("unexpected NameOwnerChanged arguments: %v", changedSignal)
}
err = busObj.Call("org.freedesktop.DBus.ReleaseName", 0, name).Store(&ret)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if ret != DBusReleaseNameReplyReleased {
t.Errorf("expected %v, got %v", DBusReleaseNameReplyReleased, ret)
}
sig1 = <-sigchan
sig2 = <-sigchan
if sig1.Name == "org.freedesktop.DBus.NameOwnerChanged" {
changedSignal = sig1
lostSignal = sig2
} else {
lostSignal = sig1
changedSignal = sig2
}
if lostSignal.Sender != "org.freedesktop.DBus" || lostSignal.Name != "org.freedesktop.DBus.NameLost" {
t.Errorf("expected NameLost signal, got %v", lostSignal)
}
lostName := lostSignal.Body[0].(string)
if lostName != name {
t.Errorf("unexpected NameLost arguments: %v", lostSignal)
}
if changedSignal.Sender != "org.freedesktop.DBus" || changedSignal.Name != "org.freedesktop.DBus.NameOwnerChanged" {
t.Errorf("expected NameOwnerChanged signal, got %v", changedSignal)
}
changedName = changedSignal.Body[0].(string)
oldOwner = changedSignal.Body[1].(string)
newOwner = changedSignal.Body[2].(string)
if changedName != name || oldOwner != owner || newOwner != "" {
t.Errorf("unexpected NameOwnerChanged arguments: %v", changedSignal)
}
if len(sigchan) != 0 {
t.Errorf("unexpected extra signals (%d)", len(sigchan))
}
// Unregister sigchan
bus.Signal(sigchan)
}
func TestRealDBus(t *testing.T) {
dbus := New()
doDBusTest(t, dbus, true)
}
func TestFakeDBus(t *testing.T) {
uniqueName := ":1.1"
ownedName := ""
fakeSystem := NewFakeConnection()
fakeSystem.SetBusObject(
func(method string, args ...interface{}) ([]interface{}, error) {
if method == "org.freedesktop.DBus.GetId" {
return []interface{}{"foo"}, nil
}
return nil, fmt.Errorf("unexpected method call '%s'", method)
},
)
fakeSession := NewFakeConnection()
fakeSession.SetBusObject(
func(method string, args ...interface{}) ([]interface{}, error) {
if method == "org.freedesktop.DBus.GetNameOwner" {
checkName := args[0].(string)
if checkName != ownedName {
return nil, godbus.Error{Name: "org.freedesktop.DBus.Error.NameHasNoOwner", Body: nil}
}
return []interface{}{uniqueName}, nil
} else if method == "org.freedesktop.DBus.RequestName" {
reqName := args[0].(string)
_ = args[1].(uint32)
if ownedName != "" {
return []interface{}{DBusRequestNameReplyAlreadyOwner}, nil
}
ownedName = reqName
fakeSession.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameAcquired", reqName)
fakeSession.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", reqName, "", uniqueName)
return []interface{}{DBusRequestNameReplyPrimaryOwner}, nil
} else if method == "org.freedesktop.DBus.ReleaseName" {
reqName := args[0].(string)
if reqName != ownedName {
return []interface{}{DBusReleaseNameReplyNotOwner}, nil
}
ownedName = ""
fakeSession.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", reqName, uniqueName, "")
fakeSession.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameLost", reqName)
return []interface{}{DBusReleaseNameReplyReleased}, nil
} else if method == "org.freedesktop.DBus.AddMatch" {
return nil, nil
} else {
return nil, fmt.Errorf("unexpected method call '%s'", method)
}
},
)
dbus := NewFake(fakeSystem, fakeSession)
doDBusTest(t, dbus, false)
}

View File

@ -1,18 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package dbus provides an injectable interface and implementations for D-Bus communication
package dbus // import "k8s.io/kubernetes/pkg/util/dbus"

View File

@ -1,140 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dbus
import (
"fmt"
"sync"
godbus "github.com/godbus/dbus"
)
// Fake is a simple fake Interface type.
type Fake struct {
systemBus *FakeConnection
sessionBus *FakeConnection
}
// FakeConnection represents a fake D-Bus connection
type FakeConnection struct {
lock sync.Mutex
busObject *fakeObject
objects map[string]*fakeObject
signalHandlers []chan<- *godbus.Signal
}
// FakeHandler is used to handle fake D-Bus method calls
type FakeHandler func(method string, args ...interface{}) ([]interface{}, error)
type fakeObject struct {
handler FakeHandler
}
type fakeCall struct {
ret []interface{}
err error
}
// NewFake returns a new Interface which will fake talking to D-Bus
func NewFake(systemBus *FakeConnection, sessionBus *FakeConnection) *Fake {
return &Fake{systemBus, sessionBus}
}
// NewFakeConnection returns a FakeConnection Interface
func NewFakeConnection() *FakeConnection {
return &FakeConnection{
objects: make(map[string]*fakeObject),
}
}
// SystemBus is part of Interface
func (db *Fake) SystemBus() (Connection, error) {
if db.systemBus != nil {
return db.systemBus, nil
}
return nil, fmt.Errorf("DBus is not running")
}
// SessionBus is part of Interface
func (db *Fake) SessionBus() (Connection, error) {
if db.sessionBus != nil {
return db.sessionBus, nil
}
return nil, fmt.Errorf("DBus is not running")
}
// BusObject is part of the Connection interface
func (conn *FakeConnection) BusObject() Object {
return conn.busObject
}
// Object is part of the Connection interface
func (conn *FakeConnection) Object(name, path string) Object {
return conn.objects[name+path]
}
// Signal is part of the Connection interface
func (conn *FakeConnection) Signal(ch chan<- *godbus.Signal) {
conn.lock.Lock()
defer conn.lock.Unlock()
for i := range conn.signalHandlers {
if conn.signalHandlers[i] == ch {
conn.signalHandlers = append(conn.signalHandlers[:i], conn.signalHandlers[i+1:]...)
return
}
}
conn.signalHandlers = append(conn.signalHandlers, ch)
}
// SetBusObject sets the handler for the BusObject of conn
func (conn *FakeConnection) SetBusObject(handler FakeHandler) {
conn.busObject = &fakeObject{handler}
}
// AddObject adds a handler for the Object at name and path
func (conn *FakeConnection) AddObject(name, path string, handler FakeHandler) {
conn.objects[name+path] = &fakeObject{handler}
}
// EmitSignal emits a signal on conn
func (conn *FakeConnection) EmitSignal(name, path, iface, signal string, args ...interface{}) {
conn.lock.Lock()
defer conn.lock.Unlock()
sig := &godbus.Signal{
Sender: name,
Path: godbus.ObjectPath(path),
Name: iface + "." + signal,
Body: args,
}
for _, ch := range conn.signalHandlers {
ch <- sig
}
}
// Call is part of the Object interface
func (obj *fakeObject) Call(method string, flags godbus.Flags, args ...interface{}) Call {
ret, err := obj.handler(method, args...)
return &fakeCall{ret, err}
}
// Store is part of the Call interface
func (call *fakeCall) Store(retvalues ...interface{}) error {
if call.err != nil {
return call.err
}
return godbus.Store(call.ret, retvalues...)
}

View File

@ -17,17 +17,15 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/util/iptables",
deps = [
"//pkg/util/dbus:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//vendor/github.com/godbus/dbus:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/trace:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:linux": [
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
],
"//conditions:default": [],
@ -38,14 +36,15 @@ go_test(
name = "go_default_test",
srcs = [
"iptables_test.go",
"monitor_test.go",
"save_restore_test.go",
],
embed = [":go_default_library"],
deps = select({
"@io_bazel_rules_go//go/platform:linux": [
"//pkg/util/dbus:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
],

View File

@ -25,11 +25,10 @@ import (
"sync"
"time"
godbus "github.com/godbus/dbus"
"k8s.io/apimachinery/pkg/util/sets"
utilversion "k8s.io/apimachinery/pkg/util/version"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/utils/exec"
utiltrace "k8s.io/utils/trace"
)
@ -65,10 +64,17 @@ type Interface interface {
Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// RestoreAll is the same as Restore except that no table is specified.
RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// AddReloadFunc adds a function to call on iptables reload
AddReloadFunc(reloadFunc func())
// Destroy cleans up resources used by the Interface
Destroy()
// Monitor detects when the given iptables tables have been flushed by an external
// tool (e.g. a firewall reload) by creating canary chains and polling to see if
// they have been deleted. (Specifically, it polls tables[0] every interval until
// the canary has been deleted from there, then waits a short additional time for
// the canaries to be deleted from the remaining tables as well. You can optimize
// the polling by listing a relatively empty table in tables[0]). When a flush is
// detected, this calls the reloadFunc so the caller can reload their own iptables
// rules. If it is unable to create the canary chains (either initially or after
// a reload) it will log an error and stop monitoring.
// (This function should be called from a goroutine.)
Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{})
// HasRandomFully reveals whether `-j MASQUERADE` takes the
// `--random-fully` option. This is helpful to work around a
// Linux kernel bug that sometimes causes multiple flows to get
@ -143,22 +149,17 @@ const LockfilePath16x = "/run/xtables.lock"
type runner struct {
mu sync.Mutex
exec utilexec.Interface
dbus utildbus.Interface
protocol Protocol
hasCheck bool
hasListener bool
hasRandomFully bool
waitFlag []string
restoreWaitFlag []string
lockfilePath string
reloadFuncs []func()
signal chan *godbus.Signal
}
// newInternal returns a new Interface which will exec iptables, and allows the
// caller to change the iptables-restore lockfile path
func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol, lockfilePath string) Interface {
func newInternal(exec utilexec.Interface, protocol Protocol, lockfilePath string) Interface {
version, err := getIPTablesVersion(exec, protocol)
if err != nil {
klog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err)
@ -171,10 +172,8 @@ func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Prot
runner := &runner{
exec: exec,
dbus: dbus,
protocol: protocol,
hasCheck: version.AtLeast(MinCheckVersion),
hasListener: false,
hasRandomFully: version.AtLeast(RandomFullyMinVersion),
waitFlag: getIPTablesWaitFlag(version),
restoreWaitFlag: getIPTablesRestoreWaitFlag(version, exec, protocol),
@ -184,44 +183,8 @@ func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Prot
}
// New returns a new Interface which will exec iptables.
func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface {
return newInternal(exec, dbus, protocol, "")
}
// Destroy is part of Interface.
func (runner *runner) Destroy() {
if runner.signal != nil {
runner.signal <- nil
}
}
const (
firewalldName = "org.fedoraproject.FirewallD1"
firewalldPath = "/org/fedoraproject/FirewallD1"
firewalldInterface = "org.fedoraproject.FirewallD1"
)
// Connects to D-Bus and listens for FirewallD start/restart. (On non-FirewallD-using
// systems, this is effectively a no-op; we listen for the signals, but they will never be
// emitted, so reload() will never be called.)
func (runner *runner) connectToFirewallD() {
bus, err := runner.dbus.SystemBus()
if err != nil {
klog.V(1).Infof("Could not connect to D-Bus system bus: %s", err)
return
}
runner.hasListener = true
rule := fmt.Sprintf("type='signal',sender='%s',path='%s',interface='%s',member='Reloaded'", firewalldName, firewalldPath, firewalldInterface)
bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)
rule = fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", firewalldName)
bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)
runner.signal = make(chan *godbus.Signal, 10)
bus.Signal(runner.signal)
go runner.dbusSignalHandler(bus)
func New(exec utilexec.Interface, protocol Protocol) Interface {
return newInternal(exec, protocol, "")
}
// EnsureChain is part of Interface.
@ -529,12 +492,78 @@ func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) {
return false, fmt.Errorf("error checking rule: %v: %s", err, out)
}
const (
// Max time we wait for an iptables flush to complete after we notice it has started
iptablesFlushTimeout = 5 * time.Second
// How often we poll while waiting for an iptables flush to complete
iptablesFlushPollTime = 100 * time.Millisecond
)
// Monitor is part of Interface
func (runner *runner) Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
for {
for _, table := range tables {
if _, err := runner.EnsureChain(table, canary); err != nil {
klog.Warningf("Could not set up iptables canary %s/%s: %v", string(table), string(canary), err)
return
}
}
// Poll until stopCh is closed or iptables is flushed
err := utilwait.PollUntil(interval, func() (bool, error) {
if runner.chainExists(tables[0], canary) {
return false, nil
}
klog.V(2).Infof("iptables canary %s/%s deleted", string(tables[0]), string(canary))
// Wait for the other canaries to be deleted too before returning
// so we don't start reloading too soon.
err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) {
for i := 1; i < len(tables); i++ {
if runner.chainExists(tables[i], canary) {
return false, nil
}
}
return true, nil
})
if err != nil {
klog.Warning("Inconsistent iptables state detected.")
}
return true, nil
}, stopCh)
if err != nil {
// stopCh was closed
for _, table := range tables {
_ = runner.DeleteChain(table, canary)
}
return
}
klog.V(2).Infof("Reloading after iptables flush")
reloadFunc()
}
}
// chainExists is used internally by Monitor; none of the public Interface methods can be
// used to distinguish "chain exists" from "chain does not exist" with no side effects
func (runner *runner) chainExists(table Table, chain Chain) bool {
fullArgs := makeFullArgs(table, chain)
runner.mu.Lock()
defer runner.mu.Unlock()
_, err := runner.run(opListChain, fullArgs)
return err == nil
}
type operation string
const (
opCreateChain operation = "-N"
opFlushChain operation = "-F"
opDeleteChain operation = "-X"
opListChain operation = "-L"
opAppendRule operation = "-A"
opCheckRule operation = "-C"
opDeleteRule operation = "-D"
@ -620,62 +649,6 @@ func getIPTablesRestoreVersionString(exec utilexec.Interface, protocol Protocol)
return match[1], nil
}
// goroutine to listen for D-Bus signals
func (runner *runner) dbusSignalHandler(bus utildbus.Connection) {
firewalld := bus.Object(firewalldName, firewalldPath)
for s := range runner.signal {
if s == nil {
// Unregister
bus.Signal(runner.signal)
return
}
switch s.Name {
case "org.freedesktop.DBus.NameOwnerChanged":
name := s.Body[0].(string)
newOwner := s.Body[2].(string)
if name != firewalldName || len(newOwner) == 0 {
continue
}
// FirewallD startup (specifically the part where it deletes
// all existing iptables rules) may not yet be complete when
// we get this signal, so make a dummy request to it to
// synchronize.
firewalld.Call(firewalldInterface+".getDefaultZone", 0)
runner.reload()
case firewalldInterface + ".Reloaded":
runner.reload()
}
}
}
// AddReloadFunc is part of Interface
func (runner *runner) AddReloadFunc(reloadFunc func()) {
runner.mu.Lock()
defer runner.mu.Unlock()
// We only need to listen to firewalld if there are Reload functions, so lazy
// initialize the listener.
if !runner.hasListener {
runner.connectToFirewallD()
}
runner.reloadFuncs = append(runner.reloadFuncs, reloadFunc)
}
// runs all reload funcs to re-sync iptables rules
func (runner *runner) reload() {
klog.V(1).Infof("reloading iptables rules")
for _, f := range runner.reloadFuncs {
f()
}
}
func (runner *runner) HasRandomFully() bool {
return runner.hasRandomFully
}

View File

@ -26,11 +26,9 @@ import (
"reflect"
"strings"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/sets"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/kubernetes/pkg/util/dbus"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
)
@ -64,8 +62,7 @@ func testIPTablesVersionCmds(t *testing.T, protocol Protocol) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), protocol)
defer runner.Destroy()
_ = New(&fexec, protocol)
// Check that proper iptables version command was used during runner instantiation
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll(iptablesCmd, "--version") {
@ -109,8 +106,7 @@ func testEnsureChain(t *testing.T, protocol Protocol) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), protocol)
defer runner.Destroy()
runner := New(&fexec, protocol)
// Success.
exists, err := runner.EnsureChain(TableNAT, Chain("FOOBAR"))
if err != nil {
@ -167,8 +163,7 @@ func TestFlushChain(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
// Success.
err := runner.FlushChain(TableNAT, Chain("FOOBAR"))
if err != nil {
@ -205,8 +200,7 @@ func TestDeleteChain(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
// Success.
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
@ -242,8 +236,7 @@ func TestEnsureRuleAlreadyExists(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
@ -279,8 +272,7 @@ func TestEnsureRuleNew(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
@ -313,8 +305,7 @@ func TestEnsureRuleErrorChecking(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
_, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
@ -344,8 +335,7 @@ func TestEnsureRuleErrorCreating(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
_, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
@ -372,8 +362,7 @@ func TestDeleteRuleDoesNotExist(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
@ -406,8 +395,7 @@ func TestDeleteRuleExists(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
@ -437,8 +425,7 @@ func TestDeleteRuleErrorChecking(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
@ -468,8 +455,7 @@ func TestDeleteRuleErrorDeleting(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
@ -504,7 +490,7 @@ func TestGetIPTablesHasCheckCommand(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
ipt := New(&fexec, nil, ProtocolIpv4)
ipt := New(&fexec, ProtocolIpv4)
runner := ipt.(*runner)
if testCase.Expected != runner.hasCheck {
t.Errorf("Expected result: %v, Got result: %v", testCase.Expected, runner.hasCheck)
@ -665,8 +651,7 @@ func TestWaitFlagUnavailable(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
@ -697,8 +682,7 @@ func TestWaitFlagOld(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
@ -732,8 +716,7 @@ func TestWaitFlagNew(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
defer runner.Destroy()
runner := New(&fexec, ProtocolIpv4)
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
@ -746,117 +729,6 @@ func TestWaitFlagNew(t *testing.T) {
}
}
func TestReload(t *testing.T) {
dbusConn := dbus.NewFakeConnection()
dbusConn.SetBusObject(func(method string, args ...interface{}) ([]interface{}, error) { return nil, nil })
dbusConn.AddObject(firewalldName, firewalldPath, func(method string, args ...interface{}) ([]interface{}, error) { return nil, nil })
fdbus := dbus.NewFake(dbusConn, nil)
reloaded := make(chan bool, 2)
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// iptables version check
func() ([]byte, error) { return []byte("iptables v1.6.4"), nil },
// first reload
// EnsureChain
func() ([]byte, error) { return []byte{}, nil },
// EnsureRule abc check
func() ([]byte, error) { return []byte{}, &fakeexec.FakeExitError{Status: 1} },
// EnsureRule abc
func() ([]byte, error) { return []byte{}, nil },
// second reload
// EnsureChain
func() ([]byte, error) { return []byte{}, nil },
// EnsureRule abc check
func() ([]byte, error) { return []byte{}, &fakeexec.FakeExitError{Status: 1} },
// EnsureRule abc
func() ([]byte, error) { return []byte{}, nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, fdbus, ProtocolIpv4)
defer runner.Destroy()
runner.AddReloadFunc(func() {
exists, err := runner.EnsureChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
}
if exists {
t.Errorf("expected exists = false")
}
reloaded <- true
})
runner.AddReloadFunc(func() {
exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
}
if exists {
t.Errorf("expected exists = false")
}
reloaded <- true
})
dbusConn.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", firewalldName, "", ":1.1")
<-reloaded
<-reloaded
if fcmd.CombinedOutputCalls != 4 {
t.Errorf("expected 4 CombinedOutput() calls total, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("iptables", "-t", "nat", "-N", "FOOBAR") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[2])
}
if !sets.NewString(fcmd.CombinedOutputLog[2]...).HasAll("iptables", "-t", "nat", "-C", "OUTPUT", "abc", "123") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[3])
}
if !sets.NewString(fcmd.CombinedOutputLog[3]...).HasAll("iptables", "-t", "nat", "-A", "OUTPUT", "abc", "123") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[4])
}
go func() { time.Sleep(time.Second / 100); reloaded <- true }()
dbusConn.EmitSignal(firewalldName, firewalldPath, firewalldInterface, "DefaultZoneChanged", "public")
dbusConn.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", "io.k8s.Something", "", ":1.1")
<-reloaded
if fcmd.CombinedOutputCalls != 4 {
t.Errorf("Incorrect signal caused a reload")
}
dbusConn.EmitSignal(firewalldName, firewalldPath, firewalldInterface, "Reloaded")
<-reloaded
<-reloaded
if fcmd.CombinedOutputCalls != 7 {
t.Errorf("expected 7 CombinedOutput() calls total, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[4]...).HasAll("iptables", "-t", "nat", "-N", "FOOBAR") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[5])
}
if !sets.NewString(fcmd.CombinedOutputLog[5]...).HasAll("iptables", "-t", "nat", "-C", "OUTPUT", "abc", "123") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[6])
}
if !sets.NewString(fcmd.CombinedOutputLog[6]...).HasAll("iptables", "-t", "nat", "-A", "OUTPUT", "abc", "123") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[7])
}
}
func testSaveInto(t *testing.T, protocol Protocol) {
version := " v1.9.22"
iptablesCmd := iptablesCommand(protocol)
@ -890,8 +762,7 @@ COMMIT
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), protocol)
defer runner.Destroy()
runner := New(&fexec, protocol)
buffer := bytes.NewBuffer(nil)
// Success.
@ -960,8 +831,7 @@ func testRestore(t *testing.T, protocol Protocol) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), protocol)
defer runner.Destroy()
runner := New(&fexec, protocol)
// both flags true
err := runner.Restore(TableNAT, []byte{}, FlushTables, RestoreCounters)
@ -1043,9 +913,8 @@ func TestRestoreAll(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath)
defer os.Remove(TestLockfilePath)
defer runner.Destroy()
err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err != nil {
@ -1085,9 +954,8 @@ func TestRestoreAllWait(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath)
defer os.Remove(TestLockfilePath)
defer runner.Destroy()
err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err != nil {
@ -1131,9 +999,8 @@ func TestRestoreAllWaitOldIptablesRestore(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath)
defer os.Remove(TestLockfilePath)
defer runner.Destroy()
err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err != nil {
@ -1178,9 +1045,8 @@ func TestRestoreAllGrabNewLock(t *testing.T) {
},
}
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath)
defer os.Remove(TestLockfilePath)
defer runner.Destroy()
// Grab the /run lock and ensure the RestoreAll fails
runLock, err := os.OpenFile(TestLockfilePath, os.O_CREATE, 0600)
@ -1221,9 +1087,8 @@ func TestRestoreAllGrabOldLock(t *testing.T) {
},
}
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath)
defer os.Remove(TestLockfilePath)
defer runner.Destroy()
// Grab the abstract @xtables socket
runLock, err := net.ListenUnix("unix", &net.UnixAddr{Name: "@xtables", Net: "unix"})
@ -1262,9 +1127,8 @@ func TestRestoreAllWaitBackportedIptablesRestore(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
runner := newInternal(&fexec, ProtocolIpv4, TestLockfilePath)
defer os.Remove(TestLockfilePath)
defer runner.Destroy()
err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err != nil {

View File

@ -0,0 +1,268 @@
// +build linux
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package iptables
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/exec"
)
// We can't use the normal FakeExec because we don't know precisely how many times the
// Monitor thread will do its checks, and we don't know precisely how its iptables calls
// will interleave with the main thread's. So we use our own fake Exec implementation that
// implements a minimal iptables interface. This will need updates as iptables.runner
// changes its use of Exec.
type monitorFakeExec struct {
sync.Mutex
tables map[string]sets.String
}
func newMonitorFakeExec() *monitorFakeExec {
tables := make(map[string]sets.String)
tables["mangle"] = sets.NewString()
tables["filter"] = sets.NewString()
tables["nat"] = sets.NewString()
return &monitorFakeExec{tables: tables}
}
func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd {
return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args}
}
func (mfe *monitorFakeExec) CommandContext(ctx context.Context, cmd string, args ...string) exec.Cmd {
return mfe.Command(cmd, args...)
}
func (mfe *monitorFakeExec) LookPath(file string) (string, error) {
return file, nil
}
type monitorFakeCmd struct {
mfe *monitorFakeExec
cmd string
args []string
}
func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) {
if mfc.cmd == cmdIPTablesRestore {
// Only used for "iptables-restore --version", and the result doesn't matter
return []byte{}, nil
} else if mfc.cmd != cmdIPTables {
panic("bad command " + mfc.cmd)
}
if len(mfc.args) == 1 && mfc.args[0] == "--version" {
return []byte("iptables v1.6.2"), nil
}
if len(mfc.args) != 6 || mfc.args[0] != WaitString || mfc.args[1] != WaitSecondsValue || mfc.args[4] != "-t" {
panic(fmt.Sprintf("bad args %#v", mfc.args))
}
op := operation(mfc.args[2])
chainName := mfc.args[3]
tableName := mfc.args[5]
mfc.mfe.Lock()
defer mfc.mfe.Unlock()
table := mfc.mfe.tables[tableName]
if table == nil {
return []byte{}, fmt.Errorf("no such table %q", tableName)
}
switch op {
case opCreateChain:
if !table.Has(chainName) {
table.Insert(chainName)
}
return []byte{}, nil
case opListChain:
if table.Has(chainName) {
return []byte{}, nil
} else {
return []byte{}, fmt.Errorf("no such chain %q", chainName)
}
case opDeleteChain:
table.Delete(chainName)
return []byte{}, nil
default:
panic("should not be reached")
}
}
func (mfc *monitorFakeCmd) SetStdin(in io.Reader) {
// Used by getIPTablesRestoreVersionString(), can be ignored
}
func (mfc *monitorFakeCmd) Run() error {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) Output() ([]byte, error) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) SetDir(dir string) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) SetStdout(out io.Writer) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) SetStderr(out io.Writer) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) SetEnv(env []string) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) StdoutPipe() (io.ReadCloser, error) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) StderrPipe() (io.ReadCloser, error) {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) Start() error {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) Wait() error {
panic("should not be reached")
}
func (mfc *monitorFakeCmd) Stop() {
panic("should not be reached")
}
func TestIPTablesMonitor(t *testing.T) {
mfe := newMonitorFakeExec()
ipt := New(mfe, ProtocolIpv4)
var reloads uint32
stopCh := make(chan struct{})
canary := Chain("MONITOR-TEST-CANARY")
tables := []Table{TableMangle, TableFilter, TableNAT}
go ipt.Monitor(canary, tables, func() {
if !ensureNoChains(mfe) {
t.Errorf("reload called while canaries still exist")
}
atomic.AddUint32(&reloads, 1)
}, 100*time.Millisecond, stopCh)
// Monitor should create canary chains quickly
if err := waitForChains(mfe, canary, tables); err != nil {
t.Errorf("failed to create iptables canaries: %v", err)
}
if err := waitForReloads(&reloads, 0); err != nil {
t.Errorf("got unexpected reloads: %v", err)
}
// If we delete all of the chains, it should reload
ipt.DeleteChain(TableMangle, canary)
ipt.DeleteChain(TableFilter, canary)
ipt.DeleteChain(TableNAT, canary)
if err := waitForReloads(&reloads, 1); err != nil {
t.Errorf("got unexpected number of reloads after flush: %v", err)
}
if err := waitForChains(mfe, canary, tables); err != nil {
t.Errorf("failed to create iptables canaries: %v", err)
}
// If we delete two chains, it should not reload yet
ipt.DeleteChain(TableMangle, canary)
ipt.DeleteChain(TableFilter, canary)
if err := waitForNoReload(&reloads, 1); err != nil {
t.Errorf("got unexpected number of reloads after partial flush: %v", err)
}
// If we delete the last chain, it should reload now
ipt.DeleteChain(TableNAT, canary)
if err := waitForReloads(&reloads, 2); err != nil {
t.Errorf("got unexpected number of reloads after slow flush: %v", err)
}
if err := waitForChains(mfe, canary, tables); err != nil {
t.Errorf("failed to create iptables canaries: %v", err)
}
// If we close the stop channel, it should stop running
close(stopCh)
if err := waitForNoReload(&reloads, 2); err != nil {
t.Errorf("got unexpected number of reloads after partial flush: %v", err)
}
if !ensureNoChains(mfe) {
t.Errorf("canaries still exist after stopping monitor")
}
}
func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error {
return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
mfe.Lock()
defer mfe.Unlock()
for _, table := range tables {
if !mfe.tables[string(table)].Has(string(canary)) {
return false, nil
}
}
return true, nil
})
}
func ensureNoChains(mfe *monitorFakeExec) bool {
mfe.Lock()
defer mfe.Unlock()
return mfe.tables["mangle"].Len() == 0 &&
mfe.tables["filter"].Len() == 0 &&
mfe.tables["nat"].Len() == 0
}
func waitForReloads(reloads *uint32, expected uint32) error {
if atomic.LoadUint32(reloads) < expected {
utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
return atomic.LoadUint32(reloads) >= expected, nil
})
}
got := atomic.LoadUint32(reloads)
if got != expected {
return fmt.Errorf("expected %d, got %d", expected, got)
}
return nil
}
func waitForNoReload(reloads *uint32, expected uint32) error {
utilwait.PollImmediate(50*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
return atomic.LoadUint32(reloads) > expected, nil
})
got := atomic.LoadUint32(reloads)
if got != expected {
return fmt.Errorf("expected %d, got %d", expected, got)
}
return nil
}

View File

@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/util/iptables"
)
@ -98,9 +99,9 @@ func (f *FakeIPTables) RestoreAll(data []byte, flush iptables.FlushFlag, counter
f.Lines = data
return nil
}
func (*FakeIPTables) AddReloadFunc(reloadFunc func()) {}
func (*FakeIPTables) Destroy() {}
func (f *FakeIPTables) Monitor(canary iptables.Chain, tables []iptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
}
func getToken(line, separator string) string {
tokens := strings.Split(line, separator)

View File

@ -19,10 +19,14 @@ package network
import (
"fmt"
"net/http"
"strings"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/test/e2e/framework"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
"github.com/onsi/ginkgo"
)
@ -235,4 +239,88 @@ var _ = SIGDescribe("Networking", func() {
}
})
})
ginkgo.It("should recreate its iptables rules if they are deleted [Disruptive]", func() {
framework.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
framework.SkipUnlessSSHKeyPresent()
hosts, err := e2essh.NodeSSHHosts(f.ClientSet)
framework.ExpectNoError(err, "failed to find external/internal IPs for every node")
if len(hosts) == 0 {
framework.Failf("No ssh-able nodes")
}
host := hosts[0]
ns := f.Namespace.Name
numPods, servicePort := 3, defaultServeHostnameServicePort
svc := "iptables-flush-test"
defer func() {
framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc))
}()
podNames, svcIP, err := e2eservice.StartServeHostnameService(f.ClientSet, getServeHostnameService(svc), ns, numPods)
framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc, ns)
// Ideally we want to reload the system firewall, but we don't necessarily
// know how to do that on this system ("firewall-cmd --reload"? "systemctl
// restart iptables"?). So instead we just manually delete all "KUBE-"
// chains.
ginkgo.By("dumping iptables rules on a node")
result, err := e2essh.SSH("sudo iptables-save", host, framework.TestContext.Provider)
if err != nil || result.Code != 0 {
e2essh.LogResult(result)
framework.Failf("couldn't dump iptable rules: %v", err)
}
// All the commands that delete rules have to come before all the commands
// that delete chains, since the chains can't be deleted while there are
// still rules referencing them.
var deleteRuleCmds, deleteChainCmds []string
table := ""
for _, line := range strings.Split(result.Stdout, "\n") {
if strings.HasPrefix(line, "*") {
table = line[1:]
} else if table == "" {
continue
}
// Delete jumps from non-KUBE chains to KUBE chains
if !strings.HasPrefix(line, "-A KUBE-") && strings.Contains(line, "-j KUBE-") {
deleteRuleCmds = append(deleteRuleCmds, fmt.Sprintf("sudo iptables -t %s -D %s || true", table, line[3:]))
}
// Flush and delete all KUBE chains
if strings.HasPrefix(line, ":KUBE-") {
chain := strings.Split(line, " ")[0][1:]
deleteRuleCmds = append(deleteRuleCmds, fmt.Sprintf("sudo iptables -t %s -F %s || true", table, chain))
deleteChainCmds = append(deleteChainCmds, fmt.Sprintf("sudo iptables -t %s -X %s || true", table, chain))
}
}
cmd := strings.Join(append(deleteRuleCmds, deleteChainCmds...), "\n")
ginkgo.By("deleting all KUBE-* iptables chains")
result, err = e2essh.SSH(cmd, host, framework.TestContext.Provider)
if err != nil || result.Code != 0 {
e2essh.LogResult(result)
framework.Failf("couldn't delete iptable rules: %v", err)
}
ginkgo.By("verifying that kube-proxy rules are eventually recreated")
framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(f.ClientSet, ns, host, podNames, svcIP, servicePort))
ginkgo.By("verifying that kubelet rules are eventually recreated")
err = utilwait.PollImmediate(framework.Poll, framework.RestartNodeReadyAgainTimeout, func() (bool, error) {
result, err = e2essh.SSH("sudo iptables-save -t nat", host, framework.TestContext.Provider)
if err != nil || result.Code != 0 {
e2essh.LogResult(result)
return false, err
}
if strings.Contains(result.Stdout, "\n-A KUBE-MARK-DROP ") {
return true, nil
}
return false, nil
})
framework.ExpectNoError(err, "kubelet did not recreate its iptables rules")
})
})

View File

@ -476,18 +476,6 @@ var _ = SIGDescribe("Services", func() {
}
framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
ginkgo.By("Removing iptable rules")
result, err := e2essh.SSH(`
sudo iptables -t nat -F KUBE-SERVICES || true;
sudo iptables -t nat -F KUBE-PORTALS-HOST || true;
sudo iptables -t nat -F KUBE-PORTALS-CONTAINER || true`, host, framework.TestContext.Provider)
if err != nil || result.Code != 0 {
e2essh.LogResult(result)
framework.Failf("couldn't remove iptable rules: %v", err)
}
framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
})
ginkgo.It("should work after restarting apiserver [Disruptive]", func() {

View File

@ -778,7 +778,6 @@ k8s.io/kubernetes/pkg/util/async,spxtr,1,
k8s.io/kubernetes/pkg/util/bandwidth,thockin,1,
k8s.io/kubernetes/pkg/util/config,jszczepkowski,1,
k8s.io/kubernetes/pkg/util/configz,ixdy,1,
k8s.io/kubernetes/pkg/util/dbus,roberthbailey,1,
k8s.io/kubernetes/pkg/util/env,asalkeld,0,
k8s.io/kubernetes/pkg/util/goroutinemap,saad-ali,0,
k8s.io/kubernetes/pkg/util/hash,timothysc,1,

1 name owner auto-assigned sig
778 k8s.io/kubernetes/pkg/util/bandwidth thockin 1
779 k8s.io/kubernetes/pkg/util/config jszczepkowski 1
780 k8s.io/kubernetes/pkg/util/configz ixdy 1
k8s.io/kubernetes/pkg/util/dbus roberthbailey 1
781 k8s.io/kubernetes/pkg/util/env asalkeld 0
782 k8s.io/kubernetes/pkg/util/goroutinemap saad-ali 0
783 k8s.io/kubernetes/pkg/util/hash timothysc 1