diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index bd70d74b7c9..d21d9cce410 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -17,14 +17,11 @@ limitations under the License. package dockertools import ( - "bufio" - "bytes" "fmt" "hash/adler32" "io" "math/rand" "os" - "os/exec" "strconv" "strings" @@ -112,209 +109,6 @@ func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPulle } } -type dockerContainerCommandRunner struct { - client DockerInterface -} - -// The first version of docker that supports exec natively is 1.3.0 == API 1.15 -var dockerAPIVersionWithExec, _ = docker.NewAPIVersion("1.15") - -// Returns the major and minor version numbers of docker server. -// TODO(yifan): Remove this once the ContainerCommandRunner is implemented by dockerManager. -func (d *dockerContainerCommandRunner) getDockerServerVersion() (docker.APIVersion, error) { - env, err := d.client.Version() - if err != nil { - return nil, fmt.Errorf("failed to get docker server version - %v", err) - } - - apiVersion := env.Get("ApiVersion") - version, err := docker.NewAPIVersion(apiVersion) - if err != nil { - return nil, fmt.Errorf("failed to parse docker server version %q: %v", apiVersion, err) - } - - return version, nil -} - -func (d *dockerContainerCommandRunner) nativeExecSupportExists() (bool, error) { - version, err := d.getDockerServerVersion() - if err != nil { - return false, err - } - return version.GreaterThanOrEqualTo(dockerAPIVersionWithExec), nil -} - -func (d *dockerContainerCommandRunner) getRunInContainerCommand(containerID string, cmd []string) (*exec.Cmd, error) { - args := append([]string{"exec"}, cmd...) - command := exec.Command("/usr/sbin/nsinit", args...) - command.Dir = fmt.Sprintf("/var/lib/docker/execdriver/native/%s", containerID) - return command, nil -} - -func (d *dockerContainerCommandRunner) runInContainerUsingNsinit(containerID string, cmd []string) ([]byte, error) { - c, err := d.getRunInContainerCommand(containerID, cmd) - if err != nil { - return nil, err - } - return c.CombinedOutput() -} - -// RunInContainer uses nsinit to run the command inside the container identified by containerID -// TODO(yifan): Use strong type for containerID. -func (d *dockerContainerCommandRunner) RunInContainer(containerID string, cmd []string) ([]byte, error) { - // If native exec support does not exist in the local docker daemon use nsinit. - useNativeExec, err := d.nativeExecSupportExists() - if err != nil { - return nil, err - } - if !useNativeExec { - return d.runInContainerUsingNsinit(containerID, cmd) - } - createOpts := docker.CreateExecOptions{ - Container: containerID, - Cmd: cmd, - AttachStdin: false, - AttachStdout: true, - AttachStderr: true, - Tty: false, - } - execObj, err := d.client.CreateExec(createOpts) - if err != nil { - return nil, fmt.Errorf("failed to run in container - Exec setup failed - %v", err) - } - var buf bytes.Buffer - wrBuf := bufio.NewWriter(&buf) - startOpts := docker.StartExecOptions{ - Detach: false, - Tty: false, - OutputStream: wrBuf, - ErrorStream: wrBuf, - RawTerminal: false, - } - errChan := make(chan error, 1) - go func() { - errChan <- d.client.StartExec(execObj.ID, startOpts) - }() - wrBuf.Flush() - return buf.Bytes(), <-errChan -} - -// ExecInContainer uses nsenter to run the command inside the container identified by containerID. -// -// TODO: -// - match cgroups of container -// - should we support `docker exec`? -// - should we support nsenter in a container, running with elevated privs and --pid=host? -// - use strong type for containerId -func (d *dockerContainerCommandRunner) ExecInContainer(containerId string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { - nsenter, err := exec.LookPath("nsenter") - if err != nil { - return fmt.Errorf("exec unavailable - unable to locate nsenter") - } - - container, err := d.client.InspectContainer(containerId) - if err != nil { - return err - } - - if !container.State.Running { - return fmt.Errorf("container not running (%s)", container) - } - - containerPid := container.State.Pid - - // TODO what if the container doesn't have `env`??? - args := []string{"-t", fmt.Sprintf("%d", containerPid), "-m", "-i", "-u", "-n", "-p", "--", "env", "-i"} - args = append(args, fmt.Sprintf("HOSTNAME=%s", container.Config.Hostname)) - args = append(args, container.Config.Env...) - args = append(args, cmd...) - command := exec.Command(nsenter, args...) - if tty { - p, err := StartPty(command) - if err != nil { - return err - } - defer p.Close() - - // make sure to close the stdout stream - defer stdout.Close() - - if stdin != nil { - go io.Copy(p, stdin) - } - - if stdout != nil { - go io.Copy(stdout, p) - } - - return command.Wait() - } else { - if stdin != nil { - // Use an os.Pipe here as it returns true *os.File objects. - // This way, if you run 'kubectl exec -p -i bash' (no tty) and type 'exit', - // the call below to command.Run() can unblock because its Stdin is the read half - // of the pipe. - r, w, err := os.Pipe() - if err != nil { - return err - } - go io.Copy(w, stdin) - - command.Stdin = r - } - if stdout != nil { - command.Stdout = stdout - } - if stderr != nil { - command.Stderr = stderr - } - - return command.Run() - } -} - -// PortForward executes socat in the pod's network namespace and copies -// data between stream (representing the user's local connection on their -// computer) and the specified port in the container. -// -// TODO: -// - match cgroups of container -// - should we support nsenter + socat on the host? (current impl) -// - should we support nsenter + socat in a container, running with elevated privs and --pid=host? -func (d *dockerContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { - podInfraContainer := pod.FindContainerByName(PodInfraContainerName) - if podInfraContainer == nil { - return fmt.Errorf("cannot find pod infra container in pod %q", kubecontainer.BuildPodFullName(pod.Name, pod.Namespace)) - } - container, err := d.client.InspectContainer(string(podInfraContainer.ID)) - if err != nil { - return err - } - - if !container.State.Running { - return fmt.Errorf("container not running (%s)", container) - } - - containerPid := container.State.Pid - // TODO what if the host doesn't have it??? - _, lookupErr := exec.LookPath("socat") - if lookupErr != nil { - return fmt.Errorf("Unable to do port forwarding: socat not found.") - } - args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)} - // TODO use exec.LookPath - command := exec.Command("nsenter", args...) - command.Stdin = stream - command.Stdout = stream - return command.Run() -} - -// NewDockerContainerCommandRunner creates a ContainerCommandRunner which uses nsinit to run a command -// inside a container. -func NewDockerContainerCommandRunner(client DockerInterface) ContainerCommandRunner { - return &dockerContainerCommandRunner{client: client} -} - func parseImageName(image string) (string, string) { return parsers.ParseRepositoryTag(image) } diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index 514c2ceafa1..3e0c6bc558d 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -144,7 +144,7 @@ func TestVersion(t *testing.T) { func TestExecSupportExists(t *testing.T) { fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.3.0", "ApiVersion=1.15"}} - runner := dockerContainerCommandRunner{fakeDocker} + runner := &DockerManager{client: fakeDocker} useNativeExec, err := runner.nativeExecSupportExists() if err != nil { t.Errorf("got error while checking for exec support - %s", err) @@ -156,7 +156,7 @@ func TestExecSupportExists(t *testing.T) { func TestExecSupportNotExists(t *testing.T) { fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.2", "ApiVersion=1.14"}} - runner := dockerContainerCommandRunner{fakeDocker} + runner := &DockerManager{client: fakeDocker} useNativeExec, _ := runner.nativeExecSupportExists() if useNativeExec { t.Errorf("invalid exec support check output.") @@ -164,7 +164,7 @@ func TestExecSupportNotExists(t *testing.T) { } func TestDockerContainerCommand(t *testing.T) { - runner := dockerContainerCommandRunner{} + runner := &DockerManager{} containerID := "1234" command := []string{"ls"} cmd, _ := runner.getRunInContainerCommand(containerID, command) diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 4016bd3d0a4..122a45ad01e 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -17,11 +17,14 @@ limitations under the License. package dockertools import ( + "bufio" + "bytes" "errors" "fmt" "io" "io/ioutil" "os" + "os/exec" "path" "strconv" "strings" @@ -697,3 +700,183 @@ func (dm *DockerManager) Version() (kubecontainer.Version, error) { } return dockerVersion(version), nil } + +// The first version of docker that supports exec natively is 1.3.0 == API 1.15 +var dockerAPIVersionWithExec = "1.15" + +func (dm *DockerManager) nativeExecSupportExists() (bool, error) { + version, err := dm.Version() + if err != nil { + return false, err + } + result, err := version.Compare(dockerAPIVersionWithExec) + if result >= 0 { + return true, err + } + return false, err +} + +func (dm *DockerManager) getRunInContainerCommand(containerID string, cmd []string) (*exec.Cmd, error) { + args := append([]string{"exec"}, cmd...) + command := exec.Command("/usr/sbin/nsinit", args...) + command.Dir = fmt.Sprintf("/var/lib/docker/execdriver/native/%s", containerID) + return command, nil +} + +func (dm *DockerManager) runInContainerUsingNsinit(containerID string, cmd []string) ([]byte, error) { + c, err := dm.getRunInContainerCommand(containerID, cmd) + if err != nil { + return nil, err + } + return c.CombinedOutput() +} + +// RunInContainer uses nsinit to run the command inside the container identified by containerID +// TODO(yifan): Use strong type for containerID. +func (dm *DockerManager) RunInContainer(containerID string, cmd []string) ([]byte, error) { + // If native exec support does not exist in the local docker daemon use nsinit. + useNativeExec, err := dm.nativeExecSupportExists() + if err != nil { + return nil, err + } + if !useNativeExec { + return dm.runInContainerUsingNsinit(containerID, cmd) + } + createOpts := docker.CreateExecOptions{ + Container: containerID, + Cmd: cmd, + AttachStdin: false, + AttachStdout: true, + AttachStderr: true, + Tty: false, + } + execObj, err := dm.client.CreateExec(createOpts) + if err != nil { + return nil, fmt.Errorf("failed to run in container - Exec setup failed - %v", err) + } + var buf bytes.Buffer + wrBuf := bufio.NewWriter(&buf) + startOpts := docker.StartExecOptions{ + Detach: false, + Tty: false, + OutputStream: wrBuf, + ErrorStream: wrBuf, + RawTerminal: false, + } + errChan := make(chan error, 1) + go func() { + errChan <- dm.client.StartExec(execObj.ID, startOpts) + }() + wrBuf.Flush() + return buf.Bytes(), <-errChan +} + +// ExecInContainer uses nsenter to run the command inside the container identified by containerID. +// +// TODO: +// - match cgroups of container +// - should we support `docker exec`? +// - should we support nsenter in a container, running with elevated privs and --pid=host? +// - use strong type for containerId +func (dm *DockerManager) ExecInContainer(containerId string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { + nsenter, err := exec.LookPath("nsenter") + if err != nil { + return fmt.Errorf("exec unavailable - unable to locate nsenter") + } + + container, err := dm.client.InspectContainer(containerId) + if err != nil { + return err + } + + if !container.State.Running { + return fmt.Errorf("container not running (%s)", container) + } + + containerPid := container.State.Pid + + // TODO what if the container doesn't have `env`??? + args := []string{"-t", fmt.Sprintf("%d", containerPid), "-m", "-i", "-u", "-n", "-p", "--", "env", "-i"} + args = append(args, fmt.Sprintf("HOSTNAME=%s", container.Config.Hostname)) + args = append(args, container.Config.Env...) + args = append(args, cmd...) + command := exec.Command(nsenter, args...) + if tty { + p, err := StartPty(command) + if err != nil { + return err + } + defer p.Close() + + // make sure to close the stdout stream + defer stdout.Close() + + if stdin != nil { + go io.Copy(p, stdin) + } + + if stdout != nil { + go io.Copy(stdout, p) + } + + return command.Wait() + } else { + if stdin != nil { + // Use an os.Pipe here as it returns true *os.File objects. + // This way, if you run 'kubectl exec -p -i bash' (no tty) and type 'exit', + // the call below to command.Run() can unblock because its Stdin is the read half + // of the pipe. + r, w, err := os.Pipe() + if err != nil { + return err + } + go io.Copy(w, stdin) + + command.Stdin = r + } + if stdout != nil { + command.Stdout = stdout + } + if stderr != nil { + command.Stderr = stderr + } + + return command.Run() + } +} + +// PortForward executes socat in the pod's network namespace and copies +// data between stream (representing the user's local connection on their +// computer) and the specified port in the container. +// +// TODO: +// - match cgroups of container +// - should we support nsenter + socat on the host? (current impl) +// - should we support nsenter + socat in a container, running with elevated privs and --pid=host? +func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { + podInfraContainer := pod.FindContainerByName(PodInfraContainerName) + if podInfraContainer == nil { + return fmt.Errorf("cannot find pod infra container in pod %q", kubecontainer.BuildPodFullName(pod.Name, pod.Namespace)) + } + container, err := dm.client.InspectContainer(string(podInfraContainer.ID)) + if err != nil { + return err + } + + if !container.State.Running { + return fmt.Errorf("container not running (%s)", container) + } + + containerPid := container.State.Pid + // TODO what if the host doesn't have it??? + _, lookupErr := exec.LookPath("socat") + if lookupErr != nil { + return fmt.Errorf("Unable to do port forwarding: socat not found.") + } + args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)} + // TODO use exec.LookPath + command := exec.Command("nsenter", args...) + command.Stdin = stream + command.Stdout = stream + return command.Run() +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 63f169f5f51..1bb63406fbd 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -210,7 +210,7 @@ func NewMainKubelet( resyncInterval: resyncInterval, containerRefManager: kubecontainer.NewRefManager(), readinessManager: kubecontainer.NewReadinessManager(), - runner: dockertools.NewDockerContainerCommandRunner(dockerClient), + runner: containerManager, httpClient: &http.Client{}, sourcesReady: sourcesReady, clusterDomain: clusterDomain,