From 6f8dfdfb0e722f5e293957095ae0b5562a9e4347 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Fri, 7 Feb 2020 15:54:00 -0600 Subject: [PATCH 1/3] Fix docker/journald logging conformance See issue #86367 --- .../dockershim/docker_legacy_service.go | 15 ++- pkg/kubelet/dockershim/helpers.go | 48 ++++++++ pkg/kubelet/dockershim/helpers_test.go | 104 ++++++++++++++++++ 3 files changed, 166 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/dockershim/docker_legacy_service.go b/pkg/kubelet/dockershim/docker_legacy_service.go index 78616c21f24..352f110e300 100644 --- a/pkg/kubelet/dockershim/docker_legacy_service.go +++ b/pkg/kubelet/dockershim/docker_legacy_service.go @@ -18,6 +18,7 @@ package dockershim import ( "context" + "errors" "fmt" "io" "strconv" @@ -29,6 +30,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubetypes "k8s.io/apimachinery/pkg/types" + "k8s.io/klog" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/kuberuntime" @@ -76,12 +78,23 @@ func (d *dockerService) GetContainerLogs(_ context.Context, pod *v1.Pod, contain opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10) } + if logOptions.LimitBytes != nil { + // stdout and stderr share the total write limit + max := *logOptions.LimitBytes + stderr = limitedWriter(stderr, &max) + stdout = limitedWriter(stdout, &max) + } sopts := libdocker.StreamOptions{ OutputStream: stdout, ErrorStream: stderr, RawTerminal: container.Config.Tty, } - return d.client.Logs(containerID.ID, opts, sopts) + err = d.client.Logs(containerID.ID, opts, sopts) + if errors.Is(err, errMaximumWrite) { + klog.V(2).Infof("finished logs, hit byte limit %d", *logOptions.LimitBytes) + err = nil + } + return err } // GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength diff --git a/pkg/kubelet/dockershim/helpers.go b/pkg/kubelet/dockershim/helpers.go index a370e57fc72..4dfd9f03ba4 100644 --- a/pkg/kubelet/dockershim/helpers.go +++ b/pkg/kubelet/dockershim/helpers.go @@ -17,10 +17,13 @@ limitations under the License. package dockershim import ( + "errors" "fmt" + "io" "regexp" "strconv" "strings" + "sync/atomic" dockertypes "github.com/docker/docker/api/types" dockercontainer "github.com/docker/docker/api/types/container" @@ -393,3 +396,48 @@ type dockerOpt struct { func (d dockerOpt) GetKV() (string, string) { return d.key, d.value } + +// writeLimiter limits the total output written across one or more streams. +type writeLimiter struct { + delegate io.Writer + limit *int64 +} + +func (w writeLimiter) Write(p []byte) (int, error) { + if len(p) == 0 { + return 0, nil + } + limit := atomic.LoadInt64(w.limit) + if limit <= 0 { + return 0, errMaximumWrite + } + var truncated bool + if limit < int64(len(p)) { + p = p[0:limit] + truncated = true + } + n, err := w.delegate.Write(p) + if n > 0 { + newLimit := limit - int64(n) + for !atomic.CompareAndSwapInt64(w.limit, limit, newLimit) { + limit = atomic.LoadInt64(w.limit) + newLimit = limit - int64(n) + } + } + if err == nil && truncated { + err = errMaximumWrite + } + return n, err +} + +func limitedWriter(w io.Writer, limit *int64) io.Writer { + if w == nil { + return nil + } + return &writeLimiter{ + delegate: w, + limit: limit, + } +} + +var errMaximumWrite = errors.New("maximum write") diff --git a/pkg/kubelet/dockershim/helpers_test.go b/pkg/kubelet/dockershim/helpers_test.go index 20144820274..ba9b972d0ee 100644 --- a/pkg/kubelet/dockershim/helpers_test.go +++ b/pkg/kubelet/dockershim/helpers_test.go @@ -17,7 +17,10 @@ limitations under the License. package dockershim import ( + "bytes" + "errors" "fmt" + "sync" "testing" dockertypes "github.com/docker/docker/api/types" @@ -332,3 +335,104 @@ func TestGenerateMountBindings(t *testing.T) { assert.Equal(t, expectedResult, result) } + +func TestLimitedWriter(t *testing.T) { + max := func(x, y int64) int64 { + if x > y { + return x + } + return y + } + for name, tc := range map[string]struct { + w bytes.Buffer + toWrite string + limit int64 + wants string + wantsErr error + }{ + "nil": {}, + "neg": { + toWrite: "a", + wantsErr: errMaximumWrite, + limit: -1, + }, + "1byte-over": { + toWrite: "a", + wantsErr: errMaximumWrite, + }, + "1byte-maxed": { + toWrite: "a", + wants: "a", + limit: 1, + }, + "1byte-under": { + toWrite: "a", + wants: "a", + limit: 2, + }, + "6byte-over": { + toWrite: "foobar", + wants: "foo", + limit: 3, + wantsErr: errMaximumWrite, + }, + "6byte-maxed": { + toWrite: "foobar", + wants: "foobar", + limit: 6, + }, + "6byte-under": { + toWrite: "foobar", + wants: "foobar", + limit: 20, + }, + } { + t.Run(name, func(t *testing.T) { + limit := tc.limit + w := limitedWriter(&tc.w, &limit) + n, err := w.Write([]byte(tc.toWrite)) + if int64(n) > max(0, tc.limit) { + t.Fatalf("bytes written (%d) exceeds limit (%d)", n, tc.limit) + } + if (err != nil) != (tc.wantsErr != nil) { + if err != nil { + t.Fatal("unexpected error:", err) + } + t.Fatal("expected error:", err) + } + if err != nil { + if !errors.Is(err, tc.wantsErr) { + t.Fatal("expected error: ", tc.wantsErr, " instead of: ", err) + } + if !errors.Is(err, errMaximumWrite) { + return + } + // check contents for errMaximumWrite + } + if s := tc.w.String(); s != tc.wants { + t.Fatalf("expected %q instead of %q", tc.wants, s) + } + }) + } + + // test concurrency. run this test a bunch of times to attempt to flush + // out any data races or concurrency issues. + for i := 0; i < 1000; i++ { + var ( + b1, b2 bytes.Buffer + limit = int64(10) + w1 = limitedWriter(&b1, &limit) + w2 = limitedWriter(&b2, &limit) + ch = make(chan struct{}) + wg sync.WaitGroup + ) + wg.Add(2) + go func() { defer wg.Done(); <-ch; w1.Write([]byte("hello")) }() + go func() { defer wg.Done(); <-ch; w2.Write([]byte("world")) }() + close(ch) + wg.Wait() + if limit != 0 { + t.Fatalf("expected max limit to be reached, instead of %d", limit) + } + } +} From a4230055f36635d5ca15c7f34b562954721b429c Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Thu, 13 Feb 2020 11:43:09 -0600 Subject: [PATCH 2/3] address review feedback --- pkg/kubelet/dockershim/helpers.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/dockershim/helpers.go b/pkg/kubelet/dockershim/helpers.go index 4dfd9f03ba4..82da392f60c 100644 --- a/pkg/kubelet/dockershim/helpers.go +++ b/pkg/kubelet/dockershim/helpers.go @@ -397,13 +397,13 @@ func (d dockerOpt) GetKV() (string, string) { return d.key, d.value } -// writeLimiter limits the total output written across one or more streams. -type writeLimiter struct { +// sharedWriteLimiter limits the total output written across one or more streams. +type sharedWriteLimiter struct { delegate io.Writer limit *int64 } -func (w writeLimiter) Write(p []byte) (int, error) { +func (w sharedWriteLimiter) Write(p []byte) (int, error) { if len(p) == 0 { return 0, nil } @@ -418,11 +418,7 @@ func (w writeLimiter) Write(p []byte) (int, error) { } n, err := w.delegate.Write(p) if n > 0 { - newLimit := limit - int64(n) - for !atomic.CompareAndSwapInt64(w.limit, limit, newLimit) { - limit = atomic.LoadInt64(w.limit) - newLimit = limit - int64(n) - } + atomic.AddInt64(w.limit, -1*int64(n)) } if err == nil && truncated { err = errMaximumWrite @@ -434,7 +430,7 @@ func limitedWriter(w io.Writer, limit *int64) io.Writer { if w == nil { return nil } - return &writeLimiter{ + return &sharedWriteLimiter{ delegate: w, limit: limit, } From 0e178f9341405d974bc493c75a4188b83f2ba189 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Fri, 14 Feb 2020 13:48:41 -0600 Subject: [PATCH 3/3] rename to sharedLimitWriter --- pkg/kubelet/dockershim/docker_legacy_service.go | 4 ++-- pkg/kubelet/dockershim/helpers.go | 2 +- pkg/kubelet/dockershim/helpers_test.go | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/kubelet/dockershim/docker_legacy_service.go b/pkg/kubelet/dockershim/docker_legacy_service.go index 352f110e300..5026722303d 100644 --- a/pkg/kubelet/dockershim/docker_legacy_service.go +++ b/pkg/kubelet/dockershim/docker_legacy_service.go @@ -81,8 +81,8 @@ func (d *dockerService) GetContainerLogs(_ context.Context, pod *v1.Pod, contain if logOptions.LimitBytes != nil { // stdout and stderr share the total write limit max := *logOptions.LimitBytes - stderr = limitedWriter(stderr, &max) - stdout = limitedWriter(stdout, &max) + stderr = sharedLimitWriter(stderr, &max) + stdout = sharedLimitWriter(stdout, &max) } sopts := libdocker.StreamOptions{ OutputStream: stdout, diff --git a/pkg/kubelet/dockershim/helpers.go b/pkg/kubelet/dockershim/helpers.go index 82da392f60c..9883d901261 100644 --- a/pkg/kubelet/dockershim/helpers.go +++ b/pkg/kubelet/dockershim/helpers.go @@ -426,7 +426,7 @@ func (w sharedWriteLimiter) Write(p []byte) (int, error) { return n, err } -func limitedWriter(w io.Writer, limit *int64) io.Writer { +func sharedLimitWriter(w io.Writer, limit *int64) io.Writer { if w == nil { return nil } diff --git a/pkg/kubelet/dockershim/helpers_test.go b/pkg/kubelet/dockershim/helpers_test.go index ba9b972d0ee..2597af61e4a 100644 --- a/pkg/kubelet/dockershim/helpers_test.go +++ b/pkg/kubelet/dockershim/helpers_test.go @@ -389,7 +389,7 @@ func TestLimitedWriter(t *testing.T) { } { t.Run(name, func(t *testing.T) { limit := tc.limit - w := limitedWriter(&tc.w, &limit) + w := sharedLimitWriter(&tc.w, &limit) n, err := w.Write([]byte(tc.toWrite)) if int64(n) > max(0, tc.limit) { t.Fatalf("bytes written (%d) exceeds limit (%d)", n, tc.limit) @@ -421,8 +421,8 @@ func TestLimitedWriter(t *testing.T) { var ( b1, b2 bytes.Buffer limit = int64(10) - w1 = limitedWriter(&b1, &limit) - w2 = limitedWriter(&b2, &limit) + w1 = sharedLimitWriter(&b1, &limit) + w2 = sharedLimitWriter(&b2, &limit) ch = make(chan struct{}) wg sync.WaitGroup )