diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 444e241389f..26aee8faa0f 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -76,12 +76,12 @@ go_library( "//pkg/kubelet/remote:go_default_library", "//pkg/kubelet/rkt:go_default_library", "//pkg/kubelet/server:go_default_library", + "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/status:go_default_library", "//pkg/kubelet/sysctl:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/format:go_default_library", - "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/kubelet/util/queue:go_default_library", "//pkg/kubelet/util/sliceutils:go_default_library", "//pkg/kubelet/volumemanager:go_default_library", @@ -173,6 +173,7 @@ go_test( "//pkg/kubelet/pod/testing:go_default_library", "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/prober/testing:go_default_library", + "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/status:go_default_library", "//pkg/kubelet/types:go_default_library", @@ -190,7 +191,6 @@ go_test( "//pkg/util/rand:go_default_library", "//pkg/util/runtime:go_default_library", "//pkg/util/sets:go_default_library", - "//pkg/util/term:go_default_library", "//pkg/util/testing:go_default_library", "//pkg/util/uuid:go_default_library", "//pkg/util/wait:go_default_library", diff --git a/pkg/kubelet/container/BUILD b/pkg/kubelet/container/BUILD index 4121c633b4e..5d03b4ba505 100644 --- a/pkg/kubelet/container/BUILD +++ b/pkg/kubelet/container/BUILD @@ -33,6 +33,7 @@ go_library( "//pkg/client/record:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/util/format:go_default_library", + "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/runtime:go_default_library", "//pkg/types:go_default_library", "//pkg/util/errors:go_default_library", diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index 4b06a92d093..50a8b4a11dd 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -17,6 +17,7 @@ limitations under the License. package container import ( + "bytes" "fmt" "hash/adler32" "strings" @@ -28,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/util/format" + "k8s.io/kubernetes/pkg/kubelet/util/ioutils" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" hashutil "k8s.io/kubernetes/pkg/util/hash" @@ -223,3 +225,23 @@ func FormatPod(pod *Pod) string { // (DNS subdomain format), while allowed in the container name format. return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.ID) } + +type containerCommandRunnerWrapper struct { + DirectStreamingRuntime +} + +var _ ContainerCommandRunner = &containerCommandRunnerWrapper{} + +func DirectStreamingRunner(runtime DirectStreamingRuntime) ContainerCommandRunner { + return &containerCommandRunnerWrapper{runtime} +} + +func (r *containerCommandRunnerWrapper) RunInContainer(id ContainerID, cmd []string) ([]byte, error) { + var buffer bytes.Buffer + output := ioutils.WriteCloserWrapper(&buffer) + err := r.ExecInContainer(id, cmd, nil, output, output, false, nil) + // Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but + // the command returned a nonzero exit code). Therefore, always return the output along with the + // error. + return buffer.Bytes(), err +} diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index cc0ad3fe45e..f9ed18ee638 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -19,6 +19,7 @@ package container import ( "fmt" "io" + "net/url" "reflect" "strings" "time" @@ -112,10 +113,6 @@ type Runtime interface { GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) // Delete a container. If the container is still running, an error is returned. DeleteContainer(containerID ContainerID) error - // ContainerCommandRunner encapsulates the command runner interfaces for testability. - ContainerCommandRunner - // ContainerAttach encapsulates the attaching to containers for testability - ContainerAttacher // ImageService provides methods to image-related methods. ImageService // UpdatePodCIDR sends a new podCIDR to the runtime. @@ -124,6 +121,28 @@ type Runtime interface { UpdatePodCIDR(podCIDR string) error } +// DirectStreamingRuntime is the interface implemented by runtimes for which the streaming calls +// (exec/attach/port-forward) should be served directly by the Kubelet. +type DirectStreamingRuntime interface { + // Runs the command in the container of the specified pod using nsenter. + // Attaches the processes stdin, stdout, and stderr. Optionally uses a + // tty. + ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error + // Forward the specified port from the specified pod to the stream. + PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error + // ContainerAttach encapsulates the attaching to containers for testability + ContainerAttacher +} + +// IndirectStreamingRuntime is the interface implemented by runtimes that handle the serving of the +// streaming calls (exec/attach/port-forward) themselves. In this case, Kubelet should redirect to +// the runtime server. +type IndirectStreamingRuntime interface { + GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) + GetAttach(id ContainerID, stdin, stdout, stderr bool) (*url.URL, error) + GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) +} + type ImageService interface { // PullImage pulls an image from the network to local storage using the supplied // secrets if necessary. @@ -142,14 +161,10 @@ type ContainerAttacher interface { AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) } -// CommandRunner encapsulates the command runner interfaces for testability. type ContainerCommandRunner interface { - // Runs the command in the container of the specified pod using nsenter. - // Attaches the processes stdin, stdout, and stderr. Optionally uses a - // tty. - ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error - // Forward the specified port from the specified pod to the stream. - PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error + // RunInContainer synchronously executes the command in the container, and returns the output. + // If the command completes with a non-0 exit code, a pkg/util/exec.ExitError will be returned. + RunInContainer(id ContainerID, cmd []string) ([]byte, error) } // Pod is a group of containers. diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 1873512fa5f..a79c65f33cc 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -19,6 +19,7 @@ package testing import ( "fmt" "io" + "net/url" "reflect" "sync" "time" @@ -57,6 +58,31 @@ type FakeRuntime struct { StatusErr error } +type FakeDirectStreamingRuntime struct { + *FakeRuntime + + // Arguments to streaming method calls. + Args struct { + // Attach / Exec args + ContainerID ContainerID + Cmd []string + Stdin io.Reader + Stdout io.WriteCloser + Stderr io.WriteCloser + TTY bool + // Port-forward args + Pod *Pod + Port uint16 + Stream io.ReadWriteCloser + } +} + +const FakeHost = "localhost:12345" + +type FakeIndirectStreamingRuntime struct { + *FakeRuntime +} + // FakeRuntime should implement Runtime. var _ Runtime = &FakeRuntime{} @@ -279,19 +305,32 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS return &status, f.Err } -func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { +func (f *FakeDirectStreamingRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Lock() defer f.Unlock() f.CalledFunctions = append(f.CalledFunctions, "ExecInContainer") + f.Args.ContainerID = containerID + f.Args.Cmd = cmd + f.Args.Stdin = stdin + f.Args.Stdout = stdout + f.Args.Stderr = stderr + f.Args.TTY = tty + return f.Err } -func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { +func (f *FakeDirectStreamingRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Lock() defer f.Unlock() f.CalledFunctions = append(f.CalledFunctions, "AttachContainer") + f.Args.ContainerID = containerID + f.Args.Stdin = stdin + f.Args.Stdout = stdout + f.Args.Stderr = stderr + f.Args.TTY = tty + return f.Err } @@ -349,11 +388,15 @@ func (f *FakeRuntime) RemoveImage(image ImageSpec) error { return f.Err } -func (f *FakeRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error { +func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error { f.Lock() defer f.Unlock() f.CalledFunctions = append(f.CalledFunctions, "PortForward") + f.Args.Pod = pod + f.Args.Port = port + f.Args.Stream = stream + return f.Err } @@ -405,3 +448,47 @@ func (f *FakeRuntime) ImageStats() (*ImageStats, error) { f.CalledFunctions = append(f.CalledFunctions, "ImageStats") return nil, f.Err } + +func (f *FakeIndirectStreamingRuntime) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetExec") + return &url.URL{Host: FakeHost}, f.Err +} + +func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout, stderr bool) (*url.URL, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetAttach") + return &url.URL{Host: FakeHost}, f.Err +} + +func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetPortForward") + return &url.URL{Host: FakeHost}, f.Err +} + +type FakeContainerCommandRunner struct { + // what to return + Stdout string + Err error + + // actual values when invoked + ContainerID ContainerID + Cmd []string +} + +var _ ContainerCommandRunner = &FakeContainerCommandRunner{} + +func (f *FakeContainerCommandRunner) RunInContainer(containerID ContainerID, cmd []string) ([]byte, error) { + // record invoked values + f.ContainerID = containerID + f.Cmd = cmd + + return []byte(f.Stdout), f.Err +} diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 3db194dc0b0..63a0f290c0e 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -107,8 +107,9 @@ const ( ) var ( - // DockerManager implements the Runtime interface. - _ kubecontainer.Runtime = &DockerManager{} + // DockerManager implements the Runtime and DirectStreamingRuntime interfaces. + _ kubecontainer.Runtime = &DockerManager{} + _ kubecontainer.DirectStreamingRuntime = &DockerManager{} // TODO: make this a TTL based pull (if image older than X policy, pull) podInfraContainerImagePullPolicy = api.PullIfNotPresent @@ -281,7 +282,8 @@ func NewDockerManager( imageStatsProvider: newImageStatsProvider(client), seccompProfileRoot: seccompProfileRoot, } - dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) + cmdRunner := kubecontainer.DirectStreamingRunner(dm) + dm.runner = lifecycle.NewHandlerRunner(httpClient, cmdRunner, dm) dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls, qps, burst) dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7772489b8a8..3ddf5edf1ba 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -551,7 +551,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub // becomes the default. klet.networkPlugin = nil - klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager( + runtime, err := kuberuntime.NewKubeGenericRuntimeManager( kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, containerRefManager, @@ -582,9 +582,11 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub if err != nil { return nil, err } + klet.containerRuntime = runtime + klet.runner = runtime default: // Only supported one for now, continue. - klet.containerRuntime = dockertools.NewDockerManager( + runtime := dockertools.NewDockerManager( kubeDeps.DockerClient, kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, @@ -616,6 +618,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub kubeCfg.SeccompProfileRoot, kubeDeps.ContainerRuntimeOptions..., ) + klet.containerRuntime = runtime + klet.runner = kubecontainer.DirectStreamingRunner(runtime) } case "rkt": // TODO: Include hairpin mode settings in rkt? @@ -647,6 +651,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub return nil, err } klet.containerRuntime = rktRuntime + klet.runner = kubecontainer.DirectStreamingRunner(rktRuntime) case "remote": remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) if err != nil { @@ -656,7 +661,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub if err != nil { return nil, err } - klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager( + runtime, err := kuberuntime.NewKubeGenericRuntimeManager( kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, containerRefManager, @@ -677,6 +682,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub if err != nil { return nil, err } + klet.containerRuntime = runtime + klet.runner = runtime default: return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime) } @@ -703,7 +710,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } klet.imageManager = imageManager - klet.runner = klet.containerRuntime klet.statusManager = status.NewManager(kubeClient, klet.podManager) klet.probeManager = prober.NewManager( diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index dd74cad68b6..14f648518c4 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -22,6 +22,7 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "os" "path" "path/filepath" @@ -38,10 +39,10 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/envvars" "k8s.io/kubernetes/pkg/kubelet/images" + "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" - "k8s.io/kubernetes/pkg/kubelet/util/ioutils" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/sets" @@ -1204,14 +1205,13 @@ func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, container if err != nil { return nil, err } + podUID = kl.podManager.TranslatePodUID(podUID) pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID) return pod.FindContainerByName(containerName), nil } // Run a command in a container, returns the combined stdout, stderr as an array of bytes func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containerName string, cmd []string) ([]byte, error) { - podUID = kl.podManager.TranslatePodUID(podUID) - container, err := kl.findContainer(podFullName, podUID, containerName) if err != nil { return nil, err @@ -1219,20 +1219,16 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe if container == nil { return nil, fmt.Errorf("container not found (%q)", containerName) } - - var buffer bytes.Buffer - output := ioutils.WriteCloserWrapper(&buffer) - err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil) - // Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but - // the command returned a nonzero exit code). Therefore, always return the output along with the - // error. - return buffer.Bytes(), err + return kl.runner.RunInContainer(container.ID, cmd) } // ExecInContainer executes a command in a container, connecting the supplied // stdin/stdout/stderr to the command's IO streams. func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { - podUID = kl.podManager.TranslatePodUID(podUID) + streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime) + if !ok { + return fmt.Errorf("streaming methods not supported by runtime") + } container, err := kl.findContainer(podFullName, podUID, containerName) if err != nil { @@ -1241,13 +1237,16 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain if container == nil { return fmt.Errorf("container not found (%q)", containerName) } - return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize) + return streamingRuntime.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize) } // AttachContainer uses the container runtime to attach the given streams to // the given container. func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { - podUID = kl.podManager.TranslatePodUID(podUID) + streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime) + if !ok { + return fmt.Errorf("streaming methods not supported by runtime") + } container, err := kl.findContainer(podFullName, podUID, containerName) if err != nil { @@ -1256,23 +1255,92 @@ func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, contain if container == nil { return fmt.Errorf("container not found (%q)", containerName) } - return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize) + return streamingRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize) } // PortForward connects to the pod's port and copies data between the port // and the stream. func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error { - podUID = kl.podManager.TranslatePodUID(podUID) + streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime) + if !ok { + return fmt.Errorf("streaming methods not supported by runtime") + } pods, err := kl.containerRuntime.GetPods(false) if err != nil { return err } + podUID = kl.podManager.TranslatePodUID(podUID) pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID) if pod.IsEmpty() { return fmt.Errorf("pod not found (%q)", podFullName) } - return kl.runner.PortForward(&pod, port, stream) + return streamingRuntime.PortForward(&pod, port, stream) +} + +// GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it. +func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error) { + switch streamingRuntime := kl.containerRuntime.(type) { + case kubecontainer.DirectStreamingRuntime: + // Kubelet will serve the exec directly. + return nil, nil + case kubecontainer.IndirectStreamingRuntime: + container, err := kl.findContainer(podFullName, podUID, containerName) + if err != nil { + return nil, err + } + if container == nil { + return nil, fmt.Errorf("container not found (%q)", containerName) + } + return streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY) + default: + return nil, fmt.Errorf("container runtime does not support exec") + } +} + +// GetAttach gets the URL the attach will be served from, or nil if the Kubelet will serve it. +func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error) { + switch streamingRuntime := kl.containerRuntime.(type) { + case kubecontainer.DirectStreamingRuntime: + // Kubelet will serve the attach directly. + return nil, nil + case kubecontainer.IndirectStreamingRuntime: + container, err := kl.findContainer(podFullName, podUID, containerName) + if err != nil { + return nil, err + } + if container == nil { + return nil, fmt.Errorf("container not found (%q)", containerName) + } + + return streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr) + default: + return nil, fmt.Errorf("container runtime does not support attach") + } +} + +// GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it. +func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { + switch streamingRuntime := kl.containerRuntime.(type) { + case kubecontainer.DirectStreamingRuntime: + // Kubelet will serve the attach directly. + return nil, nil + case kubecontainer.IndirectStreamingRuntime: + pods, err := kl.containerRuntime.GetPods(false) + if err != nil { + return nil, err + } + podUID = kl.podManager.TranslatePodUID(podUID) + podFullName := kubecontainer.BuildPodFullName(podName, podNamespace) + pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID) + if pod.IsEmpty() { + return nil, fmt.Errorf("pod not found (%q)", podFullName) + } + + return streamingRuntime.GetPortForward(podName, podNamespace, podUID) + default: + return nil, fmt.Errorf("container runtime does not support port-forward") + } } // cleanupOrphanedPodCgroups removes the Cgroups of pods that should not be diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index add58b6f3c4..85e14e21af3 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -19,8 +19,6 @@ package kubelet import ( "bytes" "errors" - "fmt" - "io" "net" "sort" "testing" @@ -32,9 +30,9 @@ import ( "k8s.io/kubernetes/pkg/apimachinery/registered" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util/term" ) func TestMakeMounts(t *testing.T) { @@ -112,47 +110,6 @@ func TestMakeMounts(t *testing.T) { assert.Equal(t, expectedMounts, mounts, "mounts of container %+v", container) } -type fakeContainerCommandRunner struct { - // what was passed in - Cmd []string - ID kubecontainer.ContainerID - PodID types.UID - E error - Stdin io.Reader - Stdout io.WriteCloser - Stderr io.WriteCloser - TTY bool - Port uint16 - Stream io.ReadWriteCloser - - // what to return - StdoutData string - StderrData string -} - -func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { - // record params - f.Cmd = cmd - f.ID = id - f.Stdin = in - f.Stdout = out - f.Stderr = err - f.TTY = tty - - // Copy stdout/stderr data - fmt.Fprint(out, f.StdoutData) - fmt.Fprint(out, f.StderrData) - - return f.E -} - -func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { - f.PodID = pod.ID - f.Port = port - f.Stream = stream - return nil -} - func TestRunInContainerNoSuchPod(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet @@ -172,14 +129,13 @@ func TestRunInContainerNoSuchPod(t *testing.T) { } func TestRunInContainer(t *testing.T) { - for _, testError := range []error{nil, errors.New("foo")} { + for _, testError := range []error{nil, errors.New("bar")} { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet fakeRuntime := testKubelet.fakeRuntime - fakeCommandRunner := fakeContainerCommandRunner{ - E: testError, - StdoutData: "foo", - StderrData: "bar", + fakeCommandRunner := containertest.FakeContainerCommandRunner{ + Err: testError, + Stdout: "foo", } kubelet.runner = &fakeCommandRunner @@ -198,11 +154,11 @@ func TestRunInContainer(t *testing.T) { } cmd := []string{"ls"} actualOutput, err := kubelet.RunInContainer("podFoo_nsFoo", "", "containerFoo", cmd) - assert.Equal(t, containerID, fakeCommandRunner.ID, "(testError=%v) ID", testError) + assert.Equal(t, containerID, fakeCommandRunner.ContainerID, "(testError=%v) ID", testError) assert.Equal(t, cmd, fakeCommandRunner.Cmd, "(testError=%v) command", testError) // this isn't 100% foolproof as a bug in a real ContainerCommandRunner where it fails to copy to stdout/stderr wouldn't be caught by this test - assert.Equal(t, "foobar", string(actualOutput), "(testError=%v) output", testError) - assert.Equal(t, fmt.Sprintf("%s", err), fmt.Sprintf("%s", testError), "(testError=%v) err", testError) + assert.Equal(t, "foo", string(actualOutput), "(testError=%v) output", testError) + assert.Equal(t, err, testError, "(testError=%v) err", testError) } } @@ -1052,73 +1008,6 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) { } } -func TestExecInContainerNoSuchPod(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - fakeRuntime := testKubelet.fakeRuntime - fakeCommandRunner := fakeContainerCommandRunner{} - kubelet.runner = &fakeCommandRunner - fakeRuntime.PodList = []*containertest.FakePod{} - - podName := "podFoo" - podNamespace := "nsFoo" - containerID := "containerFoo" - err := kubelet.ExecInContainer( - kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}), - "", - containerID, - []string{"ls"}, - nil, - nil, - nil, - false, - nil, - ) - require.Error(t, err) - require.True(t, fakeCommandRunner.ID.IsEmpty(), "Unexpected invocation of runner.ExecInContainer") -} - -func TestExecInContainerNoSuchContainer(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - fakeRuntime := testKubelet.fakeRuntime - fakeCommandRunner := fakeContainerCommandRunner{} - kubelet.runner = &fakeCommandRunner - - podName := "podFoo" - podNamespace := "nsFoo" - containerID := "containerFoo" - fakeRuntime.PodList = []*containertest.FakePod{ - {Pod: &kubecontainer.Pod{ - ID: "12345678", - Name: podName, - Namespace: podNamespace, - Containers: []*kubecontainer.Container{ - {Name: "bar", - ID: kubecontainer.ContainerID{Type: "test", ID: "barID"}}, - }, - }}, - } - - err := kubelet.ExecInContainer( - kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{ - UID: "12345678", - Name: podName, - Namespace: podNamespace, - }}), - "", - containerID, - []string{"ls"}, - nil, - nil, - nil, - false, - nil, - ) - require.Error(t, err) - require.True(t, fakeCommandRunner.ID.IsEmpty(), "Unexpected invocation of runner.ExecInContainer") -} - type fakeReadWriteCloser struct{} func (f *fakeReadWriteCloser) Write(data []byte) (int, error) { @@ -1133,116 +1022,195 @@ func (f *fakeReadWriteCloser) Close() error { return nil } -func TestExecInContainer(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - fakeRuntime := testKubelet.fakeRuntime - fakeCommandRunner := fakeContainerCommandRunner{} - kubelet.runner = &fakeCommandRunner +func TestExec(t *testing.T) { + const ( + podName = "podFoo" + podNamespace = "nsFoo" + podUID types.UID = "12345678" + containerID = "containerFoo" + tty = true + ) + var ( + podFullName = kubecontainer.GetPodFullName(podWithUidNameNs(podUID, podName, podNamespace)) + command = []string{"ls"} + stdin = &bytes.Buffer{} + stdout = &fakeReadWriteCloser{} + stderr = &fakeReadWriteCloser{} + ) - podName := "podFoo" - podNamespace := "nsFoo" - containerID := "containerFoo" - command := []string{"ls"} - stdin := &bytes.Buffer{} - stdout := &fakeReadWriteCloser{} - stderr := &fakeReadWriteCloser{} - tty := true - fakeRuntime.PodList = []*containertest.FakePod{ - {Pod: &kubecontainer.Pod{ - ID: "12345678", - Name: podName, - Namespace: podNamespace, - Containers: []*kubecontainer.Container{ - {Name: containerID, - ID: kubecontainer.ContainerID{Type: "test", ID: containerID}, + testcases := []struct { + description string + podFullName string + container string + expectError bool + }{{ + description: "success case", + podFullName: podFullName, + container: containerID, + }, { + description: "no such pod", + podFullName: "bar" + podFullName, + container: containerID, + expectError: true, + }, { + description: "no such container", + podFullName: podFullName, + container: "containerBar", + expectError: true, + }} + + for _, tc := range testcases { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + testKubelet.fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ + ID: podUID, + Name: podName, + Namespace: podNamespace, + Containers: []*kubecontainer.Container{ + {Name: containerID, + ID: kubecontainer.ContainerID{Type: "test", ID: containerID}, + }, }, - }, - }}, + }}, + } + + { // No streaming case + description := "no streaming - " + tc.description + redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{}) + assert.Error(t, err, description) + assert.Nil(t, redirect, description) + + err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil) + assert.Error(t, err, description) + } + { // Direct streaming case + description := "direct streaming - " + tc.description + fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime} + kubelet.containerRuntime = fakeRuntime + + redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{}) + assert.NoError(t, err, description) + assert.Nil(t, redirect, description) + + err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil) + if tc.expectError { + assert.Error(t, err, description) + } else { + assert.NoError(t, err, description) + assert.Equal(t, fakeRuntime.Args.ContainerID.ID, containerID, description+": ID") + assert.Equal(t, fakeRuntime.Args.Cmd, command, description+": Command") + assert.Equal(t, fakeRuntime.Args.Stdin, stdin, description+": Stdin") + assert.Equal(t, fakeRuntime.Args.Stdout, stdout, description+": Stdout") + assert.Equal(t, fakeRuntime.Args.Stderr, stderr, description+": Stderr") + assert.Equal(t, fakeRuntime.Args.TTY, tty, description+": TTY") + } + } + { // Indirect streaming case + description := "indirect streaming - " + tc.description + fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime} + kubelet.containerRuntime = fakeRuntime + + redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{}) + if tc.expectError { + assert.Error(t, err, description) + } else { + assert.NoError(t, err, description) + assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect") + } + + err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil) + assert.Error(t, err, description) + } } - - err := kubelet.ExecInContainer( - kubecontainer.GetPodFullName(podWithUidNameNs("12345678", podName, podNamespace)), - "", - containerID, - []string{"ls"}, - stdin, - stdout, - stderr, - tty, - nil, - ) - require.NoError(t, err) - require.Equal(t, fakeCommandRunner.ID.ID, containerID, "ID") - require.Equal(t, fakeCommandRunner.Cmd, command, "Command") - require.Equal(t, fakeCommandRunner.Stdin, stdin, "Stdin") - require.Equal(t, fakeCommandRunner.Stdout, stdout, "Stdout") - require.Equal(t, fakeCommandRunner.Stderr, stderr, "Stderr") - require.Equal(t, fakeCommandRunner.TTY, tty, "TTY") -} - -func TestPortForwardNoSuchPod(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - fakeRuntime := testKubelet.fakeRuntime - fakeRuntime.PodList = []*containertest.FakePod{} - fakeCommandRunner := fakeContainerCommandRunner{} - kubelet.runner = &fakeCommandRunner - - podName := "podFoo" - podNamespace := "nsFoo" - var port uint16 = 5000 - - err := kubelet.PortForward( - kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}), - "", - port, - nil, - ) - require.Error(t, err) - require.True(t, fakeCommandRunner.ID.IsEmpty(), "unexpected invocation of runner.PortForward") } func TestPortForward(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - fakeRuntime := testKubelet.fakeRuntime - - podName := "podFoo" - podNamespace := "nsFoo" - podID := types.UID("12345678") - fakeRuntime.PodList = []*containertest.FakePod{ - {Pod: &kubecontainer.Pod{ - ID: podID, - Name: podName, - Namespace: podNamespace, - Containers: []*kubecontainer.Container{ - { - Name: "foo", - ID: kubecontainer.ContainerID{Type: "test", ID: "containerFoo"}, - }, - }, - }}, - } - fakeCommandRunner := fakeContainerCommandRunner{} - kubelet.runner = &fakeCommandRunner - - var port uint16 = 5000 - stream := &fakeReadWriteCloser{} - err := kubelet.PortForward( - kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{ - UID: "12345678", - Name: podName, - Namespace: podNamespace, - }}), - "", - port, - stream, + const ( + podName = "podFoo" + podNamespace = "nsFoo" + podUID types.UID = "12345678" + port uint16 = 5000 ) - require.NoError(t, err) - require.Equal(t, fakeCommandRunner.PodID, podID, "Pod ID") - require.Equal(t, fakeCommandRunner.Port, port, "Port") - require.Equal(t, fakeCommandRunner.Stream, stream, "stream") + var ( + stream = &fakeReadWriteCloser{} + ) + + testcases := []struct { + description string + podName string + expectError bool + }{{ + description: "success case", + podName: podName, + }, { + description: "no such pod", + podName: "bar", + expectError: true, + }} + + for _, tc := range testcases { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + testKubelet.fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ + ID: podUID, + Name: podName, + Namespace: podNamespace, + Containers: []*kubecontainer.Container{ + {Name: "foo", + ID: kubecontainer.ContainerID{Type: "test", ID: "foo"}, + }, + }, + }}, + } + + podFullName := kubecontainer.GetPodFullName(podWithUidNameNs(podUID, tc.podName, podNamespace)) + { // No streaming case + description := "no streaming - " + tc.description + redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) + assert.Error(t, err, description) + assert.Nil(t, redirect, description) + + err = kubelet.PortForward(podFullName, podUID, port, stream) + assert.Error(t, err, description) + } + { // Direct streaming case + description := "direct streaming - " + tc.description + fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime} + kubelet.containerRuntime = fakeRuntime + + redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) + assert.NoError(t, err, description) + assert.Nil(t, redirect, description) + + err = kubelet.PortForward(podFullName, podUID, port, stream) + if tc.expectError { + assert.Error(t, err, description) + } else { + assert.NoError(t, err, description) + require.Equal(t, fakeRuntime.Args.Pod.ID, podUID, description+": Pod UID") + require.Equal(t, fakeRuntime.Args.Port, port, description+": Port") + require.Equal(t, fakeRuntime.Args.Stream, stream, description+": stream") + } + } + { // Indirect streaming case + description := "indirect streaming - " + tc.description + fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime} + kubelet.containerRuntime = fakeRuntime + + redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) + if tc.expectError { + assert.Error(t, err, description) + } else { + assert.NoError(t, err, description) + assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect") + } + + err = kubelet.PortForward(podFullName, podUID, port, stream) + assert.Error(t, err, description) + } + } } // Tests that identify the host port conflicts are detected correctly. diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 3b867387e2d..2ae10fedb01 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -21,6 +21,7 @@ import ( "io" "io/ioutil" "math/rand" + "net/url" "os" "path/filepath" "sort" @@ -679,10 +680,10 @@ func findNextInitContainerToRun(pod *api.Pod, podStatus *kubecontainer.PodStatus } // AttachContainer attaches to the container's console +// TODO: Remove this method once the indirect streaming path is fully functional. func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) { // Use `docker attach` directly for in-process docker integration for // now to unblock other tests. - // TODO: remove this hack after attach is defined in CRI. if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { return ds.LegacyAttach(id, stdin, stdout, stderr, tty, resize) } @@ -701,14 +702,51 @@ func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID k return ReadLogs(path, logOptions, stdout, stderr) } +// GetExec gets the endpoint the runtime will serve the exec request from. +func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) { + req := &runtimeApi.ExecRequest{ + ContainerId: &id.ID, + Cmd: cmd, + Tty: &tty, + Stdin: &stdin, + } + resp, err := m.runtimeService.Exec(req) + if err != nil { + return nil, err + } + + return url.Parse(resp.GetUrl()) +} + +// GetAttach gets the endpoint the runtime will serve the attach request from. +func (m *kubeGenericRuntimeManager) GetAttach(id kubecontainer.ContainerID, stdin, stdout, stderr bool) (*url.URL, error) { + req := &runtimeApi.AttachRequest{ + ContainerId: &id.ID, + Stdin: &stdin, + } + resp, err := m.runtimeService.Attach(req) + if err != nil { + return nil, err + } + return url.Parse(resp.GetUrl()) +} + +// RunInContainer synchronously executes the command in the container, and returns the output. +func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) { + stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, 0) + // NOTE(timstclair): This does not correctly interleave stdout & stderr, but should be sufficient + // for logging purposes. A combined output option will need to be added to the ExecSyncRequest + // if more precise output ordering is ever required. + return append(stdout, stderr...), err +} + // Runs the command in the container of the specified pod using nsenter. // Attaches the processes stdin, stdout, and stderr. Optionally uses a // tty. -// TODO: handle terminal resizing, refer https://github.com/kubernetes/kubernetes/issues/29579 +// TODO: Remove this method once the indirect streaming path is fully functional. func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { // Use `docker exec` directly for in-process docker integration for // now to unblock other tests. - // TODO: remove this hack after exec is defined in CRI. if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { return ds.LegacyExec(containerID, cmd, stdin, stdout, stderr, tty, resize) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 0d4b6dddd86..f91bf041e7a 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -110,6 +110,14 @@ type kubeGenericRuntimeManager struct { versionCache *cache.ObjectCache } +type KubeGenericRuntime interface { + kubecontainer.Runtime + kubecontainer.IndirectStreamingRuntime + kubecontainer.ContainerCommandRunner + // TODO(timstclair): Remove this once the indirect path is fully functional. + kubecontainer.DirectStreamingRuntime +} + // NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager func NewKubeGenericRuntimeManager( recorder record.EventRecorder, @@ -128,7 +136,7 @@ func NewKubeGenericRuntimeManager( cpuCFSQuota bool, runtimeService internalApi.RuntimeService, imageService internalApi.ImageManagerService, -) (kubecontainer.Runtime, error) { +) (KubeGenericRuntime, error) { kubeRuntimeManager := &kubeGenericRuntimeManager{ recorder: recorder, cpuCFSQuota: cpuCFSQuota, @@ -840,7 +848,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp // Anyhow, we only promised "best-effort" restart count reporting, we can just ignore // these limitations now. // TODO: move this comment to SyncPod. - podSandboxIDs, err := m.getSandboxIDByPodUID(string(uid), nil) + podSandboxIDs, err := m.getSandboxIDByPodUID(uid, nil) if err != nil { return nil, err } @@ -938,6 +946,7 @@ func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (k } // Forward the specified port from the specified pod to the stream. +// TODO: Remove this method once the indirect streaming path is fully functional. func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { formattedPod := kubecontainer.FormatPod(pod) if len(pod.Sandboxes) == 0 { @@ -947,7 +956,6 @@ func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uin // Use docker portforward directly for in-process docker integration // now to unblock other tests. - // TODO: remove this hack after portforward is defined in CRI. if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { return ds.LegacyPortForward(pod.Sandboxes[0].ID.ID, port, stream) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go index f42e8f13a9e..f7ac7e41cc7 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go @@ -19,6 +19,7 @@ package kuberuntime import ( "fmt" "net" + "net/url" "sort" "github.com/golang/glog" @@ -27,6 +28,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" + kubetypes "k8s.io/kubernetes/pkg/types" ) // createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error). @@ -195,10 +197,10 @@ func (m *kubeGenericRuntimeManager) determinePodSandboxIP(podNamespace, podName // getPodSandboxID gets the sandbox id by podUID and returns ([]sandboxID, error). // Param state could be nil in order to get all sandboxes belonging to same pod. -func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID string, state *runtimeApi.PodSandboxState) ([]string, error) { +func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID kubetypes.UID, state *runtimeApi.PodSandboxState) ([]string, error) { filter := &runtimeApi.PodSandboxFilter{ State: state, - LabelSelector: map[string]string{types.KubernetesPodUIDLabel: podUID}, + LabelSelector: map[string]string{types.KubernetesPodUIDLabel: string(podUID)}, } sandboxes, err := m.runtimeService.ListPodSandbox(filter) if err != nil { @@ -219,3 +221,23 @@ func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID string, state *r return sandboxIDs, nil } + +// GetPortForward gets the endpoint the runtime will serve the port-forward request from. +func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID) (*url.URL, error) { + sandboxIDs, err := m.getSandboxIDByPodUID(podUID, nil) + if err != nil { + return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err) + } + if len(sandboxIDs) == 0 { + return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID)) + } + // TODO: Port is unused for now, but we may need it in the future. + req := &runtimeApi.PortForwardRequest{ + PodSandboxId: &sandboxIDs[0], + } + resp, err := m.runtimeService.PortForward(req) + if err != nil { + return nil, err + } + return url.Parse(resp.GetUrl()) +} diff --git a/pkg/kubelet/lifecycle/BUILD b/pkg/kubelet/lifecycle/BUILD index bc25e2b7884..7e9be26475a 100644 --- a/pkg/kubelet/lifecycle/BUILD +++ b/pkg/kubelet/lifecycle/BUILD @@ -25,7 +25,6 @@ go_library( "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/format:go_default_library", - "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/security/apparmor:go_default_library", "//pkg/types:go_default_library", "//pkg/util/intstr:go_default_library", @@ -44,6 +43,5 @@ go_test( "//pkg/api:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/util/intstr:go_default_library", - "//pkg/util/term:go_default_library", ], ) diff --git a/pkg/kubelet/lifecycle/handlers.go b/pkg/kubelet/lifecycle/handlers.go index c4d0d7b7a97..c4d6f4dfdcc 100644 --- a/pkg/kubelet/lifecycle/handlers.go +++ b/pkg/kubelet/lifecycle/handlers.go @@ -17,7 +17,6 @@ limitations under the License. package lifecycle import ( - "bytes" "fmt" "io/ioutil" "net" @@ -29,7 +28,6 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" - "k8s.io/kubernetes/pkg/kubelet/util/ioutils" "k8s.io/kubernetes/pkg/security/apparmor" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/intstr" @@ -56,14 +54,10 @@ func NewHandlerRunner(httpGetter kubetypes.HttpGetter, commandRunner kubecontain func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container, handler *api.Handler) (string, error) { switch { case handler.Exec != nil: - var ( - buffer bytes.Buffer - msg string - ) - output := ioutils.WriteCloserWrapper(&buffer) - err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false, nil) + var msg string + output, err := hr.commandRunner.RunInContainer(containerID, handler.Exec.Command) if err != nil { - msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - error: %v, message: %q", handler.Exec.Command, container.Name, format.Pod(pod), err, buffer.String()) + msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - error: %v, message: %q", handler.Exec.Command, container.Name, format.Pod(pod), err, string(output)) glog.V(1).Infof(msg) } return msg, err diff --git a/pkg/kubelet/lifecycle/handlers_test.go b/pkg/kubelet/lifecycle/handlers_test.go index 74399a53693..31a1c8b6b05 100644 --- a/pkg/kubelet/lifecycle/handlers_test.go +++ b/pkg/kubelet/lifecycle/handlers_test.go @@ -18,7 +18,6 @@ package lifecycle import ( "fmt" - "io" "io/ioutil" "net/http" "reflect" @@ -28,7 +27,6 @@ import ( "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/util/intstr" - "k8s.io/kubernetes/pkg/util/term" ) func TestResolvePortInt(t *testing.T) { @@ -81,14 +79,10 @@ type fakeContainerCommandRunner struct { ID kubecontainer.ContainerID } -func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { +func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) { f.Cmd = cmd f.ID = id - return nil -} - -func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { - return nil + return nil, nil } func TestRunHandlerExec(t *testing.T) { diff --git a/pkg/kubelet/prober/BUILD b/pkg/kubelet/prober/BUILD index 47d30df253c..a191bf7e004 100644 --- a/pkg/kubelet/prober/BUILD +++ b/pkg/kubelet/prober/BUILD @@ -26,7 +26,6 @@ go_library( "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/status:go_default_library", "//pkg/kubelet/util/format:go_default_library", - "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/probe:go_default_library", "//pkg/probe/exec:go_default_library", "//pkg/probe/http:go_default_library", @@ -57,6 +56,7 @@ go_test( "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/client/record:go_default_library", "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/status:go_default_library", @@ -65,7 +65,6 @@ go_test( "//pkg/util/exec:go_default_library", "//pkg/util/intstr:go_default_library", "//pkg/util/runtime:go_default_library", - "//pkg/util/term:go_default_library", "//pkg/util/wait:go_default_library", "//vendor:github.com/golang/glog", ], diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go index 8f248566fc2..b09b6491451 100644 --- a/pkg/kubelet/prober/prober.go +++ b/pkg/kubelet/prober/prober.go @@ -17,7 +17,6 @@ limitations under the License. package prober import ( - "bytes" "fmt" "io" "net" @@ -33,7 +32,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/util/format" - "k8s.io/kubernetes/pkg/kubelet/util/ioutils" "k8s.io/kubernetes/pkg/probe" execprobe "k8s.io/kubernetes/pkg/probe/exec" httprobe "k8s.io/kubernetes/pkg/probe/http" @@ -230,13 +228,7 @@ type execInContainer struct { func (p *prober) newExecInContainer(container api.Container, containerID kubecontainer.ContainerID, cmd []string) exec.Cmd { return execInContainer{func() ([]byte, error) { - var buffer bytes.Buffer - output := ioutils.WriteCloserWrapper(&buffer) - err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false, nil) - // Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but - // the command returned a nonzero exit code). Therefore, always return the output along with the - // error. - return buffer.Bytes(), err + return p.runner.RunInContainer(containerID, cmd) }} } diff --git a/pkg/kubelet/prober/prober_test.go b/pkg/kubelet/prober/prober_test.go index 39f108b2e66..82d8b889744 100644 --- a/pkg/kubelet/prober/prober_test.go +++ b/pkg/kubelet/prober/prober_test.go @@ -19,7 +19,6 @@ package prober import ( "errors" "fmt" - "io" "net/http" "reflect" "testing" @@ -27,10 +26,10 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/util/intstr" - "k8s.io/kubernetes/pkg/util/term" ) func TestFormatURL(t *testing.T) { @@ -279,40 +278,6 @@ func TestProbe(t *testing.T) { } } -type fakeContainerCommandRunner struct { - // what to return - stdoutData string - stderrData string - err error - - // actual values when invoked - containerID kubecontainer.ContainerID - cmd []string - stdin io.Reader - tty bool - resize <-chan term.Size -} - -var _ kubecontainer.ContainerCommandRunner = &fakeContainerCommandRunner{} - -func (f *fakeContainerCommandRunner) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { - // record invoked values - f.containerID = containerID - f.cmd = cmd - f.stdin = stdin - f.tty = tty - f.resize = resize - - fmt.Fprint(stdout, f.stdoutData) - fmt.Fprint(stdout, f.stderrData) - - return f.err -} - -func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { - panic("not implemented") -} - func TestNewExecInContainer(t *testing.T) { tests := []struct { name string @@ -329,10 +294,9 @@ func TestNewExecInContainer(t *testing.T) { } for _, test := range tests { - runner := &fakeContainerCommandRunner{ - stdoutData: "foo", - stderrData: "bar", - err: test.err, + runner := &containertest.FakeContainerCommandRunner{ + Stdout: "foo", + Err: test.err, } prober := &prober{ runner: runner, @@ -344,23 +308,14 @@ func TestNewExecInContainer(t *testing.T) { exec := prober.newExecInContainer(container, containerID, cmd) actualOutput, err := exec.CombinedOutput() - if e, a := containerID, runner.containerID; e != a { + if e, a := containerID, runner.ContainerID; e != a { t.Errorf("%s: container id: expected %v, got %v", test.name, e, a) } - if e, a := cmd, runner.cmd; !reflect.DeepEqual(e, a) { + if e, a := cmd, runner.Cmd; !reflect.DeepEqual(e, a) { t.Errorf("%s: cmd: expected %v, got %v", test.name, e, a) } - if runner.stdin != nil { - t.Errorf("%s: stdin: expected nil, got %v", test.name, runner.stdin) - } - if runner.tty { - t.Errorf("%s: tty: expected false", test.name) - } - if runner.resize != nil { - t.Errorf("%s: resize chan: expected nil, got %v", test.name, runner.resize) - } // this isn't 100% foolproof as a bug in a real ContainerCommandRunner where it fails to copy to stdout/stderr wouldn't be caught by this test - if e, a := "foobar", string(actualOutput); e != a { + if e, a := "foo", string(actualOutput); e != a { t.Errorf("%s: output: expected %q, got %q", test.name, e, a) } if e, a := fmt.Sprintf("%v", test.err), fmt.Sprintf("%v", err); e != a { diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 073c91af407..62c9172c8ab 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -181,6 +181,7 @@ type Runtime struct { } var _ kubecontainer.Runtime = &Runtime{} +var _ kubecontainer.DirectStreamingRuntime = &Runtime{} // TODO(yifan): This duplicates the podGetter in dockertools. type podGetter interface { @@ -276,7 +277,8 @@ func New( return nil, fmt.Errorf("rkt: cannot get config from rkt api service: %v", err) } - rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt) + cmdRunner := kubecontainer.DirectStreamingRunner(rkt) + rkt.runner = lifecycle.NewHandlerRunner(httpClient, cmdRunner, rkt) rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls, imagePullQPS, imagePullBurst) diff --git a/pkg/kubelet/server/BUILD b/pkg/kubelet/server/BUILD index 6223bdeac9e..bed803086fe 100644 --- a/pkg/kubelet/server/BUILD +++ b/pkg/kubelet/server/BUILD @@ -34,6 +34,7 @@ go_library( "//pkg/kubelet/server/portforward:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", + "//pkg/kubelet/server/streaming:go_default_library", "//pkg/runtime:go_default_library", "//pkg/types:go_default_library", "//pkg/util/configz:go_default_library", @@ -65,6 +66,7 @@ go_test( "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", + "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/types:go_default_library", @@ -75,5 +77,7 @@ go_test( "//pkg/volume:go_default_library", "//vendor:github.com/google/cadvisor/info/v1", "//vendor:github.com/google/cadvisor/info/v2", + "//vendor:github.com/stretchr/testify/assert", + "//vendor:github.com/stretchr/testify/require", ], ) diff --git a/pkg/kubelet/server/remotecommand/httpstream.go b/pkg/kubelet/server/remotecommand/httpstream.go index 141a8836cf8..32e0ae82c75 100644 --- a/pkg/kubelet/server/remotecommand/httpstream.go +++ b/pkg/kubelet/server/remotecommand/httpstream.go @@ -36,18 +36,18 @@ import ( "github.com/golang/glog" ) -// options contains details about which streams are required for +// Options contains details about which streams are required for // remote command execution. -type options struct { - stdin bool - stdout bool - stderr bool - tty bool +type Options struct { + Stdin bool + Stdout bool + Stderr bool + TTY bool expectedStreams int } -// newOptions creates a new options from the Request. -func newOptions(req *http.Request) (*options, error) { +// NewOptions creates a new Options from the Request. +func NewOptions(req *http.Request) (*Options, error) { tty := req.FormValue(api.ExecTTYParam) == "1" stdin := req.FormValue(api.ExecStdinParam) == "1" stdout := req.FormValue(api.ExecStdoutParam) == "1" @@ -74,11 +74,11 @@ func newOptions(req *http.Request) (*options, error) { return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr") } - return &options{ - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, + return &Options{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + TTY: tty, expectedStreams: expectedStreams, }, nil } @@ -116,7 +116,7 @@ func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-c } func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { - opts, err := newOptions(req) + opts, err := NewOptions(req) if err != nil { runtime.HandleError(err) w.WriteHeader(http.StatusBadRequest) @@ -143,7 +143,7 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt return ctx, true } -func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { +func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -183,7 +183,7 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *opt handler = &v1ProtocolHandler{} } - if opts.tty && handler.supportsTerminalResizing() { + if opts.TTY && handler.supportsTerminalResizing() { opts.expectedStreams++ } @@ -197,7 +197,7 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *opt } ctx.conn = conn - ctx.tty = opts.tty + ctx.tty = opts.TTY return ctx, true } diff --git a/pkg/kubelet/server/remotecommand/websocket.go b/pkg/kubelet/server/remotecommand/websocket.go index 243f6dfc0b0..4278f81dc8d 100644 --- a/pkg/kubelet/server/remotecommand/websocket.go +++ b/pkg/kubelet/server/remotecommand/websocket.go @@ -41,12 +41,12 @@ const ( // createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2) // along with the approximate duplex value. It also creates the error (3) and resize (4) channels. -func createChannels(opts *options) []wsstream.ChannelType { +func createChannels(opts *Options) []wsstream.ChannelType { // open the requested channels, and always open the error channel channels := make([]wsstream.ChannelType, 5) - channels[stdinChannel] = readChannel(opts.stdin) - channels[stdoutChannel] = writeChannel(opts.stdout) - channels[stderrChannel] = writeChannel(opts.stderr) + channels[stdinChannel] = readChannel(opts.Stdin) + channels[stdoutChannel] = writeChannel(opts.Stdout) + channels[stderrChannel] = writeChannel(opts.Stderr) channels[errorChannel] = wsstream.WriteChannel channels[resizeChannel] = wsstream.ReadChannel return channels @@ -70,7 +70,7 @@ func writeChannel(real bool) wsstream.ChannelType { // createWebSocketStreams returns a context containing the websocket connection and // streams needed to perform an exec or an attach. -func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) { +func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Options, idleTimeout time.Duration) (*context, bool) { channels := createChannels(opts) conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{ "": { @@ -104,9 +104,9 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti // Send an empty message to the lowest writable channel to notify the client the connection is established // TODO: make generic to SPDY and WebSockets and do it outside of this method? switch { - case opts.stdout: + case opts.Stdout: streams[stdoutChannel].Write([]byte{}) - case opts.stderr: + case opts.Stderr: streams[stderrChannel].Write([]byte{}) default: streams[errorChannel].Write([]byte{}) @@ -117,7 +117,7 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti stdinStream: streams[stdinChannel], stdoutStream: streams[stdoutChannel], stderrStream: streams[stderrChannel], - tty: opts.tty, + tty: opts.TTY, resizeStream: streams[resizeChannel], } diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index ec609c0d050..456390901b9 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "net/http/pprof" + "net/url" "reflect" "strconv" "strings" @@ -48,6 +49,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/stats" + "k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/configz" @@ -178,6 +180,9 @@ type HostInterface interface { RootFsInfo() (cadvisorapiv2.FsInfo, error) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) PLEGHealthCheck() (bool, error) + GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error) + GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error) + GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) } // NewServer initializes and configures a kubelet.Server object to handle HTTP requests. @@ -565,31 +570,59 @@ func (s *Server) getSpec(request *restful.Request, response *restful.Response) { response.WriteEntity(info) } -func getContainerCoordinates(request *restful.Request) (namespace, pod string, uid types.UID, container string) { - namespace = request.PathParameter("podNamespace") - pod = request.PathParameter("podID") - if uidStr := request.PathParameter("uid"); uidStr != "" { - uid = types.UID(uidStr) +type requestParams struct { + podNamespace string + podName string + podUID types.UID + containerName string + cmd []string + streamOpts remotecommand.Options +} + +func getRequestParams(req *restful.Request) requestParams { + streamOpts, err := remotecommand.NewOptions(req.Request) + if err != nil { + glog.Warningf("Unable to parse request stream options: %v", err) + } + if streamOpts == nil { + streamOpts = &remotecommand.Options{} + } + return requestParams{ + podNamespace: req.PathParameter("podNamespace"), + podName: req.PathParameter("podID"), + podUID: types.UID(req.PathParameter("uid")), + containerName: req.PathParameter("containerName"), + cmd: req.Request.URL.Query()[api.ExecCommandParamm], + streamOpts: *streamOpts, } - container = request.PathParameter("containerName") - return } // getAttach handles requests to attach to a container. func (s *Server) getAttach(request *restful.Request, response *restful.Response) { - podNamespace, podID, uid, container := getContainerCoordinates(request) - pod, ok := s.host.GetPodByName(podNamespace, podID) + params := getRequestParams(request) + pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) if !ok { response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) return } + podFullName := kubecontainer.GetPodFullName(pod) + redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, params.streamOpts) + if err != nil { + response.WriteError(streaming.HTTPStatus(err), err) + return + } + if redirect != nil { + http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) + return + } + remotecommand.ServeAttach(response.ResponseWriter, request.Request, s.host, - kubecontainer.GetPodFullName(pod), - uid, - container, + podFullName, + params.podUID, + params.containerName, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout, remotecommand.SupportedStreamingProtocols) @@ -597,19 +630,30 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response) // getExec handles requests to run a command inside a container. func (s *Server) getExec(request *restful.Request, response *restful.Response) { - podNamespace, podID, uid, container := getContainerCoordinates(request) - pod, ok := s.host.GetPodByName(podNamespace, podID) + params := getRequestParams(request) + pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) if !ok { response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) return } + podFullName := kubecontainer.GetPodFullName(pod) + redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, params.streamOpts) + if err != nil { + response.WriteError(streaming.HTTPStatus(err), err) + return + } + if redirect != nil { + http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) + return + } + remotecommand.ServeExec(response.ResponseWriter, request.Request, s.host, - kubecontainer.GetPodFullName(pod), - uid, - container, + podFullName, + params.podUID, + params.containerName, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout, remotecommand.SupportedStreamingProtocols) @@ -617,14 +661,16 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) { // getRun handles requests to run a command inside a container. func (s *Server) getRun(request *restful.Request, response *restful.Response) { - podNamespace, podID, uid, container := getContainerCoordinates(request) - pod, ok := s.host.GetPodByName(podNamespace, podID) + params := getRequestParams(request) + pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) if !ok { response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) return } - command := strings.Split(request.QueryParameter("cmd"), " ") - data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), uid, container, command) + + // For legacy reasons, run uses different query param than exec. + params.cmd = strings.Split(request.QueryParameter("cmd"), " ") + data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), params.podUID, params.containerName, params.cmd) if err != nil { response.WriteError(http.StatusInternalServerError, err) return @@ -632,15 +678,6 @@ func (s *Server) getRun(request *restful.Request, response *restful.Response) { writeJsonResponse(response, data) } -func getPodCoordinates(request *restful.Request) (namespace, pod string, uid types.UID) { - namespace = request.PathParameter("podNamespace") - pod = request.PathParameter("podID") - if uidStr := request.PathParameter("uid"); uidStr != "" { - uid = types.UID(uidStr) - } - return -} - // Derived from go-restful writeJSON. func writeJsonResponse(response *restful.Response, data []byte) { if data == nil { @@ -658,16 +695,30 @@ func writeJsonResponse(response *restful.Response, data []byte) { // getPortForward handles a new restful port forward request. It determines the // pod name and uid and then calls ServePortForward. func (s *Server) getPortForward(request *restful.Request, response *restful.Response) { - podNamespace, podID, uid := getPodCoordinates(request) - pod, ok := s.host.GetPodByName(podNamespace, podID) + params := getRequestParams(request) + pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) if !ok { response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) return } - podName := kubecontainer.GetPodFullName(pod) + redirect, err := s.host.GetPortForward(params.podName, params.podNamespace, params.podUID) + if err != nil { + response.WriteError(streaming.HTTPStatus(err), err) + return + } + if redirect != nil { + http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) + return + } - portforward.ServePortForward(response.ResponseWriter, request.Request, s.host, podName, uid, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout) + portforward.ServePortForward(response.ResponseWriter, + request.Request, + s.host, + kubecontainer.GetPodFullName(pod), + params.podUID, + s.host.StreamingConnectionIdleTimeout(), + remotecommand.DefaultStreamCreationTimeout) } // ServeHTTP responds to HTTP requests on the Kubelet. diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 7b028ddd480..afa45e8535d 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -27,6 +27,7 @@ import ( "net/http" "net/http/httptest" "net/http/httputil" + "net/url" "reflect" "strconv" "strings" @@ -35,6 +36,8 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapiv2 "github.com/google/cadvisor/info/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/kubernetes/pkg/api" apierrs "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/auth/authorizer" @@ -42,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/stats" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/types" @@ -70,6 +74,7 @@ type fakeKubelet struct { resyncInterval time.Duration loopEntryTime time.Time plegHealth bool + redirectURL *url.URL } func (fk *fakeKubelet) ResyncInterval() time.Duration { @@ -132,6 +137,18 @@ func (fk *fakeKubelet) PortForward(name string, uid types.UID, port uint16, stre return fk.portForwardFunc(name, uid, port, stream) } +func (fk *fakeKubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error) { + return fk.redirectURL, nil +} + +func (fk *fakeKubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error) { + return fk.redirectURL, nil +} + +func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { + return fk.redirectURL, nil +} + func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration { return fk.streamingConnectionIdleTimeoutFunc() } @@ -1136,6 +1153,7 @@ func testExecAttach(t *testing.T, verb string) { tty bool responseStatusCode int uid bool + responseLocation string }{ {responseStatusCode: http.StatusBadRequest}, {stdin: true, responseStatusCode: http.StatusSwitchingProtocols}, @@ -1144,6 +1162,7 @@ func testExecAttach(t *testing.T, verb string) { {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, {stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols}, {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, + {responseStatusCode: http.StatusFound, responseLocation: "http://localhost:12345/" + verb}, } for i, test := range tests { @@ -1154,6 +1173,12 @@ func testExecAttach(t *testing.T, verb string) { return 0 } + if test.responseLocation != "" { + var err error + fw.fakeKubelet.redirectURL, err = url.Parse(test.responseLocation) + require.NoError(t, err) + } + podNamespace := "other" podName := "foo" expectedPodName := getPodName(podName, podNamespace) @@ -1277,6 +1302,10 @@ func testExecAttach(t *testing.T, verb string) { if test.responseStatusCode != http.StatusSwitchingProtocols { c = &http.Client{} + // Don't follow redirects, since we want to inspect the redirect response. + c.CheckRedirect = func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + } } else { upgradeRoundTripper = spdy.NewRoundTripper(nil) c = &http.Client{Transport: upgradeRoundTripper} @@ -1297,6 +1326,10 @@ func testExecAttach(t *testing.T, verb string) { t.Fatalf("%d: response status: expected %v, got %v", i, e, a) } + if e, a := test.responseLocation, resp.Header.Get("Location"); e != a { + t.Errorf("%d: response location: expected %v, got %v", i, e, a) + } + if test.responseStatusCode != http.StatusSwitchingProtocols { continue } @@ -1435,11 +1468,12 @@ func TestServePortForwardIdleTimeout(t *testing.T) { func TestServePortForward(t *testing.T) { tests := []struct { - port string - uid bool - clientData string - containerData string - shouldError bool + port string + uid bool + clientData string + containerData string + shouldError bool + responseLocation string }{ {port: "", shouldError: true}, {port: "abc", shouldError: true}, @@ -1451,6 +1485,7 @@ func TestServePortForward(t *testing.T) { {port: "8000", clientData: "client data", containerData: "container data", shouldError: false}, {port: "65535", shouldError: false}, {port: "65535", uid: true, shouldError: false}, + {port: "65535", responseLocation: "http://localhost:12345/portforward", shouldError: false}, } podNamespace := "other" @@ -1466,6 +1501,12 @@ func TestServePortForward(t *testing.T) { return 0 } + if test.responseLocation != "" { + var err error + fw.fakeKubelet.redirectURL, err = url.Parse(test.responseLocation) + require.NoError(t, err) + } + portForwardFuncDone := make(chan struct{}) fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { @@ -1517,6 +1558,10 @@ func TestServePortForward(t *testing.T) { upgradeRoundTripper := spdy.NewRoundTripper(nil) c := &http.Client{Transport: upgradeRoundTripper} + // Don't follow redirects, since we want to inspect the redirect response. + c.CheckRedirect = func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + } resp, err := c.Post(url, "", nil) if err != nil { @@ -1524,6 +1569,14 @@ func TestServePortForward(t *testing.T) { } defer resp.Body.Close() + if test.responseLocation != "" { + assert.Equal(t, http.StatusFound, resp.StatusCode, "%d: status code", i) + assert.Equal(t, test.responseLocation, resp.Header.Get("Location"), "%d: location", i) + continue + } else { + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "%d: status code", i) + } + conn, err := upgradeRoundTripper.NewConnection(resp) if err != nil { t.Fatalf("Unexpected error creating streaming connection: %s", err) diff --git a/pkg/kubelet/server/streaming/BUILD b/pkg/kubelet/server/streaming/BUILD index 5e818a6a397..089ab2c0a07 100644 --- a/pkg/kubelet/server/streaming/BUILD +++ b/pkg/kubelet/server/streaming/BUILD @@ -24,6 +24,7 @@ go_library( "//pkg/types:go_default_library", "//pkg/util/term:go_default_library", "//vendor:github.com/emicklei/go-restful", + "//vendor:google.golang.org/grpc", "//vendor:google.golang.org/grpc/codes", "//vendor:k8s.io/client-go/pkg/api", ], diff --git a/pkg/kubelet/server/streaming/errors.go b/pkg/kubelet/server/streaming/errors.go index 1fdd8dfab0b..3d957bb1edc 100644 --- a/pkg/kubelet/server/streaming/errors.go +++ b/pkg/kubelet/server/streaming/errors.go @@ -18,30 +18,27 @@ package streaming import ( "fmt" + "net/http" "time" + "google.golang.org/grpc" "google.golang.org/grpc/codes" ) -type ResponseError struct { - Err string - Code codes.Code -} - -func (e *ResponseError) Error() string { - return e.Err -} - func ErrorStreamingDisabled(method string) error { - return &ResponseError{ - Err: fmt.Sprintf("streaming method %s disabled", method), - Code: codes.NotFound, - } + return grpc.Errorf(codes.NotFound, fmt.Sprintf("streaming method %s disabled", method)) } func ErrorTimeout(op string, timeout time.Duration) error { - return &ResponseError{ - Err: fmt.Sprintf("%s timed out after %s", op, timeout.String()), - Code: codes.DeadlineExceeded, + return grpc.Errorf(codes.DeadlineExceeded, fmt.Sprintf("%s timed out after %s", op, timeout.String())) +} + +// Translates a CRI streaming error into an HTTP status code. +func HTTPStatus(err error) int { + switch grpc.Code(err) { + case codes.NotFound: + return http.StatusNotFound + default: + return http.StatusInternalServerError } } diff --git a/pkg/kubelet/util/format/BUILD b/pkg/kubelet/util/format/BUILD index 941a6c274de..52879359cd3 100644 --- a/pkg/kubelet/util/format/BUILD +++ b/pkg/kubelet/util/format/BUILD @@ -17,7 +17,10 @@ go_library( "resources.go", ], tags = ["automanaged"], - deps = ["//pkg/api:go_default_library"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/types:go_default_library", + ], ) go_test( diff --git a/pkg/kubelet/util/format/pod.go b/pkg/kubelet/util/format/pod.go index 1ec76455180..13bfc764430 100644 --- a/pkg/kubelet/util/format/pod.go +++ b/pkg/kubelet/util/format/pod.go @@ -22,16 +22,23 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/types" ) type podHandler func(*api.Pod) string -// Pod returns a string reprenetating a pod in a human readable format, +// Pod returns a string representing a pod in a consistent human readable format, // with pod UID as part of the string. func Pod(pod *api.Pod) string { + return PodDesc(pod.Name, pod.Namespace, pod.UID) +} + +// PodDesc returns a string representing a pod in a consistent human readable format, +// with pod UID as part of the string. +func PodDesc(podName, podNamespace string, podUID types.UID) string { // Use underscore as the delimiter because it is not allowed in pod name // (DNS subdomain format), while allowed in the container name format. - return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.UID) + return fmt.Sprintf("%s_%s(%s)", podName, podNamespace, podUID) } // PodWithDeletionTimestamp is the same as Pod. In addition, it prints the