client-go rest: store logger in Result

Storing a context and making sure that it never gets canceled also has
overhead. We might as well just do the klog.FromContext when constructing
the Result and store the logger for later use.

Kubernetes-commit: b7386467c8df686e935c477eac26049a80de789b
This commit is contained in:
Patrick Ohly 2024-11-27 13:19:47 +01:00 committed by Kubernetes Publisher
parent 5d128adc87
commit be86cb59f1

View File

@ -762,7 +762,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) { func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) {
if r.body == nil { if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes) logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes)
} }
// We specifically don't want to rate limit watches, so we // We specifically don't want to rate limit watches, so we
@ -921,7 +921,7 @@ func (r WatchListResult) Into(obj runtime.Object) error {
// to see what parameters are currently required. // to see what parameters are currently required.
func (r *Request) WatchList(ctx context.Context) WatchListResult { func (r *Request) WatchList(ctx context.Context) WatchListResult {
if r.body == nil { if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes) logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes)
} }
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) { if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
@ -1054,7 +1054,7 @@ func sanitize(req *Request, resp *http.Response, err error) (string, string) {
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response. // If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if r.body == nil { if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes) logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes)
} }
if r.err != nil { if r.err != nil {
@ -1290,8 +1290,9 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
// - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError // - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// - http.Client.Do errors are returned directly. // - http.Client.Do errors are returned directly.
func (r *Request) Do(ctx context.Context) Result { func (r *Request) Do(ctx context.Context) Result {
logger := klog.FromContext(ctx)
if r.body == nil { if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes) logBody(logger, 2, "Request Body", r.bodyBytes)
} }
var result Result var result Result
@ -1299,7 +1300,7 @@ func (r *Request) Do(ctx context.Context) Result {
result = r.transformResponse(ctx, resp, req) result = r.transformResponse(ctx, resp, req)
}) })
if err != nil { if err != nil {
return Result{err: err, loggingCtx: context.WithoutCancel(ctx)} return Result{err: err, logger: logger}
} }
if result.err == nil || len(result.body) > 0 { if result.err == nil || len(result.body) > 0 {
metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body))) metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
@ -1309,14 +1310,15 @@ func (r *Request) Do(ctx context.Context) Result {
// DoRaw executes the request but does not process the response body. // DoRaw executes the request but does not process the response body.
func (r *Request) DoRaw(ctx context.Context) ([]byte, error) { func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
logger := klog.FromContext(ctx)
if r.body == nil { if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes) logBody(logger, 2, "Request Body", r.bodyBytes)
} }
var result Result var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) { err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result.body, result.err = io.ReadAll(resp.Body) result.body, result.err = io.ReadAll(resp.Body)
logBody(ctx, 2, "Response Body", result.body) logBody(logger, 2, "Response Body", result.body)
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent { if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
result.err = r.transformUnstructuredResponseError(resp, req, result.body) result.err = r.transformUnstructuredResponseError(resp, req, result.body)
} }
@ -1332,6 +1334,7 @@ func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
// transformResponse converts an API response into a structured API object // transformResponse converts an API response into a structured API object
func (r *Request) transformResponse(ctx context.Context, resp *http.Response, req *http.Request) Result { func (r *Request) transformResponse(ctx context.Context, resp *http.Response, req *http.Request) Result {
logger := klog.FromContext(ctx)
var body []byte var body []byte
if resp.Body != nil { if resp.Body != nil {
data, err := io.ReadAll(resp.Body) data, err := io.ReadAll(resp.Body)
@ -1346,24 +1349,24 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
// 2. Apiserver sends back the headers and then part of the body // 2. Apiserver sends back the headers and then part of the body
// 3. Apiserver closes connection. // 3. Apiserver closes connection.
// 4. client-go should catch this and return an error. // 4. client-go should catch this and return an error.
klog.FromContext(ctx).V(2).Info("Stream error when reading response body, may be caused by closed connection", "err", err) logger.V(2).Info("Stream error when reading response body, may be caused by closed connection", "err", err)
streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %w", err) streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %w", err)
return Result{ return Result{
err: streamErr, err: streamErr,
loggingCtx: context.WithoutCancel(ctx), logger: logger,
} }
default: default:
klog.FromContext(ctx).Error(err, "Unexpected error when reading response body") logger.Error(err, "Unexpected error when reading response body")
unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %w", err) unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %w", err)
return Result{ return Result{
err: unexpectedErr, err: unexpectedErr,
loggingCtx: context.WithoutCancel(ctx), logger: logger,
} }
} }
} }
// Call depth is tricky. This one is okay for Do and DoRaw. // Call depth is tricky. This one is okay for Do and DoRaw.
logBody(ctx, 7, "Response Body", body) logBody(logger, 7, "Response Body", body)
// verify the content type is accurate // verify the content type is accurate
var decoder runtime.Decoder var decoder runtime.Decoder
@ -1375,7 +1378,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
var err error var err error
mediaType, params, err := mime.ParseMediaType(contentType) mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil { if err != nil {
return Result{err: errors.NewInternalError(err), loggingCtx: context.WithoutCancel(ctx)} return Result{err: errors.NewInternalError(err), logger: logger}
} }
decoder, err = r.contentConfig.Negotiator.Decoder(mediaType, params) decoder, err = r.contentConfig.Negotiator.Decoder(mediaType, params)
if err != nil { if err != nil {
@ -1384,14 +1387,14 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
case resp.StatusCode == http.StatusSwitchingProtocols: case resp.StatusCode == http.StatusSwitchingProtocols:
// no-op, we've been upgraded // no-op, we've been upgraded
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent: case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
return Result{err: r.transformUnstructuredResponseError(resp, req, body), loggingCtx: context.WithoutCancel(ctx)} return Result{err: r.transformUnstructuredResponseError(resp, req, body), logger: logger}
} }
return Result{ return Result{
body: body, body: body,
contentType: contentType, contentType: contentType,
statusCode: resp.StatusCode, statusCode: resp.StatusCode,
warnings: handleWarnings(ctx, resp.Header, r.warningHandler), warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
loggingCtx: context.WithoutCancel(ctx), logger: logger,
} }
} }
} }
@ -1411,7 +1414,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
decoder: decoder, decoder: decoder,
err: err, err: err,
warnings: handleWarnings(ctx, resp.Header, r.warningHandler), warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
loggingCtx: context.WithoutCancel(ctx), logger: logger,
} }
} }
@ -1421,7 +1424,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
statusCode: resp.StatusCode, statusCode: resp.StatusCode,
decoder: decoder, decoder: decoder,
warnings: handleWarnings(ctx, resp.Header, r.warningHandler), warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
loggingCtx: context.WithoutCancel(ctx), logger: logger,
} }
} }
@ -1449,8 +1452,7 @@ func truncateBody(logger klog.Logger, body string) string {
// whether the body is printable. // whether the body is printable.
// //
// It needs to be called by all functions which send or receive the data. // It needs to be called by all functions which send or receive the data.
func logBody(ctx context.Context, callDepth int, prefix string, body []byte) { func logBody(logger klog.Logger, callDepth int, prefix string, body []byte) {
logger := klog.FromContext(ctx)
if loggerV := logger.V(8); loggerV.Enabled() { if loggerV := logger.V(8); loggerV.Enabled() {
loggerV := loggerV.WithCallDepth(callDepth) loggerV := loggerV.WithCallDepth(callDepth)
if bytes.IndexFunc(body, func(r rune) bool { if bytes.IndexFunc(body, func(r rune) bool {
@ -1552,10 +1554,7 @@ type Result struct {
contentType string contentType string
err error err error
statusCode int statusCode int
logger klog.Logger
// Log calls in Result methods use the same context for logging as the
// method which created the Result. This context has no cancellation.
loggingCtx context.Context
decoder runtime.Decoder decoder runtime.Decoder
} }
@ -1661,11 +1660,7 @@ func (r Result) Error() error {
// to be backwards compatible with old servers that do not return a version, default to "v1" // to be backwards compatible with old servers that do not return a version, default to "v1"
out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil) out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
if err != nil { if err != nil {
ctx := r.loggingCtx r.logger.V(5).Info("Body was not decodable (unable to check for Status)", "err", err)
if ctx == nil {
ctx = context.Background()
}
klog.FromContext(ctx).V(5).Info("Body was not decodable (unable to check for Status)", "err", err)
return r.err return r.err
} }
switch t := out.(type) { switch t := out.(type) {