From 55e6eb2ce366b6e66a131f5fd836c51f2beac6b7 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 2 May 2016 17:27:41 -0500 Subject: [PATCH 1/6] Allow using netns path instead of container PID to change hairpin mode --- pkg/kubelet/dockertools/manager.go | 2 +- pkg/kubelet/network/hairpin/hairpin.go | 31 ++++++++++++++------- pkg/kubelet/network/hairpin/hairpin_test.go | 3 +- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 10ac9e281f3..2258e7e222c 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -1957,7 +1957,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec } if dm.configureHairpinMode { - if err = hairpin.SetUpContainer(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil { + if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil { glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err) } } diff --git a/pkg/kubelet/network/hairpin/hairpin.go b/pkg/kubelet/network/hairpin/hairpin.go index cf5d2937563..b725fbba3af 100644 --- a/pkg/kubelet/network/hairpin/hairpin.go +++ b/pkg/kubelet/network/hairpin/hairpin.go @@ -40,13 +40,23 @@ var ( ethtoolOutputRegex = regexp.MustCompile("peer_ifindex: (\\d+)") ) -func SetUpContainer(containerPid int, containerInterfaceName string) error { - e := exec.New() - return setUpContainerInternal(e, containerPid, containerInterfaceName) +func SetUpContainerPid(containerPid int, containerInterfaceName string) error { + pidStr := fmt.Sprintf("%d", containerPid) + nsenterArgs := []string{"-t", pidStr, "-n"} + return setUpContainerInternal(containerInterfaceName, pidStr, nsenterArgs) } -func setUpContainerInternal(e exec.Interface, containerPid int, containerInterfaceName string) error { - hostIfName, err := findPairInterfaceOfContainerInterface(e, containerPid, containerInterfaceName) +func SetUpContainerPath(netnsPath string, containerInterfaceName string) error { + if netnsPath[0] != '/' { + return fmt.Errorf("netnsPath path '%s' was invalid", netnsPath) + } + nsenterArgs := []string{"-n", netnsPath} + return setUpContainerInternal(containerInterfaceName, netnsPath, nsenterArgs) +} + +func setUpContainerInternal(containerInterfaceName, containerDesc string, nsenterArgs []string) error { + e := exec.New() + hostIfName, err := findPairInterfaceOfContainerInterface(e, containerInterfaceName, containerDesc, nsenterArgs) if err != nil { glog.Infof("Unable to find pair interface, setting up all interfaces: %v", err) return setUpAllInterfaces() @@ -54,7 +64,7 @@ func setUpContainerInternal(e exec.Interface, containerPid int, containerInterfa return setUpInterface(hostIfName) } -func findPairInterfaceOfContainerInterface(e exec.Interface, containerPid int, containerInterfaceName string) (string, error) { +func findPairInterfaceOfContainerInterface(e exec.Interface, containerInterfaceName, containerDesc string, nsenterArgs []string) (string, error) { nsenterPath, err := e.LookPath("nsenter") if err != nil { return "", err @@ -63,15 +73,16 @@ func findPairInterfaceOfContainerInterface(e exec.Interface, containerPid int, c if err != nil { return "", err } - // Get container's interface index - output, err := e.Command(nsenterPath, "-t", fmt.Sprintf("%d", containerPid), "-n", "-F", "--", ethtoolPath, "--statistics", containerInterfaceName).CombinedOutput() + + nsenterArgs = append(nsenterArgs, "-F", "--", ethtoolPath, "--statistics", containerInterfaceName) + output, err := e.Command(nsenterPath, nsenterArgs...).CombinedOutput() if err != nil { - return "", fmt.Errorf("Unable to query interface %s of container %d: %v: %s", containerInterfaceName, containerPid, err, string(output)) + return "", fmt.Errorf("Unable to query interface %s of container %s: %v: %s", containerInterfaceName, containerDesc, err, string(output)) } // look for peer_ifindex match := ethtoolOutputRegex.FindSubmatch(output) if match == nil { - return "", fmt.Errorf("No peer_ifindex in interface statistics for %s of container %d", containerInterfaceName, containerPid) + return "", fmt.Errorf("No peer_ifindex in interface statistics for %s of container %s", containerInterfaceName, containerDesc) } peerIfIndex, err := strconv.Atoi(string(match[1])) if err != nil { // seems impossible (\d+ not numeric) diff --git a/pkg/kubelet/network/hairpin/hairpin_test.go b/pkg/kubelet/network/hairpin/hairpin_test.go index 63bc9ef5bad..b94c7b997cb 100644 --- a/pkg/kubelet/network/hairpin/hairpin_test.go +++ b/pkg/kubelet/network/hairpin/hairpin_test.go @@ -69,7 +69,8 @@ func TestFindPairInterfaceOfContainerInterface(t *testing.T) { return fmt.Sprintf("/fake-bin/%s", file), nil }, } - name, err := findPairInterfaceOfContainerInterface(&fexec, 123, "eth0") + nsenterArgs := []string{"-t", "123", "-n"} + name, err := findPairInterfaceOfContainerInterface(&fexec, "eth0", "123", nsenterArgs) if test.expectErr { if err == nil { t.Errorf("unexpected non-error") From 9b85d20c73e14f76ed30a13270d1e9a77c6c83ab Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 2 May 2016 19:49:02 -0500 Subject: [PATCH 2/6] kubelet/runtime: add method to return pod network namespace path Some runtimes (eg, Hypernetes) don't create network namespaces for pods, so network plugins must correctly handle any error returned from the runtime. --- pkg/kubelet/container/runtime.go | 6 ++++++ pkg/kubelet/container/testing/fake_runtime.go | 8 +++++++ pkg/kubelet/container/testing/runtime_mock.go | 5 +++++ pkg/kubelet/network/kubenet/kubenet_linux.go | 21 ++++--------------- pkg/kubelet/rkt/rkt.go | 5 +++++ 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 81189cd8887..10c1161f541 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -97,6 +97,12 @@ type Runtime interface { RemoveImage(image ImageSpec) error // Returns Image statistics. ImageStats() (*ImageStats, error) + // Returns the filesystem path of the pod's network namespace; if the + // runtime does not handle namespace creation itself, or cannot return + // the network namespace path, it should return an error. + // TODO: Change ContainerID to a Pod ID since the namespace is shared + // by all containers in the pod. + GetNetNS(containerID ContainerID) (string, error) // TODO(vmarmol): Unify pod and containerID args. // GetContainerLogs returns logs of a specific container. By // default, it returns a snapshot of the container log. Set 'follow' to true to diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 093b7654250..db2aeacf3e5 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -338,6 +338,14 @@ func (f *FakeRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteClos return f.Err } +func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetNetNS") + return "", f.Err +} + func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy) error { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/container/testing/runtime_mock.go b/pkg/kubelet/container/testing/runtime_mock.go index 5f01bc1414e..3f269249b39 100644 --- a/pkg/kubelet/container/testing/runtime_mock.go +++ b/pkg/kubelet/container/testing/runtime_mock.go @@ -128,6 +128,11 @@ func (r *Mock) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) err return args.Error(0) } +func (r *Mock) GetNetNS(containerID ContainerID) (string, error) { + args := r.Called(containerID) + return "", args.Error(0) +} + func (r *Mock) GarbageCollect(gcPolicy ContainerGCPolicy) error { args := r.Called(gcPolicy) return args.Error(0) diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 0c85212d927..60e1b2e59be 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -34,7 +34,6 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/apis/componentconfig" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/util/bandwidth" utilexec "k8s.io/kubernetes/pkg/util/exec" @@ -266,11 +265,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k return fmt.Errorf("Kubenet cannot SetUpPod: %v", err) } - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return fmt.Errorf("Kubenet execution called on non-docker runtime") - } - netnsPath, err := runtime.GetNetNS(id) + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) if err != nil { return fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) } @@ -330,11 +325,7 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods") } - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return fmt.Errorf("Kubenet execution called on non-docker runtime") - } - netnsPath, err := runtime.GetNetNS(id) + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) if err != nil { return err } @@ -373,12 +364,8 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s return &network.PodNetworkStatus{IP: ip}, nil } } - // TODO: remove type conversion once kubenet supports multiple runtime - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return nil, fmt.Errorf("Kubenet execution called on non-docker runtime") - } - netnsPath, err := runtime.GetNetNS(id) + + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) if err != nil { return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index a075631cb50..d71b60a360b 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -1488,6 +1488,11 @@ func podIsActive(pod *rktapi.Pod) bool { pod.State == rktapi.PodState_POD_STATE_RUNNING } +// GetNetNS returns the network namespace path for the given container +func (r *Runtime) GetNetNS(containerID kubecontainer.ContainerID) (string, error) { + return "", nil +} + // GarbageCollect collects the pods/containers. // After one GC iteration: // - The deleted pods will be removed. From 62e4635cfb584b1dfad883e08d282ea26a39da50 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 2 May 2016 19:49:42 -0500 Subject: [PATCH 3/6] rkt: set up network namespace with plugins --- pkg/kubelet/kubelet.go | 2 + pkg/kubelet/rkt/rkt.go | 215 +++++++++++++++++++++++++++++++++--- pkg/kubelet/rkt/rkt_test.go | 16 ++- 3 files changed, 210 insertions(+), 23 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 49884621646..c96aa3be217 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -439,6 +439,8 @@ func NewMainKubelet( klet.livenessManager, klet.volumeManager, klet.httpClient, + klet.networkPlugin, + klet.hairpinMode == componentconfig.HairpinVeth, utilexec.New(), kubecontainer.RealOS{}, imageBackOff, diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index d71b60a360b..376e10b2c67 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -45,6 +45,8 @@ import ( "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/network/hairpin" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" @@ -74,8 +76,9 @@ const ( kubernetesUnitPrefix = "k8s_" unitKubernetesSection = "X-Kubernetes" - unitPodName = "POD" - unitRktID = "RktID" + unitPodUID = "PodUID" + unitPodName = "PodName" + unitPodNamespace = "PodNamespace" unitRestartCount = "RestartCount" k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet" @@ -133,8 +136,17 @@ type Runtime struct { execer utilexec.Interface os kubecontainer.OSInterface + // Network plugin. + networkPlugin network.NetworkPlugin + + // If true, the "hairpin mode" flag is set on container interfaces. + // A false value means the kubelet just backs off from setting it, + // it might already be true. + configureHairpinMode bool + // used for a systemd Exec, which requires the full path. - touchPath string + touchPath string + nsenterPath string versions versions } @@ -171,6 +183,8 @@ func New( livenessManager proberesults.Manager, volumeGetter volumeGetter, httpClient kubetypes.HttpGetter, + networkPlugin network.NetworkPlugin, + hairpinMode bool, execer utilexec.Interface, os kubecontainer.OSInterface, imageBackOff *flowcontrol.Backoff, @@ -203,6 +217,11 @@ func New( return nil, fmt.Errorf("cannot find touch binary: %v", err) } + nsenterPath, err := execer.LookPath("nsenter") + if err != nil { + return nil, fmt.Errorf("cannot find nsenter binary: %v", err) + } + rkt := &Runtime{ os: kubecontainer.RealOS{}, systemd: systemd, @@ -216,8 +235,10 @@ func New( recorder: recorder, livenessManager: livenessManager, volumeGetter: volumeGetter, + networkPlugin: networkPlugin, execer: execer, touchPath: touchPath, + nsenterPath: nsenterPath, } rkt.config, err = rkt.getConfig(rkt.config) @@ -862,23 +883,22 @@ func serviceFilePath(serviceName string) string { } // generateRunCommand crafts a 'rkt run-prepared' command with necessary parameters. -func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) { +func (r *Runtime) generateRunCommand(pod *api.Pod, uuid, netnsName string) (string, error) { runPrepared := r.buildCommand("run-prepared").Args + // Network namespace set up in kubelet; rkt networking not used + runPrepared = append(runPrepared, "--net=host") + var hostname string var err error - // Setup network configuration. - if kubecontainer.IsHostNetworkPod(pod) { - runPrepared = append(runPrepared, "--net=host") - + // Setup DNS and hostname configuration. + if len(netnsName) == 0 { // TODO(yifan): Let runtimeHelper.GeneratePodHostNameAndDomain() to handle this. hostname, err = r.os.Hostname() if err != nil { return "", err } } else { - runPrepared = append(runPrepared, fmt.Sprintf("--net=%s", defaultNetworkName)) - // Setup DNS. dnsServers, dnsSearches, err := r.runtimeHelper.GetClusterDNS(pod) if err != nil { @@ -899,12 +919,37 @@ func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) if err != nil { return "", err } + + // Drop the `rkt run-prepared` into the network namespace we + // created. + // TODO: switch to 'ip netns exec' once we can depend on a new + // enough version that doesn't have bugs like + // https://bugzilla.redhat.com/show_bug.cgi?id=882047 + nsenterExec := []string{r.nsenterPath, "--net=\"" + netnsPathFromName(netnsName) + "\"", "--"} + runPrepared = append(nsenterExec, runPrepared...) } + runPrepared = append(runPrepared, fmt.Sprintf("--hostname=%s", hostname)) runPrepared = append(runPrepared, uuid) return strings.Join(runPrepared, " "), nil } +func (r *Runtime) cleanupPodNetwork(pod *api.Pod) error { + glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + + var teardownErr error + containerID := kubecontainer.ContainerID{ID: string(pod.UID)} + if err := r.networkPlugin.TearDownPod(pod.Namespace, pod.Name, containerID); err != nil { + teardownErr = fmt.Errorf("rkt: failed to tear down network for pod %s: %v", format.Pod(pod), err) + } + + if _, err := r.execer.Command("ip", "netns", "del", makePodNetnsName(pod.UID)).Output(); err != nil { + return fmt.Errorf("rkt: Failed to remove network namespace for pod %s: %v", format.Pod(pod), err) + } + + return teardownErr +} + // preparePod will: // // 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid. @@ -912,7 +957,7 @@ func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) // // On success, it will return a string that represents name of the unit file // and the runtime pod. -func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) { +func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret, netnsName string) (string, *kubecontainer.Pod, error) { // Generate the pod manifest from the pod spec. manifest, err := r.makePodManifest(pod, pullSecrets) if err != nil { @@ -957,7 +1002,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k glog.V(4).Infof("'rkt prepare' returns %q", uuid) // Create systemd service file for the rkt pod. - runPrepared, err := r.generateRunCommand(pod, uuid) + runPrepared, err := r.generateRunCommand(pod, uuid, netnsName) if err != nil { return "", nil, fmt.Errorf("failed to generate 'rkt run-prepared' command: %v", err) } @@ -972,6 +1017,10 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k newUnitOption("Service", "ExecStopPost", markPodFinished), // This enables graceful stop. newUnitOption("Service", "KillMode", "mixed"), + // Track pod info for garbage collection + newUnitOption(unitKubernetesSection, unitPodUID, string(pod.UID)), + newUnitOption(unitKubernetesSection, unitPodName, pod.Name), + newUnitOption(unitKubernetesSection, unitPodNamespace, pod.Namespace), } serviceName := makePodServiceFileName(uuid) @@ -1024,12 +1073,57 @@ func (r *Runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, f return } +func makePodNetnsName(podID types.UID) string { + return fmt.Sprintf("%s_%s", kubernetesUnitPrefix, string(podID)) +} + +func netnsPathFromName(netnsName string) string { + return fmt.Sprintf("/var/run/netns/%s", netnsName) +} + +func (r *Runtime) setupPodNetwork(pod *api.Pod) (string, error) { + netnsName := makePodNetnsName(pod.UID) + + // Create a new network namespace for the pod + r.execer.Command("ip", "netns", "del", netnsName).Output() + _, err := r.execer.Command("ip", "netns", "add", netnsName).Output() + if err != nil { + return "", fmt.Errorf("failed to create pod network namespace: %v", err) + } + + // Set up networking with the network plugin + glog.V(3).Infof("Calling network plugin %s to setup pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + containerID := kubecontainer.ContainerID{ID: string(pod.UID)} + err = r.networkPlugin.SetUpPod(pod.Namespace, pod.Name, containerID) + if err != nil { + return "", fmt.Errorf("failed to set up pod network: %v", err) + } + + if r.configureHairpinMode { + if err = hairpin.SetUpContainerPath(netnsPathFromName(netnsName), network.DefaultInterfaceName); err != nil { + glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err) + } + } + + return netnsName, nil +} + // RunPod first creates the unit file for a pod, and then // starts the unit over d-bus. func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { glog.V(4).Infof("Rkt starts to run pod: name %q.", format.Pod(pod)) - name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets) + var err error + var netnsName string + if !kubecontainer.IsHostNetworkPod(pod) { + netnsName, err = r.setupPodNetwork(pod) + if err != nil { + r.cleanupPodNetwork(pod) + return err + } + } + + name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets, netnsName) // Set container references and generate events. // If preparedPod fails, then send out 'failed' events for each container. @@ -1049,6 +1143,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { } if prepareErr != nil { + r.cleanupPodNetwork(pod) return prepareErr } @@ -1057,9 +1152,10 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { // RestartUnit has the same effect as StartUnit if the unit is not running, besides it can restart // a unit if the unit file is changed and reloaded. reschan := make(chan string) - _, err := r.systemd.RestartUnit(name, "replace", reschan) + _, err = r.systemd.RestartUnit(name, "replace", reschan) if err != nil { r.generateEvents(runtimePod, "Failed", err) + r.cleanupPodNetwork(pod) return err } @@ -1067,6 +1163,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { if res != "done" { err := fmt.Errorf("Failed to restart unit %q: %s", name, res) r.generateEvents(runtimePod, "Failed", err) + r.cleanupPodNetwork(pod) return err } @@ -1078,6 +1175,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { if errKill := r.KillPod(pod, *runtimePod, nil); errKill != nil { return errors.NewAggregate([]error{err, errKill}) } + r.cleanupPodNetwork(pod) return err } @@ -1364,6 +1462,21 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePerio return err } + // Clean up networking; use running pod details since 'pod' can be nil + if pod == nil || !kubecontainer.IsHostNetworkPod(pod) { + err := r.cleanupPodNetwork(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: runningPod.ID, + Name: runningPod.Name, + Namespace: runningPod.Namespace, + }, + }) + if err != nil { + glog.Errorf("rkt: failed to tear down network for unit %q: %v", serviceName, err) + return err + } + } + return nil } @@ -1490,7 +1603,46 @@ func podIsActive(pod *rktapi.Pod) bool { // GetNetNS returns the network namespace path for the given container func (r *Runtime) GetNetNS(containerID kubecontainer.ContainerID) (string, error) { - return "", nil + // This is a slight hack, kubenet shouldn't be asking us about a container id + // but a pod id. This is because it knows too much about the infra container. + // We pretend the pod.UID is an infra container ID. + // This deception is only possible because we played the same trick in + // `networkPlugin.SetUpPod` and `networkPlugin.TearDownPod`. + return netnsPathFromName(makePodNetnsName(types.UID(containerID.ID))), nil +} + +func podDetailsFromServiceFile(serviceFilePath string) (string, string, string, error) { + f, err := os.Open(serviceFilePath) + if err != nil { + return "", "", "", err + } + defer f.Close() + + opts, err := unit.Deserialize(f) + if err != nil { + return "", "", "", err + } + + var id, name, namespace string + for _, o := range opts { + if o.Section != unitKubernetesSection { + continue + } + switch o.Name { + case unitPodUID: + id = o.Value + case unitPodName: + name = o.Value + case unitPodNamespace: + namespace = o.Value + } + + if id != "" && name != "" && namespace != "" { + return id, name, namespace, nil + } + } + + return "", "", "", fmt.Errorf("failed to parse pod from file %s", serviceFilePath) } // GarbageCollect collects the pods/containers. @@ -1553,7 +1705,13 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error rktUUID := getRktUUIDFromServiceFileName(serviceName) if _, ok := allPods[rktUUID]; !ok { glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName) - if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + + serviceFile := serviceFilePath(serviceName) + + // Network may not be around anymore so errors are ignored + r.cleanupPodNetworkFromServiceFile(serviceFile) + + if err := r.os.Remove(serviceFile); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceName, err)) } } @@ -1586,18 +1744,39 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error return errors.NewAggregate(errlist) } +// Read kubernetes pod UUID, namespace, and name from systemd service file and +// use that to clean up any pod network that may still exist. +func (r *Runtime) cleanupPodNetworkFromServiceFile(serviceFilePath string) { + id, name, namespace, err := podDetailsFromServiceFile(serviceFilePath) + if err == nil { + r.cleanupPodNetwork(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: types.UID(id), + Name: name, + Namespace: namespace, + }, + }) + } +} + // removePod calls 'rkt rm $UUID' to delete a rkt pod, it also remove the systemd service file // related to the pod. func (r *Runtime) removePod(uuid string) error { var errlist []error glog.V(4).Infof("rkt: GC is removing pod %q", uuid) + + serviceName := makePodServiceFileName(uuid) + serviceFile := serviceFilePath(serviceName) + + // Network may not be around anymore so errors are ignored + r.cleanupPodNetworkFromServiceFile(serviceFile) + if _, err := r.cli.RunCommand("rm", uuid); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err)) } // GC systemd service files as well. - serviceName := makePodServiceFileName(uuid) - if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + if err := r.os.Remove(serviceFile); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceName, uuid, err)) } diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index 944829be224..fc6eaafe114 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -1076,8 +1076,9 @@ func TestSetApp(t *testing.T) { func TestGenerateRunCommand(t *testing.T) { hostName := "test-hostname" tests := []struct { - pod *api.Pod - uuid string + pod *api.Pod + uuid string + netnsName string dnsServers []string dnsSearches []string @@ -1095,6 +1096,7 @@ func TestGenerateRunCommand(t *testing.T) { Spec: api.PodSpec{}, }, "rkt-uuid-foo", + "default", []string{}, []string{}, "", @@ -1109,11 +1111,12 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "default", []string{}, []string{}, "pod-hostname-foo", nil, - "/bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=rkt.kubernetes.io --hostname=pod-hostname-foo rkt-uuid-foo", + " --net=\"/var/run/netns/default\" -- /bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=host --hostname=pod-hostname-foo rkt-uuid-foo", }, // Case #2, returns no dns, with host-net. { @@ -1128,6 +1131,7 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "", []string{}, []string{}, "", @@ -1147,11 +1151,12 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "default", []string{"127.0.0.1"}, []string{"."}, "pod-hostname-foo", nil, - "/bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=rkt.kubernetes.io --dns=127.0.0.1 --dns-search=. --dns-opt=ndots:5 --hostname=pod-hostname-foo rkt-uuid-foo", + " --net=\"/var/run/netns/default\" -- /bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=host --dns=127.0.0.1 --dns-search=. --dns-opt=ndots:5 --hostname=pod-hostname-foo rkt-uuid-foo", }, // Case #4, returns no dns, dns searches, with host-network. { @@ -1166,6 +1171,7 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "", []string{"127.0.0.1"}, []string{"."}, "pod-hostname-foo", @@ -1189,7 +1195,7 @@ func TestGenerateRunCommand(t *testing.T) { testCaseHint := fmt.Sprintf("test case #%d", i) rkt.runtimeHelper = &fakeRuntimeHelper{tt.dnsServers, tt.dnsSearches, tt.hostName, "", tt.err} - result, err := rkt.generateRunCommand(tt.pod, tt.uuid) + result, err := rkt.generateRunCommand(tt.pod, tt.uuid, tt.netnsName) assert.Equal(t, tt.err, err, testCaseHint) assert.Equal(t, tt.expect, result, testCaseHint) } From 5de7b561e350d43d39f1db00d8c5655dff165f0d Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 9 May 2016 16:19:41 -0500 Subject: [PATCH 4/6] kubenet: ensure loopback interface is up and consolidate CNI setup --- pkg/kubelet/network/kubenet/kubenet_linux.go | 96 +++++++++++--------- 1 file changed, 53 insertions(+), 43 deletions(-) diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 60e1b2e59be..27689dcd57f 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -31,6 +31,7 @@ import ( "github.com/vishvananda/netlink/nl" "github.com/appc/cni/libcni" + cnitypes "github.com/appc/cni/pkg/types" "github.com/golang/glog" "k8s.io/kubernetes/pkg/apis/componentconfig" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -54,6 +55,7 @@ type kubenetNetworkPlugin struct { host network.Host netConfig *libcni.NetworkConfig + loConfig *libcni.NetworkConfig cniConfig *libcni.CNIConfig shaper bandwidth.BandwidthShaper podCIDRs map[kubecontainer.ContainerID]string @@ -93,10 +95,20 @@ func (plugin *kubenetNetworkPlugin) Init(host network.Host, hairpinMode componen // was built-in, we simply ignore the error here. A better thing to do is // to check the kernel version in the future. plugin.execer.Command("modprobe", "br-netfilter").CombinedOutput() - if err := utilsysctl.SetSysctl(sysctlBridgeCallIptables, 1); err != nil { + err := utilsysctl.SetSysctl(sysctlBridgeCallIptables, 1) + if err != nil { glog.Warningf("can't set sysctl %s: %v", sysctlBridgeCallIptables, err) } + plugin.loConfig, err = libcni.ConfFromBytes([]byte(`{ + "cniVersion": "0.1.0", + "name": "kubenet-loopback", + "type": "loopback" +}`)) + if err != nil { + return fmt.Errorf("Failed to generate loopback config: %v", err) + } + return nil } @@ -265,20 +277,21 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k return fmt.Errorf("Kubenet cannot SetUpPod: %v", err) } - netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) - if err != nil { - return fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) - } - - rt := buildCNIRuntimeConf(name, namespace, id, netnsPath) - if err != nil { - return fmt.Errorf("Error building CNI config: %v", err) - } - - if err = plugin.addContainerToNetwork(id, rt); err != nil { + // Bring up container loopback interface + if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil { return err } + // Hook container up with our bridge + res, err := plugin.addContainerToNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id) + if err != nil { + return err + } + if res.IP4 == nil || res.IP4.IP.String() == "" { + return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id) + } + plugin.podCIDRs[id] = res.IP4.IP.String() + // Put the container bridge into promiscuous mode to force it to accept hairpin packets. // TODO: Remove this once the kernel bug (#20096) is fixed. // TODO: check and set promiscuous mode with netlink once vishvananda/netlink supports it @@ -325,16 +338,6 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods") } - netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) - if err != nil { - return err - } - - rt := buildCNIRuntimeConf(name, namespace, id, netnsPath) - if err != nil { - return fmt.Errorf("Error building CNI config: %v", err) - } - // no cached CIDR is Ok during teardown if cidr, ok := plugin.podCIDRs[id]; ok { glog.V(5).Infof("Removing pod CIDR %s from shaper", cidr) @@ -345,9 +348,10 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i } } } - if err = plugin.delContainerFromNetwork(id, rt); err != nil { + if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil { return err } + delete(plugin.podCIDRs, id) return nil } @@ -399,37 +403,43 @@ func (plugin *kubenetNetworkPlugin) Status() error { return nil } -func buildCNIRuntimeConf(podName string, podNs string, podInfraContainerID kubecontainer.ContainerID, podNetnsPath string) *libcni.RuntimeConf { - glog.V(4).Infof("Kubenet: using netns path %v", podNetnsPath) - glog.V(4).Infof("Kubenet: using podns path %v", podNs) +func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) { + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) + if err != nil { + return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) + } return &libcni.RuntimeConf{ - ContainerID: podInfraContainerID.ID, - NetNS: podNetnsPath, - IfName: network.DefaultInterfaceName, - } + ContainerID: id.ID, + NetNS: netnsPath, + IfName: ifName, + }, nil } -func (plugin *kubenetNetworkPlugin) addContainerToNetwork(id kubecontainer.ContainerID, rt *libcni.RuntimeConf) error { - glog.V(3).Infof("Calling cni plugins to add container to network with cni runtime: %+v", rt) - res, err := plugin.cniConfig.AddNetwork(plugin.netConfig, rt) +func (plugin *kubenetNetworkPlugin) addContainerToNetwork(config *libcni.NetworkConfig, ifName, namespace, name string, id kubecontainer.ContainerID) (*cnitypes.Result, error) { + rt, err := plugin.buildCNIRuntimeConf(ifName, id) if err != nil { - return fmt.Errorf("Error adding container to network: %v", err) - } - if res.IP4 == nil || res.IP4.IP.String() == "" { - return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id) + return nil, fmt.Errorf("Error building CNI config: %v", err) } - plugin.podCIDRs[id] = res.IP4.IP.String() - return nil + glog.V(3).Infof("Adding %s/%s to '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt) + res, err := plugin.cniConfig.AddNetwork(config, rt) + if err != nil { + return nil, fmt.Errorf("Error adding container to network: %v", err) + } + return res, nil } -func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(id kubecontainer.ContainerID, rt *libcni.RuntimeConf) error { - glog.V(3).Infof("Calling cni plugins to remove container from network with cni runtime: %+v", rt) - if err := plugin.cniConfig.DelNetwork(plugin.netConfig, rt); err != nil { +func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(config *libcni.NetworkConfig, ifName, namespace, name string, id kubecontainer.ContainerID) error { + rt, err := plugin.buildCNIRuntimeConf(ifName, id) + if err != nil { + return fmt.Errorf("Error building CNI config: %v", err) + } + + glog.V(3).Infof("Removing %s/%s from '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt) + if err := plugin.cniConfig.DelNetwork(config, rt); err != nil { return fmt.Errorf("Error removing container from network: %v", err) } - delete(plugin.podCIDRs, id) return nil } From 3dbbe263165f1738bf7ac0cfc6f165c3c1aab3f6 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Wed, 11 May 2016 09:33:34 -0500 Subject: [PATCH 5/6] kubenet: fix log message text --- pkg/kubelet/network/kubenet/kubenet_linux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 27689dcd57f..c274e9b56b4 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -261,7 +261,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k start := time.Now() defer func() { - glog.V(4).Infof("TearDownPod took %v for %s/%s", time.Since(start), namespace, name) + glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name) }() pod, ok := plugin.host.GetPodByName(namespace, name) From 552b648caffb5be5ae86e237157fdad234caf4a4 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Wed, 11 May 2016 15:32:49 -0500 Subject: [PATCH 6/6] kubenet: implement fake execer for testcases When the IP isn't in the internal map, GetPodNetworkStatus() needs to call the execer for the 'nsenter' program. That means the execer needs to be !nil, which it wasn't before. --- .../network/kubenet/kubenet_linux_test.go | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/network/kubenet/kubenet_linux_test.go b/pkg/kubelet/network/kubenet/kubenet_linux_test.go index 6d31673bbb8..a16e11e7fa7 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux_test.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux_test.go @@ -17,6 +17,8 @@ limitations under the License. package kubenet import ( + "fmt" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" @@ -38,9 +40,6 @@ func TestGetPodNetworkStatus(t *testing.T) { podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2/32" podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3/32" - fhost := nettest.NewFakeHost(nil) - fakeKubenet := newFakeKubenetPlugin(podIPMap, nil, fhost) - testCases := []struct { id string expectError bool @@ -66,6 +65,34 @@ func TestGetPodNetworkStatus(t *testing.T) { //TODO: add test cases for retrieving ip inside container network namespace } + fakeCmds := make([]exec.FakeCommandAction, 0) + for _, t := range testCases { + // the fake commands return the IP from the given index, or an error + fCmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { + ip, ok := podIPMap[kubecontainer.ContainerID{ID: t.id}] + if !ok { + return nil, fmt.Errorf("Pod IP %q not found", t.id) + } + return []byte(ip), nil + }, + }, + } + fakeCmds = append(fakeCmds, func(cmd string, args ...string) exec.Cmd { + return exec.InitFakeCmd(&fCmd, cmd, args...) + }) + } + fexec := exec.FakeExec{ + CommandScript: fakeCmds, + LookPathFunc: func(file string) (string, error) { + return fmt.Sprintf("/fake-bin/%s", file), nil + }, + } + + fhost := nettest.NewFakeHost(nil) + fakeKubenet := newFakeKubenetPlugin(podIPMap, &fexec, fhost) + for i, tc := range testCases { out, err := fakeKubenet.GetPodNetworkStatus("", "", kubecontainer.ContainerID{ID: tc.id}) if tc.expectError {