From e23769f018ff50763656f99edd907e91e8bfff3e Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Wed, 8 May 2019 22:42:41 -0700 Subject: [PATCH] Add webhook converter integration test suite --- hack/verify-symbols.sh | 1 + .../test/integration/BUILD | 7 + .../test/integration/conversion_test.go | 564 ++++++++++++++++++ .../test/integration/convert/BUILD | 29 + .../test/integration/convert/webhook.go | 206 +++++++ .../test/integration/storage/BUILD | 31 + .../test/integration/storage/objectreader.go | 111 ++++ .../apiserver/pkg/storage/etcd3/watcher.go | 6 + 8 files changed, 955 insertions(+) create mode 100644 staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion_test.go create mode 100644 staging/src/k8s.io/apiextensions-apiserver/test/integration/convert/BUILD create mode 100644 staging/src/k8s.io/apiextensions-apiserver/test/integration/convert/webhook.go create mode 100644 staging/src/k8s.io/apiextensions-apiserver/test/integration/storage/BUILD create mode 100644 staging/src/k8s.io/apiextensions-apiserver/test/integration/storage/objectreader.go diff --git a/hack/verify-symbols.sh b/hack/verify-symbols.sh index 8f0ef8dce3d..597446f257d 100755 --- a/hack/verify-symbols.sh +++ b/hack/verify-symbols.sh @@ -30,6 +30,7 @@ BADSYMBOLS=( "httptest" "testify" "testing[.]" + "TestOnlySetFatalOnDecodeError" ) # b/c hyperkube binds everything simply check that for bad symbols diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD b/staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD index eebb1c7511b..0a14e9889f9 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD @@ -12,6 +12,7 @@ go_test( "apply_test.go", "basic_test.go", "change_test.go", + "conversion_test.go", "finalization_test.go", "objectmeta_test.go", "registration_test.go", @@ -30,7 +31,9 @@ go_test( "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/features:go_default_library", + "//staging/src/k8s.io/apiextensions-apiserver/test/integration/convert:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures:go_default_library", + "//staging/src/k8s.io/apiextensions-apiserver/test/integration/storage:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -41,11 +44,13 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", @@ -69,7 +74,9 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//staging/src/k8s.io/apiextensions-apiserver/test/integration/convert:all-srcs", "//staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures:all-srcs", + "//staging/src/k8s.io/apiextensions-apiserver/test/integration/storage:all-srcs", ], tags = ["automanaged"], ) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion_test.go new file mode 100644 index 00000000000..20068952770 --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion_test.go @@ -0,0 +1,564 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "encoding/json" + "fmt" + "net/http" + "reflect" + "strings" + "testing" + "time" + + "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" + serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" + apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" + etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic" + featuregatetesting "k8s.io/component-base/featuregate/testing" + + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apiextensions-apiserver/test/integration/convert" + "k8s.io/apiextensions-apiserver/test/integration/fixtures" + "k8s.io/apiextensions-apiserver/test/integration/storage" +) + +type Checker func(t *testing.T, ctc *conversionTestContext) + +func checks(checkers ...Checker) []Checker { + return checkers +} + +func TestWebhookConverter(t *testing.T) { + tests := []struct { + group string + handler http.Handler + checks []Checker + }{ + { + group: "noop-converter", + handler: convert.NewObjectConverterWebhookHandler(t, noopConverter), + checks: checks(validateStorageVersion, validateServed, validateMixedStorageVersions), + }, + { + group: "nontrivial-converter", + handler: convert.NewObjectConverterWebhookHandler(t, nontrivialConverter), + checks: checks(validateStorageVersion, validateServed, validateMixedStorageVersions), + }, + { + group: "empty-response", + handler: convert.NewReviewWebhookHandler(t, emptyResponseConverter), + checks: checks(expectConversionFailureMessage("empty-response", "expected 1 converted objects")), + }, + { + group: "failure-message", + handler: convert.NewReviewWebhookHandler(t, failureResponseConverter("custom webhook conversion error")), + checks: checks(expectConversionFailureMessage("failure-message", "custom webhook conversion error")), + }, + } + + // TODO: Added for integration testing of conversion webhooks, where decode errors due to conversion webhook failures need to be tested. + // Maybe we should identify conversion webhook related errors in decoding to avoid triggering this? Or maybe having this special casing + // of test cases in production code should be removed? + etcd3watcher.TestOnlySetFatalOnDecodeError(false) + defer etcd3watcher.TestOnlySetFatalOnDecodeError(true) + + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, apiextensionsfeatures.CustomResourceWebhookConversion, true)() + tearDown, config, options, err := fixtures.StartDefaultServer(t) + if err != nil { + t.Fatal(err) + } + + apiExtensionsClient, err := clientset.NewForConfig(config) + if err != nil { + tearDown() + t.Fatal(err) + } + + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + tearDown() + t.Fatal(err) + } + defer tearDown() + + crd := multiVersionFixture.DeepCopy() + + RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd) + restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}) + if err != nil { + t.Fatal(err) + } + etcdClient, _, err := storage.GetEtcdClients(restOptions.StorageConfig.Transport) + if err != nil { + t.Fatal(err) + } + defer etcdClient.Close() + + etcdObjectReader := storage.NewEtcdObjectReader(etcdClient, &restOptions, crd) + ctcTearDown, ctc := newConversionTestContext(t, apiExtensionsClient, dynamicClient, etcdObjectReader, crd) + defer ctcTearDown() + + // read only object to read at a different version than stored when we need to force conversion + marker, err := ctc.versionedClient("marker", "v1beta2").Create(newConversionMultiVersionFixture("marker", "marker", "v1beta2"), metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + for _, test := range tests { + t.Run(test.group, func(t *testing.T) { + tearDown, webhookClientConfig, webhookWaitReady, err := convert.StartConversionWebhookServerWithWaitReady(test.handler) + if err != nil { + t.Fatal(err) + } + defer tearDown() + + ctc.setConversionWebhook(t, webhookClientConfig) + defer ctc.removeConversionWebhook(t) + + err = webhookWaitReady(30*time.Second, func() error { + // the marker's storage version is v1beta2, so a v1beta1 read always triggers conversion + _, err := ctc.versionedClient(marker.GetNamespace(), "v1beta1").Get(marker.GetName(), metav1.GetOptions{}) + return err + }) + if err != nil { + t.Fatal(err) + } + + for i, checkFn := range test.checks { + name := fmt.Sprintf("check-%d", i) + t.Run(name, func(t *testing.T) { + ctc.setAndWaitStorageVersion(t, "v1beta2") + ctc.namespace = fmt.Sprintf("webhook-conversion-%s-%s", test.group, name) + checkFn(t, ctc) + }) + } + }) + } +} + +func validateStorageVersion(t *testing.T, ctc *conversionTestContext) { + ns := ctc.namespace + + for _, version := range []string{"v1beta1", "v1beta2"} { + t.Run(version, func(t *testing.T) { + name := "storageversion-" + version + client := ctc.versionedClient(ns, version) + obj, err := client.Create(newConversionMultiVersionFixture(ns, name, version), metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + ctc.setAndWaitStorageVersion(t, "v1beta2") + + obj, err = client.Get(obj.GetName(), metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + ctc.setAndWaitStorageVersion(t, "v1beta1") + }) + } +} + +// validateMixedStorageVersions ensures that identical custom resources written at different storage versions +// are readable and remain the same. +func validateMixedStorageVersions(t *testing.T, ctc *conversionTestContext) { + ns := ctc.namespace + + v1client := ctc.versionedClient(ns, "v1beta1") + v2client := ctc.versionedClient(ns, "v1beta2") + clients := map[string]dynamic.ResourceInterface{"v1beta1": v1client, "v1beta2": v2client} + versions := []string{"v1beta1", "v1beta2"} + + // Create CRs at all storage versions + objNames := []string{} + for _, version := range versions { + ctc.setAndWaitStorageVersion(t, version) + + name := "stored-at-" + version + obj, err := clients[version].Create(newConversionMultiVersionFixture(ns, name, version), metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + objNames = append(objNames, obj.GetName()) + } + + // Ensure copies of an object have the same fields and values at each custom resource definition version regardless of storage version + for clientVersion, client := range clients { + t.Run(clientVersion, func(t *testing.T) { + o1, err := client.Get(objNames[0], metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + for _, objName := range objNames[1:] { + o2, err := client.Get(objName, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + // ignore metadata for comparison purposes + delete(o1.Object, "metadata") + delete(o2.Object, "metadata") + if !reflect.DeepEqual(o1.Object, o2.Object) { + t.Errorf("Expected custom resource to be same regardless of which storage version is used but got %+v != %+v", o1, o2) + } + } + }) + } +} + +func validateServed(t *testing.T, ctc *conversionTestContext) { + ns := ctc.namespace + + for _, version := range []string{"v1beta1", "v1beta2"} { + t.Run(version, func(t *testing.T) { + name := "served-" + version + client := ctc.versionedClient(ns, version) + obj, err := client.Create(newConversionMultiVersionFixture(ns, name, version), metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + ctc.setServed(t, version, false) + ctc.waitForServed(t, version, false, client, obj) + ctc.setServed(t, version, true) + ctc.waitForServed(t, version, true, client, obj) + }) + } +} + +func expectConversionFailureMessage(id, message string) func(t *testing.T, ctc *conversionTestContext) { + return func(t *testing.T, ctc *conversionTestContext) { + ns := ctc.namespace + v1client := ctc.versionedClient(ns, "v1beta1") + v2client := ctc.versionedClient(ns, "v1beta2") + var err error + // storage version is v1beta2, so this skips conversion + obj, err := v2client.Create(newConversionMultiVersionFixture(ns, id, "v1beta2"), metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + for _, verb := range []string{"get", "list", "create", "udpate", "patch", "delete", "deletecollection"} { + t.Run(verb, func(t *testing.T) { + switch verb { + case "get": + _, err = v1client.Get(obj.GetName(), metav1.GetOptions{}) + case "list": + _, err = v1client.List(metav1.ListOptions{}) + case "create": + _, err = v1client.Create(newConversionMultiVersionFixture(ns, id, "v1beta1"), metav1.CreateOptions{}) + case "update": + _, err = v1client.Update(obj, metav1.UpdateOptions{}) + case "patch": + _, err = v1client.Patch(obj.GetName(), types.MergePatchType, []byte(`{"metadata":{"annotations":{"patch":"true"}}}`), metav1.PatchOptions{}) + case "delete": + err = v1client.Delete(obj.GetName(), &metav1.DeleteOptions{}) + case "deletecollection": + err = v1client.DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{}) + } + + if err == nil { + t.Errorf("expected error with message %s, but got no error", message) + } else if !strings.Contains(err.Error(), message) { + t.Errorf("expected error with message %s, but got %v", message, err) + } + }) + } + for _, subresource := range []string{"status", "scale"} { + for _, verb := range []string{"get", "udpate", "patch"} { + t.Run(fmt.Sprintf("%s-%s", subresource, verb), func(t *testing.T) { + switch verb { + case "create": + _, err = v1client.Create(newConversionMultiVersionFixture(ns, id, "v1beta1"), metav1.CreateOptions{}, subresource) + case "update": + _, err = v1client.Update(obj, metav1.UpdateOptions{}, subresource) + case "patch": + _, err = v1client.Patch(obj.GetName(), types.MergePatchType, []byte(`{"metadata":{"annotations":{"patch":"true"}}}`), metav1.PatchOptions{}, subresource) + } + + if err == nil { + t.Errorf("expected error with message %s, but got no error", message) + } else if !strings.Contains(err.Error(), message) { + t.Errorf("expected error with message %s, but got %v", message, err) + } + }) + } + } + } +} + +func noopConverter(desiredAPIVersion string, obj runtime.RawExtension) (runtime.RawExtension, error) { + u := &unstructured.Unstructured{Object: map[string]interface{}{}} + if err := json.Unmarshal(obj.Raw, u); err != nil { + return runtime.RawExtension{}, fmt.Errorf("Fail to deserialize object: %s with error: %v", string(obj.Raw), err) + } + u.Object["apiVersion"] = desiredAPIVersion + raw, err := json.Marshal(u) + if err != nil { + return runtime.RawExtension{}, fmt.Errorf("Fail to serialize object: %v with error: %v", u, err) + } + return runtime.RawExtension{Raw: raw}, nil +} + +func emptyResponseConverter(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) { + review.Response = &apiextensionsv1beta1.ConversionResponse{ + UID: review.Request.UID, + ConvertedObjects: []runtime.RawExtension{}, + Result: metav1.Status{Status: "Success"}, + } + return review, nil +} + +func failureResponseConverter(message string) func(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) { + return func(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) { + review.Response = &apiextensionsv1beta1.ConversionResponse{ + UID: review.Request.UID, + ConvertedObjects: []runtime.RawExtension{}, + Result: metav1.Status{Message: message, Status: "Failure"}, + } + return review, nil + } +} + +func nontrivialConverter(desiredAPIVersion string, obj runtime.RawExtension) (runtime.RawExtension, error) { + u := &unstructured.Unstructured{Object: map[string]interface{}{}} + if err := json.Unmarshal(obj.Raw, u); err != nil { + return runtime.RawExtension{}, fmt.Errorf("Fail to deserialize object: %s with error: %v", string(obj.Raw), err) + } + + currentAPIVersion := u.Object["apiVersion"] + if currentAPIVersion == "v1beta2" && desiredAPIVersion == "v1beta1" { + u.Object["num"] = u.Object["numv2"] + u.Object["content"] = u.Object["contentv2"] + delete(u.Object, "numv2") + delete(u.Object, "contentv2") + } + if currentAPIVersion == "v1beta1" && desiredAPIVersion == "v1beta2" { + u.Object["numv2"] = u.Object["num"] + u.Object["contentv2"] = u.Object["content"] + delete(u.Object, "num") + delete(u.Object, "content") + } + u.Object["apiVersion"] = desiredAPIVersion + raw, err := json.Marshal(u) + if err != nil { + return runtime.RawExtension{}, fmt.Errorf("Fail to serialize object: %v with error: %v", u, err) + } + return runtime.RawExtension{Raw: raw}, nil +} + +func newConversionTestContext(t *testing.T, apiExtensionsClient clientset.Interface, dynamicClient dynamic.Interface, etcdObjectReader *storage.EtcdObjectReader, crd *apiextensionsv1beta1.CustomResourceDefinition) (func(), *conversionTestContext) { + crd, err := fixtures.CreateNewCustomResourceDefinition(crd, apiExtensionsClient, dynamicClient) + if err != nil { + t.Fatal(err) + } + + tearDown := func() { + if err := fixtures.DeleteCustomResourceDefinition(crd, apiExtensionsClient); err != nil { + t.Fatal(err) + } + } + + return tearDown, &conversionTestContext{apiExtensionsClient: apiExtensionsClient, dynamicClient: dynamicClient, crd: crd, etcdObjectReader: etcdObjectReader} +} + +type conversionTestContext struct { + namespace string + apiExtensionsClient clientset.Interface + dynamicClient dynamic.Interface + options *options.CustomResourceDefinitionsServerOptions + crd *apiextensionsv1beta1.CustomResourceDefinition + etcdObjectReader *storage.EtcdObjectReader +} + +func (c *conversionTestContext) versionedClient(ns string, version string) dynamic.ResourceInterface { + return newNamespacedCustomResourceVersionedClient(ns, c.dynamicClient, c.crd, version) +} + +func (c *conversionTestContext) setConversionWebhook(t *testing.T, webhookClientConfig *apiextensionsv1beta1.WebhookClientConfig) { + crd, err := c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(c.crd.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + crd.Spec.Conversion = &apiextensionsv1beta1.CustomResourceConversion{ + Strategy: apiextensionsv1beta1.WebhookConverter, + WebhookClientConfig: webhookClientConfig, + } + crd, err = c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(crd) + if err != nil { + t.Fatal(err) + } + c.crd = crd + +} + +func (c *conversionTestContext) removeConversionWebhook(t *testing.T) { + crd, err := c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(c.crd.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + crd.Spec.Conversion = &apiextensionsv1beta1.CustomResourceConversion{ + Strategy: apiextensionsv1beta1.NoneConverter, + } + + crd, err = c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(crd) + if err != nil { + t.Fatal(err) + } + c.crd = crd +} + +func (c *conversionTestContext) setAndWaitStorageVersion(t *testing.T, version string) { + c.setStorageVersion(t, "v1beta2") + + client := c.versionedClient("probe", "v1beta2") + name := fmt.Sprintf("probe-%v", uuid.NewUUID()) + storageProbe, err := client.Create(newConversionMultiVersionFixture("probe", name, "v1beta2"), metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + c.waitForStorageVersion(t, "v1beta2", c.versionedClient(storageProbe.GetNamespace(), "v1beta2"), storageProbe) + + err = client.Delete(name, &metav1.DeleteOptions{}) + if err != nil { + t.Fatal(err) + } +} + +func (c *conversionTestContext) setStorageVersion(t *testing.T, version string) { + crd, err := c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(c.crd.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + for i, v := range crd.Spec.Versions { + crd.Spec.Versions[i].Storage = (v.Name == version) + } + crd, err = c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(crd) + if err != nil { + t.Fatal(err) + } + c.crd = crd +} + +func (c *conversionTestContext) waitForStorageVersion(t *testing.T, version string, versionedClient dynamic.ResourceInterface, obj *unstructured.Unstructured) *unstructured.Unstructured { + c.etcdObjectReader.WaitForStorageVersion(version, obj.GetNamespace(), obj.GetName(), 30*time.Second, func() { + var err error + obj, err = versionedClient.Update(obj, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update object: %v", err) + } + }) + return obj +} + +func (c *conversionTestContext) setServed(t *testing.T, version string, served bool) { + crd, err := c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(c.crd.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + for i, v := range crd.Spec.Versions { + if v.Name == version { + crd.Spec.Versions[i].Served = served + } + } + crd, err = c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(crd) + if err != nil { + t.Fatal(err) + } + c.crd = crd +} + +func (c *conversionTestContext) waitForServed(t *testing.T, version string, served bool, versionedClient dynamic.ResourceInterface, obj *unstructured.Unstructured) { + timeout := 30 * time.Second + waitCh := time.After(timeout) + for { + obj, err := versionedClient.Get(obj.GetName(), metav1.GetOptions{}) + if (err == nil && served) || (errors.IsNotFound(err) && served == false) { + return + } + select { + case <-waitCh: + t.Fatalf("Timed out after %v waiting for CRD served=%t for version %s for %v. Last error: %v", timeout, served, version, obj, err) + case <-time.After(10 * time.Millisecond): + } + } +} + +var multiVersionFixture = &apiextensionsv1beta1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "multiversion.stable.example.com"}, + Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{ + Group: "stable.example.com", + Version: "v1beta1", + Names: apiextensionsv1beta1.CustomResourceDefinitionNames{ + Plural: "multiversion", + Singular: "multiversion", + Kind: "MultiVersion", + ShortNames: []string{"mv"}, + ListKind: "MultiVersionList", + Categories: []string{"all"}, + }, + Scope: apiextensionsv1beta1.NamespaceScoped, + Versions: []apiextensionsv1beta1.CustomResourceDefinitionVersion{ + { + Name: "v1beta1", + Served: true, + Storage: false, + }, + { + Name: "v1beta2", + Served: true, + Storage: true, + }, + }, + Subresources: &apiextensionsv1beta1.CustomResourceSubresources{ + Status: &apiextensionsv1beta1.CustomResourceSubresourceStatus{}, + Scale: &apiextensionsv1beta1.CustomResourceSubresourceScale{ + SpecReplicasPath: ".spec.num.num1", + StatusReplicasPath: ".status.num.num2", + }, + }, + }, +} + +func newConversionMultiVersionFixture(namespace, name, version string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/" + version, + "kind": "MultiVersion", + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": name, + }, + "content": map[string]interface{}{ + "key": "value", + }, + "num": map[string]interface{}{ + "num1": 1, + "num2": 1000000, + }, + }, + } +} diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/convert/BUILD b/staging/src/k8s.io/apiextensions-apiserver/test/integration/convert/BUILD new file mode 100644 index 00000000000..eafa4b428f0 --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/convert/BUILD @@ -0,0 +1,29 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["webhook.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiextensions-apiserver/test/integration/convert", + importpath = "k8s.io/apiextensions-apiserver/test/integration/convert", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/convert/webhook.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/convert/webhook.go new file mode 100644 index 00000000000..d829ac2bb5a --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/convert/webhook.go @@ -0,0 +1,206 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package convert + +import ( + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/uuid" +) + +// WaitReadyFunc calls triggerConversionFn periodically and waits until it detects that the webhook +// conversion server has handled at least 1 conversion request or the timeout is exceeded, in which +// case an error is returned. +type WaitReadyFunc func(timeout time.Duration, triggerConversionFn func() error) error + +// StartConversionWebhookServerWithWaitReady starts an http server with the provided handler and returns the WebhookClientConfig +// needed to configure a CRD to use this conversion webhook as its converter. +// It also returns a WaitReadyFunc to be called after the CRD is configured to wait until the conversion webhook handler +// accepts at least one conversion request. If the server fails to start, an error is returned. +// WaitReady is useful when changing the conversion webhook config of an existing CRD because the update does not take effect immediately. +func StartConversionWebhookServerWithWaitReady(handler http.Handler) (func(), *apiextensionsv1beta1.WebhookClientConfig, WaitReadyFunc, error) { + var once sync.Once + handlerReadyC := make(chan struct{}) + readyNotifyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + once.Do(func() { + close(handlerReadyC) + }) + handler.ServeHTTP(w, r) + }) + + tearDown, webhookConfig, err := StartConversionWebhookServer(readyNotifyHandler) + if err != nil { + return nil, nil, nil, fmt.Errorf("error starting webhook server: %v", err) + } + + waitReady := func(timeout time.Duration, triggerConversionFn func() error) error { + var err error + for { + select { + case <-handlerReadyC: + return nil + case <-time.After(timeout): + return fmt.Errorf("Timed out waiting for CRD webhook converter update, last trigger conversion error: %v", err) + case <-time.After(100 * time.Millisecond): + err = triggerConversionFn() + } + } + } + return tearDown, webhookConfig, waitReady, err +} + +// StartConversionWebhookServer starts an http server with the provided handler and returns the WebhookClientConfig +// needed to configure a CRD to use this conversion webhook as its converter. +func StartConversionWebhookServer(handler http.Handler) (func(), *apiextensionsv1beta1.WebhookClientConfig, error) { + // Use a unique path for each webhook server. This ensures that all conversion requests + // received by the handler are intended for it; if a WebhookClientConfig other than this one + // is applied in the api server, conversion requests will not reach the handler (if they + // reach the server they will be returned at 404). This helps prevent tests that require a + // specific conversion webhook from accidentally using a different one, which could otherwise + // cause a test to flake or pass when it should fail. Since updating the conversion client + // config of a custom resource definition does not take effect immediately, this is needed + // by the WaitReady returned StartConversionWebhookServerWithWaitReady to detect when a + // conversion client config change in the api server has taken effect. + path := fmt.Sprintf("/conversionwebhook-%s", uuid.NewUUID()) + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(localhostCert) { + return nil, nil, fmt.Errorf("Failed to append Cert from PEM") + } + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + return nil, nil, fmt.Errorf("Failed to build cert with error: %+v", err) + } + webhookMux := http.NewServeMux() + webhookMux.Handle(path, handler) + webhookServer := httptest.NewUnstartedServer(webhookMux) + webhookServer.TLS = &tls.Config{ + RootCAs: roots, + Certificates: []tls.Certificate{cert}, + } + webhookServer.StartTLS() + endpoint := webhookServer.URL + path + webhookConfig := &apiextensionsv1beta1.WebhookClientConfig{ + CABundle: localhostCert, + URL: &endpoint, + } + return webhookServer.Close, webhookConfig, nil +} + +// ReviewConverterFunc converts an entire ConversionReview. +type ReviewConverterFunc func(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) + +// NewReviewWebhookHandler creates a handler that delegates the review conversion to the provided ReviewConverterFunc. +func NewReviewWebhookHandler(t *testing.T, converterFunc ReviewConverterFunc) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + data, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Error(err) + return + } + if contentType := r.Header.Get("Content-Type"); contentType != "application/json" { + t.Errorf("contentType=%s, expect application/json", contentType) + return + } + + review := apiextensionsv1beta1.ConversionReview{} + if err := json.Unmarshal(data, &review); err != nil { + t.Errorf("Fail to deserialize object: %s with error: %v", string(data), err) + http.Error(w, err.Error(), 400) + return + } + + review, err = converterFunc(review) + if err != nil { + t.Errorf("Error converting review: %v", err) + http.Error(w, err.Error(), 500) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(review); err != nil { + t.Errorf("Marshal of response failed with error: %v", err) + } + }) +} + +// ObjectConverterFunc converts a single custom resource to the desiredAPIVersion and returns it or returns an error. +type ObjectConverterFunc func(desiredAPIVersion string, customResource runtime.RawExtension) (runtime.RawExtension, error) + +// NewObjectConverterWebhookHandler creates a handler that delegates custom resource conversion to the provided ConverterFunc. +func NewObjectConverterWebhookHandler(t *testing.T, converterFunc ObjectConverterFunc) http.Handler { + return NewReviewWebhookHandler(t, func(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) { + converted := []runtime.RawExtension{} + errMsgs := []string{} + for _, obj := range review.Request.Objects { + convertedObj, err := converterFunc(review.Request.DesiredAPIVersion, obj) + if err != nil { + errMsgs = append(errMsgs, err.Error()) + } + + converted = append(converted, convertedObj) + } + + review.Response = &apiextensionsv1beta1.ConversionResponse{ + UID: review.Request.UID, + ConvertedObjects: converted, + } + if len(errMsgs) == 0 { + review.Response.Result = metav1.Status{Status: "Success"} + } else { + review.Response.Result = metav1.Status{Status: "Failure", Message: strings.Join(errMsgs, ", ")} + } + return review, nil + }) +} + +// localhostCert was generated from crypto/tls/generate_cert.go with the following command: +// go run generate_cert.go --rsa-bits 512 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var localhostCert = []byte(`-----BEGIN CERTIFICATE----- +MIIBjzCCATmgAwIBAgIRAKpi2WmTcFrVjxrl5n5YDUEwDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2 +MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC +QQC9fEbRszP3t14Gr4oahV7zFObBI4TfA5i7YnlMXeLinb7MnvT4bkfOJzE6zktn +59zP7UiHs3l4YOuqrjiwM413AgMBAAGjaDBmMA4GA1UdDwEB/wQEAwICpDATBgNV +HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MC4GA1UdEQQnMCWCC2V4 +YW1wbGUuY29thwR/AAABhxAAAAAAAAAAAAAAAAAAAAABMA0GCSqGSIb3DQEBCwUA +A0EAUsVE6KMnza/ZbodLlyeMzdo7EM/5nb5ywyOxgIOCf0OOLHsPS9ueGLQX9HEG +//yjTXuhNcUugExIjM/AIwAZPQ== +-----END CERTIFICATE-----`) + +// localhostKey is the private key for localhostCert. +var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIBOwIBAAJBAL18RtGzM/e3XgavihqFXvMU5sEjhN8DmLtieUxd4uKdvsye9Phu +R84nMTrOS2fn3M/tSIezeXhg66quOLAzjXcCAwEAAQJBAKcRxH9wuglYLBdI/0OT +BLzfWPZCEw1vZmMR2FF1Fm8nkNOVDPleeVGTWoOEcYYlQbpTmkGSxJ6ya+hqRi6x +goECIQDx3+X49fwpL6B5qpJIJMyZBSCuMhH4B7JevhGGFENi3wIhAMiNJN5Q3UkL +IuSvv03kaPR5XVQ99/UeEetUgGvBcABpAiBJSBzVITIVCGkGc7d+RCf49KTCIklv +bGWObufAR8Ni4QIgWpILjW8dkGg8GOUZ0zaNA6Nvt6TIv2UWGJ4v5PoV98kCIQDx +rIiZs5QbKdycsv9gQJzwQAogC8o04X3Zz3dsoX+h4A== +-----END RSA PRIVATE KEY-----`) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/storage/BUILD b/staging/src/k8s.io/apiextensions-apiserver/test/integration/storage/BUILD new file mode 100644 index 00000000000..e3fd12256e2 --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/storage/BUILD @@ -0,0 +1,31 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["objectreader.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiextensions-apiserver/test/integration/storage", + importpath = "k8s.io/apiextensions-apiserver/test/integration/storage", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", + "//vendor/github.com/coreos/etcd/clientv3:go_default_library", + "//vendor/github.com/coreos/etcd/pkg/transport:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/storage/objectreader.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/storage/objectreader.go new file mode 100644 index 00000000000..a6e21fd6242 --- /dev/null +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/storage/objectreader.go @@ -0,0 +1,111 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + "encoding/json" + "fmt" + "path" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/transport" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/storage/storagebackend" +) + +// EtcdObjectReader provides direct access to custom resource objects stored in etcd. +type EtcdObjectReader struct { + etcdClient *clientv3.Client + storagePrefix string + crd *apiextensionsv1beta1.CustomResourceDefinition +} + +// NewEtcdObjectReader creates a reader for accessing custom resource objects directly from etcd. +func NewEtcdObjectReader(etcdClient *clientv3.Client, restOptions *generic.RESTOptions, crd *apiextensionsv1beta1.CustomResourceDefinition) *EtcdObjectReader { + return &EtcdObjectReader{etcdClient, restOptions.StorageConfig.Prefix, crd} +} + +// WaitForStorageVersion calls the updateObjFn periodically and waits for the version of the custom resource stored in etcd to be set to the provided version. +// Typically updateObjFn should perform a noop update to the object so that when stored version of a CRD changes, the object is written at the updated storage version. +// If the timeout is exceeded a error is returned. +// This is useful when updating the stored version of an existing CRD because the update does not take effect immediately. +func (s *EtcdObjectReader) WaitForStorageVersion(version string, ns, name string, timeout time.Duration, updateObjFn func()) error { + waitCh := time.After(timeout) + for { + storage, err := s.GetStoredCustomResource(ns, name) + if err != nil { + return err + } + if storage.GetObjectKind().GroupVersionKind().Version == version { + return nil + } + select { + case <-waitCh: + return fmt.Errorf("timed out after %v waiting for storage version to be %s for object (namespace:%s name:%s)", timeout, version, ns, name) + case <-time.After(10 * time.Millisecond): + updateObjFn() + } + } +} + +// GetStoredCustomResource gets the storage representation of a custom resource from etcd. +func (s *EtcdObjectReader) GetStoredCustomResource(ns, name string) (*unstructured.Unstructured, error) { + key := path.Join("/", s.storagePrefix, s.crd.Spec.Group, s.crd.Spec.Names.Plural, ns, name) + resp, err := s.etcdClient.KV.Get(context.Background(), key) + if err != nil { + return nil, fmt.Errorf("error getting storage object %s, %s from etcd at key %s: %v", ns, name, key, err) + } + if len(resp.Kvs) == 0 { + return nil, fmt.Errorf("no storage object found for %s, %s in etcd for key %s", ns, name, key) + } + raw := resp.Kvs[0].Value + u := &unstructured.Unstructured{Object: map[string]interface{}{}} + if err := json.Unmarshal(raw, u); err != nil { + return nil, fmt.Errorf("error deserializing object %s: %v", string(raw), err) + } + return u, nil +} + +// GetEtcdClients returns an initialized clientv3.Client and clientv3.KV. +func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) { + tlsInfo := transport.TLSInfo{ + CertFile: config.CertFile, + KeyFile: config.KeyFile, + CAFile: config.CAFile, + } + + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + return nil, nil, err + } + + cfg := clientv3.Config{ + Endpoints: config.ServerList, + TLS: tlsConfig, + } + + c, err := clientv3.New(cfg) + if err != nil { + return nil, nil, err + } + + return c, clientv3.NewKV(c), nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 15437f27e74..f2b16f3bd75 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -56,9 +56,15 @@ func testingDeferOnDecodeError() { func init() { // check to see if we are running in a test environment + TestOnlySetFatalOnDecodeError(true) fatalOnDecodeError, _ = strconv.ParseBool(os.Getenv("KUBE_PANIC_WATCH_DECODE_ERROR")) } +// TestOnlySetFatalOnDecodeError should only be used for cases where decode errors are expected and need to be tested. e.g. conversion webhooks. +func TestOnlySetFatalOnDecodeError(b bool) { + fatalOnDecodeError = b +} + type watcher struct { client *clientv3.Client codec runtime.Codec