diff --git a/pkg/kubelet/dockershim/docker_legacy_service.go b/pkg/kubelet/dockershim/docker_legacy_service.go index 78616c21f24..5026722303d 100644 --- a/pkg/kubelet/dockershim/docker_legacy_service.go +++ b/pkg/kubelet/dockershim/docker_legacy_service.go @@ -18,6 +18,7 @@ package dockershim import ( "context" + "errors" "fmt" "io" "strconv" @@ -29,6 +30,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubetypes "k8s.io/apimachinery/pkg/types" + "k8s.io/klog" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/kuberuntime" @@ -76,12 +78,23 @@ func (d *dockerService) GetContainerLogs(_ context.Context, pod *v1.Pod, contain opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10) } + if logOptions.LimitBytes != nil { + // stdout and stderr share the total write limit + max := *logOptions.LimitBytes + stderr = sharedLimitWriter(stderr, &max) + stdout = sharedLimitWriter(stdout, &max) + } sopts := libdocker.StreamOptions{ OutputStream: stdout, ErrorStream: stderr, RawTerminal: container.Config.Tty, } - return d.client.Logs(containerID.ID, opts, sopts) + err = d.client.Logs(containerID.ID, opts, sopts) + if errors.Is(err, errMaximumWrite) { + klog.V(2).Infof("finished logs, hit byte limit %d", *logOptions.LimitBytes) + err = nil + } + return err } // GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength diff --git a/pkg/kubelet/dockershim/helpers.go b/pkg/kubelet/dockershim/helpers.go index a370e57fc72..9883d901261 100644 --- a/pkg/kubelet/dockershim/helpers.go +++ b/pkg/kubelet/dockershim/helpers.go @@ -17,10 +17,13 @@ limitations under the License. package dockershim import ( + "errors" "fmt" + "io" "regexp" "strconv" "strings" + "sync/atomic" dockertypes "github.com/docker/docker/api/types" dockercontainer "github.com/docker/docker/api/types/container" @@ -393,3 +396,44 @@ type dockerOpt struct { func (d dockerOpt) GetKV() (string, string) { return d.key, d.value } + +// sharedWriteLimiter limits the total output written across one or more streams. +type sharedWriteLimiter struct { + delegate io.Writer + limit *int64 +} + +func (w sharedWriteLimiter) Write(p []byte) (int, error) { + if len(p) == 0 { + return 0, nil + } + limit := atomic.LoadInt64(w.limit) + if limit <= 0 { + return 0, errMaximumWrite + } + var truncated bool + if limit < int64(len(p)) { + p = p[0:limit] + truncated = true + } + n, err := w.delegate.Write(p) + if n > 0 { + atomic.AddInt64(w.limit, -1*int64(n)) + } + if err == nil && truncated { + err = errMaximumWrite + } + return n, err +} + +func sharedLimitWriter(w io.Writer, limit *int64) io.Writer { + if w == nil { + return nil + } + return &sharedWriteLimiter{ + delegate: w, + limit: limit, + } +} + +var errMaximumWrite = errors.New("maximum write") diff --git a/pkg/kubelet/dockershim/helpers_test.go b/pkg/kubelet/dockershim/helpers_test.go index 20144820274..2597af61e4a 100644 --- a/pkg/kubelet/dockershim/helpers_test.go +++ b/pkg/kubelet/dockershim/helpers_test.go @@ -17,7 +17,10 @@ limitations under the License. package dockershim import ( + "bytes" + "errors" "fmt" + "sync" "testing" dockertypes "github.com/docker/docker/api/types" @@ -332,3 +335,104 @@ func TestGenerateMountBindings(t *testing.T) { assert.Equal(t, expectedResult, result) } + +func TestLimitedWriter(t *testing.T) { + max := func(x, y int64) int64 { + if x > y { + return x + } + return y + } + for name, tc := range map[string]struct { + w bytes.Buffer + toWrite string + limit int64 + wants string + wantsErr error + }{ + "nil": {}, + "neg": { + toWrite: "a", + wantsErr: errMaximumWrite, + limit: -1, + }, + "1byte-over": { + toWrite: "a", + wantsErr: errMaximumWrite, + }, + "1byte-maxed": { + toWrite: "a", + wants: "a", + limit: 1, + }, + "1byte-under": { + toWrite: "a", + wants: "a", + limit: 2, + }, + "6byte-over": { + toWrite: "foobar", + wants: "foo", + limit: 3, + wantsErr: errMaximumWrite, + }, + "6byte-maxed": { + toWrite: "foobar", + wants: "foobar", + limit: 6, + }, + "6byte-under": { + toWrite: "foobar", + wants: "foobar", + limit: 20, + }, + } { + t.Run(name, func(t *testing.T) { + limit := tc.limit + w := sharedLimitWriter(&tc.w, &limit) + n, err := w.Write([]byte(tc.toWrite)) + if int64(n) > max(0, tc.limit) { + t.Fatalf("bytes written (%d) exceeds limit (%d)", n, tc.limit) + } + if (err != nil) != (tc.wantsErr != nil) { + if err != nil { + t.Fatal("unexpected error:", err) + } + t.Fatal("expected error:", err) + } + if err != nil { + if !errors.Is(err, tc.wantsErr) { + t.Fatal("expected error: ", tc.wantsErr, " instead of: ", err) + } + if !errors.Is(err, errMaximumWrite) { + return + } + // check contents for errMaximumWrite + } + if s := tc.w.String(); s != tc.wants { + t.Fatalf("expected %q instead of %q", tc.wants, s) + } + }) + } + + // test concurrency. run this test a bunch of times to attempt to flush + // out any data races or concurrency issues. + for i := 0; i < 1000; i++ { + var ( + b1, b2 bytes.Buffer + limit = int64(10) + w1 = sharedLimitWriter(&b1, &limit) + w2 = sharedLimitWriter(&b2, &limit) + ch = make(chan struct{}) + wg sync.WaitGroup + ) + wg.Add(2) + go func() { defer wg.Done(); <-ch; w1.Write([]byte("hello")) }() + go func() { defer wg.Done(); <-ch; w2.Write([]byte("world")) }() + close(ch) + wg.Wait() + if limit != 0 { + t.Fatalf("expected max limit to be reached, instead of %d", limit) + } + } +}