diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index 4bd4958e11e..57919f932b8 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -37,7 +37,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/credentialprovider:go_default_library", - "//pkg/features:go_default_library", "//pkg/kubelet/apis/config:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager/checksum:go_default_library", @@ -56,13 +55,11 @@ go_library( "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/cache:go_default_library", "//pkg/kubelet/util/ioutils:go_default_library", - "//pkg/probe/exec:go_default_library", "//pkg/util/parsers:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/remotecommand:go_default_library", "//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library", "//vendor/github.com/armon/circbuf:go_default_library", @@ -73,6 +70,8 @@ go_library( "//vendor/github.com/docker/docker/api/types/strslice:go_default_library", "//vendor/github.com/docker/docker/pkg/jsonmessage:go_default_library", "//vendor/github.com/docker/go-connections/nat:go_default_library", + "//vendor/google.golang.org/grpc/codes:go_default_library", + "//vendor/google.golang.org/grpc/status:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", ] + select({ diff --git a/pkg/kubelet/dockershim/docker_streaming.go b/pkg/kubelet/dockershim/docker_streaming.go index 91e194570eb..c7d68ddeb6e 100644 --- a/pkg/kubelet/dockershim/docker_streaming.go +++ b/pkg/kubelet/dockershim/docker_streaming.go @@ -21,20 +21,23 @@ package dockershim import ( "bytes" "context" + "errors" "fmt" "io" "math" "time" dockertypes "github.com/docker/docker/api/types" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/client-go/tools/remotecommand" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/cri/streaming" + "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/util/ioutils" utilexec "k8s.io/utils/exec" - - "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" ) type streamingRuntime struct { @@ -47,16 +50,17 @@ var _ streaming.Runtime = &streamingRuntime{} const maxMsgSize = 1024 * 1024 * 16 func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { - return r.exec(containerID, cmd, in, out, err, tty, resize, 0) + return r.exec(context.TODO(), containerID, cmd, in, out, err, tty, resize, 0) } // Internal version of Exec adds a timeout. -func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (r *streamingRuntime) exec(ctx context.Context, containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { container, err := checkContainerStatus(r.client, containerID) if err != nil { return err } - return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout) + + return r.execHandler.ExecInContainer(ctx, r.client, container, cmd, in, out, errw, tty, resize, timeout) } func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { @@ -77,10 +81,10 @@ func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream i // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. -func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) { +func (ds *dockerService) ExecSync(ctx context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) { timeout := time.Duration(req.Timeout) * time.Second var stdoutBuffer, stderrBuffer bytes.Buffer - err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd, + err := ds.streamingRuntime.exec(ctx, req.ContainerId, req.Cmd, nil, // in ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stdoutBuffer, maxMsgSize)), ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stderrBuffer, maxMsgSize)), @@ -88,6 +92,11 @@ func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncReq nil, // resize timeout) + // kubelet's remote runtime expects a grpc error with status code DeadlineExceeded on time out. + if errors.Is(err, context.DeadlineExceeded) { + return nil, status.Errorf(codes.DeadlineExceeded, err.Error()) + } + var exitCode int32 if err != nil { exitError, ok := err.(utilexec.ExitError) diff --git a/pkg/kubelet/dockershim/docker_streaming_windows.go b/pkg/kubelet/dockershim/docker_streaming_windows.go index 64c5ba18833..8872e2a6df5 100644 --- a/pkg/kubelet/dockershim/docker_streaming_windows.go +++ b/pkg/kubelet/dockershim/docker_streaming_windows.go @@ -20,6 +20,7 @@ package dockershim import ( "bytes" + "context" "fmt" "io" @@ -28,7 +29,7 @@ import ( func (r *streamingRuntime) portForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { stderr := new(bytes.Buffer) - err := r.exec(podSandboxID, []string{"wincat.exe", "127.0.0.1", fmt.Sprint(port)}, stream, stream, ioutils.WriteCloserWrapper(stderr), false, nil, 0) + err := r.exec(context.TODO(), podSandboxID, []string{"wincat.exe", "127.0.0.1", fmt.Sprint(port)}, stream, stream, ioutils.WriteCloserWrapper(stderr), false, nil, 0) if err != nil { return fmt.Errorf("%v: %s", err, stderr.String()) } diff --git a/pkg/kubelet/dockershim/exec.go b/pkg/kubelet/dockershim/exec.go index d79cff31d97..a735759a615 100644 --- a/pkg/kubelet/dockershim/exec.go +++ b/pkg/kubelet/dockershim/exec.go @@ -19,26 +19,22 @@ limitations under the License. package dockershim import ( + "context" "fmt" "io" - "strings" "time" dockertypes "github.com/docker/docker/api/types" - "k8s.io/klog/v2" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/remotecommand" - "k8s.io/kubernetes/pkg/features" + "k8s.io/klog/v2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/probe/exec" - "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" ) // ExecHandler knows how to execute a command in a running Docker container. type ExecHandler interface { - ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error + ExecInContainer(ctx context.Context, client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error } type dockerExitError struct { @@ -65,7 +61,7 @@ func (d *dockerExitError) ExitStatus() int { type NativeExecHandler struct{} // ExecInContainer executes the cmd in container using the Docker's exec API -func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { +func (*NativeExecHandler) ExecInContainer(ctx context.Context, client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { done := make(chan struct{}) defer close(done) @@ -109,53 +105,55 @@ func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container RawTerminal: tty, ExecStarted: execStarted, } - err = client.StartExec(execObj.ID, startOpts, streamOpts) - if err != nil { - return err + + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() } - // if ExecProbeTimeout feature gate is disabled, preserve existing behavior to ignore exec timeouts - var execTimeout <-chan time.Time - if timeout > 0 && utilfeature.DefaultFeatureGate.Enabled(features.ExecProbeTimeout) { - execTimeout = time.After(timeout) - } else { - // skip exec timeout if provided timeout is 0 - execTimeout = nil - } + // StartExec is a blocking call, so we need to run it concurrently and catch + // its error in a channel + execErr := make(chan error, 1) + go func() { + execErr <- client.StartExec(execObj.ID, startOpts, streamOpts) + }() - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - count := 0 - for { - select { - case <-execTimeout: - return exec.NewTimeoutError(fmt.Errorf("command %q timed out", strings.Join(cmd, " ")), timeout) - // need to use "default" here instead of <-ticker.C, otherwise we delay the initial InspectExec by 2 seconds. - default: - inspect, inspectErr := client.InspectExec(execObj.ID) - if inspectErr != nil { - return inspectErr - } - - if !inspect.Running { - if inspect.ExitCode != 0 { - return &dockerExitError{inspect} - } - - return nil - } - - // Only limit the amount of InspectExec calls if the exec timeout was not set. - // When a timeout is not set, we stop polling the exec session after 5 attempts and allow the process to continue running. - if execTimeout == nil { - count++ - if count == 5 { - klog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID) - return nil - } - } - - <-ticker.C + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-execErr: + if err != nil { + return err } } + + // InspectExec may not always return latest state of exec, so call it a few times until + // it returns an exec inspect that shows that the process is no longer running. + retries := 0 + maxRetries := 5 + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + for { + inspect, err := client.InspectExec(execObj.ID) + if err != nil { + return err + } + + if !inspect.Running { + if inspect.ExitCode != 0 { + return &dockerExitError{inspect} + } + + return nil + } + + retries++ + if retries == maxRetries { + klog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID) + return nil + } + + <-ticker.C + } } diff --git a/pkg/kubelet/dockershim/exec_test.go b/pkg/kubelet/dockershim/exec_test.go index 2dddee59aa4..cc7e00bac50 100644 --- a/pkg/kubelet/dockershim/exec_test.go +++ b/pkg/kubelet/dockershim/exec_test.go @@ -19,6 +19,7 @@ limitations under the License. package dockershim import ( + "context" "fmt" "io" "testing" @@ -29,7 +30,6 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/client-go/tools/remotecommand" - mockclient "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/testing" ) @@ -37,6 +37,7 @@ func TestExecInContainer(t *testing.T) { testcases := []struct { description string + timeout time.Duration returnCreateExec1 *dockertypes.IDResponse returnCreateExec2 error returnStartExec error @@ -45,6 +46,7 @@ func TestExecInContainer(t *testing.T) { expectError error }{{ description: "ExecInContainer succeeds", + timeout: time.Minute, returnCreateExec1: &dockertypes.IDResponse{ID: "12345678"}, returnCreateExec2: nil, returnStartExec: nil, @@ -58,6 +60,7 @@ func TestExecInContainer(t *testing.T) { expectError: nil, }, { description: "CreateExec returns an error", + timeout: time.Minute, returnCreateExec1: nil, returnCreateExec2: fmt.Errorf("error in CreateExec()"), returnStartExec: nil, @@ -66,6 +69,7 @@ func TestExecInContainer(t *testing.T) { expectError: fmt.Errorf("failed to exec in container - Exec setup failed - error in CreateExec()"), }, { description: "StartExec returns an error", + timeout: time.Minute, returnCreateExec1: &dockertypes.IDResponse{ID: "12345678"}, returnCreateExec2: nil, returnStartExec: fmt.Errorf("error in StartExec()"), @@ -74,12 +78,27 @@ func TestExecInContainer(t *testing.T) { expectError: fmt.Errorf("error in StartExec()"), }, { description: "InspectExec returns an error", + timeout: time.Minute, returnCreateExec1: &dockertypes.IDResponse{ID: "12345678"}, returnCreateExec2: nil, returnStartExec: nil, returnInspectExec1: nil, returnInspectExec2: fmt.Errorf("error in InspectExec()"), expectError: fmt.Errorf("error in InspectExec()"), + }, { + description: "ExecInContainer returns context DeadlineExceeded", + timeout: 1 * time.Second, + returnCreateExec1: &dockertypes.IDResponse{ID: "12345678"}, + returnCreateExec2: nil, + returnStartExec: context.DeadlineExceeded, + returnInspectExec1: &dockertypes.ContainerExecInspect{ + ExecID: "200", + ContainerID: "12345678", + Running: true, + ExitCode: 0, + Pid: 100}, + returnInspectExec2: nil, + expectError: context.DeadlineExceeded, }} eh := &NativeExecHandler{} @@ -89,7 +108,6 @@ func TestExecInContainer(t *testing.T) { var stdin io.Reader var stdout, stderr io.WriteCloser var resize <-chan remotecommand.TerminalSize - var timeout time.Duration for _, tc := range testcases { t.Logf("TestCase: %q", tc.description) @@ -102,8 +120,13 @@ func TestExecInContainer(t *testing.T) { mockClient.EXPECT().InspectExec(gomock.Any()).Return( tc.returnInspectExec1, tc.returnInspectExec2) - err := eh.ExecInContainer(mockClient, container, cmd, stdin, stdout, stderr, false, resize, timeout) - assert.Equal(t, err, tc.expectError) + + // use parent context of 2 minutes since that's the default remote + // runtime connection timeout used by dockershim + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + err := eh.ExecInContainer(ctx, mockClient, container, cmd, stdin, stdout, stderr, false, resize, tc.timeout) + assert.Equal(t, tc.expectError, err) } } diff --git a/test/e2e/common/container_probe.go b/test/e2e/common/container_probe.go index 83a9fd98999..59e0315ad9a 100644 --- a/test/e2e/common/container_probe.go +++ b/test/e2e/common/container_probe.go @@ -123,6 +123,7 @@ var _ = framework.KubeDescribe("Probing container", func() { livenessProbe := &v1.Probe{ Handler: execHandler([]string{"cat", "/tmp/health"}), InitialDelaySeconds: 15, + TimeoutSeconds: 5, // default 1s can be pretty aggressive in CI environments with low resources FailureThreshold: 1, } pod := busyBoxPodSpec(nil, livenessProbe, cmd) @@ -139,6 +140,7 @@ var _ = framework.KubeDescribe("Probing container", func() { livenessProbe := &v1.Probe{ Handler: execHandler([]string{"cat", "/tmp/health"}), InitialDelaySeconds: 15, + TimeoutSeconds: 5, // default 1s can be pretty aggressive in CI environments with low resources FailureThreshold: 1, } pod := busyBoxPodSpec(nil, livenessProbe, cmd)