From 5826868586d501060a6151d04c511236b11a26d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arda=20G=C3=BC=C3=A7l=C3=BC?= Date: Tue, 22 Oct 2024 21:49:06 +0300 Subject: [PATCH] Wire context to logs command and add interrupt handler (#127503) * Wire context to logs command and add interrupt handler * Move conditional outside of interrupt handler --- .../src/k8s.io/kubectl/pkg/cmd/debug/debug.go | 8 +++--- .../src/k8s.io/kubectl/pkg/cmd/logs/logs.go | 28 ++++++++++++------- .../k8s.io/kubectl/pkg/cmd/logs/logs_test.go | 12 ++++---- staging/src/k8s.io/kubectl/pkg/cmd/run/run.go | 2 +- 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/debug/debug.go b/staging/src/k8s.io/kubectl/pkg/cmd/debug/debug.go index bc2f14ec99a..1cb325be988 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/debug/debug.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/debug/debug.go @@ -945,12 +945,12 @@ func (o *DebugOptions) handleAttachPod(ctx context.Context, restClientGetter gen } if status.State.Terminated != nil { klog.V(1).Info("Ephemeral container terminated, falling back to logs") - return logOpts(restClientGetter, pod, opts) + return logOpts(ctx, restClientGetter, pod, opts) } if err := opts.Run(); err != nil { fmt.Fprintf(opts.ErrOut, "warning: couldn't attach to pod/%s, falling back to streaming logs: %v\n", podName, err) - return logOpts(restClientGetter, pod, opts) + return logOpts(ctx, restClientGetter, pod, opts) } return nil } @@ -968,7 +968,7 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) *corev1.Con } // logOpts logs output from opts to the pods log. -func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Pod, opts *attach.AttachOptions) error { +func logOpts(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Pod, opts *attach.AttachOptions) error { ctrName, err := opts.GetContainerName(pod) if err != nil { return err @@ -979,7 +979,7 @@ func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Po return err } for _, request := range requests { - if err := logs.DefaultConsumeRequest(request, opts.Out); err != nil { + if err := logs.DefaultConsumeRequest(ctx, request, opts.Out); err != nil { return err } } diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/logs/logs.go b/staging/src/k8s.io/kubectl/pkg/cmd/logs/logs.go index 25105d7181c..e7f9bd391ec 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/logs/logs.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/logs/logs.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubectl/pkg/util" "k8s.io/kubectl/pkg/util/completion" "k8s.io/kubectl/pkg/util/i18n" + "k8s.io/kubectl/pkg/util/interrupt" "k8s.io/kubectl/pkg/util/templates" ) @@ -124,7 +125,7 @@ type LogsOptions struct { Options runtime.Object Resources []string - ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error + ConsumeRequestFn func(context.Context, rest.ResponseWrapper, io.Writer) error // PodLogOptions SinceTime string @@ -375,14 +376,21 @@ func (o LogsOptions) RunLogs() error { len(requests), o.MaxFollowConcurrency, ) } - - return o.parallelConsumeRequest(requests) } - return o.sequentialConsumeRequest(requests) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + intr := interrupt.New(nil, cancel) + return intr.Run(func() error { + if o.Follow && len(requests) > 1 { + return o.parallelConsumeRequest(ctx, requests) + } + + return o.sequentialConsumeRequest(ctx, requests) + }) } -func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error { +func (o LogsOptions) parallelConsumeRequest(ctx context.Context, requests map[corev1.ObjectReference]rest.ResponseWrapper) error { reader, writer := io.Pipe() wg := &sync.WaitGroup{} wg.Add(len(requests)) @@ -390,7 +398,7 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference] go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) { defer wg.Done() out := o.addPrefixIfNeeded(objRef, writer) - if err := o.ConsumeRequestFn(request, out); err != nil { + if err := o.ConsumeRequestFn(ctx, request, out); err != nil { if !o.IgnoreLogErrors { writer.CloseWithError(err) @@ -413,10 +421,10 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference] return err } -func (o LogsOptions) sequentialConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error { +func (o LogsOptions) sequentialConsumeRequest(ctx context.Context, requests map[corev1.ObjectReference]rest.ResponseWrapper) error { for objRef, request := range requests { out := o.addPrefixIfNeeded(objRef, o.Out) - if err := o.ConsumeRequestFn(request, out); err != nil { + if err := o.ConsumeRequestFn(ctx, request, out); err != nil { if !o.IgnoreLogErrors { return err } @@ -457,8 +465,8 @@ func (o LogsOptions) addPrefixIfNeeded(ref corev1.ObjectReference, writer io.Wri // A successful read returns err == nil, not err == io.EOF. // Because the function is defined to read from request until io.EOF, it does // not treat an io.EOF as an error to be reported. -func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error { - readCloser, err := request.Stream(context.TODO()) +func DefaultConsumeRequest(ctx context.Context, request rest.ResponseWrapper, out io.Writer) error { + readCloser, err := request.Stream(ctx) if err != nil { return err } diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/logs/logs_test.go b/staging/src/k8s.io/kubectl/pkg/cmd/logs/logs_test.go index 36fa8a70403..ba025612495 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/logs/logs_test.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/logs/logs_test.go @@ -304,7 +304,7 @@ func TestLog(t *testing.T) { o := NewLogsOptions(streams) o.LogsForObject = mock.mockLogsForObject - o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { + o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error { return errors.New("Error from the ConsumeRequestFn") } return o @@ -378,7 +378,7 @@ func TestLog(t *testing.T) { o := NewLogsOptions(streams) o.LogsForObject = mock.mockLogsForObject - o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { + o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error { return errors.New("Error from the ConsumeRequestFn") } o.Follow = true @@ -401,7 +401,7 @@ func TestLog(t *testing.T) { o := NewLogsOptions(streams) o.LogsForObject = mock.mockLogsForObject - o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { + o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error { return errors.New("Error from the ConsumeRequestFn") } o.Follow = true @@ -808,7 +808,7 @@ func TestDefaultConsumeRequest(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { buf := &bytes.Buffer{} - err := DefaultConsumeRequest(test.request, buf) + err := DefaultConsumeRequest(context.TODO(), test.request, buf) if err != nil && !strings.Contains(err.Error(), test.expectedErr) { t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error()) @@ -932,8 +932,8 @@ type logTestMock struct { wg *sync.WaitGroup } -func (l *logTestMock) mockConsumeRequest(request restclient.ResponseWrapper, out io.Writer) error { - readCloser, err := request.Stream(context.Background()) +func (l *logTestMock) mockConsumeRequest(ctx context.Context, request restclient.ResponseWrapper, out io.Writer) error { + readCloser, err := request.Stream(ctx) if err != nil { return err } diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/run/run.go b/staging/src/k8s.io/kubectl/pkg/cmd/run/run.go index 75e9846c3db..3a423c522ea 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/run/run.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/run/run.go @@ -504,7 +504,7 @@ func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Po return err } for _, request := range requests { - if err := logs.DefaultConsumeRequest(request, opts.Out); err != nil { + if err := logs.DefaultConsumeRequest(context.Background(), request, opts.Out); err != nil { return err } }