diff --git a/go.mod b/go.mod index 0b9ba3fc..9f8ad1e2 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ godebug default=go1.23 godebug winsymlink=0 require ( + github.com/go-logr/logr v1.4.2 github.com/gogo/protobuf v1.3.2 github.com/google/gnostic-models v0.6.9 github.com/google/go-cmp v0.6.0 @@ -41,7 +42,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect - github.com/go-logr/logr v1.4.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect diff --git a/transport/cache.go b/transport/cache.go index 7c7f1b33..b8dd8661 100644 --- a/transport/cache.go +++ b/transport/cache.go @@ -28,6 +28,7 @@ import ( utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/metrics" + "k8s.io/klog/v2" ) // TlsTransportCache caches TLS http.RoundTrippers different configurations. The @@ -116,10 +117,13 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) { // If we use are reloading files, we need to handle certificate rotation properly // TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true if config.TLS.ReloadTLSFiles && tlsConfig != nil && tlsConfig.GetClientCertificate != nil { - dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial) + // The TLS cache is a singleton, so sharing the same name for all of its + // background activity seems okay. + logger := klog.Background().WithName("tls-transport-cache") + dynamicCertDialer := certRotatingDialer(logger, tlsConfig.GetClientCertificate, dial) tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate dial = dynamicCertDialer.connDialer.DialContext - go dynamicCertDialer.Run(DialerStopCh) + go dynamicCertDialer.run(DialerStopCh) } proxy := http.ProxyFromEnvironment diff --git a/transport/cert_rotation.go b/transport/cert_rotation.go index e76f6581..e343f42b 100644 --- a/transport/cert_rotation.go +++ b/transport/cert_rotation.go @@ -19,7 +19,6 @@ package transport import ( "bytes" "crypto/tls" - "fmt" "reflect" "sync" "time" @@ -40,6 +39,7 @@ var CertCallbackRefreshDuration = 5 * time.Minute type reloadFunc func(*tls.CertificateRequestInfo) (*tls.Certificate, error) type dynamicClientCert struct { + logger klog.Logger clientCert *tls.Certificate certMtx sync.RWMutex @@ -50,8 +50,9 @@ type dynamicClientCert struct { queue workqueue.TypedRateLimitingInterface[string] } -func certRotatingDialer(reload reloadFunc, dial utilnet.DialFunc) *dynamicClientCert { +func certRotatingDialer(logger klog.Logger, reload reloadFunc, dial utilnet.DialFunc) *dynamicClientCert { d := &dynamicClientCert{ + logger: logger, reload: reload, connDialer: connrotation.NewDialer(connrotation.DialFunc(dial)), queue: workqueue.NewTypedRateLimitingQueueWithConfig( @@ -88,7 +89,7 @@ func (c *dynamicClientCert) loadClientCert() (*tls.Certificate, error) { return cert, nil } - klog.V(1).Infof("certificate rotation detected, shutting down client connections to start using new credentials") + c.logger.V(1).Info("Certificate rotation detected, shutting down client connections to start using new credentials") c.connDialer.CloseAll() return cert, nil @@ -133,12 +134,12 @@ func byteMatrixEqual(left, right [][]byte) bool { } // run starts the controller and blocks until stopCh is closed. -func (c *dynamicClientCert) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() +func (c *dynamicClientCert) run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrashWithLogger(c.logger) defer c.queue.ShutDown() - klog.V(3).Infof("Starting client certificate rotation controller") - defer klog.V(3).Infof("Shutting down client certificate rotation controller") + c.logger.V(3).Info("Starting client certificate rotation controller") + defer c.logger.V(3).Info("Shutting down client certificate rotation controller") go wait.Until(c.runWorker, time.Second, stopCh) @@ -168,7 +169,7 @@ func (c *dynamicClientCert) processNextWorkItem() bool { return true } - utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err)) + utilruntime.HandleErrorWithLogger(c.logger, err, "Loading client cert failed", "key", dsKey) c.queue.AddRateLimited(dsKey) return true diff --git a/transport/round_trippers.go b/transport/round_trippers.go index 52fefb53..39fcebd9 100644 --- a/transport/round_trippers.go +++ b/transport/round_trippers.go @@ -21,10 +21,12 @@ import ( "fmt" "net/http" "net/http/httptrace" + "sort" "strings" "sync" "time" + "github.com/go-logr/logr" "golang.org/x/oauth2" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -68,19 +70,16 @@ func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTrip return rt, nil } -// DebugWrappers wraps a round tripper and logs based on the current log level. +// DebugWrappers potentially wraps a round tripper with a wrapper that logs +// based on the log level in the context of each individual request. +// +// At the moment, wrapping depends on the global log verbosity and is done +// if that verbosity is >= 6. This may change in the future. func DebugWrappers(rt http.RoundTripper) http.RoundTripper { - switch { - case bool(klog.V(9).Enabled()): - rt = NewDebuggingRoundTripper(rt, DebugCurlCommand, DebugURLTiming, DebugDetailedTiming, DebugResponseHeaders) - case bool(klog.V(8).Enabled()): - rt = NewDebuggingRoundTripper(rt, DebugJustURL, DebugRequestHeaders, DebugResponseStatus, DebugResponseHeaders) - case bool(klog.V(7).Enabled()): - rt = NewDebuggingRoundTripper(rt, DebugJustURL, DebugRequestHeaders, DebugResponseStatus) - case bool(klog.V(6).Enabled()): - rt = NewDebuggingRoundTripper(rt, DebugURLTiming) + //nolint:logcheck // The actual logging is done with a different logger, so only checking here is okay. + if klog.V(6).Enabled() { + rt = NewDebuggingRoundTripper(rt, DebugByContext) } - return rt } @@ -380,14 +379,17 @@ func (r *requestInfo) toCurl() string { } } - return fmt.Sprintf("curl -v -X%s %s '%s'", r.RequestVerb, headers, r.RequestURL) + // Newline at the end makes this look better in the text log output (the + // only usage of this method) because it becomes a multi-line string with + // no quoting. + return fmt.Sprintf("curl -v -X%s %s '%s'\n", r.RequestVerb, headers, r.RequestURL) } // debuggingRoundTripper will display information about the requests passing // through it based on what is configured type debuggingRoundTripper struct { delegatedRoundTripper http.RoundTripper - levels map[DebugLevel]bool + levels int } var _ utilnet.RoundTripperWrapper = &debuggingRoundTripper{} @@ -412,6 +414,26 @@ const ( DebugResponseHeaders // DebugDetailedTiming will add to the debug output the duration of the HTTP requests events. DebugDetailedTiming + // DebugByContext will add any of the above depending on the verbosity of the per-request logger obtained from the requests context. + // + // Can be combined in NewDebuggingRoundTripper with some of the other options, in which case the + // debug roundtripper will always log what is requested there plus the information that gets + // enabled by the context's log verbosity. + DebugByContext +) + +// Different log levels include different sets of information. +// +// Not exported because the exact content of log messages is not part +// of of the package API. +const ( + levelsV6 = (1 << DebugURLTiming) + // Logging *less* information for the response at level 7 compared to 6 replicates prior behavior: + // https://github.com/kubernetes/kubernetes/blob/2b472fe4690c83a2b343995f88050b2a3e9ff0fa/staging/src/k8s.io/client-go/transport/round_trippers.go#L79 + // Presumably that was done because verb and URL are already in the request log entry. + levelsV7 = (1 << DebugJustURL) | (1 << DebugRequestHeaders) | (1 << DebugResponseStatus) + levelsV8 = (1 << DebugJustURL) | (1 << DebugRequestHeaders) | (1 << DebugResponseStatus) | (1 << DebugResponseHeaders) + levelsV9 = (1 << DebugCurlCommand) | (1 << DebugURLTiming) | (1 << DebugDetailedTiming) | (1 << DebugResponseHeaders) ) // NewDebuggingRoundTripper allows to display in the logs output debug information @@ -419,10 +441,9 @@ const ( func NewDebuggingRoundTripper(rt http.RoundTripper, levels ...DebugLevel) http.RoundTripper { drt := &debuggingRoundTripper{ delegatedRoundTripper: rt, - levels: make(map[DebugLevel]bool, len(levels)), } for _, v := range levels { - drt.levels[v] = true + drt.levels |= 1 << v } return drt } @@ -464,27 +485,51 @@ func maskValue(key string, value string) string { } func (rt *debuggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + logger := klog.FromContext(req.Context()) + levels := rt.levels + + // When logging depends on the context, it uses the verbosity of the per-context logger + // and a hard-coded mapping of verbosity to debug details. Otherwise all messages + // are logged as V(0). + if levels&(1< 0 { + logger.Info("Request", kvs...) } startTime := time.Now() - if rt.levels[DebugDetailedTiming] { + if levels&(1< 0 { + logger.Info("Response", kvs...) } return response, err } +// headerMap formats headers sorted and across multiple lines with no quoting +// when using string output and as JSON when using zapr. +type headersMap http.Header + +// newHeadersMap masks all sensitive values. This has to be done before +// passing the map to a logger because while in practice all loggers +// either use String or MarshalLog, that is not guaranteed. +func newHeadersMap(header http.Header) headersMap { + h := make(headersMap, len(header)) + for key, values := range header { + maskedValues := make([]string, 0, len(values)) + for _, value := range values { + maskedValues = append(maskedValues, maskValue(key, value)) + } + h[key] = maskedValues + } + return h +} + +var _ fmt.Stringer = headersMap{} +var _ logr.Marshaler = headersMap{} + +func (h headersMap) String() string { + // The fixed size typically avoids memory allocations when it is large enough. + keys := make([]string, 0, 20) + for key := range h { + keys = append(keys, key) + } + sort.Strings(keys) + var buffer strings.Builder + for _, key := range keys { + for _, value := range h[key] { + _, _ = buffer.WriteString(key) + _, _ = buffer.WriteString(": ") + _, _ = buffer.WriteString(value) + _, _ = buffer.WriteString("\n") + } + } + return buffer.String() +} + +func (h headersMap) MarshalLog() any { + return map[string][]string(h) +} + func (rt *debuggingRoundTripper) WrappedRoundTripper() http.RoundTripper { return rt.delegatedRoundTripper } diff --git a/transport/round_trippers_test.go b/transport/round_trippers_test.go index 1e20f709..7ccfc3a6 100644 --- a/transport/round_trippers_test.go +++ b/transport/round_trippers_test.go @@ -18,13 +18,18 @@ package transport import ( "bytes" + "context" + "flag" "fmt" "net/http" "net/url" "reflect" + "regexp" "strings" "testing" + "github.com/go-logr/logr/funcr" + "k8s.io/klog/v2" ) @@ -460,101 +465,224 @@ func TestHeaderEscapeRoundTrip(t *testing.T) { } } +//nolint:logcheck // Intentionally tests with global logging. func TestDebuggingRoundTripper(t *testing.T) { - t.Parallel() - rawURL := "https://127.0.0.1:12345/api/v1/pods?limit=500" - req := &http.Request{ - Method: http.MethodGet, - Header: map[string][]string{ - "Authorization": {"bearer secretauthtoken"}, - "X-Test-Request": {"test"}, - }, + parsedURL, err := url.Parse(rawURL) + if err != nil { + t.Fatalf("url.Parse(%q) returned error: %v", rawURL, err) } + method := http.MethodGet + header := map[string][]string{ + "Authorization": {"bearer secretauthtoken"}, + "X-Test-Request": {"test"}, + } + reqHeaderText := `headers=< + Authorization: bearer + X-Test-Request: test + >` + // Both can be written by funcr. + reqHeaderJSON := `"headers":{"Authorization":["bearer "],"X-Test-Request":["test"]}` + reqHeaderJSONReversed := `"headers":{"X-Test-Request":["test"],"Authorization":["bearer "]}` + res := &http.Response{ Status: "OK", StatusCode: http.StatusOK, Header: map[string][]string{ - "X-Test-Response": {"test"}, + "X-Test-Response": {"a", "b"}, }, } + + resHeaderText := `headers=< + X-Test-Response: a + X-Test-Response: b + >` + resHeaderJSON := `"headers":{"X-Test-Response":["a","b"]}` + tcs := []struct { - levels []DebugLevel - expectedOutputLines []string + levels []DebugLevel + v int + expectedTextLines []string + expectedJSONLines []string }{ { - levels: []DebugLevel{DebugJustURL}, - expectedOutputLines: []string{fmt.Sprintf("%s %s", req.Method, rawURL)}, + levels: []DebugLevel{DebugJustURL}, + expectedTextLines: []string{fmt.Sprintf(`"Request" verb=%q url=%q`, method, rawURL)}, + expectedJSONLines: []string{fmt.Sprintf(`"msg":"Request","verb":%q,"url":%q`, method, rawURL)}, }, { - levels: []DebugLevel{DebugRequestHeaders}, - expectedOutputLines: func() []string { - lines := []string{fmt.Sprintf("Request Headers:\n")} - for key, values := range req.Header { - for _, value := range values { - if key == "Authorization" { - value = "bearer " - } - lines = append(lines, fmt.Sprintf(" %s: %s\n", key, value)) - } - } - return lines - }(), + levels: []DebugLevel{DebugRequestHeaders}, + expectedTextLines: []string{`"Request" ` + reqHeaderText}, + expectedJSONLines: []string{`"msg":"Request",` + reqHeaderJSON}, }, { - levels: []DebugLevel{DebugResponseHeaders}, - expectedOutputLines: func() []string { - lines := []string{fmt.Sprintf("Response Headers:\n")} - for key, values := range res.Header { - for _, value := range values { - lines = append(lines, fmt.Sprintf(" %s: %s\n", key, value)) - } - } - return lines - }(), + levels: []DebugLevel{DebugResponseHeaders}, + expectedTextLines: []string{`"Response" ` + resHeaderText}, + expectedJSONLines: []string{`"msg":"Response",` + resHeaderJSON}, }, { - levels: []DebugLevel{DebugURLTiming}, - expectedOutputLines: []string{fmt.Sprintf("%s %s %s", req.Method, rawURL, res.Status)}, + levels: []DebugLevel{DebugURLTiming}, + expectedTextLines: []string{fmt.Sprintf(`"Response" verb=%q url=%q status=%q`, method, rawURL, res.Status)}, + expectedJSONLines: []string{fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q`, method, rawURL, res.Status)}, }, { - levels: []DebugLevel{DebugResponseStatus}, - expectedOutputLines: []string{fmt.Sprintf("Response Status: %s", res.Status)}, + levels: []DebugLevel{DebugResponseStatus}, + expectedTextLines: []string{fmt.Sprintf(`"Response" status=%q`, res.Status)}, + expectedJSONLines: []string{fmt.Sprintf(`"msg":"Response","status":%q`, res.Status)}, }, { - levels: []DebugLevel{DebugCurlCommand}, - expectedOutputLines: []string{fmt.Sprintf("curl -v -X")}, + levels: []DebugLevel{DebugCurlCommand}, + expectedTextLines: []string{`curlCommand=< + curl -v -X`}, + expectedJSONLines: []string{`"curlCommand":"curl -v -X`}, + }, + { + levels: []DebugLevel{DebugURLTiming, DebugResponseStatus}, + expectedTextLines: []string{fmt.Sprintf(`"Response" verb=%q url=%q status=%q milliseconds=`, method, rawURL, res.Status)}, + expectedJSONLines: []string{fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q,"milliseconds":`, method, rawURL, res.Status)}, + }, + { + levels: []DebugLevel{DebugByContext}, + v: 5, + }, + { + levels: []DebugLevel{DebugByContext, DebugURLTiming}, + v: 5, + expectedTextLines: []string{ + fmt.Sprintf(`"Response" verb=%q url=%q status=%q milliseconds=`, method, rawURL, res.Status), + }, + expectedJSONLines: []string{ + fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q,"milliseconds":`, method, rawURL, res.Status), + }, + }, + { + levels: []DebugLevel{DebugByContext}, + v: 6, + expectedTextLines: []string{ + fmt.Sprintf(`"Response" verb=%q url=%q status=%q milliseconds=`, method, rawURL, res.Status), + }, + expectedJSONLines: []string{ + fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q,"milliseconds":`, method, rawURL, res.Status), + }, + }, + { + levels: []DebugLevel{DebugByContext}, + v: 7, + expectedTextLines: []string{ + fmt.Sprintf(`"Request" verb=%q url=%q %s +`, method, rawURL, reqHeaderText), + fmt.Sprintf(`"Response" status=%q milliseconds=`, res.Status), + }, + expectedJSONLines: []string{ + fmt.Sprintf(`"msg":"Request","verb":%q,"url":%q,%s`, method, rawURL, reqHeaderJSON), + fmt.Sprintf(`"msg":"Response","status":%q,"milliseconds":`, res.Status), + }, + }, + { + levels: []DebugLevel{DebugByContext}, + v: 8, + expectedTextLines: []string{ + fmt.Sprintf(`"Request" verb=%q url=%q %s +`, method, rawURL, reqHeaderText), + fmt.Sprintf(`"Response" status=%q %s milliseconds=`, res.Status, resHeaderText), + }, + expectedJSONLines: []string{ + fmt.Sprintf(`"msg":"Request","verb":%q,"url":%q,%s`, method, rawURL, reqHeaderJSON), + fmt.Sprintf(`"msg":"Response","status":%q,%s,"milliseconds":`, res.Status, resHeaderJSON), + }, + }, + { + levels: []DebugLevel{DebugByContext}, + v: 9, + expectedTextLines: []string{ + fmt.Sprintf(`"Request" curlCommand=< + curl -v -X%s`, method), + fmt.Sprintf(`"Response" verb=%q url=%q status=%q %s milliseconds=`, method, rawURL, res.Status, resHeaderText), + }, + expectedJSONLines: []string{ + fmt.Sprintf(`"msg":"Request","curlCommand":"curl -v -X%s`, method), + fmt.Sprintf(`"msg":"Response","verb":%q,"url":%q,"status":%q,%s,"milliseconds":`, method, rawURL, res.Status, resHeaderJSON), + }, }, } - for _, tc := range tcs { - // hijack the klog output - tmpWriteBuffer := bytes.NewBuffer(nil) - klog.SetOutput(tmpWriteBuffer) - klog.LogToStderr(false) + for i, tc := range tcs { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + for _, format := range []string{"text", "JSON"} { + t.Run(format, func(t *testing.T) { + // hijack the klog output + state := klog.CaptureState() + tmpWriteBuffer := bytes.NewBuffer(nil) + klog.SetOutput(tmpWriteBuffer) + klog.LogToStderr(false) + var fs flag.FlagSet + klog.InitFlags(&fs) + if err := fs.Set("one_output", "true"); err != nil { + t.Errorf("unexpected error setting -one_output: %v", err) + } + if err := fs.Set("v", fmt.Sprintf("%d", tc.v)); err != nil { + t.Errorf("unexpected error setting -v: %v", err) + } - // parse rawURL - parsedURL, err := url.Parse(rawURL) - if err != nil { - t.Fatalf("url.Parse(%q) returned error: %v", rawURL, err) - } - req.URL = parsedURL + expectOutput := tc.expectedTextLines + var req *http.Request + if format == "JSON" { + // Logger will be picked up through the context. + logger := funcr.NewJSON(func(obj string) { + _, _ = tmpWriteBuffer.Write([]byte(obj)) + _, _ = tmpWriteBuffer.Write([]byte("\n")) + }, funcr.Options{Verbosity: tc.v}) + ctx := klog.NewContext(context.Background(), logger) + expectOutput = tc.expectedJSONLines + r, err := http.NewRequestWithContext(ctx, method, rawURL, nil) + if err != nil { + t.Fatalf("unexpected error constructing the HTTP request: %v", err) + } + req = r + } else { + // Intentionally no context, as before. + req = &http.Request{ + Method: method, + URL: parsedURL, + } + } + req.Header = header - // execute the round tripper - rt := &testRoundTripper{ - Response: res, - } - NewDebuggingRoundTripper(rt, tc.levels...).RoundTrip(req) + // execute the round tripper + rt := &testRoundTripper{ + Response: res, + } + if len(tc.levels) == 1 && tc.levels[0] == DebugByContext { + DebugWrappers(rt).RoundTrip(req) + } else { + NewDebuggingRoundTripper(rt, tc.levels...).RoundTrip(req) + } - // call Flush to ensure the text isn't still buffered - klog.Flush() + // call Flush to ensure the text isn't still buffered + klog.Flush() - // check if klog's output contains the expected lines - actual := tmpWriteBuffer.String() - for _, expected := range tc.expectedOutputLines { - if !strings.Contains(actual, expected) { - t.Errorf("%q does not contain expected output %q", actual, expected) + // check if klog's output contains the expected lines + actual := tmpWriteBuffer.String() + + // funcr writes a map in non-deterministic order. + // Fix that up before comparison. + actual = strings.ReplaceAll(actual, reqHeaderJSONReversed, reqHeaderJSON) + + for _, expected := range expectOutput { + if !strings.Contains(actual, expected) { + t.Errorf("verbosity %d: expected this substring:\n%s\n\ngot:\n%s", tc.v, expected, actual) + } + } + // These test cases describe all expected lines. Split the log output + // into log entries and compare their number. + entries := regexp.MustCompile(`(?m)^[I{]`).FindAllStringIndex(actual, -1) + if tc.v > 0 && len(entries) != len(expectOutput) { + t.Errorf("expected %d output lines, got %d:\n%s", len(expectOutput), len(entries), actual) + } + + state.Restore() + }) } - } + }) } } diff --git a/transport/token_source.go b/transport/token_source.go index 8e312800..469dd817 100644 --- a/transport/token_source.go +++ b/transport/token_source.go @@ -182,7 +182,10 @@ func (ts *cachingTokenSource) Token() (*oauth2.Token, error) { if ts.tok == nil { return nil, err } - klog.Errorf("Unable to rotate token: %v", err) + // Not using a caller-provided logger isn't ideal, but impossible to fix + // without new APIs that go up all the way to HTTPWrappersForConfig. + // This is currently deemed not worth changing (too much effort, not enough benefit). + klog.TODO().Error(err, "Unable to rotate token") return ts.tok, nil } diff --git a/transport/transport.go b/transport/transport.go index 4770331a..8fdcc570 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -353,7 +353,7 @@ func tryCancelRequest(rt http.RoundTripper, req *http.Request) { case utilnet.RoundTripperWrapper: tryCancelRequest(rt.WrappedRoundTripper(), req) default: - klog.Warningf("Unable to cancel request for %T", rt) + klog.FromContext(req.Context()).Info("Warning: unable to cancel request", "roundTripperType", fmt.Sprintf("%T", rt)) } }