Merge pull request #84692 from smarterclayton/protocol_errors

Fix watch negotiation when using a non-default mime type in the client
This commit is contained in:
Kubernetes Prow Robot 2019-11-10 21:41:56 -08:00 committed by GitHub
commit c28921f248
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 1036 additions and 1614 deletions

View File

@ -412,7 +412,6 @@ staging/src/k8s.io/apiserver/plugin/pkg/authorizer/webhook
staging/src/k8s.io/cli-runtime/pkg/genericclioptions
staging/src/k8s.io/cli-runtime/pkg/printers
staging/src/k8s.io/cli-runtime/pkg/resource
staging/src/k8s.io/client-go/deprecated-dynamic
staging/src/k8s.io/client-go/discovery
staging/src/k8s.io/client-go/discovery/fake
staging/src/k8s.io/client-go/dynamic

View File

@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream"
@ -232,11 +233,11 @@ func TestStream(t *testing.T) {
server := httptest.NewServer(fakeServer(t, requestReceived, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols))
url, _ := url.ParseRequestURI(server.URL)
config := restclient.ContentConfig{
GroupVersion: &schema.GroupVersion{Group: "x"},
NegotiatedSerializer: legacyscheme.Codecs,
config := restclient.ClientContentConfig{
GroupVersion: schema.GroupVersion{Group: "x"},
Negotiator: runtime.NewClientNegotiator(legacyscheme.Codecs.WithoutConversion(), schema.GroupVersion{Group: "x"}),
}
c, err := restclient.NewRESTClient(url, "", config, -1, -1, nil, nil)
c, err := restclient.NewRESTClient(url, "", config, nil, nil)
if err != nil {
t.Fatalf("failed to create a client: %v", err)
}

View File

@ -127,7 +127,7 @@ func TestRunAccessCheck(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},
@ -197,7 +197,7 @@ func TestRunAccessList(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
tf.Client = &fake.RESTClient{

View File

@ -551,7 +551,7 @@ func TestBadTar(t *testing.T) {
func TestCopyToPod(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
tf.Client = &fake.RESTClient{
@ -621,7 +621,7 @@ func TestCopyToPod(t *testing.T) {
func TestCopyToPodNoPreserve(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
tf.Client = &fake.RESTClient{

View File

@ -50,6 +50,7 @@ go_library(
"helper.go",
"interfaces.go",
"mapper.go",
"negotiate.go",
"register.go",
"scheme.go",
"scheme_builder.go",

View File

@ -155,6 +155,28 @@ type NegotiatedSerializer interface {
DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder
}
// ClientNegotiator handles turning an HTTP content type into the appropriate encoder.
// Use NewClientNegotiator or NewVersionedClientNegotiator to create this interface from
// a NegotiatedSerializer.
type ClientNegotiator interface {
// Encoder returns the appropriate encoder for the provided contentType (e.g. application/json)
// and any optional mediaType parameters (e.g. pretty=1), or an error. If no serializer is found
// a NegotiateError will be returned. The current client implementations consider params to be
// optional modifiers to the contentType and will ignore unrecognized parameters.
Encoder(contentType string, params map[string]string) (Encoder, error)
// Decoder returns the appropriate decoder for the provided contentType (e.g. application/json)
// and any optional mediaType parameters (e.g. pretty=1), or an error. If no serializer is found
// a NegotiateError will be returned. The current client implementations consider params to be
// optional modifiers to the contentType and will ignore unrecognized parameters.
Decoder(contentType string, params map[string]string) (Decoder, error)
// StreamDecoder returns the appropriate stream decoder for the provided contentType (e.g.
// application/json) and any optional mediaType parameters (e.g. pretty=1), or an error. If no
// serializer is found a NegotiateError will be returned. The Serializer and Framer will always
// be returned if a Decoder is returned. The current client implementations consider params to be
// optional modifiers to the contentType and will ignore unrecognized parameters.
StreamDecoder(contentType string, params map[string]string) (Decoder, Serializer, Framer, error)
}
// StorageSerializer is an interface used for obtaining encoders, decoders, and serializers
// that can read and write data at rest. This would commonly be used by client tools that must
// read files, or server side storage interfaces that persist restful objects.

View File

@ -0,0 +1,146 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// NegotiateError is returned when a ClientNegotiator is unable to locate
// a serializer for the requested operation.
type NegotiateError struct {
ContentType string
Stream bool
}
func (e NegotiateError) Error() string {
if e.Stream {
return fmt.Sprintf("no stream serializers registered for %s", e.ContentType)
}
return fmt.Sprintf("no serializers registered for %s", e.ContentType)
}
type clientNegotiator struct {
serializer NegotiatedSerializer
encode, decode GroupVersioner
}
func (n *clientNegotiator) Encoder(contentType string, params map[string]string) (Encoder, error) {
// TODO: `pretty=1` is handled in NegotiateOutputMediaType, consider moving it to this method
// if client negotiators truly need to use it
mediaTypes := n.serializer.SupportedMediaTypes()
info, ok := SerializerInfoForMediaType(mediaTypes, contentType)
if !ok {
if len(contentType) != 0 || len(mediaTypes) == 0 {
return nil, NegotiateError{ContentType: contentType}
}
info = mediaTypes[0]
}
return n.serializer.EncoderForVersion(info.Serializer, n.encode), nil
}
func (n *clientNegotiator) Decoder(contentType string, params map[string]string) (Decoder, error) {
mediaTypes := n.serializer.SupportedMediaTypes()
info, ok := SerializerInfoForMediaType(mediaTypes, contentType)
if !ok {
if len(contentType) != 0 || len(mediaTypes) == 0 {
return nil, NegotiateError{ContentType: contentType}
}
info = mediaTypes[0]
}
return n.serializer.DecoderToVersion(info.Serializer, n.decode), nil
}
func (n *clientNegotiator) StreamDecoder(contentType string, params map[string]string) (Decoder, Serializer, Framer, error) {
mediaTypes := n.serializer.SupportedMediaTypes()
info, ok := SerializerInfoForMediaType(mediaTypes, contentType)
if !ok {
if len(contentType) != 0 || len(mediaTypes) == 0 {
return nil, nil, nil, NegotiateError{ContentType: contentType, Stream: true}
}
info = mediaTypes[0]
}
if info.StreamSerializer == nil {
return nil, nil, nil, NegotiateError{ContentType: info.MediaType, Stream: true}
}
return n.serializer.DecoderToVersion(info.Serializer, n.decode), info.StreamSerializer.Serializer, info.StreamSerializer.Framer, nil
}
// NewClientNegotiator will attempt to retrieve the appropriate encoder, decoder, or
// stream decoder for a given content type. Does not perform any conversion, but will
// encode the object to the desired group, version, and kind. Use when creating a client.
func NewClientNegotiator(serializer NegotiatedSerializer, gv schema.GroupVersion) ClientNegotiator {
return &clientNegotiator{
serializer: serializer,
encode: gv,
}
}
// NewInternalClientNegotiator applies the default client rules for connecting to a Kubernetes apiserver
// where objects are converted to gv prior to sending and decoded to their internal representation prior
// to retrieval.
//
// DEPRECATED: Internal clients are deprecated and will be removed in a future Kubernetes release.
func NewInternalClientNegotiator(serializer NegotiatedSerializer, gv schema.GroupVersion) ClientNegotiator {
decode := schema.GroupVersions{
{
Group: gv.Group,
Version: APIVersionInternal,
},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{
Group: "",
Version: APIVersionInternal,
},
}
return &clientNegotiator{
encode: gv,
decode: decode,
serializer: serializer,
}
}
// NewSimpleClientNegotiator will negotiate for a single serializer. This should only be used
// for testing or when the caller is taking responsibility for setting the GVK on encoded objects.
func NewSimpleClientNegotiator(info SerializerInfo, gv schema.GroupVersion) ClientNegotiator {
return &clientNegotiator{
serializer: &simpleNegotiatedSerializer{info: info},
encode: gv,
}
}
type simpleNegotiatedSerializer struct {
info SerializerInfo
}
func NewSimpleNegotiatedSerializer(info SerializerInfo) NegotiatedSerializer {
return &simpleNegotiatedSerializer{info: info}
}
func (n *simpleNegotiatedSerializer) SupportedMediaTypes() []SerializerInfo {
return []SerializerInfo{n.info}
}
func (n *simpleNegotiatedSerializer) EncoderForVersion(e Encoder, _ GroupVersioner) Encoder {
return e
}
func (n *simpleNegotiatedSerializer) DecoderToVersion(d Decoder, _gv GroupVersioner) Decoder {
return d
}

View File

@ -34,7 +34,6 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -659,38 +658,20 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
watchServer.ServeHTTP(w, req)
}))
defer s.Close()
defer s.CloseClientConnections()
client := dynamic.NewForConfigOrDie(&restclient.Config{
Host: s.URL,
APIPath: "/" + prefix,
}).Resource(newGroupVersion.WithResource("simple"))
w, err := client.Watch(metav1.ListOptions{})
if err != nil {
_, err := client.Watch(metav1.ListOptions{})
if err == nil {
t.Fatal(err)
}
errStatus := errors.NewInternalError(fmt.Errorf("we got an error")).Status()
watcher.Error(&errStatus)
watcher.Stop()
got := <-w.ResultChan()
if got.Type != watch.Error {
t.Fatalf("unexpected watch type: %#v", got)
if err.Error() != "no stream serializers registered for testcase/json" {
t.Fatalf("unexpected error: %v", err)
}
obj, ok := got.Object.(*unstructured.Unstructured)
if !ok {
t.Fatalf("not the correct object type: %#v", got)
}
status := &metav1.Status{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), status); err != nil {
t.Fatal(err)
}
if status.Kind != "Status" || status.APIVersion != "v1" || status.Code != 500 || status.Status != "Failure" || !strings.Contains(status.Message, "we got an error") {
t.Fatalf("error: %#v", status)
}
t.Logf("status: %#v", status)
}
func TestWatchHTTPTimeout(t *testing.T) {

View File

@ -592,7 +592,7 @@ func TestHelperReplace(t *testing.T) {
Req: expectPut,
},
}
for i, tt := range tests {
for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
client := &fake.RESTClient{
GroupVersion: corev1GV,
@ -607,24 +607,24 @@ func TestHelperReplace(t *testing.T) {
}
_, err := modifier.Replace(tt.Namespace, "foo", tt.Overwrite, tt.Object)
if (err != nil) != tt.Err {
t.Errorf("%d: unexpected error: %t %v", i, tt.Err, err)
t.Fatalf("unexpected error: %t %v", tt.Err, err)
}
if err != nil {
return
}
if tt.Req != nil && !tt.Req(tt.ExpectPath, client.Req) {
t.Errorf("%d: unexpected request: %#v", i, client.Req)
if tt.Req != nil && (client.Req == nil || !tt.Req(tt.ExpectPath, client.Req)) {
t.Fatalf("unexpected request: %#v", client.Req)
}
body, err := ioutil.ReadAll(client.Req.Body)
if err != nil {
t.Fatalf("%d: unexpected error: %#v", i, err)
t.Fatalf("unexpected error: %#v", err)
}
expect := []byte{}
if tt.ExpectObject != nil {
expect = []byte(runtime.EncodeOrDie(corev1Codec, tt.ExpectObject))
}
if !reflect.DeepEqual(expect, body) {
t.Errorf("%d: unexpected body: %s", i, string(body))
t.Fatalf("unexpected body: %s", string(body))
}
})
}

View File

@ -9,7 +9,6 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/client-go/deprecated-dynamic:all-srcs",
"//staging/src/k8s.io/client-go/discovery:all-srcs",
"//staging/src/k8s.io/client-go/dynamic:all-srcs",
"//staging/src/k8s.io/client-go/examples/create-update-delete-deployment:all-srcs",

View File

@ -1,54 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"client.go",
"client_pool.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/deprecated-dynamic",
importpath = "k8s.io/client-go/deprecated-dynamic",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["client_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/rest/watch:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,131 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package dynamic provides a client interface to arbitrary Kubernetes
// APIs that exposes common high level operations and exposes common
// metadata.
package deprecated_dynamic
import (
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
)
// Interface is a Kubernetes client that allows you to access metadata
// and manipulate metadata of a Kubernetes API group.
type Interface interface {
// Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace
// is ignored. The ResourceInterface inherits the parameter codec of this client.
Resource(resource *metav1.APIResource, namespace string) ResourceInterface
}
// ResourceInterface is an API interface to a specific resource under a
// dynamic client.
type ResourceInterface interface {
// List returns a list of objects for this resource.
List(opts metav1.ListOptions) (runtime.Object, error)
// Get gets the resource with the specified name.
Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error)
// Delete deletes the resource with the specified name.
Delete(name string, opts *metav1.DeleteOptions) error
// DeleteCollection deletes a collection of objects.
DeleteCollection(deleteOptions *metav1.DeleteOptions, listOptions metav1.ListOptions) error
// Create creates the provided resource.
Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
// Update updates the provided resource.
Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
// Watch returns a watch.Interface that watches the resource.
Watch(opts metav1.ListOptions) (watch.Interface, error)
// Patch patches the provided resource.
Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error)
}
// Client is a Kubernetes client that allows you to access metadata
// and manipulate metadata of a Kubernetes API group, and implements Interface.
type Client struct {
version schema.GroupVersion
delegate dynamic.Interface
}
// NewClient returns a new client based on the passed in config. The
// codec is ignored, as the dynamic client uses it's own codec.
func NewClient(conf *restclient.Config, version schema.GroupVersion) (*Client, error) {
delegate, err := dynamic.NewForConfig(conf)
if err != nil {
return nil, err
}
return &Client{version: version, delegate: delegate}, nil
}
// Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace
// is ignored. The ResourceInterface inherits the parameter codec of c.
func (c *Client) Resource(resource *metav1.APIResource, namespace string) ResourceInterface {
resourceTokens := strings.SplitN(resource.Name, "/", 2)
subresources := []string{}
if len(resourceTokens) > 1 {
subresources = strings.Split(resourceTokens[1], "/")
}
if len(namespace) == 0 {
return oldResourceShim(c.delegate.Resource(c.version.WithResource(resourceTokens[0])), subresources)
}
return oldResourceShim(c.delegate.Resource(c.version.WithResource(resourceTokens[0])).Namespace(namespace), subresources)
}
// the old interfaces used the wrong type for lists. this fixes that
func oldResourceShim(in dynamic.ResourceInterface, subresources []string) ResourceInterface {
return oldResourceShimType{ResourceInterface: in, subresources: subresources}
}
type oldResourceShimType struct {
dynamic.ResourceInterface
subresources []string
}
func (s oldResourceShimType) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
return s.ResourceInterface.Create(obj, metav1.CreateOptions{}, s.subresources...)
}
func (s oldResourceShimType) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
return s.ResourceInterface.Update(obj, metav1.UpdateOptions{}, s.subresources...)
}
func (s oldResourceShimType) Delete(name string, opts *metav1.DeleteOptions) error {
return s.ResourceInterface.Delete(name, opts, s.subresources...)
}
func (s oldResourceShimType) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) {
return s.ResourceInterface.Get(name, opts, s.subresources...)
}
func (s oldResourceShimType) List(opts metav1.ListOptions) (runtime.Object, error) {
return s.ResourceInterface.List(opts)
}
func (s oldResourceShimType) Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) {
return s.ResourceInterface.Patch(name, pt, data, metav1.PatchOptions{}, s.subresources...)
}

View File

@ -1,122 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deprecated_dynamic
import (
"sync"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
restclient "k8s.io/client-go/rest"
)
// ClientPool manages a pool of dynamic clients.
type ClientPool interface {
// ClientForGroupVersionResource returns a client configured for the specified groupVersionResource.
// Resource may be empty.
ClientForGroupVersionResource(resource schema.GroupVersionResource) (Interface, error)
// ClientForGroupVersionKind returns a client configured for the specified groupVersionKind.
// Kind may be empty.
ClientForGroupVersionKind(kind schema.GroupVersionKind) (Interface, error)
}
// APIPathResolverFunc knows how to convert a groupVersion to its API path. The Kind field is
// optional.
type APIPathResolverFunc func(kind schema.GroupVersionKind) string
// LegacyAPIPathResolverFunc can resolve paths properly with the legacy API.
func LegacyAPIPathResolverFunc(kind schema.GroupVersionKind) string {
if len(kind.Group) == 0 {
return "/api"
}
return "/apis"
}
// clientPoolImpl implements ClientPool and caches clients for the resource group versions
// is asked to retrieve. This type is thread safe.
type clientPoolImpl struct {
lock sync.RWMutex
config *restclient.Config
clients map[schema.GroupVersion]*Client
apiPathResolverFunc APIPathResolverFunc
mapper meta.RESTMapper
}
// NewClientPool returns a ClientPool from the specified config. It reuses clients for the same
// group version. It is expected this type may be wrapped by specific logic that special cases certain
// resources or groups.
func NewClientPool(config *restclient.Config, mapper meta.RESTMapper, apiPathResolverFunc APIPathResolverFunc) ClientPool {
confCopy := *config
return &clientPoolImpl{
config: &confCopy,
clients: map[schema.GroupVersion]*Client{},
apiPathResolverFunc: apiPathResolverFunc,
mapper: mapper,
}
}
// Instantiates a new dynamic client pool with the given config.
func NewDynamicClientPool(cfg *restclient.Config) ClientPool {
// restMapper is not needed when using LegacyAPIPathResolverFunc
emptyMapper := meta.MultiRESTMapper{}
return NewClientPool(cfg, emptyMapper, LegacyAPIPathResolverFunc)
}
// ClientForGroupVersionResource uses the provided RESTMapper to identify the appropriate resource. Resource may
// be empty. If no matching kind is found the underlying client for that group is still returned.
func (c *clientPoolImpl) ClientForGroupVersionResource(resource schema.GroupVersionResource) (Interface, error) {
kinds, err := c.mapper.KindsFor(resource)
if err != nil {
if meta.IsNoMatchError(err) {
return c.ClientForGroupVersionKind(schema.GroupVersionKind{Group: resource.Group, Version: resource.Version})
}
return nil, err
}
return c.ClientForGroupVersionKind(kinds[0])
}
// ClientForGroupVersion returns a client for the specified groupVersion, creates one if none exists. Kind
// in the GroupVersionKind may be empty.
func (c *clientPoolImpl) ClientForGroupVersionKind(kind schema.GroupVersionKind) (Interface, error) {
c.lock.Lock()
defer c.lock.Unlock()
gv := kind.GroupVersion()
// do we have a client already configured?
if existingClient, found := c.clients[gv]; found {
return existingClient, nil
}
// avoid changing the original config
confCopy := *c.config
conf := &confCopy
// we need to set the api path based on group version, if no group, default to legacy path
conf.APIPath = c.apiPathResolverFunc(kind)
// we need to make a client
conf.GroupVersion = &gv
dynamicClient, err := NewClient(conf, gv)
if err != nil {
return nil, err
}
c.clients[gv] = dynamicClient
return dynamicClient, nil
}

View File

@ -1,624 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deprecated_dynamic
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
restclientwatch "k8s.io/client-go/rest/watch"
)
func getJSON(version, kind, name string) []byte {
return []byte(fmt.Sprintf(`{"apiVersion": %q, "kind": %q, "metadata": {"name": %q}}`, version, kind, name))
}
func getListJSON(version, kind string, items ...[]byte) []byte {
json := fmt.Sprintf(`{"apiVersion": %q, "kind": %q, "items": [%s]}`,
version, kind, bytes.Join(items, []byte(",")))
return []byte(json)
}
func getObject(version, kind, name string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": version,
"kind": kind,
"metadata": map[string]interface{}{
"name": name,
},
},
}
}
func getClientServer(gv *schema.GroupVersion, h func(http.ResponseWriter, *http.Request)) (Interface, *httptest.Server, error) {
srv := httptest.NewServer(http.HandlerFunc(h))
cl, err := NewClient(&restclient.Config{
Host: srv.URL,
ContentConfig: restclient.ContentConfig{GroupVersion: gv},
}, *gv)
if err != nil {
srv.Close()
return nil, nil, err
}
return cl, srv, nil
}
func TestList(t *testing.T) {
tcs := []struct {
name string
namespace string
path string
resp []byte
want *unstructured.UnstructuredList
}{
{
name: "normal_list",
path: "/apis/gtest/vtest/rtest",
resp: getListJSON("vTest", "rTestList",
getJSON("vTest", "rTest", "item1"),
getJSON("vTest", "rTest", "item2")),
want: &unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "vTest",
"kind": "rTestList",
},
Items: []unstructured.Unstructured{
*getObject("vTest", "rTest", "item1"),
*getObject("vTest", "rTest", "item2"),
},
},
},
{
name: "namespaced_list",
namespace: "nstest",
path: "/apis/gtest/vtest/namespaces/nstest/rtest",
resp: getListJSON("vTest", "rTestList",
getJSON("vTest", "rTest", "item1"),
getJSON("vTest", "rTest", "item2")),
want: &unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "vTest",
"kind": "rTestList",
},
Items: []unstructured.Unstructured{
*getObject("vTest", "rTest", "item1"),
*getObject("vTest", "rTest", "item2"),
},
},
},
}
for _, tc := range tcs {
gv := &schema.GroupVersion{Group: "gtest", Version: "vtest"}
resource := &metav1.APIResource{Name: "rtest", Namespaced: len(tc.namespace) != 0}
cl, srv, err := getClientServer(gv, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
t.Errorf("List(%q) got HTTP method %s. wanted GET", tc.name, r.Method)
}
if r.URL.Path != tc.path {
t.Errorf("List(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
w.Write(tc.resp)
})
if err != nil {
t.Errorf("unexpected error when creating client: %v", err)
continue
}
defer srv.Close()
got, err := cl.Resource(resource, tc.namespace).List(metav1.ListOptions{})
if err != nil {
t.Errorf("unexpected error when listing %q: %v", tc.name, err)
continue
}
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("List(%q) want: %v\ngot: %v", tc.name, tc.want, got)
}
}
}
func TestGet(t *testing.T) {
tcs := []struct {
resource string
namespace string
name string
path string
resp []byte
want *unstructured.Unstructured
}{
{
resource: "rtest",
name: "normal_get",
path: "/apis/gtest/vtest/rtest/normal_get",
resp: getJSON("vTest", "rTest", "normal_get"),
want: getObject("vTest", "rTest", "normal_get"),
},
{
resource: "rtest",
namespace: "nstest",
name: "namespaced_get",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_get",
resp: getJSON("vTest", "rTest", "namespaced_get"),
want: getObject("vTest", "rTest", "namespaced_get"),
},
{
resource: "rtest/srtest",
name: "normal_subresource_get",
path: "/apis/gtest/vtest/rtest/normal_subresource_get/srtest",
resp: getJSON("vTest", "srTest", "normal_subresource_get"),
want: getObject("vTest", "srTest", "normal_subresource_get"),
},
{
resource: "rtest/srtest",
namespace: "nstest",
name: "namespaced_subresource_get",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_get/srtest",
resp: getJSON("vTest", "srTest", "namespaced_subresource_get"),
want: getObject("vTest", "srTest", "namespaced_subresource_get"),
},
}
for _, tc := range tcs {
gv := &schema.GroupVersion{Group: "gtest", Version: "vtest"}
resource := &metav1.APIResource{Name: tc.resource, Namespaced: len(tc.namespace) != 0}
cl, srv, err := getClientServer(gv, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
t.Errorf("Get(%q) got HTTP method %s. wanted GET", tc.name, r.Method)
}
if r.URL.Path != tc.path {
t.Errorf("Get(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
w.Write(tc.resp)
})
if err != nil {
t.Errorf("unexpected error when creating client: %v", err)
continue
}
defer srv.Close()
got, err := cl.Resource(resource, tc.namespace).Get(tc.name, metav1.GetOptions{})
if err != nil {
t.Errorf("unexpected error when getting %q: %v", tc.name, err)
continue
}
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("Get(%q) want: %v\ngot: %v", tc.name, tc.want, got)
}
}
}
func TestDelete(t *testing.T) {
background := metav1.DeletePropagationBackground
uid := types.UID("uid")
statusOK := &metav1.Status{
TypeMeta: metav1.TypeMeta{Kind: "Status"},
Status: metav1.StatusSuccess,
}
tcs := []struct {
namespace string
name string
path string
deleteOptions *metav1.DeleteOptions
}{
{
name: "normal_delete",
path: "/apis/gtest/vtest/rtest/normal_delete",
},
{
namespace: "nstest",
name: "namespaced_delete",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_delete",
},
{
namespace: "nstest",
name: "namespaced_delete_with_options",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_delete_with_options",
deleteOptions: &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &uid}, PropagationPolicy: &background},
},
}
for _, tc := range tcs {
gv := &schema.GroupVersion{Group: "gtest", Version: "vtest"}
resource := &metav1.APIResource{Name: "rtest", Namespaced: len(tc.namespace) != 0}
cl, srv, err := getClientServer(gv, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "DELETE" {
t.Errorf("Delete(%q) got HTTP method %s. wanted DELETE", tc.name, r.Method)
}
if r.URL.Path != tc.path {
t.Errorf("Delete(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
unstructured.UnstructuredJSONScheme.Encode(statusOK, w)
})
if err != nil {
t.Errorf("unexpected error when creating client: %v", err)
continue
}
defer srv.Close()
err = cl.Resource(resource, tc.namespace).Delete(tc.name, tc.deleteOptions)
if err != nil {
t.Errorf("unexpected error when deleting %q: %v", tc.name, err)
continue
}
}
}
func TestDeleteCollection(t *testing.T) {
statusOK := &metav1.Status{
TypeMeta: metav1.TypeMeta{Kind: "Status"},
Status: metav1.StatusSuccess,
}
tcs := []struct {
namespace string
name string
path string
}{
{
name: "normal_delete_collection",
path: "/apis/gtest/vtest/rtest",
},
{
namespace: "nstest",
name: "namespaced_delete_collection",
path: "/apis/gtest/vtest/namespaces/nstest/rtest",
},
}
for _, tc := range tcs {
gv := &schema.GroupVersion{Group: "gtest", Version: "vtest"}
resource := &metav1.APIResource{Name: "rtest", Namespaced: len(tc.namespace) != 0}
cl, srv, err := getClientServer(gv, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "DELETE" {
t.Errorf("DeleteCollection(%q) got HTTP method %s. wanted DELETE", tc.name, r.Method)
}
if r.URL.Path != tc.path {
t.Errorf("DeleteCollection(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
unstructured.UnstructuredJSONScheme.Encode(statusOK, w)
})
if err != nil {
t.Errorf("unexpected error when creating client: %v", err)
continue
}
defer srv.Close()
err = cl.Resource(resource, tc.namespace).DeleteCollection(nil, metav1.ListOptions{})
if err != nil {
t.Errorf("unexpected error when deleting collection %q: %v", tc.name, err)
continue
}
}
}
func TestCreate(t *testing.T) {
tcs := []struct {
resource string
name string
namespace string
obj *unstructured.Unstructured
path string
}{
{
resource: "rtest",
name: "normal_create",
path: "/apis/gtest/vtest/rtest",
obj: getObject("gtest/vTest", "rTest", "normal_create"),
},
{
resource: "rtest",
name: "namespaced_create",
namespace: "nstest",
path: "/apis/gtest/vtest/namespaces/nstest/rtest",
obj: getObject("gtest/vTest", "rTest", "namespaced_create"),
},
}
for _, tc := range tcs {
gv := &schema.GroupVersion{Group: "gtest", Version: "vtest"}
resource := &metav1.APIResource{Name: tc.resource, Namespaced: len(tc.namespace) != 0}
cl, srv, err := getClientServer(gv, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Errorf("Create(%q) got HTTP method %s. wanted POST", tc.name, r.Method)
}
if r.URL.Path != tc.path {
t.Errorf("Create(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
data, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Errorf("Create(%q) unexpected error reading body: %v", tc.name, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(data)
})
if err != nil {
t.Errorf("unexpected error when creating client: %v", err)
continue
}
defer srv.Close()
got, err := cl.Resource(resource, tc.namespace).Create(tc.obj)
if err != nil {
t.Errorf("unexpected error when creating %q: %v", tc.name, err)
continue
}
if !reflect.DeepEqual(got, tc.obj) {
t.Errorf("Create(%q) want: %v\ngot: %v", tc.name, tc.obj, got)
}
}
}
func TestUpdate(t *testing.T) {
tcs := []struct {
resource string
name string
namespace string
obj *unstructured.Unstructured
path string
}{
{
resource: "rtest",
name: "normal_update",
path: "/apis/gtest/vtest/rtest/normal_update",
obj: getObject("gtest/vTest", "rTest", "normal_update"),
},
{
resource: "rtest",
name: "namespaced_update",
namespace: "nstest",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_update",
obj: getObject("gtest/vTest", "rTest", "namespaced_update"),
},
{
resource: "rtest/srtest",
name: "normal_subresource_update",
path: "/apis/gtest/vtest/rtest/normal_update/srtest",
obj: getObject("gtest/vTest", "srTest", "normal_update"),
},
{
resource: "rtest/srtest",
name: "namespaced_subresource_update",
namespace: "nstest",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_update/srtest",
obj: getObject("gtest/vTest", "srTest", "namespaced_update"),
},
}
for _, tc := range tcs {
gv := &schema.GroupVersion{Group: "gtest", Version: "vtest"}
resource := &metav1.APIResource{Name: tc.resource, Namespaced: len(tc.namespace) != 0}
cl, srv, err := getClientServer(gv, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
t.Errorf("Update(%q) got HTTP method %s. wanted PUT", tc.name, r.Method)
}
if r.URL.Path != tc.path {
t.Errorf("Update(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
data, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Errorf("Update(%q) unexpected error reading body: %v", tc.name, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(data)
})
if err != nil {
t.Errorf("unexpected error when creating client: %v", err)
continue
}
defer srv.Close()
got, err := cl.Resource(resource, tc.namespace).Update(tc.obj)
if err != nil {
t.Errorf("unexpected error when updating %q: %v", tc.name, err)
continue
}
if !reflect.DeepEqual(got, tc.obj) {
t.Errorf("Update(%q) want: %v\ngot: %v", tc.name, tc.obj, got)
}
}
}
func TestWatch(t *testing.T) {
tcs := []struct {
name string
namespace string
events []watch.Event
path string
query string
}{
{
name: "normal_watch",
path: "/apis/gtest/vtest/rtest",
query: "watch=true",
events: []watch.Event{
{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
{Type: watch.Modified, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
{Type: watch.Deleted, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
},
},
{
name: "namespaced_watch",
namespace: "nstest",
path: "/apis/gtest/vtest/namespaces/nstest/rtest",
query: "watch=true",
events: []watch.Event{
{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
{Type: watch.Modified, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
{Type: watch.Deleted, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
},
},
}
for _, tc := range tcs {
gv := &schema.GroupVersion{Group: "gtest", Version: "vtest"}
resource := &metav1.APIResource{Name: "rtest", Namespaced: len(tc.namespace) != 0}
cl, srv, err := getClientServer(gv, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
t.Errorf("Watch(%q) got HTTP method %s. wanted GET", tc.name, r.Method)
}
if r.URL.Path != tc.path {
t.Errorf("Watch(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
if r.URL.RawQuery != tc.query {
t.Errorf("Watch(%q) got query %s. wanted %s", tc.name, r.URL.RawQuery, tc.query)
}
codec := unstructured.UnstructuredJSONScheme
enc := restclientwatch.NewEncoder(streaming.NewEncoder(w, codec), codec)
for _, e := range tc.events {
enc.Encode(&e)
}
})
if err != nil {
t.Errorf("unexpected error when creating client: %v", err)
continue
}
defer srv.Close()
watcher, err := cl.Resource(resource, tc.namespace).Watch(metav1.ListOptions{})
if err != nil {
t.Errorf("unexpected error when watching %q: %v", tc.name, err)
continue
}
for _, want := range tc.events {
got := <-watcher.ResultChan()
if !reflect.DeepEqual(got, want) {
t.Errorf("Watch(%q) want: %v\ngot: %v", tc.name, want, got)
}
}
}
}
func TestPatch(t *testing.T) {
tcs := []struct {
resource string
name string
namespace string
patch []byte
want *unstructured.Unstructured
path string
}{
{
resource: "rtest",
name: "normal_patch",
path: "/apis/gtest/vtest/rtest/normal_patch",
patch: getJSON("gtest/vTest", "rTest", "normal_patch"),
want: getObject("gtest/vTest", "rTest", "normal_patch"),
},
{
resource: "rtest",
name: "namespaced_patch",
namespace: "nstest",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_patch",
patch: getJSON("gtest/vTest", "rTest", "namespaced_patch"),
want: getObject("gtest/vTest", "rTest", "namespaced_patch"),
},
{
resource: "rtest/srtest",
name: "normal_subresource_patch",
path: "/apis/gtest/vtest/rtest/normal_subresource_patch/srtest",
patch: getJSON("gtest/vTest", "srTest", "normal_subresource_patch"),
want: getObject("gtest/vTest", "srTest", "normal_subresource_patch"),
},
{
resource: "rtest/srtest",
name: "namespaced_subresource_patch",
namespace: "nstest",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_patch/srtest",
patch: getJSON("gtest/vTest", "srTest", "namespaced_subresource_patch"),
want: getObject("gtest/vTest", "srTest", "namespaced_subresource_patch"),
},
}
for _, tc := range tcs {
gv := &schema.GroupVersion{Group: "gtest", Version: "vtest"}
resource := &metav1.APIResource{Name: tc.resource, Namespaced: len(tc.namespace) != 0}
cl, srv, err := getClientServer(gv, func(w http.ResponseWriter, r *http.Request) {
if r.Method != "PATCH" {
t.Errorf("Patch(%q) got HTTP method %s. wanted PATCH", tc.name, r.Method)
}
if r.URL.Path != tc.path {
t.Errorf("Patch(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
content := r.Header.Get("Content-Type")
if content != string(types.StrategicMergePatchType) {
t.Errorf("Patch(%q) got Content-Type %s. wanted %s", tc.name, content, types.StrategicMergePatchType)
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Errorf("Patch(%q) unexpected error reading body: %v", tc.name, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(data)
})
if err != nil {
t.Errorf("unexpected error when creating client: %v", err)
continue
}
defer srv.Close()
got, err := cl.Resource(resource, tc.namespace).Patch(tc.name, types.StrategicMergePatchType, tc.patch)
if err != nil {
t.Errorf("unexpected error when patching %q: %v", tc.name, err)
continue
}
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("Patch(%q) want: %v\ngot: %v", tc.name, tc.want, got)
}
}
}

View File

@ -40,8 +40,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -537,6 +537,8 @@ func TestWatch(t *testing.T) {
t.Errorf("Watch(%q) got query %s. wanted %s", tc.name, r.URL.RawQuery, tc.query)
}
w.Header().Set("Content-Type", "application/json")
enc := restclientwatch.NewEncoder(streaming.NewEncoder(w, unstructured.UnstructuredJSONScheme), unstructured.UnstructuredJSONScheme)
for _, e := range tc.events {
enc.Encode(&e)

View File

@ -18,11 +18,11 @@ package dynamic
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
)
var watchScheme = runtime.NewScheme()
@ -41,37 +41,6 @@ func init() {
metav1.AddToGroupVersion(deleteScheme, versionV1)
}
var watchJsonSerializerInfo = runtime.SerializerInfo{
MediaType: "application/json",
MediaTypeType: "application",
MediaTypeSubType: "json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, true),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false),
Framer: json.Framer,
},
}
// watchNegotiatedSerializer is used to read the wrapper of the watch stream
type watchNegotiatedSerializer struct{}
var watchNegotiatedSerializerInstance = watchNegotiatedSerializer{}
func (s watchNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{watchJsonSerializerInfo}
}
func (s watchNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil)
}
func (s watchNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv)
}
// basicNegotiatedSerializer is used to handle discovery and error handling serialization
type basicNegotiatedSerializer struct{}
@ -82,8 +51,8 @@ func (s basicNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInf
MediaTypeType: "application",
MediaTypeSubType: "json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, true),
Serializer: json.NewSerializer(json.DefaultMetaFactory, unstructuredCreater{basicScheme}, unstructuredTyper{basicScheme}, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, unstructuredCreater{basicScheme}, unstructuredTyper{basicScheme}, true),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
@ -94,9 +63,46 @@ func (s basicNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInf
}
func (s basicNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil)
return runtime.WithVersionEncoder{
Version: gv,
Encoder: encoder,
ObjectTyper: unstructuredTyper{basicScheme},
}
}
func (s basicNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv)
return decoder
}
type unstructuredCreater struct {
nested runtime.ObjectCreater
}
func (c unstructuredCreater) New(kind schema.GroupVersionKind) (runtime.Object, error) {
out, err := c.nested.New(kind)
if err == nil {
return out, nil
}
out = &unstructured.Unstructured{}
out.GetObjectKind().SetGroupVersionKind(kind)
return out, nil
}
type unstructuredTyper struct {
nested runtime.ObjectTyper
}
func (t unstructuredTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, bool, error) {
kinds, unversioned, err := t.nested.ObjectKinds(obj)
if err == nil {
return kinds, unversioned, nil
}
if _, ok := obj.(runtime.Unstructured); ok && !obj.GetObjectKind().GroupVersionKind().Empty() {
return []schema.GroupVersionKind{obj.GetObjectKind().GroupVersionKind()}, false, nil
}
return nil, false, err
}
func (t unstructuredTyper) Recognizes(gvk schema.GroupVersionKind) bool {
return true
}

View File

@ -18,14 +18,12 @@ package dynamic
import (
"fmt"
"io"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
@ -282,31 +280,10 @@ func (c *dynamicResourceClient) List(opts metav1.ListOptions) (*unstructured.Uns
}
func (c *dynamicResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
internalGV := schema.GroupVersions{
{Group: c.resource.Group, Version: runtime.APIVersionInternal},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{Group: "", Version: runtime.APIVersionInternal},
}
s := &rest.Serializers{
Encoder: watchNegotiatedSerializerInstance.EncoderForVersion(watchJsonSerializerInfo.Serializer, c.resource.GroupVersion()),
Decoder: watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV),
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
return watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV), nil
},
StreamingSerializer: watchJsonSerializerInfo.StreamSerializer.Serializer,
Framer: watchJsonSerializerInfo.StreamSerializer.Framer,
}
wrappedDecoderFn := func(body io.ReadCloser) streaming.Decoder {
framer := s.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, s.StreamingSerializer)
}
opts.Watch = true
return c.client.client.Get().AbsPath(c.makeURLSegments("")...).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
WatchWithSpecificDecoders(wrappedDecoderFn, unstructured.UnstructuredJSONScheme)
Watch()
}
func (c *dynamicResourceClient) Patch(name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) {

View File

@ -317,7 +317,6 @@ func (c *metadataResourceClient) List(opts metav1.ListOptions) (*metav1.PartialO
if !ok {
return nil, fmt.Errorf("incoming object is incorrect type %T", obj)
}
fmt.Printf("DEBUG: %#v\n", inputList)
list := &metav1.PartialObjectMetadataList{
ListMeta: inputList.ListMeta,

View File

@ -17,8 +17,6 @@ limitations under the License.
package rest
import (
"fmt"
"mime"
"net/http"
"net/url"
"os"
@ -51,6 +49,28 @@ type Interface interface {
APIVersion() schema.GroupVersion
}
// ClientContentConfig controls how RESTClient communicates with the server.
//
// TODO: ContentConfig will be updated to accept a Negotiator instead of a
// NegotiatedSerializer and NegotiatedSerializer will be removed.
type ClientContentConfig struct {
// AcceptContentTypes specifies the types the client will accept and is optional.
// If not set, ContentType will be used to define the Accept header
AcceptContentTypes string
// ContentType specifies the wire format used to communicate with the server.
// This value will be set as the Accept header on requests made to the server if
// AcceptContentTypes is not set, and as the default content type on any object
// sent to the server. If not set, "application/json" is used.
ContentType string
// GroupVersion is the API version to talk to. Must be provided when initializing
// a RESTClient directly. When initializing a Client, will be set with the default
// code version. This is used as the default group version for VersionedParams.
GroupVersion schema.GroupVersion
// Negotiator is used for obtaining encoders and decoders for multiple
// supported media types.
Negotiator runtime.ClientNegotiator
}
// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources. The server should return a decodable API resource
@ -64,34 +84,27 @@ type RESTClient struct {
// versionedAPIPath is a path segment connecting the base URL to the resource root
versionedAPIPath string
// contentConfig is the information used to communicate with the server.
contentConfig ContentConfig
// serializers contain all serializers for underlying content type.
serializers Serializers
// content describes how a RESTClient encodes and decodes responses.
content ClientContentConfig
// creates BackoffManager that is passed to requests.
createBackoffMgr func() BackoffManager
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle flowcontrol.RateLimiter
// rateLimiter is shared among all requests created by this client unless specifically
// overridden.
rateLimiter flowcontrol.RateLimiter
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}
type Serializers struct {
Encoder runtime.Encoder
Decoder runtime.Decoder
StreamingSerializer runtime.Serializer
Framer runtime.Framer
RenegotiatedDecoder func(contentType string, params map[string]string) (runtime.Decoder, error)
}
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
// such as Get, Put, Post, and Delete on specified paths.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
@ -99,31 +112,14 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
base.RawQuery = ""
base.Fragment = ""
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{}
}
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
serializers, err := createSerializers(config)
if err != nil {
return nil, err
}
var throttle flowcontrol.RateLimiter
if maxQPS > 0 && rateLimiter == nil {
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} else if rateLimiter != nil {
throttle = rateLimiter
}
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
content: config,
createBackoffMgr: readExpBackoffConfig,
Throttle: throttle,
Client: client,
rateLimiter: rateLimiter,
Client: client,
}, nil
}
@ -132,7 +128,7 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
if c == nil {
return nil
}
return c.Throttle
return c.rateLimiter
}
// readExpBackoffConfig handles the internal logic of determining what the
@ -153,58 +149,6 @@ func readExpBackoffConfig() BackoffManager {
time.Duration(backoffDurationInt)*time.Second)}
}
// createSerializers creates all necessary serializers for given contentType.
// TODO: the negotiated serializer passed to this method should probably return
// serializers that control decoding and versioning without this package
// being aware of the types. Depends on whether RESTClient must deal with
// generic infrastructure.
func createSerializers(config ContentConfig) (*Serializers, error) {
mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes()
contentType := config.ContentType
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err)
}
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType)
if !ok {
if len(contentType) != 0 || len(mediaTypes) == 0 {
return nil, fmt.Errorf("no serializers registered for %s", contentType)
}
info = mediaTypes[0]
}
internalGV := schema.GroupVersions{
{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{
Group: "",
Version: runtime.APIVersionInternal,
},
}
s := &Serializers{
Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion),
Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV),
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil
},
}
if info.StreamSerializer != nil {
s.StreamingSerializer = info.StreamSerializer.Serializer
s.Framer = info.StreamSerializer.Framer
}
return s, nil
}
// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
@ -219,12 +163,7 @@ func createSerializers(config ContentConfig) (*Serializers, error) {
// list, ok := resp.(*api.PodList)
//
func (c *RESTClient) Verb(verb string) *Request {
backoff := c.createBackoffMgr()
if c.Client == nil {
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, 0)
}
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, c.Client.Timeout)
return NewRequest(c).Verb(verb)
}
// Post begins a POST request. Short for c.Verb("POST").
@ -254,5 +193,5 @@ func (c *RESTClient) Delete() *Request {
// APIVersion returns the APIVersion this RESTClient is expected to use.
func (c *RESTClient) APIVersion() schema.GroupVersion {
return *c.contentConfig.GroupVersion
return c.content.GroupVersion
}

View File

@ -27,7 +27,7 @@ import (
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
v1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -57,12 +57,14 @@ func TestSerializer(t *testing.T) {
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
}
serializer, err := createSerializers(contentConfig)
n := runtime.NewClientNegotiator(contentConfig.NegotiatedSerializer, gv)
d, err := n.Decoder("application/json", nil)
if err != nil {
t.Fatal(err)
}
// bytes based on actual return from API server when encoding an "unversioned" object
obj, err := runtime.Decode(serializer.Decoder, []byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success"}`))
obj, err := runtime.Decode(d, []byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success"}`))
t.Log(obj)
if err != nil {
t.Fatal(err)

View File

@ -269,6 +269,9 @@ type ContentConfig struct {
GroupVersion *schema.GroupVersion
// NegotiatedSerializer is used for obtaining encoders and decoders for multiple
// supported media types.
//
// TODO: NegotiatedSerializer will be phased out as internal clients are removed
// from Kubernetes.
NegotiatedSerializer runtime.NegotiatedSerializer
}
@ -283,14 +286,6 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
if err != nil {
@ -310,7 +305,33 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
}
}
return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient)
rateLimiter := config.RateLimiter
if rateLimiter == nil {
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
if qps > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}
}
var gv schema.GroupVersion
if config.GroupVersion != nil {
gv = *config.GroupVersion
}
clientContent := ClientContentConfig{
AcceptContentTypes: config.AcceptContentTypes,
ContentType: config.ContentType,
GroupVersion: gv,
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
}
// UnversionedRESTClientFor is the same as RESTClientFor, except that it allows
@ -338,13 +359,33 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
}
}
versionConfig := config.ContentConfig
if versionConfig.GroupVersion == nil {
v := metav1.SchemeGroupVersion
versionConfig.GroupVersion = &v
rateLimiter := config.RateLimiter
if rateLimiter == nil {
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
if qps > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}
}
return NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
gv := metav1.SchemeGroupVersion
if config.GroupVersion != nil {
gv = *config.GroupVersion
}
clientContent := ClientContentConfig{
AcceptContentTypes: config.AcceptContentTypes,
ContentType: config.ContentType,
GroupVersion: gv,
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
}
// SetKubernetesDefaults sets default values on the provided client config for accessing the

View File

@ -155,6 +155,59 @@ func TestRESTClientRequires(t *testing.T) {
}
}
func TestRESTClientLimiter(t *testing.T) {
testCases := []struct {
Name string
Config Config
Limiter flowcontrol.RateLimiter
}{
{
Config: Config{},
Limiter: flowcontrol.NewTokenBucketRateLimiter(5, 10),
},
{
Config: Config{QPS: 10},
Limiter: flowcontrol.NewTokenBucketRateLimiter(10, 10),
},
{
Config: Config{QPS: -1},
Limiter: nil,
},
{
Config: Config{
RateLimiter: flowcontrol.NewTokenBucketRateLimiter(11, 12),
},
Limiter: flowcontrol.NewTokenBucketRateLimiter(11, 12),
},
}
for _, testCase := range testCases {
t.Run("Versioned_"+testCase.Name, func(t *testing.T) {
config := testCase.Config
config.Host = "127.0.0.1"
config.ContentConfig = ContentConfig{GroupVersion: &v1.SchemeGroupVersion, NegotiatedSerializer: scheme.Codecs}
client, err := RESTClientFor(&config)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(testCase.Limiter, client.rateLimiter) {
t.Fatalf("unexpected rate limiter: %#v", client.rateLimiter)
}
})
t.Run("Unversioned_"+testCase.Name, func(t *testing.T) {
config := testCase.Config
config.Host = "127.0.0.1"
config.ContentConfig = ContentConfig{GroupVersion: &v1.SchemeGroupVersion, NegotiatedSerializer: scheme.Codecs}
client, err := UnversionedRESTClientFor(&config)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(testCase.Limiter, client.rateLimiter) {
t.Fatalf("unexpected rate limiter: %#v", client.rateLimiter)
}
})
}
}
type fakeLimiter struct {
FakeSaturation float64
FakeQPS float32

View File

@ -29,6 +29,8 @@ import (
"k8s.io/client-go/util/flowcontrol"
)
// CreateHTTPClient creates an http.Client that will invoke the provided roundTripper func
// when a request is made.
func CreateHTTPClient(roundTripper func(*http.Request) (*http.Response, error)) *http.Client {
return &http.Client{
Transport: roundTripperFunc(roundTripper),
@ -41,40 +43,49 @@ func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
}
// RESTClient provides a fake RESTClient interface.
// RESTClient provides a fake RESTClient interface. It is used to mock network
// interactions via a rest.Request, or to make them via the provided Client to
// a specific server.
type RESTClient struct {
Client *http.Client
NegotiatedSerializer runtime.NegotiatedSerializer
GroupVersion schema.GroupVersion
VersionedAPIPath string
Req *http.Request
// Err is returned when any request would be made to the server. If Err is set,
// Req will not be recorded, Resp will not be returned, and Client will not be
// invoked.
Err error
// Req is set to the last request that was executed (had the methods Do/DoRaw) invoked.
Req *http.Request
// If Client is specified, the client will be invoked instead of returning Resp if
// Err is not set.
Client *http.Client
// Resp is returned to the caller after Req is recorded, unless Err or Client are set.
Resp *http.Response
Err error
}
func (c *RESTClient) Get() *restclient.Request {
return c.request("GET")
return c.Verb("GET")
}
func (c *RESTClient) Put() *restclient.Request {
return c.request("PUT")
return c.Verb("PUT")
}
func (c *RESTClient) Patch(pt types.PatchType) *restclient.Request {
return c.request("PATCH").SetHeader("Content-Type", string(pt))
return c.Verb("PATCH").SetHeader("Content-Type", string(pt))
}
func (c *RESTClient) Post() *restclient.Request {
return c.request("POST")
return c.Verb("POST")
}
func (c *RESTClient) Delete() *restclient.Request {
return c.request("DELETE")
return c.Verb("DELETE")
}
func (c *RESTClient) Verb(verb string) *restclient.Request {
return c.request(verb)
return c.Request().Verb(verb)
}
func (c *RESTClient) APIVersion() schema.GroupVersion {
@ -85,28 +96,17 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
return nil
}
func (c *RESTClient) request(verb string) *restclient.Request {
config := restclient.ContentConfig{
ContentType: runtime.ContentTypeJSON,
GroupVersion: &c.GroupVersion,
NegotiatedSerializer: c.NegotiatedSerializer,
func (c *RESTClient) Request() *restclient.Request {
config := restclient.ClientContentConfig{
ContentType: runtime.ContentTypeJSON,
GroupVersion: c.GroupVersion,
Negotiator: runtime.NewClientNegotiator(c.NegotiatedSerializer, c.GroupVersion),
}
ns := c.NegotiatedSerializer
info, _ := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), runtime.ContentTypeJSON)
serializers := restclient.Serializers{
// TODO this was hardcoded before, but it doesn't look right
Encoder: ns.EncoderForVersion(info.Serializer, c.GroupVersion),
Decoder: ns.DecoderToVersion(info.Serializer, c.GroupVersion),
}
if info.StreamSerializer != nil {
serializers.StreamingSerializer = info.StreamSerializer.Serializer
serializers.Framer = info.StreamSerializer.Framer
}
return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, c.VersionedAPIPath, config, serializers, nil, nil, 0)
return restclient.NewRequestWithClient(&url.URL{Scheme: "https", Host: "localhost"}, c.VersionedAPIPath, config, CreateHTTPClient(c.do))
}
func (c *RESTClient) Do(req *http.Request) (*http.Response, error) {
// do is invoked when a Request() created by this client is executed.
func (c *RESTClient) do(req *http.Request) (*http.Response, error) {
if c.Err != nil {
return nil, c.Err
}

View File

@ -48,7 +48,8 @@ import (
var (
// longThrottleLatency defines threshold for logging requests. All requests being
// throttle for more than longThrottleLatency will be logged.
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
// be logged.
longThrottleLatency = 50 * time.Millisecond
)
@ -74,19 +75,20 @@ func (r *RequestConstructionError) Error() string {
return fmt.Sprintf("request construction error: '%v'", r.Err)
}
var noBackoff = &NoBackoff{}
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
// required
client HTTPClient
verb string
c *RESTClient
baseURL *url.URL
content ContentConfig
serializers Serializers
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
timeout time.Duration
// generic components accessible via method setters
verb string
pathPrefix string
subpath string
params url.Values
@ -98,7 +100,6 @@ type Request struct {
resource string
resourceName string
subresource string
timeout time.Duration
// output
err error
@ -106,42 +107,63 @@ type Request struct {
// This is only used for per-request timeouts, deadlines, and cancellations.
ctx context.Context
backoffMgr BackoffManager
throttle flowcontrol.RateLimiter
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
func NewRequest(c *RESTClient) *Request {
var backoff BackoffManager
if c.createBackoffMgr != nil {
backoff = c.createBackoffMgr()
}
if backoff == nil {
klog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
backoff = noBackoff
}
pathPrefix := "/"
if baseURL != nil {
pathPrefix = path.Join(pathPrefix, baseURL.Path)
var pathPrefix string
if c.base != nil {
pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
} else {
pathPrefix = path.Join("/", c.versionedAPIPath)
}
var timeout time.Duration
if c.Client != nil {
timeout = c.Client.Timeout
}
r := &Request{
client: client,
verb: verb,
baseURL: baseURL,
pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content,
serializers: serializers,
backoffMgr: backoff,
throttle: throttle,
c: c,
rateLimiter: c.rateLimiter,
backoff: backoff,
timeout: timeout,
pathPrefix: pathPrefix,
}
switch {
case len(content.AcceptContentTypes) > 0:
r.SetHeader("Accept", content.AcceptContentTypes)
case len(content.ContentType) > 0:
r.SetHeader("Accept", content.ContentType+", */*")
case len(c.content.AcceptContentTypes) > 0:
r.SetHeader("Accept", c.content.AcceptContentTypes)
case len(c.content.ContentType) > 0:
r.SetHeader("Accept", c.content.ContentType+", */*")
}
return r
}
// NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
return NewRequest(&RESTClient{
base: base,
versionedAPIPath: versionedAPIPath,
content: content,
Client: client,
})
}
// Verb sets the verb this request will use.
func (r *Request) Verb(verb string) *Request {
r.verb = verb
return r
}
// Prefix adds segments to the relative beginning to the request path. These
// items will be placed before the optional Namespace, Resource, or Name sections.
// Setting AbsPath will clear any previously set Prefix segments
@ -184,17 +206,17 @@ func (r *Request) Resource(resource string) *Request {
// or defaults to the stub implementation if nil is provided
func (r *Request) BackOff(manager BackoffManager) *Request {
if manager == nil {
r.backoffMgr = &NoBackoff{}
r.backoff = &NoBackoff{}
return r
}
r.backoffMgr = manager
r.backoff = manager
return r
}
// Throttle receives a rate-limiter and sets or replaces an existing request limiter
func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
r.throttle = limiter
r.rateLimiter = limiter
return r
}
@ -272,8 +294,8 @@ func (r *Request) AbsPath(segments ...string) *Request {
if r.err != nil {
return r
}
r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
// preserve any trailing slashes for legacy behavior
r.pathPrefix += "/"
}
@ -317,7 +339,7 @@ func (r *Request) Param(paramName, s string) *Request {
// VersionedParams will not write query parameters that have omitempty set and are empty. If a
// parameter has already been set it is appended to (Params and VersionedParams are additive).
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
}
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
@ -397,14 +419,19 @@ func (r *Request) Body(obj interface{}) *Request {
if reflect.ValueOf(t).IsNil() {
return r
}
data, err := runtime.Encode(r.serializers.Encoder, t)
encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil)
if err != nil {
r.err = err
return r
}
data, err := runtime.Encode(encoder, t)
if err != nil {
r.err = err
return r
}
glogBody("Request Body", data)
r.body = bytes.NewReader(data)
r.SetHeader("Content-Type", r.content.ContentType)
r.SetHeader("Content-Type", r.c.content.ContentType)
default:
r.err = fmt.Errorf("unknown type used for body: %+v", obj)
}
@ -433,8 +460,8 @@ func (r *Request) URL() *url.URL {
}
finalURL := &url.URL{}
if r.baseURL != nil {
*finalURL = *r.baseURL
if r.c.base != nil {
*finalURL = *r.c.base
}
finalURL.Path = p
@ -468,8 +495,8 @@ func (r Request) finalURLTemplate() url.URL {
segments := strings.Split(r.URL().Path, "/")
groupIndex := 0
index := 0
if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) {
groupIndex += len(strings.Split(r.baseURL.Path, "/"))
if r.URL() != nil && r.c.base != nil && strings.Contains(r.URL().Path, r.c.base.Path) {
groupIndex += len(strings.Split(r.c.base.Path, "/"))
}
if groupIndex >= len(segments) {
return *url
@ -522,16 +549,16 @@ func (r Request) finalURLTemplate() url.URL {
}
func (r *Request) tryThrottle() error {
if r.throttle == nil {
if r.rateLimiter == nil {
return nil
}
now := time.Now()
var err error
if r.ctx != nil {
err = r.throttle.Wait(r.ctx)
err = r.rateLimiter.Wait(r.ctx)
} else {
r.throttle.Accept()
r.rateLimiter.Accept()
}
if latency := time.Since(now); latency > longThrottleLatency {
@ -544,27 +571,11 @@ func (r *Request) tryThrottle() error {
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) {
return r.WatchWithSpecificDecoders(
func(body io.ReadCloser) streaming.Decoder {
framer := r.serializers.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
},
r.serializers.Decoder,
)
}
// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
// Returns a watch.Interface, or an error.
func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.throttle here.
// don't use r.rateLimiter here.
if r.err != nil {
return nil, r.err
}
if r.serializers.Framer == nil {
return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
@ -575,18 +586,18 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
req = req.WithContext(r.ctx)
}
req.Header = r.headers
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if r.c.base != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err != nil {
@ -604,9 +615,22 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
}
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}
wrapperDecoder := wrapperDecoderFn(resp.Body)
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
}
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
if err != nil {
return nil, err
}
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder),
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
@ -617,8 +641,8 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
// It also handles corner cases for incomplete/invalid request data.
func updateURLMetrics(req *Request, resp *http.Response, err error) {
url := "none"
if req.baseURL != nil {
url = req.baseURL.Host
if req.c.base != nil {
url = req.c.base.Host
}
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
@ -656,18 +680,18 @@ func (r *Request) Stream() (io.ReadCloser, error) {
req = req.WithContext(r.ctx)
}
req.Header = r.headers
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if r.c.base != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
}
if err != nil {
@ -738,7 +762,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
return err
}
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
@ -765,11 +789,11 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
}
req.Header = r.headers
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retries > 0 {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal throttler.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottle(); err != nil {
return err
}
@ -777,9 +801,9 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil {
// "Connection reset by peer" is usually a transient error.
@ -822,7 +846,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
}
klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
r.backoff.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
@ -908,14 +932,18 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
glogBody("Response Body", body)
// verify the content type is accurate
var decoder runtime.Decoder
contentType := resp.Header.Get("Content-Type")
decoder := r.serializers.Decoder
if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
if len(contentType) == 0 {
contentType = r.c.content.ContentType
}
if len(contentType) > 0 {
var err error
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
return Result{err: errors.NewInternalError(err)}
}
decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
if err != nil {
// if we fail to negotiate a decoder, treat this as an unstructured error
switch {
@ -1035,7 +1063,7 @@ func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool,
}
var groupResource schema.GroupResource
if len(r.resource) > 0 {
groupResource.Group = r.content.GroupVersion.Group
groupResource.Group = r.c.content.GroupVersion.Group
groupResource.Resource = r.resource
}
return errors.NewGenericServerResponse(

File diff suppressed because it is too large Load Diff

View File

@ -239,7 +239,7 @@ func TestAttach(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},
@ -341,7 +341,7 @@ func TestAttachWarnings(t *testing.T) {
streams, _, _, bufErr := genericclioptions.NewTestIOStreams()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},

View File

@ -20,7 +20,6 @@ import (
"bytes"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"testing"
@ -71,7 +70,7 @@ func TestCreateClusterRoleBinding(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
info, _ := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), runtime.ContentTypeJSON)
encoder := ns.EncoderForVersion(info.Serializer, groupVersion)
@ -79,6 +78,7 @@ func TestCreateClusterRoleBinding(t *testing.T) {
tf.Client = &ClusterRoleBindingRESTClient{
RESTClient: &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "rbac.authorization.k8s.io", Version: "v1beta1"},
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
@ -129,19 +129,5 @@ type ClusterRoleBindingRESTClient struct {
}
func (c *ClusterRoleBindingRESTClient) Post() *restclient.Request {
config := restclient.ContentConfig{
ContentType: runtime.ContentTypeJSON,
NegotiatedSerializer: c.NegotiatedSerializer,
}
info, _ := runtime.SerializerInfoForMediaType(c.NegotiatedSerializer.SupportedMediaTypes(), runtime.ContentTypeJSON)
serializers := restclient.Serializers{
Encoder: c.NegotiatedSerializer.EncoderForVersion(info.Serializer, schema.GroupVersion{Group: "rbac.authorization.k8s.io", Version: "v1beta1"}),
Decoder: c.NegotiatedSerializer.DecoderToVersion(info.Serializer, schema.GroupVersion{Group: "rbac.authorization.k8s.io", Version: "v1beta1"}),
}
if info.StreamSerializer != nil {
serializers.StreamingSerializer = info.StreamSerializer.Serializer
serializers.Framer = info.StreamSerializer.Framer
}
return restclient.NewRequest(c, "POST", &url.URL{Host: "localhost"}, c.VersionedAPIPath, config, serializers, nil, nil, 0)
return c.RESTClient.Verb("POST")
}

View File

@ -35,7 +35,7 @@ func TestCreateConfigMap(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},

View File

@ -91,7 +91,7 @@ func TestCreateDeployment(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
fakeDiscovery := "{\"kind\":\"APIResourceList\",\"apiVersion\":\"v1\",\"groupVersion\":\"apps/v1\",\"resources\":[{\"name\":\"deployments\",\"singularName\":\"\",\"namespaced\":true,\"kind\":\"Deployment\",\"verbs\":[\"create\",\"delete\",\"deletecollection\",\"get\",\"list\",\"patch\",\"update\",\"watch\"],\"shortNames\":[\"deploy\"],\"categories\":[\"all\"]}]}"
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -121,7 +121,7 @@ func TestCreateDeploymentNoImage(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
fakeDiscovery := "{\"kind\":\"APIResourceList\",\"apiVersion\":\"v1\",\"groupVersion\":\"apps/v1\",\"resources\":[{\"name\":\"deployments\",\"singularName\":\"\",\"namespaced\":true,\"kind\":\"Deployment\",\"verbs\":[\"create\",\"delete\",\"deletecollection\",\"get\",\"list\",\"patch\",\"update\",\"watch\"],\"shortNames\":[\"deploy\"],\"categories\":[\"all\"]}]}"
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,

View File

@ -35,7 +35,7 @@ func TestCreateNamespace(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Version: "v1"},

View File

@ -35,7 +35,7 @@ func TestCreatePdb(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "policy", Version: "v1beta1"},

View File

@ -35,7 +35,7 @@ func TestCreatePriorityClass(t *testing.T) {
tf := cmdtesting.NewTestFactory()
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "scheduling.k8s.io", Version: "v1beta1"},

View File

@ -20,7 +20,6 @@ import (
"bytes"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"testing"
@ -73,7 +72,7 @@ func TestCreateRoleBinding(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
info, _ := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), runtime.ContentTypeJSON)
encoder := ns.EncoderForVersion(info.Serializer, groupVersion)
@ -81,6 +80,7 @@ func TestCreateRoleBinding(t *testing.T) {
tf.Client = &RoleBindingRESTClient{
RESTClient: &fake.RESTClient{
GroupVersion: groupVersion,
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
@ -125,20 +125,5 @@ type RoleBindingRESTClient struct {
}
func (c *RoleBindingRESTClient) Post() *restclient.Request {
config := restclient.ContentConfig{
ContentType: runtime.ContentTypeJSON,
GroupVersion: &groupVersion,
NegotiatedSerializer: c.NegotiatedSerializer,
}
info, _ := runtime.SerializerInfoForMediaType(c.NegotiatedSerializer.SupportedMediaTypes(), runtime.ContentTypeJSON)
serializers := restclient.Serializers{
Encoder: c.NegotiatedSerializer.EncoderForVersion(info.Serializer, groupVersion),
Decoder: c.NegotiatedSerializer.DecoderToVersion(info.Serializer, groupVersion),
}
if info.StreamSerializer != nil {
serializers.StreamingSerializer = info.StreamSerializer.Serializer
serializers.Framer = info.StreamSerializer.Framer
}
return restclient.NewRequest(c, "POST", &url.URL{Host: "localhost"}, c.VersionedAPIPath, config, serializers, nil, nil, 0)
return c.RESTClient.Verb("POST")
}

View File

@ -40,7 +40,7 @@ func TestCreateSecretGeneric(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Version: "v1"},
@ -72,7 +72,7 @@ func TestCreateSecretDockerRegistry(t *testing.T) {
secretObject.Name = "my-secret"
tf := cmdtesting.NewTestFactory().WithNamespace("test")
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Version: "v1"},

View File

@ -35,7 +35,7 @@ func TestCreateServiceAccount(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Version: "v1"},

View File

@ -160,7 +160,7 @@ func TestCordon(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
newNode := &corev1.Node{}
updated := false
@ -738,7 +738,7 @@ func TestDrain(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},

View File

@ -135,7 +135,7 @@ func TestPodAndContainer(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -205,7 +205,7 @@ func TestExec(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},

View File

@ -602,7 +602,7 @@ func TestRunExposeService(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Version: "v1"},

View File

@ -77,7 +77,7 @@ func testPortForward(t *testing.T, flags map[string]string, args []string) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
VersionedAPIPath: "/api/v1",

View File

@ -68,7 +68,7 @@ go_test(
srcs = ["rollout_pause_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/extensions/v1beta1:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -20,10 +20,9 @@ import (
"bytes"
"io/ioutil"
"net/http"
"net/url"
"testing"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@ -34,23 +33,24 @@ import (
"k8s.io/kubectl/pkg/scheme"
)
var rolloutPauseGroupVersionEncoder = schema.GroupVersion{Group: "extensions", Version: "v1beta1"}
var rolloutPauseGroupVersionDecoder = schema.GroupVersion{Group: "extensions", Version: "v1beta1"}
var rolloutPauseGroupVersionEncoder = schema.GroupVersion{Group: "apps", Version: "v1"}
var rolloutPauseGroupVersionDecoder = schema.GroupVersion{Group: "apps", Version: "v1"}
func TestRolloutPause(t *testing.T) {
deploymentName := "deployment/nginx-deployment"
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf := cmdtesting.NewTestFactory().WithNamespace("test")
info, _ := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), runtime.ContentTypeJSON)
encoder := ns.EncoderForVersion(info.Serializer, rolloutPauseGroupVersionEncoder)
tf.Client = &RolloutPauseRESTClient{
RESTClient: &fake.RESTClient{
GroupVersion: rolloutPauseGroupVersionEncoder,
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == "/namespaces/test/deployments/nginx-deployment" && (m == "GET" || m == "PATCH"):
responseDeployment := &extensionsv1beta1.Deployment{}
responseDeployment := &appsv1.Deployment{}
responseDeployment.Name = deploymentName
body := ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(encoder, responseDeployment))))
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: body}, nil
@ -66,7 +66,7 @@ func TestRolloutPause(t *testing.T) {
cmd := NewCmdRolloutPause(tf, streams)
cmd.Run(cmd, []string{deploymentName})
expectedOutput := "deployment.extensions/" + deploymentName + " paused\n"
expectedOutput := "deployment.apps/" + deploymentName + " paused\n"
if buf.String() != expectedOutput {
t.Errorf("expected output: %s, but got: %s", expectedOutput, buf.String())
}
@ -77,39 +77,9 @@ type RolloutPauseRESTClient struct {
}
func (c *RolloutPauseRESTClient) Get() *restclient.Request {
config := restclient.ContentConfig{
ContentType: runtime.ContentTypeJSON,
GroupVersion: &rolloutPauseGroupVersionEncoder,
NegotiatedSerializer: c.NegotiatedSerializer,
}
info, _ := runtime.SerializerInfoForMediaType(c.NegotiatedSerializer.SupportedMediaTypes(), runtime.ContentTypeJSON)
serializers := restclient.Serializers{
Encoder: c.NegotiatedSerializer.EncoderForVersion(info.Serializer, rolloutPauseGroupVersionEncoder),
Decoder: c.NegotiatedSerializer.DecoderToVersion(info.Serializer, rolloutPauseGroupVersionDecoder),
}
if info.StreamSerializer != nil {
serializers.StreamingSerializer = info.StreamSerializer.Serializer
serializers.Framer = info.StreamSerializer.Framer
}
return restclient.NewRequest(c, "GET", &url.URL{Host: "localhost"}, c.VersionedAPIPath, config, serializers, nil, nil, 0)
return c.RESTClient.Verb("GET")
}
func (c *RolloutPauseRESTClient) Patch(pt types.PatchType) *restclient.Request {
config := restclient.ContentConfig{
ContentType: runtime.ContentTypeJSON,
GroupVersion: &rolloutPauseGroupVersionEncoder,
NegotiatedSerializer: c.NegotiatedSerializer,
}
info, _ := runtime.SerializerInfoForMediaType(c.NegotiatedSerializer.SupportedMediaTypes(), runtime.ContentTypeJSON)
serializers := restclient.Serializers{
Encoder: c.NegotiatedSerializer.EncoderForVersion(info.Serializer, rolloutPauseGroupVersionEncoder),
Decoder: c.NegotiatedSerializer.DecoderToVersion(info.Serializer, rolloutPauseGroupVersionDecoder),
}
if info.StreamSerializer != nil {
serializers.StreamingSerializer = info.StreamSerializer.Serializer
serializers.Framer = info.StreamSerializer.Framer
}
return restclient.NewRequest(c, "PATCH", &url.URL{Host: "localhost"}, c.VersionedAPIPath, config, serializers, nil, nil, 0)
return c.RESTClient.Verb("PATCH")
}

View File

@ -168,7 +168,7 @@ func TestRunArgsFollowDashRules(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
GroupVersion: corev1.SchemeGroupVersion,
@ -327,7 +327,7 @@ func TestGenerateService(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.ClientConfigVal = cmdtesting.DefaultClientConfig()
tf.Client = &fake.RESTClient{
@ -505,8 +505,9 @@ func TestRunValidations(t *testing.T) {
defer tf.Cleanup()
_, _, codec := cmdtesting.NewExternalScheme()
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: scheme.Codecs,
NegotiatedSerializer: ns,
Resp: &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, cmdtesting.NewInternalType("", "", ""))},
}
tf.ClientConfigVal = cmdtesting.DefaultClientConfig()

View File

@ -243,7 +243,7 @@ func TestTaint(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,

View File

@ -53,7 +53,7 @@ func TestTopNodeAllMetrics(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -109,7 +109,7 @@ func TestTopNodeAllMetricsCustomDefaults(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -172,7 +172,7 @@ func TestTopNodeWithNameMetrics(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -238,7 +238,7 @@ func TestTopNodeWithLabelSelectorMetrics(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -292,7 +292,7 @@ func TestTopNodeAllMetricsFromMetricsServer(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -359,7 +359,7 @@ func TestTopNodeWithNameMetricsFromMetricsServer(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -436,7 +436,7 @@ func TestTopNodeWithLabelSelectorMetricsFromMetricsServer(t *testing.T) {
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,

View File

@ -176,7 +176,7 @@ func TestTopPod(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace(testNS)
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -323,7 +323,7 @@ func TestTopPodWithMetricsServer(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace(testNS)
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
@ -527,7 +527,7 @@ func TestTopPodCustomDefaults(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace(testNS)
defer tf.Cleanup()
ns := scheme.Codecs
ns := scheme.Codecs.WithoutConversion()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,

View File

@ -21,6 +21,7 @@ go_library(
"garbage_collector.go",
"generated_clientset.go",
"namespace.go",
"protocol.go",
"resource_quota.go",
"table_conversion.go",
"watch.go",

View File

@ -0,0 +1,80 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"fmt"
"strconv"
g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
)
var _ = SIGDescribe("client-go should negotiate", func() {
f := framework.NewDefaultFramework("protocol")
f.SkipNamespaceCreation = true
for _, s := range []string{
"application/json",
"application/vnd.kubernetes.protobuf",
"application/vnd.kubernetes.protobuf,application/json",
"application/json,application/vnd.kubernetes.protobuf",
} {
accept := s
g.It(fmt.Sprintf("watch and report errors with accept %q", accept), func() {
cfg, err := framework.LoadConfig()
framework.ExpectNoError(err)
cfg.AcceptContentTypes = accept
c := kubernetes.NewForConfigOrDie(cfg)
svcs, err := c.CoreV1().Services("default").Get("kubernetes", metav1.GetOptions{})
framework.ExpectNoError(err)
rv, err := strconv.Atoi(svcs.ResourceVersion)
framework.ExpectNoError(err)
w, err := c.CoreV1().Services("default").Watch(metav1.ListOptions{ResourceVersion: strconv.Itoa(rv - 1)})
framework.ExpectNoError(err)
defer w.Stop()
evt, ok := <-w.ResultChan()
o.Expect(ok).To(o.BeTrue())
switch evt.Type {
case watch.Added, watch.Modified:
// this is allowed
case watch.Error:
err := errors.FromObject(evt.Object)
if errors.IsGone(err) {
// this is allowed, since the kubernetes object could be very old
break
}
if errors.IsUnexpectedObjectError(err) {
g.Fail(fmt.Sprintf("unexpected object, wanted v1.Status: %#v", evt.Object))
}
g.Fail(fmt.Sprintf("unexpected error: %#v", evt.Object))
default:
g.Fail(fmt.Sprintf("unexpected type %s: %#v", evt.Type, evt.Object))
}
})
}
})

View File

@ -54,7 +54,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",

View File

@ -27,7 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/test/e2e/framework"
@ -77,9 +77,12 @@ var _ = SIGDescribe("Service endpoints latency", func() {
)
// Turn off rate limiting--it interferes with our measurements.
oldThrottle := f.ClientSet.CoreV1().RESTClient().GetRateLimiter()
f.ClientSet.CoreV1().RESTClient().(*restclient.RESTClient).Throttle = flowcontrol.NewFakeAlwaysRateLimiter()
defer func() { f.ClientSet.CoreV1().RESTClient().(*restclient.RESTClient).Throttle = oldThrottle }()
cfg, err := framework.LoadConfig()
if err != nil {
framework.Failf("Unable to load config: %v", err)
}
cfg.RateLimiter = flowcontrol.NewFakeAlwaysRateLimiter()
f.ClientSet = kubernetes.NewForConfigOrDie(cfg)
failing := sets.NewString()
d, err := runServiceLatencies(f, parallelTrials, totalTrials, acceptableFailureRatio)