Merge pull request #127360 from knight42/feat/split-stdout-stderr-server-side

API: add a new `Stream` field to `PodLogOptions`
This commit is contained in:
Kubernetes Prow Robot 2024-11-07 19:44:45 +00:00 committed by GitHub
commit 9660e5c4cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 1729 additions and 959 deletions

View File

@ -19587,8 +19587,15 @@
"type": "boolean",
"uniqueItems": true
},
"tailLines-2fRTNzbP": {
"description": "If set, the number of lines from the end of the logs to show. If not specified, logs are shown from the creation of the container or sinceSeconds or sinceTime",
"stream-l-48cgXv": {
"description": "Specify which container log stream to return to the client. Acceptable values are \"All\", \"Stdout\" and \"Stderr\". If not specified, \"All\" is used, and both stdout and stderr are returned interleaved. Note that when \"TailLines\" is specified, \"Stream\" can only be set to nil or \"All\".",
"in": "query",
"name": "stream",
"type": "string",
"uniqueItems": true
},
"tailLines-9xQLWHMV": {
"description": "If set, the number of lines from the end of the logs to show. If not specified, logs are shown from the creation of the container or sinceSeconds or sinceTime. Note that when \"TailLines\" is specified, \"Stream\" can only be set to nil or \"All\".",
"in": "query",
"name": "tailLines",
"type": "integer",
@ -24199,7 +24206,10 @@
"$ref": "#/parameters/sinceSeconds-vE2NLdnP"
},
{
"$ref": "#/parameters/tailLines-2fRTNzbP"
"$ref": "#/parameters/stream-l-48cgXv"
},
{
"$ref": "#/parameters/tailLines-9xQLWHMV"
},
{
"$ref": "#/parameters/timestamps-c17fW1w_"

View File

@ -18038,7 +18038,16 @@
}
},
{
"description": "If set, the number of lines from the end of the logs to show. If not specified, logs are shown from the creation of the container or sinceSeconds or sinceTime",
"description": "Specify which container log stream to return to the client. Acceptable values are \"All\", \"Stdout\" and \"Stderr\". If not specified, \"All\" is used, and both stdout and stderr are returned interleaved. Note that when \"TailLines\" is specified, \"Stream\" can only be set to nil or \"All\".",
"in": "query",
"name": "stream",
"schema": {
"type": "string",
"uniqueItems": true
}
},
{
"description": "If set, the number of lines from the end of the logs to show. If not specified, logs are shown from the creation of the container or sinceSeconds or sinceTime. Note that when \"TailLines\" is specified, \"Stream\" can only be set to nil or \"All\".",
"in": "query",
"name": "tailLines",
"schema": {

View File

@ -5572,6 +5572,15 @@ type Preconditions struct {
UID *types.UID
}
const (
// LogStreamStdout is the stream type for stdout.
LogStreamStdout = "Stdout"
// LogStreamStderr is the stream type for stderr.
LogStreamStderr = "Stderr"
// LogStreamAll represents the combined stdout and stderr.
LogStreamAll = "All"
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// PodLogOptions is the query options for a Pod's logs REST call
@ -5598,7 +5607,8 @@ type PodLogOptions struct {
// of log output.
Timestamps bool
// If set, the number of lines from the end of the logs to show. If not specified,
// logs are shown from the creation of the container or sinceSeconds or sinceTime
// logs are shown from the creation of the container or sinceSeconds or sinceTime.
// Note that when "TailLines" is specified, "Stream" can only be set to nil or "All".
TailLines *int64
// If set, the number of bytes to read from the server before terminating the
// log output. This may not display a complete final line of logging, and may return
@ -5613,6 +5623,14 @@ type PodLogOptions struct {
// the actual log data coming from the real kubelet).
// +optional
InsecureSkipTLSVerifyBackend bool
// Specify which container log stream to return to the client.
// Acceptable values are "All", "Stdout" and "Stderr". If not specified, "All" is used, and both stdout and stderr
// are returned interleaved.
// Note that when "TailLines" is specified, "Stream" can only be set to nil or "All".
// +featureGate=PodLogsQuerySplitStreams
// +optional
Stream *string
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

View File

@ -20,15 +20,15 @@ import (
"fmt"
"reflect"
v1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/core"
utilpointer "k8s.io/utils/pointer"
)
func addConversionFuncs(scheme *runtime.Scheme) error {
@ -380,7 +380,7 @@ func Convert_v1_Pod_To_core_Pod(in *v1.Pod, out *core.Pod, s conversion.Scope) e
// Forcing the value of TerminationGracePeriodSeconds to 1 if it is negative.
// Just for Pod, not for PodSpec, because we don't want to change the behavior of the PodTemplate.
if in.Spec.TerminationGracePeriodSeconds != nil && *in.Spec.TerminationGracePeriodSeconds < 0 {
out.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(1)
out.Spec.TerminationGracePeriodSeconds = ptr.To[int64](1)
}
return nil
}
@ -397,7 +397,7 @@ func Convert_core_Pod_To_v1_Pod(in *core.Pod, out *v1.Pod, s conversion.Scope) e
// Forcing the value of TerminationGracePeriodSeconds to 1 if it is negative.
// Just for Pod, not for PodSpec, because we don't want to change the behavior of the PodTemplate.
if in.Spec.TerminationGracePeriodSeconds != nil && *in.Spec.TerminationGracePeriodSeconds < 0 {
out.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(1)
out.Spec.TerminationGracePeriodSeconds = ptr.To[int64](1)
}
return nil
}
@ -554,3 +554,13 @@ func Convert_core_PersistentVolumeSpec_To_v1_PersistentVolumeSpec(in *core.Persi
func Convert_v1_PersistentVolumeSpec_To_core_PersistentVolumeSpec(in *v1.PersistentVolumeSpec, out *core.PersistentVolumeSpec, s conversion.Scope) error {
return autoConvert_v1_PersistentVolumeSpec_To_core_PersistentVolumeSpec(in, out, s)
}
// Convert_Slice_string_To_Pointer_string is needed because decoding URL parameters requires manual assistance.
func Convert_Slice_string_To_Pointer_string(in *[]string, out **string, s conversion.Scope) error {
if len(*in) == 0 {
return nil
}
temp := (*in)[0]
*out = &temp
return nil
}

View File

@ -52,6 +52,8 @@ func TestPodLogOptions(t *testing.T) {
sinceTime := metav1.NewTime(time.Date(2000, 1, 1, 12, 34, 56, 0, time.UTC).Local())
tailLines := int64(2)
limitBytes := int64(3)
v1StreamStderr := v1.LogStreamStderr
coreStreamStderr := core.LogStreamStderr
versionedLogOptions := &v1.PodLogOptions{
Container: "mycontainer",
@ -62,6 +64,7 @@ func TestPodLogOptions(t *testing.T) {
Timestamps: true,
TailLines: &tailLines,
LimitBytes: &limitBytes,
Stream: &v1StreamStderr,
}
unversionedLogOptions := &core.PodLogOptions{
Container: "mycontainer",
@ -72,6 +75,7 @@ func TestPodLogOptions(t *testing.T) {
Timestamps: true,
TailLines: &tailLines,
LimitBytes: &limitBytes,
Stream: &coreStreamStderr,
}
expectedParameters := url.Values{
"container": {"mycontainer"},
@ -82,6 +86,7 @@ func TestPodLogOptions(t *testing.T) {
"timestamps": {"true"},
"tailLines": {"2"},
"limitBytes": {"3"},
"stream": {"Stderr"},
}
codec := runtime.NewParameterCodec(legacyscheme.Scheme)

View File

@ -19,6 +19,8 @@ package v1
import (
"time"
"k8s.io/utils/ptr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
@ -26,7 +28,6 @@ import (
"k8s.io/kubernetes/pkg/api/v1/service"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/parsers"
"k8s.io/utils/pointer"
)
func addDefaultingFuncs(scheme *runtime.Scheme) error {
@ -64,7 +65,7 @@ func SetDefaults_ReplicationController(obj *v1.ReplicationController) {
}
}
func SetDefaults_Volume(obj *v1.Volume) {
if pointer.AllPtrFieldsNil(&obj.VolumeSource) {
if ptr.AllPtrFieldsNil(&obj.VolumeSource) {
obj.VolumeSource = v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
}
@ -147,7 +148,7 @@ func SetDefaults_Service(obj *v1.Service) {
if obj.Spec.Type == v1.ServiceTypeLoadBalancer {
if obj.Spec.AllocateLoadBalancerNodePorts == nil {
obj.Spec.AllocateLoadBalancerNodePorts = pointer.Bool(true)
obj.Spec.AllocateLoadBalancerNodePorts = ptr.To(true)
}
}
@ -429,3 +430,11 @@ func SetDefaults_HostPathVolumeSource(obj *v1.HostPathVolumeSource) {
obj.Type = &typeVol
}
}
func SetDefaults_PodLogOptions(obj *v1.PodLogOptions) {
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
if obj.Stream == nil {
obj.Stream = ptr.To(v1.LogStreamAll)
}
}
}

View File

@ -24,6 +24,8 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/utils/ptr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -35,7 +37,6 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"
// ensure types are installed
_ "k8s.io/kubernetes/pkg/apis/core/install"
@ -690,7 +691,7 @@ func TestSetDefaultReplicationControllerReplicas(t *testing.T) {
{
rc: v1.ReplicationController{
Spec: v1.ReplicationControllerSpec{
Replicas: utilpointer.Int32(0),
Replicas: ptr.To[int32](0),
Template: &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
@ -705,7 +706,7 @@ func TestSetDefaultReplicationControllerReplicas(t *testing.T) {
{
rc: v1.ReplicationController{
Spec: v1.ReplicationControllerSpec{
Replicas: utilpointer.Int32(3),
Replicas: ptr.To[int32](3),
Template: &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
@ -1930,7 +1931,7 @@ func TestDefaultRequestIsNotSetForReplicationController(t *testing.T) {
}
rc := &v1.ReplicationController{
Spec: v1.ReplicationControllerSpec{
Replicas: utilpointer.Int32(3),
Replicas: ptr.To[int32](3),
Template: &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
@ -2349,3 +2350,26 @@ func TestSetDefaults_Volume(t *testing.T) {
})
}
}
func TestSetDefaults_PodLogOptions(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, true)
for desc, tc := range map[string]struct {
given, expected *v1.PodLogOptions
}{
"defaults to All": {
given: &v1.PodLogOptions{},
expected: &v1.PodLogOptions{Stream: ptr.To(v1.LogStreamAll)},
},
"the specified stream should not be overridden": {
given: &v1.PodLogOptions{Stream: ptr.To(v1.LogStreamStdout)},
expected: &v1.PodLogOptions{Stream: ptr.To(v1.LogStreamStdout)},
},
} {
t.Run(desc, func(t *testing.T) {
corev1.SetDefaults_PodLogOptions(tc.given)
if !cmp.Equal(tc.given, tc.expected) {
t.Errorf("expected volume %+v, but got %+v", tc.expected, tc.given)
}
})
}
}

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@ -124,6 +125,12 @@ func validateResourceName(value core.ResourceName, fldPath *field.Path) field.Er
return allErrs
}
var validLogStreams = sets.New[string](
v1.LogStreamStdout,
v1.LogStreamStderr,
v1.LogStreamAll,
)
// ValidatePodLogOptions checks if options that are set are at the correct
// value. Any incorrect value will be returned to the ErrorList.
func ValidatePodLogOptions(opts *v1.PodLogOptions) field.ErrorList {
@ -142,6 +149,15 @@ func ValidatePodLogOptions(opts *v1.PodLogOptions) field.ErrorList {
allErrs = append(allErrs, field.Invalid(field.NewPath("sinceSeconds"), *opts.SinceSeconds, "must be greater than 0"))
}
}
// opts.Stream can be nil because defaulting might not apply if no URL params are provided.
if opts.Stream != nil {
if !validLogStreams.Has(*opts.Stream) {
allErrs = append(allErrs, field.NotSupported(field.NewPath("stream"), *opts.Stream, validLogStreams.UnsortedList()))
}
if *opts.Stream != v1.LogStreamAll && opts.TailLines != nil {
allErrs = append(allErrs, field.Forbidden(field.NewPath(""), "`tailLines` and specific `stream` are mutually exclusive for now"))
}
}
return allErrs
}

View File

@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/apis/core"
)
@ -216,6 +217,10 @@ func TestValidatePodLogOptions(t *testing.T) {
sinceSecondsGreaterThan1 = int64(10)
sinceSecondsLessThan1 = int64(0)
timestamp = metav1.Now()
stdoutStream = v1.LogStreamStdout
stderrStream = v1.LogStreamStderr
allStream = v1.LogStreamAll
invalidStream = "invalid"
)
successCase := []struct {
@ -252,6 +257,24 @@ func TestValidatePodLogOptions(t *testing.T) {
TailLines: &positiveLine,
SinceSeconds: &sinceSecondsGreaterThan1,
},
}, {
name: "PodLogOptions with stdout Stream",
podLogOptions: v1.PodLogOptions{
Stream: &stdoutStream,
},
}, {
name: "PodLogOptions with stderr Stream and Follow",
podLogOptions: v1.PodLogOptions{
Stream: &stderrStream,
Follow: true,
},
}, {
name: "PodLogOptions with All Stream, TailLines and LimitBytes",
podLogOptions: v1.PodLogOptions{
Stream: &allStream,
TailLines: &positiveLine,
LimitBytes: &limitBytesGreaterThan1,
},
}}
for _, tc := range successCase {
t.Run(tc.name, func(t *testing.T) {
@ -293,6 +316,23 @@ func TestValidatePodLogOptions(t *testing.T) {
SinceSeconds: &sinceSecondsGreaterThan1,
SinceTime: &timestamp,
},
}, {
name: "Invalid podLogOptions with invalid Stream",
podLogOptions: v1.PodLogOptions{
Stream: &invalidStream,
},
}, {
name: "Invalid podLogOptions with stdout Stream and TailLines set",
podLogOptions: v1.PodLogOptions{
Stream: &stdoutStream,
TailLines: &positiveLine,
},
}, {
name: "Invalid podLogOptions with stderr Stream and TailLines set",
podLogOptions: v1.PodLogOptions{
Stream: &stderrStream,
TailLines: &positiveLine,
},
}}
for _, tc := range errorCase {
t.Run(tc.name, func(t *testing.T) {

View File

@ -2302,6 +2302,11 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddConversionFunc((*[]string)(nil), (**string)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_Slice_string_To_Pointer_string(a.(*[]string), b.(**string), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*apps.ReplicaSetSpec)(nil), (*corev1.ReplicationControllerSpec)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_apps_ReplicaSetSpec_To_v1_ReplicationControllerSpec(a.(*apps.ReplicaSetSpec), b.(*corev1.ReplicationControllerSpec), scope)
}); err != nil {
@ -6327,6 +6332,7 @@ func autoConvert_v1_PodLogOptions_To_core_PodLogOptions(in *corev1.PodLogOptions
out.TailLines = (*int64)(unsafe.Pointer(in.TailLines))
out.LimitBytes = (*int64)(unsafe.Pointer(in.LimitBytes))
out.InsecureSkipTLSVerifyBackend = in.InsecureSkipTLSVerifyBackend
out.Stream = (*string)(unsafe.Pointer(in.Stream))
return nil
}
@ -6345,6 +6351,7 @@ func autoConvert_core_PodLogOptions_To_v1_PodLogOptions(in *core.PodLogOptions,
out.TailLines = (*int64)(unsafe.Pointer(in.TailLines))
out.LimitBytes = (*int64)(unsafe.Pointer(in.LimitBytes))
out.InsecureSkipTLSVerifyBackend = in.InsecureSkipTLSVerifyBackend
out.Stream = (*string)(unsafe.Pointer(in.Stream))
return nil
}
@ -6419,6 +6426,13 @@ func autoConvert_url_Values_To_v1_PodLogOptions(in *url.Values, out *corev1.PodL
} else {
out.InsecureSkipTLSVerifyBackend = false
}
if values, ok := map[string][]string(*in)["stream"]; ok && len(values) > 0 {
if err := Convert_Slice_string_To_Pointer_string(&values, &out.Stream, s); err != nil {
return err
}
} else {
out.Stream = nil
}
return nil
}

View File

@ -48,6 +48,7 @@ func RegisterDefaults(scheme *runtime.Scheme) error {
scheme.AddTypeDefaultingFunc(&corev1.PersistentVolumeList{}, func(obj interface{}) { SetObjectDefaults_PersistentVolumeList(obj.(*corev1.PersistentVolumeList)) })
scheme.AddTypeDefaultingFunc(&corev1.Pod{}, func(obj interface{}) { SetObjectDefaults_Pod(obj.(*corev1.Pod)) })
scheme.AddTypeDefaultingFunc(&corev1.PodList{}, func(obj interface{}) { SetObjectDefaults_PodList(obj.(*corev1.PodList)) })
scheme.AddTypeDefaultingFunc(&corev1.PodLogOptions{}, func(obj interface{}) { SetObjectDefaults_PodLogOptions(obj.(*corev1.PodLogOptions)) })
scheme.AddTypeDefaultingFunc(&corev1.PodStatusResult{}, func(obj interface{}) { SetObjectDefaults_PodStatusResult(obj.(*corev1.PodStatusResult)) })
scheme.AddTypeDefaultingFunc(&corev1.PodTemplate{}, func(obj interface{}) { SetObjectDefaults_PodTemplate(obj.(*corev1.PodTemplate)) })
scheme.AddTypeDefaultingFunc(&corev1.PodTemplateList{}, func(obj interface{}) { SetObjectDefaults_PodTemplateList(obj.(*corev1.PodTemplateList)) })
@ -532,6 +533,10 @@ func SetObjectDefaults_PodList(in *corev1.PodList) {
}
}
func SetObjectDefaults_PodLogOptions(in *corev1.PodLogOptions) {
SetDefaults_PodLogOptions(in)
}
func SetObjectDefaults_PodStatusResult(in *corev1.PodStatusResult) {
for i := range in.Status.InitContainerStatuses {
a := &in.Status.InitContainerStatuses[i]

View File

@ -31,6 +31,8 @@ import (
"unicode/utf8"
"github.com/google/go-cmp/cmp"
netutils "k8s.io/utils/net"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
@ -47,6 +49,7 @@ import (
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
schedulinghelper "k8s.io/component-helpers/scheduling/corev1"
kubeletapis "k8s.io/kubelet/pkg/apis"
apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
@ -57,7 +60,6 @@ import (
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/fieldpath"
netutils "k8s.io/utils/net"
)
const isNegativeErrorMsg string = apimachineryvalidation.IsNegativeErrorMsg
@ -7517,7 +7519,13 @@ func validateOS(podSpec *core.PodSpec, fldPath *field.Path, opts PodValidationOp
return allErrs
}
func ValidatePodLogOptions(opts *core.PodLogOptions) field.ErrorList {
var validLogStreams = sets.New[string](
core.LogStreamStdout,
core.LogStreamStderr,
core.LogStreamAll,
)
func ValidatePodLogOptions(opts *core.PodLogOptions, allowStreamSelection bool) field.ErrorList {
allErrs := field.ErrorList{}
if opts.TailLines != nil && *opts.TailLines < 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("tailLines"), *opts.TailLines, isNegativeErrorMsg))
@ -7533,6 +7541,20 @@ func ValidatePodLogOptions(opts *core.PodLogOptions) field.ErrorList {
allErrs = append(allErrs, field.Invalid(field.NewPath("sinceSeconds"), *opts.SinceSeconds, "must be greater than 0"))
}
}
if allowStreamSelection {
if opts.Stream == nil {
allErrs = append(allErrs, field.Required(field.NewPath("stream"), "must be specified"))
} else {
if !validLogStreams.Has(*opts.Stream) {
allErrs = append(allErrs, field.NotSupported(field.NewPath("stream"), *opts.Stream, validLogStreams.UnsortedList()))
}
if *opts.Stream != core.LogStreamAll && opts.TailLines != nil {
allErrs = append(allErrs, field.Forbidden(field.NewPath(""), "`tailLines` and specific `stream` are mutually exclusive for now"))
}
}
} else if opts.Stream != nil {
allErrs = append(allErrs, field.Forbidden(field.NewPath("stream"), "may not be specified"))
}
return allErrs
}

View File

@ -21022,29 +21022,79 @@ func TestValidPodLogOptions(t *testing.T) {
negative := int64(-1)
zero := int64(0)
positive := int64(1)
stdoutStream := core.LogStreamStdout
stderrStream := core.LogStreamStderr
allStream := core.LogStreamAll
invalidStream := "invalid"
tests := []struct {
opt core.PodLogOptions
errs int
opt core.PodLogOptions
errs int
allowStreamSelection bool
}{
{core.PodLogOptions{}, 0},
{core.PodLogOptions{Previous: true}, 0},
{core.PodLogOptions{Follow: true}, 0},
{core.PodLogOptions{TailLines: &zero}, 0},
{core.PodLogOptions{TailLines: &negative}, 1},
{core.PodLogOptions{TailLines: &positive}, 0},
{core.PodLogOptions{LimitBytes: &zero}, 1},
{core.PodLogOptions{LimitBytes: &negative}, 1},
{core.PodLogOptions{LimitBytes: &positive}, 0},
{core.PodLogOptions{SinceSeconds: &negative}, 1},
{core.PodLogOptions{SinceSeconds: &positive}, 0},
{core.PodLogOptions{SinceSeconds: &zero}, 1},
{core.PodLogOptions{SinceTime: &now}, 0},
{core.PodLogOptions{}, 0, false},
{core.PodLogOptions{Previous: true}, 0, false},
{core.PodLogOptions{Follow: true}, 0, false},
{core.PodLogOptions{TailLines: &zero}, 0, false},
{core.PodLogOptions{TailLines: &negative}, 1, false},
{core.PodLogOptions{TailLines: &positive}, 0, false},
{core.PodLogOptions{LimitBytes: &zero}, 1, false},
{core.PodLogOptions{LimitBytes: &negative}, 1, false},
{core.PodLogOptions{LimitBytes: &positive}, 0, false},
{core.PodLogOptions{SinceSeconds: &negative}, 1, false},
{core.PodLogOptions{SinceSeconds: &positive}, 0, false},
{core.PodLogOptions{SinceSeconds: &zero}, 1, false},
{core.PodLogOptions{SinceTime: &now}, 0, false},
{
opt: core.PodLogOptions{
Stream: &stdoutStream,
},
allowStreamSelection: false,
errs: 1,
},
{
opt: core.PodLogOptions{
Stream: &stdoutStream,
},
allowStreamSelection: true,
},
{
opt: core.PodLogOptions{
Stream: &invalidStream,
},
allowStreamSelection: true,
errs: 1,
},
{
opt: core.PodLogOptions{
Stream: &stderrStream,
TailLines: &positive,
},
allowStreamSelection: true,
errs: 1,
},
{
opt: core.PodLogOptions{
Stream: &allStream,
TailLines: &positive,
},
allowStreamSelection: true,
},
{
opt: core.PodLogOptions{
Stream: &stdoutStream,
LimitBytes: &positive,
SinceTime: &now,
},
allowStreamSelection: true,
},
}
for i, test := range tests {
errs := ValidatePodLogOptions(&test.opt)
if test.errs != len(errs) {
t.Errorf("%d: Unexpected errors: %v", i, errs)
}
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
errs := ValidatePodLogOptions(&test.opt, test.allowStreamSelection)
if test.errs != len(errs) {
t.Errorf("%d: Unexpected errors: %v", i, errs)
}
})
}
}

View File

@ -3937,6 +3937,11 @@ func (in *PodLogOptions) DeepCopyInto(out *PodLogOptions) {
*out = new(int64)
**out = **in
}
if in.Stream != nil {
in, out := &in.Stream, &out.Stream
*out = new(string)
**out = **in
}
return
}

View File

@ -466,6 +466,13 @@ const (
// Set pod completion index as a pod label for Indexed Jobs.
PodIndexLabel featuregate.Feature = "PodIndexLabel"
// owner: @knight42
// kep: https://kep.k8s.io/3288
// alpha: v1.32
//
// Enables only stdout or stderr of the container to be retrievd.
PodLogsQuerySplitStreams featuregate.Feature = "PodLogsQuerySplitStreams"
// owner: @ddebroy, @kannon92
//
// Enables reporting of PodReadyToStartContainersCondition condition in pod status after pod

View File

@ -703,6 +703,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.32"), Default: true, LockToDefault: true, PreRelease: featuregate.GA},
},
PodLogsQuerySplitStreams: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
StatefulSetAutoDeletePVC: {
{Version: version.MustParse("1.23"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.27"), Default: true, PreRelease: featuregate.Beta},

View File

@ -27710,7 +27710,7 @@ func schema_k8sio_api_core_v1_PodLogOptions(ref common.ReferenceCallback) common
},
"tailLines": {
SchemaProps: spec.SchemaProps{
Description: "If set, the number of lines from the end of the logs to show. If not specified, logs are shown from the creation of the container or sinceSeconds or sinceTime",
Description: "If set, the number of lines from the end of the logs to show. If not specified, logs are shown from the creation of the container or sinceSeconds or sinceTime. Note that when \"TailLines\" is specified, \"Stream\" can only be set to nil or \"All\".",
Type: []string{"integer"},
Format: "int64",
},
@ -27729,6 +27729,13 @@ func schema_k8sio_api_core_v1_PodLogOptions(ref common.ReferenceCallback) common
Format: "",
},
},
"stream": {
SchemaProps: spec.SchemaProps{
Description: "Specify which container log stream to return to the client. Acceptable values are \"All\", \"Stdout\" and \"Stderr\". If not specified, \"All\" is used, and both stdout and stderr are returned interleaved. Note that when \"TailLines\" is specified, \"Stream\" can only be set to nil or \"All\".",
Type: []string{"string"},
Format: "",
},
},
},
},
},

View File

@ -1563,11 +1563,14 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
return err
}
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
// Since v1.32, stdout may be nil if the stream is not requested.
if stdout != nil {
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
}
}
return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)

View File

@ -41,9 +41,9 @@ import (
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/utils/clock"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -83,6 +83,7 @@ import (
apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/prober"
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
@ -723,6 +724,13 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
// Even with defaulters, logOptions.Stream can be nil if no arguments are provided at all.
if logOptions.Stream == nil {
// Default to "All" to maintain backward compatibility.
logOptions.Stream = ptr.To(v1.LogStreamAll)
}
}
logOptions.TypeMeta = metav1.TypeMeta{}
if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
@ -744,9 +752,40 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
return
}
fw := flushwriter.Wrap(response.ResponseWriter)
var (
stdout io.Writer
stderr io.Writer
fw = flushwriter.Wrap(response.ResponseWriter)
)
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
wantedStream := logOptions.Stream
// No stream type specified, default to All
if wantedStream == nil {
allStream := v1.LogStreamAll
wantedStream = &allStream
}
switch *wantedStream {
case v1.LogStreamStdout:
stdout, stderr = fw, nil
case v1.LogStreamStderr:
stdout, stderr = nil, fw
case v1.LogStreamAll:
stdout, stderr = fw, fw
default:
_ = response.WriteError(http.StatusBadRequest, fmt.Errorf("invalid stream type %q", *logOptions.Stream))
return
}
} else {
if logOptions.Stream != nil && *logOptions.Stream != v1.LogStreamAll {
_ = response.WriteError(http.StatusBadRequest, fmt.Errorf("unable to return the given log stream: %q. Please enable PodLogsQuerySplitStreams feature gate in kubelet", *logOptions.Stream))
return
}
stdout, stderr = fw, fw
}
response.Header().Set("Transfer-Encoding", "chunked")
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, stdout, stderr); err != nil {
response.WriteError(http.StatusBadRequest, err)
return
}

View File

@ -37,6 +37,8 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/utils/ptr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -50,7 +52,6 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/utils/ptr"
// Do some initialization to decode the query parameters correctly.
"k8s.io/apiserver/pkg/server/healthz"
@ -820,29 +821,41 @@ func TestContainerLogs(t *testing.T) {
}
for desc, test := range tests {
t.Run(desc, func(t *testing.T) {
output := "foo bar"
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, test.podLogOption, output)
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer resp.Body.Close()
// To make sure the original behavior doesn't change no matter the feature PodLogsQuerySplitStreams is enabled or not.
for _, enablePodLogsQuerySplitStreams := range []bool{true, false} {
t.Run(fmt.Sprintf("%s (enablePodLogsQuerySplitStreams=%v)", desc, enablePodLogsQuerySplitStreams), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, enablePodLogsQuerySplitStreams)
expectedLogOptions := test.podLogOption.DeepCopy()
if enablePodLogsQuerySplitStreams && expectedLogOptions.Stream == nil {
// The HTTP handler will internally set the default stream value.
expectedLogOptions.Stream = ptr.To(v1.LogStreamAll)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != output {
t.Errorf("Expected: '%v', got: '%v'", output, result)
}
})
output := "foo bar"
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, expectedLogOptions, output)
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer func() {
_ = resp.Body.Close()
}()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != output {
t.Errorf("Expected: '%v', got: '%v'", output, result)
}
})
}
}
}
@ -866,6 +879,220 @@ func TestContainerLogsWithInvalidTail(t *testing.T) {
}
}
func TestContainerLogsWithSeparateStream(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, true)
type logEntry struct {
stream string
msg string
}
fw := newServerTest()
defer fw.testHTTPServer.Close()
var (
streamStdout = v1.LogStreamStdout
streamStderr = v1.LogStreamStderr
streamAll = v1.LogStreamAll
)
testCases := []struct {
name string
query string
logs []logEntry
expectedOutput string
expectedLogOptions *v1.PodLogOptions
}{
{
// Defaulters don't work if the query is empty.
// See also https://github.com/kubernetes/kubernetes/issues/128589
name: "empty query should return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "foo\n"},
{stream: v1.LogStreamStderr, msg: "bar\n"},
},
query: "",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
},
expectedOutput: "foo\nbar\n",
},
{
name: "missing stream param should return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "foo\n"},
{stream: v1.LogStreamStderr, msg: "bar\n"},
},
query: "?limitBytes=100",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
LimitBytes: ptr.To[int64](100),
},
expectedOutput: "foo\nbar\n",
},
{
name: "only stdout logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=Stdout",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStdout,
},
expectedOutput: "out1\nout2\n",
},
{
name: "only stderr logs",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStderr, msg: "err2\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
},
query: "?stream=Stderr",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStderr,
},
expectedOutput: "err1\nerr2\n",
},
{
name: "return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
},
expectedOutput: "out1\nerr1\nout2\n",
},
{
name: "stdout logs with legacy tail",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All&tail=1",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
TailLines: ptr.To[int64](1),
},
expectedOutput: "out2\n",
},
{
name: "return the last 2 lines of logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All&tailLines=2",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
TailLines: ptr.To[int64](2),
},
expectedOutput: "err1\nout2\n",
},
{
name: "return the first 6 bytes of the stdout log stream",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err2\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=Stdout&limitBytes=6",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStdout,
LimitBytes: ptr.To[int64](6),
},
expectedOutput: "out1\no",
},
{
name: "invalid stream",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
},
query: "?stream=invalid",
expectedLogOptions: nil,
expectedOutput: `{"message": "Invalid request."}`,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podNamespace := "other"
podName := "foo"
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
if !reflect.DeepEqual(tc.expectedLogOptions, logOptions) {
t.Errorf("expected %#v, got %#v", tc.expectedLogOptions, logOptions)
}
var dst io.Writer
tailLines := len(tc.logs)
if logOptions.TailLines != nil {
tailLines = int(*logOptions.TailLines)
}
remain := 0
if logOptions.LimitBytes != nil {
remain = int(*logOptions.LimitBytes)
} else {
for _, log := range tc.logs {
remain += len(log.msg)
}
}
logs := tc.logs[len(tc.logs)-tailLines:]
for _, log := range logs {
switch log.stream {
case v1.LogStreamStdout:
dst = stdout
case v1.LogStreamStderr:
dst = stderr
}
// Skip if the stream is not requested
if dst == nil {
continue
}
line := log.msg
if len(line) > remain {
line = line[:remain]
}
_, _ = io.WriteString(dst, line)
remain -= len(line)
if remain <= 0 {
return nil
}
}
return nil
}
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + tc.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer func() {
_ = resp.Body.Close()
}()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != tc.expectedOutput {
t.Errorf("Expected: %q, got: %q", tc.expectedOutput, result)
}
})
}
}
func TestCheckpointContainer(t *testing.T) {
podNamespace := "other"
podName := "foo"

View File

@ -21,6 +21,8 @@ import (
"fmt"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@ -86,7 +88,10 @@ func (r *LogREST) Get(ctx context.Context, name string, opts runtime.Object) (ru
countSkipTLSMetric(logOpts.InsecureSkipTLSVerifyBackend)
if errs := validation.ValidatePodLogOptions(logOpts); len(errs) > 0 {
if !utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
logOpts.Stream = nil
}
if errs := validation.ValidatePodLogOptions(logOpts, utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams)); len(errs) > 0 {
return nil, errors.NewInvalid(api.Kind("PodLogOptions"), name, errs)
}
location, transport, err := pod.LogLocation(ctx, r.Store, r.KubeletConn, name, logOpts)

View File

@ -26,6 +26,9 @@ import (
"strings"
"time"
netutils "k8s.io/utils/net"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -50,8 +53,6 @@ import (
corevalidation "k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/client"
netutils "k8s.io/utils/net"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
)
// podStrategy implements behavior for Pods
@ -563,6 +564,10 @@ func LogLocation(
if opts.LimitBytes != nil {
params.Add("limitBytes", strconv.FormatInt(*opts.LimitBytes, 10))
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
// With defaulters, We can be confident that opts.Stream is not nil here.
params.Add("stream", string(*opts.Stream))
}
loc := &url.URL{
Scheme: nodeInfo.Scheme,
Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),

View File

@ -10,6 +10,7 @@
- k8s.io/kubernetes/pkg/util
- k8s.io/api/core/v1
- k8s.io/utils/pointer
- k8s.io/utils/ptr
- k8s.io/utils/net
- k8s.io/klog

File diff suppressed because it is too large Load Diff

View File

@ -3836,7 +3836,8 @@ message PodLogOptions {
optional bool timestamps = 6;
// If set, the number of lines from the end of the logs to show. If not specified,
// logs are shown from the creation of the container or sinceSeconds or sinceTime
// logs are shown from the creation of the container or sinceSeconds or sinceTime.
// Note that when "TailLines" is specified, "Stream" can only be set to nil or "All".
// +optional
optional int64 tailLines = 7;
@ -3854,6 +3855,14 @@ message PodLogOptions {
// the actual log data coming from the real kubelet).
// +optional
optional bool insecureSkipTLSVerifyBackend = 9;
// Specify which container log stream to return to the client.
// Acceptable values are "All", "Stdout" and "Stderr". If not specified, "All" is used, and both stdout and stderr
// are returned interleaved.
// Note that when "TailLines" is specified, "Stream" can only be set to nil or "All".
// +featureGate=PodLogsQuerySplitStreams
// +optional
optional string stream = 10;
}
// PodOS defines the OS parameters of a pod.

View File

@ -6643,6 +6643,15 @@ type Preconditions struct {
UID *types.UID `json:"uid,omitempty" protobuf:"bytes,1,opt,name=uid,casttype=k8s.io/apimachinery/pkg/types.UID"`
}
const (
// LogStreamStdout is the stream type for stdout.
LogStreamStdout = "Stdout"
// LogStreamStderr is the stream type for stderr.
LogStreamStderr = "Stderr"
// LogStreamAll represents the combined stdout and stderr.
LogStreamAll = "All"
)
// +k8s:conversion-gen:explicit-from=net/url.Values
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:prerelease-lifecycle-gen:introduced=1.0
@ -6677,7 +6686,8 @@ type PodLogOptions struct {
// +optional
Timestamps bool `json:"timestamps,omitempty" protobuf:"varint,6,opt,name=timestamps"`
// If set, the number of lines from the end of the logs to show. If not specified,
// logs are shown from the creation of the container or sinceSeconds or sinceTime
// logs are shown from the creation of the container or sinceSeconds or sinceTime.
// Note that when "TailLines" is specified, "Stream" can only be set to nil or "All".
// +optional
TailLines *int64 `json:"tailLines,omitempty" protobuf:"varint,7,opt,name=tailLines"`
// If set, the number of bytes to read from the server before terminating the
@ -6694,6 +6704,14 @@ type PodLogOptions struct {
// the actual log data coming from the real kubelet).
// +optional
InsecureSkipTLSVerifyBackend bool `json:"insecureSkipTLSVerifyBackend,omitempty" protobuf:"varint,9,opt,name=insecureSkipTLSVerifyBackend"`
// Specify which container log stream to return to the client.
// Acceptable values are "All", "Stdout" and "Stderr". If not specified, "All" is used, and both stdout and stderr
// are returned interleaved.
// Note that when "TailLines" is specified, "Stream" can only be set to nil or "All".
// +featureGate=PodLogsQuerySplitStreams
// +optional
Stream *string `json:"stream,omitempty" protobuf:"varint,10,opt,name=stream"`
}
// +k8s:conversion-gen:explicit-from=net/url.Values

View File

@ -1683,9 +1683,10 @@ var map_PodLogOptions = map[string]string{
"sinceSeconds": "A relative time in seconds before the current time from which to show logs. If this value precedes the time a pod was started, only logs since the pod start will be returned. If this value is in the future, no logs will be returned. Only one of sinceSeconds or sinceTime may be specified.",
"sinceTime": "An RFC3339 timestamp from which to show logs. If this value precedes the time a pod was started, only logs since the pod start will be returned. If this value is in the future, no logs will be returned. Only one of sinceSeconds or sinceTime may be specified.",
"timestamps": "If true, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output. Defaults to false.",
"tailLines": "If set, the number of lines from the end of the logs to show. If not specified, logs are shown from the creation of the container or sinceSeconds or sinceTime",
"tailLines": "If set, the number of lines from the end of the logs to show. If not specified, logs are shown from the creation of the container or sinceSeconds or sinceTime. Note that when \"TailLines\" is specified, \"Stream\" can only be set to nil or \"All\".",
"limitBytes": "If set, the number of bytes to read from the server before terminating the log output. This may not display a complete final line of logging, and may return slightly more or slightly less than the specified limit.",
"insecureSkipTLSVerifyBackend": "insecureSkipTLSVerifyBackend indicates that the apiserver should not confirm the validity of the serving certificate of the backend it is connecting to. This will make the HTTPS connection between the apiserver and the backend insecure. This means the apiserver cannot verify the log data it is receiving came from the real kubelet. If the kubelet is configured to verify the apiserver's TLS credentials, it does not mean the connection to the real kubelet is vulnerable to a man in the middle attack (e.g. an attacker could not intercept the actual log data coming from the real kubelet).",
"stream": "Specify which container log stream to return to the client. Acceptable values are \"All\", \"Stdout\" and \"Stderr\". If not specified, \"All\" is used, and both stdout and stderr are returned interleaved. Note that when \"TailLines\" is specified, \"Stream\" can only be set to nil or \"All\".",
}
func (PodLogOptions) SwaggerDoc() map[string]string {

View File

@ -3935,6 +3935,11 @@ func (in *PodLogOptions) DeepCopyInto(out *PodLogOptions) {
*out = new(int64)
**out = **in
}
if in.Stream != nil {
in, out := &in.Stream, &out.Stream
*out = new(string)
**out = **in
}
return
}

View File

@ -9,5 +9,6 @@
"timestamps": true,
"tailLines": 7,
"limitBytes": 8,
"insecureSkipTLSVerifyBackend": true
"insecureSkipTLSVerifyBackend": true,
"stream": "streamValue"
}

View File

@ -7,5 +7,6 @@ limitBytes: 8
previous: true
sinceSeconds: 4
sinceTime: "2005-01-01T01:01:01Z"
stream: streamValue
tailLines: 7
timestamps: true

View File

@ -1216,6 +1216,8 @@ func typeToJSON(typeName string) string {
return "string"
case "v1.IncludeObjectPolicy", "*v1.IncludeObjectPolicy":
return "string"
case "*string":
return "string"
// TODO: Fix these when go-restful supports a way to specify an array query param:
// https://github.com/emicklei/go-restful/issues/225

View File

@ -267,6 +267,11 @@ func (w *logWriter) write(msg *logMessage, addPrefix bool) error {
default:
return fmt.Errorf("unexpected stream type %q", msg.stream)
}
// Since v1.32, either w.stdout or w.stderr may be nil if the stream is not requested.
// In such case, we should neither count the bytes nor write to the stream.
if stream == nil {
return nil
}
n, err := stream.Write(line)
w.remain -= int64(n)
if err != nil {

View File

@ -20,6 +20,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
@ -28,6 +29,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -540,3 +542,57 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) {
assert.Equal(t, 2, lineCount, "should have two lines")
}
func TestOnlyStdoutStream(t *testing.T) {
timestamp := time.Unix(1234, 43210)
msgs := []*logMessage{
{
timestamp: timestamp,
stream: runtimeapi.Stdout,
log: []byte("out1\n"),
},
{
timestamp: timestamp,
stream: runtimeapi.Stderr,
log: []byte("err1\n"),
},
{
timestamp: timestamp,
stream: runtimeapi.Stdout,
log: []byte("out2\n"),
},
}
testCases := map[string]struct {
limitBytes int64
expectedStdout string
}{
"all stdout logs": {
limitBytes: -1,
expectedStdout: "out1\nout2\n",
},
"the first 7 bytes from stdout": {
limitBytes: 7,
expectedStdout: "out1\nou",
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
stdoutBuf := bytes.NewBuffer(nil)
w := newLogWriter(stdoutBuf, nil, &LogOptions{
bytes: tc.limitBytes,
})
for _, msg := range msgs {
err := w.write(msg, false)
if errors.Is(err, errMaximumWrite) {
continue
}
require.NoError(t, err)
}
assert.EqualValues(t, tc.expectedStdout, stdoutBuf.String())
})
}
}

View File

@ -275,6 +275,11 @@ var (
// Marks a single test that tests Pod Lifecycle Sleep action with zero duration. Requires feature gate PodLifecycleSleepActionAllowZero to be enabled.
PodLifecycleSleepActionAllowZero = framework.WithFeature(framework.ValidFeatures.Add("PodLifecycleSleepActionAllowZero"))
// Owner: sig-node
// Marks tests that require a cluster with PodLogsQuerySplitStreams
// (used for testing specific log stream <https://kep.k8s.io/3288>)
PodLogsQuerySplitStreams = framework.WithFeature(framework.ValidFeatures.Add("PodLogsQuerySplitStreams"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
PodPriority = framework.WithFeature(framework.ValidFeatures.Add("PodPriority"))

View File

@ -28,12 +28,15 @@ import (
"time"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
@ -46,7 +49,6 @@ import (
e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
"github.com/onsi/ginkgo/v2"
)
@ -144,7 +146,7 @@ func createPodUsingNfs(ctx context.Context, f *framework.Framework, c clientset.
},
},
},
RestartPolicy: v1.RestartPolicyNever, //don't restart pod
RestartPolicy: v1.RestartPolicyNever, // don't restart pod
Volumes: []v1.Volume{
{
Name: "nfs-vol",
@ -640,6 +642,76 @@ var _ = SIGDescribe("kubelet", func() {
})
})
var _ = SIGDescribe("specific log stream", feature.PodLogsQuerySplitStreams, func() {
var (
c clientset.Interface
ns string
)
f := framework.NewDefaultFramework("pod-log-stream")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
ginkgo.BeforeEach(func() {
c = f.ClientSet
ns = f.Namespace.Name
})
ginkgo.It("kubectl get --raw /api/v1/namespaces/default/pods/<pod-name>/log?stream", func(ctx context.Context) {
ginkgo.By("create pod")
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: "log-stream-",
Namespace: ns,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "log-stream",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"/bin/sh"},
Args: []string{"-c", "echo out1; echo err1 >&2; tail -f /dev/null"},
},
},
RestartPolicy: v1.RestartPolicyNever, // don't restart pod
},
}
rtnPod, err := c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
framework.ExpectNoError(err)
err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, rtnPod.Name, f.Namespace.Name, framework.PodStartTimeout) // running & ready
framework.ExpectNoError(err)
rtnPod, err = c.CoreV1().Pods(ns).Get(ctx, rtnPod.Name, metav1.GetOptions{}) // return fresh pod
framework.ExpectNoError(err)
ginkgo.By("Starting the command")
tk := e2ekubectl.NewTestKubeconfig(framework.TestContext.CertDir, framework.TestContext.Host, framework.TestContext.KubeConfig, framework.TestContext.KubeContext, framework.TestContext.KubectlPath, ns)
queryCommand := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log?stream=All", rtnPod.Namespace, rtnPod.Name)
cmd := tk.KubectlCmd("get", "--raw", queryCommand)
result := runKubectlCommand(cmd)
// the order of the logs is indeterminate
assertContains("out1", result)
assertContains("err1", result)
queryCommand = fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log?stream=Stdout", rtnPod.Namespace, rtnPod.Name)
cmd = tk.KubectlCmd("get", "--raw", queryCommand)
result = runKubectlCommand(cmd)
assertContains("out1", result)
assertNotContains("err1", result)
queryCommand = fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log?stream=Stderr", rtnPod.Namespace, rtnPod.Name)
cmd = tk.KubectlCmd("get", "--raw", queryCommand)
result = runKubectlCommand(cmd)
assertContains("err1", result)
assertNotContains("out1", result)
})
})
func getLinuxNodes(nodes *v1.NodeList) *v1.NodeList {
filteredNodes := nodes
e2enode.Filter(filteredNodes, func(node v1.Node) bool {
@ -696,6 +768,13 @@ func assertContains(expectedString string, result string) {
framework.Failf("Failed to find \"%s\"", expectedString)
}
func assertNotContains(expectedString string, result string) {
if !strings.Contains(result, expectedString) {
return
}
framework.Failf("Found unexpected \"%s\"", expectedString)
}
func commandOnNode(nodeName string, cmd string) string {
result, err := e2essh.NodeExec(context.Background(), nodeName, cmd, framework.TestContext.Provider)
framework.ExpectNoError(err)

View File

@ -904,6 +904,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: PodLogsQuerySplitStreams
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: PodReadyToStartContainersCondition
versionedSpecs:
- default: false