diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 49884621646..c96aa3be217 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -439,6 +439,8 @@ func NewMainKubelet( klet.livenessManager, klet.volumeManager, klet.httpClient, + klet.networkPlugin, + klet.hairpinMode == componentconfig.HairpinVeth, utilexec.New(), kubecontainer.RealOS{}, imageBackOff, diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index d71b60a360b..376e10b2c67 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -45,6 +45,8 @@ import ( "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/network/hairpin" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" @@ -74,8 +76,9 @@ const ( kubernetesUnitPrefix = "k8s_" unitKubernetesSection = "X-Kubernetes" - unitPodName = "POD" - unitRktID = "RktID" + unitPodUID = "PodUID" + unitPodName = "PodName" + unitPodNamespace = "PodNamespace" unitRestartCount = "RestartCount" k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet" @@ -133,8 +136,17 @@ type Runtime struct { execer utilexec.Interface os kubecontainer.OSInterface + // Network plugin. + networkPlugin network.NetworkPlugin + + // If true, the "hairpin mode" flag is set on container interfaces. + // A false value means the kubelet just backs off from setting it, + // it might already be true. + configureHairpinMode bool + // used for a systemd Exec, which requires the full path. - touchPath string + touchPath string + nsenterPath string versions versions } @@ -171,6 +183,8 @@ func New( livenessManager proberesults.Manager, volumeGetter volumeGetter, httpClient kubetypes.HttpGetter, + networkPlugin network.NetworkPlugin, + hairpinMode bool, execer utilexec.Interface, os kubecontainer.OSInterface, imageBackOff *flowcontrol.Backoff, @@ -203,6 +217,11 @@ func New( return nil, fmt.Errorf("cannot find touch binary: %v", err) } + nsenterPath, err := execer.LookPath("nsenter") + if err != nil { + return nil, fmt.Errorf("cannot find nsenter binary: %v", err) + } + rkt := &Runtime{ os: kubecontainer.RealOS{}, systemd: systemd, @@ -216,8 +235,10 @@ func New( recorder: recorder, livenessManager: livenessManager, volumeGetter: volumeGetter, + networkPlugin: networkPlugin, execer: execer, touchPath: touchPath, + nsenterPath: nsenterPath, } rkt.config, err = rkt.getConfig(rkt.config) @@ -862,23 +883,22 @@ func serviceFilePath(serviceName string) string { } // generateRunCommand crafts a 'rkt run-prepared' command with necessary parameters. -func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) { +func (r *Runtime) generateRunCommand(pod *api.Pod, uuid, netnsName string) (string, error) { runPrepared := r.buildCommand("run-prepared").Args + // Network namespace set up in kubelet; rkt networking not used + runPrepared = append(runPrepared, "--net=host") + var hostname string var err error - // Setup network configuration. - if kubecontainer.IsHostNetworkPod(pod) { - runPrepared = append(runPrepared, "--net=host") - + // Setup DNS and hostname configuration. + if len(netnsName) == 0 { // TODO(yifan): Let runtimeHelper.GeneratePodHostNameAndDomain() to handle this. hostname, err = r.os.Hostname() if err != nil { return "", err } } else { - runPrepared = append(runPrepared, fmt.Sprintf("--net=%s", defaultNetworkName)) - // Setup DNS. dnsServers, dnsSearches, err := r.runtimeHelper.GetClusterDNS(pod) if err != nil { @@ -899,12 +919,37 @@ func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) if err != nil { return "", err } + + // Drop the `rkt run-prepared` into the network namespace we + // created. + // TODO: switch to 'ip netns exec' once we can depend on a new + // enough version that doesn't have bugs like + // https://bugzilla.redhat.com/show_bug.cgi?id=882047 + nsenterExec := []string{r.nsenterPath, "--net=\"" + netnsPathFromName(netnsName) + "\"", "--"} + runPrepared = append(nsenterExec, runPrepared...) } + runPrepared = append(runPrepared, fmt.Sprintf("--hostname=%s", hostname)) runPrepared = append(runPrepared, uuid) return strings.Join(runPrepared, " "), nil } +func (r *Runtime) cleanupPodNetwork(pod *api.Pod) error { + glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + + var teardownErr error + containerID := kubecontainer.ContainerID{ID: string(pod.UID)} + if err := r.networkPlugin.TearDownPod(pod.Namespace, pod.Name, containerID); err != nil { + teardownErr = fmt.Errorf("rkt: failed to tear down network for pod %s: %v", format.Pod(pod), err) + } + + if _, err := r.execer.Command("ip", "netns", "del", makePodNetnsName(pod.UID)).Output(); err != nil { + return fmt.Errorf("rkt: Failed to remove network namespace for pod %s: %v", format.Pod(pod), err) + } + + return teardownErr +} + // preparePod will: // // 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid. @@ -912,7 +957,7 @@ func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) // // On success, it will return a string that represents name of the unit file // and the runtime pod. -func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) { +func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret, netnsName string) (string, *kubecontainer.Pod, error) { // Generate the pod manifest from the pod spec. manifest, err := r.makePodManifest(pod, pullSecrets) if err != nil { @@ -957,7 +1002,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k glog.V(4).Infof("'rkt prepare' returns %q", uuid) // Create systemd service file for the rkt pod. - runPrepared, err := r.generateRunCommand(pod, uuid) + runPrepared, err := r.generateRunCommand(pod, uuid, netnsName) if err != nil { return "", nil, fmt.Errorf("failed to generate 'rkt run-prepared' command: %v", err) } @@ -972,6 +1017,10 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k newUnitOption("Service", "ExecStopPost", markPodFinished), // This enables graceful stop. newUnitOption("Service", "KillMode", "mixed"), + // Track pod info for garbage collection + newUnitOption(unitKubernetesSection, unitPodUID, string(pod.UID)), + newUnitOption(unitKubernetesSection, unitPodName, pod.Name), + newUnitOption(unitKubernetesSection, unitPodNamespace, pod.Namespace), } serviceName := makePodServiceFileName(uuid) @@ -1024,12 +1073,57 @@ func (r *Runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, f return } +func makePodNetnsName(podID types.UID) string { + return fmt.Sprintf("%s_%s", kubernetesUnitPrefix, string(podID)) +} + +func netnsPathFromName(netnsName string) string { + return fmt.Sprintf("/var/run/netns/%s", netnsName) +} + +func (r *Runtime) setupPodNetwork(pod *api.Pod) (string, error) { + netnsName := makePodNetnsName(pod.UID) + + // Create a new network namespace for the pod + r.execer.Command("ip", "netns", "del", netnsName).Output() + _, err := r.execer.Command("ip", "netns", "add", netnsName).Output() + if err != nil { + return "", fmt.Errorf("failed to create pod network namespace: %v", err) + } + + // Set up networking with the network plugin + glog.V(3).Infof("Calling network plugin %s to setup pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + containerID := kubecontainer.ContainerID{ID: string(pod.UID)} + err = r.networkPlugin.SetUpPod(pod.Namespace, pod.Name, containerID) + if err != nil { + return "", fmt.Errorf("failed to set up pod network: %v", err) + } + + if r.configureHairpinMode { + if err = hairpin.SetUpContainerPath(netnsPathFromName(netnsName), network.DefaultInterfaceName); err != nil { + glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err) + } + } + + return netnsName, nil +} + // RunPod first creates the unit file for a pod, and then // starts the unit over d-bus. func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { glog.V(4).Infof("Rkt starts to run pod: name %q.", format.Pod(pod)) - name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets) + var err error + var netnsName string + if !kubecontainer.IsHostNetworkPod(pod) { + netnsName, err = r.setupPodNetwork(pod) + if err != nil { + r.cleanupPodNetwork(pod) + return err + } + } + + name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets, netnsName) // Set container references and generate events. // If preparedPod fails, then send out 'failed' events for each container. @@ -1049,6 +1143,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { } if prepareErr != nil { + r.cleanupPodNetwork(pod) return prepareErr } @@ -1057,9 +1152,10 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { // RestartUnit has the same effect as StartUnit if the unit is not running, besides it can restart // a unit if the unit file is changed and reloaded. reschan := make(chan string) - _, err := r.systemd.RestartUnit(name, "replace", reschan) + _, err = r.systemd.RestartUnit(name, "replace", reschan) if err != nil { r.generateEvents(runtimePod, "Failed", err) + r.cleanupPodNetwork(pod) return err } @@ -1067,6 +1163,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { if res != "done" { err := fmt.Errorf("Failed to restart unit %q: %s", name, res) r.generateEvents(runtimePod, "Failed", err) + r.cleanupPodNetwork(pod) return err } @@ -1078,6 +1175,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { if errKill := r.KillPod(pod, *runtimePod, nil); errKill != nil { return errors.NewAggregate([]error{err, errKill}) } + r.cleanupPodNetwork(pod) return err } @@ -1364,6 +1462,21 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePerio return err } + // Clean up networking; use running pod details since 'pod' can be nil + if pod == nil || !kubecontainer.IsHostNetworkPod(pod) { + err := r.cleanupPodNetwork(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: runningPod.ID, + Name: runningPod.Name, + Namespace: runningPod.Namespace, + }, + }) + if err != nil { + glog.Errorf("rkt: failed to tear down network for unit %q: %v", serviceName, err) + return err + } + } + return nil } @@ -1490,7 +1603,46 @@ func podIsActive(pod *rktapi.Pod) bool { // GetNetNS returns the network namespace path for the given container func (r *Runtime) GetNetNS(containerID kubecontainer.ContainerID) (string, error) { - return "", nil + // This is a slight hack, kubenet shouldn't be asking us about a container id + // but a pod id. This is because it knows too much about the infra container. + // We pretend the pod.UID is an infra container ID. + // This deception is only possible because we played the same trick in + // `networkPlugin.SetUpPod` and `networkPlugin.TearDownPod`. + return netnsPathFromName(makePodNetnsName(types.UID(containerID.ID))), nil +} + +func podDetailsFromServiceFile(serviceFilePath string) (string, string, string, error) { + f, err := os.Open(serviceFilePath) + if err != nil { + return "", "", "", err + } + defer f.Close() + + opts, err := unit.Deserialize(f) + if err != nil { + return "", "", "", err + } + + var id, name, namespace string + for _, o := range opts { + if o.Section != unitKubernetesSection { + continue + } + switch o.Name { + case unitPodUID: + id = o.Value + case unitPodName: + name = o.Value + case unitPodNamespace: + namespace = o.Value + } + + if id != "" && name != "" && namespace != "" { + return id, name, namespace, nil + } + } + + return "", "", "", fmt.Errorf("failed to parse pod from file %s", serviceFilePath) } // GarbageCollect collects the pods/containers. @@ -1553,7 +1705,13 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error rktUUID := getRktUUIDFromServiceFileName(serviceName) if _, ok := allPods[rktUUID]; !ok { glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName) - if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + + serviceFile := serviceFilePath(serviceName) + + // Network may not be around anymore so errors are ignored + r.cleanupPodNetworkFromServiceFile(serviceFile) + + if err := r.os.Remove(serviceFile); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceName, err)) } } @@ -1586,18 +1744,39 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error return errors.NewAggregate(errlist) } +// Read kubernetes pod UUID, namespace, and name from systemd service file and +// use that to clean up any pod network that may still exist. +func (r *Runtime) cleanupPodNetworkFromServiceFile(serviceFilePath string) { + id, name, namespace, err := podDetailsFromServiceFile(serviceFilePath) + if err == nil { + r.cleanupPodNetwork(&api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: types.UID(id), + Name: name, + Namespace: namespace, + }, + }) + } +} + // removePod calls 'rkt rm $UUID' to delete a rkt pod, it also remove the systemd service file // related to the pod. func (r *Runtime) removePod(uuid string) error { var errlist []error glog.V(4).Infof("rkt: GC is removing pod %q", uuid) + + serviceName := makePodServiceFileName(uuid) + serviceFile := serviceFilePath(serviceName) + + // Network may not be around anymore so errors are ignored + r.cleanupPodNetworkFromServiceFile(serviceFile) + if _, err := r.cli.RunCommand("rm", uuid); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err)) } // GC systemd service files as well. - serviceName := makePodServiceFileName(uuid) - if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + if err := r.os.Remove(serviceFile); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceName, uuid, err)) } diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index 944829be224..fc6eaafe114 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -1076,8 +1076,9 @@ func TestSetApp(t *testing.T) { func TestGenerateRunCommand(t *testing.T) { hostName := "test-hostname" tests := []struct { - pod *api.Pod - uuid string + pod *api.Pod + uuid string + netnsName string dnsServers []string dnsSearches []string @@ -1095,6 +1096,7 @@ func TestGenerateRunCommand(t *testing.T) { Spec: api.PodSpec{}, }, "rkt-uuid-foo", + "default", []string{}, []string{}, "", @@ -1109,11 +1111,12 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "default", []string{}, []string{}, "pod-hostname-foo", nil, - "/bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=rkt.kubernetes.io --hostname=pod-hostname-foo rkt-uuid-foo", + " --net=\"/var/run/netns/default\" -- /bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=host --hostname=pod-hostname-foo rkt-uuid-foo", }, // Case #2, returns no dns, with host-net. { @@ -1128,6 +1131,7 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "", []string{}, []string{}, "", @@ -1147,11 +1151,12 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "default", []string{"127.0.0.1"}, []string{"."}, "pod-hostname-foo", nil, - "/bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=rkt.kubernetes.io --dns=127.0.0.1 --dns-search=. --dns-opt=ndots:5 --hostname=pod-hostname-foo rkt-uuid-foo", + " --net=\"/var/run/netns/default\" -- /bin/rkt/rkt --insecure-options=image,ondisk --local-config=/var/rkt/local/data --dir=/var/data run-prepared --net=host --dns=127.0.0.1 --dns-search=. --dns-opt=ndots:5 --hostname=pod-hostname-foo rkt-uuid-foo", }, // Case #4, returns no dns, dns searches, with host-network. { @@ -1166,6 +1171,7 @@ func TestGenerateRunCommand(t *testing.T) { }, }, "rkt-uuid-foo", + "", []string{"127.0.0.1"}, []string{"."}, "pod-hostname-foo", @@ -1189,7 +1195,7 @@ func TestGenerateRunCommand(t *testing.T) { testCaseHint := fmt.Sprintf("test case #%d", i) rkt.runtimeHelper = &fakeRuntimeHelper{tt.dnsServers, tt.dnsSearches, tt.hostName, "", tt.err} - result, err := rkt.generateRunCommand(tt.pod, tt.uuid) + result, err := rkt.generateRunCommand(tt.pod, tt.uuid, tt.netnsName) assert.Equal(t, tt.err, err, testCaseHint) assert.Equal(t, tt.expect, result, testCaseHint) }