Remove direct and indirect streaming runtime interface.

This commit is contained in:
Lantao Liu 2018-05-16 01:14:06 -07:00
parent 10b9fd3a01
commit aeb6cacf01
7 changed files with 79 additions and 291 deletions

View File

@ -17,11 +17,9 @@ limitations under the License.
package container
import (
"bytes"
"fmt"
"hash/fnv"
"strings"
"time"
"github.com/golang/glog"
@ -32,7 +30,6 @@ import (
"k8s.io/client-go/tools/record"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/third_party/forked/golang/expansion"
)
@ -253,22 +250,6 @@ func FormatPod(pod *Pod) string {
return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.ID)
}
type containerCommandRunnerWrapper struct {
DirectStreamingRuntime
}
var _ ContainerCommandRunner = &containerCommandRunnerWrapper{}
func (r *containerCommandRunnerWrapper) RunInContainer(id ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := r.ExecInContainer(id, cmd, nil, output, output, false, nil, timeout)
// Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
// the command returned a nonzero exit code). Therefore, always return the output along with the
// error.
return buffer.Bytes(), err
}
// GetContainerSpec gets the container spec by containerName.
func GetContainerSpec(pod *v1.Pod, containerName string) *v1.Container {
for i, c := range pod.Spec.Containers {

View File

@ -124,22 +124,10 @@ type Runtime interface {
UpdatePodCIDR(podCIDR string) error
}
// DirectStreamingRuntime is the interface implemented by runtimes for which the streaming calls
// (exec/attach/port-forward) should be served directly by the Kubelet.
type DirectStreamingRuntime interface {
// Runs the command in the container of the specified pod. Attaches
// the processes stdin, stdout, and stderr. Optionally uses a tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error
// ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher
}
// IndirectStreamingRuntime is the interface implemented by runtimes that handle the serving of the
// StreamingRuntime is the interface implemented by runtimes that handle the serving of the
// streaming calls (exec/attach/port-forward) themselves. In this case, Kubelet should redirect to
// the runtime server.
type IndirectStreamingRuntime interface {
type StreamingRuntime interface {
GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)

View File

@ -26,7 +26,6 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/util/flowcontrol"
. "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/volume"
@ -59,34 +58,13 @@ type FakeRuntime struct {
StatusErr error
}
type FakeDirectStreamingRuntime struct {
*FakeRuntime
// Arguments to streaming method calls.
Args struct {
// Attach / Exec args
ContainerID ContainerID
Cmd []string
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
TTY bool
// Port-forward args
Pod *Pod
Port int32
Stream io.ReadWriteCloser
}
}
var _ DirectStreamingRuntime = &FakeDirectStreamingRuntime{}
const FakeHost = "localhost:12345"
type FakeIndirectStreamingRuntime struct {
type FakeStreamingRuntime struct {
*FakeRuntime
}
var _ IndirectStreamingRuntime = &FakeIndirectStreamingRuntime{}
var _ StreamingRuntime = &FakeStreamingRuntime{}
// FakeRuntime should implement Runtime.
var _ Runtime = &FakeRuntime{}
@ -311,35 +289,6 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS
return &status, f.Err
}
func (f *FakeDirectStreamingRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "ExecInContainer")
f.Args.ContainerID = containerID
f.Args.Cmd = cmd
f.Args.Stdin = stdin
f.Args.Stdout = stdout
f.Args.Stderr = stderr
f.Args.TTY = tty
return f.Err
}
func (f *FakeDirectStreamingRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "AttachContainer")
f.Args.ContainerID = containerID
f.Args.Stdin = stdin
f.Args.Stdout = stdout
f.Args.Stderr = stderr
f.Args.TTY = tty
return f.Err
}
func (f *FakeRuntime) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
f.Lock()
defer f.Unlock()
@ -394,18 +343,6 @@ func (f *FakeRuntime) RemoveImage(image ImageSpec) error {
return f.Err
}
func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "PortForward")
f.Args.Pod = pod
f.Args.Port = port
f.Args.Stream = stream
return f.Err
}
func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) {
f.Lock()
defer f.Unlock()
@ -455,7 +392,7 @@ func (f *FakeRuntime) ImageStats() (*ImageStats, error) {
return nil, f.Err
}
func (f *FakeIndirectStreamingRuntime) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
func (f *FakeStreamingRuntime) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
f.Lock()
defer f.Unlock()
@ -463,7 +400,7 @@ func (f *FakeIndirectStreamingRuntime) GetExec(id ContainerID, cmd []string, std
return &url.URL{Host: FakeHost}, f.Err
}
func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {
func (f *FakeStreamingRuntime) GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {
f.Lock()
defer f.Unlock()
@ -471,7 +408,7 @@ func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout,
return &url.URL{Host: FakeHost}, f.Err
}
func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) {
func (f *FakeStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) {
f.Lock()
defer f.Unlock()

View File

@ -670,6 +670,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
@ -1002,6 +1003,9 @@ type Kubelet struct {
// Container runtime.
containerRuntime kubecontainer.Runtime
// Streaming runtime handles container streaming.
streamingRuntime kubecontainer.StreamingRuntime
// Container runtime service (needed by container runtime Start()).
// TODO(CD): try to make this available without holding a reference in this
// struct. For example, by adding a getter to generic runtime.

View File

@ -1592,139 +1592,78 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize, timeout)
// TODO(random-liu): Remove this.
return fmt.Errorf("unimplemented")
}
// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
// TODO(random-liu): Remove this.
return fmt.Errorf("unimplemented")
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return err
}
// Resolve and type convert back again.
// We need the static pod UID but the kubecontainer API works with types.UID.
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return fmt.Errorf("pod not found (%q)", podFullName)
}
return streamingRuntime.PortForward(&pod, port, stream)
// TODO(random-liu): Remove this.
return fmt.Errorf("unimplemented")
}
// GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the exec directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
default:
return nil, fmt.Errorf("container runtime does not support exec")
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return kl.streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
}
// GetAttach gets the URL the attach will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container %s not found in pod %s", containerName, podFullName)
}
// The TTY setting for attach must match the TTY setting in the initial container configuration,
// since whether the process is running in a TTY cannot be changed after it has started. We
// need the api.Pod to get the TTY status.
pod, found := kl.GetPodByFullName(podFullName)
if !found || (string(podUID) != "" && pod.UID != podUID) {
return nil, fmt.Errorf("pod %s not found", podFullName)
}
containerSpec := kubecontainer.GetContainerSpec(pod, containerName)
if containerSpec == nil {
return nil, fmt.Errorf("container %s not found in pod %s", containerName, podFullName)
}
tty := containerSpec.TTY
return streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, tty)
default:
return nil, fmt.Errorf("container runtime does not support attach")
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container %s not found in pod %s", containerName, podFullName)
}
// The TTY setting for attach must match the TTY setting in the initial container configuration,
// since whether the process is running in a TTY cannot be changed after it has started. We
// need the api.Pod to get the TTY status.
pod, found := kl.GetPodByFullName(podFullName)
if !found || (string(podUID) != "" && pod.UID != podUID) {
return nil, fmt.Errorf("pod %s not found", podFullName)
}
containerSpec := kubecontainer.GetContainerSpec(pod, containerName)
if containerSpec == nil {
return nil, fmt.Errorf("container %s not found in pod %s", containerName, podFullName)
}
tty := containerSpec.TTY
return kl.streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, tty)
}
// GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return nil, err
}
// Resolve and type convert back again.
// We need the static pod UID but the kubecontainer API works with types.UID.
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return nil, fmt.Errorf("pod not found (%q)", podFullName)
}
return streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports)
default:
return nil, fmt.Errorf("container runtime does not support port-forward")
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return nil, err
}
// Resolve and type convert back again.
// We need the static pod UID but the kubecontainer API works with types.UID.
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return nil, fmt.Errorf("pod not found (%q)", podFullName)
}
return kl.streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports)
}
// cleanupOrphanedPodCgroups removes cgroups that should no longer exist.

View File

@ -2149,53 +2149,21 @@ func TestExec(t *testing.T) {
}},
}
{ // No streaming case
description := "no streaming - " + tc.description
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
description := "streaming - " + tc.description
fakeRuntime := &containertest.FakeStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
kubelet.streamingRuntime = fakeRuntime
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
if tc.expectError {
assert.Error(t, err, description)
}
{ // Direct streaming case
description := "direct streaming - " + tc.description
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
} else {
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, fakeRuntime.Args.ContainerID.ID, containerID, description+": ID")
assert.Equal(t, fakeRuntime.Args.Cmd, command, description+": Command")
assert.Equal(t, fakeRuntime.Args.Stdin, stdin, description+": Stdin")
assert.Equal(t, fakeRuntime.Args.Stdout, stdout, description+": Stdout")
assert.Equal(t, fakeRuntime.Args.Stderr, stderr, description+": Stderr")
assert.Equal(t, fakeRuntime.Args.TTY, tty, description+": TTY")
}
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
{ // Indirect streaming case
description := "indirect streaming - " + tc.description
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
assert.Error(t, err, description)
}
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
assert.Error(t, err, description)
}
}
@ -2241,50 +2209,21 @@ func TestPortForward(t *testing.T) {
}
podFullName := kubecontainer.GetPodFullName(podWithUIDNameNs(podUID, tc.podName, podNamespace))
{ // No streaming case
description := "no streaming - " + tc.description
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
description := "streaming - " + tc.description
fakeRuntime := &containertest.FakeStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
kubelet.streamingRuntime = fakeRuntime
err = kubelet.PortForward(podFullName, podUID, port, stream)
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
if tc.expectError {
assert.Error(t, err, description)
}
{ // Direct streaming case
description := "direct streaming - " + tc.description
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
} else {
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.PortForward(podFullName, podUID, port, stream)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
require.Equal(t, fakeRuntime.Args.Pod.ID, podUID, description+": Pod UID")
require.Equal(t, fakeRuntime.Args.Port, port, description+": Port")
require.Equal(t, fakeRuntime.Args.Stream, stream, description+": stream")
}
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
{ // Indirect streaming case
description := "indirect streaming - " + tc.description
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
err = kubelet.PortForward(podFullName, podUID, port, stream)
assert.Error(t, err, description)
}
err = kubelet.PortForward(podFullName, podUID, port, stream)
assert.Error(t, err, description)
}
}

View File

@ -120,7 +120,7 @@ type kubeGenericRuntimeManager struct {
type KubeGenericRuntime interface {
kubecontainer.Runtime
kubecontainer.IndirectStreamingRuntime
kubecontainer.StreamingRuntime
kubecontainer.ContainerCommandRunner
}