diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 9a6cfb8ac9c..0b9567c0aba 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + utildbus "k8s.io/kubernetes/pkg/util/dbus" "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" nodeutil "k8s.io/kubernetes/pkg/util/node" @@ -105,7 +106,8 @@ func (s *ProxyServer) Run(_ []string) error { // remove iptables rules and exit if s.CleanupAndExit { execer := exec.New() - ipt := utiliptables.New(execer, protocol) + dbus := utildbus.New() + ipt := utiliptables.New(execer, dbus, protocol) encounteredError := userspace.CleanupLeftovers(ipt) encounteredError = iptables.CleanupLeftovers(ipt) || encounteredError if encounteredError { @@ -165,6 +167,10 @@ func (s *ProxyServer) Run(_ []string) error { var proxier proxy.ProxyProvider var endpointsHandler config.EndpointsConfigHandler + execer := exec.New() + dbus := utildbus.New() + ipt := utiliptables.New(execer, dbus, protocol) + shouldUseIptables := false if !s.ForceUserspaceProxy { var err error @@ -178,8 +184,6 @@ func (s *ProxyServer) Run(_ []string) error { if shouldUseIptables { glog.V(2).Info("Using iptables Proxier.") - execer := exec.New() - ipt := utiliptables.New(execer, protocol) proxierIptables, err := iptables.NewProxier(ipt, execer, s.SyncPeriod, s.MasqueradeAll) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) @@ -198,8 +202,6 @@ func (s *ProxyServer) Run(_ []string) error { // set EndpointsConfigHandler to our loadBalancer endpointsHandler = loadBalancer - execer := exec.New() - ipt := utiliptables.New(execer, protocol) proxierUserspace, err := userspace.NewProxier(loadBalancer, s.BindAddress, ipt, s.PortRange, s.SyncPeriod) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 3c789e62be5..b8d6ecd460f 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -121,6 +121,12 @@ func (fake *fakeIptables) RestoreAll(data []byte, flush iptables.FlushFlag, coun return nil } +func (fake *fakeIptables) AddReloadFunc(reloadFunc func()) { +} + +func (fake *fakeIptables) Destroy() { +} + var tcpServerPort int var udpServerPort int diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go index ab0f0e71ff5..d314f3df8db 100644 --- a/pkg/util/iptables/iptables.go +++ b/pkg/util/iptables/iptables.go @@ -25,7 +25,9 @@ import ( "sync" "github.com/coreos/go-semver/semver" + godbus "github.com/godbus/dbus" "github.com/golang/glog" + utildbus "k8s.io/kubernetes/pkg/util/dbus" utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/sets" ) @@ -64,6 +66,10 @@ type Interface interface { Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error // RestoreAll is the same as Restore except that no table is specified. RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error + // AddReloadFunc adds a function to call on iptables reload + AddReloadFunc(reloadFunc func()) + // Destroy cleans up resources used by the Interface + Destroy() } type Protocol byte @@ -118,24 +124,66 @@ const MinWait2Version = "1.4.22" type runner struct { mu sync.Mutex exec utilexec.Interface + dbus utildbus.Interface protocol Protocol hasCheck bool waitFlag []string + + reloadFuncs []func() + signal chan *godbus.Signal } // New returns a new Interface which will exec iptables. -func New(exec utilexec.Interface, protocol Protocol) Interface { +func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface { vstring, err := GetIptablesVersionString(exec) if err != nil { glog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err) vstring = MinCheckVersion } - return &runner{ + runner := &runner{ exec: exec, + dbus: dbus, protocol: protocol, hasCheck: getIptablesHasCheckCommand(vstring), waitFlag: getIptablesWaitFlag(vstring), } + runner.connectToFirewallD() + return runner +} + +// Destroy is part of Interface. +func (runner *runner) Destroy() { + if runner.signal != nil { + runner.signal <- nil + } +} + +const ( + firewalldName = "org.fedoraproject.FirewallD1" + firewalldPath = "/org/fedoraproject/FirewallD1" + firewalldInterface = "org.fedoraproject.FirewallD1" +) + +// Connects to D-Bus and listens for FirewallD start/restart. (On non-FirewallD-using +// systems, this is effectively a no-op; we listen for the signals, but they will never be +// emitted, so reload() will never be called.) +func (runner *runner) connectToFirewallD() { + bus, err := runner.dbus.SystemBus() + if err != nil { + glog.V(1).Info("Could not connect to D-Bus system bus: %s", err) + return + } + + rule := fmt.Sprintf("type='signal',sender='%s',path='%s',interface='%s',member='Reloaded'", firewalldName, firewalldPath, firewalldInterface) + bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule) + + rule = fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", firewalldName) + bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule) + + runner.signal = make(chan *godbus.Signal, 10) + bus.Signal(runner.signal) + + go runner.dbusSignalHandler(bus) } // EnsureChain is part of Interface. @@ -472,3 +520,50 @@ func GetIptablesVersionString(exec utilexec.Interface) (string, error) { } return match[1], nil } + +// goroutine to listen for D-Bus signals +func (runner *runner) dbusSignalHandler(bus utildbus.Connection) { + firewalld := bus.Object(firewalldName, firewalldPath) + + for s := range runner.signal { + if s == nil { + // Unregister + bus.Signal(runner.signal) + return + } + + switch s.Name { + case "org.freedesktop.DBus.NameOwnerChanged": + name := s.Body[0].(string) + new_owner := s.Body[2].(string) + + if name != firewalldName || len(new_owner) == 0 { + continue + } + + // FirewallD startup (specifically the part where it deletes + // all existing iptables rules) may not yet be complete when + // we get this signal, so make a dummy request to it to + // synchronize. + firewalld.Call(firewalldInterface+".getDefaultZone", 0) + + runner.reload() + case firewalldInterface + ".Reloaded": + runner.reload() + } + } +} + +// AddReloadFunc is part of Interface +func (runner *runner) AddReloadFunc(reloadFunc func()) { + runner.reloadFuncs = append(runner.reloadFuncs, reloadFunc) +} + +// runs all reload funcs to re-sync iptables rules +func (runner *runner) reload() { + glog.V(1).Infof("reloading iptables rules") + + for _, f := range runner.reloadFuncs { + f() + } +} diff --git a/pkg/util/iptables/iptables_test.go b/pkg/util/iptables/iptables_test.go index c8f7c1a100e..8bef165957b 100644 --- a/pkg/util/iptables/iptables_test.go +++ b/pkg/util/iptables/iptables_test.go @@ -19,7 +19,9 @@ package iptables import ( "strings" "testing" + "time" + "k8s.io/kubernetes/pkg/util/dbus" "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/sets" ) @@ -55,7 +57,8 @@ func testEnsureChain(t *testing.T, protocol Protocol) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, protocol) + runner := New(&fexec, dbus.NewFake(nil, nil), protocol) + defer runner.Destroy() // Success. exists, err := runner.EnsureChain(TableNAT, Chain("FOOBAR")) if err != nil { @@ -112,7 +115,8 @@ func TestFlushChain(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() // Success. err := runner.FlushChain(TableNAT, Chain("FOOBAR")) if err != nil { @@ -149,7 +153,8 @@ func TestDeleteChain(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() // Success. err := runner.DeleteChain(TableNAT, Chain("FOOBAR")) if err != nil { @@ -185,7 +190,8 @@ func TestEnsureRuleAlreadyExists(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) @@ -221,7 +227,8 @@ func TestEnsureRuleNew(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) @@ -254,7 +261,8 @@ func TestEnsureRuleErrorChecking(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() _, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") @@ -284,7 +292,8 @@ func TestEnsureRuleErrorCreating(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() _, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") @@ -311,7 +320,8 @@ func TestDeleteRuleAlreadyExists(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) @@ -344,7 +354,8 @@ func TestDeleteRuleNew(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) @@ -374,7 +385,8 @@ func TestDeleteRuleErrorChecking(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") @@ -404,7 +416,8 @@ func TestDeleteRuleErrorCreating(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() err := runner.DeleteRule(TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") @@ -565,7 +578,8 @@ func TestWaitFlagUnavailable(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() err := runner.DeleteChain(TableNAT, Chain("FOOBAR")) if err != nil { t.Errorf("expected success, got %v", err) @@ -593,7 +607,8 @@ func TestWaitFlagOld(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() err := runner.DeleteChain(TableNAT, Chain("FOOBAR")) if err != nil { t.Errorf("expected success, got %v", err) @@ -624,7 +639,8 @@ func TestWaitFlagNew(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - runner := New(&fexec, ProtocolIpv4) + runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4) + defer runner.Destroy() err := runner.DeleteChain(TableNAT, Chain("FOOBAR")) if err != nil { t.Errorf("expected success, got %v", err) @@ -639,3 +655,114 @@ func TestWaitFlagNew(t *testing.T) { t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[1]) } } + +func TestReload(t *testing.T) { + dbusConn := dbus.NewFakeConnection() + dbusConn.SetBusObject(func(method string, args ...interface{}) ([]interface{}, error) { return nil, nil }) + dbusConn.AddObject(firewalldName, firewalldPath, func(method string, args ...interface{}) ([]interface{}, error) { return nil, nil }) + fdbus := dbus.NewFake(dbusConn, nil) + + reloaded := make(chan bool, 2) + + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + // iptables version check + func() ([]byte, error) { return []byte("iptables v1.4.22"), nil }, + + // first reload + // EnsureChain + func() ([]byte, error) { return []byte{}, nil }, + // EnsureRule abc check + func() ([]byte, error) { return []byte{}, &exec.FakeExitError{1} }, + // EnsureRule abc + func() ([]byte, error) { return []byte{}, nil }, + + // second reload + // EnsureChain + func() ([]byte, error) { return []byte{}, nil }, + // EnsureRule abc check + func() ([]byte, error) { return []byte{}, &exec.FakeExitError{1} }, + // EnsureRule abc + func() ([]byte, error) { return []byte{}, nil }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + + runner := New(&fexec, fdbus, ProtocolIpv4) + defer runner.Destroy() + + runner.AddReloadFunc(func() { + exists, err := runner.EnsureChain(TableNAT, Chain("FOOBAR")) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if exists { + t.Errorf("expected exists = false") + } + reloaded <- true + }) + + runner.AddReloadFunc(func() { + exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") + if err != nil { + t.Errorf("expected success, got %v", err) + } + if exists { + t.Errorf("expected exists = false") + } + reloaded <- true + }) + + dbusConn.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", firewalldName, "", ":1.1") + <-reloaded + <-reloaded + + if fcmd.CombinedOutputCalls != 4 { + t.Errorf("expected 4 CombinedOutput() calls total, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("iptables", "-t", "nat", "-N", "FOOBAR") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[1]) + } + if !sets.NewString(fcmd.CombinedOutputLog[2]...).HasAll("iptables", "-t", "nat", "-C", "OUTPUT", "abc", "123") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[2]) + } + if !sets.NewString(fcmd.CombinedOutputLog[3]...).HasAll("iptables", "-t", "nat", "-A", "OUTPUT", "abc", "123") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[3]) + } + + go func() { time.Sleep(time.Second / 100); reloaded <- true }() + dbusConn.EmitSignal(firewalldName, firewalldPath, firewalldInterface, "DefaultZoneChanged", "public") + dbusConn.EmitSignal("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "NameOwnerChanged", "io.k8s.Something", "", ":1.1") + <-reloaded + + if fcmd.CombinedOutputCalls != 4 { + t.Errorf("Incorrect signal caused a reload") + } + + dbusConn.EmitSignal(firewalldName, firewalldPath, firewalldInterface, "Reloaded") + <-reloaded + <-reloaded + + if fcmd.CombinedOutputCalls != 7 { + t.Errorf("expected 7 CombinedOutput() calls total, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[4]...).HasAll("iptables", "-t", "nat", "-N", "FOOBAR") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[4]) + } + if !sets.NewString(fcmd.CombinedOutputLog[5]...).HasAll("iptables", "-t", "nat", "-C", "OUTPUT", "abc", "123") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[5]) + } + if !sets.NewString(fcmd.CombinedOutputLog[6]...).HasAll("iptables", "-t", "nat", "-A", "OUTPUT", "abc", "123") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[6]) + } +}