diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 00110af2845..e7510359c39 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -97,6 +97,12 @@ type Runtime interface { RemoveImage(image ImageSpec) error // Returns Image statistics. ImageStats() (*ImageStats, error) + // Returns the filesystem path of the pod's network namespace; if the + // runtime does not handle namespace creation itself, or cannot return + // the network namespace path, it should return an error. + // TODO: Change ContainerID to a Pod ID since the namespace is shared + // by all containers in the pod. + GetNetNS(containerID ContainerID) (string, error) // TODO(vmarmol): Unify pod and containerID args. // GetContainerLogs returns logs of a specific container. By // default, it returns a snapshot of the container log. Set 'follow' to true to diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 093b7654250..db2aeacf3e5 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -338,6 +338,14 @@ func (f *FakeRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteClos return f.Err } +func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetNetNS") + return "", f.Err +} + func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy) error { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/container/testing/runtime_mock.go b/pkg/kubelet/container/testing/runtime_mock.go index 5f01bc1414e..3f269249b39 100644 --- a/pkg/kubelet/container/testing/runtime_mock.go +++ b/pkg/kubelet/container/testing/runtime_mock.go @@ -128,6 +128,11 @@ func (r *Mock) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) err return args.Error(0) } +func (r *Mock) GetNetNS(containerID ContainerID) (string, error) { + args := r.Called(containerID) + return "", args.Error(0) +} + func (r *Mock) GarbageCollect(gcPolicy ContainerGCPolicy) error { args := r.Called(gcPolicy) return args.Error(0) diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index f2082144a06..fea46a5a52b 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -1959,7 +1959,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec } if dm.configureHairpinMode { - if err = hairpin.SetUpContainer(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil { + if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil { glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err) } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b7810e1f007..127548f50c6 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -439,6 +439,8 @@ func NewMainKubelet( klet.livenessManager, klet.volumeManager, klet.httpClient, + klet.networkPlugin, + klet.hairpinMode == componentconfig.HairpinVeth, utilexec.New(), kubecontainer.RealOS{}, imageBackOff, diff --git a/pkg/kubelet/network/hairpin/hairpin.go b/pkg/kubelet/network/hairpin/hairpin.go index cf5d2937563..b725fbba3af 100644 --- a/pkg/kubelet/network/hairpin/hairpin.go +++ b/pkg/kubelet/network/hairpin/hairpin.go @@ -40,13 +40,23 @@ var ( ethtoolOutputRegex = regexp.MustCompile("peer_ifindex: (\\d+)") ) -func SetUpContainer(containerPid int, 
containerInterfaceName string) error { - e := exec.New() - return setUpContainerInternal(e, containerPid, containerInterfaceName) +func SetUpContainerPid(containerPid int, containerInterfaceName string) error { + pidStr := fmt.Sprintf("%d", containerPid) + nsenterArgs := []string{"-t", pidStr, "-n"} + return setUpContainerInternal(containerInterfaceName, pidStr, nsenterArgs) } -func setUpContainerInternal(e exec.Interface, containerPid int, containerInterfaceName string) error { - hostIfName, err := findPairInterfaceOfContainerInterface(e, containerPid, containerInterfaceName) +func SetUpContainerPath(netnsPath string, containerInterfaceName string) error { + if netnsPath[0] != '/' { + return fmt.Errorf("netnsPath path '%s' was invalid", netnsPath) + } + nsenterArgs := []string{"-n", netnsPath} + return setUpContainerInternal(containerInterfaceName, netnsPath, nsenterArgs) +} + +func setUpContainerInternal(containerInterfaceName, containerDesc string, nsenterArgs []string) error { + e := exec.New() + hostIfName, err := findPairInterfaceOfContainerInterface(e, containerInterfaceName, containerDesc, nsenterArgs) if err != nil { glog.Infof("Unable to find pair interface, setting up all interfaces: %v", err) return setUpAllInterfaces() @@ -54,7 +64,7 @@ func setUpContainerInternal(e exec.Interface, containerPid int, containerInterfa return setUpInterface(hostIfName) } -func findPairInterfaceOfContainerInterface(e exec.Interface, containerPid int, containerInterfaceName string) (string, error) { +func findPairInterfaceOfContainerInterface(e exec.Interface, containerInterfaceName, containerDesc string, nsenterArgs []string) (string, error) { nsenterPath, err := e.LookPath("nsenter") if err != nil { return "", err @@ -63,15 +73,16 @@ func findPairInterfaceOfContainerInterface(e exec.Interface, containerPid int, c if err != nil { return "", err } - // Get container's interface index - output, err := e.Command(nsenterPath, "-t", fmt.Sprintf("%d", containerPid), "-n", "-F", "--", ethtoolPath, "--statistics", containerInterfaceName).CombinedOutput() + + nsenterArgs = append(nsenterArgs, "-F", "--", ethtoolPath, "--statistics", containerInterfaceName) + output, err := e.Command(nsenterPath, nsenterArgs...).CombinedOutput() if err != nil { - return "", fmt.Errorf("Unable to query interface %s of container %d: %v: %s", containerInterfaceName, containerPid, err, string(output)) + return "", fmt.Errorf("Unable to query interface %s of container %s: %v: %s", containerInterfaceName, containerDesc, err, string(output)) } // look for peer_ifindex match := ethtoolOutputRegex.FindSubmatch(output) if match == nil { - return "", fmt.Errorf("No peer_ifindex in interface statistics for %s of container %d", containerInterfaceName, containerPid) + return "", fmt.Errorf("No peer_ifindex in interface statistics for %s of container %s", containerInterfaceName, containerDesc) } peerIfIndex, err := strconv.Atoi(string(match[1])) if err != nil { // seems impossible (\d+ not numeric) diff --git a/pkg/kubelet/network/hairpin/hairpin_test.go b/pkg/kubelet/network/hairpin/hairpin_test.go index 63bc9ef5bad..b94c7b997cb 100644 --- a/pkg/kubelet/network/hairpin/hairpin_test.go +++ b/pkg/kubelet/network/hairpin/hairpin_test.go @@ -69,7 +69,8 @@ func TestFindPairInterfaceOfContainerInterface(t *testing.T) { return fmt.Sprintf("/fake-bin/%s", file), nil }, } - name, err := findPairInterfaceOfContainerInterface(&fexec, 123, "eth0") + nsenterArgs := []string{"-t", "123", "-n"} + name, err := 
findPairInterfaceOfContainerInterface(&fexec, "eth0", "123", nsenterArgs) if test.expectErr { if err == nil { t.Errorf("unexpected non-error") diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 0c85212d927..c274e9b56b4 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -31,10 +31,10 @@ import ( "github.com/vishvananda/netlink/nl" "github.com/appc/cni/libcni" + cnitypes "github.com/appc/cni/pkg/types" "github.com/golang/glog" "k8s.io/kubernetes/pkg/apis/componentconfig" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/util/bandwidth" utilexec "k8s.io/kubernetes/pkg/util/exec" @@ -55,6 +55,7 @@ type kubenetNetworkPlugin struct { host network.Host netConfig *libcni.NetworkConfig + loConfig *libcni.NetworkConfig cniConfig *libcni.CNIConfig shaper bandwidth.BandwidthShaper podCIDRs map[kubecontainer.ContainerID]string @@ -94,10 +95,20 @@ func (plugin *kubenetNetworkPlugin) Init(host network.Host, hairpinMode componen // was built-in, we simply ignore the error here. A better thing to do is // to check the kernel version in the future. plugin.execer.Command("modprobe", "br-netfilter").CombinedOutput() - if err := utilsysctl.SetSysctl(sysctlBridgeCallIptables, 1); err != nil { + err := utilsysctl.SetSysctl(sysctlBridgeCallIptables, 1) + if err != nil { glog.Warningf("can't set sysctl %s: %v", sysctlBridgeCallIptables, err) } + plugin.loConfig, err = libcni.ConfFromBytes([]byte(`{ + "cniVersion": "0.1.0", + "name": "kubenet-loopback", + "type": "loopback" +}`)) + if err != nil { + return fmt.Errorf("Failed to generate loopback config: %v", err) + } + return nil } @@ -250,7 +261,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k start := time.Now() defer func() { - glog.V(4).Infof("TearDownPod took %v for %s/%s", time.Since(start), namespace, name) + glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name) }() pod, ok := plugin.host.GetPodByName(namespace, name) @@ -266,24 +277,21 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k return fmt.Errorf("Kubenet cannot SetUpPod: %v", err) } - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return fmt.Errorf("Kubenet execution called on non-docker runtime") - } - netnsPath, err := runtime.GetNetNS(id) - if err != nil { - return fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) - } - - rt := buildCNIRuntimeConf(name, namespace, id, netnsPath) - if err != nil { - return fmt.Errorf("Error building CNI config: %v", err) - } - - if err = plugin.addContainerToNetwork(id, rt); err != nil { + // Bring up container loopback interface + if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil { return err } + // Hook container up with our bridge + res, err := plugin.addContainerToNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id) + if err != nil { + return err + } + if res.IP4 == nil || res.IP4.IP.String() == "" { + return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id) + } + plugin.podCIDRs[id] = res.IP4.IP.String() + // Put the container bridge into promiscuous mode to force it to accept hairpin packets. // TODO: Remove this once the kernel bug (#20096) is fixed. 
// TODO: check and set promiscuous mode with netlink once vishvananda/netlink supports it @@ -330,20 +338,6 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods") } - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return fmt.Errorf("Kubenet execution called on non-docker runtime") - } - netnsPath, err := runtime.GetNetNS(id) - if err != nil { - return err - } - - rt := buildCNIRuntimeConf(name, namespace, id, netnsPath) - if err != nil { - return fmt.Errorf("Error building CNI config: %v", err) - } - // no cached CIDR is Ok during teardown if cidr, ok := plugin.podCIDRs[id]; ok { glog.V(5).Infof("Removing pod CIDR %s from shaper", cidr) @@ -354,9 +348,10 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i } } } - if err = plugin.delContainerFromNetwork(id, rt); err != nil { + if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil { return err } + delete(plugin.podCIDRs, id) return nil } @@ -373,12 +368,8 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s return &network.PodNetworkStatus{IP: ip}, nil } } - // TODO: remove type conversion once kubenet supports multiple runtime - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return nil, fmt.Errorf("Kubenet execution called on non-docker runtime") - } - netnsPath, err := runtime.GetNetNS(id) + + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) if err != nil { return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) } @@ -412,37 +403,43 @@ func (plugin *kubenetNetworkPlugin) Status() error { return nil } -func buildCNIRuntimeConf(podName string, podNs string, podInfraContainerID kubecontainer.ContainerID, podNetnsPath string) *libcni.RuntimeConf { - glog.V(4).Infof("Kubenet: using netns path %v", podNetnsPath) - glog.V(4).Infof("Kubenet: using podns path %v", podNs) +func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) { + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) + if err != nil { + return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) + } return &libcni.RuntimeConf{ - ContainerID: podInfraContainerID.ID, - NetNS: podNetnsPath, - IfName: network.DefaultInterfaceName, - } + ContainerID: id.ID, + NetNS: netnsPath, + IfName: ifName, + }, nil } -func (plugin *kubenetNetworkPlugin) addContainerToNetwork(id kubecontainer.ContainerID, rt *libcni.RuntimeConf) error { - glog.V(3).Infof("Calling cni plugins to add container to network with cni runtime: %+v", rt) - res, err := plugin.cniConfig.AddNetwork(plugin.netConfig, rt) +func (plugin *kubenetNetworkPlugin) addContainerToNetwork(config *libcni.NetworkConfig, ifName, namespace, name string, id kubecontainer.ContainerID) (*cnitypes.Result, error) { + rt, err := plugin.buildCNIRuntimeConf(ifName, id) if err != nil { - return fmt.Errorf("Error adding container to network: %v", err) - } - if res.IP4 == nil || res.IP4.IP.String() == "" { - return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id) + return nil, fmt.Errorf("Error building CNI config: %v", err) } - plugin.podCIDRs[id] = res.IP4.IP.String() - return nil + glog.V(3).Infof("Adding %s/%s to '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, 
config.Network.Type, rt) + res, err := plugin.cniConfig.AddNetwork(config, rt) + if err != nil { + return nil, fmt.Errorf("Error adding container to network: %v", err) + } + return res, nil } -func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(id kubecontainer.ContainerID, rt *libcni.RuntimeConf) error { - glog.V(3).Infof("Calling cni plugins to remove container from network with cni runtime: %+v", rt) - if err := plugin.cniConfig.DelNetwork(plugin.netConfig, rt); err != nil { +func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(config *libcni.NetworkConfig, ifName, namespace, name string, id kubecontainer.ContainerID) error { + rt, err := plugin.buildCNIRuntimeConf(ifName, id) + if err != nil { + return fmt.Errorf("Error building CNI config: %v", err) + } + + glog.V(3).Infof("Removing %s/%s from '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt) + if err := plugin.cniConfig.DelNetwork(config, rt); err != nil { return fmt.Errorf("Error removing container from network: %v", err) } - delete(plugin.podCIDRs, id) return nil } diff --git a/pkg/kubelet/network/kubenet/kubenet_linux_test.go b/pkg/kubelet/network/kubenet/kubenet_linux_test.go index 6d31673bbb8..a16e11e7fa7 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux_test.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux_test.go @@ -17,6 +17,8 @@ limitations under the License. package kubenet import ( + "fmt" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" @@ -38,9 +40,6 @@ func TestGetPodNetworkStatus(t *testing.T) { podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2/32" podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3/32" - fhost := nettest.NewFakeHost(nil) - fakeKubenet := newFakeKubenetPlugin(podIPMap, nil, fhost) - testCases := []struct { id string expectError bool @@ -66,6 +65,34 @@ func TestGetPodNetworkStatus(t *testing.T) { //TODO: add test cases for retrieving ip inside container network namespace } + fakeCmds := make([]exec.FakeCommandAction, 0) + for _, t := range testCases { + // the fake commands return the IP from the given index, or an error + fCmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { + ip, ok := podIPMap[kubecontainer.ContainerID{ID: t.id}] + if !ok { + return nil, fmt.Errorf("Pod IP %q not found", t.id) + } + return []byte(ip), nil + }, + }, + } + fakeCmds = append(fakeCmds, func(cmd string, args ...string) exec.Cmd { + return exec.InitFakeCmd(&fCmd, cmd, args...) 
+ }) + } + fexec := exec.FakeExec{ + CommandScript: fakeCmds, + LookPathFunc: func(file string) (string, error) { + return fmt.Sprintf("/fake-bin/%s", file), nil + }, + } + + fhost := nettest.NewFakeHost(nil) + fakeKubenet := newFakeKubenetPlugin(podIPMap, &fexec, fhost) + for i, tc := range testCases { out, err := fakeKubenet.GetPodNetworkStatus("", "", kubecontainer.ContainerID{ID: tc.id}) if tc.expectError { diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index a075631cb50..376e10b2c67 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -45,6 +45,8 @@ import ( "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/network/hairpin" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" @@ -74,8 +76,9 @@ const ( kubernetesUnitPrefix = "k8s_" unitKubernetesSection = "X-Kubernetes" - unitPodName = "POD" - unitRktID = "RktID" + unitPodUID = "PodUID" + unitPodName = "PodName" + unitPodNamespace = "PodNamespace" unitRestartCount = "RestartCount" k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet" @@ -133,8 +136,17 @@ type Runtime struct { execer utilexec.Interface os kubecontainer.OSInterface + // Network plugin. + networkPlugin network.NetworkPlugin + + // If true, the "hairpin mode" flag is set on container interfaces. + // A false value means the kubelet just backs off from setting it, + // it might already be true. + configureHairpinMode bool + // used for a systemd Exec, which requires the full path. - touchPath string + touchPath string + nsenterPath string versions versions } @@ -171,6 +183,8 @@ func New( livenessManager proberesults.Manager, volumeGetter volumeGetter, httpClient kubetypes.HttpGetter, + networkPlugin network.NetworkPlugin, + hairpinMode bool, execer utilexec.Interface, os kubecontainer.OSInterface, imageBackOff *flowcontrol.Backoff, @@ -203,6 +217,11 @@ func New( return nil, fmt.Errorf("cannot find touch binary: %v", err) } + nsenterPath, err := execer.LookPath("nsenter") + if err != nil { + return nil, fmt.Errorf("cannot find nsenter binary: %v", err) + } + rkt := &Runtime{ os: kubecontainer.RealOS{}, systemd: systemd, @@ -216,8 +235,10 @@ func New( recorder: recorder, livenessManager: livenessManager, volumeGetter: volumeGetter, + networkPlugin: networkPlugin, execer: execer, touchPath: touchPath, + nsenterPath: nsenterPath, } rkt.config, err = rkt.getConfig(rkt.config) @@ -862,23 +883,22 @@ func serviceFilePath(serviceName string) string { } // generateRunCommand crafts a 'rkt run-prepared' command with necessary parameters. -func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) { +func (r *Runtime) generateRunCommand(pod *api.Pod, uuid, netnsName string) (string, error) { runPrepared := r.buildCommand("run-prepared").Args + // Network namespace set up in kubelet; rkt networking not used + runPrepared = append(runPrepared, "--net=host") + var hostname string var err error - // Setup network configuration. - if kubecontainer.IsHostNetworkPod(pod) { - runPrepared = append(runPrepared, "--net=host") - + // Setup DNS and hostname configuration. + if len(netnsName) == 0 { // TODO(yifan): Let runtimeHelper.GeneratePodHostNameAndDomain() to handle this. 
hostname, err = r.os.Hostname() if err != nil { return "", err } } else { - runPrepared = append(runPrepared, fmt.Sprintf("--net=%s", defaultNetworkName)) - // Setup DNS. dnsServers, dnsSearches, err := r.runtimeHelper.GetClusterDNS(pod) if err != nil { @@ -899,12 +919,37 @@ func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) if err != nil { return "", err } + + // Drop the `rkt run-prepared` into the network namespace we + // created. + // TODO: switch to 'ip netns exec' once we can depend on a new + // enough version that doesn't have bugs like + // https://bugzilla.redhat.com/show_bug.cgi?id=882047 + nsenterExec := []string{r.nsenterPath, "--net=\"" + netnsPathFromName(netnsName) + "\"", "--"} + runPrepared = append(nsenterExec, runPrepared...) } + runPrepared = append(runPrepared, fmt.Sprintf("--hostname=%s", hostname)) runPrepared = append(runPrepared, uuid) return strings.Join(runPrepared, " "), nil } +func (r *Runtime) cleanupPodNetwork(pod *api.Pod) error { + glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + + var teardownErr error + containerID := kubecontainer.ContainerID{ID: string(pod.UID)} + if err := r.networkPlugin.TearDownPod(pod.Namespace, pod.Name, containerID); err != nil { + teardownErr = fmt.Errorf("rkt: failed to tear down network for pod %s: %v", format.Pod(pod), err) + } + + if _, err := r.execer.Command("ip", "netns", "del", makePodNetnsName(pod.UID)).Output(); err != nil { + return fmt.Errorf("rkt: Failed to remove network namespace for pod %s: %v", format.Pod(pod), err) + } + + return teardownErr +} + // preparePod will: // // 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid. @@ -912,7 +957,7 @@ func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) // // On success, it will return a string that represents name of the unit file // and the runtime pod. -func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) { +func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret, netnsName string) (string, *kubecontainer.Pod, error) { // Generate the pod manifest from the pod spec. manifest, err := r.makePodManifest(pod, pullSecrets) if err != nil { @@ -957,7 +1002,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k glog.V(4).Infof("'rkt prepare' returns %q", uuid) // Create systemd service file for the rkt pod. - runPrepared, err := r.generateRunCommand(pod, uuid) + runPrepared, err := r.generateRunCommand(pod, uuid, netnsName) if err != nil { return "", nil, fmt.Errorf("failed to generate 'rkt run-prepared' command: %v", err) } @@ -972,6 +1017,10 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k newUnitOption("Service", "ExecStopPost", markPodFinished), // This enables graceful stop. 
newUnitOption("Service", "KillMode", "mixed"), + // Track pod info for garbage collection + newUnitOption(unitKubernetesSection, unitPodUID, string(pod.UID)), + newUnitOption(unitKubernetesSection, unitPodName, pod.Name), + newUnitOption(unitKubernetesSection, unitPodNamespace, pod.Namespace), } serviceName := makePodServiceFileName(uuid) @@ -1024,12 +1073,57 @@ func (r *Runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, f return } +func makePodNetnsName(podID types.UID) string { + return fmt.Sprintf("%s_%s", kubernetesUnitPrefix, string(podID)) +} + +func netnsPathFromName(netnsName string) string { + return fmt.Sprintf("/var/run/netns/%s", netnsName) +} + +func (r *Runtime) setupPodNetwork(pod *api.Pod) (string, error) { + netnsName := makePodNetnsName(pod.UID) + + // Create a new network namespace for the pod + r.execer.Command("ip", "netns", "del", netnsName).Output() + _, err := r.execer.Command("ip", "netns", "add", netnsName).Output() + if err != nil { + return "", fmt.Errorf("failed to create pod network namespace: %v", err) + } + + // Set up networking with the network plugin + glog.V(3).Infof("Calling network plugin %s to setup pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + containerID := kubecontainer.ContainerID{ID: string(pod.UID)} + err = r.networkPlugin.SetUpPod(pod.Namespace, pod.Name, containerID) + if err != nil { + return "", fmt.Errorf("failed to set up pod network: %v", err) + } + + if r.configureHairpinMode { + if err = hairpin.SetUpContainerPath(netnsPathFromName(netnsName), network.DefaultInterfaceName); err != nil { + glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err) + } + } + + return netnsName, nil +} + // RunPod first creates the unit file for a pod, and then // starts the unit over d-bus. func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { glog.V(4).Infof("Rkt starts to run pod: name %q.", format.Pod(pod)) - name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets) + var err error + var netnsName string + if !kubecontainer.IsHostNetworkPod(pod) { + netnsName, err = r.setupPodNetwork(pod) + if err != nil { + r.cleanupPodNetwork(pod) + return err + } + } + + name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets, netnsName) // Set container references and generate events. // If preparedPod fails, then send out 'failed' events for each container. @@ -1049,6 +1143,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { } if prepareErr != nil { + r.cleanupPodNetwork(pod) return prepareErr } @@ -1057,9 +1152,10 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { // RestartUnit has the same effect as StartUnit if the unit is not running, besides it can restart // a unit if the unit file is changed and reloaded. 
reschan := make(chan string) - _, err := r.systemd.RestartUnit(name, "replace", reschan) + _, err = r.systemd.RestartUnit(name, "replace", reschan) if err != nil { r.generateEvents(runtimePod, "Failed", err) + r.cleanupPodNetwork(pod) return err } @@ -1067,6 +1163,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { if res != "done" { err := fmt.Errorf("Failed to restart unit %q: %s", name, res) r.generateEvents(runtimePod, "Failed", err) + r.cleanupPodNetwork(pod) return err } @@ -1078,6 +1175,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { if errKill := r.KillPod(pod, *runtimePod, nil); errKill != nil { return errors.NewAggregate([]error{err, errKill}) } + r.cleanupPodNetwork(pod) return err } @@ -1364,6 +1462,21 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePerio return err } + // Clean up networking; use running pod details since 'pod' can be nil + if pod == nil || !kubecontainer.IsHostNetworkPod(pod) { + err := r.cleanupPodNetwork(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: runningPod.ID, + Name: runningPod.Name, + Namespace: runningPod.Namespace, + }, + }) + if err != nil { + glog.Errorf("rkt: failed to tear down network for unit %q: %v", serviceName, err) + return err + } + } + return nil } @@ -1488,6 +1601,50 @@ func podIsActive(pod *rktapi.Pod) bool { pod.State == rktapi.PodState_POD_STATE_RUNNING } +// GetNetNS returns the network namespace path for the given container +func (r *Runtime) GetNetNS(containerID kubecontainer.ContainerID) (string, error) { + // This is a slight hack, kubenet shouldn't be asking us about a container id + // but a pod id. This is because it knows too much about the infra container. + // We pretend the pod.UID is an infra container ID. + // This deception is only possible because we played the same trick in + // `networkPlugin.SetUpPod` and `networkPlugin.TearDownPod`. + return netnsPathFromName(makePodNetnsName(types.UID(containerID.ID))), nil +} + +func podDetailsFromServiceFile(serviceFilePath string) (string, string, string, error) { + f, err := os.Open(serviceFilePath) + if err != nil { + return "", "", "", err + } + defer f.Close() + + opts, err := unit.Deserialize(f) + if err != nil { + return "", "", "", err + } + + var id, name, namespace string + for _, o := range opts { + if o.Section != unitKubernetesSection { + continue + } + switch o.Name { + case unitPodUID: + id = o.Value + case unitPodName: + name = o.Value + case unitPodNamespace: + namespace = o.Value + } + + if id != "" && name != "" && namespace != "" { + return id, name, namespace, nil + } + } + + return "", "", "", fmt.Errorf("failed to parse pod from file %s", serviceFilePath) +} + // GarbageCollect collects the pods/containers. // After one GC iteration: // - The deleted pods will be removed. 
@@ -1548,7 +1705,13 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error rktUUID := getRktUUIDFromServiceFileName(serviceName) if _, ok := allPods[rktUUID]; !ok { glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName) - if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + + serviceFile := serviceFilePath(serviceName) + + // Network may not be around anymore so errors are ignored + r.cleanupPodNetworkFromServiceFile(serviceFile) + + if err := r.os.Remove(serviceFile); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceName, err)) } } @@ -1581,18 +1744,39 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error return errors.NewAggregate(errlist) } +// Read kubernetes pod UUID, namespace, and name from systemd service file and +// use that to clean up any pod network that may still exist. +func (r *Runtime) cleanupPodNetworkFromServiceFile(serviceFilePath string) { + id, name, namespace, err := podDetailsFromServiceFile(serviceFilePath) + if err == nil { + r.cleanupPodNetwork(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: types.UID(id), + Name: name, + Namespace: namespace, + }, + }) + } +} + // removePod calls 'rkt rm $UUID' to delete a rkt pod, it also remove the systemd service file // related to the pod. func (r *Runtime) removePod(uuid string) error { var errlist []error glog.V(4).Infof("rkt: GC is removing pod %q", uuid) + + serviceName := makePodServiceFileName(uuid) + serviceFile := serviceFilePath(serviceName) + + // Network may not be around anymore so errors are ignored + r.cleanupPodNetworkFromServiceFile(serviceFile) + if _, err := r.cli.RunCommand("rm", uuid); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err)) } // GC systemd service files as well. - serviceName := makePodServiceFileName(uuid) - if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + if err := r.os.Remove(serviceFile); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceName, uuid, err)) } diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index 944829be224..fc6eaafe114 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -1076,8 +1076,9 @@ func TestSetApp(t *testing.T) { func TestGenerateRunCommand(t *testing.T) { hostName := "test-hostname" tests := []struct { - pod *api.Pod - uuid string + pod *api.Pod + uuid string + netnsName string dnsServers []string dnsSearches []string @@ -1095,6 +1096,7 @@ func TestGenerateRunCommand(t *testing.T) { Spec: api.PodSpec{}, }, "rkt-uuid-foo", + "default", []string{}, []string{}, "", @@ -1109,11 +1111,12 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "default", []string{}, []string{}, "pod-hostname-foo", nil, - "/bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=rkt.kubernetes.io --hostname=pod-hostname-foo rkt-uuid-foo", + " --net=\"/var/run/netns/default\" -- /bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=host --hostname=pod-hostname-foo rkt-uuid-foo", }, // Case #2, returns no dns, with host-net. 
{ @@ -1128,6 +1131,7 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "", []string{}, []string{}, "", @@ -1147,11 +1151,12 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "default", []string{"127.0.0.1"}, []string{"."}, "pod-hostname-foo", nil, - "/bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=rkt.kubernetes.io --dns=127.0.0.1 --dns-search=. --dns-opt=ndots:5 --hostname=pod-hostname-foo rkt-uuid-foo", + " --net=\"/var/run/netns/default\" -- /bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=host --dns=127.0.0.1 --dns-search=. --dns-opt=ndots:5 --hostname=pod-hostname-foo rkt-uuid-foo", }, // Case #4, returns no dns, dns searches, with host-network. { @@ -1166,6 +1171,7 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "", []string{"127.0.0.1"}, []string{"."}, "pod-hostname-foo", @@ -1189,7 +1195,7 @@ func TestGenerateRunCommand(t *testing.T) { testCaseHint := fmt.Sprintf("test case #%d", i) rkt.runtimeHelper = &fakeRuntimeHelper{tt.dnsServers, tt.dnsSearches, tt.hostName, "", tt.err} - result, err := rkt.generateRunCommand(tt.pod, tt.uuid) + result, err := rkt.generateRunCommand(tt.pod, tt.uuid, tt.netnsName) assert.Equal(t, tt.err, err, testCaseHint) assert.Equal(t, tt.expect, result, testCaseHint) }
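
Illustrative sketch, not part of the patch: the reason GetNetNS moves onto the generic Runtime interface is that kubenet no longer needs a *dockertools.DockerManager; any runtime that can report a namespace path can be fed straight into libcni. The netnsProvider interface, the fixed /var/run/netns layout, and the container ID below are hypothetical stand-ins, and the libcni.RuntimeConf fields are the ones the patch uses (ContainerID, NetNS, IfName).

package main

import (
	"fmt"

	"github.com/appc/cni/libcni"
)

// netnsProvider is a hypothetical narrow view of the Runtime interface:
// only the method added by this patch, with a plain string ID for brevity.
type netnsProvider interface {
	GetNetNS(containerID string) (string, error)
}

// staticNetNS pretends to be a runtime that created the namespace itself,
// the way the rkt runtime does with `ip netns add`.
type staticNetNS struct{}

func (staticNetNS) GetNetNS(containerID string) (string, error) {
	// Assumed layout; mirrors netnsPathFromName in the rkt changes.
	return fmt.Sprintf("/var/run/netns/k8s_%s", containerID), nil
}

// buildRuntimeConf shows the shape of the CNI runtime config kubenet builds
// in buildCNIRuntimeConf: container ID, namespace path, interface name.
func buildRuntimeConf(r netnsProvider, containerID, ifName string) (*libcni.RuntimeConf, error) {
	netnsPath, err := r.GetNetNS(containerID)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve network namespace path: %v", err)
	}
	return &libcni.RuntimeConf{
		ContainerID: containerID,
		NetNS:       netnsPath,
		IfName:      ifName,
	}, nil
}

func main() {
	rt, err := buildRuntimeConf(staticNetNS{}, "0cbf5c3e-0b0f-4f3e-8f0a-000000000000", "eth0")
	if err != nil {
		panic(err)
	}
	fmt.Printf("CNI runtime conf: %+v\n", rt)
}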
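
Also for orientation, a sketch of what the hairpin helpers ultimately do once findPairInterfaceOfContainerInterface has mapped the container's eth0 to its host-side veth via peer_ifindex: flip the bridge-port hairpin_mode flag in sysfs so the bridge will forward a pod's traffic back to the pod itself. The sysfs path, the helper name, and the veth name are assumptions for illustration, not code from this patch.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
)

// enableHairpin writes "1" to the hairpin_mode flag of a bridge port.
// Path layout assumed from the kernel's sysfs bridge interface; the real
// helper in pkg/kubelet/network/hairpin does the equivalent.
func enableHairpin(hostVeth string) error {
	path := fmt.Sprintf("/sys/devices/virtual/net/%s/brport/hairpin_mode", hostVeth)
	return ioutil.WriteFile(path, []byte("1"), 0644)
}

func main() {
	// "veth1234abcd" is a made-up host-side veth name; on a real node it is
	// the interface whose ifindex matched the container's peer_ifindex.
	if err := enableHairpin("veth1234abcd"); err != nil {
		fmt.Fprintf(os.Stderr, "hairpin setup failed: %v\n", err)
	}
}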
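
Finally, the [X-Kubernetes] bookkeeping added to the generated service file exists so that GarbageCollect and removePod can recover enough of the pod (UID, name, namespace) to call cleanupPodNetwork after the rkt pod itself is gone. A self-contained round trip with github.com/coreos/go-systemd/unit, the same package podDetailsFromServiceFile uses, looks roughly like this; the UID and names are placeholders.

package main

import (
	"fmt"

	"github.com/coreos/go-systemd/unit"
)

func main() {
	// What preparePod now writes into the service file (the usual
	// [Service] options are omitted here).
	opts := []*unit.UnitOption{
		unit.NewUnitOption("X-Kubernetes", "PodUID", "11111111-2222-3333-4444-555555555555"),
		unit.NewUnitOption("X-Kubernetes", "PodName", "nginx"),
		unit.NewUnitOption("X-Kubernetes", "PodNamespace", "default"),
	}

	// Serialize returns an io.Reader over the rendered unit file; feeding it
	// straight back into Deserialize stands in for reading the file from
	// disk, which is what podDetailsFromServiceFile does.
	parsed, err := unit.Deserialize(unit.Serialize(opts))
	if err != nil {
		panic(err)
	}

	var id, name, namespace string
	for _, o := range parsed {
		if o.Section != "X-Kubernetes" {
			continue
		}
		switch o.Name {
		case "PodUID":
			id = o.Value
		case "PodName":
			name = o.Value
		case "PodNamespace":
			namespace = o.Value
		}
	}
	fmt.Printf("uid=%s name=%s namespace=%s\n", id, name, namespace)
}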