diff --git a/staging/src/k8s.io/client-go/rest/request.go b/staging/src/k8s.io/client-go/rest/request.go index dfe5e0addce..f0fc7d1eba5 100644 --- a/staging/src/k8s.io/client-go/rest/request.go +++ b/staging/src/k8s.io/client-go/rest/request.go @@ -450,11 +450,9 @@ func (r *Request) Body(obj interface{}) *Request { r.err = err return r } - glogBody("Request Body", data) r.body = nil r.bodyBytes = data case []byte: - glogBody("Request Body", t) r.body = nil r.bodyBytes = t case io.Reader: @@ -475,7 +473,6 @@ func (r *Request) Body(obj interface{}) *Request { r.err = err return r } - glogBody("Request Body", data) r.body = nil r.bodyBytes = data r.SetHeader("Content-Type", r.c.content.ContentType) @@ -704,6 +701,10 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) { // Watch attempts to begin watching the requested location. // Returns a watch.Interface, or an error. func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { + if r.body == nil { + logBody(ctx, 2, "Request Body", r.bodyBytes) + } + // We specifically don't want to rate limit watches, so we // don't use r.rateLimiter here. if r.err != nil { @@ -752,7 +753,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { // the server must have sent us an error in 'err' return true, nil } - result := r.transformResponse(resp, req) + result := r.transformResponse(ctx, resp, req) if err := result.Error(); err != nil { return true, err } @@ -845,6 +846,10 @@ func (r WatchListResult) Into(obj runtime.Object) error { // Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists // to see what parameters are currently required. func (r *Request) WatchList(ctx context.Context) WatchListResult { + if r.body == nil { + logBody(ctx, 2, "Request Body", r.bodyBytes) + } + if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) { return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)} } @@ -969,6 +974,10 @@ func sanitize(req *Request, resp *http.Response, err error) (string, string) { // Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object. // If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response. func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { + if r.body == nil { + logBody(ctx, 2, "Request Body", r.bodyBytes) + } + if r.err != nil { return nil, r.err } @@ -1012,7 +1021,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) { return false, nil } - result := r.transformResponse(resp, req) + result := r.transformResponse(ctx, resp, req) if err := result.Error(); err != nil { return true, err } @@ -1199,9 +1208,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp // - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError // - http.Client.Do errors are returned directly. func (r *Request) Do(ctx context.Context) Result { + if r.body == nil { + logBody(ctx, 2, "Request Body", r.bodyBytes) + } + var result Result err := r.request(ctx, func(req *http.Request, resp *http.Response) { - result = r.transformResponse(resp, req) + result = r.transformResponse(ctx, resp, req) }) if err != nil { return Result{err: err} @@ -1214,10 +1227,14 @@ func (r *Request) Do(ctx context.Context) Result { // DoRaw executes the request but does not process the response body. func (r *Request) DoRaw(ctx context.Context) ([]byte, error) { + if r.body == nil { + logBody(ctx, 2, "Request Body", r.bodyBytes) + } + var result Result err := r.request(ctx, func(req *http.Request, resp *http.Response) { result.body, result.err = io.ReadAll(resp.Body) - glogBody("Response Body", result.body) + logBody(ctx, 2, "Response Body", result.body) if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent { result.err = r.transformUnstructuredResponseError(resp, req, result.body) } @@ -1232,7 +1249,7 @@ func (r *Request) DoRaw(ctx context.Context) ([]byte, error) { } // transformResponse converts an API response into a structured API object -func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result { +func (r *Request) transformResponse(ctx context.Context, resp *http.Response, req *http.Request) Result { var body []byte if resp.Body != nil { data, err := io.ReadAll(resp.Body) @@ -1261,7 +1278,8 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu } } - glogBody("Response Body", body) + // Call depth is tricky. This one is okay for Do and DoRaw. + logBody(ctx, 7, "Response Body", body) // verify the content type is accurate var decoder runtime.Decoder @@ -1321,14 +1339,14 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu } // truncateBody decides if the body should be truncated, based on the glog Verbosity. -func truncateBody(body string) string { +func truncateBody(logger klog.Logger, body string) string { max := 0 switch { - case bool(klog.V(10).Enabled()): + case bool(logger.V(10).Enabled()): return body - case bool(klog.V(9).Enabled()): + case bool(logger.V(9).Enabled()): max = 10240 - case bool(klog.V(8).Enabled()): + case bool(logger.V(8).Enabled()): max = 1024 } @@ -1339,17 +1357,21 @@ func truncateBody(body string) string { return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max) } -// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against +// logBody logs a body output that could be either JSON or protobuf. It explicitly guards against // allocating a new string for the body output unless necessary. Uses a simple heuristic to determine // whether the body is printable. -func glogBody(prefix string, body []byte) { - if klogV := klog.V(8); klogV.Enabled() { +// +// It needs to be called by all functions which send or receive the data. +func logBody(ctx context.Context, callDepth int, prefix string, body []byte) { + logger := klog.FromContext(ctx) + if loggerV := logger.V(8); loggerV.Enabled() { + loggerV := loggerV.WithCallDepth(callDepth) if bytes.IndexFunc(body, func(r rune) bool { return r < 0x0a }) != -1 { - klogV.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body))) + loggerV.Info(prefix, "body", truncateBody(logger, hex.Dump(body))) } else { - klogV.Infof("%s: %s", prefix, truncateBody(string(body))) + loggerV.Info(prefix, "body", truncateBody(logger, string(body))) } } } diff --git a/staging/src/k8s.io/client-go/rest/request_test.go b/staging/src/k8s.io/client-go/rest/request_test.go index 38c95c99f49..7d87c384648 100644 --- a/staging/src/k8s.io/client-go/rest/request_test.go +++ b/staging/src/k8s.io/client-go/rest/request_test.go @@ -28,7 +28,10 @@ import ( "net/http/httptest" "net/url" "os" + "path" "reflect" + "regexp" + goruntime "runtime" "strings" "sync" "sync/atomic" @@ -36,6 +39,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" @@ -54,6 +60,7 @@ import ( "k8s.io/client-go/util/flowcontrol" utiltesting "k8s.io/client-go/util/testing" "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" testingclock "k8s.io/utils/clock/testing" ) @@ -553,6 +560,7 @@ func TestURLTemplate(t *testing.T) { } func TestTransformResponse(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) invalid := []byte("aaaaa") uri, _ := url.Parse("http://localhost") testCases := []struct { @@ -601,7 +609,7 @@ func TestTransformResponse(t *testing.T) { if test.Response.Body == nil { test.Response.Body = io.NopCloser(bytes.NewReader([]byte{})) } - result := r.transformResponse(test.Response, &http.Request{}) + result := r.transformResponse(ctx, test.Response, &http.Request{}) response, created, err := result.body, result.statusCode == http.StatusCreated, result.err hasErr := err != nil if hasErr != test.Error { @@ -652,6 +660,7 @@ func (r *renegotiator) StreamDecoder(contentType string, params map[string]strin } func TestTransformResponseNegotiate(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) invalid := []byte("aaaaa") uri, _ := url.Parse("http://localhost") testCases := []struct { @@ -765,7 +774,7 @@ func TestTransformResponseNegotiate(t *testing.T) { if test.Response.Body == nil { test.Response.Body = io.NopCloser(bytes.NewReader([]byte{})) } - result := r.transformResponse(test.Response, &http.Request{}) + result := r.transformResponse(ctx, test.Response, &http.Request{}) _, err := result.body, result.err hasErr := err != nil if hasErr != test.Error { @@ -890,6 +899,7 @@ func TestTransformUnstructuredError(t *testing.T) { for _, testCase := range testCases { t.Run("", func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) r := &Request{ c: &RESTClient{ content: defaultContentConfig(), @@ -897,7 +907,7 @@ func TestTransformUnstructuredError(t *testing.T) { resourceName: testCase.Name, resource: testCase.Resource, } - result := r.transformResponse(testCase.Res, testCase.Req) + result := r.transformResponse(ctx, testCase.Res, testCase.Req) err := result.err if !testCase.ErrFn(err) { t.Fatalf("unexpected error: %v", err) @@ -2331,7 +2341,7 @@ func TestTruncateBody(t *testing.T) { l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level) for _, test := range tests { flag.Set("v", test.level) - got := truncateBody(test.body) + got := truncateBody(klog.Background(), test.body) if got != test.want { t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want) } @@ -4051,3 +4061,78 @@ func TestRequestConcurrencyWithRetry(t *testing.T) { t.Errorf("Expected attempts: %d, but got: %d", expected, attempts) } } + +func TestRequestLogging(t *testing.T) { + testcases := map[string]struct { + v int + body any + expectedOutput string + }{ + "no-output": { + v: 7, + body: []byte("ping"), + }, + "output": { + v: 8, + body: []byte("ping"), + expectedOutput: `] "Request Body" logger="TestLogger" body="ping" +] "Response Body" logger="TestLogger" body="pong" +`, + }, + "io-reader": { + v: 8, + body: strings.NewReader("ping"), + // Cannot log the request body! + expectedOutput: `] "Response Body" logger="TestLogger" body="pong" +`, + }, + "truncate": { + v: 8, + body: []byte(strings.Repeat("a", 2000)), + expectedOutput: fmt.Sprintf(`] "Request Body" logger="TestLogger" body="%s [truncated 976 chars]" +] "Response Body" logger="TestLogger" body="pong" +`, strings.Repeat("a", 1024)), + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + state := klog.CaptureState() + defer state.Restore() + + var buffer bytes.Buffer + klog.SetOutput(&buffer) + klog.LogToStderr(false) + var fs flag.FlagSet + klog.InitFlags(&fs) + require.NoError(t, fs.Set("v", fmt.Sprintf("%d", tc.v)), "set verbosity") + + client := clientForFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("pong")), + }, nil + }) + + req := NewRequestWithClient(nil, "", defaultContentConfig(), client). + Body(tc.body) + + logger := klog.Background() + logger = klog.LoggerWithName(logger, "TestLogger") + ctx := klog.NewContext(context.Background(), logger) + + _, file, line, _ := goruntime.Caller(0) + result := req.Do(ctx) + require.NoError(t, result.Error(), "request.Do") + + // Compare log output: + // - strip date/time/pid from each line (fixed length header) + // - replace with the actual call location + state.Restore() + expectedOutput := strings.ReplaceAll(tc.expectedOutput, "", fmt.Sprintf("%s:%d", path.Base(file), line+1)) + actualOutput := buffer.String() + actualOutput = regexp.MustCompile(`(?m)^.{30}`).ReplaceAllString(actualOutput, "") + assert.Equal(t, expectedOutput, actualOutput) + }) + } +}