From 2e1683e1a1b545bf967225f7fe4e8646d47ff01c Mon Sep 17 00:00:00 2001 From: David Eads Date: Tue, 24 Apr 2018 13:41:40 -0400 Subject: [PATCH] add easy to use dynamic client Kubernetes-commit: 3632037e600d82a954c05ecd4f9cb6f1ca93d41c --- dynamic/bad_debt.go | 79 ++++++++++ dynamic/client.go | 308 +++------------------------------------ dynamic/client_pool.go | 2 +- dynamic/client_test.go | 125 ++++++++-------- dynamic/scheme.go | 98 +++++++++++++ dynamic/simple.go | 322 +++++++++++++++++++++++++++++++++++++++++ rest/request.go | 24 ++- 7 files changed, 598 insertions(+), 360 deletions(-) create mode 100644 dynamic/bad_debt.go create mode 100644 dynamic/scheme.go create mode 100644 dynamic/simple.go diff --git a/dynamic/bad_debt.go b/dynamic/bad_debt.go new file mode 100644 index 00000000..8492d56a --- /dev/null +++ b/dynamic/bad_debt.go @@ -0,0 +1,79 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "encoding/json" + "io" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +// dynamicCodec is a codec that wraps the standard unstructured codec +// with special handling for Status objects. +// Deprecated only used by test code and its wrong +type dynamicCodec struct{} + +func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj) + if err != nil { + return nil, nil, err + } + + if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" { + obj = &metav1.Status{} + err := json.Unmarshal(data, obj) + if err != nil { + return nil, nil, err + } + } + + return obj, gvk, nil +} + +func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error { + return unstructured.UnstructuredJSONScheme.Encode(obj, w) +} + +// ContentConfig returns a rest.ContentConfig for dynamic types. +// Deprecated only used by test code and its wrong +func ContentConfig() rest.ContentConfig { + var jsonInfo runtime.SerializerInfo + // TODO: scheme.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need + // to talk to a kubernetes server + for _, info := range scheme.Codecs.SupportedMediaTypes() { + if info.MediaType == runtime.ContentTypeJSON { + jsonInfo = info + break + } + } + + jsonInfo.Serializer = dynamicCodec{} + jsonInfo.PrettySerializer = nil + return rest.ContentConfig{ + AcceptContentTypes: runtime.ContentTypeJSON, + ContentType: runtime.ContentTypeJSON, + NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo), + } +} diff --git a/dynamic/client.go b/dynamic/client.go index 833e4353..43db68c4 100644 --- a/dynamic/client.go +++ b/dynamic/client.go @@ -20,37 +20,24 @@ limitations under the License. package dynamic import ( - "encoding/json" - "errors" - "io" - "net/url" "strings" - "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/conversion/queryparams" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" - "k8s.io/client-go/util/flowcontrol" ) // Interface is a Kubernetes client that allows you to access metadata // and manipulate metadata of a Kubernetes API group. type Interface interface { - // GetRateLimiter returns the rate limiter for this client. - GetRateLimiter() flowcontrol.RateLimiter // Resource returns an API interface to the specified resource for this client's // group and version. If resource is not a namespaced resource, then namespace // is ignored. The ResourceInterface inherits the parameter codec of this client. Resource(resource *metav1.APIResource, namespace string) ResourceInterface - // ParameterCodec returns a client with the provided parameter codec. - ParameterCodec(parameterCodec runtime.ParameterCodec) Interface } // ResourceInterface is an API interface to a specific resource under a @@ -77,303 +64,50 @@ type ResourceInterface interface { // Client is a Kubernetes client that allows you to access metadata // and manipulate metadata of a Kubernetes API group, and implements Interface. type Client struct { - cl *restclient.RESTClient - parameterCodec runtime.ParameterCodec + version schema.GroupVersion + delegate DynamicInterface } // NewClient returns a new client based on the passed in config. The // codec is ignored, as the dynamic client uses it's own codec. -func NewClient(conf *restclient.Config) (*Client, error) { - // avoid changing the original config - confCopy := *conf - conf = &confCopy - - contentConfig := ContentConfig() - contentConfig.GroupVersion = conf.GroupVersion - if conf.NegotiatedSerializer != nil { - contentConfig.NegotiatedSerializer = conf.NegotiatedSerializer - } - conf.ContentConfig = contentConfig - - if conf.APIPath == "" { - conf.APIPath = "/api" - } - - if len(conf.UserAgent) == 0 { - conf.UserAgent = restclient.DefaultKubernetesUserAgent() - } - - cl, err := restclient.RESTClientFor(conf) +func NewClient(conf *restclient.Config, version schema.GroupVersion) (*Client, error) { + delegate, err := NewForConfig(conf) if err != nil { return nil, err } - return &Client{cl: cl}, nil -} - -// GetRateLimiter returns rate limier. -func (c *Client) GetRateLimiter() flowcontrol.RateLimiter { - return c.cl.GetRateLimiter() + return &Client{version: version, delegate: delegate}, nil } // Resource returns an API interface to the specified resource for this client's // group and version. If resource is not a namespaced resource, then namespace // is ignored. The ResourceInterface inherits the parameter codec of c. func (c *Client) Resource(resource *metav1.APIResource, namespace string) ResourceInterface { - return &ResourceClient{ - cl: c.cl, - resource: resource, - ns: namespace, - parameterCodec: c.parameterCodec, - } -} - -// ParameterCodec returns a client with the provided parameter codec. -func (c *Client) ParameterCodec(parameterCodec runtime.ParameterCodec) Interface { - return &Client{ - cl: c.cl, - parameterCodec: parameterCodec, - } -} - -// ResourceClient is an API interface to a specific resource under a -// dynamic client, and implements ResourceInterface. -type ResourceClient struct { - cl *restclient.RESTClient - resource *metav1.APIResource - ns string - parameterCodec runtime.ParameterCodec -} - -func (rc *ResourceClient) parseResourceSubresourceName() (string, []string) { - var resourceName string - var subresourceName []string - if strings.Contains(rc.resource.Name, "/") { - resourceName = strings.Split(rc.resource.Name, "/")[0] - subresourceName = strings.Split(rc.resource.Name, "/")[1:] - } else { - resourceName = rc.resource.Name + resourceTokens := strings.SplitN(resource.Name, "/", 2) + subresource := "" + if len(resourceTokens) > 1 { + subresource = resourceTokens[1] } - return resourceName, subresourceName -} - -// List returns a list of objects for this resource. -func (rc *ResourceClient) List(opts metav1.ListOptions) (runtime.Object, error) { - parameterEncoder := rc.parameterCodec - if parameterEncoder == nil { - parameterEncoder = defaultParameterEncoder + if len(namespace) == 0 { + return oldResourceShim(c.delegate.ClusterSubresource(c.version.WithResource(resourceTokens[0]), subresource)) } - return rc.cl.Get(). - NamespaceIfScoped(rc.ns, rc.resource.Namespaced). - Resource(rc.resource.Name). - VersionedParams(&opts, parameterEncoder). - Do(). - Get() + return oldResourceShim(c.delegate.NamespacedSubresource(c.version.WithResource(resourceTokens[0]), subresource, namespace)) } -// Get gets the resource with the specified name. -func (rc *ResourceClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) { - parameterEncoder := rc.parameterCodec - if parameterEncoder == nil { - parameterEncoder = defaultParameterEncoder - } - result := new(unstructured.Unstructured) - resourceName, subresourceName := rc.parseResourceSubresourceName() - err := rc.cl.Get(). - NamespaceIfScoped(rc.ns, rc.resource.Namespaced). - Resource(resourceName). - SubResource(subresourceName...). - VersionedParams(&opts, parameterEncoder). - Name(name). - Do(). - Into(result) - return result, err +// the old interfaces used the wrong type for lists. this fixes that +func oldResourceShim(in DynamicResourceInterface) ResourceInterface { + return oldResourceShimType{DynamicResourceInterface: in} } -// Delete deletes the resource with the specified name. -func (rc *ResourceClient) Delete(name string, opts *metav1.DeleteOptions) error { - return rc.cl.Delete(). - NamespaceIfScoped(rc.ns, rc.resource.Namespaced). - Resource(rc.resource.Name). - Name(name). - Body(opts). - Do(). - Error() +type oldResourceShimType struct { + DynamicResourceInterface } -// DeleteCollection deletes a collection of objects. -func (rc *ResourceClient) DeleteCollection(deleteOptions *metav1.DeleteOptions, listOptions metav1.ListOptions) error { - parameterEncoder := rc.parameterCodec - if parameterEncoder == nil { - parameterEncoder = defaultParameterEncoder - } - return rc.cl.Delete(). - NamespaceIfScoped(rc.ns, rc.resource.Namespaced). - Resource(rc.resource.Name). - VersionedParams(&listOptions, parameterEncoder). - Body(deleteOptions). - Do(). - Error() +func (s oldResourceShimType) List(opts metav1.ListOptions) (runtime.Object, error) { + return s.DynamicResourceInterface.List(opts) } -// Create creates the provided resource. -func (rc *ResourceClient) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - result := new(unstructured.Unstructured) - resourceName, subresourceName := rc.parseResourceSubresourceName() - req := rc.cl.Post(). - NamespaceIfScoped(rc.ns, rc.resource.Namespaced). - Resource(resourceName). - Body(obj) - if len(subresourceName) > 0 { - // If the provided resource is a subresource, the POST request should contain - // object name. Examples of subresources that support Create operation: - // core/v1/pods/{name}/binding - // core/v1/pods/{name}/eviction - // extensions/v1beta1/deployments/{name}/rollback - // apps/v1beta1/deployments/{name}/rollback - // NOTE: Currently our system assumes every subresource object has the same - // name as the parent resource object. E.g. a pods/binding object having - // metadada.name "foo" means pod "foo" is being bound. We may need to - // change this if we break the assumption in the future. - req = req.SubResource(subresourceName...). - Name(obj.GetName()) - } - err := req.Do(). - Into(result) - return result, err +func (s oldResourceShimType) Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) { + return s.DynamicResourceInterface.Patch(name, pt, data) } - -// Update updates the provided resource. -func (rc *ResourceClient) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - result := new(unstructured.Unstructured) - if len(obj.GetName()) == 0 { - return result, errors.New("object missing name") - } - resourceName, subresourceName := rc.parseResourceSubresourceName() - err := rc.cl.Put(). - NamespaceIfScoped(rc.ns, rc.resource.Namespaced). - Resource(resourceName). - SubResource(subresourceName...). - // NOTE: Currently our system assumes every subresource object has the same - // name as the parent resource object. E.g. a pods/binding object having - // metadada.name "foo" means pod "foo" is being bound. We may need to - // change this if we break the assumption in the future. - Name(obj.GetName()). - Body(obj). - Do(). - Into(result) - return result, err -} - -// Watch returns a watch.Interface that watches the resource. -func (rc *ResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { - parameterEncoder := rc.parameterCodec - if parameterEncoder == nil { - parameterEncoder = defaultParameterEncoder - } - opts.Watch = true - return rc.cl.Get(). - NamespaceIfScoped(rc.ns, rc.resource.Namespaced). - Resource(rc.resource.Name). - VersionedParams(&opts, parameterEncoder). - Watch() -} - -// Patch applies the patch and returns the patched resource. -func (rc *ResourceClient) Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) { - result := new(unstructured.Unstructured) - resourceName, subresourceName := rc.parseResourceSubresourceName() - err := rc.cl.Patch(pt). - NamespaceIfScoped(rc.ns, rc.resource.Namespaced). - Resource(resourceName). - SubResource(subresourceName...). - Name(name). - Body(data). - Do(). - Into(result) - return result, err -} - -// dynamicCodec is a codec that wraps the standard unstructured codec -// with special handling for Status objects. -type dynamicCodec struct{} - -func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { - obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj) - if err != nil { - return nil, nil, err - } - - if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" { - obj = &metav1.Status{} - err := json.Unmarshal(data, obj) - if err != nil { - return nil, nil, err - } - } - - return obj, gvk, nil -} - -func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error { - return unstructured.UnstructuredJSONScheme.Encode(obj, w) -} - -// ContentConfig returns a restclient.ContentConfig for dynamic types. -func ContentConfig() restclient.ContentConfig { - var jsonInfo runtime.SerializerInfo - // TODO: scheme.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need - // to talk to a kubernetes server - for _, info := range scheme.Codecs.SupportedMediaTypes() { - if info.MediaType == runtime.ContentTypeJSON { - jsonInfo = info - break - } - } - - jsonInfo.Serializer = dynamicCodec{} - jsonInfo.PrettySerializer = nil - return restclient.ContentConfig{ - AcceptContentTypes: runtime.ContentTypeJSON, - ContentType: runtime.ContentTypeJSON, - NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo), - } -} - -// paramaterCodec is a codec converts an API object to query -// parameters without trying to convert to the target version. -type parameterCodec struct{} - -func (parameterCodec) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) { - return queryparams.Convert(obj) -} - -func (parameterCodec) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error { - return errors.New("DecodeParameters not implemented on dynamic parameterCodec") -} - -var defaultParameterEncoder runtime.ParameterCodec = parameterCodec{} - -type versionedParameterEncoderWithV1Fallback struct{} - -func (versionedParameterEncoderWithV1Fallback) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) { - ret, err := scheme.ParameterCodec.EncodeParameters(obj, to) - if err != nil && runtime.IsNotRegisteredError(err) { - // fallback to v1 - return scheme.ParameterCodec.EncodeParameters(obj, v1.SchemeGroupVersion) - } - return ret, err -} - -func (versionedParameterEncoderWithV1Fallback) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error { - return errors.New("DecodeParameters not implemented on versionedParameterEncoderWithV1Fallback") -} - -// VersionedParameterEncoderWithV1Fallback is useful for encoding query -// parameters for custom resources. It tries to convert object to the -// specified version before converting it to query parameters, and falls back to -// converting to v1 if the object is not registered in the specified version. -// For the record, currently API server always treats query parameters sent to a -// custom resource endpoint as v1. -var VersionedParameterEncoderWithV1Fallback runtime.ParameterCodec = versionedParameterEncoderWithV1Fallback{} diff --git a/dynamic/client_pool.go b/dynamic/client_pool.go index a5e1b297..f4d258be 100644 --- a/dynamic/client_pool.go +++ b/dynamic/client_pool.go @@ -113,7 +113,7 @@ func (c *clientPoolImpl) ClientForGroupVersionKind(kind schema.GroupVersionKind) // we need to make a client conf.GroupVersion = &gv - dynamicClient, err := NewClient(conf) + dynamicClient, err := NewClient(conf, gv) if err != nil { return nil, err } diff --git a/dynamic/client_test.go b/dynamic/client_test.go index ffa550ed..3cc6e80c 100644 --- a/dynamic/client_test.go +++ b/dynamic/client_test.go @@ -63,7 +63,7 @@ func getClientServer(gv *schema.GroupVersion, h func(http.ResponseWriter, *http. cl, err := NewClient(&restclient.Config{ Host: srv.URL, ContentConfig: restclient.ContentConfig{GroupVersion: gv}, - }) + }, *gv) if err != nil { srv.Close() return nil, nil, err @@ -81,7 +81,7 @@ func TestList(t *testing.T) { }{ { name: "normal_list", - path: "/api/gtest/vtest/rtest", + path: "/apis/gtest/vtest/rtest", resp: getListJSON("vTest", "rTestList", getJSON("vTest", "rTest", "item1"), getJSON("vTest", "rTest", "item2")), @@ -99,7 +99,7 @@ func TestList(t *testing.T) { { name: "namespaced_list", namespace: "nstest", - path: "/api/gtest/vtest/namespaces/nstest/rtest", + path: "/apis/gtest/vtest/namespaces/nstest/rtest", resp: getListJSON("vTest", "rTestList", getJSON("vTest", "rTest", "item1"), getJSON("vTest", "rTest", "item2")), @@ -160,7 +160,7 @@ func TestGet(t *testing.T) { { resource: "rtest", name: "normal_get", - path: "/api/gtest/vtest/rtest/normal_get", + path: "/apis/gtest/vtest/rtest/normal_get", resp: getJSON("vTest", "rTest", "normal_get"), want: getObject("vTest", "rTest", "normal_get"), }, @@ -168,14 +168,14 @@ func TestGet(t *testing.T) { resource: "rtest", namespace: "nstest", name: "namespaced_get", - path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_get", + path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_get", resp: getJSON("vTest", "rTest", "namespaced_get"), want: getObject("vTest", "rTest", "namespaced_get"), }, { resource: "rtest/srtest", name: "normal_subresource_get", - path: "/api/gtest/vtest/rtest/normal_subresource_get/srtest", + path: "/apis/gtest/vtest/rtest/normal_subresource_get/srtest", resp: getJSON("vTest", "srTest", "normal_subresource_get"), want: getObject("vTest", "srTest", "normal_subresource_get"), }, @@ -183,7 +183,7 @@ func TestGet(t *testing.T) { resource: "rtest/srtest", namespace: "nstest", name: "namespaced_subresource_get", - path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_get/srtest", + path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_get/srtest", resp: getJSON("vTest", "srTest", "namespaced_subresource_get"), want: getObject("vTest", "srTest", "namespaced_subresource_get"), }, @@ -222,23 +222,33 @@ func TestGet(t *testing.T) { } func TestDelete(t *testing.T) { + background := metav1.DeletePropagationBackground + uid := types.UID("uid") + statusOK := &metav1.Status{ TypeMeta: metav1.TypeMeta{Kind: "Status"}, Status: metav1.StatusSuccess, } tcs := []struct { - namespace string - name string - path string + namespace string + name string + path string + deleteOptions *metav1.DeleteOptions }{ { name: "normal_delete", - path: "/api/gtest/vtest/rtest/normal_delete", + path: "/apis/gtest/vtest/rtest/normal_delete", }, { namespace: "nstest", name: "namespaced_delete", - path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_delete", + path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_delete", + }, + { + namespace: "nstest", + name: "namespaced_delete_with_options", + path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_delete_with_options", + deleteOptions: &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &uid}, PropagationPolicy: &background}, }, } for _, tc := range tcs { @@ -262,7 +272,7 @@ func TestDelete(t *testing.T) { } defer srv.Close() - err = cl.Resource(resource, tc.namespace).Delete(tc.name, nil) + err = cl.Resource(resource, tc.namespace).Delete(tc.name, tc.deleteOptions) if err != nil { t.Errorf("unexpected error when deleting %q: %v", tc.name, err) continue @@ -282,12 +292,12 @@ func TestDeleteCollection(t *testing.T) { }{ { name: "normal_delete_collection", - path: "/api/gtest/vtest/rtest", + path: "/apis/gtest/vtest/rtest", }, { namespace: "nstest", name: "namespaced_delete_collection", - path: "/api/gtest/vtest/namespaces/nstest/rtest", + path: "/apis/gtest/vtest/namespaces/nstest/rtest", }, } for _, tc := range tcs { @@ -330,28 +340,15 @@ func TestCreate(t *testing.T) { { resource: "rtest", name: "normal_create", - path: "/api/gtest/vtest/rtest", - obj: getObject("vTest", "rTest", "normal_create"), + path: "/apis/gtest/vtest/rtest", + obj: getObject("gtest/vTest", "rTest", "normal_create"), }, { resource: "rtest", name: "namespaced_create", namespace: "nstest", - path: "/api/gtest/vtest/namespaces/nstest/rtest", - obj: getObject("vTest", "rTest", "namespaced_create"), - }, - { - resource: "rtest/srtest", - name: "normal_subresource_create", - path: "/api/gtest/vtest/rtest/normal_subresource_create/srtest", - obj: getObject("vTest", "srTest", "normal_subresource_create"), - }, - { - resource: "rtest/srtest", - name: "namespaced_subresource_create", - namespace: "nstest", - path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_create/srtest", - obj: getObject("vTest", "srTest", "namespaced_subresource_create"), + path: "/apis/gtest/vtest/namespaces/nstest/rtest", + obj: getObject("gtest/vTest", "rTest", "namespaced_create"), }, } for _, tc := range tcs { @@ -405,28 +402,28 @@ func TestUpdate(t *testing.T) { { resource: "rtest", name: "normal_update", - path: "/api/gtest/vtest/rtest/normal_update", - obj: getObject("vTest", "rTest", "normal_update"), + path: "/apis/gtest/vtest/rtest/normal_update", + obj: getObject("gtest/vTest", "rTest", "normal_update"), }, { resource: "rtest", name: "namespaced_update", namespace: "nstest", - path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_update", - obj: getObject("vTest", "rTest", "namespaced_update"), + path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_update", + obj: getObject("gtest/vTest", "rTest", "namespaced_update"), }, { resource: "rtest/srtest", name: "normal_subresource_update", - path: "/api/gtest/vtest/rtest/normal_update/srtest", - obj: getObject("vTest", "srTest", "normal_update"), + path: "/apis/gtest/vtest/rtest/normal_update/srtest", + obj: getObject("gtest/vTest", "srTest", "normal_update"), }, { resource: "rtest/srtest", name: "namespaced_subresource_update", namespace: "nstest", - path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_update/srtest", - obj: getObject("vTest", "srTest", "namespaced_update"), + path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_update/srtest", + obj: getObject("gtest/vTest", "srTest", "namespaced_update"), }, } for _, tc := range tcs { @@ -479,23 +476,23 @@ func TestWatch(t *testing.T) { }{ { name: "normal_watch", - path: "/api/gtest/vtest/rtest", + path: "/apis/gtest/vtest/rtest", query: "watch=true", events: []watch.Event{ - {Type: watch.Added, Object: getObject("vTest", "rTest", "normal_watch")}, - {Type: watch.Modified, Object: getObject("vTest", "rTest", "normal_watch")}, - {Type: watch.Deleted, Object: getObject("vTest", "rTest", "normal_watch")}, + {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "normal_watch")}, + {Type: watch.Modified, Object: getObject("gtest/vTest", "rTest", "normal_watch")}, + {Type: watch.Deleted, Object: getObject("gtest/vTest", "rTest", "normal_watch")}, }, }, { name: "namespaced_watch", namespace: "nstest", - path: "/api/gtest/vtest/namespaces/nstest/rtest", + path: "/apis/gtest/vtest/namespaces/nstest/rtest", query: "watch=true", events: []watch.Event{ - {Type: watch.Added, Object: getObject("vTest", "rTest", "namespaced_watch")}, - {Type: watch.Modified, Object: getObject("vTest", "rTest", "namespaced_watch")}, - {Type: watch.Deleted, Object: getObject("vTest", "rTest", "namespaced_watch")}, + {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")}, + {Type: watch.Modified, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")}, + {Type: watch.Deleted, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")}, }, }, } @@ -552,32 +549,32 @@ func TestPatch(t *testing.T) { { resource: "rtest", name: "normal_patch", - path: "/api/gtest/vtest/rtest/normal_patch", - patch: getJSON("vTest", "rTest", "normal_patch"), - want: getObject("vTest", "rTest", "normal_patch"), + path: "/apis/gtest/vtest/rtest/normal_patch", + patch: getJSON("gtest/vTest", "rTest", "normal_patch"), + want: getObject("gtest/vTest", "rTest", "normal_patch"), }, { resource: "rtest", name: "namespaced_patch", namespace: "nstest", - path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_patch", - patch: getJSON("vTest", "rTest", "namespaced_patch"), - want: getObject("vTest", "rTest", "namespaced_patch"), + path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_patch", + patch: getJSON("gtest/vTest", "rTest", "namespaced_patch"), + want: getObject("gtest/vTest", "rTest", "namespaced_patch"), }, { resource: "rtest/srtest", name: "normal_subresource_patch", - path: "/api/gtest/vtest/rtest/normal_subresource_patch/srtest", - patch: getJSON("vTest", "srTest", "normal_subresource_patch"), - want: getObject("vTest", "srTest", "normal_subresource_patch"), + path: "/apis/gtest/vtest/rtest/normal_subresource_patch/srtest", + patch: getJSON("gtest/vTest", "srTest", "normal_subresource_patch"), + want: getObject("gtest/vTest", "srTest", "normal_subresource_patch"), }, { resource: "rtest/srtest", name: "namespaced_subresource_patch", namespace: "nstest", - path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_patch/srtest", - patch: getJSON("vTest", "srTest", "namespaced_subresource_patch"), - want: getObject("vTest", "srTest", "namespaced_subresource_patch"), + path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_patch/srtest", + patch: getJSON("gtest/vTest", "srTest", "namespaced_subresource_patch"), + want: getObject("gtest/vTest", "srTest", "namespaced_subresource_patch"), }, } for _, tc := range tcs { @@ -624,11 +621,3 @@ func TestPatch(t *testing.T) { } } } - -func TestVersionedParameterEncoderWithV1Fallback(t *testing.T) { - enc := VersionedParameterEncoderWithV1Fallback - _, err := enc.EncodeParameters(&metav1.ListOptions{}, schema.GroupVersion{Group: "foo.bar.com", Version: "v4"}) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } -} diff --git a/dynamic/scheme.go b/dynamic/scheme.go new file mode 100644 index 00000000..c4aa081f --- /dev/null +++ b/dynamic/scheme.go @@ -0,0 +1,98 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/runtime/serializer/json" + "k8s.io/apimachinery/pkg/runtime/serializer/versioning" +) + +var watchScheme = runtime.NewScheme() +var basicScheme = runtime.NewScheme() +var deleteScheme = runtime.NewScheme() +var parameterScheme = runtime.NewScheme() +var deleteOptionsCodec = serializer.NewCodecFactory(deleteScheme) +var dynamicParameterCodec = runtime.NewParameterCodec(parameterScheme) + +var versionV1 = schema.GroupVersion{Version: "v1"} + +func init() { + metav1.AddToGroupVersion(watchScheme, versionV1) + metav1.AddToGroupVersion(basicScheme, versionV1) + metav1.AddToGroupVersion(parameterScheme, versionV1) + metav1.AddToGroupVersion(deleteScheme, versionV1) +} + +var watchJsonSerializerInfo = runtime.SerializerInfo{ + MediaType: "application/json", + EncodesAsText: true, + Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false), + PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, true), + StreamSerializer: &runtime.StreamSerializerInfo{ + EncodesAsText: true, + Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false), + Framer: json.Framer, + }, +} + +// watchNegotiatedSerializer is used to read the wrapper of the watch stream +type watchNegotiatedSerializer struct{} + +var watchNegotiatedSerializerInstance = watchNegotiatedSerializer{} + +func (s watchNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo { + return []runtime.SerializerInfo{watchJsonSerializerInfo} +} + +func (s watchNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { + return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil) +} + +func (s watchNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder { + return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv) +} + +// basicNegotiatedSerializer is used to handle discovery and error handling serialization +type basicNegotiatedSerializer struct{} + +func (s basicNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo { + return []runtime.SerializerInfo{ + { + MediaType: "application/json", + EncodesAsText: true, + Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false), + PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, true), + StreamSerializer: &runtime.StreamSerializerInfo{ + EncodesAsText: true, + Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false), + Framer: json.Framer, + }, + }, + } +} + +func (s basicNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { + return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil) +} + +func (s basicNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder { + return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv) +} diff --git a/dynamic/simple.go b/dynamic/simple.go new file mode 100644 index 00000000..350a2865 --- /dev/null +++ b/dynamic/simple.go @@ -0,0 +1,322 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "fmt" + "io" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" +) + +type DynamicInterface interface { + ClusterResource(resource schema.GroupVersionResource) DynamicResourceInterface + NamespacedResource(resource schema.GroupVersionResource, namespace string) DynamicResourceInterface + + // Deprecated, this isn't how we want to do it + ClusterSubresource(resource schema.GroupVersionResource, subresource string) DynamicResourceInterface + // Deprecated, this isn't how we want to do it + NamespacedSubresource(resource schema.GroupVersionResource, subresource, namespace string) DynamicResourceInterface +} + +type DynamicResourceInterface interface { + Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) + Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) + UpdateStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) + Delete(name string, options *metav1.DeleteOptions) error + DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error + Get(name string, options metav1.GetOptions) (*unstructured.Unstructured, error) + List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error) + Watch(opts metav1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*unstructured.Unstructured, error) +} + +type dynamicClient struct { + client *rest.RESTClient +} + +var _ DynamicInterface = &dynamicClient{} + +func NewForConfig(inConfig *rest.Config) (DynamicInterface, error) { + config := rest.CopyConfig(inConfig) + // for serializing the options + config.GroupVersion = &schema.GroupVersion{} + config.APIPath = "/if-you-see-this-search-for-the-break" + config.AcceptContentTypes = "application/json" + config.ContentType = "application/json" + config.NegotiatedSerializer = basicNegotiatedSerializer{} // this gets used for discovery and error handling types + if config.UserAgent == "" { + config.UserAgent = rest.DefaultKubernetesUserAgent() + } + + restClient, err := rest.RESTClientFor(config) + if err != nil { + return nil, err + } + + return &dynamicClient{client: restClient}, nil +} + +type dynamicResourceClient struct { + client *dynamicClient + namespace string + resource schema.GroupVersionResource + subresource string +} + +func (c *dynamicClient) ClusterResource(resource schema.GroupVersionResource) DynamicResourceInterface { + return &dynamicResourceClient{client: c, resource: resource} +} +func (c *dynamicClient) NamespacedResource(resource schema.GroupVersionResource, namespace string) DynamicResourceInterface { + return &dynamicResourceClient{client: c, resource: resource, namespace: namespace} +} + +func (c *dynamicClient) ClusterSubresource(resource schema.GroupVersionResource, subresource string) DynamicResourceInterface { + return &dynamicResourceClient{client: c, resource: resource, subresource: subresource} +} +func (c *dynamicClient) NamespacedSubresource(resource schema.GroupVersionResource, subresource, namespace string) DynamicResourceInterface { + return &dynamicResourceClient{client: c, resource: resource, namespace: namespace, subresource: subresource} +} + +func (c *dynamicResourceClient) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + if len(c.subresource) > 0 { + return nil, fmt.Errorf("create not supported for subresources") + } + + outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) + if err != nil { + return nil, err + } + + result := c.client.client.Post().AbsPath(c.makeURLSegments("")...).Body(outBytes).Do() + if err := result.Error(); err != nil { + return nil, err + } + + retBytes, err := result.Raw() + if err != nil { + return nil, err + } + uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes) + if err != nil { + return nil, err + } + return uncastObj.(*unstructured.Unstructured), nil +} + +func (c *dynamicResourceClient) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) + if err != nil { + return nil, err + } + + result := c.client.client.Put().AbsPath(c.makeURLSegments(accessor.GetName())...).Body(outBytes).Do() + if err := result.Error(); err != nil { + return nil, err + } + + retBytes, err := result.Raw() + if err != nil { + return nil, err + } + uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes) + if err != nil { + return nil, err + } + return uncastObj.(*unstructured.Unstructured), nil +} + +func (c *dynamicResourceClient) UpdateStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + + result := c.client.client.Put().AbsPath(append(c.makeURLSegments(accessor.GetName()), "status")...).Body(obj).Do() + uncastObj, err := result.Get() + if err != nil { + return nil, err + } + return uncastObj.(*unstructured.Unstructured), nil +} + +func (c *dynamicResourceClient) Delete(name string, opts *metav1.DeleteOptions) error { + if opts == nil { + opts = &metav1.DeleteOptions{} + } + if opts == nil { + opts = &metav1.DeleteOptions{} + } + deleteOptionsByte, err := runtime.Encode(deleteOptionsCodec.LegacyCodec(schema.GroupVersion{Version: "v1"}), opts) + if err != nil { + return err + } + + result := c.client.client.Delete().AbsPath(c.makeURLSegments(name)...).Body(deleteOptionsByte).Do() + return result.Error() +} + +func (c *dynamicResourceClient) DeleteCollection(opts *metav1.DeleteOptions, listOptions metav1.ListOptions) error { + if len(c.subresource) > 0 { + return fmt.Errorf("deletecollection not supported for subresources") + } + + if opts == nil { + opts = &metav1.DeleteOptions{} + } + deleteOptionsByte, err := runtime.Encode(deleteOptionsCodec.LegacyCodec(schema.GroupVersion{Version: "v1"}), opts) + if err != nil { + return err + } + + result := c.client.client.Delete().AbsPath(c.makeURLSegments("")...).Body(deleteOptionsByte).SpecificallyVersionedParams(&listOptions, dynamicParameterCodec, versionV1).Do() + return result.Error() +} + +func (c *dynamicResourceClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) { + result := c.client.client.Get().AbsPath(c.makeURLSegments(name)...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do() + if err := result.Error(); err != nil { + return nil, err + } + retBytes, err := result.Raw() + if err != nil { + return nil, err + } + uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes) + if err != nil { + return nil, err + } + return uncastObj.(*unstructured.Unstructured), nil +} + +func (c *dynamicResourceClient) List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { + if len(c.subresource) > 0 { + return nil, fmt.Errorf("list not supported for subresources") + } + + result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do() + if err := result.Error(); err != nil { + return nil, err + } + retBytes, err := result.Raw() + if err != nil { + return nil, err + } + uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes) + if err != nil { + return nil, err + } + if list, ok := uncastObj.(*unstructured.UnstructuredList); ok { + return list, nil + } + + list, err := uncastObj.(*unstructured.Unstructured).ToList() + if err != nil { + return nil, err + } + return list, nil +} + +func (c *dynamicResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { + if len(c.subresource) > 0 { + return nil, fmt.Errorf("watch not supported for subresources") + } + + internalGV := schema.GroupVersions{ + {Group: c.resource.Group, Version: runtime.APIVersionInternal}, + // always include the legacy group as a decoding target to handle non-error `Status` return types + {Group: "", Version: runtime.APIVersionInternal}, + } + s := &rest.Serializers{ + Encoder: watchNegotiatedSerializerInstance.EncoderForVersion(watchJsonSerializerInfo.Serializer, c.resource.GroupVersion()), + Decoder: watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV), + + RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) { + return watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV), nil + }, + StreamingSerializer: watchJsonSerializerInfo.StreamSerializer.Serializer, + Framer: watchJsonSerializerInfo.StreamSerializer.Framer, + } + + wrappedDecoderFn := func(body io.ReadCloser) streaming.Decoder { + framer := s.Framer.NewFrameReader(body) + return streaming.NewDecoder(framer, s.StreamingSerializer) + } + + opts.Watch = true + return c.client.client.Get().AbsPath(c.makeURLSegments("")...). + SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1). + WatchWithSpecificDecoders(wrappedDecoderFn, unstructured.UnstructuredJSONScheme) +} + +func (c *dynamicResourceClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*unstructured.Unstructured, error) { + result := c.client.client.Patch(pt).AbsPath(append(c.makeURLSegments(name), subresources...)...).Body(data).Do() + if err := result.Error(); err != nil { + return nil, err + } + retBytes, err := result.Raw() + if err != nil { + return nil, err + } + uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes) + if err != nil { + return nil, err + } + return uncastObj.(*unstructured.Unstructured), nil +} + +func (c *dynamicResourceClient) makeURLSegments(name string) []string { + url := []string{} + if len(c.resource.Group) == 0 { + url = append(url, "api") + } else { + url = append(url, "apis", c.resource.Group) + } + url = append(url, c.resource.Version) + + if len(c.namespace) > 0 { + url = append(url, "namespaces", c.namespace) + } + url = append(url, c.resource.Resource) + + if len(name) > 0 { + url = append(url, name) + + // subresources only work on things with names + if len(c.subresource) > 0 { + url = append(url, c.subresource) + } + } else { + if len(c.subresource) > 0 { + panic("somehow snuck a subresource and an empty name. programmer error") + } + } + + return url +} diff --git a/rest/request.go b/rest/request.go index 9df0b444..09ffd76d 100644 --- a/rest/request.go +++ b/rest/request.go @@ -317,10 +317,14 @@ func (r *Request) Param(paramName, s string) *Request { // VersionedParams will not write query parameters that have omitempty set and are empty. If a // parameter has already been set it is appended to (Params and VersionedParams are additive). func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request { + return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion) +} + +func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request { if r.err != nil { return r } - params, err := codec.EncodeParameters(obj, *r.content.GroupVersion) + params, err := codec.EncodeParameters(obj, version) if err != nil { r.err = err return r @@ -485,6 +489,19 @@ func (r *Request) tryThrottle() { // Watch attempts to begin watching the requested location. // Returns a watch.Interface, or an error. func (r *Request) Watch() (watch.Interface, error) { + return r.WatchWithSpecificDecoders( + func(body io.ReadCloser) streaming.Decoder { + framer := r.serializers.Framer.NewFrameReader(body) + return streaming.NewDecoder(framer, r.serializers.StreamingSerializer) + }, + r.serializers.Decoder, + ) +} + +// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder. +// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content +// Returns a watch.Interface, or an error. +func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) { // We specifically don't want to rate limit watches, so we // don't use r.throttle here. if r.err != nil { @@ -532,9 +549,8 @@ func (r *Request) Watch() (watch.Interface, error) { } return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode) } - framer := r.serializers.Framer.NewFrameReader(resp.Body) - decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer) - return watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, r.serializers.Decoder)), nil + wrapperDecoder := wrapperDecoderFn(resp.Body) + return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil } // updateURLMetrics is a convenience function for pushing metrics.