add easy to use dynamic client

Kubernetes-commit: 3632037e600d82a954c05ecd4f9cb6f1ca93d41c
This commit is contained in:
David Eads 2018-04-24 13:41:40 -04:00 committed by Kubernetes Publisher
parent 3df37beef0
commit 2e1683e1a1
7 changed files with 598 additions and 360 deletions

79
dynamic/bad_debt.go Normal file
View File

@ -0,0 +1,79 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"encoding/json"
"io"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
// dynamicCodec is a codec that wraps the standard unstructured codec
// with special handling for Status objects.
// Deprecated only used by test code and its wrong
type dynamicCodec struct{}
func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj)
if err != nil {
return nil, nil, err
}
if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" {
obj = &metav1.Status{}
err := json.Unmarshal(data, obj)
if err != nil {
return nil, nil, err
}
}
return obj, gvk, nil
}
func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error {
return unstructured.UnstructuredJSONScheme.Encode(obj, w)
}
// ContentConfig returns a rest.ContentConfig for dynamic types.
// Deprecated only used by test code and its wrong
func ContentConfig() rest.ContentConfig {
var jsonInfo runtime.SerializerInfo
// TODO: scheme.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need
// to talk to a kubernetes server
for _, info := range scheme.Codecs.SupportedMediaTypes() {
if info.MediaType == runtime.ContentTypeJSON {
jsonInfo = info
break
}
}
jsonInfo.Serializer = dynamicCodec{}
jsonInfo.PrettySerializer = nil
return rest.ContentConfig{
AcceptContentTypes: runtime.ContentTypeJSON,
ContentType: runtime.ContentTypeJSON,
NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo),
}
}

View File

@ -20,37 +20,24 @@ limitations under the License.
package dynamic package dynamic
import ( import (
"encoding/json"
"errors"
"io"
"net/url"
"strings" "strings"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion/queryparams"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
) )
// Interface is a Kubernetes client that allows you to access metadata // Interface is a Kubernetes client that allows you to access metadata
// and manipulate metadata of a Kubernetes API group. // and manipulate metadata of a Kubernetes API group.
type Interface interface { type Interface interface {
// GetRateLimiter returns the rate limiter for this client.
GetRateLimiter() flowcontrol.RateLimiter
// Resource returns an API interface to the specified resource for this client's // Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace // group and version. If resource is not a namespaced resource, then namespace
// is ignored. The ResourceInterface inherits the parameter codec of this client. // is ignored. The ResourceInterface inherits the parameter codec of this client.
Resource(resource *metav1.APIResource, namespace string) ResourceInterface Resource(resource *metav1.APIResource, namespace string) ResourceInterface
// ParameterCodec returns a client with the provided parameter codec.
ParameterCodec(parameterCodec runtime.ParameterCodec) Interface
} }
// ResourceInterface is an API interface to a specific resource under a // ResourceInterface is an API interface to a specific resource under a
@ -77,303 +64,50 @@ type ResourceInterface interface {
// Client is a Kubernetes client that allows you to access metadata // Client is a Kubernetes client that allows you to access metadata
// and manipulate metadata of a Kubernetes API group, and implements Interface. // and manipulate metadata of a Kubernetes API group, and implements Interface.
type Client struct { type Client struct {
cl *restclient.RESTClient version schema.GroupVersion
parameterCodec runtime.ParameterCodec delegate DynamicInterface
} }
// NewClient returns a new client based on the passed in config. The // NewClient returns a new client based on the passed in config. The
// codec is ignored, as the dynamic client uses it's own codec. // codec is ignored, as the dynamic client uses it's own codec.
func NewClient(conf *restclient.Config) (*Client, error) { func NewClient(conf *restclient.Config, version schema.GroupVersion) (*Client, error) {
// avoid changing the original config delegate, err := NewForConfig(conf)
confCopy := *conf
conf = &confCopy
contentConfig := ContentConfig()
contentConfig.GroupVersion = conf.GroupVersion
if conf.NegotiatedSerializer != nil {
contentConfig.NegotiatedSerializer = conf.NegotiatedSerializer
}
conf.ContentConfig = contentConfig
if conf.APIPath == "" {
conf.APIPath = "/api"
}
if len(conf.UserAgent) == 0 {
conf.UserAgent = restclient.DefaultKubernetesUserAgent()
}
cl, err := restclient.RESTClientFor(conf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Client{cl: cl}, nil return &Client{version: version, delegate: delegate}, nil
}
// GetRateLimiter returns rate limier.
func (c *Client) GetRateLimiter() flowcontrol.RateLimiter {
return c.cl.GetRateLimiter()
} }
// Resource returns an API interface to the specified resource for this client's // Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace // group and version. If resource is not a namespaced resource, then namespace
// is ignored. The ResourceInterface inherits the parameter codec of c. // is ignored. The ResourceInterface inherits the parameter codec of c.
func (c *Client) Resource(resource *metav1.APIResource, namespace string) ResourceInterface { func (c *Client) Resource(resource *metav1.APIResource, namespace string) ResourceInterface {
return &ResourceClient{ resourceTokens := strings.SplitN(resource.Name, "/", 2)
cl: c.cl, subresource := ""
resource: resource, if len(resourceTokens) > 1 {
ns: namespace, subresource = resourceTokens[1]
parameterCodec: c.parameterCodec,
}
}
// ParameterCodec returns a client with the provided parameter codec.
func (c *Client) ParameterCodec(parameterCodec runtime.ParameterCodec) Interface {
return &Client{
cl: c.cl,
parameterCodec: parameterCodec,
}
}
// ResourceClient is an API interface to a specific resource under a
// dynamic client, and implements ResourceInterface.
type ResourceClient struct {
cl *restclient.RESTClient
resource *metav1.APIResource
ns string
parameterCodec runtime.ParameterCodec
}
func (rc *ResourceClient) parseResourceSubresourceName() (string, []string) {
var resourceName string
var subresourceName []string
if strings.Contains(rc.resource.Name, "/") {
resourceName = strings.Split(rc.resource.Name, "/")[0]
subresourceName = strings.Split(rc.resource.Name, "/")[1:]
} else {
resourceName = rc.resource.Name
} }
return resourceName, subresourceName if len(namespace) == 0 {
} return oldResourceShim(c.delegate.ClusterSubresource(c.version.WithResource(resourceTokens[0]), subresource))
// List returns a list of objects for this resource.
func (rc *ResourceClient) List(opts metav1.ListOptions) (runtime.Object, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
} }
return rc.cl.Get(). return oldResourceShim(c.delegate.NamespacedSubresource(c.version.WithResource(resourceTokens[0]), subresource, namespace))
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(&opts, parameterEncoder).
Do().
Get()
} }
// Get gets the resource with the specified name. // the old interfaces used the wrong type for lists. this fixes that
func (rc *ResourceClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) { func oldResourceShim(in DynamicResourceInterface) ResourceInterface {
parameterEncoder := rc.parameterCodec return oldResourceShimType{DynamicResourceInterface: in}
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
result := new(unstructured.Unstructured)
resourceName, subresourceName := rc.parseResourceSubresourceName()
err := rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(resourceName).
SubResource(subresourceName...).
VersionedParams(&opts, parameterEncoder).
Name(name).
Do().
Into(result)
return result, err
} }
// Delete deletes the resource with the specified name. type oldResourceShimType struct {
func (rc *ResourceClient) Delete(name string, opts *metav1.DeleteOptions) error { DynamicResourceInterface
return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(name).
Body(opts).
Do().
Error()
} }
// DeleteCollection deletes a collection of objects. func (s oldResourceShimType) List(opts metav1.ListOptions) (runtime.Object, error) {
func (rc *ResourceClient) DeleteCollection(deleteOptions *metav1.DeleteOptions, listOptions metav1.ListOptions) error { return s.DynamicResourceInterface.List(opts)
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(&listOptions, parameterEncoder).
Body(deleteOptions).
Do().
Error()
} }
// Create creates the provided resource. func (s oldResourceShimType) Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) {
func (rc *ResourceClient) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { return s.DynamicResourceInterface.Patch(name, pt, data)
result := new(unstructured.Unstructured)
resourceName, subresourceName := rc.parseResourceSubresourceName()
req := rc.cl.Post().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(resourceName).
Body(obj)
if len(subresourceName) > 0 {
// If the provided resource is a subresource, the POST request should contain
// object name. Examples of subresources that support Create operation:
// core/v1/pods/{name}/binding
// core/v1/pods/{name}/eviction
// extensions/v1beta1/deployments/{name}/rollback
// apps/v1beta1/deployments/{name}/rollback
// NOTE: Currently our system assumes every subresource object has the same
// name as the parent resource object. E.g. a pods/binding object having
// metadada.name "foo" means pod "foo" is being bound. We may need to
// change this if we break the assumption in the future.
req = req.SubResource(subresourceName...).
Name(obj.GetName())
}
err := req.Do().
Into(result)
return result, err
} }
// Update updates the provided resource.
func (rc *ResourceClient) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
result := new(unstructured.Unstructured)
if len(obj.GetName()) == 0 {
return result, errors.New("object missing name")
}
resourceName, subresourceName := rc.parseResourceSubresourceName()
err := rc.cl.Put().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(resourceName).
SubResource(subresourceName...).
// NOTE: Currently our system assumes every subresource object has the same
// name as the parent resource object. E.g. a pods/binding object having
// metadada.name "foo" means pod "foo" is being bound. We may need to
// change this if we break the assumption in the future.
Name(obj.GetName()).
Body(obj).
Do().
Into(result)
return result, err
}
// Watch returns a watch.Interface that watches the resource.
func (rc *ResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
opts.Watch = true
return rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(&opts, parameterEncoder).
Watch()
}
// Patch applies the patch and returns the patched resource.
func (rc *ResourceClient) Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) {
result := new(unstructured.Unstructured)
resourceName, subresourceName := rc.parseResourceSubresourceName()
err := rc.cl.Patch(pt).
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(resourceName).
SubResource(subresourceName...).
Name(name).
Body(data).
Do().
Into(result)
return result, err
}
// dynamicCodec is a codec that wraps the standard unstructured codec
// with special handling for Status objects.
type dynamicCodec struct{}
func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj)
if err != nil {
return nil, nil, err
}
if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" {
obj = &metav1.Status{}
err := json.Unmarshal(data, obj)
if err != nil {
return nil, nil, err
}
}
return obj, gvk, nil
}
func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error {
return unstructured.UnstructuredJSONScheme.Encode(obj, w)
}
// ContentConfig returns a restclient.ContentConfig for dynamic types.
func ContentConfig() restclient.ContentConfig {
var jsonInfo runtime.SerializerInfo
// TODO: scheme.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need
// to talk to a kubernetes server
for _, info := range scheme.Codecs.SupportedMediaTypes() {
if info.MediaType == runtime.ContentTypeJSON {
jsonInfo = info
break
}
}
jsonInfo.Serializer = dynamicCodec{}
jsonInfo.PrettySerializer = nil
return restclient.ContentConfig{
AcceptContentTypes: runtime.ContentTypeJSON,
ContentType: runtime.ContentTypeJSON,
NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo),
}
}
// paramaterCodec is a codec converts an API object to query
// parameters without trying to convert to the target version.
type parameterCodec struct{}
func (parameterCodec) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) {
return queryparams.Convert(obj)
}
func (parameterCodec) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error {
return errors.New("DecodeParameters not implemented on dynamic parameterCodec")
}
var defaultParameterEncoder runtime.ParameterCodec = parameterCodec{}
type versionedParameterEncoderWithV1Fallback struct{}
func (versionedParameterEncoderWithV1Fallback) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) {
ret, err := scheme.ParameterCodec.EncodeParameters(obj, to)
if err != nil && runtime.IsNotRegisteredError(err) {
// fallback to v1
return scheme.ParameterCodec.EncodeParameters(obj, v1.SchemeGroupVersion)
}
return ret, err
}
func (versionedParameterEncoderWithV1Fallback) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error {
return errors.New("DecodeParameters not implemented on versionedParameterEncoderWithV1Fallback")
}
// VersionedParameterEncoderWithV1Fallback is useful for encoding query
// parameters for custom resources. It tries to convert object to the
// specified version before converting it to query parameters, and falls back to
// converting to v1 if the object is not registered in the specified version.
// For the record, currently API server always treats query parameters sent to a
// custom resource endpoint as v1.
var VersionedParameterEncoderWithV1Fallback runtime.ParameterCodec = versionedParameterEncoderWithV1Fallback{}

View File

@ -113,7 +113,7 @@ func (c *clientPoolImpl) ClientForGroupVersionKind(kind schema.GroupVersionKind)
// we need to make a client // we need to make a client
conf.GroupVersion = &gv conf.GroupVersion = &gv
dynamicClient, err := NewClient(conf) dynamicClient, err := NewClient(conf, gv)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -63,7 +63,7 @@ func getClientServer(gv *schema.GroupVersion, h func(http.ResponseWriter, *http.
cl, err := NewClient(&restclient.Config{ cl, err := NewClient(&restclient.Config{
Host: srv.URL, Host: srv.URL,
ContentConfig: restclient.ContentConfig{GroupVersion: gv}, ContentConfig: restclient.ContentConfig{GroupVersion: gv},
}) }, *gv)
if err != nil { if err != nil {
srv.Close() srv.Close()
return nil, nil, err return nil, nil, err
@ -81,7 +81,7 @@ func TestList(t *testing.T) {
}{ }{
{ {
name: "normal_list", name: "normal_list",
path: "/api/gtest/vtest/rtest", path: "/apis/gtest/vtest/rtest",
resp: getListJSON("vTest", "rTestList", resp: getListJSON("vTest", "rTestList",
getJSON("vTest", "rTest", "item1"), getJSON("vTest", "rTest", "item1"),
getJSON("vTest", "rTest", "item2")), getJSON("vTest", "rTest", "item2")),
@ -99,7 +99,7 @@ func TestList(t *testing.T) {
{ {
name: "namespaced_list", name: "namespaced_list",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest",
resp: getListJSON("vTest", "rTestList", resp: getListJSON("vTest", "rTestList",
getJSON("vTest", "rTest", "item1"), getJSON("vTest", "rTest", "item1"),
getJSON("vTest", "rTest", "item2")), getJSON("vTest", "rTest", "item2")),
@ -160,7 +160,7 @@ func TestGet(t *testing.T) {
{ {
resource: "rtest", resource: "rtest",
name: "normal_get", name: "normal_get",
path: "/api/gtest/vtest/rtest/normal_get", path: "/apis/gtest/vtest/rtest/normal_get",
resp: getJSON("vTest", "rTest", "normal_get"), resp: getJSON("vTest", "rTest", "normal_get"),
want: getObject("vTest", "rTest", "normal_get"), want: getObject("vTest", "rTest", "normal_get"),
}, },
@ -168,14 +168,14 @@ func TestGet(t *testing.T) {
resource: "rtest", resource: "rtest",
namespace: "nstest", namespace: "nstest",
name: "namespaced_get", name: "namespaced_get",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_get", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_get",
resp: getJSON("vTest", "rTest", "namespaced_get"), resp: getJSON("vTest", "rTest", "namespaced_get"),
want: getObject("vTest", "rTest", "namespaced_get"), want: getObject("vTest", "rTest", "namespaced_get"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "normal_subresource_get", name: "normal_subresource_get",
path: "/api/gtest/vtest/rtest/normal_subresource_get/srtest", path: "/apis/gtest/vtest/rtest/normal_subresource_get/srtest",
resp: getJSON("vTest", "srTest", "normal_subresource_get"), resp: getJSON("vTest", "srTest", "normal_subresource_get"),
want: getObject("vTest", "srTest", "normal_subresource_get"), want: getObject("vTest", "srTest", "normal_subresource_get"),
}, },
@ -183,7 +183,7 @@ func TestGet(t *testing.T) {
resource: "rtest/srtest", resource: "rtest/srtest",
namespace: "nstest", namespace: "nstest",
name: "namespaced_subresource_get", name: "namespaced_subresource_get",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_get/srtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_get/srtest",
resp: getJSON("vTest", "srTest", "namespaced_subresource_get"), resp: getJSON("vTest", "srTest", "namespaced_subresource_get"),
want: getObject("vTest", "srTest", "namespaced_subresource_get"), want: getObject("vTest", "srTest", "namespaced_subresource_get"),
}, },
@ -222,23 +222,33 @@ func TestGet(t *testing.T) {
} }
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
background := metav1.DeletePropagationBackground
uid := types.UID("uid")
statusOK := &metav1.Status{ statusOK := &metav1.Status{
TypeMeta: metav1.TypeMeta{Kind: "Status"}, TypeMeta: metav1.TypeMeta{Kind: "Status"},
Status: metav1.StatusSuccess, Status: metav1.StatusSuccess,
} }
tcs := []struct { tcs := []struct {
namespace string namespace string
name string name string
path string path string
deleteOptions *metav1.DeleteOptions
}{ }{
{ {
name: "normal_delete", name: "normal_delete",
path: "/api/gtest/vtest/rtest/normal_delete", path: "/apis/gtest/vtest/rtest/normal_delete",
}, },
{ {
namespace: "nstest", namespace: "nstest",
name: "namespaced_delete", name: "namespaced_delete",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_delete", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_delete",
},
{
namespace: "nstest",
name: "namespaced_delete_with_options",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_delete_with_options",
deleteOptions: &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &uid}, PropagationPolicy: &background},
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -262,7 +272,7 @@ func TestDelete(t *testing.T) {
} }
defer srv.Close() defer srv.Close()
err = cl.Resource(resource, tc.namespace).Delete(tc.name, nil) err = cl.Resource(resource, tc.namespace).Delete(tc.name, tc.deleteOptions)
if err != nil { if err != nil {
t.Errorf("unexpected error when deleting %q: %v", tc.name, err) t.Errorf("unexpected error when deleting %q: %v", tc.name, err)
continue continue
@ -282,12 +292,12 @@ func TestDeleteCollection(t *testing.T) {
}{ }{
{ {
name: "normal_delete_collection", name: "normal_delete_collection",
path: "/api/gtest/vtest/rtest", path: "/apis/gtest/vtest/rtest",
}, },
{ {
namespace: "nstest", namespace: "nstest",
name: "namespaced_delete_collection", name: "namespaced_delete_collection",
path: "/api/gtest/vtest/namespaces/nstest/rtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest",
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -330,28 +340,15 @@ func TestCreate(t *testing.T) {
{ {
resource: "rtest", resource: "rtest",
name: "normal_create", name: "normal_create",
path: "/api/gtest/vtest/rtest", path: "/apis/gtest/vtest/rtest",
obj: getObject("vTest", "rTest", "normal_create"), obj: getObject("gtest/vTest", "rTest", "normal_create"),
}, },
{ {
resource: "rtest", resource: "rtest",
name: "namespaced_create", name: "namespaced_create",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest",
obj: getObject("vTest", "rTest", "namespaced_create"), obj: getObject("gtest/vTest", "rTest", "namespaced_create"),
},
{
resource: "rtest/srtest",
name: "normal_subresource_create",
path: "/api/gtest/vtest/rtest/normal_subresource_create/srtest",
obj: getObject("vTest", "srTest", "normal_subresource_create"),
},
{
resource: "rtest/srtest",
name: "namespaced_subresource_create",
namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_create/srtest",
obj: getObject("vTest", "srTest", "namespaced_subresource_create"),
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -405,28 +402,28 @@ func TestUpdate(t *testing.T) {
{ {
resource: "rtest", resource: "rtest",
name: "normal_update", name: "normal_update",
path: "/api/gtest/vtest/rtest/normal_update", path: "/apis/gtest/vtest/rtest/normal_update",
obj: getObject("vTest", "rTest", "normal_update"), obj: getObject("gtest/vTest", "rTest", "normal_update"),
}, },
{ {
resource: "rtest", resource: "rtest",
name: "namespaced_update", name: "namespaced_update",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_update", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_update",
obj: getObject("vTest", "rTest", "namespaced_update"), obj: getObject("gtest/vTest", "rTest", "namespaced_update"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "normal_subresource_update", name: "normal_subresource_update",
path: "/api/gtest/vtest/rtest/normal_update/srtest", path: "/apis/gtest/vtest/rtest/normal_update/srtest",
obj: getObject("vTest", "srTest", "normal_update"), obj: getObject("gtest/vTest", "srTest", "normal_update"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "namespaced_subresource_update", name: "namespaced_subresource_update",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_update/srtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_update/srtest",
obj: getObject("vTest", "srTest", "namespaced_update"), obj: getObject("gtest/vTest", "srTest", "namespaced_update"),
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -479,23 +476,23 @@ func TestWatch(t *testing.T) {
}{ }{
{ {
name: "normal_watch", name: "normal_watch",
path: "/api/gtest/vtest/rtest", path: "/apis/gtest/vtest/rtest",
query: "watch=true", query: "watch=true",
events: []watch.Event{ events: []watch.Event{
{Type: watch.Added, Object: getObject("vTest", "rTest", "normal_watch")}, {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
{Type: watch.Modified, Object: getObject("vTest", "rTest", "normal_watch")}, {Type: watch.Modified, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
{Type: watch.Deleted, Object: getObject("vTest", "rTest", "normal_watch")}, {Type: watch.Deleted, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
}, },
}, },
{ {
name: "namespaced_watch", name: "namespaced_watch",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest",
query: "watch=true", query: "watch=true",
events: []watch.Event{ events: []watch.Event{
{Type: watch.Added, Object: getObject("vTest", "rTest", "namespaced_watch")}, {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
{Type: watch.Modified, Object: getObject("vTest", "rTest", "namespaced_watch")}, {Type: watch.Modified, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
{Type: watch.Deleted, Object: getObject("vTest", "rTest", "namespaced_watch")}, {Type: watch.Deleted, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
}, },
}, },
} }
@ -552,32 +549,32 @@ func TestPatch(t *testing.T) {
{ {
resource: "rtest", resource: "rtest",
name: "normal_patch", name: "normal_patch",
path: "/api/gtest/vtest/rtest/normal_patch", path: "/apis/gtest/vtest/rtest/normal_patch",
patch: getJSON("vTest", "rTest", "normal_patch"), patch: getJSON("gtest/vTest", "rTest", "normal_patch"),
want: getObject("vTest", "rTest", "normal_patch"), want: getObject("gtest/vTest", "rTest", "normal_patch"),
}, },
{ {
resource: "rtest", resource: "rtest",
name: "namespaced_patch", name: "namespaced_patch",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_patch", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_patch",
patch: getJSON("vTest", "rTest", "namespaced_patch"), patch: getJSON("gtest/vTest", "rTest", "namespaced_patch"),
want: getObject("vTest", "rTest", "namespaced_patch"), want: getObject("gtest/vTest", "rTest", "namespaced_patch"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "normal_subresource_patch", name: "normal_subresource_patch",
path: "/api/gtest/vtest/rtest/normal_subresource_patch/srtest", path: "/apis/gtest/vtest/rtest/normal_subresource_patch/srtest",
patch: getJSON("vTest", "srTest", "normal_subresource_patch"), patch: getJSON("gtest/vTest", "srTest", "normal_subresource_patch"),
want: getObject("vTest", "srTest", "normal_subresource_patch"), want: getObject("gtest/vTest", "srTest", "normal_subresource_patch"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "namespaced_subresource_patch", name: "namespaced_subresource_patch",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_patch/srtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_patch/srtest",
patch: getJSON("vTest", "srTest", "namespaced_subresource_patch"), patch: getJSON("gtest/vTest", "srTest", "namespaced_subresource_patch"),
want: getObject("vTest", "srTest", "namespaced_subresource_patch"), want: getObject("gtest/vTest", "srTest", "namespaced_subresource_patch"),
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -624,11 +621,3 @@ func TestPatch(t *testing.T) {
} }
} }
} }
func TestVersionedParameterEncoderWithV1Fallback(t *testing.T) {
enc := VersionedParameterEncoderWithV1Fallback
_, err := enc.EncodeParameters(&metav1.ListOptions{}, schema.GroupVersion{Group: "foo.bar.com", Version: "v4"})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
}

98
dynamic/scheme.go Normal file
View File

@ -0,0 +1,98 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
)
var watchScheme = runtime.NewScheme()
var basicScheme = runtime.NewScheme()
var deleteScheme = runtime.NewScheme()
var parameterScheme = runtime.NewScheme()
var deleteOptionsCodec = serializer.NewCodecFactory(deleteScheme)
var dynamicParameterCodec = runtime.NewParameterCodec(parameterScheme)
var versionV1 = schema.GroupVersion{Version: "v1"}
func init() {
metav1.AddToGroupVersion(watchScheme, versionV1)
metav1.AddToGroupVersion(basicScheme, versionV1)
metav1.AddToGroupVersion(parameterScheme, versionV1)
metav1.AddToGroupVersion(deleteScheme, versionV1)
}
var watchJsonSerializerInfo = runtime.SerializerInfo{
MediaType: "application/json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, true),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false),
Framer: json.Framer,
},
}
// watchNegotiatedSerializer is used to read the wrapper of the watch stream
type watchNegotiatedSerializer struct{}
var watchNegotiatedSerializerInstance = watchNegotiatedSerializer{}
func (s watchNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{watchJsonSerializerInfo}
}
func (s watchNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil)
}
func (s watchNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv)
}
// basicNegotiatedSerializer is used to handle discovery and error handling serialization
type basicNegotiatedSerializer struct{}
func (s basicNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{
{
MediaType: "application/json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, true),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
Framer: json.Framer,
},
},
}
}
func (s basicNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil)
}
func (s basicNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv)
}

322
dynamic/simple.go Normal file
View File

@ -0,0 +1,322 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"io"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
)
type DynamicInterface interface {
ClusterResource(resource schema.GroupVersionResource) DynamicResourceInterface
NamespacedResource(resource schema.GroupVersionResource, namespace string) DynamicResourceInterface
// Deprecated, this isn't how we want to do it
ClusterSubresource(resource schema.GroupVersionResource, subresource string) DynamicResourceInterface
// Deprecated, this isn't how we want to do it
NamespacedSubresource(resource schema.GroupVersionResource, subresource, namespace string) DynamicResourceInterface
}
type DynamicResourceInterface interface {
Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
UpdateStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
Delete(name string, options *metav1.DeleteOptions) error
DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
Get(name string, options metav1.GetOptions) (*unstructured.Unstructured, error)
List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*unstructured.Unstructured, error)
}
type dynamicClient struct {
client *rest.RESTClient
}
var _ DynamicInterface = &dynamicClient{}
func NewForConfig(inConfig *rest.Config) (DynamicInterface, error) {
config := rest.CopyConfig(inConfig)
// for serializing the options
config.GroupVersion = &schema.GroupVersion{}
config.APIPath = "/if-you-see-this-search-for-the-break"
config.AcceptContentTypes = "application/json"
config.ContentType = "application/json"
config.NegotiatedSerializer = basicNegotiatedSerializer{} // this gets used for discovery and error handling types
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
restClient, err := rest.RESTClientFor(config)
if err != nil {
return nil, err
}
return &dynamicClient{client: restClient}, nil
}
type dynamicResourceClient struct {
client *dynamicClient
namespace string
resource schema.GroupVersionResource
subresource string
}
func (c *dynamicClient) ClusterResource(resource schema.GroupVersionResource) DynamicResourceInterface {
return &dynamicResourceClient{client: c, resource: resource}
}
func (c *dynamicClient) NamespacedResource(resource schema.GroupVersionResource, namespace string) DynamicResourceInterface {
return &dynamicResourceClient{client: c, resource: resource, namespace: namespace}
}
func (c *dynamicClient) ClusterSubresource(resource schema.GroupVersionResource, subresource string) DynamicResourceInterface {
return &dynamicResourceClient{client: c, resource: resource, subresource: subresource}
}
func (c *dynamicClient) NamespacedSubresource(resource schema.GroupVersionResource, subresource, namespace string) DynamicResourceInterface {
return &dynamicResourceClient{client: c, resource: resource, namespace: namespace, subresource: subresource}
}
func (c *dynamicResourceClient) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
if len(c.subresource) > 0 {
return nil, fmt.Errorf("create not supported for subresources")
}
outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
result := c.client.client.Post().AbsPath(c.makeURLSegments("")...).Body(outBytes).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
result := c.client.client.Put().AbsPath(c.makeURLSegments(accessor.GetName())...).Body(outBytes).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) UpdateStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
result := c.client.client.Put().AbsPath(append(c.makeURLSegments(accessor.GetName()), "status")...).Body(obj).Do()
uncastObj, err := result.Get()
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) Delete(name string, opts *metav1.DeleteOptions) error {
if opts == nil {
opts = &metav1.DeleteOptions{}
}
if opts == nil {
opts = &metav1.DeleteOptions{}
}
deleteOptionsByte, err := runtime.Encode(deleteOptionsCodec.LegacyCodec(schema.GroupVersion{Version: "v1"}), opts)
if err != nil {
return err
}
result := c.client.client.Delete().AbsPath(c.makeURLSegments(name)...).Body(deleteOptionsByte).Do()
return result.Error()
}
func (c *dynamicResourceClient) DeleteCollection(opts *metav1.DeleteOptions, listOptions metav1.ListOptions) error {
if len(c.subresource) > 0 {
return fmt.Errorf("deletecollection not supported for subresources")
}
if opts == nil {
opts = &metav1.DeleteOptions{}
}
deleteOptionsByte, err := runtime.Encode(deleteOptionsCodec.LegacyCodec(schema.GroupVersion{Version: "v1"}), opts)
if err != nil {
return err
}
result := c.client.client.Delete().AbsPath(c.makeURLSegments("")...).Body(deleteOptionsByte).SpecificallyVersionedParams(&listOptions, dynamicParameterCodec, versionV1).Do()
return result.Error()
}
func (c *dynamicResourceClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) {
result := c.client.client.Get().AbsPath(c.makeURLSegments(name)...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if len(c.subresource) > 0 {
return nil, fmt.Errorf("list not supported for subresources")
}
result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
return list, nil
}
list, err := uncastObj.(*unstructured.Unstructured).ToList()
if err != nil {
return nil, err
}
return list, nil
}
func (c *dynamicResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
if len(c.subresource) > 0 {
return nil, fmt.Errorf("watch not supported for subresources")
}
internalGV := schema.GroupVersions{
{Group: c.resource.Group, Version: runtime.APIVersionInternal},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{Group: "", Version: runtime.APIVersionInternal},
}
s := &rest.Serializers{
Encoder: watchNegotiatedSerializerInstance.EncoderForVersion(watchJsonSerializerInfo.Serializer, c.resource.GroupVersion()),
Decoder: watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV),
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
return watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV), nil
},
StreamingSerializer: watchJsonSerializerInfo.StreamSerializer.Serializer,
Framer: watchJsonSerializerInfo.StreamSerializer.Framer,
}
wrappedDecoderFn := func(body io.ReadCloser) streaming.Decoder {
framer := s.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, s.StreamingSerializer)
}
opts.Watch = true
return c.client.client.Get().AbsPath(c.makeURLSegments("")...).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
WatchWithSpecificDecoders(wrappedDecoderFn, unstructured.UnstructuredJSONScheme)
}
func (c *dynamicResourceClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*unstructured.Unstructured, error) {
result := c.client.client.Patch(pt).AbsPath(append(c.makeURLSegments(name), subresources...)...).Body(data).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) makeURLSegments(name string) []string {
url := []string{}
if len(c.resource.Group) == 0 {
url = append(url, "api")
} else {
url = append(url, "apis", c.resource.Group)
}
url = append(url, c.resource.Version)
if len(c.namespace) > 0 {
url = append(url, "namespaces", c.namespace)
}
url = append(url, c.resource.Resource)
if len(name) > 0 {
url = append(url, name)
// subresources only work on things with names
if len(c.subresource) > 0 {
url = append(url, c.subresource)
}
} else {
if len(c.subresource) > 0 {
panic("somehow snuck a subresource and an empty name. programmer error")
}
}
return url
}

View File

@ -317,10 +317,14 @@ func (r *Request) Param(paramName, s string) *Request {
// VersionedParams will not write query parameters that have omitempty set and are empty. If a // VersionedParams will not write query parameters that have omitempty set and are empty. If a
// parameter has already been set it is appended to (Params and VersionedParams are additive). // parameter has already been set it is appended to (Params and VersionedParams are additive).
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request { func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
}
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
if r.err != nil { if r.err != nil {
return r return r
} }
params, err := codec.EncodeParameters(obj, *r.content.GroupVersion) params, err := codec.EncodeParameters(obj, version)
if err != nil { if err != nil {
r.err = err r.err = err
return r return r
@ -485,6 +489,19 @@ func (r *Request) tryThrottle() {
// Watch attempts to begin watching the requested location. // Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error. // Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) { func (r *Request) Watch() (watch.Interface, error) {
return r.WatchWithSpecificDecoders(
func(body io.ReadCloser) streaming.Decoder {
framer := r.serializers.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
},
r.serializers.Decoder,
)
}
// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
// Returns a watch.Interface, or an error.
func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we // We specifically don't want to rate limit watches, so we
// don't use r.throttle here. // don't use r.throttle here.
if r.err != nil { if r.err != nil {
@ -532,9 +549,8 @@ func (r *Request) Watch() (watch.Interface, error) {
} }
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode) return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
} }
framer := r.serializers.Framer.NewFrameReader(resp.Body) wrapperDecoder := wrapperDecoderFn(resp.Body)
decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer) return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
return watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, r.serializers.Decoder)), nil
} }
// updateURLMetrics is a convenience function for pushing metrics. // updateURLMetrics is a convenience function for pushing metrics.