Merge pull request #126999 from pohly/log-client-go-rest-body

client-go/rest: contextual logging of request/response
This commit is contained in:
Kubernetes Prow Robot 2024-09-12 09:41:12 +01:00 committed by GitHub
commit 9e59765585
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 129 additions and 22 deletions

View File

@ -450,11 +450,9 @@ func (r *Request) Body(obj interface{}) *Request {
r.err = err r.err = err
return r return r
} }
glogBody("Request Body", data)
r.body = nil r.body = nil
r.bodyBytes = data r.bodyBytes = data
case []byte: case []byte:
glogBody("Request Body", t)
r.body = nil r.body = nil
r.bodyBytes = t r.bodyBytes = t
case io.Reader: case io.Reader:
@ -475,7 +473,6 @@ func (r *Request) Body(obj interface{}) *Request {
r.err = err r.err = err
return r return r
} }
glogBody("Request Body", data)
r.body = nil r.body = nil
r.bodyBytes = data r.bodyBytes = data
r.SetHeader("Content-Type", r.c.content.ContentType) r.SetHeader("Content-Type", r.c.content.ContentType)
@ -704,6 +701,10 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) {
// Watch attempts to begin watching the requested location. // Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error. // Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}
// We specifically don't want to rate limit watches, so we // We specifically don't want to rate limit watches, so we
// don't use r.rateLimiter here. // don't use r.rateLimiter here.
if r.err != nil { if r.err != nil {
@ -752,7 +753,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// the server must have sent us an error in 'err' // the server must have sent us an error in 'err'
return true, nil return true, nil
} }
result := r.transformResponse(resp, req) result := r.transformResponse(ctx, resp, req)
if err := result.Error(); err != nil { if err := result.Error(); err != nil {
return true, err return true, err
} }
@ -845,6 +846,10 @@ func (r WatchListResult) Into(obj runtime.Object) error {
// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists // Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
// to see what parameters are currently required. // to see what parameters are currently required.
func (r *Request) WatchList(ctx context.Context) WatchListResult { func (r *Request) WatchList(ctx context.Context) WatchListResult {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) { if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)} return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
} }
@ -969,6 +974,10 @@ func sanitize(req *Request, resp *http.Response, err error) (string, string) {
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object. // Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response. // If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}
if r.err != nil { if r.err != nil {
return nil, r.err return nil, r.err
} }
@ -1012,7 +1021,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) { if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
return false, nil return false, nil
} }
result := r.transformResponse(resp, req) result := r.transformResponse(ctx, resp, req)
if err := result.Error(); err != nil { if err := result.Error(); err != nil {
return true, err return true, err
} }
@ -1199,9 +1208,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
// - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError // - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// - http.Client.Do errors are returned directly. // - http.Client.Do errors are returned directly.
func (r *Request) Do(ctx context.Context) Result { func (r *Request) Do(ctx context.Context) Result {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}
var result Result var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) { err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req) result = r.transformResponse(ctx, resp, req)
}) })
if err != nil { if err != nil {
return Result{err: err} return Result{err: err}
@ -1214,10 +1227,14 @@ func (r *Request) Do(ctx context.Context) Result {
// DoRaw executes the request but does not process the response body. // DoRaw executes the request but does not process the response body.
func (r *Request) DoRaw(ctx context.Context) ([]byte, error) { func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}
var result Result var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) { err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result.body, result.err = io.ReadAll(resp.Body) result.body, result.err = io.ReadAll(resp.Body)
glogBody("Response Body", result.body) logBody(ctx, 2, "Response Body", result.body)
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent { if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
result.err = r.transformUnstructuredResponseError(resp, req, result.body) result.err = r.transformUnstructuredResponseError(resp, req, result.body)
} }
@ -1232,7 +1249,7 @@ func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
} }
// transformResponse converts an API response into a structured API object // transformResponse converts an API response into a structured API object
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result { func (r *Request) transformResponse(ctx context.Context, resp *http.Response, req *http.Request) Result {
var body []byte var body []byte
if resp.Body != nil { if resp.Body != nil {
data, err := io.ReadAll(resp.Body) data, err := io.ReadAll(resp.Body)
@ -1261,7 +1278,8 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
} }
} }
glogBody("Response Body", body) // Call depth is tricky. This one is okay for Do and DoRaw.
logBody(ctx, 7, "Response Body", body)
// verify the content type is accurate // verify the content type is accurate
var decoder runtime.Decoder var decoder runtime.Decoder
@ -1321,14 +1339,14 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
} }
// truncateBody decides if the body should be truncated, based on the glog Verbosity. // truncateBody decides if the body should be truncated, based on the glog Verbosity.
func truncateBody(body string) string { func truncateBody(logger klog.Logger, body string) string {
max := 0 max := 0
switch { switch {
case bool(klog.V(10).Enabled()): case bool(logger.V(10).Enabled()):
return body return body
case bool(klog.V(9).Enabled()): case bool(logger.V(9).Enabled()):
max = 10240 max = 10240
case bool(klog.V(8).Enabled()): case bool(logger.V(8).Enabled()):
max = 1024 max = 1024
} }
@ -1339,17 +1357,21 @@ func truncateBody(body string) string {
return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max) return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
} }
// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against // logBody logs a body output that could be either JSON or protobuf. It explicitly guards against
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine // allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
// whether the body is printable. // whether the body is printable.
func glogBody(prefix string, body []byte) { //
if klogV := klog.V(8); klogV.Enabled() { // It needs to be called by all functions which send or receive the data.
func logBody(ctx context.Context, callDepth int, prefix string, body []byte) {
logger := klog.FromContext(ctx)
if loggerV := logger.V(8); loggerV.Enabled() {
loggerV := loggerV.WithCallDepth(callDepth)
if bytes.IndexFunc(body, func(r rune) bool { if bytes.IndexFunc(body, func(r rune) bool {
return r < 0x0a return r < 0x0a
}) != -1 { }) != -1 {
klogV.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body))) loggerV.Info(prefix, "body", truncateBody(logger, hex.Dump(body)))
} else { } else {
klogV.Infof("%s: %s", prefix, truncateBody(string(body))) loggerV.Info(prefix, "body", truncateBody(logger, string(body)))
} }
} }
} }

View File

@ -28,7 +28,10 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"os" "os"
"path"
"reflect" "reflect"
"regexp"
goruntime "runtime"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -36,6 +39,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
@ -54,6 +60,7 @@ import (
"k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/flowcontrol"
utiltesting "k8s.io/client-go/util/testing" utiltesting "k8s.io/client-go/util/testing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
) )
@ -553,6 +560,7 @@ func TestURLTemplate(t *testing.T) {
} }
func TestTransformResponse(t *testing.T) { func TestTransformResponse(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
invalid := []byte("aaaaa") invalid := []byte("aaaaa")
uri, _ := url.Parse("http://localhost") uri, _ := url.Parse("http://localhost")
testCases := []struct { testCases := []struct {
@ -601,7 +609,7 @@ func TestTransformResponse(t *testing.T) {
if test.Response.Body == nil { if test.Response.Body == nil {
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{})) test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
} }
result := r.transformResponse(test.Response, &http.Request{}) result := r.transformResponse(ctx, test.Response, &http.Request{})
response, created, err := result.body, result.statusCode == http.StatusCreated, result.err response, created, err := result.body, result.statusCode == http.StatusCreated, result.err
hasErr := err != nil hasErr := err != nil
if hasErr != test.Error { if hasErr != test.Error {
@ -652,6 +660,7 @@ func (r *renegotiator) StreamDecoder(contentType string, params map[string]strin
} }
func TestTransformResponseNegotiate(t *testing.T) { func TestTransformResponseNegotiate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
invalid := []byte("aaaaa") invalid := []byte("aaaaa")
uri, _ := url.Parse("http://localhost") uri, _ := url.Parse("http://localhost")
testCases := []struct { testCases := []struct {
@ -765,7 +774,7 @@ func TestTransformResponseNegotiate(t *testing.T) {
if test.Response.Body == nil { if test.Response.Body == nil {
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{})) test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
} }
result := r.transformResponse(test.Response, &http.Request{}) result := r.transformResponse(ctx, test.Response, &http.Request{})
_, err := result.body, result.err _, err := result.body, result.err
hasErr := err != nil hasErr := err != nil
if hasErr != test.Error { if hasErr != test.Error {
@ -890,6 +899,7 @@ func TestTransformUnstructuredError(t *testing.T) {
for _, testCase := range testCases { for _, testCase := range testCases {
t.Run("", func(t *testing.T) { t.Run("", func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
r := &Request{ r := &Request{
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(), content: defaultContentConfig(),
@ -897,7 +907,7 @@ func TestTransformUnstructuredError(t *testing.T) {
resourceName: testCase.Name, resourceName: testCase.Name,
resource: testCase.Resource, resource: testCase.Resource,
} }
result := r.transformResponse(testCase.Res, testCase.Req) result := r.transformResponse(ctx, testCase.Res, testCase.Req)
err := result.err err := result.err
if !testCase.ErrFn(err) { if !testCase.ErrFn(err) {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
@ -2331,7 +2341,7 @@ func TestTruncateBody(t *testing.T) {
l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level) l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level)
for _, test := range tests { for _, test := range tests {
flag.Set("v", test.level) flag.Set("v", test.level)
got := truncateBody(test.body) got := truncateBody(klog.Background(), test.body)
if got != test.want { if got != test.want {
t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want) t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want)
} }
@ -4051,3 +4061,78 @@ func TestRequestConcurrencyWithRetry(t *testing.T) {
t.Errorf("Expected attempts: %d, but got: %d", expected, attempts) t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
} }
} }
func TestRequestLogging(t *testing.T) {
testcases := map[string]struct {
v int
body any
expectedOutput string
}{
"no-output": {
v: 7,
body: []byte("ping"),
},
"output": {
v: 8,
body: []byte("ping"),
expectedOutput: `<location>] "Request Body" logger="TestLogger" body="ping"
<location>] "Response Body" logger="TestLogger" body="pong"
`,
},
"io-reader": {
v: 8,
body: strings.NewReader("ping"),
// Cannot log the request body!
expectedOutput: `<location>] "Response Body" logger="TestLogger" body="pong"
`,
},
"truncate": {
v: 8,
body: []byte(strings.Repeat("a", 2000)),
expectedOutput: fmt.Sprintf(`<location>] "Request Body" logger="TestLogger" body="%s [truncated 976 chars]"
<location>] "Response Body" logger="TestLogger" body="pong"
`, strings.Repeat("a", 1024)),
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
state := klog.CaptureState()
defer state.Restore()
var buffer bytes.Buffer
klog.SetOutput(&buffer)
klog.LogToStderr(false)
var fs flag.FlagSet
klog.InitFlags(&fs)
require.NoError(t, fs.Set("v", fmt.Sprintf("%d", tc.v)), "set verbosity")
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("pong")),
}, nil
})
req := NewRequestWithClient(nil, "", defaultContentConfig(), client).
Body(tc.body)
logger := klog.Background()
logger = klog.LoggerWithName(logger, "TestLogger")
ctx := klog.NewContext(context.Background(), logger)
_, file, line, _ := goruntime.Caller(0)
result := req.Do(ctx)
require.NoError(t, result.Error(), "request.Do")
// Compare log output:
// - strip date/time/pid from each line (fixed length header)
// - replace <location> with the actual call location
state.Restore()
expectedOutput := strings.ReplaceAll(tc.expectedOutput, "<location>", fmt.Sprintf("%s:%d", path.Base(file), line+1))
actualOutput := buffer.String()
actualOutput = regexp.MustCompile(`(?m)^.{30}`).ReplaceAllString(actualOutput, "")
assert.Equal(t, expectedOutput, actualOutput)
})
}
}