Preserve target apiVersion when decoding into unstructured lists

This commit is contained in:
Jordan Liggitt 2020-03-10 03:13:20 -04:00
parent 5877945048
commit fa12441ab9
4 changed files with 307 additions and 5 deletions

View File

@ -102,12 +102,27 @@ go_test(
deps = [ deps = [
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/conversion:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/conversion:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/controller/establish:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library",
], ],

View File

@ -17,22 +17,43 @@ limitations under the License.
package apiserver package apiserver
import ( import (
"context"
"encoding/json" "encoding/json"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"sigs.k8s.io/yaml" "net/url"
"strconv"
"testing" "testing"
"time"
"sigs.k8s.io/yaml"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apiserver/conversion" "k8s.io/apiextensions-apiserver/pkg/apiserver/conversion"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/controller/establish"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf" "k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/discovery" "k8s.io/apiserver/pkg/endpoints/discovery"
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server/options"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
@ -418,3 +439,246 @@ func TestRouting(t *testing.T) {
}) })
} }
} }
func TestHandlerConversionWithWatchCache(t *testing.T) {
testHandlerConversion(t, true)
}
func TestHandlerConversionWithoutWatchCache(t *testing.T) {
testHandlerConversion(t, false)
}
func testHandlerConversion(t *testing.T, enableWatchCache bool) {
cl := fake.NewSimpleClientset()
informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
crdInformer := informers.Apiextensions().V1().CustomResourceDefinitions()
server, storageConfig := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
defer server.Terminate(t)
crd := multiVersionFixture.DeepCopy()
if _, err := cl.ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), crd, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
if err := crdInformer.Informer().GetStore().Add(crd); err != nil {
t.Fatal(err)
}
etcdOptions := options.NewEtcdOptions(storageConfig)
etcdOptions.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
restOptionsGetter := generic.RESTOptions{
StorageConfig: &etcdOptions.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: true,
DeleteCollectionWorkers: 1,
ResourcePrefix: crd.Spec.Group + "/" + crd.Spec.Names.Plural,
CountMetricPollPeriod: time.Minute,
}
if enableWatchCache {
restOptionsGetter.Decorator = genericregistry.StorageWithCacher(100)
}
handler, err := NewCustomResourceDefinitionHandler(
&versionDiscoveryHandler{}, &groupDiscoveryHandler{},
crdInformer,
http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}),
restOptionsGetter,
dummyAdmissionImpl{},
&establish.EstablishingController{},
dummyServiceResolverImpl{},
func(r webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return r },
1,
dummyAuthorizerImpl{},
time.Minute, time.Minute, nil, 3*1024*1024)
if err != nil {
t.Fatal(err)
}
crdInfo, err := handler.getOrCreateServingInfoFor(crd.UID, crd.Name)
if err != nil {
t.Fatal(err)
}
updateValidateFunc := func(ctx context.Context, obj, old runtime.Object) error { return nil }
validateFunc := func(ctx context.Context, obj runtime.Object) error { return nil }
startResourceVersion := ""
if enableWatchCache {
// Let watch cache establish initial list
time.Sleep(time.Second)
}
// Create and delete a marker object to get a starting resource version
{
u := &unstructured.Unstructured{Object: map[string]interface{}{}}
u.SetGroupVersionKind(schema.GroupVersionKind{Group: "stable.example.com", Version: "v1beta1", Kind: "MultiVersion"})
u.SetName("marker")
if item, err := crdInfo.storages["v1beta1"].CustomResource.Create(context.TODO(), u, validateFunc, &metav1.CreateOptions{}); err != nil {
t.Fatal(err)
} else {
startResourceVersion = item.(*unstructured.Unstructured).GetResourceVersion()
}
if _, _, err := crdInfo.storages["v1beta1"].CustomResource.Delete(context.TODO(), u.GetName(), validateFunc, &metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
}
// Create and get every version, expect returned result to match creation GVK
for _, version := range crd.Spec.Versions {
expectGVK := schema.GroupVersionKind{Group: "stable.example.com", Version: version.Name, Kind: "MultiVersion"}
u := &unstructured.Unstructured{Object: map[string]interface{}{}}
u.SetGroupVersionKind(expectGVK)
u.SetName("my-" + version.Name)
unstructured.SetNestedField(u.Object, int64(1), "spec", "num")
// Create
if item, err := crdInfo.storages[version.Name].CustomResource.Create(context.TODO(), u, validateFunc, &metav1.CreateOptions{}); err != nil {
t.Fatal(err)
} else if item.GetObjectKind().GroupVersionKind() != expectGVK {
t.Errorf("expected create result to be %#v, got %#v", expectGVK, item.GetObjectKind().GroupVersionKind())
} else {
u = item.(*unstructured.Unstructured)
}
// Update
u.SetAnnotations(map[string]string{"updated": "true"})
if item, _, err := crdInfo.storages[version.Name].CustomResource.Update(context.TODO(), u.GetName(), rest.DefaultUpdatedObjectInfo(u), validateFunc, updateValidateFunc, false, &metav1.UpdateOptions{}); err != nil {
t.Fatal(err)
} else if item.GetObjectKind().GroupVersionKind() != expectGVK {
t.Errorf("expected update result to be %#v, got %#v", expectGVK, item.GetObjectKind().GroupVersionKind())
}
// Get
if item, err := crdInfo.storages[version.Name].CustomResource.Get(context.TODO(), u.GetName(), &metav1.GetOptions{}); err != nil {
t.Fatal(err)
} else if item.GetObjectKind().GroupVersionKind() != expectGVK {
t.Errorf("expected get result to be %#v, got %#v", expectGVK, item.GetObjectKind().GroupVersionKind())
}
if enableWatchCache {
// Allow time to propagate the create into the cache
time.Sleep(time.Second)
// Get cached
if item, err := crdInfo.storages[version.Name].CustomResource.Get(context.TODO(), u.GetName(), &metav1.GetOptions{ResourceVersion: "0"}); err != nil {
t.Fatal(err)
} else if item.GetObjectKind().GroupVersionKind() != expectGVK {
t.Errorf("expected cached get result to be %#v, got %#v", expectGVK, item.GetObjectKind().GroupVersionKind())
}
}
}
// List every version, expect all returned items to match request GVK
for _, version := range crd.Spec.Versions {
expectGVK := schema.GroupVersionKind{Group: "stable.example.com", Version: version.Name, Kind: "MultiVersion"}
if list, err := crdInfo.storages[version.Name].CustomResource.List(context.TODO(), &metainternalversion.ListOptions{}); err != nil {
t.Fatal(err)
} else {
for _, item := range list.(*unstructured.UnstructuredList).Items {
if item.GroupVersionKind() != expectGVK {
t.Errorf("expected list item to be %#v, got %#v", expectGVK, item.GroupVersionKind())
}
}
}
if enableWatchCache {
// List from watch cache
if list, err := crdInfo.storages[version.Name].CustomResource.List(context.TODO(), &metainternalversion.ListOptions{ResourceVersion: "0"}); err != nil {
t.Fatal(err)
} else {
for _, item := range list.(*unstructured.UnstructuredList).Items {
if item.GroupVersionKind() != expectGVK {
t.Errorf("expected cached list item to be %#v, got %#v", expectGVK, item.GroupVersionKind())
}
}
}
}
watch, err := crdInfo.storages[version.Name].CustomResource.Watch(context.TODO(), &metainternalversion.ListOptions{ResourceVersion: startResourceVersion})
if err != nil {
t.Fatal(err)
}
// 5 events: delete marker, create v1alpha1, create v1beta1, update v1alpha1, update v1beta1
for i := 0; i < 5; i++ {
select {
case event, ok := <-watch.ResultChan():
if !ok {
t.Fatalf("watch closed")
}
item, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
t.Fatalf("unexpected object type %T: %#v", item, event)
}
if item.GroupVersionKind() != expectGVK {
t.Errorf("expected watch object to be %#v, got %#v", expectGVK, item.GroupVersionKind())
}
case <-time.After(time.Second):
t.Errorf("timed out waiting for watch event")
}
}
// Expect no more watch events
select {
case event := <-watch.ResultChan():
t.Errorf("unexpected event: %#v", event)
case <-time.After(time.Second):
}
}
}
type dummyAdmissionImpl struct{}
func (dummyAdmissionImpl) Handles(operation admission.Operation) bool { return false }
type dummyAuthorizerImpl struct{}
func (dummyAuthorizerImpl) Authorize(ctx context.Context, a authorizer.Attributes) (authorized authorizer.Decision, reason string, err error) {
return authorizer.DecisionAllow, "", nil
}
type dummyServiceResolverImpl struct{}
func (dummyServiceResolverImpl) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
return &url.URL{Scheme: "https", Host: net.JoinHostPort(name+"."+namespace+".svc", strconv.Itoa(int(port)))}, nil
}
var multiVersionFixture = &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{Name: "multiversion.stable.example.com", UID: types.UID("12345")},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: "stable.example.com",
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: "multiversion", Singular: "multiversion", Kind: "MultiVersion", ShortNames: []string{"mv"}, ListKind: "MultiVersionList", Categories: []string{"all"},
},
Conversion: &apiextensionsv1.CustomResourceConversion{Strategy: apiextensionsv1.NoneConverter},
Scope: apiextensionsv1.ClusterScoped,
PreserveUnknownFields: false,
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{
// storage version, same schema as v1alpha1
Name: "v1beta1", Served: true, Storage: true,
Subresources: &apiextensionsv1.CustomResourceSubresources{Status: &apiextensionsv1.CustomResourceSubresourceStatus{}},
Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{"num": {Type: "integer", Description: "v1beta1 num field"}},
},
},
},
{
// same schema as v1beta1
Name: "v1alpha1", Served: true, Storage: false,
Subresources: &apiextensionsv1.CustomResourceSubresources{Status: &apiextensionsv1.CustomResourceSubresourceStatus{}},
Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{"num": {Type: "integer", Description: "v1alpha1 num field"}},
},
},
},
},
},
Status: apiextensionsv1.CustomResourceDefinitionStatus{
AcceptedNames: apiextensionsv1.CustomResourceDefinitionNames{
Plural: "multiversion", Singular: "multiversion", Kind: "MultiVersion", ShortNames: []string{"mv"}, ListKind: "MultiVersionList", Categories: []string{"all"},
},
},
}

View File

@ -68,6 +68,7 @@ go_library(
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",

View File

@ -32,6 +32,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@ -394,6 +395,8 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
return fmt.Errorf("need ptr to slice: %v", err) return fmt.Errorf("need ptr to slice: %v", err)
} }
newItemFunc := getNewItemFunc(listObj, v)
key = path.Join(s.pathPrefix, key) key = path.Join(s.pathPrefix, key)
startTime := time.Now() startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...) getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
@ -410,7 +413,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if err != nil { if err != nil {
return storage.NewInternalError(err.Error()) return storage.NewInternalError(err.Error())
} }
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil { if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
return err return err
} }
} }
@ -418,6 +421,23 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", nil) return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", nil)
} }
func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Object {
// For unstructured lists with a target group/version, preserve the group/version in the instantiated list items
if unstructuredList, isUnstructured := listObj.(*unstructured.UnstructuredList); isUnstructured {
if apiVersion := unstructuredList.GetAPIVersion(); len(apiVersion) > 0 {
return func() runtime.Object {
return &unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": apiVersion}}
}
}
}
// Otherwise just instantiate an empty item
elem := v.Type().Elem()
return func() runtime.Object {
return reflect.New(elem).Interface().(runtime.Object)
}
}
func (s *store) Count(key string) (int64, error) { func (s *store) Count(key string) (int64, error) {
key = path.Join(s.pathPrefix, key) key = path.Join(s.pathPrefix, key)
startTime := time.Now() startTime := time.Now()
@ -525,6 +545,8 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
options = append(options, clientv3.WithLimit(pred.Limit)) options = append(options, clientv3.WithLimit(pred.Limit))
} }
newItemFunc := getNewItemFunc(listObj, v)
var returnedRV, continueRV int64 var returnedRV, continueRV int64
var continueKey string var continueKey string
switch { switch {
@ -609,7 +631,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err) return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
} }
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil { if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
return err return err
} }
} }
@ -832,8 +854,8 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
} }
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice. // appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error { func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) error {
obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) obj, _, err := codec.Decode(data, nil, newItemFunc())
if err != nil { if err != nil {
return err return err
} }