diff --git a/pkg/kubelet/dockershim/docker_streaming.go b/pkg/kubelet/dockershim/docker_streaming.go index 2df59c6b05d..76b2fecd11b 100644 --- a/pkg/kubelet/dockershim/docker_streaming.go +++ b/pkg/kubelet/dockershim/docker_streaming.go @@ -42,6 +42,8 @@ type streamingRuntime struct { var _ streaming.Runtime = &streamingRuntime{} +const maxMsgSize = 1024 * 1024 * 16 + func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { return r.exec(containerID, cmd, in, out, err, tty, resize, 0) } @@ -78,8 +80,8 @@ func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncReq var stdoutBuffer, stderrBuffer bytes.Buffer err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd, nil, // in - ioutils.WriteCloserWrapper(&stdoutBuffer), - ioutils.WriteCloserWrapper(&stderrBuffer), + ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stdoutBuffer, maxMsgSize)), + ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stderrBuffer, maxMsgSize)), false, // tty nil, // resize timeout) diff --git a/pkg/kubelet/prober/BUILD b/pkg/kubelet/prober/BUILD index 90ccc7d0bbe..a9d2e14b8eb 100644 --- a/pkg/kubelet/prober/BUILD +++ b/pkg/kubelet/prober/BUILD @@ -58,6 +58,7 @@ go_test( "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/status:go_default_library", "//pkg/kubelet/status/testing:go_default_library", + "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/probe:go_default_library", "//pkg/probe/exec:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go index 173a23e77f2..dd94f1705b2 100644 --- a/pkg/kubelet/prober/prober.go +++ b/pkg/kubelet/prober/prober.go @@ -252,63 +252,68 @@ func formatURL(scheme string, host string, port int, path string) *url.URL { type execInContainer struct { // run executes a command in a container. Combined stdout and stderr output is always returned. An // error is returned if one occurred. - run func() ([]byte, error) + run func() ([]byte, error) + writer io.Writer } func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd { - return execInContainer{func() ([]byte, error) { + return &execInContainer{run: func() ([]byte, error) { return pb.runner.RunInContainer(containerID, cmd, timeout) }} } -func (eic execInContainer) Run() error { - return fmt.Errorf("unimplemented") +func (eic *execInContainer) Run() error { + return nil } -func (eic execInContainer) CombinedOutput() ([]byte, error) { +func (eic *execInContainer) CombinedOutput() ([]byte, error) { return eic.run() } -func (eic execInContainer) Output() ([]byte, error) { +func (eic *execInContainer) Output() ([]byte, error) { return nil, fmt.Errorf("unimplemented") } -func (eic execInContainer) SetDir(dir string) { +func (eic *execInContainer) SetDir(dir string) { //unimplemented } -func (eic execInContainer) SetStdin(in io.Reader) { +func (eic *execInContainer) SetStdin(in io.Reader) { //unimplemented } -func (eic execInContainer) SetStdout(out io.Writer) { +func (eic *execInContainer) SetStdout(out io.Writer) { + eic.writer = out +} + +func (eic *execInContainer) SetStderr(out io.Writer) { + eic.writer = out +} + +func (eic *execInContainer) SetEnv(env []string) { //unimplemented } -func (eic execInContainer) SetStderr(out io.Writer) { +func (eic *execInContainer) Stop() { //unimplemented } -func (eic execInContainer) SetEnv(env []string) { - //unimplemented +func (eic *execInContainer) Start() error { + data, err := eic.run() + if eic.writer != nil { + eic.writer.Write(data) + } + return err } -func (eic execInContainer) Stop() { - //unimplemented +func (eic *execInContainer) Wait() error { + return nil } -func (eic execInContainer) Start() error { - return fmt.Errorf("unimplemented") -} - -func (eic execInContainer) Wait() error { - return fmt.Errorf("unimplemented") -} - -func (eic execInContainer) StdoutPipe() (io.ReadCloser, error) { +func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) { return nil, fmt.Errorf("unimplemented") } -func (eic execInContainer) StderrPipe() (io.ReadCloser, error) { +func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) { return nil, fmt.Errorf("unimplemented") } diff --git a/pkg/kubelet/prober/prober_test.go b/pkg/kubelet/prober/prober_test.go index dab63587059..41b4ddbc971 100644 --- a/pkg/kubelet/prober/prober_test.go +++ b/pkg/kubelet/prober/prober_test.go @@ -17,10 +17,12 @@ limitations under the License. package prober import ( + "bytes" "errors" "fmt" "net/http" "reflect" + "strings" "testing" "k8s.io/api/core/v1" @@ -29,6 +31,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/prober/results" + "k8s.io/kubernetes/pkg/kubelet/util/ioutils" "k8s.io/kubernetes/pkg/probe" execprobe "k8s.io/kubernetes/pkg/probe/exec" ) @@ -329,23 +332,38 @@ func TestProbe(t *testing.T) { } func TestNewExecInContainer(t *testing.T) { + limit := 1024 + tenKilobyte := strings.Repeat("logs-123", 128*10) + tests := []struct { - name string - err error + name string + stdout string + expected string + err error }{ { - name: "no error", - err: nil, + name: "no error", + stdout: "foo", + expected: "foo", + err: nil, }, { - name: "error - make sure we get output", - err: errors.New("bad"), + name: "no error", + stdout: tenKilobyte, + expected: tenKilobyte[0:limit], + err: nil, + }, + { + name: "error - make sure we get output", + stdout: "foo", + expected: "foo", + err: errors.New("bad"), }, } for _, test := range tests { runner := &containertest.FakeContainerCommandRunner{ - Stdout: "foo", + Stdout: test.stdout, Err: test.err, } prober := &prober{ @@ -357,7 +375,16 @@ func TestNewExecInContainer(t *testing.T) { cmd := []string{"/foo", "bar"} exec := prober.newExecInContainer(container, containerID, cmd, 0) - actualOutput, err := exec.CombinedOutput() + var dataBuffer bytes.Buffer + writer := ioutils.LimitWriter(&dataBuffer, int64(limit)) + exec.SetStderr(writer) + exec.SetStdout(writer) + err := exec.Start() + if err == nil { + err = exec.Wait() + } + actualOutput := dataBuffer.Bytes() + if e, a := containerID, runner.ContainerID; e != a { t.Errorf("%s: container id: expected %v, got %v", test.name, e, a) } @@ -365,7 +392,7 @@ func TestNewExecInContainer(t *testing.T) { t.Errorf("%s: cmd: expected %v, got %v", test.name, e, a) } // this isn't 100% foolproof as a bug in a real ContainerCommandRunner where it fails to copy to stdout/stderr wouldn't be caught by this test - if e, a := "foo", string(actualOutput); e != a { + if e, a := test.expected, string(actualOutput); e != a { t.Errorf("%s: output: expected %q, got %q", test.name, e, a) } if e, a := fmt.Sprintf("%v", test.err), fmt.Sprintf("%v", err); e != a { diff --git a/pkg/kubelet/util/ioutils/BUILD b/pkg/kubelet/util/ioutils/BUILD index e84deec931e..43bd2626084 100644 --- a/pkg/kubelet/util/ioutils/BUILD +++ b/pkg/kubelet/util/ioutils/BUILD @@ -1,9 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -23,3 +20,10 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["ioutils_test.go"], + embed = [":go_default_library"], + deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"], +) diff --git a/pkg/kubelet/util/ioutils/ioutils.go b/pkg/kubelet/util/ioutils/ioutils.go index 42f1998c794..1b2b5a6d5dd 100644 --- a/pkg/kubelet/util/ioutils/ioutils.go +++ b/pkg/kubelet/util/ioutils/ioutils.go @@ -35,3 +35,36 @@ func (w *writeCloserWrapper) Close() error { func WriteCloserWrapper(w io.Writer) io.WriteCloser { return &writeCloserWrapper{w} } + +// LimitWriter is a copy of the standard library ioutils.LimitReader, +// applied to the writer interface. +// LimitWriter returns a Writer that writes to w +// but stops with EOF after n bytes. +// The underlying implementation is a *LimitedWriter. +func LimitWriter(w io.Writer, n int64) io.Writer { return &LimitedWriter{w, n} } + +// A LimitedWriter writes to W but limits the amount of +// data returned to just N bytes. Each call to Write +// updates N to reflect the new amount remaining. +// Write returns EOF when N <= 0 or when the underlying W returns EOF. +type LimitedWriter struct { + W io.Writer // underlying writer + N int64 // max bytes remaining +} + +func (l *LimitedWriter) Write(p []byte) (n int, err error) { + if l.N <= 0 { + return 0, io.ErrShortWrite + } + truncated := false + if int64(len(p)) > l.N { + p = p[0:l.N] + truncated = true + } + n, err = l.W.Write(p) + l.N -= int64(n) + if err == nil && truncated { + err = io.ErrShortWrite + } + return +} diff --git a/pkg/kubelet/util/ioutils/ioutils_test.go b/pkg/kubelet/util/ioutils/ioutils_test.go new file mode 100644 index 00000000000..524a4aed67d --- /dev/null +++ b/pkg/kubelet/util/ioutils/ioutils_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutils + +import ( + "bytes" + "fmt" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLimitWriter(t *testing.T) { + r := rand.New(rand.NewSource(1234)) // Fixed source to prevent flakes. + + tests := []struct { + inputSize, limit, writeSize int64 + }{ + // Single write tests + {100, 101, 100}, + {100, 100, 100}, + {100, 99, 100}, + {1, 1, 1}, + {100, 10, 100}, + {100, 0, 100}, + {100, -1, 100}, + // Multi write tests + {100, 101, 10}, + {100, 100, 10}, + {100, 99, 10}, + {100, 10, 10}, + {100, 0, 10}, + {100, -1, 10}, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("inputSize=%d limit=%d writes=%d", test.inputSize, test.limit, test.writeSize), func(t *testing.T) { + input := make([]byte, test.inputSize) + r.Read(input) + output := &bytes.Buffer{} + w := LimitWriter(output, test.limit) + + var ( + err error + written int64 + n int + ) + for written < test.inputSize && err == nil { + n, err = w.Write(input[written : written+test.writeSize]) + written += int64(n) + } + + expectWritten := bounded(0, test.inputSize, test.limit) + assert.EqualValues(t, expectWritten, written) + if expectWritten <= 0 { + assert.Empty(t, output) + } else { + assert.Equal(t, input[:expectWritten], output.Bytes()) + } + + if test.limit < test.inputSize { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func bounded(min, val, max int64) int64 { + if max < val { + val = max + } + if val < min { + val = min + } + return val +} diff --git a/pkg/probe/exec/BUILD b/pkg/probe/exec/BUILD index 317dc0fc0cd..a22faf032c2 100644 --- a/pkg/probe/exec/BUILD +++ b/pkg/probe/exec/BUILD @@ -11,6 +11,7 @@ go_library( srcs = ["exec.go"], importpath = "k8s.io/kubernetes/pkg/probe/exec", deps = [ + "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/probe:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", diff --git a/pkg/probe/exec/exec.go b/pkg/probe/exec/exec.go index a6ae523aa62..b8cfe0d2ad7 100644 --- a/pkg/probe/exec/exec.go +++ b/pkg/probe/exec/exec.go @@ -17,10 +17,17 @@ limitations under the License. package exec import ( + "bytes" + + "k8s.io/kubernetes/pkg/kubelet/util/ioutils" "k8s.io/kubernetes/pkg/probe" - "k8s.io/utils/exec" "k8s.io/klog" + "k8s.io/utils/exec" +) + +const ( + maxReadLength = 10 * 1 << 10 // 10KB ) // New creates a Prober. @@ -39,7 +46,17 @@ type execProber struct{} // from executing a command. Returns the Result status, command output, and // errors if any. func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) { - data, err := e.CombinedOutput() + var dataBuffer bytes.Buffer + writer := ioutils.LimitWriter(&dataBuffer, maxReadLength) + + e.SetStderr(writer) + e.SetStdout(writer) + err := e.Start() + if err == nil { + err = e.Wait() + } + data := dataBuffer.Bytes() + klog.V(4).Infof("Exec probe response: %q", string(data)) if err != nil { exit, ok := err.(exec.ExitError) diff --git a/pkg/probe/exec/exec_test.go b/pkg/probe/exec/exec_test.go index 1d8eebb0634..300b3d8746c 100644 --- a/pkg/probe/exec/exec_test.go +++ b/pkg/probe/exec/exec_test.go @@ -19,6 +19,7 @@ package exec import ( "fmt" "io" + "strings" "testing" "k8s.io/kubernetes/pkg/probe" @@ -28,6 +29,7 @@ type FakeCmd struct { out []byte stdout []byte err error + writer io.Writer } func (f *FakeCmd) Run() error { @@ -46,15 +48,25 @@ func (f *FakeCmd) SetDir(dir string) {} func (f *FakeCmd) SetStdin(in io.Reader) {} -func (f *FakeCmd) SetStdout(out io.Writer) {} +func (f *FakeCmd) SetStdout(out io.Writer) { + f.writer = out +} -func (f *FakeCmd) SetStderr(out io.Writer) {} +func (f *FakeCmd) SetStderr(out io.Writer) { + f.writer = out +} func (f *FakeCmd) SetEnv(env []string) {} func (f *FakeCmd) Stop() {} -func (f *FakeCmd) Start() error { return nil } +func (f *FakeCmd) Start() error { + if f.writer != nil { + f.writer.Write(f.out) + return f.err + } + return f.err +} func (f *FakeCmd) Wait() error { return nil } @@ -90,20 +102,26 @@ func (f *fakeExitError) ExitStatus() int { func TestExec(t *testing.T) { prober := New() + tenKilobyte := strings.Repeat("logs-123", 128*10) // 8*128*10=10240 = 10KB of text. + elevenKilobyte := strings.Repeat("logs-123", 8*128*11) // 8*128*11=11264 = 11KB of text. + tests := []struct { expectedStatus probe.Result expectError bool + input string output string err error }{ // Ok - {probe.Success, false, "OK", nil}, + {probe.Success, false, "OK", "OK", nil}, // Ok - {probe.Success, false, "OK", &fakeExitError{true, 0}}, + {probe.Success, false, "OK", "OK", &fakeExitError{true, 0}}, + // Ok - truncated output + {probe.Success, false, elevenKilobyte, tenKilobyte, nil}, // Run returns error - {probe.Unknown, true, "", fmt.Errorf("test error")}, + {probe.Unknown, true, "", "", fmt.Errorf("test error")}, // Unhealthy - {probe.Failure, false, "Fail", &fakeExitError{true, 1}}, + {probe.Failure, false, "Fail", "", &fakeExitError{true, 1}}, } for i, test := range tests { fake := FakeCmd{