From fa12441ab99cac81b0034208fd10d8a4fc3d5bd0 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Tue, 10 Mar 2020 03:13:20 -0400 Subject: [PATCH] Preserve target apiVersion when decoding into unstructured lists --- .../pkg/apiserver/BUILD | 15 + .../apiserver/customresource_handler_test.go | 266 +++++++++++++++++- .../k8s.io/apiserver/pkg/storage/etcd3/BUILD | 1 + .../apiserver/pkg/storage/etcd3/store.go | 30 +- 4 files changed, 307 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD index 2bb62a08081..6dbc209028e 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD @@ -102,12 +102,27 @@ go_test( deps = [ "//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/conversion:go_default_library", + "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake:go_default_library", + "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1:go_default_library", + "//staging/src/k8s.io/apiextensions-apiserver/pkg/controller/establish:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go index 874390f21d3..da05e803d72 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go @@ -17,22 +17,43 @@ limitations under the License. package apiserver import ( + "context" "encoding/json" "io" "io/ioutil" + "net" "net/http" "net/http/httptest" - "sigs.k8s.io/yaml" + "net/url" + "strconv" "testing" + "time" + + "sigs.k8s.io/yaml" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apiextensions-apiserver/pkg/apiserver/conversion" + "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" + informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" + "k8s.io/apiextensions-apiserver/pkg/controller/establish" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/discovery" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/generic" + genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/server/options" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/util/webhook" "k8s.io/client-go/tools/cache" ) @@ -418,3 +439,246 @@ func TestRouting(t *testing.T) { }) } } + +func TestHandlerConversionWithWatchCache(t *testing.T) { + testHandlerConversion(t, true) +} + +func TestHandlerConversionWithoutWatchCache(t *testing.T) { + testHandlerConversion(t, false) +} + +func testHandlerConversion(t *testing.T, enableWatchCache bool) { + cl := fake.NewSimpleClientset() + informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + crdInformer := informers.Apiextensions().V1().CustomResourceDefinitions() + + server, storageConfig := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + defer server.Terminate(t) + + crd := multiVersionFixture.DeepCopy() + if _, err := cl.ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), crd, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + if err := crdInformer.Informer().GetStore().Add(crd); err != nil { + t.Fatal(err) + } + + etcdOptions := options.NewEtcdOptions(storageConfig) + etcdOptions.StorageConfig.Codec = unstructured.UnstructuredJSONScheme + restOptionsGetter := generic.RESTOptions{ + StorageConfig: &etcdOptions.StorageConfig, + Decorator: generic.UndecoratedStorage, + EnableGarbageCollection: true, + DeleteCollectionWorkers: 1, + ResourcePrefix: crd.Spec.Group + "/" + crd.Spec.Names.Plural, + CountMetricPollPeriod: time.Minute, + } + if enableWatchCache { + restOptionsGetter.Decorator = genericregistry.StorageWithCacher(100) + } + + handler, err := NewCustomResourceDefinitionHandler( + &versionDiscoveryHandler{}, &groupDiscoveryHandler{}, + crdInformer, + http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}), + restOptionsGetter, + dummyAdmissionImpl{}, + &establish.EstablishingController{}, + dummyServiceResolverImpl{}, + func(r webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return r }, + 1, + dummyAuthorizerImpl{}, + time.Minute, time.Minute, nil, 3*1024*1024) + if err != nil { + t.Fatal(err) + } + + crdInfo, err := handler.getOrCreateServingInfoFor(crd.UID, crd.Name) + if err != nil { + t.Fatal(err) + } + + updateValidateFunc := func(ctx context.Context, obj, old runtime.Object) error { return nil } + validateFunc := func(ctx context.Context, obj runtime.Object) error { return nil } + startResourceVersion := "" + + if enableWatchCache { + // Let watch cache establish initial list + time.Sleep(time.Second) + } + + // Create and delete a marker object to get a starting resource version + { + u := &unstructured.Unstructured{Object: map[string]interface{}{}} + u.SetGroupVersionKind(schema.GroupVersionKind{Group: "stable.example.com", Version: "v1beta1", Kind: "MultiVersion"}) + u.SetName("marker") + if item, err := crdInfo.storages["v1beta1"].CustomResource.Create(context.TODO(), u, validateFunc, &metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } else { + startResourceVersion = item.(*unstructured.Unstructured).GetResourceVersion() + } + if _, _, err := crdInfo.storages["v1beta1"].CustomResource.Delete(context.TODO(), u.GetName(), validateFunc, &metav1.DeleteOptions{}); err != nil { + t.Fatal(err) + } + } + + // Create and get every version, expect returned result to match creation GVK + for _, version := range crd.Spec.Versions { + expectGVK := schema.GroupVersionKind{Group: "stable.example.com", Version: version.Name, Kind: "MultiVersion"} + u := &unstructured.Unstructured{Object: map[string]interface{}{}} + u.SetGroupVersionKind(expectGVK) + u.SetName("my-" + version.Name) + unstructured.SetNestedField(u.Object, int64(1), "spec", "num") + + // Create + if item, err := crdInfo.storages[version.Name].CustomResource.Create(context.TODO(), u, validateFunc, &metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } else if item.GetObjectKind().GroupVersionKind() != expectGVK { + t.Errorf("expected create result to be %#v, got %#v", expectGVK, item.GetObjectKind().GroupVersionKind()) + } else { + u = item.(*unstructured.Unstructured) + } + + // Update + u.SetAnnotations(map[string]string{"updated": "true"}) + if item, _, err := crdInfo.storages[version.Name].CustomResource.Update(context.TODO(), u.GetName(), rest.DefaultUpdatedObjectInfo(u), validateFunc, updateValidateFunc, false, &metav1.UpdateOptions{}); err != nil { + t.Fatal(err) + } else if item.GetObjectKind().GroupVersionKind() != expectGVK { + t.Errorf("expected update result to be %#v, got %#v", expectGVK, item.GetObjectKind().GroupVersionKind()) + } + + // Get + if item, err := crdInfo.storages[version.Name].CustomResource.Get(context.TODO(), u.GetName(), &metav1.GetOptions{}); err != nil { + t.Fatal(err) + } else if item.GetObjectKind().GroupVersionKind() != expectGVK { + t.Errorf("expected get result to be %#v, got %#v", expectGVK, item.GetObjectKind().GroupVersionKind()) + } + + if enableWatchCache { + // Allow time to propagate the create into the cache + time.Sleep(time.Second) + // Get cached + if item, err := crdInfo.storages[version.Name].CustomResource.Get(context.TODO(), u.GetName(), &metav1.GetOptions{ResourceVersion: "0"}); err != nil { + t.Fatal(err) + } else if item.GetObjectKind().GroupVersionKind() != expectGVK { + t.Errorf("expected cached get result to be %#v, got %#v", expectGVK, item.GetObjectKind().GroupVersionKind()) + } + } + } + + // List every version, expect all returned items to match request GVK + for _, version := range crd.Spec.Versions { + expectGVK := schema.GroupVersionKind{Group: "stable.example.com", Version: version.Name, Kind: "MultiVersion"} + + if list, err := crdInfo.storages[version.Name].CustomResource.List(context.TODO(), &metainternalversion.ListOptions{}); err != nil { + t.Fatal(err) + } else { + for _, item := range list.(*unstructured.UnstructuredList).Items { + if item.GroupVersionKind() != expectGVK { + t.Errorf("expected list item to be %#v, got %#v", expectGVK, item.GroupVersionKind()) + } + } + } + + if enableWatchCache { + // List from watch cache + if list, err := crdInfo.storages[version.Name].CustomResource.List(context.TODO(), &metainternalversion.ListOptions{ResourceVersion: "0"}); err != nil { + t.Fatal(err) + } else { + for _, item := range list.(*unstructured.UnstructuredList).Items { + if item.GroupVersionKind() != expectGVK { + t.Errorf("expected cached list item to be %#v, got %#v", expectGVK, item.GroupVersionKind()) + } + } + } + } + + watch, err := crdInfo.storages[version.Name].CustomResource.Watch(context.TODO(), &metainternalversion.ListOptions{ResourceVersion: startResourceVersion}) + if err != nil { + t.Fatal(err) + } + // 5 events: delete marker, create v1alpha1, create v1beta1, update v1alpha1, update v1beta1 + for i := 0; i < 5; i++ { + select { + case event, ok := <-watch.ResultChan(): + if !ok { + t.Fatalf("watch closed") + } + item, isUnstructured := event.Object.(*unstructured.Unstructured) + if !isUnstructured { + t.Fatalf("unexpected object type %T: %#v", item, event) + } + if item.GroupVersionKind() != expectGVK { + t.Errorf("expected watch object to be %#v, got %#v", expectGVK, item.GroupVersionKind()) + } + case <-time.After(time.Second): + t.Errorf("timed out waiting for watch event") + } + } + // Expect no more watch events + select { + case event := <-watch.ResultChan(): + t.Errorf("unexpected event: %#v", event) + case <-time.After(time.Second): + } + } +} + +type dummyAdmissionImpl struct{} + +func (dummyAdmissionImpl) Handles(operation admission.Operation) bool { return false } + +type dummyAuthorizerImpl struct{} + +func (dummyAuthorizerImpl) Authorize(ctx context.Context, a authorizer.Attributes) (authorized authorizer.Decision, reason string, err error) { + return authorizer.DecisionAllow, "", nil +} + +type dummyServiceResolverImpl struct{} + +func (dummyServiceResolverImpl) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { + return &url.URL{Scheme: "https", Host: net.JoinHostPort(name+"."+namespace+".svc", strconv.Itoa(int(port)))}, nil +} + +var multiVersionFixture = &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "multiversion.stable.example.com", UID: types.UID("12345")}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "stable.example.com", + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "multiversion", Singular: "multiversion", Kind: "MultiVersion", ShortNames: []string{"mv"}, ListKind: "MultiVersionList", Categories: []string{"all"}, + }, + Conversion: &apiextensionsv1.CustomResourceConversion{Strategy: apiextensionsv1.NoneConverter}, + Scope: apiextensionsv1.ClusterScoped, + PreserveUnknownFields: false, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + { + // storage version, same schema as v1alpha1 + Name: "v1beta1", Served: true, Storage: true, + Subresources: &apiextensionsv1.CustomResourceSubresources{Status: &apiextensionsv1.CustomResourceSubresourceStatus{}}, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{"num": {Type: "integer", Description: "v1beta1 num field"}}, + }, + }, + }, + { + // same schema as v1beta1 + Name: "v1alpha1", Served: true, Storage: false, + Subresources: &apiextensionsv1.CustomResourceSubresources{Status: &apiextensionsv1.CustomResourceSubresourceStatus{}}, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{"num": {Type: "integer", Description: "v1alpha1 num field"}}, + }, + }, + }, + }, + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{ + AcceptedNames: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "multiversion", Singular: "multiversion", Kind: "MultiVersion", ShortNames: []string{"mv"}, ListKind: "MultiVersionList", Categories: []string{"all"}, + }, + }, +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index e95c24c4e98..a233ea072ce 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -68,6 +68,7 @@ go_library( deps = [ "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index a90675784a2..ac92a99a992 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -32,6 +32,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -394,6 +395,8 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin return fmt.Errorf("need ptr to slice: %v", err) } + newItemFunc := getNewItemFunc(listObj, v) + key = path.Join(s.pathPrefix, key) startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key, s.getOps...) @@ -410,7 +413,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if err != nil { return storage.NewInternalError(err.Error()) } - if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil { + if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil { return err } } @@ -418,6 +421,23 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", nil) } +func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Object { + // For unstructured lists with a target group/version, preserve the group/version in the instantiated list items + if unstructuredList, isUnstructured := listObj.(*unstructured.UnstructuredList); isUnstructured { + if apiVersion := unstructuredList.GetAPIVersion(); len(apiVersion) > 0 { + return func() runtime.Object { + return &unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": apiVersion}} + } + } + } + + // Otherwise just instantiate an empty item + elem := v.Type().Elem() + return func() runtime.Object { + return reflect.New(elem).Interface().(runtime.Object) + } +} + func (s *store) Count(key string) (int64, error) { key = path.Join(s.pathPrefix, key) startTime := time.Now() @@ -525,6 +545,8 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor options = append(options, clientv3.WithLimit(pred.Limit)) } + newItemFunc := getNewItemFunc(listObj, v) + var returnedRV, continueRV int64 var continueKey string switch { @@ -609,7 +631,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err) } - if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil { + if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil { return err } } @@ -832,8 +854,8 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP } // appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice. -func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error { - obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) +func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) error { + obj, _, err := codec.Decode(data, nil, newItemFunc()) if err != nil { return err }