diff --git a/pkg/kubelet/container/fake_runtime.go b/pkg/kubelet/container/fake_runtime.go index 768e2e49a10..18b19bf5ce7 100644 --- a/pkg/kubelet/container/fake_runtime.go +++ b/pkg/kubelet/container/fake_runtime.go @@ -234,6 +234,14 @@ func (f *FakeRuntime) ExecInContainer(containerID string, cmd []string, stdin io return f.Err } +func (f *FakeRuntime) AttachContainer(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "AttachContainer") + return f.Err +} + func (f *FakeRuntime) RunInContainer(containerID string, cmd []string) ([]byte, error) { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 3403f401c71..58a82358382 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -77,6 +77,12 @@ type Runtime interface { GetContainerLogs(pod *api.Pod, containerID, tail string, follow bool, stdout, stderr io.Writer) (err error) // ContainerCommandRunner encapsulates the command runner interfaces for testability. ContainerCommandRunner + // ContainerAttach encapsulates the attaching to containers for testability + ContainerAttacher +} + +type ContainerAttacher interface { + AttachContainer(id string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) (err error) } // CommandRunner encapsulates the command runner interfaces for testability. diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index f29a1f2a62b..bf3dfe338d8 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -71,6 +71,7 @@ type DockerInterface interface { CreateExec(docker.CreateExecOptions) (*docker.Exec, error) StartExec(string, docker.StartExecOptions) error InspectExec(id string) (*docker.ExecInspect, error) + AttachToContainer(opts docker.AttachToContainerOptions) error } // KubeletContainerName encapsulates a pod name and a Kubernetes container name. diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 2c90765416e..2cb9ce8af4a 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -298,6 +298,13 @@ func (f *FakeDockerClient) StartExec(_ string, _ docker.StartExecOptions) error return nil } +func (f *FakeDockerClient) AttachToContainer(opts docker.AttachToContainerOptions) error { + f.Lock() + defer f.Unlock() + f.called = append(f.called, "attach") + return nil +} + func (f *FakeDockerClient) InspectExec(id string) (*docker.ExecInspect, error) { return f.ExecInspect, f.popError("inspect_exec") } diff --git a/pkg/kubelet/dockertools/instrumented_docker.go b/pkg/kubelet/dockertools/instrumented_docker.go index 2110fbedbad..2d8af57070d 100644 --- a/pkg/kubelet/dockertools/instrumented_docker.go +++ b/pkg/kubelet/dockertools/instrumented_docker.go @@ -189,3 +189,12 @@ func (in instrumentedDockerInterface) InspectExec(id string) (*docker.ExecInspec recordError(operation, err) return out, err } + +func (in instrumentedDockerInterface) AttachToContainer(opts docker.AttachToContainerOptions) error { + const operation = "attach" + defer recordOperation(operation, time.Now()) + + err := in.client.AttachToContainer(opts) + recordError(operation, err) + return err +} diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index c7a9f22bcd6..c7a062f74c7 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -1006,6 +1006,21 @@ func (dm *DockerManager) ExecInContainer(containerId string, cmd []string, stdin return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty) } +func (dm *DockerManager) AttachContainer(containerId string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { + opts := docker.AttachToContainerOptions{ + Container: containerId, + InputStream: stdin, + OutputStream: stdout, + ErrorStream: stderr, + Logs: true, + Stdin: stdin != nil, + Stdout: stdout != nil, + Stderr: stderr != nil, + RawTerminal: tty, + } + return dm.client.AttachToContainer(opts) +} + func noPodInfraContainerError(podName, podNamespace string) error { return fmt.Errorf("cannot find pod infra container in pod %q", kubecontainer.BuildPodFullName(podName, podNamespace)) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 53843a068fc..80b2b179203 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2348,6 +2348,19 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain return kl.runner.ExecInContainer(string(container.ID), cmd, stdin, stdout, stderr, tty) } +func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { + podUID = kl.podManager.TranslatePodUID(podUID) + + container, err := kl.findContainer(podFullName, podUID, containerName) + if err != nil { + return err + } + if container == nil { + return fmt.Errorf("container not found (%q)", containerName) + } + return kl.containerRuntime.AttachContainer(string(container.ID), stdin, stdout, stderr, tty) +} + // PortForward connects to the pod's port and copies data between the port // and the stream. func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error { diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index ba2da9c58fa..985b5b5e16a 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -18,6 +18,7 @@ package rkt import ( "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -945,6 +946,10 @@ func (r *runtime) RunInContainer(containerID string, cmd []string) ([]byte, erro return []byte(strings.Join(result, "\n")), err } +func (r *runtime) AttachContainer(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { + return errors.New("unimplemented") +} + // Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where // appName is the container name. func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index dbe93bb7574..55e54ed1e71 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -101,6 +101,7 @@ type HostInterface interface { GetPodByName(namespace, name string) (*api.Pod, bool) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error + AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error GetKubeletContainerLogs(podFullName, containerName, tail string, follow, previous bool, stdout, stderr io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error @@ -140,6 +141,7 @@ func (s *Server) InstallDefaultHandlers() { func (s *Server) InstallDebuggingHandlers() { s.mux.HandleFunc("/run/", s.handleRun) s.mux.HandleFunc("/exec/", s.handleExec) + s.mux.HandleFunc("/attach/", s.handleAttach) s.mux.HandleFunc("/portForward/", s.handlePortForward) s.mux.HandleFunc("/logs/", s.handleLogs) @@ -367,6 +369,42 @@ func parseContainerCoordinates(path string) (namespace, pod string, uid types.UI return } +const streamCreationTimeout = 30 * time.Second + +func (s *Server) handleAttach(w http.ResponseWriter, req *http.Request) { + u, err := url.ParseRequestURI(req.RequestURI) + if err != nil { + s.error(w, err) + return + } + podNamespace, podID, uid, container, err := parseContainerCoordinates(u.Path) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + pod, ok := s.host.GetPodByName(podNamespace, podID) + if !ok { + http.Error(w, "Pod does not exist", http.StatusNotFound) + return + } + + stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(w, req) + if conn != nil { + defer conn.Close() + } + if !ok { + // error is handled in the createStreams function + return + } + + err = s.host.AttachContainer(kubecontainer.GetPodFullName(pod), uid, container, stdinStream, stdoutStream, stderrStream, tty) + if err != nil { + msg := fmt.Sprintf("Error executing command in container: %v", err) + glog.Error(msg) + errorStream.Write([]byte(msg)) + } +} + // handleRun handles requests to run a command inside a container. func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) { u, err := url.ParseRequestURI(req.RequestURI) @@ -394,8 +432,6 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) { w.Write(data) } -const streamCreationTimeout = 30 * time.Second - // handleExec handles requests to run a command inside a container. func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) { u, err := url.ParseRequestURI(req.RequestURI) @@ -413,7 +449,22 @@ func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) { http.Error(w, "Pod does not exist", http.StatusNotFound) return } + stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(w, req) + if conn != nil { + defer conn.Close() + } + if !ok { + return + } + err = s.host.ExecInContainer(kubecontainer.GetPodFullName(pod), uid, container, u.Query()[api.ExecCommandParamm], stdinStream, stdoutStream, stderrStream, tty) + if err != nil { + msg := fmt.Sprintf("Error executing command in container: %v", err) + glog.Error(msg) + errorStream.Write([]byte(msg)) + } +} +func (s *Server) createStreams(w http.ResponseWriter, req *http.Request) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, httpstream.Connection, bool, bool) { req.ParseForm() // start at 1 for error stream expectedStreams := 1 @@ -430,7 +481,7 @@ func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) { if expectedStreams == 1 { http.Error(w, "You must specify at least 1 of stdin, stdout, stderr", http.StatusBadRequest) - return + return nil, nil, nil, nil, nil, false, false } streamCh := make(chan httpstream.Stream) @@ -445,9 +496,8 @@ func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) { // The upgrader is responsible for notifying the client of any errors that // occurred during upgrading. All we can do is return here at this point // if we weren't successful in upgrading. - return + return nil, nil, nil, nil, nil, false, false } - defer conn.Close() conn.SetIdleTimeout(s.host.StreamingConnectionIdleTimeout()) @@ -485,7 +535,7 @@ WaitForStreams: // TODO find a way to return the error to the user. Maybe use a separate // stream to report errors? glog.Error("Timed out waiting for client to create streams") - return + return nil, nil, nil, nil, nil, false, false } } @@ -494,12 +544,7 @@ WaitForStreams: stdinStream.Close() } - err = s.host.ExecInContainer(kubecontainer.GetPodFullName(pod), uid, container, u.Query()[api.ExecCommandParamm], stdinStream, stdoutStream, stderrStream, tty) - if err != nil { - msg := fmt.Sprintf("Error executing command in container: %v", err) - glog.Error(msg) - errorStream.Write([]byte(msg)) - } + return stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, true } func parsePodCoordinates(path string) (namespace, pod string, uid types.UID, err error) { diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 39c7a4eaa77..d02754149f3 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -52,6 +52,7 @@ type fakeKubelet struct { runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) containerVersionFunc func() (kubecontainer.Version, error) execFunc func(pod string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error + attachFunc func(pod string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error portForwardFunc func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error containerLogsFunc func(podFullName, containerName, tail string, follow, pervious bool, stdout, stderr io.Writer) error streamingConnectionIdleTimeoutFunc func() time.Duration @@ -116,6 +117,10 @@ func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container str return fk.execFunc(name, uid, container, cmd, in, out, err, tty) } +func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error { + return fk.attachFunc(name, uid, container, in, out, err, tty) +} + func (fk *fakeKubelet) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { return fk.portForwardFunc(name, uid, port, stream) } @@ -931,6 +936,237 @@ func TestServeExecInContainer(t *testing.T) { } } +// TODO: largely cloned from TestServeExecContainer, refactor and re-use code +func TestServeAttachContainer(t *testing.T) { + tests := []struct { + stdin bool + stdout bool + stderr bool + tty bool + responseStatusCode int + uid bool + }{ + {responseStatusCode: http.StatusBadRequest}, + {stdin: true, responseStatusCode: http.StatusSwitchingProtocols}, + {stdout: true, responseStatusCode: http.StatusSwitchingProtocols}, + {stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, + {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, + {stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols}, + {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, + } + + for i, test := range tests { + fw := newServerTest() + + fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration { + return 0 + } + + podNamespace := "other" + podName := "foo" + expectedPodName := getPodName(podName, podNamespace) + expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647" + expectedContainerName := "baz" + expectedStdin := "stdin" + expectedStdout := "stdout" + expectedStderr := "stderr" + attachFuncDone := make(chan struct{}) + clientStdoutReadDone := make(chan struct{}) + clientStderrReadDone := make(chan struct{}) + + fw.fakeKubelet.attachFunc = func(podFullName string, uid types.UID, containerName string, in io.Reader, out, stderr io.WriteCloser, tty bool) error { + defer close(attachFuncDone) + if podFullName != expectedPodName { + t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName) + } + if test.uid && string(uid) != expectedUid { + t.Fatalf("%d: uid: expected %v, got %v", i, expectedUid, uid) + } + if containerName != expectedContainerName { + t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName) + } + + if test.stdin { + if in == nil { + t.Fatalf("%d: stdin: expected non-nil", i) + } + b := make([]byte, 10) + n, err := in.Read(b) + if err != nil { + t.Fatalf("%d: error reading from stdin: %v", i, err) + } + if e, a := expectedStdin, string(b[0:n]); e != a { + t.Fatalf("%d: stdin: expected to read %v, got %v", i, e, a) + } + } else if in != nil { + t.Fatalf("%d: stdin: expected nil: %#v", i, in) + } + + if test.stdout { + if out == nil { + t.Fatalf("%d: stdout: expected non-nil", i) + } + _, err := out.Write([]byte(expectedStdout)) + if err != nil { + t.Fatalf("%d:, error writing to stdout: %v", i, err) + } + out.Close() + <-clientStdoutReadDone + } else if out != nil { + t.Fatalf("%d: stdout: expected nil: %#v", i, out) + } + + if tty { + if stderr != nil { + t.Fatalf("%d: tty set but received non-nil stderr: %v", i, stderr) + } + } else if test.stderr { + if stderr == nil { + t.Fatalf("%d: stderr: expected non-nil", i) + } + _, err := stderr.Write([]byte(expectedStderr)) + if err != nil { + t.Fatalf("%d:, error writing to stderr: %v", i, err) + } + stderr.Close() + <-clientStderrReadDone + } else if stderr != nil { + t.Fatalf("%d: stderr: expected nil: %#v", i, stderr) + } + + return nil + } + + var url string + if test.uid { + url = fw.testHTTPServer.URL + "/attach/" + podNamespace + "/" + podName + "/" + expectedUid + "/" + expectedContainerName + "?" + } else { + url = fw.testHTTPServer.URL + "/attach/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?" + } + if test.stdin { + url += "&" + api.ExecStdinParam + "=1" + } + if test.stdout { + url += "&" + api.ExecStdoutParam + "=1" + } + if test.stderr && !test.tty { + url += "&" + api.ExecStderrParam + "=1" + } + if test.tty { + url += "&" + api.ExecTTYParam + "=1" + } + + var ( + resp *http.Response + err error + upgradeRoundTripper httpstream.UpgradeRoundTripper + c *http.Client + ) + + if test.responseStatusCode != http.StatusSwitchingProtocols { + c = &http.Client{} + } else { + upgradeRoundTripper = spdy.NewRoundTripper(nil) + c = &http.Client{Transport: upgradeRoundTripper} + } + + resp, err = c.Get(url) + if err != nil { + t.Fatalf("%d: Got error GETing: %v", i, err) + } + defer resp.Body.Close() + + _, err = ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("%d: Error reading response body: %v", i, err) + } + + if e, a := test.responseStatusCode, resp.StatusCode; e != a { + t.Fatalf("%d: response status: expected %v, got %v", i, e, a) + } + + if test.responseStatusCode != http.StatusSwitchingProtocols { + continue + } + + conn, err := upgradeRoundTripper.NewConnection(resp) + if err != nil { + t.Fatalf("Unexpected error creating streaming connection: %s", err) + } + if conn == nil { + t.Fatalf("%d: unexpected nil conn", i) + } + defer conn.Close() + + h := http.Header{} + h.Set(api.StreamType, api.StreamTypeError) + errorStream, err := conn.CreateStream(h) + if err != nil { + t.Fatalf("%d: error creating error stream: %v", i, err) + } + defer errorStream.Reset() + + if test.stdin { + h.Set(api.StreamType, api.StreamTypeStdin) + stream, err := conn.CreateStream(h) + if err != nil { + t.Fatalf("%d: error creating stdin stream: %v", i, err) + } + defer stream.Reset() + _, err = stream.Write([]byte(expectedStdin)) + if err != nil { + t.Fatalf("%d: error writing to stdin stream: %v", i, err) + } + } + + var stdoutStream httpstream.Stream + if test.stdout { + h.Set(api.StreamType, api.StreamTypeStdout) + stdoutStream, err = conn.CreateStream(h) + if err != nil { + t.Fatalf("%d: error creating stdout stream: %v", i, err) + } + defer stdoutStream.Reset() + } + + var stderrStream httpstream.Stream + if test.stderr && !test.tty { + h.Set(api.StreamType, api.StreamTypeStderr) + stderrStream, err = conn.CreateStream(h) + if err != nil { + t.Fatalf("%d: error creating stderr stream: %v", i, err) + } + defer stderrStream.Reset() + } + + if test.stdout { + output := make([]byte, 10) + n, err := stdoutStream.Read(output) + close(clientStdoutReadDone) + if err != nil { + t.Fatalf("%d: error reading from stdout stream: %v", i, err) + } + if e, a := expectedStdout, string(output[0:n]); e != a { + t.Fatalf("%d: stdout: expected '%v', got '%v'", i, e, a) + } + } + + if test.stderr && !test.tty { + output := make([]byte, 10) + n, err := stderrStream.Read(output) + close(clientStderrReadDone) + if err != nil { + t.Fatalf("%d: error reading from stderr stream: %v", i, err) + } + if e, a := expectedStderr, string(output[0:n]); e != a { + t.Fatalf("%d: stderr: expected '%v', got '%v'", i, e, a) + } + } + + <-attachFuncDone + } +} + func TestServePortForwardIdleTimeout(t *testing.T) { fw := newServerTest()