Wire context to logs command and add interrupt handler (#127503)

* Wire context to logs command and add interrupt handler

* Move conditional outside of interrupt handler
This commit is contained in:
Arda Güçlü 2024-10-22 21:49:06 +03:00 committed by GitHub
parent 1caf9a150b
commit 5826868586
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 29 additions and 21 deletions

View File

@ -945,12 +945,12 @@ func (o *DebugOptions) handleAttachPod(ctx context.Context, restClientGetter gen
} }
if status.State.Terminated != nil { if status.State.Terminated != nil {
klog.V(1).Info("Ephemeral container terminated, falling back to logs") klog.V(1).Info("Ephemeral container terminated, falling back to logs")
return logOpts(restClientGetter, pod, opts) return logOpts(ctx, restClientGetter, pod, opts)
} }
if err := opts.Run(); err != nil { if err := opts.Run(); err != nil {
fmt.Fprintf(opts.ErrOut, "warning: couldn't attach to pod/%s, falling back to streaming logs: %v\n", podName, err) fmt.Fprintf(opts.ErrOut, "warning: couldn't attach to pod/%s, falling back to streaming logs: %v\n", podName, err)
return logOpts(restClientGetter, pod, opts) return logOpts(ctx, restClientGetter, pod, opts)
} }
return nil return nil
} }
@ -968,7 +968,7 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) *corev1.Con
} }
// logOpts logs output from opts to the pods log. // logOpts logs output from opts to the pods log.
func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Pod, opts *attach.AttachOptions) error { func logOpts(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Pod, opts *attach.AttachOptions) error {
ctrName, err := opts.GetContainerName(pod) ctrName, err := opts.GetContainerName(pod)
if err != nil { if err != nil {
return err return err
@ -979,7 +979,7 @@ func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Po
return err return err
} }
for _, request := range requests { for _, request := range requests {
if err := logs.DefaultConsumeRequest(request, opts.Out); err != nil { if err := logs.DefaultConsumeRequest(ctx, request, opts.Out); err != nil {
return err return err
} }
} }

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubectl/pkg/util" "k8s.io/kubectl/pkg/util"
"k8s.io/kubectl/pkg/util/completion" "k8s.io/kubectl/pkg/util/completion"
"k8s.io/kubectl/pkg/util/i18n" "k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/interrupt"
"k8s.io/kubectl/pkg/util/templates" "k8s.io/kubectl/pkg/util/templates"
) )
@ -124,7 +125,7 @@ type LogsOptions struct {
Options runtime.Object Options runtime.Object
Resources []string Resources []string
ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error ConsumeRequestFn func(context.Context, rest.ResponseWrapper, io.Writer) error
// PodLogOptions // PodLogOptions
SinceTime string SinceTime string
@ -375,14 +376,21 @@ func (o LogsOptions) RunLogs() error {
len(requests), o.MaxFollowConcurrency, len(requests), o.MaxFollowConcurrency,
) )
} }
return o.parallelConsumeRequest(requests)
} }
return o.sequentialConsumeRequest(requests) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
intr := interrupt.New(nil, cancel)
return intr.Run(func() error {
if o.Follow && len(requests) > 1 {
return o.parallelConsumeRequest(ctx, requests)
}
return o.sequentialConsumeRequest(ctx, requests)
})
} }
func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error { func (o LogsOptions) parallelConsumeRequest(ctx context.Context, requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
reader, writer := io.Pipe() reader, writer := io.Pipe()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(len(requests)) wg.Add(len(requests))
@ -390,7 +398,7 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]
go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) { go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) {
defer wg.Done() defer wg.Done()
out := o.addPrefixIfNeeded(objRef, writer) out := o.addPrefixIfNeeded(objRef, writer)
if err := o.ConsumeRequestFn(request, out); err != nil { if err := o.ConsumeRequestFn(ctx, request, out); err != nil {
if !o.IgnoreLogErrors { if !o.IgnoreLogErrors {
writer.CloseWithError(err) writer.CloseWithError(err)
@ -413,10 +421,10 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]
return err return err
} }
func (o LogsOptions) sequentialConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error { func (o LogsOptions) sequentialConsumeRequest(ctx context.Context, requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
for objRef, request := range requests { for objRef, request := range requests {
out := o.addPrefixIfNeeded(objRef, o.Out) out := o.addPrefixIfNeeded(objRef, o.Out)
if err := o.ConsumeRequestFn(request, out); err != nil { if err := o.ConsumeRequestFn(ctx, request, out); err != nil {
if !o.IgnoreLogErrors { if !o.IgnoreLogErrors {
return err return err
} }
@ -457,8 +465,8 @@ func (o LogsOptions) addPrefixIfNeeded(ref corev1.ObjectReference, writer io.Wri
// A successful read returns err == nil, not err == io.EOF. // A successful read returns err == nil, not err == io.EOF.
// Because the function is defined to read from request until io.EOF, it does // Because the function is defined to read from request until io.EOF, it does
// not treat an io.EOF as an error to be reported. // not treat an io.EOF as an error to be reported.
func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error { func DefaultConsumeRequest(ctx context.Context, request rest.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream(context.TODO()) readCloser, err := request.Stream(ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -304,7 +304,7 @@ func TestLog(t *testing.T) {
o := NewLogsOptions(streams) o := NewLogsOptions(streams)
o.LogsForObject = mock.mockLogsForObject o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn") return errors.New("Error from the ConsumeRequestFn")
} }
return o return o
@ -378,7 +378,7 @@ func TestLog(t *testing.T) {
o := NewLogsOptions(streams) o := NewLogsOptions(streams)
o.LogsForObject = mock.mockLogsForObject o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn") return errors.New("Error from the ConsumeRequestFn")
} }
o.Follow = true o.Follow = true
@ -401,7 +401,7 @@ func TestLog(t *testing.T) {
o := NewLogsOptions(streams) o := NewLogsOptions(streams)
o.LogsForObject = mock.mockLogsForObject o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn") return errors.New("Error from the ConsumeRequestFn")
} }
o.Follow = true o.Follow = true
@ -808,7 +808,7 @@ func TestDefaultConsumeRequest(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
err := DefaultConsumeRequest(test.request, buf) err := DefaultConsumeRequest(context.TODO(), test.request, buf)
if err != nil && !strings.Contains(err.Error(), test.expectedErr) { if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error()) t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
@ -932,8 +932,8 @@ type logTestMock struct {
wg *sync.WaitGroup wg *sync.WaitGroup
} }
func (l *logTestMock) mockConsumeRequest(request restclient.ResponseWrapper, out io.Writer) error { func (l *logTestMock) mockConsumeRequest(ctx context.Context, request restclient.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream(context.Background()) readCloser, err := request.Stream(ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -504,7 +504,7 @@ func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Po
return err return err
} }
for _, request := range requests { for _, request := range requests {
if err := logs.DefaultConsumeRequest(request, opts.Out); err != nil { if err := logs.DefaultConsumeRequest(context.Background(), request, opts.Out); err != nil {
return err return err
} }
} }