mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #82514 from dims/limit-exec-probe-bytes-read
Exec probes should not be unbounded
This commit is contained in:
commit
579e0c74c1
@ -42,6 +42,8 @@ type streamingRuntime struct {
|
||||
|
||||
var _ streaming.Runtime = &streamingRuntime{}
|
||||
|
||||
const maxMsgSize = 1024 * 1024 * 16
|
||||
|
||||
func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
|
||||
return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
|
||||
}
|
||||
@ -78,8 +80,8 @@ func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncReq
|
||||
var stdoutBuffer, stderrBuffer bytes.Buffer
|
||||
err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd,
|
||||
nil, // in
|
||||
ioutils.WriteCloserWrapper(&stdoutBuffer),
|
||||
ioutils.WriteCloserWrapper(&stderrBuffer),
|
||||
ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stdoutBuffer, maxMsgSize)),
|
||||
ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stderrBuffer, maxMsgSize)),
|
||||
false, // tty
|
||||
nil, // resize
|
||||
timeout)
|
||||
|
@ -58,6 +58,7 @@ go_test(
|
||||
"//pkg/kubelet/prober/results:go_default_library",
|
||||
"//pkg/kubelet/status:go_default_library",
|
||||
"//pkg/kubelet/status/testing:go_default_library",
|
||||
"//pkg/kubelet/util/ioutils:go_default_library",
|
||||
"//pkg/probe:go_default_library",
|
||||
"//pkg/probe/exec:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
|
@ -252,63 +252,68 @@ func formatURL(scheme string, host string, port int, path string) *url.URL {
|
||||
type execInContainer struct {
|
||||
// run executes a command in a container. Combined stdout and stderr output is always returned. An
|
||||
// error is returned if one occurred.
|
||||
run func() ([]byte, error)
|
||||
run func() ([]byte, error)
|
||||
writer io.Writer
|
||||
}
|
||||
|
||||
func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
|
||||
return execInContainer{func() ([]byte, error) {
|
||||
return &execInContainer{run: func() ([]byte, error) {
|
||||
return pb.runner.RunInContainer(containerID, cmd, timeout)
|
||||
}}
|
||||
}
|
||||
|
||||
func (eic execInContainer) Run() error {
|
||||
return fmt.Errorf("unimplemented")
|
||||
func (eic *execInContainer) Run() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (eic execInContainer) CombinedOutput() ([]byte, error) {
|
||||
func (eic *execInContainer) CombinedOutput() ([]byte, error) {
|
||||
return eic.run()
|
||||
}
|
||||
|
||||
func (eic execInContainer) Output() ([]byte, error) {
|
||||
func (eic *execInContainer) Output() ([]byte, error) {
|
||||
return nil, fmt.Errorf("unimplemented")
|
||||
}
|
||||
|
||||
func (eic execInContainer) SetDir(dir string) {
|
||||
func (eic *execInContainer) SetDir(dir string) {
|
||||
//unimplemented
|
||||
}
|
||||
|
||||
func (eic execInContainer) SetStdin(in io.Reader) {
|
||||
func (eic *execInContainer) SetStdin(in io.Reader) {
|
||||
//unimplemented
|
||||
}
|
||||
|
||||
func (eic execInContainer) SetStdout(out io.Writer) {
|
||||
func (eic *execInContainer) SetStdout(out io.Writer) {
|
||||
eic.writer = out
|
||||
}
|
||||
|
||||
func (eic *execInContainer) SetStderr(out io.Writer) {
|
||||
eic.writer = out
|
||||
}
|
||||
|
||||
func (eic *execInContainer) SetEnv(env []string) {
|
||||
//unimplemented
|
||||
}
|
||||
|
||||
func (eic execInContainer) SetStderr(out io.Writer) {
|
||||
func (eic *execInContainer) Stop() {
|
||||
//unimplemented
|
||||
}
|
||||
|
||||
func (eic execInContainer) SetEnv(env []string) {
|
||||
//unimplemented
|
||||
func (eic *execInContainer) Start() error {
|
||||
data, err := eic.run()
|
||||
if eic.writer != nil {
|
||||
eic.writer.Write(data)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (eic execInContainer) Stop() {
|
||||
//unimplemented
|
||||
func (eic *execInContainer) Wait() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (eic execInContainer) Start() error {
|
||||
return fmt.Errorf("unimplemented")
|
||||
}
|
||||
|
||||
func (eic execInContainer) Wait() error {
|
||||
return fmt.Errorf("unimplemented")
|
||||
}
|
||||
|
||||
func (eic execInContainer) StdoutPipe() (io.ReadCloser, error) {
|
||||
func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) {
|
||||
return nil, fmt.Errorf("unimplemented")
|
||||
}
|
||||
|
||||
func (eic execInContainer) StderrPipe() (io.ReadCloser, error) {
|
||||
func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) {
|
||||
return nil, fmt.Errorf("unimplemented")
|
||||
}
|
||||
|
@ -17,10 +17,12 @@ limitations under the License.
|
||||
package prober
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
@ -29,6 +31,7 @@ import (
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
|
||||
"k8s.io/kubernetes/pkg/probe"
|
||||
execprobe "k8s.io/kubernetes/pkg/probe/exec"
|
||||
)
|
||||
@ -329,23 +332,38 @@ func TestProbe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewExecInContainer(t *testing.T) {
|
||||
limit := 1024
|
||||
tenKilobyte := strings.Repeat("logs-123", 128*10)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
err error
|
||||
name string
|
||||
stdout string
|
||||
expected string
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "no error",
|
||||
err: nil,
|
||||
name: "no error",
|
||||
stdout: "foo",
|
||||
expected: "foo",
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "error - make sure we get output",
|
||||
err: errors.New("bad"),
|
||||
name: "no error",
|
||||
stdout: tenKilobyte,
|
||||
expected: tenKilobyte[0:limit],
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "error - make sure we get output",
|
||||
stdout: "foo",
|
||||
expected: "foo",
|
||||
err: errors.New("bad"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
runner := &containertest.FakeContainerCommandRunner{
|
||||
Stdout: "foo",
|
||||
Stdout: test.stdout,
|
||||
Err: test.err,
|
||||
}
|
||||
prober := &prober{
|
||||
@ -357,7 +375,16 @@ func TestNewExecInContainer(t *testing.T) {
|
||||
cmd := []string{"/foo", "bar"}
|
||||
exec := prober.newExecInContainer(container, containerID, cmd, 0)
|
||||
|
||||
actualOutput, err := exec.CombinedOutput()
|
||||
var dataBuffer bytes.Buffer
|
||||
writer := ioutils.LimitWriter(&dataBuffer, int64(limit))
|
||||
exec.SetStderr(writer)
|
||||
exec.SetStdout(writer)
|
||||
err := exec.Start()
|
||||
if err == nil {
|
||||
err = exec.Wait()
|
||||
}
|
||||
actualOutput := dataBuffer.Bytes()
|
||||
|
||||
if e, a := containerID, runner.ContainerID; e != a {
|
||||
t.Errorf("%s: container id: expected %v, got %v", test.name, e, a)
|
||||
}
|
||||
@ -365,7 +392,7 @@ func TestNewExecInContainer(t *testing.T) {
|
||||
t.Errorf("%s: cmd: expected %v, got %v", test.name, e, a)
|
||||
}
|
||||
// this isn't 100% foolproof as a bug in a real ContainerCommandRunner where it fails to copy to stdout/stderr wouldn't be caught by this test
|
||||
if e, a := "foo", string(actualOutput); e != a {
|
||||
if e, a := test.expected, string(actualOutput); e != a {
|
||||
t.Errorf("%s: output: expected %q, got %q", test.name, e, a)
|
||||
}
|
||||
if e, a := fmt.Sprintf("%v", test.err), fmt.Sprintf("%v", err); e != a {
|
||||
|
@ -1,9 +1,6 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
@ -23,3 +20,10 @@ filegroup(
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["ioutils_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
|
||||
)
|
||||
|
@ -35,3 +35,36 @@ func (w *writeCloserWrapper) Close() error {
|
||||
func WriteCloserWrapper(w io.Writer) io.WriteCloser {
|
||||
return &writeCloserWrapper{w}
|
||||
}
|
||||
|
||||
// LimitWriter is a copy of the standard library ioutils.LimitReader,
|
||||
// applied to the writer interface.
|
||||
// LimitWriter returns a Writer that writes to w
|
||||
// but stops with EOF after n bytes.
|
||||
// The underlying implementation is a *LimitedWriter.
|
||||
func LimitWriter(w io.Writer, n int64) io.Writer { return &LimitedWriter{w, n} }
|
||||
|
||||
// A LimitedWriter writes to W but limits the amount of
|
||||
// data returned to just N bytes. Each call to Write
|
||||
// updates N to reflect the new amount remaining.
|
||||
// Write returns EOF when N <= 0 or when the underlying W returns EOF.
|
||||
type LimitedWriter struct {
|
||||
W io.Writer // underlying writer
|
||||
N int64 // max bytes remaining
|
||||
}
|
||||
|
||||
func (l *LimitedWriter) Write(p []byte) (n int, err error) {
|
||||
if l.N <= 0 {
|
||||
return 0, io.ErrShortWrite
|
||||
}
|
||||
truncated := false
|
||||
if int64(len(p)) > l.N {
|
||||
p = p[0:l.N]
|
||||
truncated = true
|
||||
}
|
||||
n, err = l.W.Write(p)
|
||||
l.N -= int64(n)
|
||||
if err == nil && truncated {
|
||||
err = io.ErrShortWrite
|
||||
}
|
||||
return
|
||||
}
|
||||
|
93
pkg/kubelet/util/ioutils/ioutils_test.go
Normal file
93
pkg/kubelet/util/ioutils/ioutils_test.go
Normal file
@ -0,0 +1,93 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ioutils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestLimitWriter(t *testing.T) {
|
||||
r := rand.New(rand.NewSource(1234)) // Fixed source to prevent flakes.
|
||||
|
||||
tests := []struct {
|
||||
inputSize, limit, writeSize int64
|
||||
}{
|
||||
// Single write tests
|
||||
{100, 101, 100},
|
||||
{100, 100, 100},
|
||||
{100, 99, 100},
|
||||
{1, 1, 1},
|
||||
{100, 10, 100},
|
||||
{100, 0, 100},
|
||||
{100, -1, 100},
|
||||
// Multi write tests
|
||||
{100, 101, 10},
|
||||
{100, 100, 10},
|
||||
{100, 99, 10},
|
||||
{100, 10, 10},
|
||||
{100, 0, 10},
|
||||
{100, -1, 10},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(fmt.Sprintf("inputSize=%d limit=%d writes=%d", test.inputSize, test.limit, test.writeSize), func(t *testing.T) {
|
||||
input := make([]byte, test.inputSize)
|
||||
r.Read(input)
|
||||
output := &bytes.Buffer{}
|
||||
w := LimitWriter(output, test.limit)
|
||||
|
||||
var (
|
||||
err error
|
||||
written int64
|
||||
n int
|
||||
)
|
||||
for written < test.inputSize && err == nil {
|
||||
n, err = w.Write(input[written : written+test.writeSize])
|
||||
written += int64(n)
|
||||
}
|
||||
|
||||
expectWritten := bounded(0, test.inputSize, test.limit)
|
||||
assert.EqualValues(t, expectWritten, written)
|
||||
if expectWritten <= 0 {
|
||||
assert.Empty(t, output)
|
||||
} else {
|
||||
assert.Equal(t, input[:expectWritten], output.Bytes())
|
||||
}
|
||||
|
||||
if test.limit < test.inputSize {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func bounded(min, val, max int64) int64 {
|
||||
if max < val {
|
||||
val = max
|
||||
}
|
||||
if val < min {
|
||||
val = min
|
||||
}
|
||||
return val
|
||||
}
|
@ -11,6 +11,7 @@ go_library(
|
||||
srcs = ["exec.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/probe/exec",
|
||||
deps = [
|
||||
"//pkg/kubelet/util/ioutils:go_default_library",
|
||||
"//pkg/probe:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||
|
@ -17,10 +17,17 @@ limitations under the License.
|
||||
package exec
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
|
||||
"k8s.io/kubernetes/pkg/probe"
|
||||
"k8s.io/utils/exec"
|
||||
|
||||
"k8s.io/klog"
|
||||
"k8s.io/utils/exec"
|
||||
)
|
||||
|
||||
const (
|
||||
maxReadLength = 10 * 1 << 10 // 10KB
|
||||
)
|
||||
|
||||
// New creates a Prober.
|
||||
@ -39,7 +46,17 @@ type execProber struct{}
|
||||
// from executing a command. Returns the Result status, command output, and
|
||||
// errors if any.
|
||||
func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
|
||||
data, err := e.CombinedOutput()
|
||||
var dataBuffer bytes.Buffer
|
||||
writer := ioutils.LimitWriter(&dataBuffer, maxReadLength)
|
||||
|
||||
e.SetStderr(writer)
|
||||
e.SetStdout(writer)
|
||||
err := e.Start()
|
||||
if err == nil {
|
||||
err = e.Wait()
|
||||
}
|
||||
data := dataBuffer.Bytes()
|
||||
|
||||
klog.V(4).Infof("Exec probe response: %q", string(data))
|
||||
if err != nil {
|
||||
exit, ok := err.(exec.ExitError)
|
||||
|
@ -19,6 +19,7 @@ package exec
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/probe"
|
||||
@ -28,6 +29,7 @@ type FakeCmd struct {
|
||||
out []byte
|
||||
stdout []byte
|
||||
err error
|
||||
writer io.Writer
|
||||
}
|
||||
|
||||
func (f *FakeCmd) Run() error {
|
||||
@ -46,15 +48,25 @@ func (f *FakeCmd) SetDir(dir string) {}
|
||||
|
||||
func (f *FakeCmd) SetStdin(in io.Reader) {}
|
||||
|
||||
func (f *FakeCmd) SetStdout(out io.Writer) {}
|
||||
func (f *FakeCmd) SetStdout(out io.Writer) {
|
||||
f.writer = out
|
||||
}
|
||||
|
||||
func (f *FakeCmd) SetStderr(out io.Writer) {}
|
||||
func (f *FakeCmd) SetStderr(out io.Writer) {
|
||||
f.writer = out
|
||||
}
|
||||
|
||||
func (f *FakeCmd) SetEnv(env []string) {}
|
||||
|
||||
func (f *FakeCmd) Stop() {}
|
||||
|
||||
func (f *FakeCmd) Start() error { return nil }
|
||||
func (f *FakeCmd) Start() error {
|
||||
if f.writer != nil {
|
||||
f.writer.Write(f.out)
|
||||
return f.err
|
||||
}
|
||||
return f.err
|
||||
}
|
||||
|
||||
func (f *FakeCmd) Wait() error { return nil }
|
||||
|
||||
@ -90,20 +102,26 @@ func (f *fakeExitError) ExitStatus() int {
|
||||
func TestExec(t *testing.T) {
|
||||
prober := New()
|
||||
|
||||
tenKilobyte := strings.Repeat("logs-123", 128*10) // 8*128*10=10240 = 10KB of text.
|
||||
elevenKilobyte := strings.Repeat("logs-123", 8*128*11) // 8*128*11=11264 = 11KB of text.
|
||||
|
||||
tests := []struct {
|
||||
expectedStatus probe.Result
|
||||
expectError bool
|
||||
input string
|
||||
output string
|
||||
err error
|
||||
}{
|
||||
// Ok
|
||||
{probe.Success, false, "OK", nil},
|
||||
{probe.Success, false, "OK", "OK", nil},
|
||||
// Ok
|
||||
{probe.Success, false, "OK", &fakeExitError{true, 0}},
|
||||
{probe.Success, false, "OK", "OK", &fakeExitError{true, 0}},
|
||||
// Ok - truncated output
|
||||
{probe.Success, false, elevenKilobyte, tenKilobyte, nil},
|
||||
// Run returns error
|
||||
{probe.Unknown, true, "", fmt.Errorf("test error")},
|
||||
{probe.Unknown, true, "", "", fmt.Errorf("test error")},
|
||||
// Unhealthy
|
||||
{probe.Failure, false, "Fail", &fakeExitError{true, 1}},
|
||||
{probe.Failure, false, "Fail", "", &fakeExitError{true, 1}},
|
||||
}
|
||||
for i, test := range tests {
|
||||
fake := FakeCmd{
|
||||
|
Loading…
Reference in New Issue
Block a user