kubelet: dockershim should return grpc status with DeadlineExceeded code

Signed-off-by: Andrew Sy Kim <kim.andrewsy@gmail.com>
This commit is contained in:
Andrew Sy Kim 2020-11-15 20:22:53 -05:00
parent 147a120948
commit a59189e213
5 changed files with 97 additions and 67 deletions

View File

@ -37,7 +37,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/credentialprovider:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
@ -56,13 +55,11 @@ go_library(
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/cache:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/probe/exec:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/remotecommand:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
"//vendor/github.com/armon/circbuf:go_default_library",
@ -73,6 +70,8 @@ go_library(
"//vendor/github.com/docker/docker/api/types/strslice:go_default_library",
"//vendor/github.com/docker/docker/pkg/jsonmessage:go_default_library",
"//vendor/github.com/docker/go-connections/nat:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
] + select({

View File

@ -21,20 +21,23 @@ package dockershim
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"time"
dockertypes "github.com/docker/docker/api/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/client-go/tools/remotecommand"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
utilexec "k8s.io/utils/exec"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
)
type streamingRuntime struct {
@ -47,16 +50,17 @@ var _ streaming.Runtime = &streamingRuntime{}
const maxMsgSize = 1024 * 1024 * 16
func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
return r.exec(context.TODO(), containerID, cmd, in, out, err, tty, resize, 0)
}
// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (r *streamingRuntime) exec(ctx context.Context, containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
container, err := checkContainerStatus(r.client, containerID)
if err != nil {
return err
}
return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
return r.execHandler.ExecInContainer(ctx, r.client, container, cmd, in, out, errw, tty, resize, timeout)
}
func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
@ -77,10 +81,10 @@ func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream i
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) {
func (ds *dockerService) ExecSync(ctx context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) {
timeout := time.Duration(req.Timeout) * time.Second
var stdoutBuffer, stderrBuffer bytes.Buffer
err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd,
err := ds.streamingRuntime.exec(ctx, req.ContainerId, req.Cmd,
nil, // in
ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stdoutBuffer, maxMsgSize)),
ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stderrBuffer, maxMsgSize)),
@ -88,6 +92,11 @@ func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncReq
nil, // resize
timeout)
// kubelet's remote runtime expects a grpc error with status code DeadlineExceeded on time out.
if errors.Is(err, context.DeadlineExceeded) {
return nil, status.Errorf(codes.DeadlineExceeded, err.Error())
}
var exitCode int32
if err != nil {
exitError, ok := err.(utilexec.ExitError)

View File

@ -20,6 +20,7 @@ package dockershim
import (
"bytes"
"context"
"fmt"
"io"
@ -28,7 +29,7 @@ import (
func (r *streamingRuntime) portForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
stderr := new(bytes.Buffer)
err := r.exec(podSandboxID, []string{"wincat.exe", "127.0.0.1", fmt.Sprint(port)}, stream, stream, ioutils.WriteCloserWrapper(stderr), false, nil, 0)
err := r.exec(context.TODO(), podSandboxID, []string{"wincat.exe", "127.0.0.1", fmt.Sprint(port)}, stream, stream, ioutils.WriteCloserWrapper(stderr), false, nil, 0)
if err != nil {
return fmt.Errorf("%v: %s", err, stderr.String())
}

View File

@ -19,26 +19,22 @@ limitations under the License.
package dockershim
import (
"context"
"fmt"
"io"
"strings"
"time"
dockertypes "github.com/docker/docker/api/types"
"k8s.io/klog/v2"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/features"
"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/probe/exec"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
)
// ExecHandler knows how to execute a command in a running Docker container.
type ExecHandler interface {
ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
ExecInContainer(ctx context.Context, client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
}
type dockerExitError struct {
@ -65,7 +61,7 @@ func (d *dockerExitError) ExitStatus() int {
type NativeExecHandler struct{}
// ExecInContainer executes the cmd in container using the Docker's exec API
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
func (*NativeExecHandler) ExecInContainer(ctx context.Context, client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
done := make(chan struct{})
defer close(done)
@ -109,53 +105,55 @@ func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container
RawTerminal: tty,
ExecStarted: execStarted,
}
err = client.StartExec(execObj.ID, startOpts, streamOpts)
if err != nil {
return err
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// if ExecProbeTimeout feature gate is disabled, preserve existing behavior to ignore exec timeouts
var execTimeout <-chan time.Time
if timeout > 0 && utilfeature.DefaultFeatureGate.Enabled(features.ExecProbeTimeout) {
execTimeout = time.After(timeout)
} else {
// skip exec timeout if provided timeout is 0
execTimeout = nil
}
// StartExec is a blocking call, so we need to run it concurrently and catch
// its error in a channel
execErr := make(chan error, 1)
go func() {
execErr <- client.StartExec(execObj.ID, startOpts, streamOpts)
}()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
count := 0
for {
select {
case <-execTimeout:
return exec.NewTimeoutError(fmt.Errorf("command %q timed out", strings.Join(cmd, " ")), timeout)
// need to use "default" here instead of <-ticker.C, otherwise we delay the initial InspectExec by 2 seconds.
default:
inspect, inspectErr := client.InspectExec(execObj.ID)
if inspectErr != nil {
return inspectErr
}
if !inspect.Running {
if inspect.ExitCode != 0 {
return &dockerExitError{inspect}
}
return nil
}
// Only limit the amount of InspectExec calls if the exec timeout was not set.
// When a timeout is not set, we stop polling the exec session after 5 attempts and allow the process to continue running.
if execTimeout == nil {
count++
if count == 5 {
klog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID)
return nil
}
}
<-ticker.C
select {
case <-ctx.Done():
return ctx.Err()
case err := <-execErr:
if err != nil {
return err
}
}
// InspectExec may not always return latest state of exec, so call it a few times until
// it returns an exec inspect that shows that the process is no longer running.
retries := 0
maxRetries := 5
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
inspect, err := client.InspectExec(execObj.ID)
if err != nil {
return err
}
if !inspect.Running {
if inspect.ExitCode != 0 {
return &dockerExitError{inspect}
}
return nil
}
retries++
if retries == maxRetries {
klog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID)
return nil
}
<-ticker.C
}
}

View File

@ -19,6 +19,7 @@ limitations under the License.
package dockershim
import (
"context"
"fmt"
"io"
"testing"
@ -29,7 +30,6 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/client-go/tools/remotecommand"
mockclient "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/testing"
)
@ -37,6 +37,7 @@ func TestExecInContainer(t *testing.T) {
testcases := []struct {
description string
timeout time.Duration
returnCreateExec1 *dockertypes.IDResponse
returnCreateExec2 error
returnStartExec error
@ -45,6 +46,7 @@ func TestExecInContainer(t *testing.T) {
expectError error
}{{
description: "ExecInContainer succeeds",
timeout: time.Minute,
returnCreateExec1: &dockertypes.IDResponse{ID: "12345678"},
returnCreateExec2: nil,
returnStartExec: nil,
@ -58,6 +60,7 @@ func TestExecInContainer(t *testing.T) {
expectError: nil,
}, {
description: "CreateExec returns an error",
timeout: time.Minute,
returnCreateExec1: nil,
returnCreateExec2: fmt.Errorf("error in CreateExec()"),
returnStartExec: nil,
@ -66,6 +69,7 @@ func TestExecInContainer(t *testing.T) {
expectError: fmt.Errorf("failed to exec in container - Exec setup failed - error in CreateExec()"),
}, {
description: "StartExec returns an error",
timeout: time.Minute,
returnCreateExec1: &dockertypes.IDResponse{ID: "12345678"},
returnCreateExec2: nil,
returnStartExec: fmt.Errorf("error in StartExec()"),
@ -74,12 +78,27 @@ func TestExecInContainer(t *testing.T) {
expectError: fmt.Errorf("error in StartExec()"),
}, {
description: "InspectExec returns an error",
timeout: time.Minute,
returnCreateExec1: &dockertypes.IDResponse{ID: "12345678"},
returnCreateExec2: nil,
returnStartExec: nil,
returnInspectExec1: nil,
returnInspectExec2: fmt.Errorf("error in InspectExec()"),
expectError: fmt.Errorf("error in InspectExec()"),
}, {
description: "ExecInContainer returns context DeadlineExceeded",
timeout: 1 * time.Second,
returnCreateExec1: &dockertypes.IDResponse{ID: "12345678"},
returnCreateExec2: nil,
returnStartExec: context.DeadlineExceeded,
returnInspectExec1: &dockertypes.ContainerExecInspect{
ExecID: "200",
ContainerID: "12345678",
Running: true,
ExitCode: 0,
Pid: 100},
returnInspectExec2: nil,
expectError: context.DeadlineExceeded,
}}
eh := &NativeExecHandler{}
@ -89,7 +108,6 @@ func TestExecInContainer(t *testing.T) {
var stdin io.Reader
var stdout, stderr io.WriteCloser
var resize <-chan remotecommand.TerminalSize
var timeout time.Duration
for _, tc := range testcases {
t.Logf("TestCase: %q", tc.description)
@ -102,8 +120,13 @@ func TestExecInContainer(t *testing.T) {
mockClient.EXPECT().InspectExec(gomock.Any()).Return(
tc.returnInspectExec1,
tc.returnInspectExec2)
err := eh.ExecInContainer(mockClient, container, cmd, stdin, stdout, stderr, false, resize, timeout)
assert.Equal(t, err, tc.expectError)
// use parent context of 2 minutes since that's the default remote
// runtime connection timeout used by dockershim
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
err := eh.ExecInContainer(ctx, mockClient, container, cmd, stdin, stdout, stderr, false, resize, tc.timeout)
assert.Equal(t, tc.expectError, err)
}
}