Add support for Attach to the kubelet.

This is a pre-cursor to supporting 'kubectl attach ...' and 'kubectl run -it ...'
This commit is contained in:
Brendan Burns 2015-07-27 21:48:55 -07:00
parent e45c6f9847
commit 64be76c14d
10 changed files with 357 additions and 12 deletions

View File

@ -234,6 +234,14 @@ func (f *FakeRuntime) ExecInContainer(containerID string, cmd []string, stdin io
return f.Err
}
func (f *FakeRuntime) AttachContainer(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "AttachContainer")
return f.Err
}
func (f *FakeRuntime) RunInContainer(containerID string, cmd []string) ([]byte, error) {
f.Lock()
defer f.Unlock()

View File

@ -77,6 +77,12 @@ type Runtime interface {
GetContainerLogs(pod *api.Pod, containerID, tail string, follow bool, stdout, stderr io.Writer) (err error)
// ContainerCommandRunner encapsulates the command runner interfaces for testability.
ContainerCommandRunner
// ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher
}
type ContainerAttacher interface {
AttachContainer(id string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) (err error)
}
// CommandRunner encapsulates the command runner interfaces for testability.

View File

@ -71,6 +71,7 @@ type DockerInterface interface {
CreateExec(docker.CreateExecOptions) (*docker.Exec, error)
StartExec(string, docker.StartExecOptions) error
InspectExec(id string) (*docker.ExecInspect, error)
AttachToContainer(opts docker.AttachToContainerOptions) error
}
// KubeletContainerName encapsulates a pod name and a Kubernetes container name.

View File

@ -298,6 +298,13 @@ func (f *FakeDockerClient) StartExec(_ string, _ docker.StartExecOptions) error
return nil
}
func (f *FakeDockerClient) AttachToContainer(opts docker.AttachToContainerOptions) error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "attach")
return nil
}
func (f *FakeDockerClient) InspectExec(id string) (*docker.ExecInspect, error) {
return f.ExecInspect, f.popError("inspect_exec")
}

View File

@ -189,3 +189,12 @@ func (in instrumentedDockerInterface) InspectExec(id string) (*docker.ExecInspec
recordError(operation, err)
return out, err
}
func (in instrumentedDockerInterface) AttachToContainer(opts docker.AttachToContainerOptions) error {
const operation = "attach"
defer recordOperation(operation, time.Now())
err := in.client.AttachToContainer(opts)
recordError(operation, err)
return err
}

View File

@ -1006,6 +1006,21 @@ func (dm *DockerManager) ExecInContainer(containerId string, cmd []string, stdin
return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty)
}
func (dm *DockerManager) AttachContainer(containerId string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
opts := docker.AttachToContainerOptions{
Container: containerId,
InputStream: stdin,
OutputStream: stdout,
ErrorStream: stderr,
Logs: true,
Stdin: stdin != nil,
Stdout: stdout != nil,
Stderr: stderr != nil,
RawTerminal: tty,
}
return dm.client.AttachToContainer(opts)
}
func noPodInfraContainerError(podName, podNamespace string) error {
return fmt.Errorf("cannot find pod infra container in pod %q", kubecontainer.BuildPodFullName(podName, podNamespace))
}

View File

@ -2348,6 +2348,19 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain
return kl.runner.ExecInContainer(string(container.ID), cmd, stdin, stdout, stderr, tty)
}
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return kl.containerRuntime.AttachContainer(string(container.ID), stdin, stdout, stderr, tty)
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {

View File

@ -18,6 +18,7 @@ package rkt
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@ -945,6 +946,10 @@ func (r *runtime) RunInContainer(containerID string, cmd []string) ([]byte, erro
return []byte(strings.Join(result, "\n")), err
}
func (r *runtime) AttachContainer(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
return errors.New("unimplemented")
}
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {

View File

@ -101,6 +101,7 @@ type HostInterface interface {
GetPodByName(namespace, name string) (*api.Pod, bool)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
GetKubeletContainerLogs(podFullName, containerName, tail string, follow, previous bool, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request)
PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
@ -140,6 +141,7 @@ func (s *Server) InstallDefaultHandlers() {
func (s *Server) InstallDebuggingHandlers() {
s.mux.HandleFunc("/run/", s.handleRun)
s.mux.HandleFunc("/exec/", s.handleExec)
s.mux.HandleFunc("/attach/", s.handleAttach)
s.mux.HandleFunc("/portForward/", s.handlePortForward)
s.mux.HandleFunc("/logs/", s.handleLogs)
@ -367,6 +369,42 @@ func parseContainerCoordinates(path string) (namespace, pod string, uid types.UI
return
}
const streamCreationTimeout = 30 * time.Second
func (s *Server) handleAttach(w http.ResponseWriter, req *http.Request) {
u, err := url.ParseRequestURI(req.RequestURI)
if err != nil {
s.error(w, err)
return
}
podNamespace, podID, uid, container, err := parseContainerCoordinates(u.Path)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(w, req)
if conn != nil {
defer conn.Close()
}
if !ok {
// error is handled in the createStreams function
return
}
err = s.host.AttachContainer(kubecontainer.GetPodFullName(pod), uid, container, stdinStream, stdoutStream, stderrStream, tty)
if err != nil {
msg := fmt.Sprintf("Error executing command in container: %v", err)
glog.Error(msg)
errorStream.Write([]byte(msg))
}
}
// handleRun handles requests to run a command inside a container.
func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
u, err := url.ParseRequestURI(req.RequestURI)
@ -394,8 +432,6 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
w.Write(data)
}
const streamCreationTimeout = 30 * time.Second
// handleExec handles requests to run a command inside a container.
func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) {
u, err := url.ParseRequestURI(req.RequestURI)
@ -413,7 +449,22 @@ func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(w, req)
if conn != nil {
defer conn.Close()
}
if !ok {
return
}
err = s.host.ExecInContainer(kubecontainer.GetPodFullName(pod), uid, container, u.Query()[api.ExecCommandParamm], stdinStream, stdoutStream, stderrStream, tty)
if err != nil {
msg := fmt.Sprintf("Error executing command in container: %v", err)
glog.Error(msg)
errorStream.Write([]byte(msg))
}
}
func (s *Server) createStreams(w http.ResponseWriter, req *http.Request) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, httpstream.Connection, bool, bool) {
req.ParseForm()
// start at 1 for error stream
expectedStreams := 1
@ -430,7 +481,7 @@ func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) {
if expectedStreams == 1 {
http.Error(w, "You must specify at least 1 of stdin, stdout, stderr", http.StatusBadRequest)
return
return nil, nil, nil, nil, nil, false, false
}
streamCh := make(chan httpstream.Stream)
@ -445,9 +496,8 @@ func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) {
// The upgrader is responsible for notifying the client of any errors that
// occurred during upgrading. All we can do is return here at this point
// if we weren't successful in upgrading.
return
return nil, nil, nil, nil, nil, false, false
}
defer conn.Close()
conn.SetIdleTimeout(s.host.StreamingConnectionIdleTimeout())
@ -485,7 +535,7 @@ WaitForStreams:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
glog.Error("Timed out waiting for client to create streams")
return
return nil, nil, nil, nil, nil, false, false
}
}
@ -494,12 +544,7 @@ WaitForStreams:
stdinStream.Close()
}
err = s.host.ExecInContainer(kubecontainer.GetPodFullName(pod), uid, container, u.Query()[api.ExecCommandParamm], stdinStream, stdoutStream, stderrStream, tty)
if err != nil {
msg := fmt.Sprintf("Error executing command in container: %v", err)
glog.Error(msg)
errorStream.Write([]byte(msg))
}
return stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, true
}
func parsePodCoordinates(path string) (namespace, pod string, uid types.UID, err error) {

View File

@ -52,6 +52,7 @@ type fakeKubelet struct {
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
containerVersionFunc func() (kubecontainer.Version, error)
execFunc func(pod string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
attachFunc func(pod string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
portForwardFunc func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
containerLogsFunc func(podFullName, containerName, tail string, follow, pervious bool, stdout, stderr io.Writer) error
streamingConnectionIdleTimeoutFunc func() time.Duration
@ -116,6 +117,10 @@ func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container str
return fk.execFunc(name, uid, container, cmd, in, out, err, tty)
}
func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error {
return fk.attachFunc(name, uid, container, in, out, err, tty)
}
func (fk *fakeKubelet) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
return fk.portForwardFunc(name, uid, port, stream)
}
@ -931,6 +936,237 @@ func TestServeExecInContainer(t *testing.T) {
}
}
// TODO: largely cloned from TestServeExecContainer, refactor and re-use code
func TestServeAttachContainer(t *testing.T) {
tests := []struct {
stdin bool
stdout bool
stderr bool
tty bool
responseStatusCode int
uid bool
}{
{responseStatusCode: http.StatusBadRequest},
{stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdout: true, responseStatusCode: http.StatusSwitchingProtocols},
{stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
}
for i, test := range tests {
fw := newServerTest()
fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
return 0
}
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
expectedContainerName := "baz"
expectedStdin := "stdin"
expectedStdout := "stdout"
expectedStderr := "stderr"
attachFuncDone := make(chan struct{})
clientStdoutReadDone := make(chan struct{})
clientStderrReadDone := make(chan struct{})
fw.fakeKubelet.attachFunc = func(podFullName string, uid types.UID, containerName string, in io.Reader, out, stderr io.WriteCloser, tty bool) error {
defer close(attachFuncDone)
if podFullName != expectedPodName {
t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName)
}
if test.uid && string(uid) != expectedUid {
t.Fatalf("%d: uid: expected %v, got %v", i, expectedUid, uid)
}
if containerName != expectedContainerName {
t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName)
}
if test.stdin {
if in == nil {
t.Fatalf("%d: stdin: expected non-nil", i)
}
b := make([]byte, 10)
n, err := in.Read(b)
if err != nil {
t.Fatalf("%d: error reading from stdin: %v", i, err)
}
if e, a := expectedStdin, string(b[0:n]); e != a {
t.Fatalf("%d: stdin: expected to read %v, got %v", i, e, a)
}
} else if in != nil {
t.Fatalf("%d: stdin: expected nil: %#v", i, in)
}
if test.stdout {
if out == nil {
t.Fatalf("%d: stdout: expected non-nil", i)
}
_, err := out.Write([]byte(expectedStdout))
if err != nil {
t.Fatalf("%d:, error writing to stdout: %v", i, err)
}
out.Close()
<-clientStdoutReadDone
} else if out != nil {
t.Fatalf("%d: stdout: expected nil: %#v", i, out)
}
if tty {
if stderr != nil {
t.Fatalf("%d: tty set but received non-nil stderr: %v", i, stderr)
}
} else if test.stderr {
if stderr == nil {
t.Fatalf("%d: stderr: expected non-nil", i)
}
_, err := stderr.Write([]byte(expectedStderr))
if err != nil {
t.Fatalf("%d:, error writing to stderr: %v", i, err)
}
stderr.Close()
<-clientStderrReadDone
} else if stderr != nil {
t.Fatalf("%d: stderr: expected nil: %#v", i, stderr)
}
return nil
}
var url string
if test.uid {
url = fw.testHTTPServer.URL + "/attach/" + podNamespace + "/" + podName + "/" + expectedUid + "/" + expectedContainerName + "?"
} else {
url = fw.testHTTPServer.URL + "/attach/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?"
}
if test.stdin {
url += "&" + api.ExecStdinParam + "=1"
}
if test.stdout {
url += "&" + api.ExecStdoutParam + "=1"
}
if test.stderr && !test.tty {
url += "&" + api.ExecStderrParam + "=1"
}
if test.tty {
url += "&" + api.ExecTTYParam + "=1"
}
var (
resp *http.Response
err error
upgradeRoundTripper httpstream.UpgradeRoundTripper
c *http.Client
)
if test.responseStatusCode != http.StatusSwitchingProtocols {
c = &http.Client{}
} else {
upgradeRoundTripper = spdy.NewRoundTripper(nil)
c = &http.Client{Transport: upgradeRoundTripper}
}
resp, err = c.Get(url)
if err != nil {
t.Fatalf("%d: Got error GETing: %v", i, err)
}
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("%d: Error reading response body: %v", i, err)
}
if e, a := test.responseStatusCode, resp.StatusCode; e != a {
t.Fatalf("%d: response status: expected %v, got %v", i, e, a)
}
if test.responseStatusCode != http.StatusSwitchingProtocols {
continue
}
conn, err := upgradeRoundTripper.NewConnection(resp)
if err != nil {
t.Fatalf("Unexpected error creating streaming connection: %s", err)
}
if conn == nil {
t.Fatalf("%d: unexpected nil conn", i)
}
defer conn.Close()
h := http.Header{}
h.Set(api.StreamType, api.StreamTypeError)
errorStream, err := conn.CreateStream(h)
if err != nil {
t.Fatalf("%d: error creating error stream: %v", i, err)
}
defer errorStream.Reset()
if test.stdin {
h.Set(api.StreamType, api.StreamTypeStdin)
stream, err := conn.CreateStream(h)
if err != nil {
t.Fatalf("%d: error creating stdin stream: %v", i, err)
}
defer stream.Reset()
_, err = stream.Write([]byte(expectedStdin))
if err != nil {
t.Fatalf("%d: error writing to stdin stream: %v", i, err)
}
}
var stdoutStream httpstream.Stream
if test.stdout {
h.Set(api.StreamType, api.StreamTypeStdout)
stdoutStream, err = conn.CreateStream(h)
if err != nil {
t.Fatalf("%d: error creating stdout stream: %v", i, err)
}
defer stdoutStream.Reset()
}
var stderrStream httpstream.Stream
if test.stderr && !test.tty {
h.Set(api.StreamType, api.StreamTypeStderr)
stderrStream, err = conn.CreateStream(h)
if err != nil {
t.Fatalf("%d: error creating stderr stream: %v", i, err)
}
defer stderrStream.Reset()
}
if test.stdout {
output := make([]byte, 10)
n, err := stdoutStream.Read(output)
close(clientStdoutReadDone)
if err != nil {
t.Fatalf("%d: error reading from stdout stream: %v", i, err)
}
if e, a := expectedStdout, string(output[0:n]); e != a {
t.Fatalf("%d: stdout: expected '%v', got '%v'", i, e, a)
}
}
if test.stderr && !test.tty {
output := make([]byte, 10)
n, err := stderrStream.Read(output)
close(clientStderrReadDone)
if err != nil {
t.Fatalf("%d: error reading from stderr stream: %v", i, err)
}
if e, a := expectedStderr, string(output[0:n]); e != a {
t.Fatalf("%d: stderr: expected '%v', got '%v'", i, e, a)
}
}
<-attachFuncDone
}
}
func TestServePortForwardIdleTimeout(t *testing.T) {
fw := newServerTest()