Always negotiate a decoder using ClientNegotiator

This commit performs two refactors and fixes a bug.

Refactor 1 changes the signature of Request to take a RESTClient, which
removes the extra copy of everything on RESTClient from Request. A pair
of optional constructors are added for testing. The major functional
change is that Request no longer has the shim HTTPClient interface and
so some test cases change slightly because we are now going through
http.Client code paths instead of direct to our test stubs.

Refactor 2 changes the signature of RESTClient to take a
ClientContentConfig instead of ContentConfig - the primary difference
being that ClientContentConfig uses ClientNegotiator instead of
NegotiatedSerializer and the old Serializers type. We also collapse
some redundancies (like the rate limiter can be created outside the
constructor).

The bug fix is to negotiate the streaming content type on a Watch()
like we do for requests. We stop caching the decoder and simply
resolve it on the request. We also clean up the dynamic client
and remove the extra WatchSpecificVersions() method by providing
a properly wrapped dynamic client.

Kubernetes-commit: 3b780c64b89606f4e6b21f48fb9c305d5998b9e5
This commit is contained in:
Clayton Coleman 2019-11-10 16:52:08 -05:00 committed by Kubernetes Publisher
parent 881cd219a8
commit 9bbcc2938d
8 changed files with 666 additions and 470 deletions

View File

@ -317,7 +317,6 @@ func (c *metadataResourceClient) List(opts metav1.ListOptions) (*metav1.PartialO
if !ok {
return nil, fmt.Errorf("incoming object is incorrect type %T", obj)
}
fmt.Printf("DEBUG: %#v\n", inputList)
list := &metav1.PartialObjectMetadataList{
ListMeta: inputList.ListMeta,

View File

@ -17,8 +17,6 @@ limitations under the License.
package rest
import (
"fmt"
"mime"
"net/http"
"net/url"
"os"
@ -51,6 +49,28 @@ type Interface interface {
APIVersion() schema.GroupVersion
}
// ClientContentConfig controls how RESTClient communicates with the server.
//
// TODO: ContentConfig will be updated to accept a Negotiator instead of a
// NegotiatedSerializer and NegotiatedSerializer will be removed.
type ClientContentConfig struct {
// AcceptContentTypes specifies the types the client will accept and is optional.
// If not set, ContentType will be used to define the Accept header
AcceptContentTypes string
// ContentType specifies the wire format used to communicate with the server.
// This value will be set as the Accept header on requests made to the server if
// AcceptContentTypes is not set, and as the default content type on any object
// sent to the server. If not set, "application/json" is used.
ContentType string
// GroupVersion is the API version to talk to. Must be provided when initializing
// a RESTClient directly. When initializing a Client, will be set with the default
// code version. This is used as the default group version for VersionedParams.
GroupVersion schema.GroupVersion
// Negotiator is used for obtaining encoders and decoders for multiple
// supported media types.
Negotiator runtime.ClientNegotiator
}
// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources. The server should return a decodable API resource
@ -64,34 +84,27 @@ type RESTClient struct {
// versionedAPIPath is a path segment connecting the base URL to the resource root
versionedAPIPath string
// contentConfig is the information used to communicate with the server.
contentConfig ContentConfig
// serializers contain all serializers for underlying content type.
serializers Serializers
// content describes how a RESTClient encodes and decodes responses.
content ClientContentConfig
// creates BackoffManager that is passed to requests.
createBackoffMgr func() BackoffManager
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle flowcontrol.RateLimiter
// rateLimiter is shared among all requests created by this client unless specifically
// overridden.
rateLimiter flowcontrol.RateLimiter
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}
type Serializers struct {
Encoder runtime.Encoder
Decoder runtime.Decoder
StreamingSerializer runtime.Serializer
Framer runtime.Framer
RenegotiatedDecoder func(contentType string, params map[string]string) (runtime.Decoder, error)
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
@ -99,30 +112,13 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
base.RawQuery = ""
base.Fragment = ""
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{}
}
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
serializers, err := createSerializers(config)
if err != nil {
return nil, err
}
var throttle flowcontrol.RateLimiter
if maxQPS > 0 && rateLimiter == nil {
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} else if rateLimiter != nil {
throttle = rateLimiter
}
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
content: config,
createBackoffMgr: readExpBackoffConfig,
Throttle: throttle,
rateLimiter: rateLimiter,
Client: client,
}, nil
}
@ -132,7 +128,7 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
if c == nil {
return nil
}
return c.Throttle
return c.rateLimiter
}
// readExpBackoffConfig handles the internal logic of determining what the
@ -153,58 +149,6 @@ func readExpBackoffConfig() BackoffManager {
time.Duration(backoffDurationInt)*time.Second)}
}
// createSerializers creates all necessary serializers for given contentType.
// TODO: the negotiated serializer passed to this method should probably return
// serializers that control decoding and versioning without this package
// being aware of the types. Depends on whether RESTClient must deal with
// generic infrastructure.
func createSerializers(config ContentConfig) (*Serializers, error) {
mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes()
contentType := config.ContentType
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err)
}
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType)
if !ok {
if len(contentType) != 0 || len(mediaTypes) == 0 {
return nil, fmt.Errorf("no serializers registered for %s", contentType)
}
info = mediaTypes[0]
}
internalGV := schema.GroupVersions{
{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{
Group: "",
Version: runtime.APIVersionInternal,
},
}
s := &Serializers{
Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion),
Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV),
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil
},
}
if info.StreamSerializer != nil {
s.StreamingSerializer = info.StreamSerializer.Serializer
s.Framer = info.StreamSerializer.Framer
}
return s, nil
}
// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
@ -219,12 +163,7 @@ func createSerializers(config ContentConfig) (*Serializers, error) {
// list, ok := resp.(*api.PodList)
//
func (c *RESTClient) Verb(verb string) *Request {
backoff := c.createBackoffMgr()
if c.Client == nil {
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, 0)
}
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, c.Client.Timeout)
return NewRequest(c).Verb(verb)
}
// Post begins a POST request. Short for c.Verb("POST").
@ -254,5 +193,5 @@ func (c *RESTClient) Delete() *Request {
// APIVersion returns the APIVersion this RESTClient is expected to use.
func (c *RESTClient) APIVersion() schema.GroupVersion {
return *c.contentConfig.GroupVersion
return c.content.GroupVersion
}

View File

@ -27,7 +27,7 @@ import (
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
v1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -57,12 +57,14 @@ func TestSerializer(t *testing.T) {
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
}
serializer, err := createSerializers(contentConfig)
n := runtime.NewClientNegotiator(contentConfig.NegotiatedSerializer, gv)
d, err := n.Decoder("application/json", nil)
if err != nil {
t.Fatal(err)
}
// bytes based on actual return from API server when encoding an "unversioned" object
obj, err := runtime.Decode(serializer.Decoder, []byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success"}`))
obj, err := runtime.Decode(d, []byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success"}`))
t.Log(obj)
if err != nil {
t.Fatal(err)

View File

@ -269,6 +269,9 @@ type ContentConfig struct {
GroupVersion *schema.GroupVersion
// NegotiatedSerializer is used for obtaining encoders and decoders for multiple
// supported media types.
//
// TODO: NegotiatedSerializer will be phased out as internal clients are removed
// from Kubernetes.
NegotiatedSerializer runtime.NegotiatedSerializer
}
@ -283,14 +286,6 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
if err != nil {
@ -310,7 +305,33 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
}
}
return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient)
rateLimiter := config.RateLimiter
if rateLimiter == nil {
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
if qps > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}
}
var gv schema.GroupVersion
if config.GroupVersion != nil {
gv = *config.GroupVersion
}
clientContent := ClientContentConfig{
AcceptContentTypes: config.AcceptContentTypes,
ContentType: config.ContentType,
GroupVersion: gv,
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
}
// UnversionedRESTClientFor is the same as RESTClientFor, except that it allows
@ -338,13 +359,33 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
}
}
versionConfig := config.ContentConfig
if versionConfig.GroupVersion == nil {
v := metav1.SchemeGroupVersion
versionConfig.GroupVersion = &v
rateLimiter := config.RateLimiter
if rateLimiter == nil {
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
if qps > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}
}
return NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
gv := metav1.SchemeGroupVersion
if config.GroupVersion != nil {
gv = *config.GroupVersion
}
clientContent := ClientContentConfig{
AcceptContentTypes: config.AcceptContentTypes,
ContentType: config.ContentType,
GroupVersion: gv,
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
}
// SetKubernetesDefaults sets default values on the provided client config for accessing the

View File

@ -155,6 +155,59 @@ func TestRESTClientRequires(t *testing.T) {
}
}
func TestRESTClientLimiter(t *testing.T) {
testCases := []struct {
Name string
Config Config
Limiter flowcontrol.RateLimiter
}{
{
Config: Config{},
Limiter: flowcontrol.NewTokenBucketRateLimiter(5, 10),
},
{
Config: Config{QPS: 10},
Limiter: flowcontrol.NewTokenBucketRateLimiter(10, 10),
},
{
Config: Config{QPS: -1},
Limiter: nil,
},
{
Config: Config{
RateLimiter: flowcontrol.NewTokenBucketRateLimiter(11, 12),
},
Limiter: flowcontrol.NewTokenBucketRateLimiter(11, 12),
},
}
for _, testCase := range testCases {
t.Run("Versioned_"+testCase.Name, func(t *testing.T) {
config := testCase.Config
config.Host = "127.0.0.1"
config.ContentConfig = ContentConfig{GroupVersion: &v1.SchemeGroupVersion, NegotiatedSerializer: scheme.Codecs}
client, err := RESTClientFor(&config)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(testCase.Limiter, client.rateLimiter) {
t.Fatalf("unexpected rate limiter: %#v", client.rateLimiter)
}
})
t.Run("Unversioned_"+testCase.Name, func(t *testing.T) {
config := testCase.Config
config.Host = "127.0.0.1"
config.ContentConfig = ContentConfig{GroupVersion: &v1.SchemeGroupVersion, NegotiatedSerializer: scheme.Codecs}
client, err := UnversionedRESTClientFor(&config)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(testCase.Limiter, client.rateLimiter) {
t.Fatalf("unexpected rate limiter: %#v", client.rateLimiter)
}
})
}
}
type fakeLimiter struct {
FakeSaturation float64
FakeQPS float32

View File

@ -29,6 +29,8 @@ import (
"k8s.io/client-go/util/flowcontrol"
)
// CreateHTTPClient creates an http.Client that will invoke the provided roundTripper func
// when a request is made.
func CreateHTTPClient(roundTripper func(*http.Request) (*http.Response, error)) *http.Client {
return &http.Client{
Transport: roundTripperFunc(roundTripper),
@ -41,40 +43,49 @@ func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
}
// RESTClient provides a fake RESTClient interface.
// RESTClient provides a fake RESTClient interface. It is used to mock network
// interactions via a rest.Request, or to make them via the provided Client to
// a specific server.
type RESTClient struct {
Client *http.Client
NegotiatedSerializer runtime.NegotiatedSerializer
GroupVersion schema.GroupVersion
VersionedAPIPath string
Req *http.Request
Resp *http.Response
// Err is returned when any request would be made to the server. If Err is set,
// Req will not be recorded, Resp will not be returned, and Client will not be
// invoked.
Err error
// Req is set to the last request that was executed (had the methods Do/DoRaw) invoked.
Req *http.Request
// If Client is specified, the client will be invoked instead of returning Resp if
// Err is not set.
Client *http.Client
// Resp is returned to the caller after Req is recorded, unless Err or Client are set.
Resp *http.Response
}
func (c *RESTClient) Get() *restclient.Request {
return c.request("GET")
return c.Verb("GET")
}
func (c *RESTClient) Put() *restclient.Request {
return c.request("PUT")
return c.Verb("PUT")
}
func (c *RESTClient) Patch(pt types.PatchType) *restclient.Request {
return c.request("PATCH").SetHeader("Content-Type", string(pt))
return c.Verb("PATCH").SetHeader("Content-Type", string(pt))
}
func (c *RESTClient) Post() *restclient.Request {
return c.request("POST")
return c.Verb("POST")
}
func (c *RESTClient) Delete() *restclient.Request {
return c.request("DELETE")
return c.Verb("DELETE")
}
func (c *RESTClient) Verb(verb string) *restclient.Request {
return c.request(verb)
return c.Request().Verb(verb)
}
func (c *RESTClient) APIVersion() schema.GroupVersion {
@ -85,28 +96,17 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
return nil
}
func (c *RESTClient) request(verb string) *restclient.Request {
config := restclient.ContentConfig{
func (c *RESTClient) Request() *restclient.Request {
config := restclient.ClientContentConfig{
ContentType: runtime.ContentTypeJSON,
GroupVersion: &c.GroupVersion,
NegotiatedSerializer: c.NegotiatedSerializer,
GroupVersion: c.GroupVersion,
Negotiator: runtime.NewClientNegotiator(c.NegotiatedSerializer, c.GroupVersion),
}
return restclient.NewRequestWithClient(&url.URL{Scheme: "https", Host: "localhost"}, c.VersionedAPIPath, config, CreateHTTPClient(c.do))
}
ns := c.NegotiatedSerializer
info, _ := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), runtime.ContentTypeJSON)
serializers := restclient.Serializers{
// TODO this was hardcoded before, but it doesn't look right
Encoder: ns.EncoderForVersion(info.Serializer, c.GroupVersion),
Decoder: ns.DecoderToVersion(info.Serializer, c.GroupVersion),
}
if info.StreamSerializer != nil {
serializers.StreamingSerializer = info.StreamSerializer.Serializer
serializers.Framer = info.StreamSerializer.Framer
}
return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, c.VersionedAPIPath, config, serializers, nil, nil, 0)
}
func (c *RESTClient) Do(req *http.Request) (*http.Response, error) {
// do is invoked when a Request() created by this client is executed.
func (c *RESTClient) do(req *http.Request) (*http.Response, error) {
if c.Err != nil {
return nil, c.Err
}

View File

@ -48,7 +48,8 @@ import (
var (
// longThrottleLatency defines threshold for logging requests. All requests being
// throttle for more than longThrottleLatency will be logged.
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
// be logged.
longThrottleLatency = 50 * time.Millisecond
)
@ -74,19 +75,20 @@ func (r *RequestConstructionError) Error() string {
return fmt.Sprintf("request construction error: '%v'", r.Err)
}
var noBackoff = &NoBackoff{}
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
// required
client HTTPClient
verb string
c *RESTClient
baseURL *url.URL
content ContentConfig
serializers Serializers
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
timeout time.Duration
// generic components accessible via method setters
verb string
pathPrefix string
subpath string
params url.Values
@ -98,7 +100,6 @@ type Request struct {
resource string
resourceName string
subresource string
timeout time.Duration
// output
err error
@ -106,42 +107,63 @@ type Request struct {
// This is only used for per-request timeouts, deadlines, and cancellations.
ctx context.Context
backoffMgr BackoffManager
throttle flowcontrol.RateLimiter
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
func NewRequest(c *RESTClient) *Request {
var backoff BackoffManager
if c.createBackoffMgr != nil {
backoff = c.createBackoffMgr()
}
if backoff == nil {
klog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
backoff = noBackoff
}
pathPrefix := "/"
if baseURL != nil {
pathPrefix = path.Join(pathPrefix, baseURL.Path)
var pathPrefix string
if c.base != nil {
pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
} else {
pathPrefix = path.Join("/", c.versionedAPIPath)
}
var timeout time.Duration
if c.Client != nil {
timeout = c.Client.Timeout
}
r := &Request{
client: client,
verb: verb,
baseURL: baseURL,
pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content,
serializers: serializers,
backoffMgr: backoff,
throttle: throttle,
c: c,
rateLimiter: c.rateLimiter,
backoff: backoff,
timeout: timeout,
pathPrefix: pathPrefix,
}
switch {
case len(content.AcceptContentTypes) > 0:
r.SetHeader("Accept", content.AcceptContentTypes)
case len(content.ContentType) > 0:
r.SetHeader("Accept", content.ContentType+", */*")
case len(c.content.AcceptContentTypes) > 0:
r.SetHeader("Accept", c.content.AcceptContentTypes)
case len(c.content.ContentType) > 0:
r.SetHeader("Accept", c.content.ContentType+", */*")
}
return r
}
// NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
return NewRequest(&RESTClient{
base: base,
versionedAPIPath: versionedAPIPath,
content: content,
Client: client,
})
}
// Verb sets the verb this request will use.
func (r *Request) Verb(verb string) *Request {
r.verb = verb
return r
}
// Prefix adds segments to the relative beginning to the request path. These
// items will be placed before the optional Namespace, Resource, or Name sections.
// Setting AbsPath will clear any previously set Prefix segments
@ -184,17 +206,17 @@ func (r *Request) Resource(resource string) *Request {
// or defaults to the stub implementation if nil is provided
func (r *Request) BackOff(manager BackoffManager) *Request {
if manager == nil {
r.backoffMgr = &NoBackoff{}
r.backoff = &NoBackoff{}
return r
}
r.backoffMgr = manager
r.backoff = manager
return r
}
// Throttle receives a rate-limiter and sets or replaces an existing request limiter
func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
r.throttle = limiter
r.rateLimiter = limiter
return r
}
@ -272,8 +294,8 @@ func (r *Request) AbsPath(segments ...string) *Request {
if r.err != nil {
return r
}
r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
// preserve any trailing slashes for legacy behavior
r.pathPrefix += "/"
}
@ -317,7 +339,7 @@ func (r *Request) Param(paramName, s string) *Request {
// VersionedParams will not write query parameters that have omitempty set and are empty. If a
// parameter has already been set it is appended to (Params and VersionedParams are additive).
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
}
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
@ -397,14 +419,19 @@ func (r *Request) Body(obj interface{}) *Request {
if reflect.ValueOf(t).IsNil() {
return r
}
data, err := runtime.Encode(r.serializers.Encoder, t)
encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil)
if err != nil {
r.err = err
return r
}
data, err := runtime.Encode(encoder, t)
if err != nil {
r.err = err
return r
}
glogBody("Request Body", data)
r.body = bytes.NewReader(data)
r.SetHeader("Content-Type", r.content.ContentType)
r.SetHeader("Content-Type", r.c.content.ContentType)
default:
r.err = fmt.Errorf("unknown type used for body: %+v", obj)
}
@ -433,8 +460,8 @@ func (r *Request) URL() *url.URL {
}
finalURL := &url.URL{}
if r.baseURL != nil {
*finalURL = *r.baseURL
if r.c.base != nil {
*finalURL = *r.c.base
}
finalURL.Path = p
@ -468,8 +495,8 @@ func (r Request) finalURLTemplate() url.URL {
segments := strings.Split(r.URL().Path, "/")
groupIndex := 0
index := 0
if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) {
groupIndex += len(strings.Split(r.baseURL.Path, "/"))
if r.URL() != nil && r.c.base != nil && strings.Contains(r.URL().Path, r.c.base.Path) {
groupIndex += len(strings.Split(r.c.base.Path, "/"))
}
if groupIndex >= len(segments) {
return *url
@ -522,16 +549,16 @@ func (r Request) finalURLTemplate() url.URL {
}
func (r *Request) tryThrottle() error {
if r.throttle == nil {
if r.rateLimiter == nil {
return nil
}
now := time.Now()
var err error
if r.ctx != nil {
err = r.throttle.Wait(r.ctx)
err = r.rateLimiter.Wait(r.ctx)
} else {
r.throttle.Accept()
r.rateLimiter.Accept()
}
if latency := time.Since(now); latency > longThrottleLatency {
@ -544,27 +571,11 @@ func (r *Request) tryThrottle() error {
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) {
return r.WatchWithSpecificDecoders(
func(body io.ReadCloser) streaming.Decoder {
framer := r.serializers.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
},
r.serializers.Decoder,
)
}
// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
// Returns a watch.Interface, or an error.
func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.throttle here.
// don't use r.rateLimiter here.
if r.err != nil {
return nil, r.err
}
if r.serializers.Framer == nil {
return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
@ -575,18 +586,18 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
req = req.WithContext(r.ctx)
}
req.Header = r.headers
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if r.c.base != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err != nil {
@ -604,9 +615,22 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
}
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}
wrapperDecoder := wrapperDecoderFn(resp.Body)
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
}
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
if err != nil {
return nil, err
}
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder),
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
@ -617,8 +641,8 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
// It also handles corner cases for incomplete/invalid request data.
func updateURLMetrics(req *Request, resp *http.Response, err error) {
url := "none"
if req.baseURL != nil {
url = req.baseURL.Host
if req.c.base != nil {
url = req.c.base.Host
}
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
@ -656,18 +680,18 @@ func (r *Request) Stream() (io.ReadCloser, error) {
req = req.WithContext(r.ctx)
}
req.Header = r.headers
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if r.c.base != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
}
if err != nil {
@ -738,7 +762,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
return err
}
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
@ -765,11 +789,11 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
}
req.Header = r.headers
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retries > 0 {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal throttler.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottle(); err != nil {
return err
}
@ -777,9 +801,9 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil {
// "Connection reset by peer" is usually a transient error.
@ -822,7 +846,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
}
klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
r.backoff.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
@ -908,14 +932,18 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
glogBody("Response Body", body)
// verify the content type is accurate
var decoder runtime.Decoder
contentType := resp.Header.Get("Content-Type")
decoder := r.serializers.Decoder
if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
if len(contentType) == 0 {
contentType = r.c.content.ContentType
}
if len(contentType) > 0 {
var err error
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
return Result{err: errors.NewInternalError(err)}
}
decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
if err != nil {
// if we fail to negotiate a decoder, treat this as an unstructured error
switch {
@ -1035,7 +1063,7 @@ func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool,
}
var groupResource schema.GroupResource
if len(r.resource) > 0 {
groupResource.Group = r.content.GroupVersion.Group
groupResource.Group = r.c.content.GroupVersion.Group
groupResource.Resource = r.resource
}
return errors.NewGenericServerResponse(

View File

@ -56,24 +56,30 @@ import (
)
func TestNewRequestSetsAccept(t *testing.T) {
r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, Serializers{}, nil, nil, 0)
r := NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{}, nil).Verb("get")
if r.headers.Get("Accept") != "" {
t.Errorf("unexpected headers: %#v", r.headers)
}
r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, Serializers{}, nil, nil, 0)
r = NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{ContentType: "application/other"}, nil).Verb("get")
if r.headers.Get("Accept") != "application/other, */*" {
t.Errorf("unexpected headers: %#v", r.headers)
}
}
func clientForFunc(fn clientFunc) *http.Client {
return &http.Client{
Transport: fn,
}
}
type clientFunc func(req *http.Request) (*http.Response, error)
func (f clientFunc) Do(req *http.Request) (*http.Response, error) {
func (f clientFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
}
func TestRequestSetsHeaders(t *testing.T) {
server := clientFunc(func(req *http.Request) (*http.Response, error) {
server := clientForFunc(func(req *http.Request) (*http.Response, error) {
if req.Header.Get("Accept") != "application/other, */*" {
t.Errorf("unexpected headers: %#v", req.Header)
}
@ -84,8 +90,8 @@ func TestRequestSetsHeaders(t *testing.T) {
})
config := defaultContentConfig()
config.ContentType = "application/other"
serializers := defaultSerializers(t)
r := NewRequest(server, "get", &url.URL{Path: "/path"}, "", config, serializers, nil, nil, 0)
r := NewRequestWithClient(&url.URL{Path: "/path"}, "", config, nil).Verb("get")
r.c.Client = server
// Check if all "issue" methods are setting headers.
_ = r.Do()
@ -97,7 +103,9 @@ func TestRequestWithErrorWontChange(t *testing.T) {
gvCopy := v1.SchemeGroupVersion
original := Request{
err: errors.New("test"),
content: ContentConfig{GroupVersion: &gvCopy},
c: &RESTClient{
content: ClientContentConfig{GroupVersion: gvCopy},
},
}
r := original
changed := r.Param("foo", "bar").
@ -118,26 +126,26 @@ func TestRequestWithErrorWontChange(t *testing.T) {
}
func TestRequestPreservesBaseTrailingSlash(t *testing.T) {
r := &Request{baseURL: &url.URL{}, pathPrefix: "/path/"}
r := &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "/path/"}
if s := r.URL().String(); s != "/path/" {
t.Errorf("trailing slash should be preserved: %s", s)
}
}
func TestRequestAbsPathPreservesTrailingSlash(t *testing.T) {
r := (&Request{baseURL: &url.URL{}}).AbsPath("/foo/")
r := (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("/foo/")
if s := r.URL().String(); s != "/foo/" {
t.Errorf("trailing slash should be preserved: %s", s)
}
r = (&Request{baseURL: &url.URL{}}).AbsPath("/foo/")
r = (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("/foo/")
if s := r.URL().String(); s != "/foo/" {
t.Errorf("trailing slash should be preserved: %s", s)
}
}
func TestRequestAbsPathJoins(t *testing.T) {
r := (&Request{baseURL: &url.URL{}}).AbsPath("foo/bar", "baz")
r := (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("foo/bar", "baz")
if s := r.URL().String(); s != "foo/bar/baz" {
t.Errorf("trailing slash should be preserved: %s", s)
}
@ -145,9 +153,7 @@ func TestRequestAbsPathJoins(t *testing.T) {
func TestRequestSetsNamespace(t *testing.T) {
r := (&Request{
baseURL: &url.URL{
Path: "/",
},
c: &RESTClient{base: &url.URL{Path: "/"}},
}).Namespace("foo")
if r.namespace == "" {
t.Errorf("namespace should be set: %#v", r)
@ -160,7 +166,7 @@ func TestRequestSetsNamespace(t *testing.T) {
func TestRequestOrdersNamespaceInPath(t *testing.T) {
r := (&Request{
baseURL: &url.URL{},
c: &RESTClient{base: &url.URL{}},
pathPrefix: "/test/",
}).Name("bar").Resource("baz").Namespace("foo")
if s := r.URL().String(); s != "/test/namespaces/foo/baz/bar" {
@ -170,7 +176,7 @@ func TestRequestOrdersNamespaceInPath(t *testing.T) {
func TestRequestOrdersSubResource(t *testing.T) {
r := (&Request{
baseURL: &url.URL{},
c: &RESTClient{base: &url.URL{}},
pathPrefix: "/test/",
}).Name("bar").Resource("baz").Namespace("foo").Suffix("test").SubResource("a", "b")
if s := r.URL().String(); s != "/test/namespaces/foo/baz/bar/a/b/test" {
@ -226,7 +232,7 @@ func TestRequestParam(t *testing.T) {
}
func TestRequestVersionedParams(t *testing.T) {
r := (&Request{content: ContentConfig{GroupVersion: &v1.SchemeGroupVersion}}).Param("foo", "a")
r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}).Param("foo", "a")
if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}}) {
t.Errorf("should have set a param: %#v", r)
}
@ -242,7 +248,7 @@ func TestRequestVersionedParams(t *testing.T) {
}
func TestRequestVersionedParamsFromListOptions(t *testing.T) {
r := &Request{content: ContentConfig{GroupVersion: &v1.SchemeGroupVersion}}
r := &Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}
r.VersionedParams(&metav1.ListOptions{ResourceVersion: "1"}, scheme.ParameterCodec)
if !reflect.DeepEqual(r.params, url.Values{
"resourceVersion": []string{"1"},
@ -277,24 +283,15 @@ type NotAnAPIObject struct{}
func (obj NotAnAPIObject) GroupVersionKind() *schema.GroupVersionKind { return nil }
func (obj NotAnAPIObject) SetGroupVersionKind(gvk *schema.GroupVersionKind) {}
func defaultContentConfig() ContentConfig {
func defaultContentConfig() ClientContentConfig {
gvCopy := v1.SchemeGroupVersion
return ContentConfig{
return ClientContentConfig{
ContentType: "application/json",
GroupVersion: &gvCopy,
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
GroupVersion: gvCopy,
Negotiator: runtime.NewClientNegotiator(scheme.Codecs.WithoutConversion(), gvCopy),
}
}
func defaultSerializers(t *testing.T) Serializers {
config := defaultContentConfig()
serializers, err := createSerializers(config)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
return *serializers
}
func TestRequestBody(t *testing.T) {
// test unknown type
r := (&Request{}).Body([]string{"test"})
@ -315,7 +312,7 @@ func TestRequestBody(t *testing.T) {
}
// test unencodable api object
r = (&Request{content: defaultContentConfig()}).Body(&NotAnAPIObject{})
r = (&Request{c: &RESTClient{content: defaultContentConfig()}}).Body(&NotAnAPIObject{})
if r.err == nil || r.body != nil {
t.Errorf("should have set err and left body nil: %#v", r)
}
@ -347,14 +344,14 @@ func TestURLTemplate(t *testing.T) {
}{
{
// non dynamic client
Request: NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").
Prefix("api", "v1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0"),
ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1/nm?p0=v0",
ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D?p0=%7Bvalue%7D",
},
{
// non dynamic client with wrong api group
Request: NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").
Prefix("pre1", "v1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0"),
ExpectedFullURL: "http://localhost/some/base/url/path/pre1/v1/namespaces/ns/r1/nm?p0=v0",
ExpectedFinalURL: "http://localhost/%7Bprefix%7D",
@ -362,7 +359,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with core group + namespace + resourceResource (with name)
// /api/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/api/v1/namespaces/ns/r1/name1"),
ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1/name1",
ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D",
@ -370,7 +367,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + namespace + resourceResource (with name)
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/g1/v1/namespaces/ns/r1/name1"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/ns/r1/name1",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D",
@ -378,7 +375,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with core group + namespace + resourceResource (with NO name)
// /api/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/api/v1/namespaces/ns/r1"),
ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1",
ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1",
@ -386,7 +383,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + namespace + resourceResource (with NO name)
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/g1/v1/namespaces/ns/r1"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/ns/r1",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/%7Bnamespace%7D/r1",
@ -394,7 +391,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with core group + resourceResource (with name)
// /api/$RESOURCEVERSION/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/api/v1/r1/name1"),
ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/r1/name1",
ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/r1/%7Bname%7D",
@ -402,7 +399,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + resourceResource (with name)
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/g1/v1/r1/name1"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/r1/name1",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/r1/%7Bname%7D",
@ -410,7 +407,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + namespace + resourceResource (with name) + subresource
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME/$SUBRESOURCE
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/%7Bname%7D/finalize",
@ -418,7 +415,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + namespace + resourceResource (with name)
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/%7Bname%7D",
@ -426,7 +423,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + namespace + resourceResource (with NO name) + subresource
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%SUBRESOURCE
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/finalize",
@ -434,7 +431,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + namespace + resourceResource (with NO name) + subresource
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%SUBRESOURCE
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/status"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/status",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/status",
@ -442,7 +439,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + namespace + resourceResource (with no name)
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces",
@ -450,7 +447,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + resourceResource (with name) + subresource
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces/namespaces/finalize"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/finalize",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D/finalize",
@ -458,7 +455,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + resourceResource (with name) + subresource
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces/namespaces/status"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/status",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D/status",
@ -466,7 +463,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + resourceResource (with name)
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces/namespaces"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D",
@ -474,7 +471,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with named group + resourceResource (with no name)
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/$RESOURCE/%NAME
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/apis/namespaces/namespaces/namespaces"),
ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces",
ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces",
@ -482,7 +479,7 @@ func TestURLTemplate(t *testing.T) {
{
// dynamic client with wrong api group + namespace + resourceResource (with name) + subresource
// /apis/$NAMEDGROUPNAME/$RESOURCEVERSION/namespaces/$NAMESPACE/$RESOURCE/%NAME/$SUBRESOURCE
Request: NewRequest(nil, "DELETE", uri, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).
Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
Prefix("/pre1/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
ExpectedFullURL: "http://localhost/some/base/url/path/pre1/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
ExpectedFinalURL: "http://localhost/%7Bprefix%7D",
@ -550,7 +547,7 @@ func TestTransformResponse(t *testing.T) {
{Response: &http.Response{StatusCode: http.StatusOK, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
}
for i, test := range testCases {
r := NewRequest(nil, "", uri, "", defaultContentConfig(), defaultSerializers(t), nil, nil, 0)
r := NewRequestWithClient(uri, "", defaultContentConfig(), nil)
if test.Response.Body == nil {
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
}
@ -589,13 +586,21 @@ type renegotiator struct {
err error
}
func (r *renegotiator) invoke(contentType string, params map[string]string) (runtime.Decoder, error) {
func (r *renegotiator) Decoder(contentType string, params map[string]string) (runtime.Decoder, error) {
r.called = true
r.contentType = contentType
r.params = params
return r.decoder, r.err
}
func (r *renegotiator) Encoder(contentType string, params map[string]string) (runtime.Encoder, error) {
return nil, fmt.Errorf("UNIMPLEMENTED")
}
func (r *renegotiator) StreamDecoder(contentType string, params map[string]string) (runtime.Decoder, runtime.Serializer, runtime.Framer, error) {
return nil, nil, nil, fmt.Errorf("UNIMPLEMENTED")
}
func TestTransformResponseNegotiate(t *testing.T) {
invalid := []byte("aaaaa")
uri, _ := url.Parse("http://localhost")
@ -619,6 +624,8 @@ func TestTransformResponseNegotiate(t *testing.T) {
Header: http.Header{"Content-Type": []string{"application/json"}},
Body: ioutil.NopCloser(bytes.NewReader(invalid)),
},
Called: true,
ExpectContentType: "application/json",
Error: true,
ErrFn: func(err error) bool {
return err.Error() != "aaaaa" && apierrors.IsUnauthorized(err)
@ -655,22 +662,26 @@ func TestTransformResponseNegotiate(t *testing.T) {
},
},
{
// no negotiation when no content type specified
// negotiate when no content type specified
Response: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/any"}},
Body: ioutil.NopCloser(bytes.NewReader(invalid)),
},
Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
Called: true,
ExpectContentType: "text/any",
},
{
// no negotiation when no response content type specified
// negotiate when no response content type specified
ContentType: "text/any",
Response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(invalid)),
},
Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
Called: true,
ExpectContentType: "text/any",
},
{
// unrecognized content type is not handled
@ -693,15 +704,14 @@ func TestTransformResponseNegotiate(t *testing.T) {
},
}
for i, test := range testCases {
serializers := defaultSerializers(t)
contentConfig := defaultContentConfig()
contentConfig.ContentType = test.ContentType
negotiator := &renegotiator{
decoder: test.Decoder,
err: test.NegotiateErr,
}
serializers.RenegotiatedDecoder = negotiator.invoke
contentConfig := defaultContentConfig()
contentConfig.ContentType = test.ContentType
r := NewRequest(nil, "", uri, "", contentConfig, serializers, nil, nil, 0)
contentConfig.Negotiator = negotiator
r := NewRequestWithClient(uri, "", contentConfig, nil)
if test.Response.Body == nil {
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
}
@ -828,21 +838,22 @@ func TestTransformUnstructuredError(t *testing.T) {
},
}
for i, testCase := range testCases {
for _, testCase := range testCases {
t.Run("", func(t *testing.T) {
r := &Request{
c: &RESTClient{
content: defaultContentConfig(),
serializers: defaultSerializers(t),
},
resourceName: testCase.Name,
resource: testCase.Resource,
}
result := r.transformResponse(testCase.Res, testCase.Req)
err := result.err
if !testCase.ErrFn(err) {
t.Errorf("unexpected error: %v", err)
continue
t.Fatalf("unexpected error: %v", err)
}
if !apierrors.IsUnexpectedServerError(err) {
t.Errorf("%d: unexpected error type: %v", i, err)
t.Errorf("unexpected error type: %v", err)
}
if len(testCase.Name) != 0 && !strings.Contains(err.Error(), testCase.Name) {
t.Errorf("unexpected error string: %s", err)
@ -858,23 +869,24 @@ func TestTransformUnstructuredError(t *testing.T) {
expect = err
}
if !reflect.DeepEqual(expect, transformed) {
t.Errorf("%d: unexpected Error(): %s", i, diff.ObjectReflectDiff(expect, transformed))
t.Errorf("unexpected Error(): %s", diff.ObjectReflectDiff(expect, transformed))
}
// verify result.Get properly transforms the error
if _, err := result.Get(); !reflect.DeepEqual(expect, err) {
t.Errorf("%d: unexpected error on Get(): %s", i, diff.ObjectReflectDiff(expect, err))
t.Errorf("unexpected error on Get(): %s", diff.ObjectReflectDiff(expect, err))
}
// verify result.Into properly handles the error
if err := result.Into(&v1.Pod{}); !reflect.DeepEqual(expect, err) {
t.Errorf("%d: unexpected error on Into(): %s", i, diff.ObjectReflectDiff(expect, err))
t.Errorf("unexpected error on Into(): %s", diff.ObjectReflectDiff(expect, err))
}
// verify result.Raw leaves the error in the untransformed state
if _, err := result.Raw(); !reflect.DeepEqual(result.err, err) {
t.Errorf("%d: unexpected error on Raw(): %s", i, diff.ObjectReflectDiff(expect, err))
t.Errorf("unexpected error on Raw(): %s", diff.ObjectReflectDiff(expect, err))
}
})
}
}
@ -898,27 +910,32 @@ func TestRequestWatch(t *testing.T) {
Err: true,
},
{
Request: &Request{baseURL: &url.URL{}, pathPrefix: "%"},
Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
Err: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("err")
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Err: true,
},
{
Request: &Request{
c: &RESTClient{
content: defaultContentConfig(),
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
resp := &http.Response{StatusCode: http.StatusOK, Body: errorReader{err: errors.New("test error")}}
return resp, nil
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusForbidden,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, nil
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Expect: []watch.Event{
{
@ -943,18 +960,23 @@ func TestRequestWatch(t *testing.T) {
},
},
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsForbidden(err)
},
},
{
Request: &Request{
c: &RESTClient{
content: defaultContentConfig(),
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusForbidden,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, nil
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Err: true,
ErrFn: func(err error) bool {
@ -963,15 +985,16 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
c: &RESTClient{
content: defaultContentConfig(),
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnauthorized,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, nil
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Err: true,
ErrFn: func(err error) bool {
@ -980,9 +1003,9 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
c: &RESTClient{
content: defaultContentConfig(),
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnauthorized,
Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
@ -991,7 +1014,8 @@ func TestRequestWatch(t *testing.T) {
})))),
}, nil
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Err: true,
ErrFn: func(err error) bool {
@ -1000,63 +1024,60 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, io.EOF
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Empty: true,
},
{
Request: &Request{
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, &url.Error{Err: io.EOF}
}),
baseURL: &url.URL{},
},
Empty: true,
},
{
Request: &Request{
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("http: can't write HTTP request on broken connection")
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Empty: true,
},
{
Request: &Request{
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("foo: connection reset by peer")
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Empty: true,
},
}
for i, testCase := range testCases {
for _, testCase := range testCases {
t.Run("", func(t *testing.T) {
testCase.Request.backoffMgr = &NoBackoff{}
testCase.Request.backoff = &NoBackoff{}
watch, err := testCase.Request.Watch()
hasErr := err != nil
if hasErr != testCase.Err {
t.Fatalf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
t.Fatalf("expected %t, got %t: %v", testCase.Err, hasErr, err)
}
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
t.Errorf("%d: error not valid: %v", i, err)
t.Errorf("error not valid: %v", err)
}
if hasErr && watch != nil {
t.Fatalf("%d: watch should be nil when error is returned", i)
t.Fatalf("watch should be nil when error is returned")
}
if hasErr {
return
}
defer watch.Stop()
if testCase.Empty {
_, ok := <-watch.ResultChan()
evt, ok := <-watch.ResultChan()
if ok {
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
t.Errorf("expected the watch to be empty: %#v", evt)
}
}
if testCase.Expect != nil {
@ -1085,21 +1106,24 @@ func TestRequestStream(t *testing.T) {
Err: true,
},
{
Request: &Request{baseURL: &url.URL{}, pathPrefix: "%"},
Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
Err: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("err")
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Err: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnauthorized,
Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
@ -1109,22 +1133,23 @@ func TestRequestStream(t *testing.T) {
}, nil
}),
content: defaultContentConfig(),
serializers: defaultSerializers(t),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Err: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusBadRequest,
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))),
}, nil
}),
content: defaultContentConfig(),
serializers: defaultSerializers(t),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Err: true,
ErrFn: func(err error) bool {
@ -1136,7 +1161,7 @@ func TestRequestStream(t *testing.T) {
},
}
for i, testCase := range testCases {
testCase.Request.backoffMgr = &NoBackoff{}
testCase.Request.backoff = &NoBackoff{}
body, err := testCase.Request.Stream()
hasErr := err != nil
if hasErr != testCase.Err {
@ -1194,25 +1219,27 @@ func TestRequestDo(t *testing.T) {
Err bool
}{
{
Request: &Request{err: errors.New("bail")},
Request: &Request{c: &RESTClient{}, err: errors.New("bail")},
Err: true,
},
{
Request: &Request{baseURL: &url.URL{}, pathPrefix: "%"},
Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
Err: true,
},
{
Request: &Request{
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("err")
}),
baseURL: &url.URL{},
base: &url.URL{},
},
},
Err: true,
},
}
for i, testCase := range testCases {
testCase.Request.backoffMgr = &NoBackoff{}
testCase.Request.backoff = &NoBackoff{}
body, err := testCase.Request.Do().Raw()
hasErr := err != nil
if hasErr != testCase.Err {
@ -1281,7 +1308,7 @@ func TestBackoffLifecycle(t *testing.T) {
seconds := []int{0, 1, 2, 4, 8, 0, 1, 2, 4, 0}
request := c.Verb("POST").Prefix("backofftest").Suffix("abc")
clock := clock.FakeClock{}
request.backoffMgr = &URLBackoff{
request.backoff = &URLBackoff{
// Use a fake backoff here to avoid flakes and speed the test up.
Backoff: flowcontrol.NewFakeBackOff(
time.Duration(1)*time.Second,
@ -1290,7 +1317,7 @@ func TestBackoffLifecycle(t *testing.T) {
)}
for _, sec := range seconds {
thisBackoff := request.backoffMgr.CalculateBackoff(request.URL())
thisBackoff := request.backoff.CalculateBackoff(request.URL())
t.Logf("Current backoff %v", thisBackoff)
if thisBackoff != time.Duration(sec)*time.Second {
t.Errorf("Backoff is %v instead of %v", thisBackoff, sec)
@ -1335,11 +1362,11 @@ func TestCheckRetryClosesBody(t *testing.T) {
}))
defer testServer.Close()
backoffMgr := &testBackoffManager{}
backoff := &testBackoffManager{}
expectedSleeps := []time.Duration{0, time.Second, 0, time.Second, 0, time.Second, 0, time.Second, 0}
c := testRESTClient(t, testServer)
c.createBackoffMgr = func() BackoffManager { return backoffMgr }
c.createBackoffMgr = func() BackoffManager { return backoff }
_, err := c.Verb("POST").
Prefix("foo", "bar").
Suffix("baz").
@ -1353,8 +1380,8 @@ func TestCheckRetryClosesBody(t *testing.T) {
if count != 5 {
t.Errorf("unexpected retries: %d", count)
}
if !reflect.DeepEqual(backoffMgr.sleeps, expectedSleeps) {
t.Errorf("unexpected sleeps, expected: %v, got: %v", expectedSleeps, backoffMgr.sleeps)
if !reflect.DeepEqual(backoff.sleeps, expectedSleeps) {
t.Errorf("unexpected sleeps, expected: %v, got: %v", expectedSleeps, backoff.sleeps)
}
}
@ -1363,7 +1390,8 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) {
backoff := &testBackoffManager{}
req := &Request{
verb: "GET",
client: clientFunc(func(req *http.Request) (*http.Response, error) {
c: &RESTClient{
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
count++
if count >= 3 {
return &http.Response{
@ -1373,7 +1401,8 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) {
}
return nil, &net.OpError{Err: syscall.ECONNRESET}
}),
backoffMgr: backoff,
},
backoff: backoff,
}
// We expect two retries of "connection reset by peer" and the success.
_, err := req.Do().Raw()
@ -1683,7 +1712,7 @@ func TestAbsPath(t *testing.T) {
{"/p1/api/p2", "/api/r1", "/api/", "/p1/api/p2/api/"},
} {
u, _ := url.Parse("http://localhost:123" + tc.configPrefix)
r := NewRequest(nil, "POST", u, "", ContentConfig{GroupVersion: &schema.GroupVersion{Group: "test"}}, Serializers{}, nil, nil, 0).Prefix(tc.resourcePrefix).AbsPath(tc.absPath)
r := NewRequestWithClient(u, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").Prefix(tc.resourcePrefix).AbsPath(tc.absPath)
if r.pathPrefix != tc.wantsAbsPath {
t.Errorf("test case %d failed, unexpected path: %q, expected %q", i, r.pathPrefix, tc.wantsAbsPath)
}
@ -1803,6 +1832,66 @@ func TestWatch(t *testing.T) {
s := testRESTClient(t, testServer)
watching, err := s.Get().Prefix("path/to/watch/thing").Watch()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for _, item := range table {
got, ok := <-watching.ResultChan()
if !ok {
t.Fatalf("Unexpected early close")
}
if e, a := item.t, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := item.obj, got.Object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
_, ok := <-watching.ResultChan()
if ok {
t.Fatal("Unexpected non-close")
}
}
func TestWatchNonDefaultContentType(t *testing.T) {
var table = []struct {
t watch.EventType
obj runtime.Object
}{
{watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
{watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
{watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
}
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
panic("need flusher!")
}
w.Header().Set("Transfer-Encoding", "chunked")
// manually set the content type here so we get the renegotiation behavior
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
flusher.Flush()
encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
for _, item := range table {
if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
panic(err)
}
flusher.Flush()
}
}))
defer testServer.Close()
// set the default content type to protobuf so that we test falling back to JSON serialization
contentConfig := defaultContentConfig()
contentConfig.ContentType = "application/vnd.kubernetes.protobuf"
s := testRESTClientWithConfig(t, testServer, contentConfig)
watching, err := s.Get().Prefix("path/to/watch/thing").Watch()
if err != nil {
t.Fatalf("Unexpected error")
}
@ -1826,6 +1915,45 @@ func TestWatch(t *testing.T) {
}
}
func TestWatchUnknownContentType(t *testing.T) {
var table = []struct {
t watch.EventType
obj runtime.Object
}{
{watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
{watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
{watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
}
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
panic("need flusher!")
}
w.Header().Set("Transfer-Encoding", "chunked")
// manually set the content type here so we get the renegotiation behavior
w.Header().Set("Content-Type", "foobar")
w.WriteHeader(http.StatusOK)
flusher.Flush()
encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
for _, item := range table {
if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
panic(err)
}
flusher.Flush()
}
}))
defer testServer.Close()
s := testRESTClient(t, testServer)
_, err := s.Get().Prefix("path/to/watch/thing").Watch()
if err == nil {
t.Fatalf("Expected to fail due to lack of known stream serialization for content type")
}
}
func TestStream(t *testing.T) {
expectedBody := "expected body"
@ -1856,21 +1984,27 @@ func TestStream(t *testing.T) {
}
}
func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
baseURL, _ := url.Parse("http://localhost")
func testRESTClientWithConfig(t testing.TB, srv *httptest.Server, contentConfig ClientContentConfig) *RESTClient {
base, _ := url.Parse("http://localhost")
if srv != nil {
var err error
baseURL, err = url.Parse(srv.URL)
base, err = url.Parse(srv.URL)
if err != nil {
t.Fatalf("failed to parse test URL: %v", err)
}
}
versionedAPIPath := defaultResourcePathWithPrefix("", "", "", "")
client, err := NewRESTClient(baseURL, versionedAPIPath, defaultContentConfig(), 0, 0, nil, nil)
client, err := NewRESTClient(base, versionedAPIPath, contentConfig, nil, nil)
if err != nil {
t.Fatalf("failed to create a client: %v", err)
}
return client
}
func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
contentConfig := defaultContentConfig()
return testRESTClientWithConfig(t, srv, contentConfig)
}
func TestDoContext(t *testing.T) {