diff --git a/pkg/kubelet/container_bridge.go b/pkg/kubelet/container_bridge.go index 52555023088..55a1f4827b0 100644 --- a/pkg/kubelet/container_bridge.go +++ b/pkg/kubelet/container_bridge.go @@ -18,6 +18,7 @@ package kubelet import ( "bytes" + "fmt" "net" "os" "os/exec" @@ -25,6 +26,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/iptables" ) var cidrRegexp = regexp.MustCompile(`inet ([0-9a-fA-F.:]*/[0-9]*)`) @@ -140,28 +142,17 @@ func cbr0CidrCorrect(wantCIDR *net.IPNet) bool { return wantCIDR.IP.Equal(cbr0IP) && bytes.Equal(wantCIDR.Mask, cbr0CIDR.Mask) } -// TODO(dawnchen): Using pkg/util/iptables -// nonMasqueradeCIDR is the CIDR for our internal IP range; traffic to IPs outside this range will use IP masquerade. -func ensureIPTablesMasqRule(nonMasqueradeCIDR string) error { - // Check if the MASQUERADE rule exist or not - if err := exec.Command("iptables", - "-t", "nat", - "-C", "POSTROUTING", - "!", "-d", nonMasqueradeCIDR, +// nonMasqueradeCIDR is the CIDR for our internal IP range; traffic to IPs +// outside this range will use IP masquerade. +func ensureIPTablesMasqRule(client iptables.Interface, nonMasqueradeCIDR string) error { + if _, err := client.EnsureRule(iptables.Append, iptables.TableNAT, + iptables.ChainPostrouting, + "-m", "comment", "--comment", "kubelet: SNAT outbound cluster traffic", "-m", "addrtype", "!", "--dst-type", "LOCAL", - "-j", "MASQUERADE").Run(); err == nil { - // The MASQUERADE rule exists - return nil - } - - glog.Infof("MASQUERADE rule doesn't exist, recreate it (with nonMasqueradeCIDR %s)", nonMasqueradeCIDR) - if err := exec.Command("iptables", - "-t", "nat", - "-A", "POSTROUTING", "!", "-d", nonMasqueradeCIDR, - "-m", "addrtype", "!", "--dst-type", "LOCAL", - "-j", "MASQUERADE").Run(); err != nil { - return err + "-j", "MASQUERADE"); err != nil { + return fmt.Errorf("Failed to ensure masquerading for %s chain %s: %v", + iptables.TableNAT, iptables.ChainPostrouting, err) } return nil } diff --git a/pkg/kubelet/container_bridge_test.go b/pkg/kubelet/container_bridge_test.go new file mode 100644 index 00000000000..b7af6d89ccf --- /dev/null +++ b/pkg/kubelet/container_bridge_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "testing" + + "k8s.io/kubernetes/pkg/util/dbus" + "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/iptables" + "k8s.io/kubernetes/pkg/util/sets" +) + +func TestEnsureIPTablesMasqRuleNew(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + // iptables version check + func() ([]byte, error) { return []byte("iptables v1.9.22"), nil }, + // Status 1 on the first call. + func() ([]byte, error) { return nil, &exec.FakeExitError{Status: 1} }, + // Success on the second call. + func() ([]byte, error) { return []byte{}, nil }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + // iptables version check + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + // The second Command() call is checking the rule. Failure of that means create it. + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := iptables.New(&fexec, dbus.NewFake(nil, nil), iptables.ProtocolIpv4) + defer runner.Destroy() + err := ensureIPTablesMasqRule(runner, "127.0.0.0/8") + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 3 { + t.Errorf("expected 3 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[2]...).HasAll("iptables", "-t", "nat", "-A", "POSTROUTING", + "-m", "comment", "--comment", "kubelet: SNAT outbound cluster traffic", + "!", "-d", "127.0.0.0/8", "-j", "MASQUERADE") { + t.Errorf("wrong CombinedOutput() log, got %#v", fcmd.CombinedOutputLog[2]) + } +} + +func TestEnsureIPTablesMasqRuleAlreadyExists(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + // iptables version check + func() ([]byte, error) { return []byte("iptables v1.9.22"), nil }, + // Success. + func() ([]byte, error) { return []byte{}, nil }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + // iptables version check + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + // The second Command() call is checking the rule. Success of that exec means "done". + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := iptables.New(&fexec, dbus.NewFake(nil, nil), iptables.ProtocolIpv4) + defer runner.Destroy() + err := ensureIPTablesMasqRule(runner, "127.0.0.0/8") + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 2 { + t.Errorf("expected 2 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("iptables", "-t", "nat", "-C", "POSTROUTING", + "-m", "comment", "--comment", "kubelet: SNAT outbound cluster traffic", + "!", "-d", "127.0.0.0/8", "-j", "MASQUERADE") { + t.Errorf("wrong CombinedOutput() log, got %#v", fcmd.CombinedOutputLog[1]) + } +} + +func TestEnsureIPTablesMasqRuleErrorChecking(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + // iptables version check + func() ([]byte, error) { return []byte("iptables v1.9.22"), nil }, + // Status 2 on the first call. + func() ([]byte, error) { return nil, &exec.FakeExitError{Status: 2} }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + // iptables version check + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + // The second Command() call is checking the rule. Failure of that means create it. + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := iptables.New(&fexec, dbus.NewFake(nil, nil), iptables.ProtocolIpv4) + defer runner.Destroy() + err := ensureIPTablesMasqRule(runner, "127.0.0.0/8") + if err == nil { + t.Errorf("expected failure") + } + if fcmd.CombinedOutputCalls != 2 { + t.Errorf("expected 2 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } +} + +func TestEnsureIPTablesMasqRuleErrorCreating(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + // iptables version check + func() ([]byte, error) { return []byte("iptables v1.9.22"), nil }, + // Status 1 on the first call. + func() ([]byte, error) { return nil, &exec.FakeExitError{Status: 1} }, + // Status 1 on the second call. + func() ([]byte, error) { return nil, &exec.FakeExitError{Status: 1} }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + // iptables version check + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + // The second Command() call is checking the rule. Failure of that means create it. + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := iptables.New(&fexec, dbus.NewFake(nil, nil), iptables.ProtocolIpv4) + defer runner.Destroy() + err := ensureIPTablesMasqRule(runner, "127.0.0.0/8") + if err == nil { + t.Errorf("expected failure") + } + if fcmd.CombinedOutputCalls != 3 { + t.Errorf("expected 3 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3d2896202fd..ba162ac38dd 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -75,10 +75,12 @@ import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/clock" + utildbus "k8s.io/kubernetes/pkg/util/dbus" utilerrors "k8s.io/kubernetes/pkg/util/errors" utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/flowcontrol" kubeio "k8s.io/kubernetes/pkg/util/io" + utilipt "k8s.io/kubernetes/pkg/util/iptables" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" @@ -360,6 +362,7 @@ func NewMainKubelet( enableCustomMetrics: enableCustomMetrics, babysitDaemons: babysitDaemons, enableControllerAttachDetach: enableControllerAttachDetach, + iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4), } if klet.flannelExperimentalOverlay { @@ -560,6 +563,7 @@ type Kubelet struct { dockerClient dockertools.DockerInterface runtimeCache kubecontainer.RuntimeCache kubeClient clientset.Interface + iptClient utilipt.Interface rootDirectory string // podWorkers handle syncing Pods in response to events. diff --git a/pkg/kubelet/kubelet_network.go b/pkg/kubelet/kubelet_network.go index 59548dfe9fb..1a34efa5b70 100644 --- a/pkg/kubelet/kubelet_network.go +++ b/pkg/kubelet/kubelet_network.go @@ -255,7 +255,7 @@ func (kl *Kubelet) syncNetworkStatus() { } kl.updatePodCIDR(podCIDR) } - if err := ensureIPTablesMasqRule(kl.nonMasqueradeCIDR); err != nil { + if err := ensureIPTablesMasqRule(kl.iptClient, kl.nonMasqueradeCIDR); err != nil { err = fmt.Errorf("Error on adding ip table rules: %v", err) glog.Error(err) kl.runtimeState.setNetworkState(err)