client-go/rest: move content type wiring from client to request

Signed-off-by: Monis Khan <mok@microsoft.com>

Kubernetes-commit: fe1eda0649fdb6a15200d279be214c67246d3b12
This commit is contained in:
Monis Khan 2024-08-18 15:01:57 -04:00 committed by Kubernetes Publisher
parent c5146a9031
commit 1647efd5c4
4 changed files with 70 additions and 64 deletions

View File

@ -105,10 +105,6 @@ type RESTClient struct {
// NewRESTClient creates a new RESTClient. This client performs generic REST functions // NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. // such as Get, Put, Post, and Delete on specified paths.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) { func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
base := *baseURL base := *baseURL
if !strings.HasSuffix(base.Path, "/") { if !strings.HasSuffix(base.Path, "/") {
base.Path += "/" base.Path += "/"

View File

@ -100,6 +100,9 @@ func defaultRequestRetryFn(maxRetries int) WithRetry {
type Request struct { type Request struct {
c *RESTClient c *RESTClient
contentConfig ClientContentConfig
contentTypeNotSet bool
warningHandler WarningHandler warningHandler WarningHandler
rateLimiter flowcontrol.RateLimiter rateLimiter flowcontrol.RateLimiter
@ -153,6 +156,12 @@ func NewRequest(c *RESTClient) *Request {
timeout = c.Client.Timeout timeout = c.Client.Timeout
} }
contentConfig := c.content
contentTypeNotSet := len(contentConfig.ContentType) == 0
if contentTypeNotSet {
contentConfig.ContentType = "application/json"
}
r := &Request{ r := &Request{
c: c, c: c,
rateLimiter: c.rateLimiter, rateLimiter: c.rateLimiter,
@ -162,6 +171,9 @@ func NewRequest(c *RESTClient) *Request {
maxRetries: 10, maxRetries: 10,
retryFn: defaultRequestRetryFn, retryFn: defaultRequestRetryFn,
warningHandler: c.warningHandler, warningHandler: c.warningHandler,
contentConfig: contentConfig,
contentTypeNotSet: contentTypeNotSet,
} }
switch { switch {
@ -371,7 +383,7 @@ func (r *Request) Param(paramName, s string) *Request {
// VersionedParams will not write query parameters that have omitempty set and are empty. If a // VersionedParams will not write query parameters that have omitempty set and are empty. If a
// parameter has already been set it is appended to (Params and VersionedParams are additive). // parameter has already been set it is appended to (Params and VersionedParams are additive).
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request { func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion) return r.SpecificallyVersionedParams(obj, codec, r.contentConfig.GroupVersion)
} }
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request { func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
@ -464,7 +476,7 @@ func (r *Request) Body(obj interface{}) *Request {
if reflect.ValueOf(t).IsNil() { if reflect.ValueOf(t).IsNil() {
return r return r
} }
encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil) encoder, err := r.contentConfig.Negotiator.Encoder(r.contentConfig.ContentType, nil)
if err != nil { if err != nil {
r.err = err r.err = err
return r return r
@ -476,7 +488,7 @@ func (r *Request) Body(obj interface{}) *Request {
} }
r.body = nil r.body = nil
r.bodyBytes = data r.bodyBytes = data
r.SetHeader("Content-Type", r.c.content.ContentType) r.SetHeader("Content-Type", r.contentConfig.ContentType)
default: default:
r.err = fmt.Errorf("unknown type used for body: %+v", obj) r.err = fmt.Errorf("unknown type used for body: %+v", obj)
} }
@ -944,7 +956,7 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, runtim
if err != nil { if err != nil {
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err) klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
} }
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params) objectDecoder, streamingSerializer, framer, err := r.contentConfig.Negotiator.StreamDecoder(mediaType, params)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -1310,7 +1322,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
var decoder runtime.Decoder var decoder runtime.Decoder
contentType := resp.Header.Get("Content-Type") contentType := resp.Header.Get("Content-Type")
if len(contentType) == 0 { if len(contentType) == 0 {
contentType = r.c.content.ContentType contentType = r.contentConfig.ContentType
} }
if len(contentType) > 0 { if len(contentType) > 0 {
var err error var err error
@ -1318,7 +1330,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
if err != nil { if err != nil {
return Result{err: errors.NewInternalError(err)} return Result{err: errors.NewInternalError(err)}
} }
decoder, err = r.c.content.Negotiator.Decoder(mediaType, params) decoder, err = r.contentConfig.Negotiator.Decoder(mediaType, params)
if err != nil { if err != nil {
// if we fail to negotiate a decoder, treat this as an unstructured error // if we fail to negotiate a decoder, treat this as an unstructured error
switch { switch {
@ -1445,7 +1457,7 @@ func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool,
} }
var groupResource schema.GroupResource var groupResource schema.GroupResource
if len(r.resource) > 0 { if len(r.resource) > 0 {
groupResource.Group = r.c.content.GroupVersion.Group groupResource.Group = r.contentConfig.GroupVersion.Group
groupResource.Resource = r.resource groupResource.Resource = r.resource
} }
return errors.NewGenericServerResponse( return errors.NewGenericServerResponse(

View File

@ -43,6 +43,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -66,7 +67,7 @@ import (
func TestNewRequestSetsAccept(t *testing.T) { func TestNewRequestSetsAccept(t *testing.T) {
r := NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{}, nil).Verb("get") r := NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{}, nil).Verb("get")
if r.headers.Get("Accept") != "" { if r.headers.Get("Accept") != "application/json, */*" {
t.Errorf("unexpected headers: %#v", r.headers) t.Errorf("unexpected headers: %#v", r.headers)
} }
r = NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{ContentType: "application/other"}, nil).Verb("get") r = NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{ContentType: "application/other"}, nil).Verb("get")
@ -112,9 +113,9 @@ func TestRequestWithErrorWontChange(t *testing.T) {
gvCopy := v1.SchemeGroupVersion gvCopy := v1.SchemeGroupVersion
original := Request{ original := Request{
err: errors.New("test"), err: errors.New("test"),
c: &RESTClient{ c: &RESTClient{},
content: ClientContentConfig{GroupVersion: gvCopy},
}, contentConfig: ClientContentConfig{GroupVersion: gvCopy},
} }
r := original r := original
changed := r.Param("foo", "bar"). changed := r.Param("foo", "bar").
@ -236,7 +237,7 @@ func TestRequestParam(t *testing.T) {
} }
func TestRequestVersionedParams(t *testing.T) { func TestRequestVersionedParams(t *testing.T) {
r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}).Param("foo", "a") r := (&Request{c: &RESTClient{}, contentConfig: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}).Param("foo", "a")
if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}}) { if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}}) {
t.Errorf("should have set a param: %#v", r) t.Errorf("should have set a param: %#v", r)
} }
@ -252,7 +253,7 @@ func TestRequestVersionedParams(t *testing.T) {
} }
func TestRequestVersionedParamsFromListOptions(t *testing.T) { func TestRequestVersionedParamsFromListOptions(t *testing.T) {
r := &Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}} r := &Request{c: &RESTClient{}, contentConfig: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}
r.VersionedParams(&metav1.ListOptions{ResourceVersion: "1"}, scheme.ParameterCodec) r.VersionedParams(&metav1.ListOptions{ResourceVersion: "1"}, scheme.ParameterCodec)
if !reflect.DeepEqual(r.params, url.Values{ if !reflect.DeepEqual(r.params, url.Values{
"resourceVersion": []string{"1"}, "resourceVersion": []string{"1"},
@ -272,7 +273,7 @@ func TestRequestVersionedParamsFromListOptions(t *testing.T) {
func TestRequestVersionedParamsWithInvalidScheme(t *testing.T) { func TestRequestVersionedParamsWithInvalidScheme(t *testing.T) {
parameterCodec := runtime.NewParameterCodec(runtime.NewScheme()) parameterCodec := runtime.NewParameterCodec(runtime.NewScheme())
r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}) r := &Request{c: &RESTClient{}, contentConfig: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}
r.VersionedParams(&v1.PodExecOptions{Stdin: false, Stdout: true}, r.VersionedParams(&v1.PodExecOptions{Stdin: false, Stdout: true},
parameterCodec) parameterCodec)
@ -336,7 +337,7 @@ func TestRequestBody(t *testing.T) {
} }
// test unencodable api object // test unencodable api object
r = (&Request{c: &RESTClient{content: defaultContentConfig()}}).Body(&NotAnAPIObject{}) r = (&Request{c: &RESTClient{}, contentConfig: defaultContentConfig()}).Body(&NotAnAPIObject{})
if r.err == nil || r.body != nil { if r.err == nil || r.body != nil {
t.Errorf("should have set err and left body nil: %#v", r) t.Errorf("should have set err and left body nil: %#v", r)
} }
@ -901,9 +902,8 @@ func TestTransformUnstructuredError(t *testing.T) {
t.Run("", func(t *testing.T) { t.Run("", func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
r := &Request{ r := &Request{
c: &RESTClient{ contentConfig: defaultContentConfig(),
content: defaultContentConfig(), c: &RESTClient{},
},
resourceName: testCase.Name, resourceName: testCase.Name,
resource: testCase.Resource, resource: testCase.Resource,
} }
@ -989,8 +989,8 @@ func TestRequestWatch(t *testing.T) {
{ {
name: "server returns forbidden with json content", name: "server returns forbidden with json content",
Request: &Request{ Request: &Request{
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(),
base: &url.URL{}, base: &url.URL{},
}, },
}, },
@ -1018,8 +1018,8 @@ func TestRequestWatch(t *testing.T) {
{ {
name: "server returns forbidden without content", name: "server returns forbidden without content",
Request: &Request{ Request: &Request{
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(),
base: &url.URL{}, base: &url.URL{},
}, },
}, },
@ -1038,8 +1038,8 @@ func TestRequestWatch(t *testing.T) {
{ {
name: "server returns unauthorized", name: "server returns unauthorized",
Request: &Request{ Request: &Request{
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(),
base: &url.URL{}, base: &url.URL{},
}, },
}, },
@ -1058,8 +1058,8 @@ func TestRequestWatch(t *testing.T) {
{ {
name: "server returns unauthorized", name: "server returns unauthorized",
Request: &Request{ Request: &Request{
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(),
base: &url.URL{}, base: &url.URL{},
}, },
}, },
@ -1254,8 +1254,8 @@ func TestRequestStream(t *testing.T) {
}, },
{ {
Request: &Request{ Request: &Request{
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(),
base: &url.URL{}, base: &url.URL{},
}, },
}, },
@ -1273,8 +1273,8 @@ func TestRequestStream(t *testing.T) {
}, },
{ {
Request: &Request{ Request: &Request{
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(),
base: &url.URL{}, base: &url.URL{},
}, },
}, },
@ -2960,8 +2960,8 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
verb: test.verb, verb: test.verb,
body: test.body, body: test.body,
bodyBytes: test.bodyBytes, bodyBytes: test.bodyBytes,
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(),
Client: client, Client: client,
}, },
backoff: &noSleepBackOff{}, backoff: &noSleepBackOff{},
@ -3253,9 +3253,9 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
req := &Request{ req := &Request{
verb: "GET", verb: "GET",
bodyBytes: []byte{}, bodyBytes: []byte{},
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
base: base, base: base,
content: defaultContentConfig(),
Client: client, Client: client,
rateLimiter: interceptor, rateLimiter: interceptor,
}, },
@ -3389,9 +3389,9 @@ func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.
req := &Request{ req := &Request{
verb: "GET", verb: "GET",
bodyBytes: []byte{}, bodyBytes: []byte{},
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
base: base, base: base,
content: defaultContentConfig(),
Client: client, Client: client,
}, },
pathPrefix: "/api/v1", pathPrefix: "/api/v1",
@ -3564,9 +3564,9 @@ func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r
req := &Request{ req := &Request{
verb: "GET", verb: "GET",
bodyBytes: []byte{}, bodyBytes: []byte{},
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
base: base, base: base,
content: defaultContentConfig(),
Client: client, Client: client,
}, },
pathPrefix: "/api/v1", pathPrefix: "/api/v1",
@ -3987,9 +3987,9 @@ func TestRetryableConditions(t *testing.T) {
u, _ := url.Parse("http://localhost:123" + "/apis") u, _ := url.Parse("http://localhost:123" + "/apis")
req := &Request{ req := &Request{
verb: verb, verb: verb,
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
base: u, base: u,
content: defaultContentConfig(),
Client: client, Client: client,
}, },
backoff: &noSleepBackOff{}, backoff: &noSleepBackOff{},
@ -4031,8 +4031,8 @@ func TestRequestConcurrencyWithRetry(t *testing.T) {
req := &Request{ req := &Request{
verb: "POST", verb: "POST",
contentConfig: defaultContentConfig(),
c: &RESTClient{ c: &RESTClient{
content: defaultContentConfig(),
Client: client, Client: client,
}, },
backoff: &noSleepBackOff{}, backoff: &noSleepBackOff{},

View File

@ -200,9 +200,7 @@ func TestWatchListSuccess(t *testing.T) {
ctx := context.Background() ctx := context.Background()
fakeWatcher := watch.NewFake() fakeWatcher := watch.NewFake()
target := &Request{ target := &Request{
c: &RESTClient{ c: &RESTClient{},
content: ClientContentConfig{},
},
} }
go func(watchEvents []watch.Event) { go func(watchEvents []watch.Event) {