From 9bbcc2938d41daa40d3080a1b6524afbe4e27bd9 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 10 Nov 2019 16:52:08 -0500 Subject: [PATCH] Always negotiate a decoder using ClientNegotiator This commit performs two refactors and fixes a bug. Refactor 1 changes the signature of Request to take a RESTClient, which removes the extra copy of everything on RESTClient from Request. A pair of optional constructors are added for testing. The major functional change is that Request no longer has the shim HTTPClient interface and so some test cases change slightly because we are now going through http.Client code paths instead of direct to our test stubs. Refactor 2 changes the signature of RESTClient to take a ClientContentConfig instead of ContentConfig - the primary difference being that ClientContentConfig uses ClientNegotiator instead of NegotiatedSerializer and the old Serializers type. We also collapse some redundancies (like the rate limiter can be created outside the constructor). The bug fix is to negotiate the streaming content type on a Watch() like we do for requests. We stop caching the decoder and simply resolve it on the request. We also clean up the dynamic client and remove the extra WatchSpecificVersions() method by providing a properly wrapped dynamic client. Kubernetes-commit: 3b780c64b89606f4e6b21f48fb9c305d5998b9e5 --- metadata/fake/simple.go | 1 - rest/client.go | 141 +++------- rest/client_test.go | 8 +- rest/config.go | 69 ++++- rest/config_test.go | 53 ++++ rest/fake/fake.go | 58 ++-- rest/request.go | 198 +++++++------ rest/request_test.go | 608 ++++++++++++++++++++++++---------------- 8 files changed, 666 insertions(+), 470 deletions(-) diff --git a/metadata/fake/simple.go b/metadata/fake/simple.go index 0667e9e1..9d5e0f45 100644 --- a/metadata/fake/simple.go +++ b/metadata/fake/simple.go @@ -317,7 +317,6 @@ func (c *metadataResourceClient) List(opts metav1.ListOptions) (*metav1.PartialO if !ok { return nil, fmt.Errorf("incoming object is incorrect type %T", obj) } - fmt.Printf("DEBUG: %#v\n", inputList) list := &metav1.PartialObjectMetadataList{ ListMeta: inputList.ListMeta, diff --git a/rest/client.go b/rest/client.go index 927403cb..53c6abd3 100644 --- a/rest/client.go +++ b/rest/client.go @@ -17,8 +17,6 @@ limitations under the License. package rest import ( - "fmt" - "mime" "net/http" "net/url" "os" @@ -51,6 +49,28 @@ type Interface interface { APIVersion() schema.GroupVersion } +// ClientContentConfig controls how RESTClient communicates with the server. +// +// TODO: ContentConfig will be updated to accept a Negotiator instead of a +// NegotiatedSerializer and NegotiatedSerializer will be removed. +type ClientContentConfig struct { + // AcceptContentTypes specifies the types the client will accept and is optional. + // If not set, ContentType will be used to define the Accept header + AcceptContentTypes string + // ContentType specifies the wire format used to communicate with the server. + // This value will be set as the Accept header on requests made to the server if + // AcceptContentTypes is not set, and as the default content type on any object + // sent to the server. If not set, "application/json" is used. + ContentType string + // GroupVersion is the API version to talk to. Must be provided when initializing + // a RESTClient directly. When initializing a Client, will be set with the default + // code version. This is used as the default group version for VersionedParams. + GroupVersion schema.GroupVersion + // Negotiator is used for obtaining encoders and decoders for multiple + // supported media types. + Negotiator runtime.ClientNegotiator +} + // RESTClient imposes common Kubernetes API conventions on a set of resource paths. // The baseURL is expected to point to an HTTP or HTTPS path that is the parent // of one or more resources. The server should return a decodable API resource @@ -64,34 +84,27 @@ type RESTClient struct { // versionedAPIPath is a path segment connecting the base URL to the resource root versionedAPIPath string - // contentConfig is the information used to communicate with the server. - contentConfig ContentConfig - - // serializers contain all serializers for underlying content type. - serializers Serializers + // content describes how a RESTClient encodes and decodes responses. + content ClientContentConfig // creates BackoffManager that is passed to requests. createBackoffMgr func() BackoffManager - // TODO extract this into a wrapper interface via the RESTClient interface in kubectl. - Throttle flowcontrol.RateLimiter + // rateLimiter is shared among all requests created by this client unless specifically + // overridden. + rateLimiter flowcontrol.RateLimiter // Set specific behavior of the client. If not set http.DefaultClient will be used. Client *http.Client } -type Serializers struct { - Encoder runtime.Encoder - Decoder runtime.Decoder - StreamingSerializer runtime.Serializer - Framer runtime.Framer - RenegotiatedDecoder func(contentType string, params map[string]string) (runtime.Decoder, error) -} - // NewRESTClient creates a new RESTClient. This client performs generic REST functions -// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and -// decoding of responses from the server. -func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) { +// such as Get, Put, Post, and Delete on specified paths. +func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) { + if len(config.ContentType) == 0 { + config.ContentType = "application/json" + } + base := *baseURL if !strings.HasSuffix(base.Path, "/") { base.Path += "/" @@ -99,31 +112,14 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf base.RawQuery = "" base.Fragment = "" - if config.GroupVersion == nil { - config.GroupVersion = &schema.GroupVersion{} - } - if len(config.ContentType) == 0 { - config.ContentType = "application/json" - } - serializers, err := createSerializers(config) - if err != nil { - return nil, err - } - - var throttle flowcontrol.RateLimiter - if maxQPS > 0 && rateLimiter == nil { - throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst) - } else if rateLimiter != nil { - throttle = rateLimiter - } return &RESTClient{ base: &base, versionedAPIPath: versionedAPIPath, - contentConfig: config, - serializers: *serializers, + content: config, createBackoffMgr: readExpBackoffConfig, - Throttle: throttle, - Client: client, + rateLimiter: rateLimiter, + + Client: client, }, nil } @@ -132,7 +128,7 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter { if c == nil { return nil } - return c.Throttle + return c.rateLimiter } // readExpBackoffConfig handles the internal logic of determining what the @@ -153,58 +149,6 @@ func readExpBackoffConfig() BackoffManager { time.Duration(backoffDurationInt)*time.Second)} } -// createSerializers creates all necessary serializers for given contentType. -// TODO: the negotiated serializer passed to this method should probably return -// serializers that control decoding and versioning without this package -// being aware of the types. Depends on whether RESTClient must deal with -// generic infrastructure. -func createSerializers(config ContentConfig) (*Serializers, error) { - mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes() - contentType := config.ContentType - mediaType, _, err := mime.ParseMediaType(contentType) - if err != nil { - return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err) - } - info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType) - if !ok { - if len(contentType) != 0 || len(mediaTypes) == 0 { - return nil, fmt.Errorf("no serializers registered for %s", contentType) - } - info = mediaTypes[0] - } - - internalGV := schema.GroupVersions{ - { - Group: config.GroupVersion.Group, - Version: runtime.APIVersionInternal, - }, - // always include the legacy group as a decoding target to handle non-error `Status` return types - { - Group: "", - Version: runtime.APIVersionInternal, - }, - } - - s := &Serializers{ - Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion), - Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), - - RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) { - info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType) - if !ok { - return nil, fmt.Errorf("serializer for %s not registered", contentType) - } - return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil - }, - } - if info.StreamSerializer != nil { - s.StreamingSerializer = info.StreamSerializer.Serializer - s.Framer = info.StreamSerializer.Framer - } - - return s, nil -} - // Verb begins a request with a verb (GET, POST, PUT, DELETE). // // Example usage of RESTClient's request building interface: @@ -219,12 +163,7 @@ func createSerializers(config ContentConfig) (*Serializers, error) { // list, ok := resp.(*api.PodList) // func (c *RESTClient) Verb(verb string) *Request { - backoff := c.createBackoffMgr() - - if c.Client == nil { - return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, 0) - } - return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, c.Client.Timeout) + return NewRequest(c).Verb(verb) } // Post begins a POST request. Short for c.Verb("POST"). @@ -254,5 +193,5 @@ func (c *RESTClient) Delete() *Request { // APIVersion returns the APIVersion this RESTClient is expected to use. func (c *RESTClient) APIVersion() schema.GroupVersion { - return *c.contentConfig.GroupVersion + return c.content.GroupVersion } diff --git a/rest/client_test.go b/rest/client_test.go index de864e1f..997c8d45 100644 --- a/rest/client_test.go +++ b/rest/client_test.go @@ -27,7 +27,7 @@ import ( "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" v1beta1 "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,12 +57,14 @@ func TestSerializer(t *testing.T) { NegotiatedSerializer: scheme.Codecs.WithoutConversion(), } - serializer, err := createSerializers(contentConfig) + n := runtime.NewClientNegotiator(contentConfig.NegotiatedSerializer, gv) + d, err := n.Decoder("application/json", nil) if err != nil { t.Fatal(err) } + // bytes based on actual return from API server when encoding an "unversioned" object - obj, err := runtime.Decode(serializer.Decoder, []byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success"}`)) + obj, err := runtime.Decode(d, []byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success"}`)) t.Log(obj) if err != nil { t.Fatal(err) diff --git a/rest/config.go b/rest/config.go index fb81fb7b..f58f5183 100644 --- a/rest/config.go +++ b/rest/config.go @@ -269,6 +269,9 @@ type ContentConfig struct { GroupVersion *schema.GroupVersion // NegotiatedSerializer is used for obtaining encoders and decoders for multiple // supported media types. + // + // TODO: NegotiatedSerializer will be phased out as internal clients are removed + // from Kubernetes. NegotiatedSerializer runtime.NegotiatedSerializer } @@ -283,14 +286,6 @@ func RESTClientFor(config *Config) (*RESTClient, error) { if config.NegotiatedSerializer == nil { return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient") } - qps := config.QPS - if config.QPS == 0.0 { - qps = DefaultQPS - } - burst := config.Burst - if config.Burst == 0 { - burst = DefaultBurst - } baseURL, versionedAPIPath, err := defaultServerUrlFor(config) if err != nil { @@ -310,7 +305,33 @@ func RESTClientFor(config *Config) (*RESTClient, error) { } } - return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient) + rateLimiter := config.RateLimiter + if rateLimiter == nil { + qps := config.QPS + if config.QPS == 0.0 { + qps = DefaultQPS + } + burst := config.Burst + if config.Burst == 0 { + burst = DefaultBurst + } + if qps > 0 { + rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst) + } + } + + var gv schema.GroupVersion + if config.GroupVersion != nil { + gv = *config.GroupVersion + } + clientContent := ClientContentConfig{ + AcceptContentTypes: config.AcceptContentTypes, + ContentType: config.ContentType, + GroupVersion: gv, + Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv), + } + + return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient) } // UnversionedRESTClientFor is the same as RESTClientFor, except that it allows @@ -338,13 +359,33 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) { } } - versionConfig := config.ContentConfig - if versionConfig.GroupVersion == nil { - v := metav1.SchemeGroupVersion - versionConfig.GroupVersion = &v + rateLimiter := config.RateLimiter + if rateLimiter == nil { + qps := config.QPS + if config.QPS == 0.0 { + qps = DefaultQPS + } + burst := config.Burst + if config.Burst == 0 { + burst = DefaultBurst + } + if qps > 0 { + rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst) + } } - return NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient) + gv := metav1.SchemeGroupVersion + if config.GroupVersion != nil { + gv = *config.GroupVersion + } + clientContent := ClientContentConfig{ + AcceptContentTypes: config.AcceptContentTypes, + ContentType: config.ContentType, + GroupVersion: gv, + Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv), + } + + return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient) } // SetKubernetesDefaults sets default values on the provided client config for accessing the diff --git a/rest/config_test.go b/rest/config_test.go index 06915567..f8e6b2d6 100644 --- a/rest/config_test.go +++ b/rest/config_test.go @@ -155,6 +155,59 @@ func TestRESTClientRequires(t *testing.T) { } } +func TestRESTClientLimiter(t *testing.T) { + testCases := []struct { + Name string + Config Config + Limiter flowcontrol.RateLimiter + }{ + { + Config: Config{}, + Limiter: flowcontrol.NewTokenBucketRateLimiter(5, 10), + }, + { + Config: Config{QPS: 10}, + Limiter: flowcontrol.NewTokenBucketRateLimiter(10, 10), + }, + { + Config: Config{QPS: -1}, + Limiter: nil, + }, + { + Config: Config{ + RateLimiter: flowcontrol.NewTokenBucketRateLimiter(11, 12), + }, + Limiter: flowcontrol.NewTokenBucketRateLimiter(11, 12), + }, + } + for _, testCase := range testCases { + t.Run("Versioned_"+testCase.Name, func(t *testing.T) { + config := testCase.Config + config.Host = "127.0.0.1" + config.ContentConfig = ContentConfig{GroupVersion: &v1.SchemeGroupVersion, NegotiatedSerializer: scheme.Codecs} + client, err := RESTClientFor(&config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(testCase.Limiter, client.rateLimiter) { + t.Fatalf("unexpected rate limiter: %#v", client.rateLimiter) + } + }) + t.Run("Unversioned_"+testCase.Name, func(t *testing.T) { + config := testCase.Config + config.Host = "127.0.0.1" + config.ContentConfig = ContentConfig{GroupVersion: &v1.SchemeGroupVersion, NegotiatedSerializer: scheme.Codecs} + client, err := UnversionedRESTClientFor(&config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(testCase.Limiter, client.rateLimiter) { + t.Fatalf("unexpected rate limiter: %#v", client.rateLimiter) + } + }) + } +} + type fakeLimiter struct { FakeSaturation float64 FakeQPS float32 diff --git a/rest/fake/fake.go b/rest/fake/fake.go index bbba2da3..293e0969 100644 --- a/rest/fake/fake.go +++ b/rest/fake/fake.go @@ -29,6 +29,8 @@ import ( "k8s.io/client-go/util/flowcontrol" ) +// CreateHTTPClient creates an http.Client that will invoke the provided roundTripper func +// when a request is made. func CreateHTTPClient(roundTripper func(*http.Request) (*http.Response, error)) *http.Client { return &http.Client{ Transport: roundTripperFunc(roundTripper), @@ -41,40 +43,49 @@ func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { return f(req) } -// RESTClient provides a fake RESTClient interface. +// RESTClient provides a fake RESTClient interface. It is used to mock network +// interactions via a rest.Request, or to make them via the provided Client to +// a specific server. type RESTClient struct { - Client *http.Client NegotiatedSerializer runtime.NegotiatedSerializer GroupVersion schema.GroupVersion VersionedAPIPath string - Req *http.Request + // Err is returned when any request would be made to the server. If Err is set, + // Req will not be recorded, Resp will not be returned, and Client will not be + // invoked. + Err error + // Req is set to the last request that was executed (had the methods Do/DoRaw) invoked. + Req *http.Request + // If Client is specified, the client will be invoked instead of returning Resp if + // Err is not set. + Client *http.Client + // Resp is returned to the caller after Req is recorded, unless Err or Client are set. Resp *http.Response - Err error } func (c *RESTClient) Get() *restclient.Request { - return c.request("GET") + return c.Verb("GET") } func (c *RESTClient) Put() *restclient.Request { - return c.request("PUT") + return c.Verb("PUT") } func (c *RESTClient) Patch(pt types.PatchType) *restclient.Request { - return c.request("PATCH").SetHeader("Content-Type", string(pt)) + return c.Verb("PATCH").SetHeader("Content-Type", string(pt)) } func (c *RESTClient) Post() *restclient.Request { - return c.request("POST") + return c.Verb("POST") } func (c *RESTClient) Delete() *restclient.Request { - return c.request("DELETE") + return c.Verb("DELETE") } func (c *RESTClient) Verb(verb string) *restclient.Request { - return c.request(verb) + return c.Request().Verb(verb) } func (c *RESTClient) APIVersion() schema.GroupVersion { @@ -85,28 +96,17 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter { return nil } -func (c *RESTClient) request(verb string) *restclient.Request { - config := restclient.ContentConfig{ - ContentType: runtime.ContentTypeJSON, - GroupVersion: &c.GroupVersion, - NegotiatedSerializer: c.NegotiatedSerializer, +func (c *RESTClient) Request() *restclient.Request { + config := restclient.ClientContentConfig{ + ContentType: runtime.ContentTypeJSON, + GroupVersion: c.GroupVersion, + Negotiator: runtime.NewClientNegotiator(c.NegotiatedSerializer, c.GroupVersion), } - - ns := c.NegotiatedSerializer - info, _ := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), runtime.ContentTypeJSON) - serializers := restclient.Serializers{ - // TODO this was hardcoded before, but it doesn't look right - Encoder: ns.EncoderForVersion(info.Serializer, c.GroupVersion), - Decoder: ns.DecoderToVersion(info.Serializer, c.GroupVersion), - } - if info.StreamSerializer != nil { - serializers.StreamingSerializer = info.StreamSerializer.Serializer - serializers.Framer = info.StreamSerializer.Framer - } - return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, c.VersionedAPIPath, config, serializers, nil, nil, 0) + return restclient.NewRequestWithClient(&url.URL{Scheme: "https", Host: "localhost"}, c.VersionedAPIPath, config, CreateHTTPClient(c.do)) } -func (c *RESTClient) Do(req *http.Request) (*http.Response, error) { +// do is invoked when a Request() created by this client is executed. +func (c *RESTClient) do(req *http.Request) (*http.Response, error) { if c.Err != nil { return nil, c.Err } diff --git a/rest/request.go b/rest/request.go index 556061de..9e0c2611 100644 --- a/rest/request.go +++ b/rest/request.go @@ -48,7 +48,8 @@ import ( var ( // longThrottleLatency defines threshold for logging requests. All requests being - // throttle for more than longThrottleLatency will be logged. + // throttled (via the provided rateLimiter) for more than longThrottleLatency will + // be logged. longThrottleLatency = 50 * time.Millisecond ) @@ -74,19 +75,20 @@ func (r *RequestConstructionError) Error() string { return fmt.Sprintf("request construction error: '%v'", r.Err) } +var noBackoff = &NoBackoff{} + // Request allows for building up a request to a server in a chained fashion. // Any errors are stored until the end of your call, so you only have to // check once. type Request struct { - // required - client HTTPClient - verb string + c *RESTClient - baseURL *url.URL - content ContentConfig - serializers Serializers + rateLimiter flowcontrol.RateLimiter + backoff BackoffManager + timeout time.Duration // generic components accessible via method setters + verb string pathPrefix string subpath string params url.Values @@ -98,7 +100,6 @@ type Request struct { resource string resourceName string subresource string - timeout time.Duration // output err error @@ -106,42 +107,63 @@ type Request struct { // This is only used for per-request timeouts, deadlines, and cancellations. ctx context.Context - - backoffMgr BackoffManager - throttle flowcontrol.RateLimiter } // NewRequest creates a new request helper object for accessing runtime.Objects on a server. -func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request { +func NewRequest(c *RESTClient) *Request { + var backoff BackoffManager + if c.createBackoffMgr != nil { + backoff = c.createBackoffMgr() + } if backoff == nil { - klog.V(2).Infof("Not implementing request backoff strategy.") - backoff = &NoBackoff{} + backoff = noBackoff } - pathPrefix := "/" - if baseURL != nil { - pathPrefix = path.Join(pathPrefix, baseURL.Path) + var pathPrefix string + if c.base != nil { + pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath) + } else { + pathPrefix = path.Join("/", c.versionedAPIPath) } + + var timeout time.Duration + if c.Client != nil { + timeout = c.Client.Timeout + } + r := &Request{ - client: client, - verb: verb, - baseURL: baseURL, - pathPrefix: path.Join(pathPrefix, versionedAPIPath), - content: content, - serializers: serializers, - backoffMgr: backoff, - throttle: throttle, + c: c, + rateLimiter: c.rateLimiter, + backoff: backoff, timeout: timeout, + pathPrefix: pathPrefix, } + switch { - case len(content.AcceptContentTypes) > 0: - r.SetHeader("Accept", content.AcceptContentTypes) - case len(content.ContentType) > 0: - r.SetHeader("Accept", content.ContentType+", */*") + case len(c.content.AcceptContentTypes) > 0: + r.SetHeader("Accept", c.content.AcceptContentTypes) + case len(c.content.ContentType) > 0: + r.SetHeader("Accept", c.content.ContentType+", */*") } return r } +// NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios. +func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request { + return NewRequest(&RESTClient{ + base: base, + versionedAPIPath: versionedAPIPath, + content: content, + Client: client, + }) +} + +// Verb sets the verb this request will use. +func (r *Request) Verb(verb string) *Request { + r.verb = verb + return r +} + // Prefix adds segments to the relative beginning to the request path. These // items will be placed before the optional Namespace, Resource, or Name sections. // Setting AbsPath will clear any previously set Prefix segments @@ -184,17 +206,17 @@ func (r *Request) Resource(resource string) *Request { // or defaults to the stub implementation if nil is provided func (r *Request) BackOff(manager BackoffManager) *Request { if manager == nil { - r.backoffMgr = &NoBackoff{} + r.backoff = &NoBackoff{} return r } - r.backoffMgr = manager + r.backoff = manager return r } // Throttle receives a rate-limiter and sets or replaces an existing request limiter func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request { - r.throttle = limiter + r.rateLimiter = limiter return r } @@ -272,8 +294,8 @@ func (r *Request) AbsPath(segments ...string) *Request { if r.err != nil { return r } - r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...)) - if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") { + r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...)) + if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") { // preserve any trailing slashes for legacy behavior r.pathPrefix += "/" } @@ -317,7 +339,7 @@ func (r *Request) Param(paramName, s string) *Request { // VersionedParams will not write query parameters that have omitempty set and are empty. If a // parameter has already been set it is appended to (Params and VersionedParams are additive). func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request { - return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion) + return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion) } func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request { @@ -397,14 +419,19 @@ func (r *Request) Body(obj interface{}) *Request { if reflect.ValueOf(t).IsNil() { return r } - data, err := runtime.Encode(r.serializers.Encoder, t) + encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil) + if err != nil { + r.err = err + return r + } + data, err := runtime.Encode(encoder, t) if err != nil { r.err = err return r } glogBody("Request Body", data) r.body = bytes.NewReader(data) - r.SetHeader("Content-Type", r.content.ContentType) + r.SetHeader("Content-Type", r.c.content.ContentType) default: r.err = fmt.Errorf("unknown type used for body: %+v", obj) } @@ -433,8 +460,8 @@ func (r *Request) URL() *url.URL { } finalURL := &url.URL{} - if r.baseURL != nil { - *finalURL = *r.baseURL + if r.c.base != nil { + *finalURL = *r.c.base } finalURL.Path = p @@ -468,8 +495,8 @@ func (r Request) finalURLTemplate() url.URL { segments := strings.Split(r.URL().Path, "/") groupIndex := 0 index := 0 - if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) { - groupIndex += len(strings.Split(r.baseURL.Path, "/")) + if r.URL() != nil && r.c.base != nil && strings.Contains(r.URL().Path, r.c.base.Path) { + groupIndex += len(strings.Split(r.c.base.Path, "/")) } if groupIndex >= len(segments) { return *url @@ -522,16 +549,16 @@ func (r Request) finalURLTemplate() url.URL { } func (r *Request) tryThrottle() error { - if r.throttle == nil { + if r.rateLimiter == nil { return nil } now := time.Now() var err error if r.ctx != nil { - err = r.throttle.Wait(r.ctx) + err = r.rateLimiter.Wait(r.ctx) } else { - r.throttle.Accept() + r.rateLimiter.Accept() } if latency := time.Since(now); latency > longThrottleLatency { @@ -544,27 +571,11 @@ func (r *Request) tryThrottle() error { // Watch attempts to begin watching the requested location. // Returns a watch.Interface, or an error. func (r *Request) Watch() (watch.Interface, error) { - return r.WatchWithSpecificDecoders( - func(body io.ReadCloser) streaming.Decoder { - framer := r.serializers.Framer.NewFrameReader(body) - return streaming.NewDecoder(framer, r.serializers.StreamingSerializer) - }, - r.serializers.Decoder, - ) -} - -// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder. -// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content -// Returns a watch.Interface, or an error. -func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) { // We specifically don't want to rate limit watches, so we - // don't use r.throttle here. + // don't use r.rateLimiter here. if r.err != nil { return nil, r.err } - if r.serializers.Framer == nil { - return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType) - } url := r.URL().String() req, err := http.NewRequest(r.verb, url, r.body) @@ -575,18 +586,18 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) req = req.WithContext(r.ctx) } req.Header = r.headers - client := r.client + client := r.c.Client if client == nil { client = http.DefaultClient } - r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) + r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL())) resp, err := client.Do(req) updateURLMetrics(r, resp, err) - if r.baseURL != nil { + if r.c.base != nil { if err != nil { - r.backoffMgr.UpdateBackoff(r.baseURL, err, 0) + r.backoff.UpdateBackoff(r.c.base, err, 0) } else { - r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode) + r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode) } } if err != nil { @@ -604,9 +615,22 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) } return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode) } - wrapperDecoder := wrapperDecoderFn(resp.Body) + + contentType := resp.Header.Get("Content-Type") + mediaType, params, err := mime.ParseMediaType(contentType) + if err != nil { + klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err) + } + objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params) + if err != nil { + return nil, err + } + + frameReader := framer.NewFrameReader(resp.Body) + watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer) + return watch.NewStreamWatcher( - restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder), + restclientwatch.NewDecoder(watchEventDecoder, objectDecoder), // use 500 to indicate that the cause of the error is unknown - other error codes // are more specific to HTTP interactions, and set a reason errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"), @@ -617,8 +641,8 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) // It also handles corner cases for incomplete/invalid request data. func updateURLMetrics(req *Request, resp *http.Response, err error) { url := "none" - if req.baseURL != nil { - url = req.baseURL.Host + if req.c.base != nil { + url = req.c.base.Host } // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric @@ -656,18 +680,18 @@ func (r *Request) Stream() (io.ReadCloser, error) { req = req.WithContext(r.ctx) } req.Header = r.headers - client := r.client + client := r.c.Client if client == nil { client = http.DefaultClient } - r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) + r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL())) resp, err := client.Do(req) updateURLMetrics(r, resp, err) - if r.baseURL != nil { + if r.c.base != nil { if err != nil { - r.backoffMgr.UpdateBackoff(r.URL(), err, 0) + r.backoff.UpdateBackoff(r.URL(), err, 0) } else { - r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode) + r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode) } } if err != nil { @@ -738,7 +762,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { return err } - client := r.client + client := r.c.Client if client == nil { client = http.DefaultClient } @@ -765,11 +789,11 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { } req.Header = r.headers - r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) + r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL())) if retries > 0 { // We are retrying the request that we already send to apiserver // at least once before. - // This request should also be throttled with the client-internal throttler. + // This request should also be throttled with the client-internal rate limiter. if err := r.tryThrottle(); err != nil { return err } @@ -777,9 +801,9 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { resp, err := client.Do(req) updateURLMetrics(r, resp, err) if err != nil { - r.backoffMgr.UpdateBackoff(r.URL(), err, 0) + r.backoff.UpdateBackoff(r.URL(), err, 0) } else { - r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode) + r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode) } if err != nil { // "Connection reset by peer" is usually a transient error. @@ -822,7 +846,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { } klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url) - r.backoffMgr.Sleep(time.Duration(seconds) * time.Second) + r.backoff.Sleep(time.Duration(seconds) * time.Second) return false } fn(req, resp) @@ -908,14 +932,18 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu glogBody("Response Body", body) // verify the content type is accurate + var decoder runtime.Decoder contentType := resp.Header.Get("Content-Type") - decoder := r.serializers.Decoder - if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) { + if len(contentType) == 0 { + contentType = r.c.content.ContentType + } + if len(contentType) > 0 { + var err error mediaType, params, err := mime.ParseMediaType(contentType) if err != nil { return Result{err: errors.NewInternalError(err)} } - decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params) + decoder, err = r.c.content.Negotiator.Decoder(mediaType, params) if err != nil { // if we fail to negotiate a decoder, treat this as an unstructured error switch { @@ -1035,7 +1063,7 @@ func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, } var groupResource schema.GroupResource if len(r.resource) > 0 { - groupResource.Group = r.content.GroupVersion.Group + groupResource.Group = r.c.content.GroupVersion.Group groupResource.Resource = r.resource } return errors.NewGenericServerResponse( diff --git a/rest/request_test.go b/rest/request_test.go index dd8d789c..0c8efe66 100644 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -56,24 +56,30 @@ import ( ) func TestNewRequestSetsAccept(t *testing.T) { - r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, Serializers{}, nil, nil, 0) + r := NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{}, nil).Verb("get") if r.headers.Get("Accept") != "" { t.Errorf("unexpected headers: %#v", r.headers) } - r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, Serializers{}, nil, nil, 0) + r = NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{ContentType: "application/other"}, nil).Verb("get") if r.headers.Get("Accept") != "application/other, */*" { t.Errorf("unexpected headers: %#v", r.headers) } } +func clientForFunc(fn clientFunc) *http.Client { + return &http.Client{ + Transport: fn, + } +} + type clientFunc func(req *http.Request) (*http.Response, error) -func (f clientFunc) Do(req *http.Request) (*http.Response, error) { +func (f clientFunc) RoundTrip(req *http.Request) (*http.Response, error) { return f(req) } func TestRequestSetsHeaders(t *testing.T) { - server := clientFunc(func(req *http.Request) (*http.Response, error) { + server := clientForFunc(func(req *http.Request) (*http.Response, error) { if req.Header.Get("Accept") != "application/other, */*" { t.Errorf("unexpected headers: %#v", req.Header) } @@ -84,8 +90,8 @@ func TestRequestSetsHeaders(t *testing.T) { }) config := defaultContentConfig() config.ContentType = "application/other" - serializers := defaultSerializers(t) - r := NewRequest(server, "get", &url.URL{Path: "/path"}, "", config, serializers, nil, nil, 0) + r := NewRequestWithClient(&url.URL{Path: "/path"}, "", config, nil).Verb("get") + r.c.Client = server // Check if all "issue" methods are setting headers. _ = r.Do() @@ -96,8 +102,10 @@ func TestRequestSetsHeaders(t *testing.T) { func TestRequestWithErrorWontChange(t *testing.T) { gvCopy := v1.SchemeGroupVersion original := Request{ - err: errors.New("test"), - content: ContentConfig{GroupVersion: &gvCopy}, + err: errors.New("test"), + c: &RESTClient{ + content: ClientContentConfig{GroupVersion: gvCopy}, + }, } r := original changed := r.Param("foo", "bar"). @@ -118,26 +126,26 @@ func TestRequestWithErrorWontChange(t *testing.T) { } func TestRequestPreservesBaseTrailingSlash(t *testing.T) { - r := &Request{baseURL: &url.URL{}, pathPrefix: "/path/"} + r := &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "/path/"} if s := r.URL().String(); s != "/path/" { t.Errorf("trailing slash should be preserved: %s", s) } } func TestRequestAbsPathPreservesTrailingSlash(t *testing.T) { - r := (&Request{baseURL: &url.URL{}}).AbsPath("/foo/") + r := (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("/foo/") if s := r.URL().String(); s != "/foo/" { t.Errorf("trailing slash should be preserved: %s", s) } - r = (&Request{baseURL: &url.URL{}}).AbsPath("/foo/") + r = (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("/foo/") if s := r.URL().String(); s != "/foo/" { t.Errorf("trailing slash should be preserved: %s", s) } } func TestRequestAbsPathJoins(t *testing.T) { - r := (&Request{baseURL: &url.URL{}}).AbsPath("foo/bar", "baz") + r := (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("foo/bar", "baz") if s := r.URL().String(); s != "foo/bar/baz" { t.Errorf("trailing slash should be preserved: %s", s) } @@ -145,9 +153,7 @@ func TestRequestAbsPathJoins(t *testing.T) { func TestRequestSetsNamespace(t *testing.T) { r := (&Request{ - baseURL: &url.URL{ - Path: "/", - }, + c: &RESTClient{base: &url.URL{Path: "/"}}, }).Namespace("foo") if r.namespace == "" { t.Errorf("namespace should be set: %#v", r) @@ -160,7 +166,7 @@ func TestRequestSetsNamespace(t *testing.T) { func TestRequestOrdersNamespaceInPath(t *testing.T) { r := (&Request{ - baseURL: &url.URL{}, + c: &RESTClient{base: &url.URL{}}, pathPrefix: "/test/", }).Name("bar").Resource("baz").Namespace("foo") if s := r.URL().String(); s != "/test/namespaces/foo/baz/bar" { @@ -170,7 +176,7 @@ func TestRequestOrdersNamespaceInPath(t *testing.T) { func TestRequestOrdersSubResource(t *testing.T) { r := (&Request{ - baseURL: &url.URL{}, + c: &RESTClient{base: &url.URL{}}, pathPrefix: "/test/", }).Name("bar").Resource("baz").Namespace("foo").Suffix("test").SubResource("a", "b") if s := r.URL().String(); s != "/test/namespaces/foo/baz/bar/a/b/test" { @@ -226,7 +232,7 @@ func TestRequestParam(t *testing.T) { } func TestRequestVersionedParams(t *testing.T) { - r := (&Request{content: ContentConfig{GroupVersion: &v1.SchemeGroupVersion}}).Param("foo", "a") + r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}).Param("foo", "a") if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}}) { t.Errorf("should have set a param: %#v", r) } @@ -242,7 +248,7 @@ func TestRequestVersionedParams(t *testing.T) { } func TestRequestVersionedParamsFromListOptions(t *testing.T) { - r := &Request{content: ContentConfig{GroupVersion: &v1.SchemeGroupVersion}} + r := &Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}} r.VersionedParams(&metav1.ListOptions{ResourceVersion: "1"}, scheme.ParameterCodec) if !reflect.DeepEqual(r.params, url.Values{ "resourceVersion": []string{"1"}, @@ -277,24 +283,15 @@ type NotAnAPIObject struct{} func (obj NotAnAPIObject) GroupVersionKind() *schema.GroupVersionKind { return nil } func (obj NotAnAPIObject) SetGroupVersionKind(gvk *schema.GroupVersionKind) {} -func defaultContentConfig() ContentConfig { +func defaultContentConfig() ClientContentConfig { gvCopy := v1.SchemeGroupVersion - return ContentConfig{ - ContentType: "application/json", - GroupVersion: &gvCopy, - NegotiatedSerializer: scheme.Codecs.WithoutConversion(), + return ClientContentConfig{ + ContentType: "application/json", + GroupVersion: gvCopy, + Negotiator: runtime.NewClientNegotiator(scheme.Codecs.WithoutConversion(), gvCopy), } } -func defaultSerializers(t *testing.T) Serializers { - config := defaultContentConfig() - serializers, err := createSerializers(config) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - return *serializers -} - func TestRequestBody(t *testing.T) { // test unknown type r := (&Request{}).Body([]string{"test"}) @@ -315,7 +312,7 @@ func TestRequestBody(t *testing.T) { } // test unencodable api object - r = (&Request{content: defaultContentConfig()}).Body(&NotAnAPIObject{}) + r = (&Request{c: &RESTClient{content: defaultContentConfig()}}).Body(&NotAnAPIObject{}) if r.err == nil || r.body != nil { t.Errorf("should have set err and left body nil: %#v", r) } @@ -347,14 +344,14 @@ func TestURLTemplate(t *testing.T) { }{ { // non dynamic client - Request: NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST"). Prefix("api", "v1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0"), ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1/nm?p0=v0", ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D?p0=%7Bvalue%7D", }, { // non dynamic client with wrong api group - Request: NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST"). Prefix("pre1", "v1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0"), ExpectedFullURL: "http://localhost/some/base/url/path/pre1/v1/namespaces/ns/r1/nm?p0=v0", ExpectedFinalURL: "http://localhost/%7Bprefix%7D", @@ -362,7 +359,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with core group + namespace + resourceResource (with name) // /api/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/api/v1/namespaces/ns/r1/name1"), ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1/name1", ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D", @@ -370,7 +367,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + namespace + resourceResource (with name) // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/g1/v1/namespaces/ns/r1/name1"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/ns/r1/name1", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D", @@ -378,7 +375,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with core group + namespace + resourceResource (with NO name) // /api/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/api/v1/namespaces/ns/r1"), ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1", ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1", @@ -386,7 +383,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + namespace + resourceResource (with NO name) // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/g1/v1/namespaces/ns/r1"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/ns/r1", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/%7Bnamespace%7D/r1", @@ -394,7 +391,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with core group + resourceResource (with name) // /api/$RESOURCEVERSION/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/api/v1/r1/name1"), ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/r1/name1", ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/r1/%7Bname%7D", @@ -402,7 +399,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + resourceResource (with name) // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/g1/v1/r1/name1"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/r1/name1", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/r1/%7Bname%7D", @@ -410,7 +407,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + namespace + resourceResource (with name) + subresource // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME/$SUBRESOURCE - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/%7Bname%7D/finalize", @@ -418,7 +415,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + namespace + resourceResource (with name) // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/%7Bname%7D", @@ -426,7 +423,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + namespace + resourceResource (with NO name) + subresource // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%SUBRESOURCE - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/finalize", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/finalize", @@ -434,7 +431,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + namespace + resourceResource (with NO name) + subresource // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%SUBRESOURCE - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/status"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/status", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/status", @@ -442,7 +439,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + namespace + resourceResource (with no name) // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces", @@ -450,7 +447,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + resourceResource (with name) + subresource // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces/namespaces/finalize"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/finalize", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D/finalize", @@ -458,7 +455,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + resourceResource (with name) + subresource // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces/namespaces/status"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/status", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D/status", @@ -466,7 +463,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + resourceResource (with name) // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces/namespaces"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D", @@ -474,7 +471,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with named group + resourceResource (with no name) // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/$RESOURCE/%NAME - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/apis/namespaces/namespaces/namespaces"), ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces", ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces", @@ -482,7 +479,7 @@ func TestURLTemplate(t *testing.T) { { // dynamic client with wrong api group + namespace + resourceResource (with name) + subresource // /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME/$SUBRESOURCE - Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0). + Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE"). Prefix("/pre1/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"), ExpectedFullURL: "http://localhost/some/base/url/path/pre1/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize", ExpectedFinalURL: "http://localhost/%7Bprefix%7D", @@ -550,7 +547,7 @@ func TestTransformResponse(t *testing.T) { {Response: &http.Response{StatusCode: http.StatusOK, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid}, } for i, test := range testCases { - r := NewRequest(nil, "", uri, "", defaultContentConfig(), defaultSerializers(t), nil, nil, 0) + r := NewRequestWithClient(uri, "", defaultContentConfig(), nil) if test.Response.Body == nil { test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{})) } @@ -589,13 +586,21 @@ type renegotiator struct { err error } -func (r *renegotiator) invoke(contentType string, params map[string]string) (runtime.Decoder, error) { +func (r *renegotiator) Decoder(contentType string, params map[string]string) (runtime.Decoder, error) { r.called = true r.contentType = contentType r.params = params return r.decoder, r.err } +func (r *renegotiator) Encoder(contentType string, params map[string]string) (runtime.Encoder, error) { + return nil, fmt.Errorf("UNIMPLEMENTED") +} + +func (r *renegotiator) StreamDecoder(contentType string, params map[string]string) (runtime.Decoder, runtime.Serializer, runtime.Framer, error) { + return nil, nil, nil, fmt.Errorf("UNIMPLEMENTED") +} + func TestTransformResponseNegotiate(t *testing.T) { invalid := []byte("aaaaa") uri, _ := url.Parse("http://localhost") @@ -619,7 +624,9 @@ func TestTransformResponseNegotiate(t *testing.T) { Header: http.Header{"Content-Type": []string{"application/json"}}, Body: ioutil.NopCloser(bytes.NewReader(invalid)), }, - Error: true, + Called: true, + ExpectContentType: "application/json", + Error: true, ErrFn: func(err error) bool { return err.Error() != "aaaaa" && apierrors.IsUnauthorized(err) }, @@ -655,22 +662,26 @@ func TestTransformResponseNegotiate(t *testing.T) { }, }, { - // no negotiation when no content type specified + // negotiate when no content type specified Response: &http.Response{ StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"text/any"}}, Body: ioutil.NopCloser(bytes.NewReader(invalid)), }, - Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), + Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), + Called: true, + ExpectContentType: "text/any", }, { - // no negotiation when no response content type specified + // negotiate when no response content type specified ContentType: "text/any", Response: &http.Response{ StatusCode: http.StatusOK, Body: ioutil.NopCloser(bytes.NewReader(invalid)), }, - Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), + Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), + Called: true, + ExpectContentType: "text/any", }, { // unrecognized content type is not handled @@ -693,15 +704,14 @@ func TestTransformResponseNegotiate(t *testing.T) { }, } for i, test := range testCases { - serializers := defaultSerializers(t) + contentConfig := defaultContentConfig() + contentConfig.ContentType = test.ContentType negotiator := &renegotiator{ decoder: test.Decoder, err: test.NegotiateErr, } - serializers.RenegotiatedDecoder = negotiator.invoke - contentConfig := defaultContentConfig() - contentConfig.ContentType = test.ContentType - r := NewRequest(nil, "", uri, "", contentConfig, serializers, nil, nil, 0) + contentConfig.Negotiator = negotiator + r := NewRequestWithClient(uri, "", contentConfig, nil) if test.Response.Body == nil { test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{})) } @@ -828,53 +838,55 @@ func TestTransformUnstructuredError(t *testing.T) { }, } - for i, testCase := range testCases { - r := &Request{ - content: defaultContentConfig(), - serializers: defaultSerializers(t), - resourceName: testCase.Name, - resource: testCase.Resource, - } - result := r.transformResponse(testCase.Res, testCase.Req) - err := result.err - if !testCase.ErrFn(err) { - t.Errorf("unexpected error: %v", err) - continue - } - if !apierrors.IsUnexpectedServerError(err) { - t.Errorf("%d: unexpected error type: %v", i, err) - } - if len(testCase.Name) != 0 && !strings.Contains(err.Error(), testCase.Name) { - t.Errorf("unexpected error string: %s", err) - } - if len(testCase.Resource) != 0 && !strings.Contains(err.Error(), testCase.Resource) { - t.Errorf("unexpected error string: %s", err) - } + for _, testCase := range testCases { + t.Run("", func(t *testing.T) { + r := &Request{ + c: &RESTClient{ + content: defaultContentConfig(), + }, + resourceName: testCase.Name, + resource: testCase.Resource, + } + result := r.transformResponse(testCase.Res, testCase.Req) + err := result.err + if !testCase.ErrFn(err) { + t.Fatalf("unexpected error: %v", err) + } + if !apierrors.IsUnexpectedServerError(err) { + t.Errorf("unexpected error type: %v", err) + } + if len(testCase.Name) != 0 && !strings.Contains(err.Error(), testCase.Name) { + t.Errorf("unexpected error string: %s", err) + } + if len(testCase.Resource) != 0 && !strings.Contains(err.Error(), testCase.Resource) { + t.Errorf("unexpected error string: %s", err) + } - // verify Error() properly transforms the error - transformed := result.Error() - expect := testCase.Transformed - if expect == nil { - expect = err - } - if !reflect.DeepEqual(expect, transformed) { - t.Errorf("%d: unexpected Error(): %s", i, diff.ObjectReflectDiff(expect, transformed)) - } + // verify Error() properly transforms the error + transformed := result.Error() + expect := testCase.Transformed + if expect == nil { + expect = err + } + if !reflect.DeepEqual(expect, transformed) { + t.Errorf("unexpected Error(): %s", diff.ObjectReflectDiff(expect, transformed)) + } - // verify result.Get properly transforms the error - if _, err := result.Get(); !reflect.DeepEqual(expect, err) { - t.Errorf("%d: unexpected error on Get(): %s", i, diff.ObjectReflectDiff(expect, err)) - } + // verify result.Get properly transforms the error + if _, err := result.Get(); !reflect.DeepEqual(expect, err) { + t.Errorf("unexpected error on Get(): %s", diff.ObjectReflectDiff(expect, err)) + } - // verify result.Into properly handles the error - if err := result.Into(&v1.Pod{}); !reflect.DeepEqual(expect, err) { - t.Errorf("%d: unexpected error on Into(): %s", i, diff.ObjectReflectDiff(expect, err)) - } + // verify result.Into properly handles the error + if err := result.Into(&v1.Pod{}); !reflect.DeepEqual(expect, err) { + t.Errorf("unexpected error on Into(): %s", diff.ObjectReflectDiff(expect, err)) + } - // verify result.Raw leaves the error in the untransformed state - if _, err := result.Raw(); !reflect.DeepEqual(result.err, err) { - t.Errorf("%d: unexpected error on Raw(): %s", i, diff.ObjectReflectDiff(expect, err)) - } + // verify result.Raw leaves the error in the untransformed state + if _, err := result.Raw(); !reflect.DeepEqual(result.err, err) { + t.Errorf("unexpected error on Raw(): %s", diff.ObjectReflectDiff(expect, err)) + } + }) } } @@ -898,27 +910,32 @@ func TestRequestWatch(t *testing.T) { Err: true, }, { - Request: &Request{baseURL: &url.URL{}, pathPrefix: "%"}, + Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"}, Err: true, }, { Request: &Request{ - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return nil, errors.New("err") - }), - baseURL: &url.URL{}, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("err") + }), + base: &url.URL{}, + }, }, Err: true, }, { Request: &Request{ - content: defaultContentConfig(), - serializers: defaultSerializers(t), - client: clientFunc(func(req *http.Request) (*http.Response, error) { - resp := &http.Response{StatusCode: http.StatusOK, Body: errorReader{err: errors.New("test error")}} - return resp, nil - }), - baseURL: &url.URL{}, + c: &RESTClient{ + content: defaultContentConfig(), + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusForbidden, + Body: ioutil.NopCloser(bytes.NewReader([]byte{})), + }, nil + }), + base: &url.URL{}, + }, }, Expect: []watch.Event{ { @@ -943,18 +960,23 @@ func TestRequestWatch(t *testing.T) { }, }, }, + Err: true, + ErrFn: func(err error) bool { + return apierrors.IsForbidden(err) + }, }, { Request: &Request{ - content: defaultContentConfig(), - serializers: defaultSerializers(t), - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusForbidden, - Body: ioutil.NopCloser(bytes.NewReader([]byte{})), - }, nil - }), - baseURL: &url.URL{}, + c: &RESTClient{ + content: defaultContentConfig(), + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusForbidden, + Body: ioutil.NopCloser(bytes.NewReader([]byte{})), + }, nil + }), + base: &url.URL{}, + }, }, Err: true, ErrFn: func(err error) bool { @@ -963,15 +985,16 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ - content: defaultContentConfig(), - serializers: defaultSerializers(t), - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusUnauthorized, - Body: ioutil.NopCloser(bytes.NewReader([]byte{})), - }, nil - }), - baseURL: &url.URL{}, + c: &RESTClient{ + content: defaultContentConfig(), + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusUnauthorized, + Body: ioutil.NopCloser(bytes.NewReader([]byte{})), + }, nil + }), + base: &url.URL{}, + }, }, Err: true, ErrFn: func(err error) bool { @@ -980,18 +1003,19 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ - content: defaultContentConfig(), - serializers: defaultSerializers(t), - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusUnauthorized, - Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{ - Status: metav1.StatusFailure, - Reason: metav1.StatusReasonUnauthorized, - })))), - }, nil - }), - baseURL: &url.URL{}, + c: &RESTClient{ + content: defaultContentConfig(), + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusUnauthorized, + Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{ + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonUnauthorized, + })))), + }, nil + }), + base: &url.URL{}, + }, }, Err: true, ErrFn: func(err error) bool { @@ -1000,63 +1024,60 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ - serializers: defaultSerializers(t), - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return nil, io.EOF - }), - baseURL: &url.URL{}, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return nil, io.EOF + }), + base: &url.URL{}, + }, }, Empty: true, }, { Request: &Request{ - serializers: defaultSerializers(t), - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return nil, &url.Error{Err: io.EOF} - }), - baseURL: &url.URL{}, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("http: can't write HTTP request on broken connection") + }), + base: &url.URL{}, + }, }, Empty: true, }, { Request: &Request{ - serializers: defaultSerializers(t), - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return nil, errors.New("http: can't write HTTP request on broken connection") - }), - baseURL: &url.URL{}, - }, - Empty: true, - }, - { - Request: &Request{ - serializers: defaultSerializers(t), - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return nil, errors.New("foo: connection reset by peer") - }), - baseURL: &url.URL{}, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("foo: connection reset by peer") + }), + base: &url.URL{}, + }, }, Empty: true, }, } - for i, testCase := range testCases { + for _, testCase := range testCases { t.Run("", func(t *testing.T) { - testCase.Request.backoffMgr = &NoBackoff{} + testCase.Request.backoff = &NoBackoff{} watch, err := testCase.Request.Watch() hasErr := err != nil if hasErr != testCase.Err { - t.Fatalf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err) + t.Fatalf("expected %t, got %t: %v", testCase.Err, hasErr, err) } if testCase.ErrFn != nil && !testCase.ErrFn(err) { - t.Errorf("%d: error not valid: %v", i, err) + t.Errorf("error not valid: %v", err) } if hasErr && watch != nil { - t.Fatalf("%d: watch should be nil when error is returned", i) + t.Fatalf("watch should be nil when error is returned") } + if hasErr { + return + } + defer watch.Stop() if testCase.Empty { - _, ok := <-watch.ResultChan() + evt, ok := <-watch.ResultChan() if ok { - t.Errorf("%d: expected the watch to be empty: %#v", i, watch) + t.Errorf("expected the watch to be empty: %#v", evt) } } if testCase.Expect != nil { @@ -1085,46 +1106,50 @@ func TestRequestStream(t *testing.T) { Err: true, }, { - Request: &Request{baseURL: &url.URL{}, pathPrefix: "%"}, + Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"}, Err: true, }, { Request: &Request{ - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return nil, errors.New("err") - }), - baseURL: &url.URL{}, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("err") + }), + base: &url.URL{}, + }, }, Err: true, }, { Request: &Request{ - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusUnauthorized, - Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{ - Status: metav1.StatusFailure, - Reason: metav1.StatusReasonUnauthorized, - })))), - }, nil - }), - content: defaultContentConfig(), - serializers: defaultSerializers(t), - baseURL: &url.URL{}, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusUnauthorized, + Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{ + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonUnauthorized, + })))), + }, nil + }), + content: defaultContentConfig(), + base: &url.URL{}, + }, }, Err: true, }, { Request: &Request{ - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusBadRequest, - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))), - }, nil - }), - content: defaultContentConfig(), - serializers: defaultSerializers(t), - baseURL: &url.URL{}, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusBadRequest, + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))), + }, nil + }), + content: defaultContentConfig(), + base: &url.URL{}, + }, }, Err: true, ErrFn: func(err error) bool { @@ -1136,7 +1161,7 @@ func TestRequestStream(t *testing.T) { }, } for i, testCase := range testCases { - testCase.Request.backoffMgr = &NoBackoff{} + testCase.Request.backoff = &NoBackoff{} body, err := testCase.Request.Stream() hasErr := err != nil if hasErr != testCase.Err { @@ -1194,25 +1219,27 @@ func TestRequestDo(t *testing.T) { Err bool }{ { - Request: &Request{err: errors.New("bail")}, + Request: &Request{c: &RESTClient{}, err: errors.New("bail")}, Err: true, }, { - Request: &Request{baseURL: &url.URL{}, pathPrefix: "%"}, + Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"}, Err: true, }, { Request: &Request{ - client: clientFunc(func(req *http.Request) (*http.Response, error) { - return nil, errors.New("err") - }), - baseURL: &url.URL{}, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("err") + }), + base: &url.URL{}, + }, }, Err: true, }, } for i, testCase := range testCases { - testCase.Request.backoffMgr = &NoBackoff{} + testCase.Request.backoff = &NoBackoff{} body, err := testCase.Request.Do().Raw() hasErr := err != nil if hasErr != testCase.Err { @@ -1281,7 +1308,7 @@ func TestBackoffLifecycle(t *testing.T) { seconds := []int{0, 1, 2, 4, 8, 0, 1, 2, 4, 0} request := c.Verb("POST").Prefix("backofftest").Suffix("abc") clock := clock.FakeClock{} - request.backoffMgr = &URLBackoff{ + request.backoff = &URLBackoff{ // Use a fake backoff here to avoid flakes and speed the test up. Backoff: flowcontrol.NewFakeBackOff( time.Duration(1)*time.Second, @@ -1290,7 +1317,7 @@ func TestBackoffLifecycle(t *testing.T) { )} for _, sec := range seconds { - thisBackoff := request.backoffMgr.CalculateBackoff(request.URL()) + thisBackoff := request.backoff.CalculateBackoff(request.URL()) t.Logf("Current backoff %v", thisBackoff) if thisBackoff != time.Duration(sec)*time.Second { t.Errorf("Backoff is %v instead of %v", thisBackoff, sec) @@ -1335,11 +1362,11 @@ func TestCheckRetryClosesBody(t *testing.T) { })) defer testServer.Close() - backoffMgr := &testBackoffManager{} + backoff := &testBackoffManager{} expectedSleeps := []time.Duration{0, time.Second, 0, time.Second, 0, time.Second, 0, time.Second, 0} c := testRESTClient(t, testServer) - c.createBackoffMgr = func() BackoffManager { return backoffMgr } + c.createBackoffMgr = func() BackoffManager { return backoff } _, err := c.Verb("POST"). Prefix("foo", "bar"). Suffix("baz"). @@ -1353,8 +1380,8 @@ func TestCheckRetryClosesBody(t *testing.T) { if count != 5 { t.Errorf("unexpected retries: %d", count) } - if !reflect.DeepEqual(backoffMgr.sleeps, expectedSleeps) { - t.Errorf("unexpected sleeps, expected: %v, got: %v", expectedSleeps, backoffMgr.sleeps) + if !reflect.DeepEqual(backoff.sleeps, expectedSleeps) { + t.Errorf("unexpected sleeps, expected: %v, got: %v", expectedSleeps, backoff.sleeps) } } @@ -1363,17 +1390,19 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) { backoff := &testBackoffManager{} req := &Request{ verb: "GET", - client: clientFunc(func(req *http.Request) (*http.Response, error) { - count++ - if count >= 3 { - return &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader([]byte{})), - }, nil - } - return nil, &net.OpError{Err: syscall.ECONNRESET} - }), - backoffMgr: backoff, + c: &RESTClient{ + Client: clientForFunc(func(req *http.Request) (*http.Response, error) { + count++ + if count >= 3 { + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader([]byte{})), + }, nil + } + return nil, &net.OpError{Err: syscall.ECONNRESET} + }), + }, + backoff: backoff, } // We expect two retries of "connection reset by peer" and the success. _, err := req.Do().Raw() @@ -1683,7 +1712,7 @@ func TestAbsPath(t *testing.T) { {"/p1/api/p2", "/api/r1", "/api/", "/p1/api/p2/api/"}, } { u, _ := url.Parse("http://localhost:123" + tc.configPrefix) - r := NewRequest(nil, "POST", u, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).Prefix(tc.resourcePrefix).AbsPath(tc.absPath) + r := NewRequestWithClient(u, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").Prefix(tc.resourcePrefix).AbsPath(tc.absPath) if r.pathPrefix != tc.wantsAbsPath { t.Errorf("test case %d failed, unexpected path: %q, expected %q", i, r.pathPrefix, tc.wantsAbsPath) } @@ -1803,6 +1832,66 @@ func TestWatch(t *testing.T) { s := testRESTClient(t, testServer) watching, err := s.Get().Prefix("path/to/watch/thing").Watch() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, item := range table { + got, ok := <-watching.ResultChan() + if !ok { + t.Fatalf("Unexpected early close") + } + if e, a := item.t, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := item.obj, got.Object; !apiequality.Semantic.DeepDerivative(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + } + + _, ok := <-watching.ResultChan() + if ok { + t.Fatal("Unexpected non-close") + } +} + +func TestWatchNonDefaultContentType(t *testing.T) { + var table = []struct { + t watch.EventType + obj runtime.Object + }{ + {watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}}, + {watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}}, + {watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}}, + } + + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + panic("need flusher!") + } + + w.Header().Set("Transfer-Encoding", "chunked") + // manually set the content type here so we get the renegotiation behavior + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)) + for _, item := range table { + if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil { + panic(err) + } + flusher.Flush() + } + })) + defer testServer.Close() + + // set the default content type to protobuf so that we test falling back to JSON serialization + contentConfig := defaultContentConfig() + contentConfig.ContentType = "application/vnd.kubernetes.protobuf" + s := testRESTClientWithConfig(t, testServer, contentConfig) + watching, err := s.Get().Prefix("path/to/watch/thing").Watch() if err != nil { t.Fatalf("Unexpected error") } @@ -1826,6 +1915,45 @@ func TestWatch(t *testing.T) { } } +func TestWatchUnknownContentType(t *testing.T) { + var table = []struct { + t watch.EventType + obj runtime.Object + }{ + {watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}}, + {watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}}, + {watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}}, + } + + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + panic("need flusher!") + } + + w.Header().Set("Transfer-Encoding", "chunked") + // manually set the content type here so we get the renegotiation behavior + w.Header().Set("Content-Type", "foobar") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)) + for _, item := range table { + if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil { + panic(err) + } + flusher.Flush() + } + })) + defer testServer.Close() + + s := testRESTClient(t, testServer) + _, err := s.Get().Prefix("path/to/watch/thing").Watch() + if err == nil { + t.Fatalf("Expected to fail due to lack of known stream serialization for content type") + } +} + func TestStream(t *testing.T) { expectedBody := "expected body" @@ -1856,21 +1984,27 @@ func TestStream(t *testing.T) { } } -func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient { - baseURL, _ := url.Parse("http://localhost") +func testRESTClientWithConfig(t testing.TB, srv *httptest.Server, contentConfig ClientContentConfig) *RESTClient { + base, _ := url.Parse("http://localhost") if srv != nil { var err error - baseURL, err = url.Parse(srv.URL) + base, err = url.Parse(srv.URL) if err != nil { t.Fatalf("failed to parse test URL: %v", err) } } versionedAPIPath := defaultResourcePathWithPrefix("", "", "", "") - client, err := NewRESTClient(baseURL, versionedAPIPath, defaultContentConfig(), 0, 0, nil, nil) + client, err := NewRESTClient(base, versionedAPIPath, contentConfig, nil, nil) if err != nil { t.Fatalf("failed to create a client: %v", err) } return client + +} + +func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient { + contentConfig := defaultContentConfig() + return testRESTClientWithConfig(t, srv, contentConfig) } func TestDoContext(t *testing.T) {