diff --git a/pkg/kubectl/cmd/logs/logs.go b/pkg/kubectl/cmd/logs/logs.go index 5f8f3162b19..82565e3f9ef 100644 --- a/pkg/kubectl/cmd/logs/logs.go +++ b/pkg/kubectl/cmd/logs/logs.go @@ -17,10 +17,12 @@ limitations under the License. package logs import ( + "bufio" "errors" "fmt" "io" "os" + "sync" "time" "github.com/spf13/cobra" @@ -59,6 +61,9 @@ var ( # Begin streaming the logs of the ruby container in pod web-1 kubectl logs -f -c ruby web-1 + # Begin streaming the logs from all containers in pods defined by label app=nginx + kubectl logs -f -lapp=nginx --all-containers=true + # Display only the most recent 20 lines of output in pod nginx kubectl logs --tail=20 nginx @@ -86,7 +91,7 @@ type LogsOptions struct { Options runtime.Object Resources []string - ConsumeRequestFn func(*rest.Request, io.Writer) error + ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error // PodLogOptions SinceTime string @@ -101,6 +106,7 @@ type LogsOptions struct { // whether or not a container name was given via --container ContainerNameSpecified bool Selector string + MaxFollowConcurency int Object runtime.Object GetPodTimeout time.Duration @@ -112,9 +118,10 @@ type LogsOptions struct { func NewLogsOptions(streams genericclioptions.IOStreams, allContainers bool) *LogsOptions { return &LogsOptions{ - IOStreams: streams, - AllContainers: allContainers, - Tail: -1, + IOStreams: streams, + AllContainers: allContainers, + Tail: -1, + MaxFollowConcurency: 5, } } @@ -151,6 +158,7 @@ func NewCmdLogs(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.C cmd.Flags().StringVarP(&o.Container, "container", "c", o.Container, "Print the logs of this container") cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodLogsTimeout) cmd.Flags().StringVarP(&o.Selector, "selector", "l", o.Selector, "Selector (label query) to filter on.") + cmd.Flags().IntVar(&o.MaxFollowConcurency, "max-log-requests", o.MaxFollowConcurency, "Specify maximum number of concurrent logs to follow when using by a selector. Defaults to 5.") return cmd } @@ -256,10 +264,6 @@ func (o *LogsOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []str } func (o LogsOptions) Validate() error { - if o.Follow && len(o.Selector) > 0 { - return fmt.Errorf("only one of follow (-f) or selector (-l) is allowed") - } - if len(o.SinceTime) > 0 && o.SinceSeconds != 0 { return fmt.Errorf("at most one of `sinceTime` or `sinceSeconds` may be specified") } @@ -298,6 +302,47 @@ func (o LogsOptions) RunLogs() error { return err } + if o.Follow && len(requests) > 1 { + if len(requests) > o.MaxFollowConcurency { + return fmt.Errorf( + "you are attempting to follow %d log streams, but maximum allowed concurency is %d, use --max-log-requests to increase the limit", + len(requests), o.MaxFollowConcurency, + ) + } + + return o.parallelConsumeRequest(requests) + } + + return o.sequentialConsumeRequest(requests) +} + +func (o LogsOptions) parallelConsumeRequest(requests []rest.ResponseWrapper) error { + reader, writer := io.Pipe() + wg := &sync.WaitGroup{} + wg.Add(len(requests)) + for _, request := range requests { + go func(request rest.ResponseWrapper) { + if err := o.ConsumeRequestFn(request, writer); err != nil { + writer.CloseWithError(err) + + // It's important to return here to propagate the error via the pipe + return + } + + wg.Done() + }(request) + } + + go func() { + wg.Wait() + writer.Close() + }() + + _, err := io.Copy(o.Out, reader) + return err +} + +func (o LogsOptions) sequentialConsumeRequest(requests []rest.ResponseWrapper) error { for _, request := range requests { if err := o.ConsumeRequestFn(request, o.Out); err != nil { return err @@ -307,13 +352,33 @@ func (o LogsOptions) RunLogs() error { return nil } -func DefaultConsumeRequest(request *rest.Request, out io.Writer) error { +// DefaultConsumeRequest reads the data from request and writes into +// the out writer. It buffers data from requests until the newline or io.EOF +// occurs in the data, so it doesn't interleave logs sub-line +// when running concurrently. +// +// A successful read returns err == nil, not err == io.EOF. +// Because the function is defined to read from request until io.EOF, it does +// not treat an io.EOF as an error to be reported. +func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error { readCloser, err := request.Stream() if err != nil { return err } defer readCloser.Close() - _, err = io.Copy(out, readCloser) - return err + r := bufio.NewReader(readCloser) + for { + bytes, err := r.ReadBytes('\n') + if _, err := out.Write(bytes); err != nil { + return err + } + + if err != nil { + if err != io.EOF { + return err + } + return nil + } + } } diff --git a/pkg/kubectl/cmd/logs/logs_test.go b/pkg/kubectl/cmd/logs/logs_test.go index d7329d91e42..bbac96aed33 100644 --- a/pkg/kubectl/cmd/logs/logs_test.go +++ b/pkg/kubectl/cmd/logs/logs_test.go @@ -17,11 +17,15 @@ limitations under the License. package logs import ( + "bytes" "errors" "fmt" "io" + "io/ioutil" "strings" + "sync" "testing" + "testing/iotest" "time" corev1 "k8s.io/api/core/v1" @@ -34,36 +38,198 @@ import ( func TestLog(t *testing.T) { tests := []struct { - name, version, podPath, logPath string - pod *corev1.Pod + name string + opts func(genericclioptions.IOStreams) *LogsOptions + expectedErr string + expectedOutSubstrings []string }{ { name: "v1 - pod log", - pod: testPod(), + opts: func(streams genericclioptions.IOStreams) *LogsOptions { + mock := &logTestMock{ + logsForObjectRequests: []restclient.ResponseWrapper{ + &responseWrapperMock{data: strings.NewReader("test log content\n")}, + }, + } + + o := NewLogsOptions(streams, false) + o.LogsForObject = mock.mockLogsForObject + o.ConsumeRequestFn = mock.mockConsumeRequest + + return o + }, + expectedOutSubstrings: []string{"test log content\n"}, + }, + { + name: "get logs from multiple requests sequentially", + opts: func(streams genericclioptions.IOStreams) *LogsOptions { + mock := &logTestMock{ + logsForObjectRequests: []restclient.ResponseWrapper{ + &responseWrapperMock{data: strings.NewReader("test log content from source 1\n")}, + &responseWrapperMock{data: strings.NewReader("test log content from source 2\n")}, + &responseWrapperMock{data: strings.NewReader("test log content from source 3\n")}, + }, + } + + o := NewLogsOptions(streams, false) + o.LogsForObject = mock.mockLogsForObject + o.ConsumeRequestFn = mock.mockConsumeRequest + return o + }, + expectedOutSubstrings: []string{ + // Order in this case muse always by the same, because we read requests sequentially + "test log content from source 1\ntest log content from source 2\ntest log content from source 3\n", + }, + }, + { + name: "follow logs from multiple requests concurrently", + opts: func(streams genericclioptions.IOStreams) *LogsOptions { + wg := &sync.WaitGroup{} + mock := &logTestMock{ + logsForObjectRequests: []restclient.ResponseWrapper{ + &responseWrapperMock{data: strings.NewReader("test log content from source 1\n")}, + &responseWrapperMock{data: strings.NewReader("test log content from source 2\n")}, + &responseWrapperMock{data: strings.NewReader("test log content from source 3\n")}, + }, + wg: wg, + } + wg.Add(3) + + o := NewLogsOptions(streams, false) + o.LogsForObject = mock.mockLogsForObject + o.ConsumeRequestFn = mock.mockConsumeRequest + o.Follow = true + return o + }, + expectedOutSubstrings: []string{ + "test log content from source 1\n", + "test log content from source 2\n", + "test log content from source 3\n", + }, + }, + { + name: "fail to follow logs from multiple requests when there are more logs sources then MaxFollowConcurency allows", + opts: func(streams genericclioptions.IOStreams) *LogsOptions { + wg := &sync.WaitGroup{} + mock := &logTestMock{ + logsForObjectRequests: []restclient.ResponseWrapper{ + &responseWrapperMock{data: strings.NewReader("test log content\n")}, + &responseWrapperMock{data: strings.NewReader("test log content\n")}, + &responseWrapperMock{data: strings.NewReader("test log content\n")}, + }, + wg: wg, + } + wg.Add(3) + + o := NewLogsOptions(streams, false) + o.LogsForObject = mock.mockLogsForObject + o.ConsumeRequestFn = mock.mockConsumeRequest + o.MaxFollowConcurency = 2 + o.Follow = true + return o + }, + expectedErr: "you are attempting to follow 3 log streams, but maximum allowed concurency is 2, use --max-log-requests to increase the limit", + }, + { + name: "fail if LogsForObject fails", + opts: func(streams genericclioptions.IOStreams) *LogsOptions { + o := NewLogsOptions(streams, false) + o.LogsForObject = func(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]restclient.ResponseWrapper, error) { + return nil, errors.New("Error from the LogsForObject") + } + return o + }, + expectedErr: "Error from the LogsForObject", + }, + { + name: "fail to get logs, if ConsumeRequestFn fails", + opts: func(streams genericclioptions.IOStreams) *LogsOptions { + mock := &logTestMock{ + logsForObjectRequests: []restclient.ResponseWrapper{ + &responseWrapperMock{}, + &responseWrapperMock{}, + }, + } + + o := NewLogsOptions(streams, false) + o.LogsForObject = mock.mockLogsForObject + o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { + return errors.New("Error from the ConsumeRequestFn") + } + return o + }, + expectedErr: "Error from the ConsumeRequestFn", + }, + { + name: "fail to follow logs from multiple requests, if ConsumeRequestFn fails", + opts: func(streams genericclioptions.IOStreams) *LogsOptions { + wg := &sync.WaitGroup{} + mock := &logTestMock{ + logsForObjectRequests: []restclient.ResponseWrapper{ + &responseWrapperMock{}, + &responseWrapperMock{}, + &responseWrapperMock{}, + }, + wg: wg, + } + wg.Add(3) + + o := NewLogsOptions(streams, false) + o.LogsForObject = mock.mockLogsForObject + o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { + return errors.New("Error from the ConsumeRequestFn") + } + o.Follow = true + return o + }, + expectedErr: "Error from the ConsumeRequestFn", + }, + { + name: "fail to follow logs, if ConsumeRequestFn fails", + opts: func(streams genericclioptions.IOStreams) *LogsOptions { + mock := &logTestMock{ + logsForObjectRequests: []restclient.ResponseWrapper{&responseWrapperMock{}}, + } + + o := NewLogsOptions(streams, false) + o.LogsForObject = mock.mockLogsForObject + o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error { + return errors.New("Error from the ConsumeRequestFn") + } + o.Follow = true + return o + }, + expectedErr: "Error from the ConsumeRequestFn", }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - logContent := "test log content" tf := cmdtesting.NewTestFactory().WithNamespace("test") defer tf.Cleanup() streams, _, buf, _ := genericclioptions.NewTestIOStreams() - mock := &logTestMock{ - logsContent: logContent, + opts := test.opts(streams) + opts.Namespace = "test" + opts.Object = testPod() + opts.Options = &corev1.PodLogOptions{} + err := opts.RunLogs() + + if err == nil && len(test.expectedErr) > 0 { + t.Fatalf("expected error %q, got none", test.expectedErr) } - opts := NewLogsOptions(streams, false) - opts.Namespace = "test" - opts.Object = test.pod - opts.Options = &corev1.PodLogOptions{} - opts.LogsForObject = mock.mockLogsForObject - opts.ConsumeRequestFn = mock.mockConsumeRequest - opts.RunLogs() + if err != nil && !strings.Contains(err.Error(), test.expectedErr) { + t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error()) + } - if buf.String() != logContent { - t.Errorf("%s: did not get expected log content. Got: %s", test.name, buf.String()) + bufStr := buf.String() + if test.expectedOutSubstrings != nil { + for _, substr := range test.expectedOutSubstrings { + if !strings.Contains(bufStr, substr) { + t.Errorf("%s: expected to contain %#v. Output: %#v", test.name, substr, bufStr) + } + } } }) } @@ -199,23 +365,6 @@ func TestValidateLogOptions(t *testing.T) { args: []string{"my-pod", "my-container"}, expected: "only one of -c or an inline", }, - { - name: "follow and selector conflict", - opts: func(streams genericclioptions.IOStreams) *LogsOptions { - o := NewLogsOptions(streams, false) - o.Selector = "foo" - o.Follow = true - - var err error - o.Options, err = o.ToLogOptions() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - return o - }, - expected: "only one of follow (-f) or selector (-l) is allowed", - }, } for _, test := range tests { streams := genericclioptions.NewTestIOStreamsDiscard() @@ -274,16 +423,114 @@ func TestLogComplete(t *testing.T) { } } +func TestDefaultConsumeRequest(t *testing.T) { + tests := []struct { + name string + request restclient.ResponseWrapper + expectedErr string + expectedOut string + }{ + { + name: "error from request stream", + request: &responseWrapperMock{ + err: errors.New("err from the stream"), + }, + expectedErr: "err from the stream", + }, + { + name: "error while reading", + request: &responseWrapperMock{ + data: iotest.TimeoutReader(strings.NewReader("Some data")), + }, + expectedErr: iotest.ErrTimeout.Error(), + expectedOut: "Some data", + }, + { + name: "read with empty string", + request: &responseWrapperMock{ + data: strings.NewReader(""), + }, + expectedOut: "", + }, + { + name: "read without new lines", + request: &responseWrapperMock{ + data: strings.NewReader("some string without a new line"), + }, + expectedOut: "some string without a new line", + }, + { + name: "read with newlines in the middle", + request: &responseWrapperMock{ + data: strings.NewReader("foo\nbar"), + }, + expectedOut: "foo\nbar", + }, + { + name: "read with newline at the end", + request: &responseWrapperMock{ + data: strings.NewReader("foo\n"), + }, + expectedOut: "foo\n", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + buf := &bytes.Buffer{} + err := DefaultConsumeRequest(test.request, buf) + + if err != nil && !strings.Contains(err.Error(), test.expectedErr) { + t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error()) + } + + if buf.String() != test.expectedOut { + t.Errorf("%s: did not get expected log content. Got: %s", test.name, buf.String()) + } + }) + } +} + +type responseWrapperMock struct { + data io.Reader + err error +} + +func (r *responseWrapperMock) DoRaw() ([]byte, error) { + data, _ := ioutil.ReadAll(r.data) + return data, r.err +} + +func (r *responseWrapperMock) Stream() (io.ReadCloser, error) { + return ioutil.NopCloser(r.data), r.err +} + type logTestMock struct { - logsContent string + logsForObjectRequests []restclient.ResponseWrapper + + // We need a WaitGroup in some test cases to make sure that we fetch logs concurrently. + // These test cases will finish successfully without the WaitGroup, but the WaitGroup + // will help us to identify regression when someone accidentally changes + // concurrent fetching to sequential + wg *sync.WaitGroup } -func (l *logTestMock) mockConsumeRequest(req *restclient.Request, out io.Writer) error { - fmt.Fprintf(out, l.logsContent) - return nil +func (l *logTestMock) mockConsumeRequest(request restclient.ResponseWrapper, out io.Writer) error { + readCloser, err := request.Stream() + if err != nil { + return err + } + defer readCloser.Close() + + // Just copy everything for a test sake + _, err = io.Copy(out, readCloser) + if l.wg != nil { + l.wg.Done() + l.wg.Wait() + } + return err } -func (l *logTestMock) mockLogsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]*restclient.Request, error) { +func (l *logTestMock) mockLogsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]restclient.ResponseWrapper, error) { switch object.(type) { case *corev1.Pod: _, ok := options.(*corev1.PodLogOptions) @@ -291,7 +538,7 @@ func (l *logTestMock) mockLogsForObject(restClientGetter genericclioptions.RESTC return nil, errors.New("provided options object is not a PodLogOptions") } - return []*restclient.Request{{}}, nil + return l.logsForObjectRequests, nil default: return nil, fmt.Errorf("cannot get the logs from %T", object) } diff --git a/pkg/kubectl/polymorphichelpers/interface.go b/pkg/kubectl/polymorphichelpers/interface.go index f93ff0d2791..15cb482c7be 100644 --- a/pkg/kubectl/polymorphichelpers/interface.go +++ b/pkg/kubectl/polymorphichelpers/interface.go @@ -29,7 +29,7 @@ import ( ) // LogsForObjectFunc is a function type that can tell you how to get logs for a runtime.object -type LogsForObjectFunc func(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]*rest.Request, error) +type LogsForObjectFunc func(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]rest.ResponseWrapper, error) // LogsForObjectFn gives a way to easily override the function for unit testing if needed. var LogsForObjectFn LogsForObjectFunc = logsForObject diff --git a/pkg/kubectl/polymorphichelpers/logsforobject.go b/pkg/kubectl/polymorphichelpers/logsforobject.go index 85472a135ee..e8f3cddf243 100644 --- a/pkg/kubectl/polymorphichelpers/logsforobject.go +++ b/pkg/kubectl/polymorphichelpers/logsforobject.go @@ -32,7 +32,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl/util/podutils" ) -func logsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]*rest.Request, error) { +func logsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]rest.ResponseWrapper, error) { clientConfig, err := restClientGetter.ToRESTConfig() if err != nil { return nil, err @@ -47,7 +47,7 @@ func logsForObject(restClientGetter genericclioptions.RESTClientGetter, object, // TODO: remove internal clientset once all callers use external versions // this is split for easy test-ability -func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]*rest.Request, error) { +func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]rest.ResponseWrapper, error) { opts, ok := options.(*corev1.PodLogOptions) if !ok { return nil, errors.New("provided options object is not a PodLogOptions") @@ -55,7 +55,7 @@ func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, opt switch t := object.(type) { case *corev1.PodList: - ret := []*rest.Request{} + ret := []rest.ResponseWrapper{} for i := range t.Items { currRet, err := logsForObjectWithClient(clientset, &t.Items[i], options, timeout, allContainers) if err != nil { @@ -68,10 +68,10 @@ func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, opt case *corev1.Pod: // if allContainers is true, then we're going to locate all containers and then iterate through them. At that point, "allContainers" is false if !allContainers { - return []*rest.Request{clientset.Pods(t.Namespace).GetLogs(t.Name, opts)}, nil + return []rest.ResponseWrapper{clientset.Pods(t.Namespace).GetLogs(t.Name, opts)}, nil } - ret := []*rest.Request{} + ret := []rest.ResponseWrapper{} for _, c := range t.Spec.InitContainers { currOpts := opts.DeepCopy() currOpts.Container = c.Name