diff --git a/cluster/get-kube-local.sh b/cluster/get-kube-local.sh index b9f2e568e7a..a0aa7a9c2ff 100755 --- a/cluster/get-kube-local.sh +++ b/cluster/get-kube-local.sh @@ -62,6 +62,7 @@ function create_cluster { --volume=/var/lib/docker/:/var/lib/docker:rw \ --volume=/var/lib/kubelet/:/var/lib/kubelet:rw \ --volume=/var/run:/var/run:rw \ + --volume=/run/xtables.lock:/run/xtables.lock:rw \ --net=host \ --pid=host \ --privileged=true \ diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index d992511ada0..ef6962a29db 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -687,6 +687,7 @@ function start_kubelet { --volume=/var/lib/docker/:/var/lib/docker:ro \ --volume=/var/lib/kubelet/:/var/lib/kubelet:rw \ --volume=/dev:/dev \ + --volume=/run/xtables.lock:/run/xtables.lock:rw \ ${cred_bind} \ --net=host \ --privileged=true \ diff --git a/pkg/util/iptables/BUILD b/pkg/util/iptables/BUILD index 7dcb1e37a74..7ed0b92a48b 100644 --- a/pkg/util/iptables/BUILD +++ b/pkg/util/iptables/BUILD @@ -23,6 +23,7 @@ go_library( "//vendor/github.com/godbus/dbus:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], ) diff --git a/pkg/util/iptables/OWNERS b/pkg/util/iptables/OWNERS new file mode 100644 index 00000000000..8005a3c3567 --- /dev/null +++ b/pkg/util/iptables/OWNERS @@ -0,0 +1,9 @@ +reviewers: + - dcbw + - thockin + - eparis +approvers: + - dcbw + - thockin + - eparis + diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go index 9c8800f9997..c25cf2776f2 100644 --- a/pkg/util/iptables/iptables.go +++ b/pkg/util/iptables/iptables.go @@ -19,13 +19,18 @@ package iptables import ( "bytes" "fmt" + "net" + "os" "regexp" "strings" "sync" + "syscall" + "time" godbus "github.com/godbus/dbus" "github.com/golang/glog" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" utildbus "k8s.io/kubernetes/pkg/util/dbus" utilexec "k8s.io/kubernetes/pkg/util/exec" utilversion "k8s.io/kubernetes/pkg/util/version" @@ -122,6 +127,8 @@ const MinCheckVersion = "1.4.11" const MinWaitVersion = "1.4.20" const MinWait2Version = "1.4.22" +const LockfilePath16x = "/run/xtables.lock" + // runner implements Interface in terms of exec("iptables"). type runner struct { mu sync.Mutex @@ -131,19 +138,25 @@ type runner struct { hasCheck bool waitFlag []string restoreWaitFlag []string + lockfilePath string reloadFuncs []func() signal chan *godbus.Signal } -// New returns a new Interface which will exec iptables. -func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface { +// newInternal returns a new Interface which will exec iptables, and allows the +// caller to change the iptables-restore lockfile path +func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol, lockfilePath string) Interface { vstring, err := getIPTablesVersionString(exec) if err != nil { glog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err) vstring = MinCheckVersion } + if lockfilePath == "" { + lockfilePath = LockfilePath16x + } + runner := &runner{ exec: exec, dbus: dbus, @@ -151,6 +164,7 @@ func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) In hasCheck: getIPTablesHasCheckCommand(vstring), waitFlag: getIPTablesWaitFlag(vstring), restoreWaitFlag: getIPTablesRestoreWaitFlag(exec), + lockfilePath: lockfilePath, } // TODO this needs to be moved to a separate Start() or Run() function so that New() has zero side // effects. @@ -158,6 +172,11 @@ func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) In return runner } +// New returns a new Interface which will exec iptables. +func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface { + return newInternal(exec, dbus, protocol, "") +} + // Destroy is part of Interface. func (runner *runner) Destroy() { if runner.signal != nil { @@ -327,6 +346,71 @@ func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreC return runner.restoreInternal(args, data, flush, counters) } +type locker struct { + lock16 *os.File + lock14 *net.UnixListener +} + +func (l *locker) Close() { + if l.lock16 != nil { + l.lock16.Close() + } + if l.lock14 != nil { + l.lock14.Close() + } +} + +func (runner *runner) grabIptablesLocks() (*locker, error) { + var err error + var success bool + + l := &locker{} + defer func(l *locker) { + // Clean up immediately on failure + if !success { + l.Close() + } + }(l) + + if len(runner.restoreWaitFlag) > 0 { + // iptables-restore supports --wait; no need to grab locks + return l, nil + } + + // Grab both 1.6.x and 1.4.x-style locks; we don't know what the + // iptables-restore version is if it doesn't support --wait, so we + // can't assume which lock method it'll use. + + // Roughly duplicate iptables 1.6.x xtables_lock() function. + l.lock16, err = os.OpenFile(runner.lockfilePath, os.O_CREATE, 0600) + if err != nil { + return nil, fmt.Errorf("failed to open iptables lock %s: %v", runner.lockfilePath, err) + } + + if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) { + if err := syscall.Flock(int(l.lock16.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil { + return false, nil + } + return true, nil + }); err != nil { + return nil, fmt.Errorf("failed to acquire new iptables lock: %v", err) + } + + // Roughly duplicate iptables 1.4.x xtables_lock() function. + if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) { + l.lock14, err = net.ListenUnix("unix", &net.UnixAddr{Name: "@xtables", Net: "unix"}) + if err != nil { + return false, nil + } + return true, nil + }); err != nil { + return nil, fmt.Errorf("failed to acquire old iptables lock: %v", err) + } + + success = true + return l, nil +} + // restoreInternal is the shared part of Restore/RestoreAll func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error { runner.mu.Lock() @@ -339,6 +423,15 @@ func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFla args = append(args, "--counters") } + // Grab the iptables lock to prevent iptables-restore and iptables + // from stepping on each other. iptables-restore 1.6.2 will have + // a --wait option like iptables itself, but that's not widely deployed. + locker, err := runner.grabIptablesLocks() + if err != nil { + return err + } + defer locker.Close() + // run the command and return the output or an error including the output and error fullArgs := append(runner.restoreWaitFlag, args...) glog.V(4).Infof("running iptables-restore %v", fullArgs) diff --git a/pkg/util/iptables/iptables_test.go b/pkg/util/iptables/iptables_test.go index f65f14ddc70..17cd665f049 100644 --- a/pkg/util/iptables/iptables_test.go +++ b/pkg/util/iptables/iptables_test.go @@ -17,7 +17,10 @@ limitations under the License. package iptables import ( + "net" + "os" "strings" + "syscall" "testing" "time" @@ -26,6 +29,8 @@ import ( "k8s.io/kubernetes/pkg/util/exec" ) +const TestLockfilePath = "xtables.lock" + func getIPTablesCommand(protocol Protocol) string { if protocol == ProtocolIpv4 { return cmdIPTables @@ -1036,12 +1041,12 @@ func TestRestoreAll(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) defer runner.Destroy() err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) if err != nil { - t.Errorf("expected success, got %v", err) + t.Fatalf("expected success, got %v", err) } commandSet := sets.NewString(fcmd.CombinedOutputLog[2]...) @@ -1080,12 +1085,12 @@ func TestRestoreAllWait(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) defer runner.Destroy() err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) if err != nil { - t.Errorf("expected success, got %v", err) + t.Fatalf("expected success, got %v", err) } commandSet := sets.NewString(fcmd.CombinedOutputLog[2]...) @@ -1125,12 +1130,12 @@ func TestRestoreAllWaitOldIptablesRestore(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) defer runner.Destroy() err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) if err != nil { - t.Errorf("expected success, got %v", err) + t.Fatalf("expected success, got %v", err) } commandSet := sets.NewString(fcmd.CombinedOutputLog[2]...) @@ -1151,3 +1156,83 @@ func TestRestoreAllWaitOldIptablesRestore(t *testing.T) { t.Errorf("expected failure") } } + +// TestRestoreAllGrabNewLock tests that the iptables code will grab the +// iptables /run lock when using an iptables-restore version that does not +// support the --wait argument +func TestRestoreAllGrabNewLock(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + // iptables version check + func() ([]byte, error) { return []byte("iptables v1.9.22"), nil }, + // iptables-restore version check + func() ([]byte, error) { return []byte("unrecognized option: --version"), nil }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + + runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) + defer runner.Destroy() + + // Grab the /run lock and ensure the RestoreAll fails + runLock, err := os.OpenFile(TestLockfilePath, os.O_CREATE, 0600) + if err != nil { + t.Fatalf("expected to open %s, got %v", TestLockfilePath, err) + } + defer runLock.Close() + + if err := syscall.Flock(int(runLock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil { + t.Errorf("expected to lock %s, got %v", TestLockfilePath, err) + } + + err = runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) + if err == nil { + t.Errorf("expected failure, got success instead") + } + if !strings.Contains(err.Error(), "failed to acquire new iptables lock: timed out waiting for the condition") { + t.Errorf("expected timeout error, got %v", err) + } +} + +// TestRestoreAllGrabOldLock tests that the iptables code will grab the +// iptables @xtables abstract unix socket lock when using an iptables-restore +// version that does not support the --wait argument +func TestRestoreAllGrabOldLock(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + // iptables version check + func() ([]byte, error) { return []byte("iptables v1.9.22"), nil }, + // iptables-restore version check + func() ([]byte, error) { return []byte("unrecognized option: --version"), nil }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + + runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath) + defer runner.Destroy() + + // Grab the abstract @xtables socket + runLock, err := net.ListenUnix("unix", &net.UnixAddr{Name: "@xtables", Net: "unix"}) + if err != nil { + t.Fatalf("expected to lock @xtables, got %v", err) + } + defer runLock.Close() + + err = runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters) + if err == nil { + t.Errorf("expected failure, got success instead") + } + if !strings.Contains(err.Error(), "failed to acquire old iptables lock: timed out waiting for the condition") { + t.Errorf("expected timeout error, got %v", err) + } +}