client-go/rest: finish conversion to contextual logging

The remaining calls can be converted without API changes.

Kubernetes-commit: 7821abf2ae289673bbfa3b9a6b8b34f5196c7c7e
This commit is contained in:
Patrick Ohly
2024-09-02 17:55:12 +02:00
committed by Kubernetes Publisher
parent 7aa9904196
commit 5d128adc87
5 changed files with 49 additions and 36 deletions

View File

@@ -54,7 +54,7 @@ import (
"k8s.io/utils/clock"
)
var (
const (
// longThrottleLatency defines threshold for logging requests. All requests being
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
// be logged.
@@ -676,21 +676,17 @@ func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) err
}
latency := time.Since(now)
var message string
switch {
case len(retryInfo) > 0:
message = fmt.Sprintf("Waited for %v, %s - request: %s:%s", latency, retryInfo, r.verb, r.URL().String())
default:
message = fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s:%s", latency, r.verb, r.URL().String())
}
if latency > longThrottleLatency {
klog.V(3).Info(message)
}
if latency > extraLongThrottleLatency {
// If the rate limiter latency is very high, the log message should be printed at a higher log level,
// but we use a throttled logger to prevent spamming.
globalThrottledLogger.Infof("%s", message)
if retryInfo == "" {
retryInfo = "client-side throttling, not priority and fairness"
}
klog.FromContext(ctx).V(3).Info("Waited before sending request", "delay", latency, "reason", retryInfo, "verb", r.verb, "URL", r.URL())
if latency > extraLongThrottleLatency {
// If the rate limiter latency is very high, the log message should be printed at a higher log level,
// but we use a throttled logger to prevent spamming.
globalThrottledLogger.info(klog.FromContext(ctx), "Waited before sending request", "delay", latency, "reason", retryInfo, "verb", r.verb, "URL", r.URL())
}
}
metrics.RateLimiterLatency.Observe(ctx, r.verb, r.finalURLTemplate(), latency)
@@ -702,7 +698,7 @@ func (r *Request) tryThrottle(ctx context.Context) error {
}
type throttleSettings struct {
logLevel klog.Level
logLevel int
minLogInterval time.Duration
lastLogTime time.Time
@@ -727,9 +723,9 @@ var globalThrottledLogger = &throttledLogger{
},
}
func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
func (b *throttledLogger) attemptToLog(logger klog.Logger) (int, bool) {
for _, setting := range b.settings {
if bool(klog.V(setting.logLevel).Enabled()) {
if bool(logger.V(setting.logLevel).Enabled()) {
// Return early without write locking if possible.
if func() bool {
setting.lock.RLock()
@@ -751,9 +747,9 @@ func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
// Infof will write a log message at each logLevel specified by the receiver's throttleSettings
// as long as it hasn't written a log message more recently than minLogInterval.
func (b *throttledLogger) Infof(message string, args ...interface{}) {
if logLevel, ok := b.attemptToLog(); ok {
klog.V(logLevel).Infof(message, args...)
func (b *throttledLogger) info(logger klog.Logger, message string, kv ...any) {
if logLevel, ok := b.attemptToLog(logger); ok {
logger.V(logLevel).Info(message, kv...)
}
}
@@ -1000,7 +996,7 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
klog.FromContext(ctx).V(4).Info("Unexpected content type from the server", "contentType", contentType, "err", err)
}
objectDecoder, streamingSerializer, framer, err := r.contentConfig.Negotiator.StreamDecoder(mediaType, params)
if err != nil {
@@ -1202,7 +1198,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
}()
if r.err != nil {
klog.V(4).Infof("Error in request: %v", r.err)
klog.FromContext(ctx).V(4).Info("Error in request", "err", r.err)
return r.err
}
@@ -1303,7 +1299,7 @@ func (r *Request) Do(ctx context.Context) Result {
result = r.transformResponse(ctx, resp, req)
})
if err != nil {
return Result{err: err}
return Result{err: err, loggingCtx: context.WithoutCancel(ctx)}
}
if result.err == nil || len(result.body) > 0 {
metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
@@ -1350,16 +1346,18 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
// 2. Apiserver sends back the headers and then part of the body
// 3. Apiserver closes connection.
// 4. client-go should catch this and return an error.
klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
klog.FromContext(ctx).V(2).Info("Stream error when reading response body, may be caused by closed connection", "err", err)
streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %w", err)
return Result{
err: streamErr,
err: streamErr,
loggingCtx: context.WithoutCancel(ctx),
}
default:
klog.Errorf("Unexpected error when reading response body: %v", err)
klog.FromContext(ctx).Error(err, "Unexpected error when reading response body")
unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %w", err)
return Result{
err: unexpectedErr,
err: unexpectedErr,
loggingCtx: context.WithoutCancel(ctx),
}
}
}
@@ -1377,7 +1375,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
var err error
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
return Result{err: errors.NewInternalError(err)}
return Result{err: errors.NewInternalError(err), loggingCtx: context.WithoutCancel(ctx)}
}
decoder, err = r.contentConfig.Negotiator.Decoder(mediaType, params)
if err != nil {
@@ -1386,13 +1384,14 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
case resp.StatusCode == http.StatusSwitchingProtocols:
// no-op, we've been upgraded
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
return Result{err: r.transformUnstructuredResponseError(resp, req, body), loggingCtx: context.WithoutCancel(ctx)}
}
return Result{
body: body,
contentType: contentType,
statusCode: resp.StatusCode,
warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
loggingCtx: context.WithoutCancel(ctx),
}
}
}
@@ -1412,6 +1411,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
decoder: decoder,
err: err,
warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
loggingCtx: context.WithoutCancel(ctx),
}
}
@@ -1421,6 +1421,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
statusCode: resp.StatusCode,
decoder: decoder,
warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
loggingCtx: context.WithoutCancel(ctx),
}
}
@@ -1552,6 +1553,10 @@ type Result struct {
err error
statusCode int
// Log calls in Result methods use the same context for logging as the
// method which created the Result. This context has no cancellation.
loggingCtx context.Context
decoder runtime.Decoder
}
@@ -1656,7 +1661,11 @@ func (r Result) Error() error {
// to be backwards compatible with old servers that do not return a version, default to "v1"
out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
if err != nil {
klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
ctx := r.loggingCtx
if ctx == nil {
ctx = context.Background()
}
klog.FromContext(ctx).V(5).Info("Body was not decodable (unable to check for Status)", "err", err)
return r.err
}
switch t := out.(type) {