client-go: extract warning headers from API responses

Kubernetes-commit: b1098bd0d53658bfb945e485683d543ab7dc73ba
This commit is contained in:
Jordan Liggitt 2019-01-17 11:35:07 -05:00 committed by Kubernetes Publisher
parent 63eae69e3c
commit 75f9ee62c1
5 changed files with 213 additions and 18 deletions

View File

@ -94,6 +94,10 @@ type RESTClient struct {
// overridden.
rateLimiter flowcontrol.RateLimiter
// warningHandler is shared among all requests created by this client.
// If not set, defaultWarningHandler is used.
warningHandler WarningHandler
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}

View File

@ -123,6 +123,10 @@ type Config struct {
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
// WarningHandler handles warnings in server responses.
// If not set, the default warning handler is used.
WarningHandler WarningHandler
// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
Timeout time.Duration
@ -339,7 +343,11 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
if err == nil && config.WarningHandler != nil {
restClient.warningHandler = config.WarningHandler
}
return restClient, err
}
// UnversionedRESTClientFor is the same as RESTClientFor, except that it allows
@ -393,7 +401,11 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
if err == nil && config.WarningHandler != nil {
restClient.warningHandler = config.WarningHandler
}
return restClient, err
}
// SetKubernetesDefaults sets default values on the provided client config for accessing the
@ -562,6 +574,7 @@ func AnonymousClientConfig(config *Config) *Config {
NextProtos: config.TLSClientConfig.NextProtos,
},
RateLimiter: config.RateLimiter,
WarningHandler: config.WarningHandler,
UserAgent: config.UserAgent,
DisableCompression: config.DisableCompression,
QPS: config.QPS,
@ -608,6 +621,7 @@ func CopyConfig(config *Config) *Config {
QPS: config.QPS,
Burst: config.Burst,
RateLimiter: config.RateLimiter,
WarningHandler: config.WarningHandler,
Timeout: config.Timeout,
Dial: config.Dial,
Proxy: config.Proxy,

View File

@ -258,6 +258,10 @@ var fakeWrapperFunc = func(http.RoundTripper) http.RoundTripper {
return &fakeRoundTripper{}
}
type fakeWarningHandler struct{}
func (f fakeWarningHandler) HandleWarningHeader(code int, agent string, message string) {}
type fakeNegotiatedSerializer struct{}
func (n *fakeNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
@ -319,6 +323,9 @@ func TestAnonymousConfig(t *testing.T) {
f.Fuzz(limiter)
*r = limiter
},
func(h *WarningHandler, f fuzz.Continue) {
*h = &fakeWarningHandler{}
},
// Authentication does not require fuzzer
func(r *AuthProviderConfigPersister, f fuzz.Continue) {},
func(r *clientcmdapi.AuthProviderConfig, f fuzz.Continue) {
@ -409,6 +416,9 @@ func TestCopyConfig(t *testing.T) {
f.Fuzz(limiter)
*r = limiter
},
func(h *WarningHandler, f fuzz.Continue) {
*h = &fakeWarningHandler{}
},
func(r *AuthProviderConfigPersister, f fuzz.Continue) {
*r = fakeAuthProviderConfigPersister{}
},
@ -595,12 +605,13 @@ func TestConfigSprint(t *testing.T) {
QPS: 1,
Burst: 2,
RateLimiter: &fakeLimiter{},
WarningHandler: fakeWarningHandler{},
Timeout: 3 * time.Second,
Dial: fakeDialFunc,
Proxy: fakeProxyFunc,
}
want := fmt.Sprintf(
`&rest.Config{Host:"localhost:8080", APIPath:"v1", ContentConfig:rest.ContentConfig{AcceptContentTypes:"application/json", ContentType:"application/json", GroupVersion:(*schema.GroupVersion)(nil), NegotiatedSerializer:runtime.NegotiatedSerializer(nil)}, Username:"gopher", Password:"--- REDACTED ---", BearerToken:"--- REDACTED ---", BearerTokenFile:"", Impersonate:rest.ImpersonationConfig{UserName:"gopher2", Groups:[]string(nil), Extra:map[string][]string(nil)}, AuthProvider:api.AuthProviderConfig{Name: "gopher", Config: map[string]string{--- REDACTED ---}}, AuthConfigPersister:rest.AuthProviderConfigPersister(--- REDACTED ---), ExecProvider:api.AuthProviderConfig{Command: "sudo", Args: []string{"--- REDACTED ---"}, Env: []ExecEnvVar{--- REDACTED ---}, APIVersion: ""}, TLSClientConfig:rest.sanitizedTLSClientConfig{Insecure:false, ServerName:"", CertFile:"a.crt", KeyFile:"a.key", CAFile:"", CertData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x54, 0x52, 0x55, 0x4e, 0x43, 0x41, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, KeyData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x52, 0x45, 0x44, 0x41, 0x43, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, CAData:[]uint8(nil), NextProtos:[]string{"h2", "http/1.1"}}, UserAgent:"gobot", DisableCompression:false, Transport:(*rest.fakeRoundTripper)(%p), WrapTransport:(transport.WrapperFunc)(%p), QPS:1, Burst:2, RateLimiter:(*rest.fakeLimiter)(%p), Timeout:3000000000, Dial:(func(context.Context, string, string) (net.Conn, error))(%p), Proxy:(func(*http.Request) (*url.URL, error))(%p)}`,
`&rest.Config{Host:"localhost:8080", APIPath:"v1", ContentConfig:rest.ContentConfig{AcceptContentTypes:"application/json", ContentType:"application/json", GroupVersion:(*schema.GroupVersion)(nil), NegotiatedSerializer:runtime.NegotiatedSerializer(nil)}, Username:"gopher", Password:"--- REDACTED ---", BearerToken:"--- REDACTED ---", BearerTokenFile:"", Impersonate:rest.ImpersonationConfig{UserName:"gopher2", Groups:[]string(nil), Extra:map[string][]string(nil)}, AuthProvider:api.AuthProviderConfig{Name: "gopher", Config: map[string]string{--- REDACTED ---}}, AuthConfigPersister:rest.AuthProviderConfigPersister(--- REDACTED ---), ExecProvider:api.AuthProviderConfig{Command: "sudo", Args: []string{"--- REDACTED ---"}, Env: []ExecEnvVar{--- REDACTED ---}, APIVersion: ""}, TLSClientConfig:rest.sanitizedTLSClientConfig{Insecure:false, ServerName:"", CertFile:"a.crt", KeyFile:"a.key", CAFile:"", CertData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x54, 0x52, 0x55, 0x4e, 0x43, 0x41, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, KeyData:[]uint8{0x2d, 0x2d, 0x2d, 0x20, 0x52, 0x45, 0x44, 0x41, 0x43, 0x54, 0x45, 0x44, 0x20, 0x2d, 0x2d, 0x2d}, CAData:[]uint8(nil), NextProtos:[]string{"h2", "http/1.1"}}, UserAgent:"gobot", DisableCompression:false, Transport:(*rest.fakeRoundTripper)(%p), WrapTransport:(transport.WrapperFunc)(%p), QPS:1, Burst:2, RateLimiter:(*rest.fakeLimiter)(%p), WarningHandler:rest.fakeWarningHandler{}, Timeout:3000000000, Dial:(func(context.Context, string, string) (net.Conn, error))(%p), Proxy:(func(*http.Request) (*url.URL, error))(%p)}`,
c.Transport, fakeWrapperFunc, c.RateLimiter, fakeDialFunc, fakeProxyFunc,
)

View File

@ -88,6 +88,8 @@ var noBackoff = &NoBackoff{}
type Request struct {
c *RESTClient
warningHandler WarningHandler
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
timeout time.Duration
@ -141,6 +143,7 @@ func NewRequest(c *RESTClient) *Request {
timeout: timeout,
pathPrefix: pathPrefix,
maxRetries: 10,
warningHandler: c.warningHandler,
}
switch {
@ -218,6 +221,13 @@ func (r *Request) BackOff(manager BackoffManager) *Request {
return r
}
// WarningHandler sets the handler this client uses when warning headers are encountered.
// If set to nil, this client will use the default warning handler (see SetDefaultWarningHandler).
func (r *Request) WarningHandler(handler WarningHandler) *Request {
r.warningHandler = handler
return r
}
// Throttle receives a rate-limiter and sets or replaces an existing request limiter
func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
r.rateLimiter = limiter
@ -692,6 +702,8 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
return nil, err
}
handleWarnings(resp.Header, r.warningHandler)
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
@ -764,6 +776,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
switch {
case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
handleWarnings(resp.Header, r.warningHandler)
return resp.Body, nil
default:
@ -1020,6 +1033,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
body: body,
contentType: contentType,
statusCode: resp.StatusCode,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
}
@ -1038,6 +1052,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
statusCode: resp.StatusCode,
decoder: decoder,
err: err,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
@ -1046,6 +1061,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
contentType: contentType,
statusCode: resp.StatusCode,
decoder: decoder,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
@ -1181,6 +1197,7 @@ func retryAfterSeconds(resp *http.Response) (int, bool) {
// Result contains the result of calling Request.Do().
type Result struct {
body []byte
warnings []net.WarningHeader
contentType string
err error
statusCode int
@ -1294,6 +1311,11 @@ func (r Result) Error() error {
return r.err
}
// Warnings returns any warning headers received in the response
func (r Result) Warnings() []net.WarningHeader {
return r.warnings
}
// NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
var NameMayNotBe = []string{".", ".."}

144
rest/warnings.go Normal file
View File

@ -0,0 +1,144 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"fmt"
"io"
"net/http"
"sync"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/net"
)
// WarningHandler is an interface for handling warning headers
type WarningHandler interface {
// HandleWarningHeader is called with the warn code, agent, and text when a warning header is countered.
HandleWarningHeader(code int, agent string, text string)
}
var (
defaultWarningHandler WarningHandler = WarningLogger{}
defaultWarningHandlerLock sync.RWMutex
)
// SetDefaultWarningHandler sets the default handler client uses when warning headers are encountered.
// By default, warnings are printed to stderr.
func SetDefaultWarningHandler(l WarningHandler) {
defaultWarningHandlerLock.Lock()
defer defaultWarningHandlerLock.Unlock()
defaultWarningHandler = l
}
func getDefaultWarningHandler() WarningHandler {
defaultWarningHandlerLock.RLock()
defer defaultWarningHandlerLock.RUnlock()
l := defaultWarningHandler
return l
}
// NoWarnings is an implementation of WarningHandler that suppresses warnings.
type NoWarnings struct{}
func (NoWarnings) HandleWarningHeader(code int, agent string, message string) {}
// WarningLogger is an implementation of WarningHandler that logs code 299 warnings
type WarningLogger struct{}
func (WarningLogger) HandleWarningHeader(code int, agent string, message string) {
if code != 299 || len(message) == 0 {
return
}
klog.Warning(message)
}
type warningWriter struct {
// out is the writer to output warnings to
out io.Writer
// opts contains options controlling warning output
opts WarningWriterOptions
// writtenLock guards written and writtenCount
writtenLock sync.Mutex
writtenCount int
written map[string]struct{}
}
// WarningWriterOptions controls the behavior of a WarningHandler constructed using NewWarningWriter()
type WarningWriterOptions struct {
// Deduplicate indicates a given warning message should only be written once.
// Setting this to true in a long-running process handling many warnings can result in increased memory use.
Deduplicate bool
// Color indicates that warning output can include ANSI color codes
Color bool
}
// NewWarningWriter returns an implementation of WarningHandler that outputs code 299 warnings to the specified writer.
func NewWarningWriter(out io.Writer, opts WarningWriterOptions) *warningWriter {
h := &warningWriter{out: out, opts: opts}
if opts.Deduplicate {
h.written = map[string]struct{}{}
}
return h
}
const (
yellowColor = "\u001b[33;1m"
resetColor = "\u001b[0m"
)
// HandleWarningHeader prints warnings with code=299 to the configured writer.
func (w *warningWriter) HandleWarningHeader(code int, agent string, message string) {
if code != 299 || len(message) == 0 {
return
}
w.writtenLock.Lock()
defer w.writtenLock.Unlock()
if w.opts.Deduplicate {
if _, alreadyWritten := w.written[message]; alreadyWritten {
return
}
w.written[message] = struct{}{}
}
w.writtenCount++
if w.opts.Color {
fmt.Fprintf(w.out, "%sWarning:%s %s\n", yellowColor, resetColor, message)
} else {
fmt.Fprintf(w.out, "Warning: %s\n", message)
}
}
func (w *warningWriter) WarningCount() int {
w.writtenLock.Lock()
defer w.writtenLock.Unlock()
return w.writtenCount
}
func handleWarnings(headers http.Header, handler WarningHandler) []net.WarningHeader {
if handler == nil {
handler = getDefaultWarningHandler()
}
warnings, _ := net.ParseWarningHeaders(headers["Warning"])
for _, warning := range warnings {
handler.HandleWarningHeader(warning.Code, warning.Agent, warning.Text)
}
return warnings
}