mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-23 13:47:19 +00:00
client-go/rest: contextual logging of request/response
Logging in rest.Request.Body cannot be made context-aware without an API change. Such a change is complicated if done in a backwards-compatible fashion (must change lots of callers in Kubernetes) and prohibitive if not (all callers of Body would have to pass a context). Instead, logging of the request body gets moved into the functions which send the request. This is a change of behavior, but it is limited to log levels >= 8 and thus should have no impact in production. A request which gets sent multiple times will also log the body multiple times. This might even be a good thing because it serves as reminder what is being sent when it is being sent. While at it, stack backtracing gets enhanced so that the caller of the REST API is logged and tests for the new behavior get added. Kubernetes-commit: 57f9b7c7a2412865e7817dbf7638881b00ac9721
This commit is contained in:
parent
c5e16f8d68
commit
3d02d42465
@ -450,11 +450,9 @@ func (r *Request) Body(obj interface{}) *Request {
|
|||||||
r.err = err
|
r.err = err
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
glogBody("Request Body", data)
|
|
||||||
r.body = nil
|
r.body = nil
|
||||||
r.bodyBytes = data
|
r.bodyBytes = data
|
||||||
case []byte:
|
case []byte:
|
||||||
glogBody("Request Body", t)
|
|
||||||
r.body = nil
|
r.body = nil
|
||||||
r.bodyBytes = t
|
r.bodyBytes = t
|
||||||
case io.Reader:
|
case io.Reader:
|
||||||
@ -475,7 +473,6 @@ func (r *Request) Body(obj interface{}) *Request {
|
|||||||
r.err = err
|
r.err = err
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
glogBody("Request Body", data)
|
|
||||||
r.body = nil
|
r.body = nil
|
||||||
r.bodyBytes = data
|
r.bodyBytes = data
|
||||||
r.SetHeader("Content-Type", r.c.content.ContentType)
|
r.SetHeader("Content-Type", r.c.content.ContentType)
|
||||||
@ -704,6 +701,10 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) {
|
|||||||
// Watch attempts to begin watching the requested location.
|
// Watch attempts to begin watching the requested location.
|
||||||
// Returns a watch.Interface, or an error.
|
// Returns a watch.Interface, or an error.
|
||||||
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||||
|
if r.body == nil {
|
||||||
|
logBody(ctx, 2, "Request Body", r.bodyBytes)
|
||||||
|
}
|
||||||
|
|
||||||
// We specifically don't want to rate limit watches, so we
|
// We specifically don't want to rate limit watches, so we
|
||||||
// don't use r.rateLimiter here.
|
// don't use r.rateLimiter here.
|
||||||
if r.err != nil {
|
if r.err != nil {
|
||||||
@ -752,7 +753,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
|||||||
// the server must have sent us an error in 'err'
|
// the server must have sent us an error in 'err'
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
result := r.transformResponse(resp, req)
|
result := r.transformResponse(ctx, resp, req)
|
||||||
if err := result.Error(); err != nil {
|
if err := result.Error(); err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
@ -845,6 +846,10 @@ func (r WatchListResult) Into(obj runtime.Object) error {
|
|||||||
// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
|
// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
|
||||||
// to see what parameters are currently required.
|
// to see what parameters are currently required.
|
||||||
func (r *Request) WatchList(ctx context.Context) WatchListResult {
|
func (r *Request) WatchList(ctx context.Context) WatchListResult {
|
||||||
|
if r.body == nil {
|
||||||
|
logBody(ctx, 2, "Request Body", r.bodyBytes)
|
||||||
|
}
|
||||||
|
|
||||||
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
|
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
|
||||||
return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
|
return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
|
||||||
}
|
}
|
||||||
@ -969,6 +974,10 @@ func sanitize(req *Request, resp *http.Response, err error) (string, string) {
|
|||||||
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
|
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
|
||||||
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
|
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
|
||||||
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
||||||
|
if r.body == nil {
|
||||||
|
logBody(ctx, 2, "Request Body", r.bodyBytes)
|
||||||
|
}
|
||||||
|
|
||||||
if r.err != nil {
|
if r.err != nil {
|
||||||
return nil, r.err
|
return nil, r.err
|
||||||
}
|
}
|
||||||
@ -1012,7 +1021,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
|||||||
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
|
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
result := r.transformResponse(resp, req)
|
result := r.transformResponse(ctx, resp, req)
|
||||||
if err := result.Error(); err != nil {
|
if err := result.Error(); err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
@ -1199,9 +1208,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
|||||||
// - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
|
// - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
|
||||||
// - http.Client.Do errors are returned directly.
|
// - http.Client.Do errors are returned directly.
|
||||||
func (r *Request) Do(ctx context.Context) Result {
|
func (r *Request) Do(ctx context.Context) Result {
|
||||||
|
if r.body == nil {
|
||||||
|
logBody(ctx, 2, "Request Body", r.bodyBytes)
|
||||||
|
}
|
||||||
|
|
||||||
var result Result
|
var result Result
|
||||||
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
|
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
|
||||||
result = r.transformResponse(resp, req)
|
result = r.transformResponse(ctx, resp, req)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Result{err: err}
|
return Result{err: err}
|
||||||
@ -1214,10 +1227,14 @@ func (r *Request) Do(ctx context.Context) Result {
|
|||||||
|
|
||||||
// DoRaw executes the request but does not process the response body.
|
// DoRaw executes the request but does not process the response body.
|
||||||
func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
|
func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
|
||||||
|
if r.body == nil {
|
||||||
|
logBody(ctx, 2, "Request Body", r.bodyBytes)
|
||||||
|
}
|
||||||
|
|
||||||
var result Result
|
var result Result
|
||||||
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
|
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
|
||||||
result.body, result.err = io.ReadAll(resp.Body)
|
result.body, result.err = io.ReadAll(resp.Body)
|
||||||
glogBody("Response Body", result.body)
|
logBody(ctx, 2, "Response Body", result.body)
|
||||||
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
|
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
|
||||||
result.err = r.transformUnstructuredResponseError(resp, req, result.body)
|
result.err = r.transformUnstructuredResponseError(resp, req, result.body)
|
||||||
}
|
}
|
||||||
@ -1232,7 +1249,7 @@ func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// transformResponse converts an API response into a structured API object
|
// transformResponse converts an API response into a structured API object
|
||||||
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
|
func (r *Request) transformResponse(ctx context.Context, resp *http.Response, req *http.Request) Result {
|
||||||
var body []byte
|
var body []byte
|
||||||
if resp.Body != nil {
|
if resp.Body != nil {
|
||||||
data, err := io.ReadAll(resp.Body)
|
data, err := io.ReadAll(resp.Body)
|
||||||
@ -1261,7 +1278,8 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
glogBody("Response Body", body)
|
// Call depth is tricky. This one is okay for Do and DoRaw.
|
||||||
|
logBody(ctx, 7, "Response Body", body)
|
||||||
|
|
||||||
// verify the content type is accurate
|
// verify the content type is accurate
|
||||||
var decoder runtime.Decoder
|
var decoder runtime.Decoder
|
||||||
@ -1321,14 +1339,14 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
|
|||||||
}
|
}
|
||||||
|
|
||||||
// truncateBody decides if the body should be truncated, based on the glog Verbosity.
|
// truncateBody decides if the body should be truncated, based on the glog Verbosity.
|
||||||
func truncateBody(body string) string {
|
func truncateBody(logger klog.Logger, body string) string {
|
||||||
max := 0
|
max := 0
|
||||||
switch {
|
switch {
|
||||||
case bool(klog.V(10).Enabled()):
|
case bool(logger.V(10).Enabled()):
|
||||||
return body
|
return body
|
||||||
case bool(klog.V(9).Enabled()):
|
case bool(logger.V(9).Enabled()):
|
||||||
max = 10240
|
max = 10240
|
||||||
case bool(klog.V(8).Enabled()):
|
case bool(logger.V(8).Enabled()):
|
||||||
max = 1024
|
max = 1024
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1339,17 +1357,21 @@ func truncateBody(body string) string {
|
|||||||
return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
|
return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
|
||||||
}
|
}
|
||||||
|
|
||||||
// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
|
// logBody logs a body output that could be either JSON or protobuf. It explicitly guards against
|
||||||
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
|
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
|
||||||
// whether the body is printable.
|
// whether the body is printable.
|
||||||
func glogBody(prefix string, body []byte) {
|
//
|
||||||
if klogV := klog.V(8); klogV.Enabled() {
|
// It needs to be called by all functions which send or receive the data.
|
||||||
|
func logBody(ctx context.Context, callDepth int, prefix string, body []byte) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
if loggerV := logger.V(8); loggerV.Enabled() {
|
||||||
|
loggerV := loggerV.WithCallDepth(callDepth)
|
||||||
if bytes.IndexFunc(body, func(r rune) bool {
|
if bytes.IndexFunc(body, func(r rune) bool {
|
||||||
return r < 0x0a
|
return r < 0x0a
|
||||||
}) != -1 {
|
}) != -1 {
|
||||||
klogV.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
|
loggerV.Info(prefix, "body", truncateBody(logger, hex.Dump(body)))
|
||||||
} else {
|
} else {
|
||||||
klogV.Infof("%s: %s", prefix, truncateBody(string(body)))
|
loggerV.Info(prefix, "body", truncateBody(logger, string(body)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,7 +28,10 @@ import (
|
|||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"regexp"
|
||||||
|
goruntime "runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -36,6 +39,9 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
@ -54,6 +60,7 @@ import (
|
|||||||
"k8s.io/client-go/util/flowcontrol"
|
"k8s.io/client-go/util/flowcontrol"
|
||||||
utiltesting "k8s.io/client-go/util/testing"
|
utiltesting "k8s.io/client-go/util/testing"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
"k8s.io/klog/v2/ktesting"
|
||||||
testingclock "k8s.io/utils/clock/testing"
|
testingclock "k8s.io/utils/clock/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -553,6 +560,7 @@ func TestURLTemplate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestTransformResponse(t *testing.T) {
|
func TestTransformResponse(t *testing.T) {
|
||||||
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
invalid := []byte("aaaaa")
|
invalid := []byte("aaaaa")
|
||||||
uri, _ := url.Parse("http://localhost")
|
uri, _ := url.Parse("http://localhost")
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
@ -601,7 +609,7 @@ func TestTransformResponse(t *testing.T) {
|
|||||||
if test.Response.Body == nil {
|
if test.Response.Body == nil {
|
||||||
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
|
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
|
||||||
}
|
}
|
||||||
result := r.transformResponse(test.Response, &http.Request{})
|
result := r.transformResponse(ctx, test.Response, &http.Request{})
|
||||||
response, created, err := result.body, result.statusCode == http.StatusCreated, result.err
|
response, created, err := result.body, result.statusCode == http.StatusCreated, result.err
|
||||||
hasErr := err != nil
|
hasErr := err != nil
|
||||||
if hasErr != test.Error {
|
if hasErr != test.Error {
|
||||||
@ -652,6 +660,7 @@ func (r *renegotiator) StreamDecoder(contentType string, params map[string]strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestTransformResponseNegotiate(t *testing.T) {
|
func TestTransformResponseNegotiate(t *testing.T) {
|
||||||
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
invalid := []byte("aaaaa")
|
invalid := []byte("aaaaa")
|
||||||
uri, _ := url.Parse("http://localhost")
|
uri, _ := url.Parse("http://localhost")
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
@ -765,7 +774,7 @@ func TestTransformResponseNegotiate(t *testing.T) {
|
|||||||
if test.Response.Body == nil {
|
if test.Response.Body == nil {
|
||||||
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
|
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
|
||||||
}
|
}
|
||||||
result := r.transformResponse(test.Response, &http.Request{})
|
result := r.transformResponse(ctx, test.Response, &http.Request{})
|
||||||
_, err := result.body, result.err
|
_, err := result.body, result.err
|
||||||
hasErr := err != nil
|
hasErr := err != nil
|
||||||
if hasErr != test.Error {
|
if hasErr != test.Error {
|
||||||
@ -890,6 +899,7 @@ func TestTransformUnstructuredError(t *testing.T) {
|
|||||||
|
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
t.Run("", func(t *testing.T) {
|
t.Run("", func(t *testing.T) {
|
||||||
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
r := &Request{
|
r := &Request{
|
||||||
c: &RESTClient{
|
c: &RESTClient{
|
||||||
content: defaultContentConfig(),
|
content: defaultContentConfig(),
|
||||||
@ -897,7 +907,7 @@ func TestTransformUnstructuredError(t *testing.T) {
|
|||||||
resourceName: testCase.Name,
|
resourceName: testCase.Name,
|
||||||
resource: testCase.Resource,
|
resource: testCase.Resource,
|
||||||
}
|
}
|
||||||
result := r.transformResponse(testCase.Res, testCase.Req)
|
result := r.transformResponse(ctx, testCase.Res, testCase.Req)
|
||||||
err := result.err
|
err := result.err
|
||||||
if !testCase.ErrFn(err) {
|
if !testCase.ErrFn(err) {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
@ -2331,7 +2341,7 @@ func TestTruncateBody(t *testing.T) {
|
|||||||
l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level)
|
l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level)
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
flag.Set("v", test.level)
|
flag.Set("v", test.level)
|
||||||
got := truncateBody(test.body)
|
got := truncateBody(klog.Background(), test.body)
|
||||||
if got != test.want {
|
if got != test.want {
|
||||||
t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want)
|
t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want)
|
||||||
}
|
}
|
||||||
@ -4051,3 +4061,78 @@ func TestRequestConcurrencyWithRetry(t *testing.T) {
|
|||||||
t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
|
t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRequestLogging(t *testing.T) {
|
||||||
|
testcases := map[string]struct {
|
||||||
|
v int
|
||||||
|
body any
|
||||||
|
expectedOutput string
|
||||||
|
}{
|
||||||
|
"no-output": {
|
||||||
|
v: 7,
|
||||||
|
body: []byte("ping"),
|
||||||
|
},
|
||||||
|
"output": {
|
||||||
|
v: 8,
|
||||||
|
body: []byte("ping"),
|
||||||
|
expectedOutput: `<location>] "Request Body" logger="TestLogger" body="ping"
|
||||||
|
<location>] "Response Body" logger="TestLogger" body="pong"
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
"io-reader": {
|
||||||
|
v: 8,
|
||||||
|
body: strings.NewReader("ping"),
|
||||||
|
// Cannot log the request body!
|
||||||
|
expectedOutput: `<location>] "Response Body" logger="TestLogger" body="pong"
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
"truncate": {
|
||||||
|
v: 8,
|
||||||
|
body: []byte(strings.Repeat("a", 2000)),
|
||||||
|
expectedOutput: fmt.Sprintf(`<location>] "Request Body" logger="TestLogger" body="%s [truncated 976 chars]"
|
||||||
|
<location>] "Response Body" logger="TestLogger" body="pong"
|
||||||
|
`, strings.Repeat("a", 1024)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range testcases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
state := klog.CaptureState()
|
||||||
|
defer state.Restore()
|
||||||
|
|
||||||
|
var buffer bytes.Buffer
|
||||||
|
klog.SetOutput(&buffer)
|
||||||
|
klog.LogToStderr(false)
|
||||||
|
var fs flag.FlagSet
|
||||||
|
klog.InitFlags(&fs)
|
||||||
|
require.NoError(t, fs.Set("v", fmt.Sprintf("%d", tc.v)), "set verbosity")
|
||||||
|
|
||||||
|
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
|
||||||
|
return &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: io.NopCloser(strings.NewReader("pong")),
|
||||||
|
}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
req := NewRequestWithClient(nil, "", defaultContentConfig(), client).
|
||||||
|
Body(tc.body)
|
||||||
|
|
||||||
|
logger := klog.Background()
|
||||||
|
logger = klog.LoggerWithName(logger, "TestLogger")
|
||||||
|
ctx := klog.NewContext(context.Background(), logger)
|
||||||
|
|
||||||
|
_, file, line, _ := goruntime.Caller(0)
|
||||||
|
result := req.Do(ctx)
|
||||||
|
require.NoError(t, result.Error(), "request.Do")
|
||||||
|
|
||||||
|
// Compare log output:
|
||||||
|
// - strip date/time/pid from each line (fixed length header)
|
||||||
|
// - replace <location> with the actual call location
|
||||||
|
state.Restore()
|
||||||
|
expectedOutput := strings.ReplaceAll(tc.expectedOutput, "<location>", fmt.Sprintf("%s:%d", path.Base(file), line+1))
|
||||||
|
actualOutput := buffer.String()
|
||||||
|
actualOutput = regexp.MustCompile(`(?m)^.{30}`).ReplaceAllString(actualOutput, "")
|
||||||
|
assert.Equal(t, expectedOutput, actualOutput)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user