mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-25 14:41:53 +00:00
Merge pull request #129330 from pohly/log-client-go-transport
client-go/transport: structured, contextual logging Kubernetes-commit: e07aeb7c8b52a486518f2f55ea58595ae007f3b0
This commit is contained in:
commit
f2030849e1
2
go.mod
2
go.mod
@ -9,6 +9,7 @@ godebug default=go1.23
|
|||||||
godebug winsymlink=0
|
godebug winsymlink=0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/go-logr/logr v1.4.2
|
||||||
github.com/gogo/protobuf v1.3.2
|
github.com/gogo/protobuf v1.3.2
|
||||||
github.com/google/gnostic-models v0.6.9
|
github.com/google/gnostic-models v0.6.9
|
||||||
github.com/google/go-cmp v0.6.0
|
github.com/google/go-cmp v0.6.0
|
||||||
@ -41,7 +42,6 @@ require (
|
|||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
|
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
|
||||||
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
|
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
|
||||||
github.com/go-logr/logr v1.4.2 // indirect
|
|
||||||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||||
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
||||||
github.com/go-openapi/swag v0.23.0 // indirect
|
github.com/go-openapi/swag v0.23.0 // indirect
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/tools/metrics"
|
"k8s.io/client-go/tools/metrics"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TlsTransportCache caches TLS http.RoundTrippers different configurations. The
|
// TlsTransportCache caches TLS http.RoundTrippers different configurations. The
|
||||||
@ -116,10 +117,13 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
|
|||||||
// If we use are reloading files, we need to handle certificate rotation properly
|
// If we use are reloading files, we need to handle certificate rotation properly
|
||||||
// TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
|
// TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
|
||||||
if config.TLS.ReloadTLSFiles && tlsConfig != nil && tlsConfig.GetClientCertificate != nil {
|
if config.TLS.ReloadTLSFiles && tlsConfig != nil && tlsConfig.GetClientCertificate != nil {
|
||||||
dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
|
// The TLS cache is a singleton, so sharing the same name for all of its
|
||||||
|
// background activity seems okay.
|
||||||
|
logger := klog.Background().WithName("tls-transport-cache")
|
||||||
|
dynamicCertDialer := certRotatingDialer(logger, tlsConfig.GetClientCertificate, dial)
|
||||||
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
|
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
|
||||||
dial = dynamicCertDialer.connDialer.DialContext
|
dial = dynamicCertDialer.connDialer.DialContext
|
||||||
go dynamicCertDialer.Run(DialerStopCh)
|
go dynamicCertDialer.run(DialerStopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := http.ProxyFromEnvironment
|
proxy := http.ProxyFromEnvironment
|
||||||
|
@ -19,7 +19,6 @@ package transport
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -40,6 +39,7 @@ var CertCallbackRefreshDuration = 5 * time.Minute
|
|||||||
type reloadFunc func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
|
type reloadFunc func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
|
||||||
|
|
||||||
type dynamicClientCert struct {
|
type dynamicClientCert struct {
|
||||||
|
logger klog.Logger
|
||||||
clientCert *tls.Certificate
|
clientCert *tls.Certificate
|
||||||
certMtx sync.RWMutex
|
certMtx sync.RWMutex
|
||||||
|
|
||||||
@ -50,8 +50,9 @@ type dynamicClientCert struct {
|
|||||||
queue workqueue.TypedRateLimitingInterface[string]
|
queue workqueue.TypedRateLimitingInterface[string]
|
||||||
}
|
}
|
||||||
|
|
||||||
func certRotatingDialer(reload reloadFunc, dial utilnet.DialFunc) *dynamicClientCert {
|
func certRotatingDialer(logger klog.Logger, reload reloadFunc, dial utilnet.DialFunc) *dynamicClientCert {
|
||||||
d := &dynamicClientCert{
|
d := &dynamicClientCert{
|
||||||
|
logger: logger,
|
||||||
reload: reload,
|
reload: reload,
|
||||||
connDialer: connrotation.NewDialer(connrotation.DialFunc(dial)),
|
connDialer: connrotation.NewDialer(connrotation.DialFunc(dial)),
|
||||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||||
@ -88,7 +89,7 @@ func (c *dynamicClientCert) loadClientCert() (*tls.Certificate, error) {
|
|||||||
return cert, nil
|
return cert, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(1).Infof("certificate rotation detected, shutting down client connections to start using new credentials")
|
c.logger.V(1).Info("Certificate rotation detected, shutting down client connections to start using new credentials")
|
||||||
c.connDialer.CloseAll()
|
c.connDialer.CloseAll()
|
||||||
|
|
||||||
return cert, nil
|
return cert, nil
|
||||||
@ -133,12 +134,12 @@ func byteMatrixEqual(left, right [][]byte) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// run starts the controller and blocks until stopCh is closed.
|
// run starts the controller and blocks until stopCh is closed.
|
||||||
func (c *dynamicClientCert) Run(stopCh <-chan struct{}) {
|
func (c *dynamicClientCert) run(stopCh <-chan struct{}) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrashWithLogger(c.logger)
|
||||||
defer c.queue.ShutDown()
|
defer c.queue.ShutDown()
|
||||||
|
|
||||||
klog.V(3).Infof("Starting client certificate rotation controller")
|
c.logger.V(3).Info("Starting client certificate rotation controller")
|
||||||
defer klog.V(3).Infof("Shutting down client certificate rotation controller")
|
defer c.logger.V(3).Info("Shutting down client certificate rotation controller")
|
||||||
|
|
||||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||||
|
|
||||||
@ -168,7 +169,7 @@ func (c *dynamicClientCert) processNextWorkItem() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
|
utilruntime.HandleErrorWithLogger(c.logger, err, "Loading client cert failed", "key", dsKey)
|
||||||
c.queue.AddRateLimited(dsKey)
|
c.queue.AddRateLimited(dsKey)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
@ -21,10 +21,12 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptrace"
|
"net/http/httptrace"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-logr/logr"
|
||||||
"golang.org/x/oauth2"
|
"golang.org/x/oauth2"
|
||||||
|
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
@ -68,19 +70,16 @@ func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTrip
|
|||||||
return rt, nil
|
return rt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DebugWrappers wraps a round tripper and logs based on the current log level.
|
// DebugWrappers potentially wraps a round tripper with a wrapper that logs
|
||||||
|
// based on the log level in the context of each individual request.
|
||||||
|
//
|
||||||
|
// At the moment, wrapping depends on the global log verbosity and is done
|
||||||
|
// if that verbosity is >= 6. This may change in the future.
|
||||||
func DebugWrappers(rt http.RoundTripper) http.RoundTripper {
|
func DebugWrappers(rt http.RoundTripper) http.RoundTripper {
|
||||||
switch {
|
//nolint:logcheck // The actual logging is done with a different logger, so only checking here is okay.
|
||||||
case bool(klog.V(9).Enabled()):
|
if klog.V(6).Enabled() {
|
||||||
rt = NewDebuggingRoundTripper(rt, DebugCurlCommand, DebugURLTiming, DebugDetailedTiming, DebugResponseHeaders)
|
rt = NewDebuggingRoundTripper(rt, DebugByContext)
|
||||||
case bool(klog.V(8).Enabled()):
|
|
||||||
rt = NewDebuggingRoundTripper(rt, DebugJustURL, DebugRequestHeaders, DebugResponseStatus, DebugResponseHeaders)
|
|
||||||
case bool(klog.V(7).Enabled()):
|
|
||||||
rt = NewDebuggingRoundTripper(rt, DebugJustURL, DebugRequestHeaders, DebugResponseStatus)
|
|
||||||
case bool(klog.V(6).Enabled()):
|
|
||||||
rt = NewDebuggingRoundTripper(rt, DebugURLTiming)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return rt
|
return rt
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -380,14 +379,17 @@ func (r *requestInfo) toCurl() string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Sprintf("curl -v -X%s %s '%s'", r.RequestVerb, headers, r.RequestURL)
|
// Newline at the end makes this look better in the text log output (the
|
||||||
|
// only usage of this method) because it becomes a multi-line string with
|
||||||
|
// no quoting.
|
||||||
|
return fmt.Sprintf("curl -v -X%s %s '%s'\n", r.RequestVerb, headers, r.RequestURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
// debuggingRoundTripper will display information about the requests passing
|
// debuggingRoundTripper will display information about the requests passing
|
||||||
// through it based on what is configured
|
// through it based on what is configured
|
||||||
type debuggingRoundTripper struct {
|
type debuggingRoundTripper struct {
|
||||||
delegatedRoundTripper http.RoundTripper
|
delegatedRoundTripper http.RoundTripper
|
||||||
levels map[DebugLevel]bool
|
levels int
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ utilnet.RoundTripperWrapper = &debuggingRoundTripper{}
|
var _ utilnet.RoundTripperWrapper = &debuggingRoundTripper{}
|
||||||
@ -412,6 +414,26 @@ const (
|
|||||||
DebugResponseHeaders
|
DebugResponseHeaders
|
||||||
// DebugDetailedTiming will add to the debug output the duration of the HTTP requests events.
|
// DebugDetailedTiming will add to the debug output the duration of the HTTP requests events.
|
||||||
DebugDetailedTiming
|
DebugDetailedTiming
|
||||||
|
// DebugByContext will add any of the above depending on the verbosity of the per-request logger obtained from the requests context.
|
||||||
|
//
|
||||||
|
// Can be combined in NewDebuggingRoundTripper with some of the other options, in which case the
|
||||||
|
// debug roundtripper will always log what is requested there plus the information that gets
|
||||||
|
// enabled by the context's log verbosity.
|
||||||
|
DebugByContext
|
||||||
|
)
|
||||||
|
|
||||||
|
// Different log levels include different sets of information.
|
||||||
|
//
|
||||||
|
// Not exported because the exact content of log messages is not part
|
||||||
|
// of of the package API.
|
||||||
|
const (
|
||||||
|
levelsV6 = (1 << DebugURLTiming)
|
||||||
|
// Logging *less* information for the response at level 7 compared to 6 replicates prior behavior:
|
||||||
|
// https://github.com/kubernetes/kubernetes/blob/2b472fe4690c83a2b343995f88050b2a3e9ff0fa/staging/src/k8s.io/client-go/transport/round_trippers.go#L79
|
||||||
|
// Presumably that was done because verb and URL are already in the request log entry.
|
||||||
|
levelsV7 = (1 << DebugJustURL) | (1 << DebugRequestHeaders) | (1 << DebugResponseStatus)
|
||||||
|
levelsV8 = (1 << DebugJustURL) | (1 << DebugRequestHeaders) | (1 << DebugResponseStatus) | (1 << DebugResponseHeaders)
|
||||||
|
levelsV9 = (1 << DebugCurlCommand) | (1 << DebugURLTiming) | (1 << DebugDetailedTiming) | (1 << DebugResponseHeaders)
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewDebuggingRoundTripper allows to display in the logs output debug information
|
// NewDebuggingRoundTripper allows to display in the logs output debug information
|
||||||
@ -419,10 +441,9 @@ const (
|
|||||||
func NewDebuggingRoundTripper(rt http.RoundTripper, levels ...DebugLevel) http.RoundTripper {
|
func NewDebuggingRoundTripper(rt http.RoundTripper, levels ...DebugLevel) http.RoundTripper {
|
||||||
drt := &debuggingRoundTripper{
|
drt := &debuggingRoundTripper{
|
||||||
delegatedRoundTripper: rt,
|
delegatedRoundTripper: rt,
|
||||||
levels: make(map[DebugLevel]bool, len(levels)),
|
|
||||||
}
|
}
|
||||||
for _, v := range levels {
|
for _, v := range levels {
|
||||||
drt.levels[v] = true
|
drt.levels |= 1 << v
|
||||||
}
|
}
|
||||||
return drt
|
return drt
|
||||||
}
|
}
|
||||||
@ -464,27 +485,51 @@ func maskValue(key string, value string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rt *debuggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
func (rt *debuggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
|
logger := klog.FromContext(req.Context())
|
||||||
|
levels := rt.levels
|
||||||
|
|
||||||
|
// When logging depends on the context, it uses the verbosity of the per-context logger
|
||||||
|
// and a hard-coded mapping of verbosity to debug details. Otherwise all messages
|
||||||
|
// are logged as V(0).
|
||||||
|
if levels&(1<<DebugByContext) != 0 {
|
||||||
|
if loggerV := logger.V(9); loggerV.Enabled() {
|
||||||
|
logger = loggerV
|
||||||
|
// The curl command replaces logging of the URL.
|
||||||
|
levels |= levelsV9
|
||||||
|
} else if loggerV := logger.V(8); loggerV.Enabled() {
|
||||||
|
logger = loggerV
|
||||||
|
levels |= levelsV8
|
||||||
|
} else if loggerV := logger.V(7); loggerV.Enabled() {
|
||||||
|
logger = loggerV
|
||||||
|
levels |= levelsV7
|
||||||
|
} else if loggerV := logger.V(6); loggerV.Enabled() {
|
||||||
|
logger = loggerV
|
||||||
|
levels |= levelsV6
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
reqInfo := newRequestInfo(req)
|
reqInfo := newRequestInfo(req)
|
||||||
|
|
||||||
if rt.levels[DebugJustURL] {
|
kvs := make([]any, 0, 8) // Exactly large enough for all appends below.
|
||||||
klog.Infof("%s %s", reqInfo.RequestVerb, reqInfo.RequestURL)
|
if levels&(1<<DebugJustURL) != 0 {
|
||||||
|
kvs = append(kvs,
|
||||||
|
"verb", reqInfo.RequestVerb,
|
||||||
|
"url", reqInfo.RequestURL,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
if rt.levels[DebugCurlCommand] {
|
if levels&(1<<DebugCurlCommand) != 0 {
|
||||||
klog.Infof("%s", reqInfo.toCurl())
|
kvs = append(kvs, "curlCommand", reqInfo.toCurl())
|
||||||
}
|
}
|
||||||
if rt.levels[DebugRequestHeaders] {
|
if levels&(1<<DebugRequestHeaders) != 0 {
|
||||||
klog.Info("Request Headers:")
|
kvs = append(kvs, "headers", newHeadersMap(reqInfo.RequestHeaders))
|
||||||
for key, values := range reqInfo.RequestHeaders {
|
}
|
||||||
for _, value := range values {
|
if len(kvs) > 0 {
|
||||||
value = maskValue(key, value)
|
logger.Info("Request", kvs...)
|
||||||
klog.Infof(" %s: %s", key, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
if rt.levels[DebugDetailedTiming] {
|
if levels&(1<<DebugDetailedTiming) != 0 {
|
||||||
var getConn, dnsStart, dialStart, tlsStart, serverStart time.Time
|
var getConn, dnsStart, dialStart, tlsStart, serverStart time.Time
|
||||||
var host string
|
var host string
|
||||||
trace := &httptrace.ClientTrace{
|
trace := &httptrace.ClientTrace{
|
||||||
@ -499,7 +544,7 @@ func (rt *debuggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, e
|
|||||||
reqInfo.muTrace.Lock()
|
reqInfo.muTrace.Lock()
|
||||||
defer reqInfo.muTrace.Unlock()
|
defer reqInfo.muTrace.Unlock()
|
||||||
reqInfo.DNSLookup = time.Since(dnsStart)
|
reqInfo.DNSLookup = time.Since(dnsStart)
|
||||||
klog.Infof("HTTP Trace: DNS Lookup for %s resolved to %v", host, info.Addrs)
|
logger.Info("HTTP Trace: DNS Lookup resolved", "host", host, "address", info.Addrs)
|
||||||
},
|
},
|
||||||
// Dial
|
// Dial
|
||||||
ConnectStart: func(network, addr string) {
|
ConnectStart: func(network, addr string) {
|
||||||
@ -512,9 +557,9 @@ func (rt *debuggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, e
|
|||||||
defer reqInfo.muTrace.Unlock()
|
defer reqInfo.muTrace.Unlock()
|
||||||
reqInfo.Dialing = time.Since(dialStart)
|
reqInfo.Dialing = time.Since(dialStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Infof("HTTP Trace: Dial to %s:%s failed: %v", network, addr, err)
|
logger.Info("HTTP Trace: Dial failed", "network", network, "address", addr, "err", err)
|
||||||
} else {
|
} else {
|
||||||
klog.Infof("HTTP Trace: Dial to %s:%s succeed", network, addr)
|
logger.Info("HTTP Trace: Dial succeed", "network", network, "address", addr)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
// TLS
|
// TLS
|
||||||
@ -556,42 +601,85 @@ func (rt *debuggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, e
|
|||||||
|
|
||||||
reqInfo.complete(response, err)
|
reqInfo.complete(response, err)
|
||||||
|
|
||||||
if rt.levels[DebugURLTiming] {
|
kvs = make([]any, 0, 20) // Exactly large enough for all appends below.
|
||||||
klog.Infof("%s %s %s in %d milliseconds", reqInfo.RequestVerb, reqInfo.RequestURL, reqInfo.ResponseStatus, reqInfo.Duration.Nanoseconds()/int64(time.Millisecond))
|
if levels&(1<<DebugURLTiming) != 0 {
|
||||||
|
kvs = append(kvs, "verb", reqInfo.RequestVerb, "url", reqInfo.RequestURL)
|
||||||
}
|
}
|
||||||
if rt.levels[DebugDetailedTiming] {
|
if levels&(1<<DebugURLTiming|1<<DebugResponseStatus) != 0 {
|
||||||
stats := ""
|
kvs = append(kvs, "status", reqInfo.ResponseStatus)
|
||||||
|
}
|
||||||
|
if levels&(1<<DebugResponseHeaders) != 0 {
|
||||||
|
kvs = append(kvs, "headers", newHeadersMap(reqInfo.ResponseHeaders))
|
||||||
|
}
|
||||||
|
if levels&(1<<DebugURLTiming|1<<DebugDetailedTiming|1<<DebugResponseStatus) != 0 {
|
||||||
|
kvs = append(kvs, "milliseconds", reqInfo.Duration.Nanoseconds()/int64(time.Millisecond))
|
||||||
|
}
|
||||||
|
if levels&(1<<DebugDetailedTiming) != 0 {
|
||||||
if !reqInfo.ConnectionReused {
|
if !reqInfo.ConnectionReused {
|
||||||
stats += fmt.Sprintf(`DNSLookup %d ms Dial %d ms TLSHandshake %d ms`,
|
kvs = append(kvs,
|
||||||
reqInfo.DNSLookup.Nanoseconds()/int64(time.Millisecond),
|
"dnsLookupMilliseconds", reqInfo.DNSLookup.Nanoseconds()/int64(time.Millisecond),
|
||||||
reqInfo.Dialing.Nanoseconds()/int64(time.Millisecond),
|
"dialMilliseconds", reqInfo.Dialing.Nanoseconds()/int64(time.Millisecond),
|
||||||
reqInfo.TLSHandshake.Nanoseconds()/int64(time.Millisecond),
|
"tlsHandshakeMilliseconds", reqInfo.TLSHandshake.Nanoseconds()/int64(time.Millisecond),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
stats += fmt.Sprintf(`GetConnection %d ms`, reqInfo.GetConnection.Nanoseconds()/int64(time.Millisecond))
|
kvs = append(kvs, "getConnectionMilliseconds", reqInfo.GetConnection.Nanoseconds()/int64(time.Millisecond))
|
||||||
}
|
}
|
||||||
if reqInfo.ServerProcessing != 0 {
|
if reqInfo.ServerProcessing != 0 {
|
||||||
stats += fmt.Sprintf(` ServerProcessing %d ms`, reqInfo.ServerProcessing.Nanoseconds()/int64(time.Millisecond))
|
kvs = append(kvs, "serverProcessingMilliseconds", reqInfo.ServerProcessing.Nanoseconds()/int64(time.Millisecond))
|
||||||
}
|
}
|
||||||
stats += fmt.Sprintf(` Duration %d ms`, reqInfo.Duration.Nanoseconds()/int64(time.Millisecond))
|
|
||||||
klog.Infof("HTTP Statistics: %s", stats)
|
|
||||||
}
|
}
|
||||||
|
if len(kvs) > 0 {
|
||||||
if rt.levels[DebugResponseStatus] {
|
logger.Info("Response", kvs...)
|
||||||
klog.Infof("Response Status: %s in %d milliseconds", reqInfo.ResponseStatus, reqInfo.Duration.Nanoseconds()/int64(time.Millisecond))
|
|
||||||
}
|
|
||||||
if rt.levels[DebugResponseHeaders] {
|
|
||||||
klog.Info("Response Headers:")
|
|
||||||
for key, values := range reqInfo.ResponseHeaders {
|
|
||||||
for _, value := range values {
|
|
||||||
klog.Infof(" %s: %s", key, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return response, err
|
return response, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// headerMap formats headers sorted and across multiple lines with no quoting
|
||||||
|
// when using string output and as JSON when using zapr.
|
||||||
|
type headersMap http.Header
|
||||||
|
|
||||||
|
// newHeadersMap masks all sensitive values. This has to be done before
|
||||||
|
// passing the map to a logger because while in practice all loggers
|
||||||
|
// either use String or MarshalLog, that is not guaranteed.
|
||||||
|
func newHeadersMap(header http.Header) headersMap {
|
||||||
|
h := make(headersMap, len(header))
|
||||||
|
for key, values := range header {
|
||||||
|
maskedValues := make([]string, 0, len(values))
|
||||||
|
for _, value := range values {
|
||||||
|
maskedValues = append(maskedValues, maskValue(key, value))
|
||||||
|
}
|
||||||
|
h[key] = maskedValues
|
||||||
|
}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ fmt.Stringer = headersMap{}
|
||||||
|
var _ logr.Marshaler = headersMap{}
|
||||||
|
|
||||||
|
func (h headersMap) String() string {
|
||||||
|
// The fixed size typically avoids memory allocations when it is large enough.
|
||||||
|
keys := make([]string, 0, 20)
|
||||||
|
for key := range h {
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
var buffer strings.Builder
|
||||||
|
for _, key := range keys {
|
||||||
|
for _, value := range h[key] {
|
||||||
|
_, _ = buffer.WriteString(key)
|
||||||
|
_, _ = buffer.WriteString(": ")
|
||||||
|
_, _ = buffer.WriteString(value)
|
||||||
|
_, _ = buffer.WriteString("\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return buffer.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h headersMap) MarshalLog() any {
|
||||||
|
return map[string][]string(h)
|
||||||
|
}
|
||||||
|
|
||||||
func (rt *debuggingRoundTripper) WrappedRoundTripper() http.RoundTripper {
|
func (rt *debuggingRoundTripper) WrappedRoundTripper() http.RoundTripper {
|
||||||
return rt.delegatedRoundTripper
|
return rt.delegatedRoundTripper
|
||||||
}
|
}
|
||||||
|
@ -18,13 +18,18 @@ package transport
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/go-logr/logr/funcr"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -460,101 +465,224 @@ func TestHeaderEscapeRoundTrip(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//nolint:logcheck // Intentionally tests with global logging.
|
||||||
func TestDebuggingRoundTripper(t *testing.T) {
|
func TestDebuggingRoundTripper(t *testing.T) {
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
rawURL := "https://127.0.0.1:12345/api/v1/pods?limit=500"
|
rawURL := "https://127.0.0.1:12345/api/v1/pods?limit=500"
|
||||||
req := &http.Request{
|
parsedURL, err := url.Parse(rawURL)
|
||||||
Method: http.MethodGet,
|
if err != nil {
|
||||||
Header: map[string][]string{
|
t.Fatalf("url.Parse(%q) returned error: %v", rawURL, err)
|
||||||
"Authorization": {"bearer secretauthtoken"},
|
|
||||||
"X-Test-Request": {"test"},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
method := http.MethodGet
|
||||||
|
header := map[string][]string{
|
||||||
|
"Authorization": {"bearer secretauthtoken"},
|
||||||
|
"X-Test-Request": {"test"},
|
||||||
|
}
|
||||||
|
reqHeaderText := `headers=<
|
||||||
|
Authorization: bearer <masked>
|
||||||
|
X-Test-Request: test
|
||||||
|
>`
|
||||||
|
// Both can be written by funcr.
|
||||||
|
reqHeaderJSON := `"headers":{"Authorization":["bearer <masked>"],"X-Test-Request":["test"]}`
|
||||||
|
reqHeaderJSONReversed := `"headers":{"X-Test-Request":["test"],"Authorization":["bearer <masked>"]}`
|
||||||
|
|
||||||
res := &http.Response{
|
res := &http.Response{
|
||||||
Status: "OK",
|
Status: "OK",
|
||||||
StatusCode: http.StatusOK,
|
StatusCode: http.StatusOK,
|
||||||
Header: map[string][]string{
|
Header: map[string][]string{
|
||||||
"X-Test-Response": {"test"},
|
"X-Test-Response": {"a", "b"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resHeaderText := `headers=<
|
||||||
|
X-Test-Response: a
|
||||||
|
X-Test-Response: b
|
||||||
|
>`
|
||||||
|
resHeaderJSON := `"headers":{"X-Test-Response":["a","b"]}`
|
||||||
|
|
||||||
tcs := []struct {
|
tcs := []struct {
|
||||||
levels []DebugLevel
|
levels []DebugLevel
|
||||||
expectedOutputLines []string
|
v int
|
||||||
|
expectedTextLines []string
|
||||||
|
expectedJSONLines []string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
levels: []DebugLevel{DebugJustURL},
|
levels: []DebugLevel{DebugJustURL},
|
||||||
expectedOutputLines: []string{fmt.Sprintf("%s %s", req.Method, rawURL)},
|
expectedTextLines: []string{fmt.Sprintf(`"Request" verb=%q url=%q`, method, rawURL)},
|
||||||
|
expectedJSONLines: []string{fmt.Sprintf(`"msg":"Request","verb":%q,"url":%q`, method, rawURL)},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
levels: []DebugLevel{DebugRequestHeaders},
|
levels: []DebugLevel{DebugRequestHeaders},
|
||||||
expectedOutputLines: func() []string {
|
expectedTextLines: []string{`"Request" ` + reqHeaderText},
|
||||||
lines := []string{fmt.Sprintf("Request Headers:\n")}
|
expectedJSONLines: []string{`"msg":"Request",` + reqHeaderJSON},
|
||||||
for key, values := range req.Header {
|
|
||||||
for _, value := range values {
|
|
||||||
if key == "Authorization" {
|
|
||||||
value = "bearer <masked>"
|
|
||||||
}
|
|
||||||
lines = append(lines, fmt.Sprintf(" %s: %s\n", key, value))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return lines
|
|
||||||
}(),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
levels: []DebugLevel{DebugResponseHeaders},
|
levels: []DebugLevel{DebugResponseHeaders},
|
||||||
expectedOutputLines: func() []string {
|
expectedTextLines: []string{`"Response" ` + resHeaderText},
|
||||||
lines := []string{fmt.Sprintf("Response Headers:\n")}
|
expectedJSONLines: []string{`"msg":"Response",` + resHeaderJSON},
|
||||||
for key, values := range res.Header {
|
|
||||||
for _, value := range values {
|
|
||||||
lines = append(lines, fmt.Sprintf(" %s: %s\n", key, value))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return lines
|
|
||||||
}(),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
levels: []DebugLevel{DebugURLTiming},
|
levels: []DebugLevel{DebugURLTiming},
|
||||||
expectedOutputLines: []string{fmt.Sprintf("%s %s %s", req.Method, rawURL, res.Status)},
|
expectedTextLines: []string{fmt.Sprintf(`"Response" verb=%q url=%q status=%q`, method, rawURL, res.Status)},
|
||||||
|
expectedJSONLines: []string{fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q`, method, rawURL, res.Status)},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
levels: []DebugLevel{DebugResponseStatus},
|
levels: []DebugLevel{DebugResponseStatus},
|
||||||
expectedOutputLines: []string{fmt.Sprintf("Response Status: %s", res.Status)},
|
expectedTextLines: []string{fmt.Sprintf(`"Response" status=%q`, res.Status)},
|
||||||
|
expectedJSONLines: []string{fmt.Sprintf(`"msg":"Response","status":%q`, res.Status)},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
levels: []DebugLevel{DebugCurlCommand},
|
levels: []DebugLevel{DebugCurlCommand},
|
||||||
expectedOutputLines: []string{fmt.Sprintf("curl -v -X")},
|
expectedTextLines: []string{`curlCommand=<
|
||||||
|
curl -v -X`},
|
||||||
|
expectedJSONLines: []string{`"curlCommand":"curl -v -X`},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
levels: []DebugLevel{DebugURLTiming, DebugResponseStatus},
|
||||||
|
expectedTextLines: []string{fmt.Sprintf(`"Response" verb=%q url=%q status=%q milliseconds=`, method, rawURL, res.Status)},
|
||||||
|
expectedJSONLines: []string{fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q,"milliseconds":`, method, rawURL, res.Status)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
levels: []DebugLevel{DebugByContext},
|
||||||
|
v: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
levels: []DebugLevel{DebugByContext, DebugURLTiming},
|
||||||
|
v: 5,
|
||||||
|
expectedTextLines: []string{
|
||||||
|
fmt.Sprintf(`"Response" verb=%q url=%q status=%q milliseconds=`, method, rawURL, res.Status),
|
||||||
|
},
|
||||||
|
expectedJSONLines: []string{
|
||||||
|
fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q,"milliseconds":`, method, rawURL, res.Status),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
levels: []DebugLevel{DebugByContext},
|
||||||
|
v: 6,
|
||||||
|
expectedTextLines: []string{
|
||||||
|
fmt.Sprintf(`"Response" verb=%q url=%q status=%q milliseconds=`, method, rawURL, res.Status),
|
||||||
|
},
|
||||||
|
expectedJSONLines: []string{
|
||||||
|
fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q,"milliseconds":`, method, rawURL, res.Status),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
levels: []DebugLevel{DebugByContext},
|
||||||
|
v: 7,
|
||||||
|
expectedTextLines: []string{
|
||||||
|
fmt.Sprintf(`"Request" verb=%q url=%q %s
|
||||||
|
`, method, rawURL, reqHeaderText),
|
||||||
|
fmt.Sprintf(`"Response" status=%q milliseconds=`, res.Status),
|
||||||
|
},
|
||||||
|
expectedJSONLines: []string{
|
||||||
|
fmt.Sprintf(`"msg":"Request","verb":%q,"url":%q,%s`, method, rawURL, reqHeaderJSON),
|
||||||
|
fmt.Sprintf(`"msg":"Response","status":%q,"milliseconds":`, res.Status),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
levels: []DebugLevel{DebugByContext},
|
||||||
|
v: 8,
|
||||||
|
expectedTextLines: []string{
|
||||||
|
fmt.Sprintf(`"Request" verb=%q url=%q %s
|
||||||
|
`, method, rawURL, reqHeaderText),
|
||||||
|
fmt.Sprintf(`"Response" status=%q %s milliseconds=`, res.Status, resHeaderText),
|
||||||
|
},
|
||||||
|
expectedJSONLines: []string{
|
||||||
|
fmt.Sprintf(`"msg":"Request","verb":%q,"url":%q,%s`, method, rawURL, reqHeaderJSON),
|
||||||
|
fmt.Sprintf(`"msg":"Response","status":%q,%s,"milliseconds":`, res.Status, resHeaderJSON),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
levels: []DebugLevel{DebugByContext},
|
||||||
|
v: 9,
|
||||||
|
expectedTextLines: []string{
|
||||||
|
fmt.Sprintf(`"Request" curlCommand=<
|
||||||
|
curl -v -X%s`, method),
|
||||||
|
fmt.Sprintf(`"Response" verb=%q url=%q status=%q %s milliseconds=`, method, rawURL, res.Status, resHeaderText),
|
||||||
|
},
|
||||||
|
expectedJSONLines: []string{
|
||||||
|
fmt.Sprintf(`"msg":"Request","curlCommand":"curl -v -X%s`, method),
|
||||||
|
fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q,%s,"milliseconds":`, method, rawURL, res.Status, resHeaderJSON),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tcs {
|
for i, tc := range tcs {
|
||||||
// hijack the klog output
|
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
|
||||||
tmpWriteBuffer := bytes.NewBuffer(nil)
|
for _, format := range []string{"text", "JSON"} {
|
||||||
klog.SetOutput(tmpWriteBuffer)
|
t.Run(format, func(t *testing.T) {
|
||||||
klog.LogToStderr(false)
|
// hijack the klog output
|
||||||
|
state := klog.CaptureState()
|
||||||
|
tmpWriteBuffer := bytes.NewBuffer(nil)
|
||||||
|
klog.SetOutput(tmpWriteBuffer)
|
||||||
|
klog.LogToStderr(false)
|
||||||
|
var fs flag.FlagSet
|
||||||
|
klog.InitFlags(&fs)
|
||||||
|
if err := fs.Set("one_output", "true"); err != nil {
|
||||||
|
t.Errorf("unexpected error setting -one_output: %v", err)
|
||||||
|
}
|
||||||
|
if err := fs.Set("v", fmt.Sprintf("%d", tc.v)); err != nil {
|
||||||
|
t.Errorf("unexpected error setting -v: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// parse rawURL
|
expectOutput := tc.expectedTextLines
|
||||||
parsedURL, err := url.Parse(rawURL)
|
var req *http.Request
|
||||||
if err != nil {
|
if format == "JSON" {
|
||||||
t.Fatalf("url.Parse(%q) returned error: %v", rawURL, err)
|
// Logger will be picked up through the context.
|
||||||
}
|
logger := funcr.NewJSON(func(obj string) {
|
||||||
req.URL = parsedURL
|
_, _ = tmpWriteBuffer.Write([]byte(obj))
|
||||||
|
_, _ = tmpWriteBuffer.Write([]byte("\n"))
|
||||||
|
}, funcr.Options{Verbosity: tc.v})
|
||||||
|
ctx := klog.NewContext(context.Background(), logger)
|
||||||
|
expectOutput = tc.expectedJSONLines
|
||||||
|
r, err := http.NewRequestWithContext(ctx, method, rawURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error constructing the HTTP request: %v", err)
|
||||||
|
}
|
||||||
|
req = r
|
||||||
|
} else {
|
||||||
|
// Intentionally no context, as before.
|
||||||
|
req = &http.Request{
|
||||||
|
Method: method,
|
||||||
|
URL: parsedURL,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
req.Header = header
|
||||||
|
|
||||||
// execute the round tripper
|
// execute the round tripper
|
||||||
rt := &testRoundTripper{
|
rt := &testRoundTripper{
|
||||||
Response: res,
|
Response: res,
|
||||||
}
|
}
|
||||||
NewDebuggingRoundTripper(rt, tc.levels...).RoundTrip(req)
|
if len(tc.levels) == 1 && tc.levels[0] == DebugByContext {
|
||||||
|
DebugWrappers(rt).RoundTrip(req)
|
||||||
|
} else {
|
||||||
|
NewDebuggingRoundTripper(rt, tc.levels...).RoundTrip(req)
|
||||||
|
}
|
||||||
|
|
||||||
// call Flush to ensure the text isn't still buffered
|
// call Flush to ensure the text isn't still buffered
|
||||||
klog.Flush()
|
klog.Flush()
|
||||||
|
|
||||||
// check if klog's output contains the expected lines
|
// check if klog's output contains the expected lines
|
||||||
actual := tmpWriteBuffer.String()
|
actual := tmpWriteBuffer.String()
|
||||||
for _, expected := range tc.expectedOutputLines {
|
|
||||||
if !strings.Contains(actual, expected) {
|
// funcr writes a map in non-deterministic order.
|
||||||
t.Errorf("%q does not contain expected output %q", actual, expected)
|
// Fix that up before comparison.
|
||||||
|
actual = strings.ReplaceAll(actual, reqHeaderJSONReversed, reqHeaderJSON)
|
||||||
|
|
||||||
|
for _, expected := range expectOutput {
|
||||||
|
if !strings.Contains(actual, expected) {
|
||||||
|
t.Errorf("verbosity %d: expected this substring:\n%s\n\ngot:\n%s", tc.v, expected, actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// These test cases describe all expected lines. Split the log output
|
||||||
|
// into log entries and compare their number.
|
||||||
|
entries := regexp.MustCompile(`(?m)^[I{]`).FindAllStringIndex(actual, -1)
|
||||||
|
if tc.v > 0 && len(entries) != len(expectOutput) {
|
||||||
|
t.Errorf("expected %d output lines, got %d:\n%s", len(expectOutput), len(entries), actual)
|
||||||
|
}
|
||||||
|
|
||||||
|
state.Restore()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -182,7 +182,10 @@ func (ts *cachingTokenSource) Token() (*oauth2.Token, error) {
|
|||||||
if ts.tok == nil {
|
if ts.tok == nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
klog.Errorf("Unable to rotate token: %v", err)
|
// Not using a caller-provided logger isn't ideal, but impossible to fix
|
||||||
|
// without new APIs that go up all the way to HTTPWrappersForConfig.
|
||||||
|
// This is currently deemed not worth changing (too much effort, not enough benefit).
|
||||||
|
klog.TODO().Error(err, "Unable to rotate token")
|
||||||
return ts.tok, nil
|
return ts.tok, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -353,7 +353,7 @@ func tryCancelRequest(rt http.RoundTripper, req *http.Request) {
|
|||||||
case utilnet.RoundTripperWrapper:
|
case utilnet.RoundTripperWrapper:
|
||||||
tryCancelRequest(rt.WrappedRoundTripper(), req)
|
tryCancelRequest(rt.WrappedRoundTripper(), req)
|
||||||
default:
|
default:
|
||||||
klog.Warningf("Unable to cancel request for %T", rt)
|
klog.FromContext(req.Context()).Info("Warning: unable to cancel request", "roundTripperType", fmt.Sprintf("%T", rt))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user