Update vendor, go.mod and go.sum for k8s bump to v0.18.3

Signed-off-by: Billy McFall <22157057+Billy99@users.noreply.github.com>
This commit is contained in:
Billy McFall
2020-10-23 10:52:52 -04:00
parent c8739f64b9
commit 36b5edff29
892 changed files with 147015 additions and 61162 deletions

View File

@@ -30,6 +30,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/net/http2"
@@ -38,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
utilclock "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/watch"
restclientwatch "k8s.io/client-go/rest/watch"
@@ -48,8 +50,12 @@ import (
var (
// longThrottleLatency defines threshold for logging requests. All requests being
// throttle for more than longThrottleLatency will be logged.
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
// be logged.
longThrottleLatency = 50 * time.Millisecond
// extraLongThrottleLatency defines the threshold for logging requests at log level 2.
extraLongThrottleLatency = 1 * time.Second
)
// HTTPClient is an interface for testing a request object.
@@ -60,8 +66,8 @@ type HTTPClient interface {
// ResponseWrapper is an interface for getting a response.
// The response may be either accessed as a raw data (the whole output is put into memory) or as a stream.
type ResponseWrapper interface {
DoRaw() ([]byte, error)
Stream() (io.ReadCloser, error)
DoRaw(context.Context) ([]byte, error)
Stream(context.Context) (io.ReadCloser, error)
}
// RequestConstructionError is returned when there's an error assembling a request.
@@ -74,19 +80,20 @@ func (r *RequestConstructionError) Error() string {
return fmt.Sprintf("request construction error: '%v'", r.Err)
}
var noBackoff = &NoBackoff{}
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
// required
client HTTPClient
verb string
c *RESTClient
baseURL *url.URL
content ContentConfig
serializers Serializers
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
timeout time.Duration
// generic components accessible via method setters
verb string
pathPrefix string
subpath string
params url.Values
@@ -98,50 +105,67 @@ type Request struct {
resource string
resourceName string
subresource string
timeout time.Duration
// output
err error
body io.Reader
// This is only used for per-request timeouts, deadlines, and cancellations.
ctx context.Context
backoffMgr BackoffManager
throttle flowcontrol.RateLimiter
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
func NewRequest(c *RESTClient) *Request {
var backoff BackoffManager
if c.createBackoffMgr != nil {
backoff = c.createBackoffMgr()
}
if backoff == nil {
klog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
backoff = noBackoff
}
pathPrefix := "/"
if baseURL != nil {
pathPrefix = path.Join(pathPrefix, baseURL.Path)
var pathPrefix string
if c.base != nil {
pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
} else {
pathPrefix = path.Join("/", c.versionedAPIPath)
}
var timeout time.Duration
if c.Client != nil {
timeout = c.Client.Timeout
}
r := &Request{
client: client,
verb: verb,
baseURL: baseURL,
pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content,
serializers: serializers,
backoffMgr: backoff,
throttle: throttle,
c: c,
rateLimiter: c.rateLimiter,
backoff: backoff,
timeout: timeout,
pathPrefix: pathPrefix,
}
switch {
case len(content.AcceptContentTypes) > 0:
r.SetHeader("Accept", content.AcceptContentTypes)
case len(content.ContentType) > 0:
r.SetHeader("Accept", content.ContentType+", */*")
case len(c.content.AcceptContentTypes) > 0:
r.SetHeader("Accept", c.content.AcceptContentTypes)
case len(c.content.ContentType) > 0:
r.SetHeader("Accept", c.content.ContentType+", */*")
}
return r
}
// NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
return NewRequest(&RESTClient{
base: base,
versionedAPIPath: versionedAPIPath,
content: content,
Client: client,
})
}
// Verb sets the verb this request will use.
func (r *Request) Verb(verb string) *Request {
r.verb = verb
return r
}
// Prefix adds segments to the relative beginning to the request path. These
// items will be placed before the optional Namespace, Resource, or Name sections.
// Setting AbsPath will clear any previously set Prefix segments
@@ -184,17 +208,17 @@ func (r *Request) Resource(resource string) *Request {
// or defaults to the stub implementation if nil is provided
func (r *Request) BackOff(manager BackoffManager) *Request {
if manager == nil {
r.backoffMgr = &NoBackoff{}
r.backoff = &NoBackoff{}
return r
}
r.backoffMgr = manager
r.backoff = manager
return r
}
// Throttle receives a rate-limiter and sets or replaces an existing request limiter
func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
r.throttle = limiter
r.rateLimiter = limiter
return r
}
@@ -272,8 +296,8 @@ func (r *Request) AbsPath(segments ...string) *Request {
if r.err != nil {
return r
}
r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
// preserve any trailing slashes for legacy behavior
r.pathPrefix += "/"
}
@@ -317,7 +341,7 @@ func (r *Request) Param(paramName, s string) *Request {
// VersionedParams will not write query parameters that have omitempty set and are empty. If a
// parameter has already been set it is appended to (Params and VersionedParams are additive).
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
}
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
@@ -397,27 +421,25 @@ func (r *Request) Body(obj interface{}) *Request {
if reflect.ValueOf(t).IsNil() {
return r
}
data, err := runtime.Encode(r.serializers.Encoder, t)
encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil)
if err != nil {
r.err = err
return r
}
data, err := runtime.Encode(encoder, t)
if err != nil {
r.err = err
return r
}
glogBody("Request Body", data)
r.body = bytes.NewReader(data)
r.SetHeader("Content-Type", r.content.ContentType)
r.SetHeader("Content-Type", r.c.content.ContentType)
default:
r.err = fmt.Errorf("unknown type used for body: %+v", obj)
}
return r
}
// Context adds a context to the request. Contexts are only used for
// timeouts, deadlines, and cancellations.
func (r *Request) Context(ctx context.Context) *Request {
r.ctx = ctx
return r
}
// URL returns the current working URL.
func (r *Request) URL() *url.URL {
p := r.pathPrefix
@@ -433,8 +455,8 @@ func (r *Request) URL() *url.URL {
}
finalURL := &url.URL{}
if r.baseURL != nil {
*finalURL = *r.baseURL
if r.c.base != nil {
*finalURL = *r.c.base
}
finalURL.Path = p
@@ -468,8 +490,8 @@ func (r Request) finalURLTemplate() url.URL {
segments := strings.Split(r.URL().Path, "/")
groupIndex := 0
index := 0
if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) {
groupIndex += len(strings.Split(r.baseURL.Path, "/"))
if r.URL() != nil && r.c.base != nil && strings.Contains(r.URL().Path, r.c.base.Path) {
groupIndex += len(strings.Split(r.c.base.Path, "/"))
}
if groupIndex >= len(segments) {
return *url
@@ -521,68 +543,119 @@ func (r Request) finalURLTemplate() url.URL {
return *url
}
func (r *Request) tryThrottle() {
now := time.Now()
if r.throttle != nil {
r.throttle.Accept()
func (r *Request) tryThrottle(ctx context.Context) error {
if r.rateLimiter == nil {
return nil
}
if latency := time.Since(now); latency > longThrottleLatency {
klog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
now := time.Now()
err := r.rateLimiter.Wait(ctx)
latency := time.Since(now)
if latency > longThrottleLatency {
klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
}
if latency > extraLongThrottleLatency {
// If the rate limiter latency is very high, the log message should be printed at a higher log level,
// but we use a throttled logger to prevent spamming.
globalThrottledLogger.Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
}
metrics.RateLimiterLatency.Observe(r.verb, r.finalURLTemplate(), latency)
return err
}
type throttleSettings struct {
logLevel klog.Level
minLogInterval time.Duration
lastLogTime time.Time
lock sync.RWMutex
}
type throttledLogger struct {
clock utilclock.PassiveClock
settings []*throttleSettings
}
var globalThrottledLogger = &throttledLogger{
clock: utilclock.RealClock{},
settings: []*throttleSettings{
{
logLevel: 2,
minLogInterval: 1 * time.Second,
}, {
logLevel: 0,
minLogInterval: 10 * time.Second,
},
},
}
func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
for _, setting := range b.settings {
if bool(klog.V(setting.logLevel)) {
// Return early without write locking if possible.
if func() bool {
setting.lock.RLock()
defer setting.lock.RUnlock()
return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
}() {
setting.lock.Lock()
defer setting.lock.Unlock()
if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
setting.lastLogTime = b.clock.Now()
return setting.logLevel, true
}
}
return -1, false
}
}
return -1, false
}
// Infof will write a log message at each logLevel specified by the reciever's throttleSettings
// as long as it hasn't written a log message more recently than minLogInterval.
func (b *throttledLogger) Infof(message string, args ...interface{}) {
if logLevel, ok := b.attemptToLog(); ok {
klog.V(logLevel).Infof(message, args...)
}
}
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) {
return r.WatchWithSpecificDecoders(
func(body io.ReadCloser) streaming.Decoder {
framer := r.serializers.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
},
r.serializers.Decoder,
)
}
// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
// Returns a watch.Interface, or an error.
func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.throttle here.
// don't use r.rateLimiter here.
if r.err != nil {
return nil, r.err
}
if r.serializers.Framer == nil {
return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return nil, err
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req = req.WithContext(ctx)
req.Header = r.headers
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if r.c.base != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err != nil {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
return watch.NewEmptyWatch(), nil
}
return nil, err
@@ -592,18 +665,36 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
if result := r.transformResponse(resp, req); result.err != nil {
return nil, result.err
}
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}
wrapperDecoder := wrapperDecoderFn(resp.Body)
return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
}
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
if err != nil {
return nil, err
}
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
), nil
}
// updateURLMetrics is a convenience function for pushing metrics.
// It also handles corner cases for incomplete/invalid request data.
func updateURLMetrics(req *Request, resp *http.Response, err error) {
url := "none"
if req.baseURL != nil {
url = req.baseURL.Host
if req.c.base != nil {
url = req.c.base.Host
}
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
@@ -620,34 +711,37 @@ func updateURLMetrics(req *Request, resp *http.Response, err error) {
// Returns io.ReadCloser which could be used for streaming of the response, or an error
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
func (r *Request) Stream() (io.ReadCloser, error) {
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if r.err != nil {
return nil, r.err
}
r.tryThrottle()
if err := r.tryThrottle(ctx); err != nil {
return nil, err
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, nil)
if err != nil {
return nil, err
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
if r.body != nil {
req.Body = ioutil.NopCloser(r.body)
}
req = req.WithContext(ctx)
req.Header = r.headers
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if r.c.base != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
}
if err != nil {
@@ -671,11 +765,38 @@ func (r *Request) Stream() (io.ReadCloser, error) {
}
}
// requestPreflightCheck looks for common programmer errors on Request.
//
// We tackle here two programmer mistakes. The first one is to try to create
// something(POST) using an empty string as namespace with namespaceSet as
// true. If namespaceSet is true then namespace should also be defined. The
// second mistake is, when under the same circumstances, the programmer tries
// to GET, PUT or DELETE a named resource(resourceName != ""), again, if
// namespaceSet is true then namespace must not be empty.
func (r *Request) requestPreflightCheck() error {
if !r.namespaceSet {
return nil
}
if len(r.namespace) > 0 {
return nil
}
switch r.verb {
case "POST":
return fmt.Errorf("an empty namespace may not be set during creation")
case "GET", "PUT", "DELETE":
if len(r.resourceName) > 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
}
return nil
}
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
//Metrics for total request latency
start := time.Now()
defer func() {
@@ -687,69 +808,76 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
return r.err
}
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set during creation")
if err := r.requestPreflightCheck(); err != nil {
return err
}
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
// Throttle the first try before setting up the timeout configured on the
// client. We don't want a throttled client to return timeouts to callers
// before it makes a single request.
if err := r.tryThrottle(ctx); err != nil {
return err
}
if r.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.timeout)
defer cancel()
}
// Right now we make about ten retry attempts if we get a Retry-After response.
maxRetries := 10
retries := 0
for {
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return err
}
if r.timeout > 0 {
if r.ctx == nil {
r.ctx = context.Background()
}
var cancelFn context.CancelFunc
r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
defer cancelFn()
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req = req.WithContext(ctx)
req.Header = r.headers
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retries > 0 {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal throttler.
r.tryThrottle()
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottle(ctx); err != nil {
return err
}
}
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil {
// "Connection reset by peer" is usually a transient error.
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as
// they are not idempotent.
if !net.IsConnectionReset(err) || r.verb != "GET" {
if r.verb != "GET" {
return err
}
// For the purpose of retry, we set the artificial "retry-after" response.
// TODO: Should we clean the original response if it exists?
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
// For connection errors and apiserver shutdown errors retry.
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
// For the purpose of retry, we set the artificial "retry-after" response.
// TODO: Should we clean the original response if it exists?
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}
} else {
return err
}
}
@@ -777,7 +905,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
}
klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
r.backoff.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
@@ -793,15 +921,11 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
// processing.
//
// Error type:
// * If the request can't be constructed, or an error happened earlier while building its
// arguments: *RequestConstructionError
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// * http.Client.Do errors are returned directly.
func (r *Request) Do() Result {
r.tryThrottle()
func (r *Request) Do(ctx context.Context) Result {
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
@@ -811,11 +935,9 @@ func (r *Request) Do() Result {
}
// DoRaw executes the request but does not process the response body.
func (r *Request) DoRaw() ([]byte, error) {
r.tryThrottle()
func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result.body, result.err = ioutil.ReadAll(resp.Body)
glogBody("Response Body", result.body)
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
@@ -845,13 +967,13 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
// 3. Apiserver closes connection.
// 4. client-go should catch this and return an error.
klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
streamErr := fmt.Errorf("Stream error %#v when reading response body, may be caused by closed connection. Please retry.", err)
streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err)
return Result{
err: streamErr,
}
default:
klog.Errorf("Unexpected error when reading response body: %#v", err)
unexpectedErr := fmt.Errorf("Unexpected error %#v when reading response body. Please retry.", err)
klog.Errorf("Unexpected error when reading response body: %v", err)
unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %v", err)
return Result{
err: unexpectedErr,
}
@@ -861,14 +983,18 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
glogBody("Response Body", body)
// verify the content type is accurate
var decoder runtime.Decoder
contentType := resp.Header.Get("Content-Type")
decoder := r.serializers.Decoder
if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
if len(contentType) == 0 {
contentType = r.c.content.ContentType
}
if len(contentType) > 0 {
var err error
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
return Result{err: errors.NewInternalError(err)}
}
decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
if err != nil {
// if we fail to negotiate a decoder, treat this as an unstructured error
switch {
@@ -988,7 +1114,7 @@ func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool,
}
var groupResource schema.GroupResource
if len(r.resource) > 0 {
groupResource.Group = r.content.GroupVersion.Group
groupResource.Group = r.c.content.GroupVersion.Group
groupResource.Resource = r.resource
}
return errors.NewGenericServerResponse(
@@ -1100,7 +1226,8 @@ func (r Result) Into(obj runtime.Object) error {
return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
}
if len(r.body) == 0 {
return fmt.Errorf("0-length response")
return fmt.Errorf("0-length response with status code: %d and content type: %s",
r.statusCode, r.contentType)
}
out, _, err := r.decoder.Decode(r.body, nil, obj)
@@ -1195,7 +1322,6 @@ func IsValidPathSegmentPrefix(name string) []string {
func ValidatePathSegmentName(name string, prefix bool) []string {
if prefix {
return IsValidPathSegmentPrefix(name)
} else {
return IsValidPathSegmentName(name)
}
return IsValidPathSegmentName(name)
}