Merge pull request #36020 from timstclair/klet-stream

Automatic merge from submit-queue

Separate Direct and Indirect streaming paths, implement indirect path for CRI

This PR refactors the `pkg/kubelet/container.Runtime` interface to remove the `ExecInContainer`, `PortForward` and `AttachContainer` methods. Instead, those methods are part of the `DirectStreamingRuntime` interface which all "legacy" runtimes implement. I also added an `IndirectStreamingRuntime` which handles the redirect path and is implemented by CRI runtimes. To control the size of this PR, I did not fully setup the indirect streaming path for the dockershim, so I left legacy path behind.

Most of this PR is moving & renaming associated with the refactoring. To understand the functional changes, I suggest tracing the code from `getExec` in `pkg/kubelet/server/server.go`, which calls `GetExec` in `pkg/kubelet/kubelet_pods.go` to determine whether to follow the direct or indirect path.

For https://github.com/kubernetes/kubernetes/issues/29579

/cc @kubernetes/sig-node
This commit is contained in:
Kubernetes Submit Queue
2016-11-04 11:52:06 -07:00
committed by GitHub
28 changed files with 723 additions and 436 deletions

View File

@@ -19,8 +19,6 @@ package kubelet
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"sort"
"testing"
@@ -32,9 +30,9 @@ import (
"k8s.io/kubernetes/pkg/apimachinery/registered"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/term"
)
func TestMakeMounts(t *testing.T) {
@@ -112,47 +110,6 @@ func TestMakeMounts(t *testing.T) {
assert.Equal(t, expectedMounts, mounts, "mounts of container %+v", container)
}
type fakeContainerCommandRunner struct {
// what was passed in
Cmd []string
ID kubecontainer.ContainerID
PodID types.UID
E error
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
TTY bool
Port uint16
Stream io.ReadWriteCloser
// what to return
StdoutData string
StderrData string
}
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
// record params
f.Cmd = cmd
f.ID = id
f.Stdin = in
f.Stdout = out
f.Stderr = err
f.TTY = tty
// Copy stdout/stderr data
fmt.Fprint(out, f.StdoutData)
fmt.Fprint(out, f.StderrData)
return f.E
}
func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
f.PodID = pod.ID
f.Port = port
f.Stream = stream
return nil
}
func TestRunInContainerNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
@@ -172,14 +129,13 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
}
func TestRunInContainer(t *testing.T) {
for _, testError := range []error{nil, errors.New("foo")} {
for _, testError := range []error{nil, errors.New("bar")} {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{
E: testError,
StdoutData: "foo",
StderrData: "bar",
fakeCommandRunner := containertest.FakeContainerCommandRunner{
Err: testError,
Stdout: "foo",
}
kubelet.runner = &fakeCommandRunner
@@ -198,11 +154,11 @@ func TestRunInContainer(t *testing.T) {
}
cmd := []string{"ls"}
actualOutput, err := kubelet.RunInContainer("podFoo_nsFoo", "", "containerFoo", cmd)
assert.Equal(t, containerID, fakeCommandRunner.ID, "(testError=%v) ID", testError)
assert.Equal(t, containerID, fakeCommandRunner.ContainerID, "(testError=%v) ID", testError)
assert.Equal(t, cmd, fakeCommandRunner.Cmd, "(testError=%v) command", testError)
// this isn't 100% foolproof as a bug in a real ContainerCommandRunner where it fails to copy to stdout/stderr wouldn't be caught by this test
assert.Equal(t, "foobar", string(actualOutput), "(testError=%v) output", testError)
assert.Equal(t, fmt.Sprintf("%s", err), fmt.Sprintf("%s", testError), "(testError=%v) err", testError)
assert.Equal(t, "foo", string(actualOutput), "(testError=%v) output", testError)
assert.Equal(t, err, testError, "(testError=%v) err", testError)
}
}
@@ -1052,73 +1008,6 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) {
}
}
func TestExecInContainerNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
fakeRuntime.PodList = []*containertest.FakePod{}
podName := "podFoo"
podNamespace := "nsFoo"
containerID := "containerFoo"
err := kubelet.ExecInContainer(
kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
"",
containerID,
[]string{"ls"},
nil,
nil,
nil,
false,
nil,
)
require.Error(t, err)
require.True(t, fakeCommandRunner.ID.IsEmpty(), "Unexpected invocation of runner.ExecInContainer")
}
func TestExecInContainerNoSuchContainer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
podNamespace := "nsFoo"
containerID := "containerFoo"
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: podName,
Namespace: podNamespace,
Containers: []*kubecontainer.Container{
{Name: "bar",
ID: kubecontainer.ContainerID{Type: "test", ID: "barID"}},
},
}},
}
err := kubelet.ExecInContainer(
kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: podName,
Namespace: podNamespace,
}}),
"",
containerID,
[]string{"ls"},
nil,
nil,
nil,
false,
nil,
)
require.Error(t, err)
require.True(t, fakeCommandRunner.ID.IsEmpty(), "Unexpected invocation of runner.ExecInContainer")
}
type fakeReadWriteCloser struct{}
func (f *fakeReadWriteCloser) Write(data []byte) (int, error) {
@@ -1133,116 +1022,195 @@ func (f *fakeReadWriteCloser) Close() error {
return nil
}
func TestExecInContainer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
func TestExec(t *testing.T) {
const (
podName = "podFoo"
podNamespace = "nsFoo"
podUID types.UID = "12345678"
containerID = "containerFoo"
tty = true
)
var (
podFullName = kubecontainer.GetPodFullName(podWithUidNameNs(podUID, podName, podNamespace))
command = []string{"ls"}
stdin = &bytes.Buffer{}
stdout = &fakeReadWriteCloser{}
stderr = &fakeReadWriteCloser{}
)
podName := "podFoo"
podNamespace := "nsFoo"
containerID := "containerFoo"
command := []string{"ls"}
stdin := &bytes.Buffer{}
stdout := &fakeReadWriteCloser{}
stderr := &fakeReadWriteCloser{}
tty := true
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: podName,
Namespace: podNamespace,
Containers: []*kubecontainer.Container{
{Name: containerID,
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
testcases := []struct {
description string
podFullName string
container string
expectError bool
}{{
description: "success case",
podFullName: podFullName,
container: containerID,
}, {
description: "no such pod",
podFullName: "bar" + podFullName,
container: containerID,
expectError: true,
}, {
description: "no such container",
podFullName: podFullName,
container: "containerBar",
expectError: true,
}}
for _, tc := range testcases {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
testKubelet.fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: podUID,
Name: podName,
Namespace: podNamespace,
Containers: []*kubecontainer.Container{
{Name: containerID,
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}},
}
{ // No streaming case
description := "no streaming - " + tc.description
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
assert.Error(t, err, description)
}
{ // Direct streaming case
description := "direct streaming - " + tc.description
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, fakeRuntime.Args.ContainerID.ID, containerID, description+": ID")
assert.Equal(t, fakeRuntime.Args.Cmd, command, description+": Command")
assert.Equal(t, fakeRuntime.Args.Stdin, stdin, description+": Stdin")
assert.Equal(t, fakeRuntime.Args.Stdout, stdout, description+": Stdout")
assert.Equal(t, fakeRuntime.Args.Stderr, stderr, description+": Stderr")
assert.Equal(t, fakeRuntime.Args.TTY, tty, description+": TTY")
}
}
{ // Indirect streaming case
description := "indirect streaming - " + tc.description
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
assert.Error(t, err, description)
}
}
err := kubelet.ExecInContainer(
kubecontainer.GetPodFullName(podWithUidNameNs("12345678", podName, podNamespace)),
"",
containerID,
[]string{"ls"},
stdin,
stdout,
stderr,
tty,
nil,
)
require.NoError(t, err)
require.Equal(t, fakeCommandRunner.ID.ID, containerID, "ID")
require.Equal(t, fakeCommandRunner.Cmd, command, "Command")
require.Equal(t, fakeCommandRunner.Stdin, stdin, "Stdin")
require.Equal(t, fakeCommandRunner.Stdout, stdout, "Stdout")
require.Equal(t, fakeCommandRunner.Stderr, stderr, "Stderr")
require.Equal(t, fakeCommandRunner.TTY, tty, "TTY")
}
func TestPortForwardNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.PodList = []*containertest.FakePod{}
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
podNamespace := "nsFoo"
var port uint16 = 5000
err := kubelet.PortForward(
kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
"",
port,
nil,
)
require.Error(t, err)
require.True(t, fakeCommandRunner.ID.IsEmpty(), "unexpected invocation of runner.PortForward")
}
func TestPortForward(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
podName := "podFoo"
podNamespace := "nsFoo"
podID := types.UID("12345678")
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: podID,
Name: podName,
Namespace: podNamespace,
Containers: []*kubecontainer.Container{
{
Name: "foo",
ID: kubecontainer.ContainerID{Type: "test", ID: "containerFoo"},
},
},
}},
}
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
var port uint16 = 5000
stream := &fakeReadWriteCloser{}
err := kubelet.PortForward(
kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: podName,
Namespace: podNamespace,
}}),
"",
port,
stream,
const (
podName = "podFoo"
podNamespace = "nsFoo"
podUID types.UID = "12345678"
port uint16 = 5000
)
require.NoError(t, err)
require.Equal(t, fakeCommandRunner.PodID, podID, "Pod ID")
require.Equal(t, fakeCommandRunner.Port, port, "Port")
require.Equal(t, fakeCommandRunner.Stream, stream, "stream")
var (
stream = &fakeReadWriteCloser{}
)
testcases := []struct {
description string
podName string
expectError bool
}{{
description: "success case",
podName: podName,
}, {
description: "no such pod",
podName: "bar",
expectError: true,
}}
for _, tc := range testcases {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
testKubelet.fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: podUID,
Name: podName,
Namespace: podNamespace,
Containers: []*kubecontainer.Container{
{Name: "foo",
ID: kubecontainer.ContainerID{Type: "test", ID: "foo"},
},
},
}},
}
podFullName := kubecontainer.GetPodFullName(podWithUidNameNs(podUID, tc.podName, podNamespace))
{ // No streaming case
description := "no streaming - " + tc.description
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.PortForward(podFullName, podUID, port, stream)
assert.Error(t, err, description)
}
{ // Direct streaming case
description := "direct streaming - " + tc.description
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.PortForward(podFullName, podUID, port, stream)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
require.Equal(t, fakeRuntime.Args.Pod.ID, podUID, description+": Pod UID")
require.Equal(t, fakeRuntime.Args.Port, port, description+": Port")
require.Equal(t, fakeRuntime.Args.Stream, stream, description+": stream")
}
}
{ // Indirect streaming case
description := "indirect streaming - " + tc.description
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
err = kubelet.PortForward(podFullName, podUID, port, stream)
assert.Error(t, err, description)
}
}
}
// Tests that identify the host port conflicts are detected correctly.