add easy to use dynamic client

This commit is contained in:
David Eads 2018-04-24 13:41:40 -04:00
parent 3fb88a23d9
commit 3632037e60
26 changed files with 838 additions and 430 deletions

View File

@ -36,7 +36,6 @@ import (
cacheddiscovery "k8s.io/client-go/discovery/cached" cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/garbagecollector"
@ -294,9 +293,6 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) {
} }
func startNamespaceController(ctx ControllerContext) (bool, error) { func startNamespaceController(ctx ControllerContext) (bool, error) {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := legacyscheme.Registry.RESTMapper()
// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls // the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource // the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
// including events), takes ~10 seconds by default. // including events), takes ~10 seconds by default.
@ -304,13 +300,17 @@ func startNamespaceController(ctx ControllerContext) (bool, error) {
nsKubeconfig.QPS *= 10 nsKubeconfig.QPS *= 10
nsKubeconfig.Burst *= 10 nsKubeconfig.Burst *= 10
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig) namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
namespaceClientPool := dynamic.NewClientPool(nsKubeconfig, restMapper, dynamic.LegacyAPIPathResolverFunc)
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
dynamicClient, err := dynamic.NewForConfig(nsKubeconfig)
if err != nil {
return true, err
}
namespaceController := namespacecontroller.NewNamespaceController( namespaceController := namespacecontroller.NewNamespaceController(
namespaceKubeClient, namespaceKubeClient,
namespaceClientPool, dynamicClient,
discoverResourcesFn, discoverResourcesFn,
ctx.InformerFactory.Core().V1().Namespaces(), ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,

View File

@ -303,7 +303,7 @@ func (gc *GarbageCollector) isDangling(reference metav1.OwnerReference, item *no
// TODO: It's only necessary to talk to the API server if the owner node // TODO: It's only necessary to talk to the API server if the owner node
// is a "virtual" node. The local graph could lag behind the real // is a "virtual" node. The local graph could lag behind the real
// status, but in practice, the difference is small. // status, but in practice, the difference is small.
owner, err = client.Resource(resource, item.identity.Namespace).Get(reference.Name, metav1.GetOptions{}) owner, err = client.Resource(resource, resourceDefaultNamespace(resource, item.identity.Namespace)).Get(reference.Name, metav1.GetOptions{})
switch { switch {
case errors.IsNotFound(err): case errors.IsNotFound(err):
gc.absentOwnerCache.Add(reference.UID) gc.absentOwnerCache.Add(reference.UID)

View File

@ -686,9 +686,13 @@ func TestOrphanDependentsFailure(t *testing.T) {
}, },
} }
err := gc.orphanDependents(objectReference{}, dependents) err := gc.orphanDependents(objectReference{}, dependents)
expected := `the server reported a conflict (patch pods pod)` expected := `the server reported a conflict`
if err == nil || !strings.Contains(err.Error(), expected) { if err == nil || !strings.Contains(err.Error(), expected) {
t.Errorf("expected error contains text %s, got %v", expected, err) if err != nil {
t.Errorf("expected error contains text %q, got %q", expected, err.Error())
} else {
t.Errorf("expected error contains text %q, got nil", expected)
}
} }
} }

View File

@ -135,7 +135,7 @@ func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource)
// namespaces if it's namespace scoped, so leave // namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right. // APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource} apiResource := metav1.APIResource{Name: resource.Resource}
return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback). return client.
Resource(&apiResource, metav1.NamespaceAll). Resource(&apiResource, metav1.NamespaceAll).
List(options) List(options)
}, },
@ -145,7 +145,7 @@ func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource)
// namespaces if it's namespace scoped, so leave // namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right. // APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource} apiResource := metav1.APIResource{Name: resource.Resource}
return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback). return client.
Resource(&apiResource, metav1.NamespaceAll). Resource(&apiResource, metav1.NamespaceAll).
Watch(options) Watch(options)
}, },

View File

@ -30,6 +30,14 @@ import (
"k8s.io/client-go/util/retry" "k8s.io/client-go/util/retry"
) )
// cluster scoped resources don't have namespaces. Default to the item's namespace, but clear it for cluster scoped resources
func resourceDefaultNamespace(resource *metav1.APIResource, defaultNamespace string) string {
if resource.Namespaced {
return defaultNamespace
}
return ""
}
// apiResource consults the REST mapper to translate an <apiVersion, kind, // apiResource consults the REST mapper to translate an <apiVersion, kind,
// namespace> tuple to a unversioned.APIResource struct. // namespace> tuple to a unversioned.APIResource struct.
func (gc *GarbageCollector) apiResource(apiVersion, kind string) (*metav1.APIResource, error) { func (gc *GarbageCollector) apiResource(apiVersion, kind string) (*metav1.APIResource, error) {
@ -60,7 +68,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference, policy *metav1.De
uid := item.UID uid := item.UID
preconditions := metav1.Preconditions{UID: &uid} preconditions := metav1.Preconditions{UID: &uid}
deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions, PropagationPolicy: policy} deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions, PropagationPolicy: policy}
return client.Resource(resource, item.Namespace).Delete(item.Name, &deleteOptions) return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Delete(item.Name, &deleteOptions)
} }
func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) { func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) {
@ -73,7 +81,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstr
if err != nil { if err != nil {
return nil, err return nil, err
} }
return client.Resource(resource, item.Namespace).Get(item.Name, metav1.GetOptions{}) return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Get(item.Name, metav1.GetOptions{})
} }
func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
@ -86,7 +94,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured
if err != nil { if err != nil {
return nil, err return nil, err
} }
return client.Resource(resource, item.Namespace).Update(obj) return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Update(obj)
} }
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) { func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) {
@ -99,7 +107,7 @@ func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*un
if err != nil { if err != nil {
return nil, err return nil, err
} }
return client.Resource(resource, item.Namespace).Patch(item.Name, types.StrategicMergePatchType, patch) return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch)
} }
// TODO: Using Patch when strategicmerge supports deleting an entry from a // TODO: Using Patch when strategicmerge supports deleting an entry from a

View File

@ -31,7 +31,6 @@ go_test(
srcs = ["namespaced_resources_deleter_test.go"], srcs = ["namespaced_resources_deleter_test.go"],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",

View File

@ -43,13 +43,13 @@ type NamespacedResourcesDeleterInterface interface {
} }
func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface, func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface,
clientPool dynamic.ClientPool, podsGetter v1clientset.PodsGetter, dynamicClient dynamic.DynamicInterface, podsGetter v1clientset.PodsGetter,
discoverResourcesFn func() ([]*metav1.APIResourceList, error), discoverResourcesFn func() ([]*metav1.APIResourceList, error),
finalizerToken v1.FinalizerName, deleteNamespaceWhenDone bool) NamespacedResourcesDeleterInterface { finalizerToken v1.FinalizerName, deleteNamespaceWhenDone bool) NamespacedResourcesDeleterInterface {
d := &namespacedResourcesDeleter{ d := &namespacedResourcesDeleter{
nsClient: nsClient, nsClient: nsClient,
clientPool: clientPool, dynamicClient: dynamicClient,
podsGetter: podsGetter, podsGetter: podsGetter,
opCache: &operationNotSupportedCache{ opCache: &operationNotSupportedCache{
m: make(map[operationKey]bool), m: make(map[operationKey]bool),
}, },
@ -68,7 +68,7 @@ type namespacedResourcesDeleter struct {
// Client to manipulate the namespace. // Client to manipulate the namespace.
nsClient v1clientset.NamespaceInterface nsClient v1clientset.NamespaceInterface
// Dynamic client to list and delete all namespaced resources. // Dynamic client to list and delete all namespaced resources.
clientPool dynamic.ClientPool dynamicClient dynamic.DynamicInterface
// Interface to get PodInterface. // Interface to get PodInterface.
podsGetter v1clientset.PodsGetter podsGetter v1clientset.PodsGetter
// Cache of what operations are not supported on each group version resource. // Cache of what operations are not supported on each group version resource.
@ -328,9 +328,7 @@ func (d *namespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace)
// deleteCollection is a helper function that will delete the collection of resources // deleteCollection is a helper function that will delete the collection of resources
// it returns true if the operation was supported on the server. // it returns true if the operation was supported on the server.
// it returns an error if the operation was supported on the server but was unable to complete. // it returns an error if the operation was supported on the server but was unable to complete.
func (d *namespacedResourcesDeleter) deleteCollection( func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionResource, namespace string) (bool, error) {
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource,
namespace string) (bool, error) {
glog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr) glog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
key := operationKey{operation: operationDeleteCollection, gvr: gvr} key := operationKey{operation: operationDeleteCollection, gvr: gvr}
@ -339,14 +337,12 @@ func (d *namespacedResourcesDeleter) deleteCollection(
return false, nil return false, nil
} }
apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true}
// namespace controller does not want the garbage collector to insert the orphan finalizer since it calls // namespace controller does not want the garbage collector to insert the orphan finalizer since it calls
// resource deletions generically. it will ensure all resources in the namespace are purged prior to releasing // resource deletions generically. it will ensure all resources in the namespace are purged prior to releasing
// namespace itself. // namespace itself.
background := metav1.DeletePropagationBackground background := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &background} opts := &metav1.DeleteOptions{PropagationPolicy: &background}
err := dynamicClient.Resource(&apiResource, namespace).DeleteCollection(opts, metav1.ListOptions{}) err := d.dynamicClient.NamespacedResource(gvr, namespace).DeleteCollection(opts, metav1.ListOptions{})
if err == nil { if err == nil {
return true, nil return true, nil
@ -373,8 +369,7 @@ func (d *namespacedResourcesDeleter) deleteCollection(
// the list of items in the collection (if found) // the list of items in the collection (if found)
// a boolean if the operation is supported // a boolean if the operation is supported
// an error if the operation is supported but could not be completed. // an error if the operation is supported but could not be completed.
func (d *namespacedResourcesDeleter) listCollection( func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
glog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr) glog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)
key := operationKey{operation: operationList, gvr: gvr} key := operationKey{operation: operationList, gvr: gvr}
@ -383,13 +378,8 @@ func (d *namespacedResourcesDeleter) listCollection(
return nil, false, nil return nil, false, nil
} }
apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true} unstructuredList, err := d.dynamicClient.NamespacedResource(gvr, namespace).List(metav1.ListOptions{IncludeUninitialized: true})
obj, err := dynamicClient.Resource(&apiResource, namespace).List(metav1.ListOptions{IncludeUninitialized: true})
if err == nil { if err == nil {
unstructuredList, ok := obj.(*unstructured.UnstructuredList)
if !ok {
return nil, false, fmt.Errorf("resource: %s, expected *unstructured.UnstructuredList, got %#v", apiResource.Name, obj)
}
return unstructuredList, true, nil return unstructuredList, true, nil
} }
@ -409,22 +399,20 @@ func (d *namespacedResourcesDeleter) listCollection(
} }
// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1. // deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
func (d *namespacedResourcesDeleter) deleteEachItem( func (d *namespacedResourcesDeleter) deleteEachItem(gvr schema.GroupVersionResource, namespace string) error {
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string) error {
glog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr) glog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr)
unstructuredList, listSupported, err := d.listCollection(dynamicClient, gvr, namespace) unstructuredList, listSupported, err := d.listCollection(gvr, namespace)
if err != nil { if err != nil {
return err return err
} }
if !listSupported { if !listSupported {
return nil return nil
} }
apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true}
for _, item := range unstructuredList.Items { for _, item := range unstructuredList.Items {
background := metav1.DeletePropagationBackground background := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &background} opts := &metav1.DeleteOptions{PropagationPolicy: &background}
if err = dynamicClient.Resource(&apiResource, namespace).Delete(item.GetName(), opts); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) { if err = d.dynamicClient.NamespacedResource(gvr, namespace).Delete(item.GetName(), opts); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
return err return err
} }
} }
@ -447,22 +435,15 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
} }
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate) glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)
// get a client for this group version...
dynamicClient, err := d.clientPool.ClientForGroupVersionResource(gvr)
if err != nil {
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
}
// first try to delete the entire collection // first try to delete the entire collection
deleteCollectionSupported, err := d.deleteCollection(dynamicClient, gvr, namespace) deleteCollectionSupported, err := d.deleteCollection(gvr, namespace)
if err != nil { if err != nil {
return estimate, err return estimate, err
} }
// delete collection was not supported, so we list and delete each item... // delete collection was not supported, so we list and delete each item...
if !deleteCollectionSupported { if !deleteCollectionSupported {
err = d.deleteEachItem(dynamicClient, gvr, namespace) err = d.deleteEachItem(gvr, namespace)
if err != nil { if err != nil {
return estimate, err return estimate, err
} }
@ -471,7 +452,7 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
// verify there are no more remaining items // verify there are no more remaining items
// it is not an error condition for there to be remaining items if local estimate is non-zero // it is not an error condition for there to be remaining items if local estimate is non-zero
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr) glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
unstructuredList, listSupported, err := d.listCollection(dynamicClient, gvr, namespace) unstructuredList, listSupported, err := d.listCollection(gvr, namespace)
if err != nil { if err != nil {
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err) glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err return estimate, err
@ -497,8 +478,7 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources. // deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaining resources are deleted. // It returns an estimate of the time remaining before the remaining resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone. // If estimate > 0, not all resources are guaranteed to be gone.
func (d *namespacedResourcesDeleter) deleteAllContent( func (d *namespacedResourcesDeleter) deleteAllContent(namespace string, namespaceDeletedAt metav1.Time) (int64, error) {
namespace string, namespaceDeletedAt metav1.Time) (int64, error) {
estimate := int64(0) estimate := int64(0)
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s", namespace) glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s", namespace)
resources, err := d.discoverResourcesFn() resources, err := d.discoverResourcesFn()

View File

@ -36,7 +36,6 @@ import (
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
) )
@ -173,14 +172,16 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
defer srv.Close() defer srv.Close()
mockClient := fake.NewSimpleClientset(testInput.testNamespace) mockClient := fake.NewSimpleClientset(testInput.testNamespace)
clientPool := dynamic.NewClientPool(clientConfig, legacyscheme.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
fn := func() ([]*metav1.APIResourceList, error) { fn := func() ([]*metav1.APIResourceList, error) {
return resources, nil return resources, nil
} }
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), clientPool, mockClient.Core(), fn, v1.FinalizerKubernetes, true) d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), dynamicClient, mockClient.Core(), fn, v1.FinalizerKubernetes, true)
err := d.Delete(testInput.testNamespace.Name) if err := d.Delete(testInput.testNamespace.Name); err != nil {
if err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err) t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
} }

View File

@ -63,7 +63,7 @@ type NamespaceController struct {
// NewNamespaceController creates a new NamespaceController // NewNamespaceController creates a new NamespaceController
func NewNamespaceController( func NewNamespaceController(
kubeClient clientset.Interface, kubeClient clientset.Interface,
clientPool dynamic.ClientPool, dynamicClient dynamic.DynamicInterface,
discoverResourcesFn func() ([]*metav1.APIResourceList, error), discoverResourcesFn func() ([]*metav1.APIResourceList, error),
namespaceInformer coreinformers.NamespaceInformer, namespaceInformer coreinformers.NamespaceInformer,
resyncPeriod time.Duration, resyncPeriod time.Duration,
@ -72,7 +72,7 @@ func NewNamespaceController(
// create the controller so we can inject the enqueue function // create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{ namespaceController := &NamespaceController{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), clientPool, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true), namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), dynamicClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true),
} }
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {

View File

@ -197,7 +197,7 @@ func genericDescriber(clientAccessFactory ClientAccessFactory, mapping *meta.RES
clientConfigCopy.GroupVersion = &gv clientConfigCopy.GroupVersion = &gv
// used to fetch the resource // used to fetch the resource
dynamicClient, err := dynamic.NewClient(&clientConfigCopy) dynamicClient, err := dynamic.NewClient(&clientConfigCopy, gv)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -49,6 +49,9 @@ filegroup(
filegroup( filegroup(
name = "all-srcs", name = "all-srcs",
srcs = [":package-srcs"], srcs = [
":package-srcs",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme:all-srcs",
],
tags = ["automanaged"], tags = ["automanaged"],
) )

View File

@ -58,6 +58,26 @@ func (obj *Unstructured) IsList() bool {
_, ok = field.([]interface{}) _, ok = field.([]interface{})
return ok return ok
} }
func (obj *Unstructured) ToList() (*UnstructuredList, error) {
if !obj.IsList() {
// return an empty list back
return &UnstructuredList{Object: obj.Object}, nil
}
ret := &UnstructuredList{}
ret.Object = obj.Object
err := obj.EachListItem(func(item runtime.Object) error {
castItem := item.(*Unstructured)
ret.Items = append(ret.Items, *castItem)
return nil
})
if err != nil {
return nil, err
}
return ret, nil
}
func (obj *Unstructured) EachListItem(fn func(runtime.Object) error) error { func (obj *Unstructured) EachListItem(fn func(runtime.Object) error) error {
field, ok := obj.Object["items"] field, ok := obj.Object["items"]

View File

@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["scheme.go"],
importpath = "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme",
visibility = ["//visibility:public"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,121 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package unstructuredscheme
import (
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
// NewUnstructuredNegotiatedSerializer returns a simple, negotiated serializer
func NewUnstructuredNegotiatedSerializer() runtime.NegotiatedSerializer {
return unstructuredNegotiatedSerializer{
scheme: scheme,
typer: NewUnstructuredObjectTyper(),
creator: NewUnstructuredCreator(),
}
}
type unstructuredNegotiatedSerializer struct {
scheme *runtime.Scheme
typer runtime.ObjectTyper
creator runtime.ObjectCreater
}
func (s unstructuredNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{
{
MediaType: "application/json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, s.creator, s.typer, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, s.creator, s.typer, true),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, s.creator, s.typer, false),
Framer: json.Framer,
},
},
{
MediaType: "application/yaml",
EncodesAsText: true,
Serializer: json.NewYAMLSerializer(json.DefaultMetaFactory, s.creator, s.typer),
},
}
}
func (s unstructuredNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return versioning.NewDefaultingCodecForScheme(s.scheme, encoder, nil, gv, nil)
}
func (s unstructuredNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return versioning.NewDefaultingCodecForScheme(s.scheme, nil, decoder, nil, gv)
}
type unstructuredObjectTyper struct {
}
// NewUnstructuredObjectTyper returns an object typer that can deal with unstructured things
func NewUnstructuredObjectTyper() runtime.ObjectTyper {
return unstructuredObjectTyper{}
}
func (t unstructuredObjectTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, bool, error) {
// Delegate for things other than Unstructured.
if _, ok := obj.(runtime.Unstructured); !ok {
return nil, false, fmt.Errorf("cannot type %T", obj)
}
return []schema.GroupVersionKind{obj.GetObjectKind().GroupVersionKind()}, false, nil
}
func (t unstructuredObjectTyper) Recognizes(gvk schema.GroupVersionKind) bool {
return true
}
type unstructuredCreator struct{}
// NewUnstructuredCreator returns a simple object creator that always returns an unstructured
func NewUnstructuredCreator() runtime.ObjectCreater {
return unstructuredCreator{}
}
func (c unstructuredCreator) New(kind schema.GroupVersionKind) (runtime.Object, error) {
ret := &unstructured.Unstructured{}
ret.SetGroupVersionKind(kind)
return ret, nil
}
type unstructuredDefaulter struct {
}
// NewUnstructuredDefaulter returns defaulter suitable for unstructured types that doesn't default anything
func NewUnstructuredDefaulter() runtime.ObjectDefaulter {
return unstructuredDefaulter{}
}
func (d unstructuredDefaulter) Default(in runtime.Object) {
}

View File

@ -30,25 +30,28 @@ go_test(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"bad_debt.go",
"client.go", "client.go",
"client_pool.go", "client_pool.go",
"dynamic_util.go", "dynamic_util.go",
"scheme.go",
"simple.go",
], ],
importpath = "k8s.io/client-go/dynamic", importpath = "k8s.io/client-go/dynamic",
deps = [ deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion/queryparams:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
], ],
) )

View File

@ -0,0 +1,79 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"encoding/json"
"io"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
// dynamicCodec is a codec that wraps the standard unstructured codec
// with special handling for Status objects.
// Deprecated only used by test code and its wrong
type dynamicCodec struct{}
func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj)
if err != nil {
return nil, nil, err
}
if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" {
obj = &metav1.Status{}
err := json.Unmarshal(data, obj)
if err != nil {
return nil, nil, err
}
}
return obj, gvk, nil
}
func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error {
return unstructured.UnstructuredJSONScheme.Encode(obj, w)
}
// ContentConfig returns a rest.ContentConfig for dynamic types.
// Deprecated only used by test code and its wrong
func ContentConfig() rest.ContentConfig {
var jsonInfo runtime.SerializerInfo
// TODO: scheme.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need
// to talk to a kubernetes server
for _, info := range scheme.Codecs.SupportedMediaTypes() {
if info.MediaType == runtime.ContentTypeJSON {
jsonInfo = info
break
}
}
jsonInfo.Serializer = dynamicCodec{}
jsonInfo.PrettySerializer = nil
return rest.ContentConfig{
AcceptContentTypes: runtime.ContentTypeJSON,
ContentType: runtime.ContentTypeJSON,
NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo),
}
}

View File

@ -20,37 +20,24 @@ limitations under the License.
package dynamic package dynamic
import ( import (
"encoding/json"
"errors"
"io"
"net/url"
"strings" "strings"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion/queryparams"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
) )
// Interface is a Kubernetes client that allows you to access metadata // Interface is a Kubernetes client that allows you to access metadata
// and manipulate metadata of a Kubernetes API group. // and manipulate metadata of a Kubernetes API group.
type Interface interface { type Interface interface {
// GetRateLimiter returns the rate limiter for this client.
GetRateLimiter() flowcontrol.RateLimiter
// Resource returns an API interface to the specified resource for this client's // Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace // group and version. If resource is not a namespaced resource, then namespace
// is ignored. The ResourceInterface inherits the parameter codec of this client. // is ignored. The ResourceInterface inherits the parameter codec of this client.
Resource(resource *metav1.APIResource, namespace string) ResourceInterface Resource(resource *metav1.APIResource, namespace string) ResourceInterface
// ParameterCodec returns a client with the provided parameter codec.
ParameterCodec(parameterCodec runtime.ParameterCodec) Interface
} }
// ResourceInterface is an API interface to a specific resource under a // ResourceInterface is an API interface to a specific resource under a
@ -77,303 +64,50 @@ type ResourceInterface interface {
// Client is a Kubernetes client that allows you to access metadata // Client is a Kubernetes client that allows you to access metadata
// and manipulate metadata of a Kubernetes API group, and implements Interface. // and manipulate metadata of a Kubernetes API group, and implements Interface.
type Client struct { type Client struct {
cl *restclient.RESTClient version schema.GroupVersion
parameterCodec runtime.ParameterCodec delegate DynamicInterface
} }
// NewClient returns a new client based on the passed in config. The // NewClient returns a new client based on the passed in config. The
// codec is ignored, as the dynamic client uses it's own codec. // codec is ignored, as the dynamic client uses it's own codec.
func NewClient(conf *restclient.Config) (*Client, error) { func NewClient(conf *restclient.Config, version schema.GroupVersion) (*Client, error) {
// avoid changing the original config delegate, err := NewForConfig(conf)
confCopy := *conf
conf = &confCopy
contentConfig := ContentConfig()
contentConfig.GroupVersion = conf.GroupVersion
if conf.NegotiatedSerializer != nil {
contentConfig.NegotiatedSerializer = conf.NegotiatedSerializer
}
conf.ContentConfig = contentConfig
if conf.APIPath == "" {
conf.APIPath = "/api"
}
if len(conf.UserAgent) == 0 {
conf.UserAgent = restclient.DefaultKubernetesUserAgent()
}
cl, err := restclient.RESTClientFor(conf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Client{cl: cl}, nil return &Client{version: version, delegate: delegate}, nil
}
// GetRateLimiter returns rate limier.
func (c *Client) GetRateLimiter() flowcontrol.RateLimiter {
return c.cl.GetRateLimiter()
} }
// Resource returns an API interface to the specified resource for this client's // Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace // group and version. If resource is not a namespaced resource, then namespace
// is ignored. The ResourceInterface inherits the parameter codec of c. // is ignored. The ResourceInterface inherits the parameter codec of c.
func (c *Client) Resource(resource *metav1.APIResource, namespace string) ResourceInterface { func (c *Client) Resource(resource *metav1.APIResource, namespace string) ResourceInterface {
return &ResourceClient{ resourceTokens := strings.SplitN(resource.Name, "/", 2)
cl: c.cl, subresource := ""
resource: resource, if len(resourceTokens) > 1 {
ns: namespace, subresource = resourceTokens[1]
parameterCodec: c.parameterCodec,
}
}
// ParameterCodec returns a client with the provided parameter codec.
func (c *Client) ParameterCodec(parameterCodec runtime.ParameterCodec) Interface {
return &Client{
cl: c.cl,
parameterCodec: parameterCodec,
}
}
// ResourceClient is an API interface to a specific resource under a
// dynamic client, and implements ResourceInterface.
type ResourceClient struct {
cl *restclient.RESTClient
resource *metav1.APIResource
ns string
parameterCodec runtime.ParameterCodec
}
func (rc *ResourceClient) parseResourceSubresourceName() (string, []string) {
var resourceName string
var subresourceName []string
if strings.Contains(rc.resource.Name, "/") {
resourceName = strings.Split(rc.resource.Name, "/")[0]
subresourceName = strings.Split(rc.resource.Name, "/")[1:]
} else {
resourceName = rc.resource.Name
} }
return resourceName, subresourceName if len(namespace) == 0 {
} return oldResourceShim(c.delegate.ClusterSubresource(c.version.WithResource(resourceTokens[0]), subresource))
// List returns a list of objects for this resource.
func (rc *ResourceClient) List(opts metav1.ListOptions) (runtime.Object, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
} }
return rc.cl.Get(). return oldResourceShim(c.delegate.NamespacedSubresource(c.version.WithResource(resourceTokens[0]), subresource, namespace))
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(&opts, parameterEncoder).
Do().
Get()
} }
// Get gets the resource with the specified name. // the old interfaces used the wrong type for lists. this fixes that
func (rc *ResourceClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) { func oldResourceShim(in DynamicResourceInterface) ResourceInterface {
parameterEncoder := rc.parameterCodec return oldResourceShimType{DynamicResourceInterface: in}
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
result := new(unstructured.Unstructured)
resourceName, subresourceName := rc.parseResourceSubresourceName()
err := rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(resourceName).
SubResource(subresourceName...).
VersionedParams(&opts, parameterEncoder).
Name(name).
Do().
Into(result)
return result, err
} }
// Delete deletes the resource with the specified name. type oldResourceShimType struct {
func (rc *ResourceClient) Delete(name string, opts *metav1.DeleteOptions) error { DynamicResourceInterface
return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(name).
Body(opts).
Do().
Error()
} }
// DeleteCollection deletes a collection of objects. func (s oldResourceShimType) List(opts metav1.ListOptions) (runtime.Object, error) {
func (rc *ResourceClient) DeleteCollection(deleteOptions *metav1.DeleteOptions, listOptions metav1.ListOptions) error { return s.DynamicResourceInterface.List(opts)
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(&listOptions, parameterEncoder).
Body(deleteOptions).
Do().
Error()
} }
// Create creates the provided resource. func (s oldResourceShimType) Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) {
func (rc *ResourceClient) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { return s.DynamicResourceInterface.Patch(name, pt, data)
result := new(unstructured.Unstructured)
resourceName, subresourceName := rc.parseResourceSubresourceName()
req := rc.cl.Post().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(resourceName).
Body(obj)
if len(subresourceName) > 0 {
// If the provided resource is a subresource, the POST request should contain
// object name. Examples of subresources that support Create operation:
// core/v1/pods/{name}/binding
// core/v1/pods/{name}/eviction
// extensions/v1beta1/deployments/{name}/rollback
// apps/v1beta1/deployments/{name}/rollback
// NOTE: Currently our system assumes every subresource object has the same
// name as the parent resource object. E.g. a pods/binding object having
// metadada.name "foo" means pod "foo" is being bound. We may need to
// change this if we break the assumption in the future.
req = req.SubResource(subresourceName...).
Name(obj.GetName())
}
err := req.Do().
Into(result)
return result, err
} }
// Update updates the provided resource.
func (rc *ResourceClient) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
result := new(unstructured.Unstructured)
if len(obj.GetName()) == 0 {
return result, errors.New("object missing name")
}
resourceName, subresourceName := rc.parseResourceSubresourceName()
err := rc.cl.Put().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(resourceName).
SubResource(subresourceName...).
// NOTE: Currently our system assumes every subresource object has the same
// name as the parent resource object. E.g. a pods/binding object having
// metadada.name "foo" means pod "foo" is being bound. We may need to
// change this if we break the assumption in the future.
Name(obj.GetName()).
Body(obj).
Do().
Into(result)
return result, err
}
// Watch returns a watch.Interface that watches the resource.
func (rc *ResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
opts.Watch = true
return rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(&opts, parameterEncoder).
Watch()
}
// Patch applies the patch and returns the patched resource.
func (rc *ResourceClient) Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) {
result := new(unstructured.Unstructured)
resourceName, subresourceName := rc.parseResourceSubresourceName()
err := rc.cl.Patch(pt).
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(resourceName).
SubResource(subresourceName...).
Name(name).
Body(data).
Do().
Into(result)
return result, err
}
// dynamicCodec is a codec that wraps the standard unstructured codec
// with special handling for Status objects.
type dynamicCodec struct{}
func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj)
if err != nil {
return nil, nil, err
}
if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" {
obj = &metav1.Status{}
err := json.Unmarshal(data, obj)
if err != nil {
return nil, nil, err
}
}
return obj, gvk, nil
}
func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error {
return unstructured.UnstructuredJSONScheme.Encode(obj, w)
}
// ContentConfig returns a restclient.ContentConfig for dynamic types.
func ContentConfig() restclient.ContentConfig {
var jsonInfo runtime.SerializerInfo
// TODO: scheme.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need
// to talk to a kubernetes server
for _, info := range scheme.Codecs.SupportedMediaTypes() {
if info.MediaType == runtime.ContentTypeJSON {
jsonInfo = info
break
}
}
jsonInfo.Serializer = dynamicCodec{}
jsonInfo.PrettySerializer = nil
return restclient.ContentConfig{
AcceptContentTypes: runtime.ContentTypeJSON,
ContentType: runtime.ContentTypeJSON,
NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo),
}
}
// paramaterCodec is a codec converts an API object to query
// parameters without trying to convert to the target version.
type parameterCodec struct{}
func (parameterCodec) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) {
return queryparams.Convert(obj)
}
func (parameterCodec) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error {
return errors.New("DecodeParameters not implemented on dynamic parameterCodec")
}
var defaultParameterEncoder runtime.ParameterCodec = parameterCodec{}
type versionedParameterEncoderWithV1Fallback struct{}
func (versionedParameterEncoderWithV1Fallback) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) {
ret, err := scheme.ParameterCodec.EncodeParameters(obj, to)
if err != nil && runtime.IsNotRegisteredError(err) {
// fallback to v1
return scheme.ParameterCodec.EncodeParameters(obj, v1.SchemeGroupVersion)
}
return ret, err
}
func (versionedParameterEncoderWithV1Fallback) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error {
return errors.New("DecodeParameters not implemented on versionedParameterEncoderWithV1Fallback")
}
// VersionedParameterEncoderWithV1Fallback is useful for encoding query
// parameters for custom resources. It tries to convert object to the
// specified version before converting it to query parameters, and falls back to
// converting to v1 if the object is not registered in the specified version.
// For the record, currently API server always treats query parameters sent to a
// custom resource endpoint as v1.
var VersionedParameterEncoderWithV1Fallback runtime.ParameterCodec = versionedParameterEncoderWithV1Fallback{}

View File

@ -113,7 +113,7 @@ func (c *clientPoolImpl) ClientForGroupVersionKind(kind schema.GroupVersionKind)
// we need to make a client // we need to make a client
conf.GroupVersion = &gv conf.GroupVersion = &gv
dynamicClient, err := NewClient(conf) dynamicClient, err := NewClient(conf, gv)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -63,7 +63,7 @@ func getClientServer(gv *schema.GroupVersion, h func(http.ResponseWriter, *http.
cl, err := NewClient(&restclient.Config{ cl, err := NewClient(&restclient.Config{
Host: srv.URL, Host: srv.URL,
ContentConfig: restclient.ContentConfig{GroupVersion: gv}, ContentConfig: restclient.ContentConfig{GroupVersion: gv},
}) }, *gv)
if err != nil { if err != nil {
srv.Close() srv.Close()
return nil, nil, err return nil, nil, err
@ -81,7 +81,7 @@ func TestList(t *testing.T) {
}{ }{
{ {
name: "normal_list", name: "normal_list",
path: "/api/gtest/vtest/rtest", path: "/apis/gtest/vtest/rtest",
resp: getListJSON("vTest", "rTestList", resp: getListJSON("vTest", "rTestList",
getJSON("vTest", "rTest", "item1"), getJSON("vTest", "rTest", "item1"),
getJSON("vTest", "rTest", "item2")), getJSON("vTest", "rTest", "item2")),
@ -99,7 +99,7 @@ func TestList(t *testing.T) {
{ {
name: "namespaced_list", name: "namespaced_list",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest",
resp: getListJSON("vTest", "rTestList", resp: getListJSON("vTest", "rTestList",
getJSON("vTest", "rTest", "item1"), getJSON("vTest", "rTest", "item1"),
getJSON("vTest", "rTest", "item2")), getJSON("vTest", "rTest", "item2")),
@ -160,7 +160,7 @@ func TestGet(t *testing.T) {
{ {
resource: "rtest", resource: "rtest",
name: "normal_get", name: "normal_get",
path: "/api/gtest/vtest/rtest/normal_get", path: "/apis/gtest/vtest/rtest/normal_get",
resp: getJSON("vTest", "rTest", "normal_get"), resp: getJSON("vTest", "rTest", "normal_get"),
want: getObject("vTest", "rTest", "normal_get"), want: getObject("vTest", "rTest", "normal_get"),
}, },
@ -168,14 +168,14 @@ func TestGet(t *testing.T) {
resource: "rtest", resource: "rtest",
namespace: "nstest", namespace: "nstest",
name: "namespaced_get", name: "namespaced_get",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_get", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_get",
resp: getJSON("vTest", "rTest", "namespaced_get"), resp: getJSON("vTest", "rTest", "namespaced_get"),
want: getObject("vTest", "rTest", "namespaced_get"), want: getObject("vTest", "rTest", "namespaced_get"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "normal_subresource_get", name: "normal_subresource_get",
path: "/api/gtest/vtest/rtest/normal_subresource_get/srtest", path: "/apis/gtest/vtest/rtest/normal_subresource_get/srtest",
resp: getJSON("vTest", "srTest", "normal_subresource_get"), resp: getJSON("vTest", "srTest", "normal_subresource_get"),
want: getObject("vTest", "srTest", "normal_subresource_get"), want: getObject("vTest", "srTest", "normal_subresource_get"),
}, },
@ -183,7 +183,7 @@ func TestGet(t *testing.T) {
resource: "rtest/srtest", resource: "rtest/srtest",
namespace: "nstest", namespace: "nstest",
name: "namespaced_subresource_get", name: "namespaced_subresource_get",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_get/srtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_get/srtest",
resp: getJSON("vTest", "srTest", "namespaced_subresource_get"), resp: getJSON("vTest", "srTest", "namespaced_subresource_get"),
want: getObject("vTest", "srTest", "namespaced_subresource_get"), want: getObject("vTest", "srTest", "namespaced_subresource_get"),
}, },
@ -222,23 +222,33 @@ func TestGet(t *testing.T) {
} }
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
background := metav1.DeletePropagationBackground
uid := types.UID("uid")
statusOK := &metav1.Status{ statusOK := &metav1.Status{
TypeMeta: metav1.TypeMeta{Kind: "Status"}, TypeMeta: metav1.TypeMeta{Kind: "Status"},
Status: metav1.StatusSuccess, Status: metav1.StatusSuccess,
} }
tcs := []struct { tcs := []struct {
namespace string namespace string
name string name string
path string path string
deleteOptions *metav1.DeleteOptions
}{ }{
{ {
name: "normal_delete", name: "normal_delete",
path: "/api/gtest/vtest/rtest/normal_delete", path: "/apis/gtest/vtest/rtest/normal_delete",
}, },
{ {
namespace: "nstest", namespace: "nstest",
name: "namespaced_delete", name: "namespaced_delete",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_delete", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_delete",
},
{
namespace: "nstest",
name: "namespaced_delete_with_options",
path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_delete_with_options",
deleteOptions: &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &uid}, PropagationPolicy: &background},
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -262,7 +272,7 @@ func TestDelete(t *testing.T) {
} }
defer srv.Close() defer srv.Close()
err = cl.Resource(resource, tc.namespace).Delete(tc.name, nil) err = cl.Resource(resource, tc.namespace).Delete(tc.name, tc.deleteOptions)
if err != nil { if err != nil {
t.Errorf("unexpected error when deleting %q: %v", tc.name, err) t.Errorf("unexpected error when deleting %q: %v", tc.name, err)
continue continue
@ -282,12 +292,12 @@ func TestDeleteCollection(t *testing.T) {
}{ }{
{ {
name: "normal_delete_collection", name: "normal_delete_collection",
path: "/api/gtest/vtest/rtest", path: "/apis/gtest/vtest/rtest",
}, },
{ {
namespace: "nstest", namespace: "nstest",
name: "namespaced_delete_collection", name: "namespaced_delete_collection",
path: "/api/gtest/vtest/namespaces/nstest/rtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest",
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -330,28 +340,15 @@ func TestCreate(t *testing.T) {
{ {
resource: "rtest", resource: "rtest",
name: "normal_create", name: "normal_create",
path: "/api/gtest/vtest/rtest", path: "/apis/gtest/vtest/rtest",
obj: getObject("vTest", "rTest", "normal_create"), obj: getObject("gtest/vTest", "rTest", "normal_create"),
}, },
{ {
resource: "rtest", resource: "rtest",
name: "namespaced_create", name: "namespaced_create",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest",
obj: getObject("vTest", "rTest", "namespaced_create"), obj: getObject("gtest/vTest", "rTest", "namespaced_create"),
},
{
resource: "rtest/srtest",
name: "normal_subresource_create",
path: "/api/gtest/vtest/rtest/normal_subresource_create/srtest",
obj: getObject("vTest", "srTest", "normal_subresource_create"),
},
{
resource: "rtest/srtest",
name: "namespaced_subresource_create",
namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_create/srtest",
obj: getObject("vTest", "srTest", "namespaced_subresource_create"),
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -405,28 +402,28 @@ func TestUpdate(t *testing.T) {
{ {
resource: "rtest", resource: "rtest",
name: "normal_update", name: "normal_update",
path: "/api/gtest/vtest/rtest/normal_update", path: "/apis/gtest/vtest/rtest/normal_update",
obj: getObject("vTest", "rTest", "normal_update"), obj: getObject("gtest/vTest", "rTest", "normal_update"),
}, },
{ {
resource: "rtest", resource: "rtest",
name: "namespaced_update", name: "namespaced_update",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_update", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_update",
obj: getObject("vTest", "rTest", "namespaced_update"), obj: getObject("gtest/vTest", "rTest", "namespaced_update"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "normal_subresource_update", name: "normal_subresource_update",
path: "/api/gtest/vtest/rtest/normal_update/srtest", path: "/apis/gtest/vtest/rtest/normal_update/srtest",
obj: getObject("vTest", "srTest", "normal_update"), obj: getObject("gtest/vTest", "srTest", "normal_update"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "namespaced_subresource_update", name: "namespaced_subresource_update",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_update/srtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_update/srtest",
obj: getObject("vTest", "srTest", "namespaced_update"), obj: getObject("gtest/vTest", "srTest", "namespaced_update"),
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -479,23 +476,23 @@ func TestWatch(t *testing.T) {
}{ }{
{ {
name: "normal_watch", name: "normal_watch",
path: "/api/gtest/vtest/rtest", path: "/apis/gtest/vtest/rtest",
query: "watch=true", query: "watch=true",
events: []watch.Event{ events: []watch.Event{
{Type: watch.Added, Object: getObject("vTest", "rTest", "normal_watch")}, {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
{Type: watch.Modified, Object: getObject("vTest", "rTest", "normal_watch")}, {Type: watch.Modified, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
{Type: watch.Deleted, Object: getObject("vTest", "rTest", "normal_watch")}, {Type: watch.Deleted, Object: getObject("gtest/vTest", "rTest", "normal_watch")},
}, },
}, },
{ {
name: "namespaced_watch", name: "namespaced_watch",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest",
query: "watch=true", query: "watch=true",
events: []watch.Event{ events: []watch.Event{
{Type: watch.Added, Object: getObject("vTest", "rTest", "namespaced_watch")}, {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
{Type: watch.Modified, Object: getObject("vTest", "rTest", "namespaced_watch")}, {Type: watch.Modified, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
{Type: watch.Deleted, Object: getObject("vTest", "rTest", "namespaced_watch")}, {Type: watch.Deleted, Object: getObject("gtest/vTest", "rTest", "namespaced_watch")},
}, },
}, },
} }
@ -552,32 +549,32 @@ func TestPatch(t *testing.T) {
{ {
resource: "rtest", resource: "rtest",
name: "normal_patch", name: "normal_patch",
path: "/api/gtest/vtest/rtest/normal_patch", path: "/apis/gtest/vtest/rtest/normal_patch",
patch: getJSON("vTest", "rTest", "normal_patch"), patch: getJSON("gtest/vTest", "rTest", "normal_patch"),
want: getObject("vTest", "rTest", "normal_patch"), want: getObject("gtest/vTest", "rTest", "normal_patch"),
}, },
{ {
resource: "rtest", resource: "rtest",
name: "namespaced_patch", name: "namespaced_patch",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_patch", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_patch",
patch: getJSON("vTest", "rTest", "namespaced_patch"), patch: getJSON("gtest/vTest", "rTest", "namespaced_patch"),
want: getObject("vTest", "rTest", "namespaced_patch"), want: getObject("gtest/vTest", "rTest", "namespaced_patch"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "normal_subresource_patch", name: "normal_subresource_patch",
path: "/api/gtest/vtest/rtest/normal_subresource_patch/srtest", path: "/apis/gtest/vtest/rtest/normal_subresource_patch/srtest",
patch: getJSON("vTest", "srTest", "normal_subresource_patch"), patch: getJSON("gtest/vTest", "srTest", "normal_subresource_patch"),
want: getObject("vTest", "srTest", "normal_subresource_patch"), want: getObject("gtest/vTest", "srTest", "normal_subresource_patch"),
}, },
{ {
resource: "rtest/srtest", resource: "rtest/srtest",
name: "namespaced_subresource_patch", name: "namespaced_subresource_patch",
namespace: "nstest", namespace: "nstest",
path: "/api/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_patch/srtest", path: "/apis/gtest/vtest/namespaces/nstest/rtest/namespaced_subresource_patch/srtest",
patch: getJSON("vTest", "srTest", "namespaced_subresource_patch"), patch: getJSON("gtest/vTest", "srTest", "namespaced_subresource_patch"),
want: getObject("vTest", "srTest", "namespaced_subresource_patch"), want: getObject("gtest/vTest", "srTest", "namespaced_subresource_patch"),
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
@ -624,11 +621,3 @@ func TestPatch(t *testing.T) {
} }
} }
} }
func TestVersionedParameterEncoderWithV1Fallback(t *testing.T) {
enc := VersionedParameterEncoderWithV1Fallback
_, err := enc.EncodeParameters(&metav1.ListOptions{}, schema.GroupVersion{Group: "foo.bar.com", Version: "v4"})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
}

View File

@ -0,0 +1,98 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
)
var watchScheme = runtime.NewScheme()
var basicScheme = runtime.NewScheme()
var deleteScheme = runtime.NewScheme()
var parameterScheme = runtime.NewScheme()
var deleteOptionsCodec = serializer.NewCodecFactory(deleteScheme)
var dynamicParameterCodec = runtime.NewParameterCodec(parameterScheme)
var versionV1 = schema.GroupVersion{Version: "v1"}
func init() {
metav1.AddToGroupVersion(watchScheme, versionV1)
metav1.AddToGroupVersion(basicScheme, versionV1)
metav1.AddToGroupVersion(parameterScheme, versionV1)
metav1.AddToGroupVersion(deleteScheme, versionV1)
}
var watchJsonSerializerInfo = runtime.SerializerInfo{
MediaType: "application/json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, true),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false),
Framer: json.Framer,
},
}
// watchNegotiatedSerializer is used to read the wrapper of the watch stream
type watchNegotiatedSerializer struct{}
var watchNegotiatedSerializerInstance = watchNegotiatedSerializer{}
func (s watchNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{watchJsonSerializerInfo}
}
func (s watchNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil)
}
func (s watchNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv)
}
// basicNegotiatedSerializer is used to handle discovery and error handling serialization
type basicNegotiatedSerializer struct{}
func (s basicNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{
{
MediaType: "application/json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, true),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
Framer: json.Framer,
},
},
}
}
func (s basicNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil)
}
func (s basicNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv)
}

View File

@ -0,0 +1,322 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"io"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
)
type DynamicInterface interface {
ClusterResource(resource schema.GroupVersionResource) DynamicResourceInterface
NamespacedResource(resource schema.GroupVersionResource, namespace string) DynamicResourceInterface
// Deprecated, this isn't how we want to do it
ClusterSubresource(resource schema.GroupVersionResource, subresource string) DynamicResourceInterface
// Deprecated, this isn't how we want to do it
NamespacedSubresource(resource schema.GroupVersionResource, subresource, namespace string) DynamicResourceInterface
}
type DynamicResourceInterface interface {
Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
UpdateStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
Delete(name string, options *metav1.DeleteOptions) error
DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
Get(name string, options metav1.GetOptions) (*unstructured.Unstructured, error)
List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*unstructured.Unstructured, error)
}
type dynamicClient struct {
client *rest.RESTClient
}
var _ DynamicInterface = &dynamicClient{}
func NewForConfig(inConfig *rest.Config) (DynamicInterface, error) {
config := rest.CopyConfig(inConfig)
// for serializing the options
config.GroupVersion = &schema.GroupVersion{}
config.APIPath = "/if-you-see-this-search-for-the-break"
config.AcceptContentTypes = "application/json"
config.ContentType = "application/json"
config.NegotiatedSerializer = basicNegotiatedSerializer{} // this gets used for discovery and error handling types
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
restClient, err := rest.RESTClientFor(config)
if err != nil {
return nil, err
}
return &dynamicClient{client: restClient}, nil
}
type dynamicResourceClient struct {
client *dynamicClient
namespace string
resource schema.GroupVersionResource
subresource string
}
func (c *dynamicClient) ClusterResource(resource schema.GroupVersionResource) DynamicResourceInterface {
return &dynamicResourceClient{client: c, resource: resource}
}
func (c *dynamicClient) NamespacedResource(resource schema.GroupVersionResource, namespace string) DynamicResourceInterface {
return &dynamicResourceClient{client: c, resource: resource, namespace: namespace}
}
func (c *dynamicClient) ClusterSubresource(resource schema.GroupVersionResource, subresource string) DynamicResourceInterface {
return &dynamicResourceClient{client: c, resource: resource, subresource: subresource}
}
func (c *dynamicClient) NamespacedSubresource(resource schema.GroupVersionResource, subresource, namespace string) DynamicResourceInterface {
return &dynamicResourceClient{client: c, resource: resource, namespace: namespace, subresource: subresource}
}
func (c *dynamicResourceClient) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
if len(c.subresource) > 0 {
return nil, fmt.Errorf("create not supported for subresources")
}
outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
result := c.client.client.Post().AbsPath(c.makeURLSegments("")...).Body(outBytes).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
result := c.client.client.Put().AbsPath(c.makeURLSegments(accessor.GetName())...).Body(outBytes).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) UpdateStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
result := c.client.client.Put().AbsPath(append(c.makeURLSegments(accessor.GetName()), "status")...).Body(obj).Do()
uncastObj, err := result.Get()
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) Delete(name string, opts *metav1.DeleteOptions) error {
if opts == nil {
opts = &metav1.DeleteOptions{}
}
if opts == nil {
opts = &metav1.DeleteOptions{}
}
deleteOptionsByte, err := runtime.Encode(deleteOptionsCodec.LegacyCodec(schema.GroupVersion{Version: "v1"}), opts)
if err != nil {
return err
}
result := c.client.client.Delete().AbsPath(c.makeURLSegments(name)...).Body(deleteOptionsByte).Do()
return result.Error()
}
func (c *dynamicResourceClient) DeleteCollection(opts *metav1.DeleteOptions, listOptions metav1.ListOptions) error {
if len(c.subresource) > 0 {
return fmt.Errorf("deletecollection not supported for subresources")
}
if opts == nil {
opts = &metav1.DeleteOptions{}
}
deleteOptionsByte, err := runtime.Encode(deleteOptionsCodec.LegacyCodec(schema.GroupVersion{Version: "v1"}), opts)
if err != nil {
return err
}
result := c.client.client.Delete().AbsPath(c.makeURLSegments("")...).Body(deleteOptionsByte).SpecificallyVersionedParams(&listOptions, dynamicParameterCodec, versionV1).Do()
return result.Error()
}
func (c *dynamicResourceClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) {
result := c.client.client.Get().AbsPath(c.makeURLSegments(name)...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if len(c.subresource) > 0 {
return nil, fmt.Errorf("list not supported for subresources")
}
result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
return list, nil
}
list, err := uncastObj.(*unstructured.Unstructured).ToList()
if err != nil {
return nil, err
}
return list, nil
}
func (c *dynamicResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
if len(c.subresource) > 0 {
return nil, fmt.Errorf("watch not supported for subresources")
}
internalGV := schema.GroupVersions{
{Group: c.resource.Group, Version: runtime.APIVersionInternal},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{Group: "", Version: runtime.APIVersionInternal},
}
s := &rest.Serializers{
Encoder: watchNegotiatedSerializerInstance.EncoderForVersion(watchJsonSerializerInfo.Serializer, c.resource.GroupVersion()),
Decoder: watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV),
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
return watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV), nil
},
StreamingSerializer: watchJsonSerializerInfo.StreamSerializer.Serializer,
Framer: watchJsonSerializerInfo.StreamSerializer.Framer,
}
wrappedDecoderFn := func(body io.ReadCloser) streaming.Decoder {
framer := s.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, s.StreamingSerializer)
}
opts.Watch = true
return c.client.client.Get().AbsPath(c.makeURLSegments("")...).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
WatchWithSpecificDecoders(wrappedDecoderFn, unstructured.UnstructuredJSONScheme)
}
func (c *dynamicResourceClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*unstructured.Unstructured, error) {
result := c.client.client.Patch(pt).AbsPath(append(c.makeURLSegments(name), subresources...)...).Body(data).Do()
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) makeURLSegments(name string) []string {
url := []string{}
if len(c.resource.Group) == 0 {
url = append(url, "api")
} else {
url = append(url, "apis", c.resource.Group)
}
url = append(url, c.resource.Version)
if len(c.namespace) > 0 {
url = append(url, "namespaces", c.namespace)
}
url = append(url, c.resource.Resource)
if len(name) > 0 {
url = append(url, name)
// subresources only work on things with names
if len(c.subresource) > 0 {
url = append(url, c.subresource)
}
} else {
if len(c.subresource) > 0 {
panic("somehow snuck a subresource and an empty name. programmer error")
}
}
return url
}

View File

@ -317,10 +317,14 @@ func (r *Request) Param(paramName, s string) *Request {
// VersionedParams will not write query parameters that have omitempty set and are empty. If a // VersionedParams will not write query parameters that have omitempty set and are empty. If a
// parameter has already been set it is appended to (Params and VersionedParams are additive). // parameter has already been set it is appended to (Params and VersionedParams are additive).
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request { func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
}
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
if r.err != nil { if r.err != nil {
return r return r
} }
params, err := codec.EncodeParameters(obj, *r.content.GroupVersion) params, err := codec.EncodeParameters(obj, version)
if err != nil { if err != nil {
r.err = err r.err = err
return r return r
@ -485,6 +489,19 @@ func (r *Request) tryThrottle() {
// Watch attempts to begin watching the requested location. // Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error. // Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) { func (r *Request) Watch() (watch.Interface, error) {
return r.WatchWithSpecificDecoders(
func(body io.ReadCloser) streaming.Decoder {
framer := r.serializers.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
},
r.serializers.Decoder,
)
}
// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
// Returns a watch.Interface, or an error.
func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we // We specifically don't want to rate limit watches, so we
// don't use r.throttle here. // don't use r.throttle here.
if r.err != nil { if r.err != nil {
@ -532,9 +549,8 @@ func (r *Request) Watch() (watch.Interface, error) {
} }
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode) return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
} }
framer := r.serializers.Framer.NewFrameReader(resp.Body) wrapperDecoder := wrapperDecoderFn(resp.Body)
decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer) return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
return watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, r.serializers.Decoder)), nil
} }
// updateURLMetrics is a convenience function for pushing metrics. // updateURLMetrics is a convenience function for pushing metrics.

View File

@ -23,7 +23,6 @@ go_library(
"//cmd/kube-apiserver/app:go_default_library", "//cmd/kube-apiserver/app:go_default_library",
"//cmd/kube-apiserver/app/options:go_default_library", "//cmd/kube-apiserver/app/options:go_default_library",
"//cmd/kubelet/app/options:go_default_library", "//cmd/kubelet/app/options:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/controller/namespace:go_default_library", "//pkg/controller/namespace:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library",

View File

@ -24,7 +24,6 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
) )
@ -56,12 +55,15 @@ func (n *NamespaceController) Start() error {
if err != nil { if err != nil {
return err return err
} }
clientPool := dynamic.NewClientPool(config, legacyscheme.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return err
}
discoverResourcesFn := client.Discovery().ServerPreferredNamespacedResources discoverResourcesFn := client.Discovery().ServerPreferredNamespacedResources
informerFactory := informers.NewSharedInformerFactory(client, ncResyncPeriod) informerFactory := informers.NewSharedInformerFactory(client, ncResyncPeriod)
nc := namespacecontroller.NewNamespaceController( nc := namespacecontroller.NewNamespaceController(
client, client,
clientPool, dynamicClient,
discoverResourcesFn, discoverResourcesFn,
informerFactory.Core().V1().Namespaces(), informerFactory.Core().V1().Namespaces(),
ncResyncPeriod, v1.FinalizerKubernetes, ncResyncPeriod, v1.FinalizerKubernetes,

View File

@ -45,7 +45,7 @@ func TestDynamicClient(t *testing.T) {
} }
client := clientset.NewForConfigOrDie(config) client := clientset.NewForConfigOrDie(config)
dynamicClient, err := dynamic.NewClient(config) dynamicClient, err := dynamic.NewClient(config, *gv)
_ = dynamicClient _ = dynamicClient
if err != nil { if err != nil {
t.Fatalf("unexpected error creating dynamic client: %v", err) t.Fatalf("unexpected error creating dynamic client: %v", err)

View File

@ -153,7 +153,7 @@ func TestCRD(t *testing.T) {
barComConfig := *result.ClientConfig barComConfig := *result.ClientConfig
barComConfig.GroupVersion = &schema.GroupVersion{Group: "cr.bar.com", Version: "v1"} barComConfig.GroupVersion = &schema.GroupVersion{Group: "cr.bar.com", Version: "v1"}
barComConfig.APIPath = "/apis" barComConfig.APIPath = "/apis"
barComClient, err := dynamic.NewClient(&barComConfig) barComClient, err := dynamic.NewClient(&barComConfig, *barComConfig.GroupVersion)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }