feat(kubelet): only returns logs that match the given stream

Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>
This commit is contained in:
Jian Zeng 2024-09-13 23:12:21 +08:00
parent 0793f6577f
commit 94cd0a0892
No known key found for this signature in database
GPG Key ID: 61C5DB9CE28EED62
5 changed files with 361 additions and 31 deletions

View File

@ -1566,11 +1566,14 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
return err
}
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
// Since v1.32, stdout may be nil if the stream is not requested.
if stdout != nil {
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
}
}
return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)

View File

@ -41,9 +41,9 @@ import (
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/utils/clock"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -83,6 +83,7 @@ import (
apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/prober"
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
@ -723,6 +724,13 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
// Even with defaulters, logOptions.Stream can be nil if no arguments are provided at all.
if logOptions.Stream == nil {
// Default to "All" to maintain backward compatibility.
logOptions.Stream = ptr.To(v1.LogStreamAll)
}
}
logOptions.TypeMeta = metav1.TypeMeta{}
if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
@ -744,9 +752,40 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
return
}
fw := flushwriter.Wrap(response.ResponseWriter)
var (
stdout io.Writer
stderr io.Writer
fw = flushwriter.Wrap(response.ResponseWriter)
)
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
wantedStream := logOptions.Stream
// No stream type specified, default to All
if wantedStream == nil {
allStream := v1.LogStreamAll
wantedStream = &allStream
}
switch *wantedStream {
case v1.LogStreamStdout:
stdout, stderr = fw, nil
case v1.LogStreamStderr:
stdout, stderr = nil, fw
case v1.LogStreamAll:
stdout, stderr = fw, fw
default:
_ = response.WriteError(http.StatusBadRequest, fmt.Errorf("invalid stream type %q", *logOptions.Stream))
return
}
} else {
if logOptions.Stream != nil && *logOptions.Stream != v1.LogStreamAll {
_ = response.WriteError(http.StatusBadRequest, fmt.Errorf("unable to return the given log stream: %q. Please enable PodLogsQuerySplitStreams feature gate in kubelet", *logOptions.Stream))
return
}
stdout, stderr = fw, fw
}
response.Header().Set("Transfer-Encoding", "chunked")
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, stdout, stderr); err != nil {
response.WriteError(http.StatusBadRequest, err)
return
}

View File

@ -37,6 +37,8 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/utils/ptr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -50,7 +52,6 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/utils/ptr"
// Do some initialization to decode the query parameters correctly.
"k8s.io/apiserver/pkg/server/healthz"
@ -820,29 +821,41 @@ func TestContainerLogs(t *testing.T) {
}
for desc, test := range tests {
t.Run(desc, func(t *testing.T) {
output := "foo bar"
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, test.podLogOption, output)
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer resp.Body.Close()
// To make sure the original behavior doesn't change no matter the feature PodLogsQuerySplitStreams is enabled or not.
for _, enablePodLogsQuerySplitStreams := range []bool{true, false} {
t.Run(fmt.Sprintf("%s (enablePodLogsQuerySplitStreams=%v)", desc, enablePodLogsQuerySplitStreams), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, enablePodLogsQuerySplitStreams)
expectedLogOptions := test.podLogOption.DeepCopy()
if enablePodLogsQuerySplitStreams && expectedLogOptions.Stream == nil {
// The HTTP handler will internally set the default stream value.
expectedLogOptions.Stream = ptr.To(v1.LogStreamAll)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != output {
t.Errorf("Expected: '%v', got: '%v'", output, result)
}
})
output := "foo bar"
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, expectedLogOptions, output)
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer func() {
_ = resp.Body.Close()
}()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != output {
t.Errorf("Expected: '%v', got: '%v'", output, result)
}
})
}
}
}
@ -866,6 +879,220 @@ func TestContainerLogsWithInvalidTail(t *testing.T) {
}
}
func TestContainerLogsWithSeparateStream(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, true)
type logEntry struct {
stream string
msg string
}
fw := newServerTest()
defer fw.testHTTPServer.Close()
var (
streamStdout = v1.LogStreamStdout
streamStderr = v1.LogStreamStderr
streamAll = v1.LogStreamAll
)
testCases := []struct {
name string
query string
logs []logEntry
expectedOutput string
expectedLogOptions *v1.PodLogOptions
}{
{
// Defaulters don't work if the query is empty.
// See also https://github.com/kubernetes/kubernetes/issues/128589
name: "empty query should return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "foo\n"},
{stream: v1.LogStreamStderr, msg: "bar\n"},
},
query: "",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
},
expectedOutput: "foo\nbar\n",
},
{
name: "missing stream param should return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "foo\n"},
{stream: v1.LogStreamStderr, msg: "bar\n"},
},
query: "?limitBytes=100",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
LimitBytes: ptr.To[int64](100),
},
expectedOutput: "foo\nbar\n",
},
{
name: "only stdout logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=Stdout",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStdout,
},
expectedOutput: "out1\nout2\n",
},
{
name: "only stderr logs",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStderr, msg: "err2\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
},
query: "?stream=Stderr",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStderr,
},
expectedOutput: "err1\nerr2\n",
},
{
name: "return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
},
expectedOutput: "out1\nerr1\nout2\n",
},
{
name: "stdout logs with legacy tail",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All&tail=1",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
TailLines: ptr.To[int64](1),
},
expectedOutput: "out2\n",
},
{
name: "return the last 2 lines of logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All&tailLines=2",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
TailLines: ptr.To[int64](2),
},
expectedOutput: "err1\nout2\n",
},
{
name: "return the first 6 bytes of the stdout log stream",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err2\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=Stdout&limitBytes=6",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStdout,
LimitBytes: ptr.To[int64](6),
},
expectedOutput: "out1\no",
},
{
name: "invalid stream",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
},
query: "?stream=invalid",
expectedLogOptions: nil,
expectedOutput: `{"message": "Invalid request."}`,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podNamespace := "other"
podName := "foo"
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
if !reflect.DeepEqual(tc.expectedLogOptions, logOptions) {
t.Errorf("expected %#v, got %#v", tc.expectedLogOptions, logOptions)
}
var dst io.Writer
tailLines := len(tc.logs)
if logOptions.TailLines != nil {
tailLines = int(*logOptions.TailLines)
}
remain := 0
if logOptions.LimitBytes != nil {
remain = int(*logOptions.LimitBytes)
} else {
for _, log := range tc.logs {
remain += len(log.msg)
}
}
logs := tc.logs[len(tc.logs)-tailLines:]
for _, log := range logs {
switch log.stream {
case v1.LogStreamStdout:
dst = stdout
case v1.LogStreamStderr:
dst = stderr
}
// Skip if the stream is not requested
if dst == nil {
continue
}
line := log.msg
if len(line) > remain {
line = line[:remain]
}
_, _ = io.WriteString(dst, line)
remain -= len(line)
if remain <= 0 {
return nil
}
}
return nil
}
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + tc.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer func() {
_ = resp.Body.Close()
}()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != tc.expectedOutput {
t.Errorf("Expected: %q, got: %q", tc.expectedOutput, result)
}
})
}
}
func TestCheckpointContainer(t *testing.T) {
podNamespace := "other"
podName := "foo"

View File

@ -267,6 +267,11 @@ func (w *logWriter) write(msg *logMessage, addPrefix bool) error {
default:
return fmt.Errorf("unexpected stream type %q", msg.stream)
}
// Since v1.32, either w.stdout or w.stderr may be nil if the stream is not requested.
// In such case, we should neither count the bytes nor write to the stream.
if stream == nil {
return nil
}
n, err := stream.Write(line)
w.remain -= int64(n)
if err != nil {

View File

@ -20,6 +20,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
@ -28,6 +29,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -540,3 +542,57 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) {
assert.Equal(t, 2, lineCount, "should have two lines")
}
func TestOnlyStdoutStream(t *testing.T) {
timestamp := time.Unix(1234, 43210)
msgs := []*logMessage{
{
timestamp: timestamp,
stream: runtimeapi.Stdout,
log: []byte("out1\n"),
},
{
timestamp: timestamp,
stream: runtimeapi.Stderr,
log: []byte("err1\n"),
},
{
timestamp: timestamp,
stream: runtimeapi.Stdout,
log: []byte("out2\n"),
},
}
testCases := map[string]struct {
limitBytes int64
expectedStdout string
}{
"all stdout logs": {
limitBytes: -1,
expectedStdout: "out1\nout2\n",
},
"the first 7 bytes from stdout": {
limitBytes: 7,
expectedStdout: "out1\nou",
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
stdoutBuf := bytes.NewBuffer(nil)
w := newLogWriter(stdoutBuf, nil, &LogOptions{
bytes: tc.limitBytes,
})
for _, msg := range msgs {
err := w.write(msg, false)
if errors.Is(err, errMaximumWrite) {
continue
}
require.NoError(t, err)
}
assert.EqualValues(t, tc.expectedStdout, stdoutBuf.String())
})
}
}