mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
Merge pull request #87933 from jdef/fix/86367
Fix docker/journald logging conformance
This commit is contained in:
commit
3d70825195
@ -18,6 +18,7 @@ package dockershim
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -29,6 +30,7 @@ import (
|
|||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
kubetypes "k8s.io/apimachinery/pkg/types"
|
kubetypes "k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/klog"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
|
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
|
||||||
|
|
||||||
@ -76,12 +78,23 @@ func (d *dockerService) GetContainerLogs(_ context.Context, pod *v1.Pod, contain
|
|||||||
opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10)
|
opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if logOptions.LimitBytes != nil {
|
||||||
|
// stdout and stderr share the total write limit
|
||||||
|
max := *logOptions.LimitBytes
|
||||||
|
stderr = sharedLimitWriter(stderr, &max)
|
||||||
|
stdout = sharedLimitWriter(stdout, &max)
|
||||||
|
}
|
||||||
sopts := libdocker.StreamOptions{
|
sopts := libdocker.StreamOptions{
|
||||||
OutputStream: stdout,
|
OutputStream: stdout,
|
||||||
ErrorStream: stderr,
|
ErrorStream: stderr,
|
||||||
RawTerminal: container.Config.Tty,
|
RawTerminal: container.Config.Tty,
|
||||||
}
|
}
|
||||||
return d.client.Logs(containerID.ID, opts, sopts)
|
err = d.client.Logs(containerID.ID, opts, sopts)
|
||||||
|
if errors.Is(err, errMaximumWrite) {
|
||||||
|
klog.V(2).Infof("finished logs, hit byte limit %d", *logOptions.LimitBytes)
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength
|
// GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength
|
||||||
|
@ -17,10 +17,13 @@ limitations under the License.
|
|||||||
package dockershim
|
package dockershim
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
dockertypes "github.com/docker/docker/api/types"
|
dockertypes "github.com/docker/docker/api/types"
|
||||||
dockercontainer "github.com/docker/docker/api/types/container"
|
dockercontainer "github.com/docker/docker/api/types/container"
|
||||||
@ -393,3 +396,44 @@ type dockerOpt struct {
|
|||||||
func (d dockerOpt) GetKV() (string, string) {
|
func (d dockerOpt) GetKV() (string, string) {
|
||||||
return d.key, d.value
|
return d.key, d.value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sharedWriteLimiter limits the total output written across one or more streams.
|
||||||
|
type sharedWriteLimiter struct {
|
||||||
|
delegate io.Writer
|
||||||
|
limit *int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w sharedWriteLimiter) Write(p []byte) (int, error) {
|
||||||
|
if len(p) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
limit := atomic.LoadInt64(w.limit)
|
||||||
|
if limit <= 0 {
|
||||||
|
return 0, errMaximumWrite
|
||||||
|
}
|
||||||
|
var truncated bool
|
||||||
|
if limit < int64(len(p)) {
|
||||||
|
p = p[0:limit]
|
||||||
|
truncated = true
|
||||||
|
}
|
||||||
|
n, err := w.delegate.Write(p)
|
||||||
|
if n > 0 {
|
||||||
|
atomic.AddInt64(w.limit, -1*int64(n))
|
||||||
|
}
|
||||||
|
if err == nil && truncated {
|
||||||
|
err = errMaximumWrite
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func sharedLimitWriter(w io.Writer, limit *int64) io.Writer {
|
||||||
|
if w == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &sharedWriteLimiter{
|
||||||
|
delegate: w,
|
||||||
|
limit: limit,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var errMaximumWrite = errors.New("maximum write")
|
||||||
|
@ -17,7 +17,10 @@ limitations under the License.
|
|||||||
package dockershim
|
package dockershim
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
dockertypes "github.com/docker/docker/api/types"
|
dockertypes "github.com/docker/docker/api/types"
|
||||||
@ -332,3 +335,104 @@ func TestGenerateMountBindings(t *testing.T) {
|
|||||||
|
|
||||||
assert.Equal(t, expectedResult, result)
|
assert.Equal(t, expectedResult, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLimitedWriter(t *testing.T) {
|
||||||
|
max := func(x, y int64) int64 {
|
||||||
|
if x > y {
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
return y
|
||||||
|
}
|
||||||
|
for name, tc := range map[string]struct {
|
||||||
|
w bytes.Buffer
|
||||||
|
toWrite string
|
||||||
|
limit int64
|
||||||
|
wants string
|
||||||
|
wantsErr error
|
||||||
|
}{
|
||||||
|
"nil": {},
|
||||||
|
"neg": {
|
||||||
|
toWrite: "a",
|
||||||
|
wantsErr: errMaximumWrite,
|
||||||
|
limit: -1,
|
||||||
|
},
|
||||||
|
"1byte-over": {
|
||||||
|
toWrite: "a",
|
||||||
|
wantsErr: errMaximumWrite,
|
||||||
|
},
|
||||||
|
"1byte-maxed": {
|
||||||
|
toWrite: "a",
|
||||||
|
wants: "a",
|
||||||
|
limit: 1,
|
||||||
|
},
|
||||||
|
"1byte-under": {
|
||||||
|
toWrite: "a",
|
||||||
|
wants: "a",
|
||||||
|
limit: 2,
|
||||||
|
},
|
||||||
|
"6byte-over": {
|
||||||
|
toWrite: "foobar",
|
||||||
|
wants: "foo",
|
||||||
|
limit: 3,
|
||||||
|
wantsErr: errMaximumWrite,
|
||||||
|
},
|
||||||
|
"6byte-maxed": {
|
||||||
|
toWrite: "foobar",
|
||||||
|
wants: "foobar",
|
||||||
|
limit: 6,
|
||||||
|
},
|
||||||
|
"6byte-under": {
|
||||||
|
toWrite: "foobar",
|
||||||
|
wants: "foobar",
|
||||||
|
limit: 20,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
limit := tc.limit
|
||||||
|
w := sharedLimitWriter(&tc.w, &limit)
|
||||||
|
n, err := w.Write([]byte(tc.toWrite))
|
||||||
|
if int64(n) > max(0, tc.limit) {
|
||||||
|
t.Fatalf("bytes written (%d) exceeds limit (%d)", n, tc.limit)
|
||||||
|
}
|
||||||
|
if (err != nil) != (tc.wantsErr != nil) {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unexpected error:", err)
|
||||||
|
}
|
||||||
|
t.Fatal("expected error:", err)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, tc.wantsErr) {
|
||||||
|
t.Fatal("expected error: ", tc.wantsErr, " instead of: ", err)
|
||||||
|
}
|
||||||
|
if !errors.Is(err, errMaximumWrite) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// check contents for errMaximumWrite
|
||||||
|
}
|
||||||
|
if s := tc.w.String(); s != tc.wants {
|
||||||
|
t.Fatalf("expected %q instead of %q", tc.wants, s)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// test concurrency. run this test a bunch of times to attempt to flush
|
||||||
|
// out any data races or concurrency issues.
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
var (
|
||||||
|
b1, b2 bytes.Buffer
|
||||||
|
limit = int64(10)
|
||||||
|
w1 = sharedLimitWriter(&b1, &limit)
|
||||||
|
w2 = sharedLimitWriter(&b2, &limit)
|
||||||
|
ch = make(chan struct{})
|
||||||
|
wg sync.WaitGroup
|
||||||
|
)
|
||||||
|
wg.Add(2)
|
||||||
|
go func() { defer wg.Done(); <-ch; w1.Write([]byte("hello")) }()
|
||||||
|
go func() { defer wg.Done(); <-ch; w2.Write([]byte("world")) }()
|
||||||
|
close(ch)
|
||||||
|
wg.Wait()
|
||||||
|
if limit != 0 {
|
||||||
|
t.Fatalf("expected max limit to be reached, instead of %d", limit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user