mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Add webhook converter integration test suite
This commit is contained in:
parent
adf6fa6987
commit
e23769f018
@ -30,6 +30,7 @@ BADSYMBOLS=(
|
||||
"httptest"
|
||||
"testify"
|
||||
"testing[.]"
|
||||
"TestOnlySetFatalOnDecodeError"
|
||||
)
|
||||
|
||||
# b/c hyperkube binds everything simply check that for bad symbols
|
||||
|
@ -12,6 +12,7 @@ go_test(
|
||||
"apply_test.go",
|
||||
"basic_test.go",
|
||||
"change_test.go",
|
||||
"conversion_test.go",
|
||||
"finalization_test.go",
|
||||
"objectmeta_test.go",
|
||||
"registration_test.go",
|
||||
@ -30,7 +31,9 @@ go_test(
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme:go_default_library",
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options:go_default_library",
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/test/integration/convert:go_default_library",
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures:go_default_library",
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/test/integration/storage:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
@ -41,11 +44,13 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
@ -69,7 +74,9 @@ filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [
|
||||
":package-srcs",
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/test/integration/convert:all-srcs",
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures:all-srcs",
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/test/integration/storage:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
@ -0,0 +1,564 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
"k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
|
||||
serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
|
||||
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/dynamic"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
|
||||
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
"k8s.io/apiextensions-apiserver/test/integration/convert"
|
||||
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
|
||||
"k8s.io/apiextensions-apiserver/test/integration/storage"
|
||||
)
|
||||
|
||||
type Checker func(t *testing.T, ctc *conversionTestContext)
|
||||
|
||||
func checks(checkers ...Checker) []Checker {
|
||||
return checkers
|
||||
}
|
||||
|
||||
func TestWebhookConverter(t *testing.T) {
|
||||
tests := []struct {
|
||||
group string
|
||||
handler http.Handler
|
||||
checks []Checker
|
||||
}{
|
||||
{
|
||||
group: "noop-converter",
|
||||
handler: convert.NewObjectConverterWebhookHandler(t, noopConverter),
|
||||
checks: checks(validateStorageVersion, validateServed, validateMixedStorageVersions),
|
||||
},
|
||||
{
|
||||
group: "nontrivial-converter",
|
||||
handler: convert.NewObjectConverterWebhookHandler(t, nontrivialConverter),
|
||||
checks: checks(validateStorageVersion, validateServed, validateMixedStorageVersions),
|
||||
},
|
||||
{
|
||||
group: "empty-response",
|
||||
handler: convert.NewReviewWebhookHandler(t, emptyResponseConverter),
|
||||
checks: checks(expectConversionFailureMessage("empty-response", "expected 1 converted objects")),
|
||||
},
|
||||
{
|
||||
group: "failure-message",
|
||||
handler: convert.NewReviewWebhookHandler(t, failureResponseConverter("custom webhook conversion error")),
|
||||
checks: checks(expectConversionFailureMessage("failure-message", "custom webhook conversion error")),
|
||||
},
|
||||
}
|
||||
|
||||
// TODO: Added for integration testing of conversion webhooks, where decode errors due to conversion webhook failures need to be tested.
|
||||
// Maybe we should identify conversion webhook related errors in decoding to avoid triggering this? Or maybe having this special casing
|
||||
// of test cases in production code should be removed?
|
||||
etcd3watcher.TestOnlySetFatalOnDecodeError(false)
|
||||
defer etcd3watcher.TestOnlySetFatalOnDecodeError(true)
|
||||
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, apiextensionsfeatures.CustomResourceWebhookConversion, true)()
|
||||
tearDown, config, options, err := fixtures.StartDefaultServer(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
apiExtensionsClient, err := clientset.NewForConfig(config)
|
||||
if err != nil {
|
||||
tearDown()
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dynamicClient, err := dynamic.NewForConfig(config)
|
||||
if err != nil {
|
||||
tearDown()
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer tearDown()
|
||||
|
||||
crd := multiVersionFixture.DeepCopy()
|
||||
|
||||
RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd)
|
||||
restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
etcdClient, _, err := storage.GetEtcdClients(restOptions.StorageConfig.Transport)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer etcdClient.Close()
|
||||
|
||||
etcdObjectReader := storage.NewEtcdObjectReader(etcdClient, &restOptions, crd)
|
||||
ctcTearDown, ctc := newConversionTestContext(t, apiExtensionsClient, dynamicClient, etcdObjectReader, crd)
|
||||
defer ctcTearDown()
|
||||
|
||||
// read only object to read at a different version than stored when we need to force conversion
|
||||
marker, err := ctc.versionedClient("marker", "v1beta2").Create(newConversionMultiVersionFixture("marker", "marker", "v1beta2"), metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.group, func(t *testing.T) {
|
||||
tearDown, webhookClientConfig, webhookWaitReady, err := convert.StartConversionWebhookServerWithWaitReady(test.handler)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer tearDown()
|
||||
|
||||
ctc.setConversionWebhook(t, webhookClientConfig)
|
||||
defer ctc.removeConversionWebhook(t)
|
||||
|
||||
err = webhookWaitReady(30*time.Second, func() error {
|
||||
// the marker's storage version is v1beta2, so a v1beta1 read always triggers conversion
|
||||
_, err := ctc.versionedClient(marker.GetNamespace(), "v1beta1").Get(marker.GetName(), metav1.GetOptions{})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for i, checkFn := range test.checks {
|
||||
name := fmt.Sprintf("check-%d", i)
|
||||
t.Run(name, func(t *testing.T) {
|
||||
ctc.setAndWaitStorageVersion(t, "v1beta2")
|
||||
ctc.namespace = fmt.Sprintf("webhook-conversion-%s-%s", test.group, name)
|
||||
checkFn(t, ctc)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func validateStorageVersion(t *testing.T, ctc *conversionTestContext) {
|
||||
ns := ctc.namespace
|
||||
|
||||
for _, version := range []string{"v1beta1", "v1beta2"} {
|
||||
t.Run(version, func(t *testing.T) {
|
||||
name := "storageversion-" + version
|
||||
client := ctc.versionedClient(ns, version)
|
||||
obj, err := client.Create(newConversionMultiVersionFixture(ns, name, version), metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ctc.setAndWaitStorageVersion(t, "v1beta2")
|
||||
|
||||
obj, err = client.Get(obj.GetName(), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctc.setAndWaitStorageVersion(t, "v1beta1")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// validateMixedStorageVersions ensures that identical custom resources written at different storage versions
|
||||
// are readable and remain the same.
|
||||
func validateMixedStorageVersions(t *testing.T, ctc *conversionTestContext) {
|
||||
ns := ctc.namespace
|
||||
|
||||
v1client := ctc.versionedClient(ns, "v1beta1")
|
||||
v2client := ctc.versionedClient(ns, "v1beta2")
|
||||
clients := map[string]dynamic.ResourceInterface{"v1beta1": v1client, "v1beta2": v2client}
|
||||
versions := []string{"v1beta1", "v1beta2"}
|
||||
|
||||
// Create CRs at all storage versions
|
||||
objNames := []string{}
|
||||
for _, version := range versions {
|
||||
ctc.setAndWaitStorageVersion(t, version)
|
||||
|
||||
name := "stored-at-" + version
|
||||
obj, err := clients[version].Create(newConversionMultiVersionFixture(ns, name, version), metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
objNames = append(objNames, obj.GetName())
|
||||
}
|
||||
|
||||
// Ensure copies of an object have the same fields and values at each custom resource definition version regardless of storage version
|
||||
for clientVersion, client := range clients {
|
||||
t.Run(clientVersion, func(t *testing.T) {
|
||||
o1, err := client.Get(objNames[0], metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, objName := range objNames[1:] {
|
||||
o2, err := client.Get(objName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// ignore metadata for comparison purposes
|
||||
delete(o1.Object, "metadata")
|
||||
delete(o2.Object, "metadata")
|
||||
if !reflect.DeepEqual(o1.Object, o2.Object) {
|
||||
t.Errorf("Expected custom resource to be same regardless of which storage version is used but got %+v != %+v", o1, o2)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func validateServed(t *testing.T, ctc *conversionTestContext) {
|
||||
ns := ctc.namespace
|
||||
|
||||
for _, version := range []string{"v1beta1", "v1beta2"} {
|
||||
t.Run(version, func(t *testing.T) {
|
||||
name := "served-" + version
|
||||
client := ctc.versionedClient(ns, version)
|
||||
obj, err := client.Create(newConversionMultiVersionFixture(ns, name, version), metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ctc.setServed(t, version, false)
|
||||
ctc.waitForServed(t, version, false, client, obj)
|
||||
ctc.setServed(t, version, true)
|
||||
ctc.waitForServed(t, version, true, client, obj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func expectConversionFailureMessage(id, message string) func(t *testing.T, ctc *conversionTestContext) {
|
||||
return func(t *testing.T, ctc *conversionTestContext) {
|
||||
ns := ctc.namespace
|
||||
v1client := ctc.versionedClient(ns, "v1beta1")
|
||||
v2client := ctc.versionedClient(ns, "v1beta2")
|
||||
var err error
|
||||
// storage version is v1beta2, so this skips conversion
|
||||
obj, err := v2client.Create(newConversionMultiVersionFixture(ns, id, "v1beta2"), metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, verb := range []string{"get", "list", "create", "udpate", "patch", "delete", "deletecollection"} {
|
||||
t.Run(verb, func(t *testing.T) {
|
||||
switch verb {
|
||||
case "get":
|
||||
_, err = v1client.Get(obj.GetName(), metav1.GetOptions{})
|
||||
case "list":
|
||||
_, err = v1client.List(metav1.ListOptions{})
|
||||
case "create":
|
||||
_, err = v1client.Create(newConversionMultiVersionFixture(ns, id, "v1beta1"), metav1.CreateOptions{})
|
||||
case "update":
|
||||
_, err = v1client.Update(obj, metav1.UpdateOptions{})
|
||||
case "patch":
|
||||
_, err = v1client.Patch(obj.GetName(), types.MergePatchType, []byte(`{"metadata":{"annotations":{"patch":"true"}}}`), metav1.PatchOptions{})
|
||||
case "delete":
|
||||
err = v1client.Delete(obj.GetName(), &metav1.DeleteOptions{})
|
||||
case "deletecollection":
|
||||
err = v1client.DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
t.Errorf("expected error with message %s, but got no error", message)
|
||||
} else if !strings.Contains(err.Error(), message) {
|
||||
t.Errorf("expected error with message %s, but got %v", message, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
for _, subresource := range []string{"status", "scale"} {
|
||||
for _, verb := range []string{"get", "udpate", "patch"} {
|
||||
t.Run(fmt.Sprintf("%s-%s", subresource, verb), func(t *testing.T) {
|
||||
switch verb {
|
||||
case "create":
|
||||
_, err = v1client.Create(newConversionMultiVersionFixture(ns, id, "v1beta1"), metav1.CreateOptions{}, subresource)
|
||||
case "update":
|
||||
_, err = v1client.Update(obj, metav1.UpdateOptions{}, subresource)
|
||||
case "patch":
|
||||
_, err = v1client.Patch(obj.GetName(), types.MergePatchType, []byte(`{"metadata":{"annotations":{"patch":"true"}}}`), metav1.PatchOptions{}, subresource)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
t.Errorf("expected error with message %s, but got no error", message)
|
||||
} else if !strings.Contains(err.Error(), message) {
|
||||
t.Errorf("expected error with message %s, but got %v", message, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func noopConverter(desiredAPIVersion string, obj runtime.RawExtension) (runtime.RawExtension, error) {
|
||||
u := &unstructured.Unstructured{Object: map[string]interface{}{}}
|
||||
if err := json.Unmarshal(obj.Raw, u); err != nil {
|
||||
return runtime.RawExtension{}, fmt.Errorf("Fail to deserialize object: %s with error: %v", string(obj.Raw), err)
|
||||
}
|
||||
u.Object["apiVersion"] = desiredAPIVersion
|
||||
raw, err := json.Marshal(u)
|
||||
if err != nil {
|
||||
return runtime.RawExtension{}, fmt.Errorf("Fail to serialize object: %v with error: %v", u, err)
|
||||
}
|
||||
return runtime.RawExtension{Raw: raw}, nil
|
||||
}
|
||||
|
||||
func emptyResponseConverter(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) {
|
||||
review.Response = &apiextensionsv1beta1.ConversionResponse{
|
||||
UID: review.Request.UID,
|
||||
ConvertedObjects: []runtime.RawExtension{},
|
||||
Result: metav1.Status{Status: "Success"},
|
||||
}
|
||||
return review, nil
|
||||
}
|
||||
|
||||
func failureResponseConverter(message string) func(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) {
|
||||
return func(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) {
|
||||
review.Response = &apiextensionsv1beta1.ConversionResponse{
|
||||
UID: review.Request.UID,
|
||||
ConvertedObjects: []runtime.RawExtension{},
|
||||
Result: metav1.Status{Message: message, Status: "Failure"},
|
||||
}
|
||||
return review, nil
|
||||
}
|
||||
}
|
||||
|
||||
func nontrivialConverter(desiredAPIVersion string, obj runtime.RawExtension) (runtime.RawExtension, error) {
|
||||
u := &unstructured.Unstructured{Object: map[string]interface{}{}}
|
||||
if err := json.Unmarshal(obj.Raw, u); err != nil {
|
||||
return runtime.RawExtension{}, fmt.Errorf("Fail to deserialize object: %s with error: %v", string(obj.Raw), err)
|
||||
}
|
||||
|
||||
currentAPIVersion := u.Object["apiVersion"]
|
||||
if currentAPIVersion == "v1beta2" && desiredAPIVersion == "v1beta1" {
|
||||
u.Object["num"] = u.Object["numv2"]
|
||||
u.Object["content"] = u.Object["contentv2"]
|
||||
delete(u.Object, "numv2")
|
||||
delete(u.Object, "contentv2")
|
||||
}
|
||||
if currentAPIVersion == "v1beta1" && desiredAPIVersion == "v1beta2" {
|
||||
u.Object["numv2"] = u.Object["num"]
|
||||
u.Object["contentv2"] = u.Object["content"]
|
||||
delete(u.Object, "num")
|
||||
delete(u.Object, "content")
|
||||
}
|
||||
u.Object["apiVersion"] = desiredAPIVersion
|
||||
raw, err := json.Marshal(u)
|
||||
if err != nil {
|
||||
return runtime.RawExtension{}, fmt.Errorf("Fail to serialize object: %v with error: %v", u, err)
|
||||
}
|
||||
return runtime.RawExtension{Raw: raw}, nil
|
||||
}
|
||||
|
||||
func newConversionTestContext(t *testing.T, apiExtensionsClient clientset.Interface, dynamicClient dynamic.Interface, etcdObjectReader *storage.EtcdObjectReader, crd *apiextensionsv1beta1.CustomResourceDefinition) (func(), *conversionTestContext) {
|
||||
crd, err := fixtures.CreateNewCustomResourceDefinition(crd, apiExtensionsClient, dynamicClient)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tearDown := func() {
|
||||
if err := fixtures.DeleteCustomResourceDefinition(crd, apiExtensionsClient); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
return tearDown, &conversionTestContext{apiExtensionsClient: apiExtensionsClient, dynamicClient: dynamicClient, crd: crd, etcdObjectReader: etcdObjectReader}
|
||||
}
|
||||
|
||||
type conversionTestContext struct {
|
||||
namespace string
|
||||
apiExtensionsClient clientset.Interface
|
||||
dynamicClient dynamic.Interface
|
||||
options *options.CustomResourceDefinitionsServerOptions
|
||||
crd *apiextensionsv1beta1.CustomResourceDefinition
|
||||
etcdObjectReader *storage.EtcdObjectReader
|
||||
}
|
||||
|
||||
func (c *conversionTestContext) versionedClient(ns string, version string) dynamic.ResourceInterface {
|
||||
return newNamespacedCustomResourceVersionedClient(ns, c.dynamicClient, c.crd, version)
|
||||
}
|
||||
|
||||
func (c *conversionTestContext) setConversionWebhook(t *testing.T, webhookClientConfig *apiextensionsv1beta1.WebhookClientConfig) {
|
||||
crd, err := c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(c.crd.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
crd.Spec.Conversion = &apiextensionsv1beta1.CustomResourceConversion{
|
||||
Strategy: apiextensionsv1beta1.WebhookConverter,
|
||||
WebhookClientConfig: webhookClientConfig,
|
||||
}
|
||||
crd, err = c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(crd)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.crd = crd
|
||||
|
||||
}
|
||||
|
||||
func (c *conversionTestContext) removeConversionWebhook(t *testing.T) {
|
||||
crd, err := c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(c.crd.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
crd.Spec.Conversion = &apiextensionsv1beta1.CustomResourceConversion{
|
||||
Strategy: apiextensionsv1beta1.NoneConverter,
|
||||
}
|
||||
|
||||
crd, err = c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(crd)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.crd = crd
|
||||
}
|
||||
|
||||
func (c *conversionTestContext) setAndWaitStorageVersion(t *testing.T, version string) {
|
||||
c.setStorageVersion(t, "v1beta2")
|
||||
|
||||
client := c.versionedClient("probe", "v1beta2")
|
||||
name := fmt.Sprintf("probe-%v", uuid.NewUUID())
|
||||
storageProbe, err := client.Create(newConversionMultiVersionFixture("probe", name, "v1beta2"), metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.waitForStorageVersion(t, "v1beta2", c.versionedClient(storageProbe.GetNamespace(), "v1beta2"), storageProbe)
|
||||
|
||||
err = client.Delete(name, &metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *conversionTestContext) setStorageVersion(t *testing.T, version string) {
|
||||
crd, err := c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(c.crd.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i, v := range crd.Spec.Versions {
|
||||
crd.Spec.Versions[i].Storage = (v.Name == version)
|
||||
}
|
||||
crd, err = c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(crd)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.crd = crd
|
||||
}
|
||||
|
||||
func (c *conversionTestContext) waitForStorageVersion(t *testing.T, version string, versionedClient dynamic.ResourceInterface, obj *unstructured.Unstructured) *unstructured.Unstructured {
|
||||
c.etcdObjectReader.WaitForStorageVersion(version, obj.GetNamespace(), obj.GetName(), 30*time.Second, func() {
|
||||
var err error
|
||||
obj, err = versionedClient.Update(obj, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to update object: %v", err)
|
||||
}
|
||||
})
|
||||
return obj
|
||||
}
|
||||
|
||||
func (c *conversionTestContext) setServed(t *testing.T, version string, served bool) {
|
||||
crd, err := c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(c.crd.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i, v := range crd.Spec.Versions {
|
||||
if v.Name == version {
|
||||
crd.Spec.Versions[i].Served = served
|
||||
}
|
||||
}
|
||||
crd, err = c.apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(crd)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.crd = crd
|
||||
}
|
||||
|
||||
func (c *conversionTestContext) waitForServed(t *testing.T, version string, served bool, versionedClient dynamic.ResourceInterface, obj *unstructured.Unstructured) {
|
||||
timeout := 30 * time.Second
|
||||
waitCh := time.After(timeout)
|
||||
for {
|
||||
obj, err := versionedClient.Get(obj.GetName(), metav1.GetOptions{})
|
||||
if (err == nil && served) || (errors.IsNotFound(err) && served == false) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-waitCh:
|
||||
t.Fatalf("Timed out after %v waiting for CRD served=%t for version %s for %v. Last error: %v", timeout, served, version, obj, err)
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var multiVersionFixture = &apiextensionsv1beta1.CustomResourceDefinition{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "multiversion.stable.example.com"},
|
||||
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
|
||||
Group: "stable.example.com",
|
||||
Version: "v1beta1",
|
||||
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
|
||||
Plural: "multiversion",
|
||||
Singular: "multiversion",
|
||||
Kind: "MultiVersion",
|
||||
ShortNames: []string{"mv"},
|
||||
ListKind: "MultiVersionList",
|
||||
Categories: []string{"all"},
|
||||
},
|
||||
Scope: apiextensionsv1beta1.NamespaceScoped,
|
||||
Versions: []apiextensionsv1beta1.CustomResourceDefinitionVersion{
|
||||
{
|
||||
Name: "v1beta1",
|
||||
Served: true,
|
||||
Storage: false,
|
||||
},
|
||||
{
|
||||
Name: "v1beta2",
|
||||
Served: true,
|
||||
Storage: true,
|
||||
},
|
||||
},
|
||||
Subresources: &apiextensionsv1beta1.CustomResourceSubresources{
|
||||
Status: &apiextensionsv1beta1.CustomResourceSubresourceStatus{},
|
||||
Scale: &apiextensionsv1beta1.CustomResourceSubresourceScale{
|
||||
SpecReplicasPath: ".spec.num.num1",
|
||||
StatusReplicasPath: ".status.num.num2",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func newConversionMultiVersionFixture(namespace, name, version string) *unstructured.Unstructured {
|
||||
return &unstructured.Unstructured{
|
||||
Object: map[string]interface{}{
|
||||
"apiVersion": "stable.example.com/" + version,
|
||||
"kind": "MultiVersion",
|
||||
"metadata": map[string]interface{}{
|
||||
"namespace": namespace,
|
||||
"name": name,
|
||||
},
|
||||
"content": map[string]interface{}{
|
||||
"key": "value",
|
||||
},
|
||||
"num": map[string]interface{}{
|
||||
"num1": 1,
|
||||
"num2": 1000000,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["webhook.go"],
|
||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiextensions-apiserver/test/integration/convert",
|
||||
importpath = "k8s.io/apiextensions-apiserver/test/integration/convert",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
@ -0,0 +1,206 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package convert
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
)
|
||||
|
||||
// WaitReadyFunc calls triggerConversionFn periodically and waits until it detects that the webhook
|
||||
// conversion server has handled at least 1 conversion request or the timeout is exceeded, in which
|
||||
// case an error is returned.
|
||||
type WaitReadyFunc func(timeout time.Duration, triggerConversionFn func() error) error
|
||||
|
||||
// StartConversionWebhookServerWithWaitReady starts an http server with the provided handler and returns the WebhookClientConfig
|
||||
// needed to configure a CRD to use this conversion webhook as its converter.
|
||||
// It also returns a WaitReadyFunc to be called after the CRD is configured to wait until the conversion webhook handler
|
||||
// accepts at least one conversion request. If the server fails to start, an error is returned.
|
||||
// WaitReady is useful when changing the conversion webhook config of an existing CRD because the update does not take effect immediately.
|
||||
func StartConversionWebhookServerWithWaitReady(handler http.Handler) (func(), *apiextensionsv1beta1.WebhookClientConfig, WaitReadyFunc, error) {
|
||||
var once sync.Once
|
||||
handlerReadyC := make(chan struct{})
|
||||
readyNotifyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
once.Do(func() {
|
||||
close(handlerReadyC)
|
||||
})
|
||||
handler.ServeHTTP(w, r)
|
||||
})
|
||||
|
||||
tearDown, webhookConfig, err := StartConversionWebhookServer(readyNotifyHandler)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("error starting webhook server: %v", err)
|
||||
}
|
||||
|
||||
waitReady := func(timeout time.Duration, triggerConversionFn func() error) error {
|
||||
var err error
|
||||
for {
|
||||
select {
|
||||
case <-handlerReadyC:
|
||||
return nil
|
||||
case <-time.After(timeout):
|
||||
return fmt.Errorf("Timed out waiting for CRD webhook converter update, last trigger conversion error: %v", err)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
err = triggerConversionFn()
|
||||
}
|
||||
}
|
||||
}
|
||||
return tearDown, webhookConfig, waitReady, err
|
||||
}
|
||||
|
||||
// StartConversionWebhookServer starts an http server with the provided handler and returns the WebhookClientConfig
|
||||
// needed to configure a CRD to use this conversion webhook as its converter.
|
||||
func StartConversionWebhookServer(handler http.Handler) (func(), *apiextensionsv1beta1.WebhookClientConfig, error) {
|
||||
// Use a unique path for each webhook server. This ensures that all conversion requests
|
||||
// received by the handler are intended for it; if a WebhookClientConfig other than this one
|
||||
// is applied in the api server, conversion requests will not reach the handler (if they
|
||||
// reach the server they will be returned at 404). This helps prevent tests that require a
|
||||
// specific conversion webhook from accidentally using a different one, which could otherwise
|
||||
// cause a test to flake or pass when it should fail. Since updating the conversion client
|
||||
// config of a custom resource definition does not take effect immediately, this is needed
|
||||
// by the WaitReady returned StartConversionWebhookServerWithWaitReady to detect when a
|
||||
// conversion client config change in the api server has taken effect.
|
||||
path := fmt.Sprintf("/conversionwebhook-%s", uuid.NewUUID())
|
||||
roots := x509.NewCertPool()
|
||||
if !roots.AppendCertsFromPEM(localhostCert) {
|
||||
return nil, nil, fmt.Errorf("Failed to append Cert from PEM")
|
||||
}
|
||||
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("Failed to build cert with error: %+v", err)
|
||||
}
|
||||
webhookMux := http.NewServeMux()
|
||||
webhookMux.Handle(path, handler)
|
||||
webhookServer := httptest.NewUnstartedServer(webhookMux)
|
||||
webhookServer.TLS = &tls.Config{
|
||||
RootCAs: roots,
|
||||
Certificates: []tls.Certificate{cert},
|
||||
}
|
||||
webhookServer.StartTLS()
|
||||
endpoint := webhookServer.URL + path
|
||||
webhookConfig := &apiextensionsv1beta1.WebhookClientConfig{
|
||||
CABundle: localhostCert,
|
||||
URL: &endpoint,
|
||||
}
|
||||
return webhookServer.Close, webhookConfig, nil
|
||||
}
|
||||
|
||||
// ReviewConverterFunc converts an entire ConversionReview.
|
||||
type ReviewConverterFunc func(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error)
|
||||
|
||||
// NewReviewWebhookHandler creates a handler that delegates the review conversion to the provided ReviewConverterFunc.
|
||||
func NewReviewWebhookHandler(t *testing.T, converterFunc ReviewConverterFunc) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
data, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if contentType := r.Header.Get("Content-Type"); contentType != "application/json" {
|
||||
t.Errorf("contentType=%s, expect application/json", contentType)
|
||||
return
|
||||
}
|
||||
|
||||
review := apiextensionsv1beta1.ConversionReview{}
|
||||
if err := json.Unmarshal(data, &review); err != nil {
|
||||
t.Errorf("Fail to deserialize object: %s with error: %v", string(data), err)
|
||||
http.Error(w, err.Error(), 400)
|
||||
return
|
||||
}
|
||||
|
||||
review, err = converterFunc(review)
|
||||
if err != nil {
|
||||
t.Errorf("Error converting review: %v", err)
|
||||
http.Error(w, err.Error(), 500)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if err := json.NewEncoder(w).Encode(review); err != nil {
|
||||
t.Errorf("Marshal of response failed with error: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ObjectConverterFunc converts a single custom resource to the desiredAPIVersion and returns it or returns an error.
|
||||
type ObjectConverterFunc func(desiredAPIVersion string, customResource runtime.RawExtension) (runtime.RawExtension, error)
|
||||
|
||||
// NewObjectConverterWebhookHandler creates a handler that delegates custom resource conversion to the provided ConverterFunc.
|
||||
func NewObjectConverterWebhookHandler(t *testing.T, converterFunc ObjectConverterFunc) http.Handler {
|
||||
return NewReviewWebhookHandler(t, func(review apiextensionsv1beta1.ConversionReview) (apiextensionsv1beta1.ConversionReview, error) {
|
||||
converted := []runtime.RawExtension{}
|
||||
errMsgs := []string{}
|
||||
for _, obj := range review.Request.Objects {
|
||||
convertedObj, err := converterFunc(review.Request.DesiredAPIVersion, obj)
|
||||
if err != nil {
|
||||
errMsgs = append(errMsgs, err.Error())
|
||||
}
|
||||
|
||||
converted = append(converted, convertedObj)
|
||||
}
|
||||
|
||||
review.Response = &apiextensionsv1beta1.ConversionResponse{
|
||||
UID: review.Request.UID,
|
||||
ConvertedObjects: converted,
|
||||
}
|
||||
if len(errMsgs) == 0 {
|
||||
review.Response.Result = metav1.Status{Status: "Success"}
|
||||
} else {
|
||||
review.Response.Result = metav1.Status{Status: "Failure", Message: strings.Join(errMsgs, ", ")}
|
||||
}
|
||||
return review, nil
|
||||
})
|
||||
}
|
||||
|
||||
// localhostCert was generated from crypto/tls/generate_cert.go with the following command:
|
||||
// go run generate_cert.go --rsa-bits 512 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
|
||||
var localhostCert = []byte(`-----BEGIN CERTIFICATE-----
|
||||
MIIBjzCCATmgAwIBAgIRAKpi2WmTcFrVjxrl5n5YDUEwDQYJKoZIhvcNAQELBQAw
|
||||
EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2
|
||||
MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC
|
||||
QQC9fEbRszP3t14Gr4oahV7zFObBI4TfA5i7YnlMXeLinb7MnvT4bkfOJzE6zktn
|
||||
59zP7UiHs3l4YOuqrjiwM413AgMBAAGjaDBmMA4GA1UdDwEB/wQEAwICpDATBgNV
|
||||
HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MC4GA1UdEQQnMCWCC2V4
|
||||
YW1wbGUuY29thwR/AAABhxAAAAAAAAAAAAAAAAAAAAABMA0GCSqGSIb3DQEBCwUA
|
||||
A0EAUsVE6KMnza/ZbodLlyeMzdo7EM/5nb5ywyOxgIOCf0OOLHsPS9ueGLQX9HEG
|
||||
//yjTXuhNcUugExIjM/AIwAZPQ==
|
||||
-----END CERTIFICATE-----`)
|
||||
|
||||
// localhostKey is the private key for localhostCert.
|
||||
var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIBOwIBAAJBAL18RtGzM/e3XgavihqFXvMU5sEjhN8DmLtieUxd4uKdvsye9Phu
|
||||
R84nMTrOS2fn3M/tSIezeXhg66quOLAzjXcCAwEAAQJBAKcRxH9wuglYLBdI/0OT
|
||||
BLzfWPZCEw1vZmMR2FF1Fm8nkNOVDPleeVGTWoOEcYYlQbpTmkGSxJ6ya+hqRi6x
|
||||
goECIQDx3+X49fwpL6B5qpJIJMyZBSCuMhH4B7JevhGGFENi3wIhAMiNJN5Q3UkL
|
||||
IuSvv03kaPR5XVQ99/UeEetUgGvBcABpAiBJSBzVITIVCGkGc7d+RCf49KTCIklv
|
||||
bGWObufAR8Ni4QIgWpILjW8dkGg8GOUZ0zaNA6Nvt6TIv2UWGJ4v5PoV98kCIQDx
|
||||
rIiZs5QbKdycsv9gQJzwQAogC8o04X3Zz3dsoX+h4A==
|
||||
-----END RSA PRIVATE KEY-----`)
|
@ -0,0 +1,31 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["objectreader.go"],
|
||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiextensions-apiserver/test/integration/storage",
|
||||
importpath = "k8s.io/apiextensions-apiserver/test/integration/storage",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
@ -0,0 +1,111 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
// EtcdObjectReader provides direct access to custom resource objects stored in etcd.
|
||||
type EtcdObjectReader struct {
|
||||
etcdClient *clientv3.Client
|
||||
storagePrefix string
|
||||
crd *apiextensionsv1beta1.CustomResourceDefinition
|
||||
}
|
||||
|
||||
// NewEtcdObjectReader creates a reader for accessing custom resource objects directly from etcd.
|
||||
func NewEtcdObjectReader(etcdClient *clientv3.Client, restOptions *generic.RESTOptions, crd *apiextensionsv1beta1.CustomResourceDefinition) *EtcdObjectReader {
|
||||
return &EtcdObjectReader{etcdClient, restOptions.StorageConfig.Prefix, crd}
|
||||
}
|
||||
|
||||
// WaitForStorageVersion calls the updateObjFn periodically and waits for the version of the custom resource stored in etcd to be set to the provided version.
|
||||
// Typically updateObjFn should perform a noop update to the object so that when stored version of a CRD changes, the object is written at the updated storage version.
|
||||
// If the timeout is exceeded a error is returned.
|
||||
// This is useful when updating the stored version of an existing CRD because the update does not take effect immediately.
|
||||
func (s *EtcdObjectReader) WaitForStorageVersion(version string, ns, name string, timeout time.Duration, updateObjFn func()) error {
|
||||
waitCh := time.After(timeout)
|
||||
for {
|
||||
storage, err := s.GetStoredCustomResource(ns, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if storage.GetObjectKind().GroupVersionKind().Version == version {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case <-waitCh:
|
||||
return fmt.Errorf("timed out after %v waiting for storage version to be %s for object (namespace:%s name:%s)", timeout, version, ns, name)
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
updateObjFn()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetStoredCustomResource gets the storage representation of a custom resource from etcd.
|
||||
func (s *EtcdObjectReader) GetStoredCustomResource(ns, name string) (*unstructured.Unstructured, error) {
|
||||
key := path.Join("/", s.storagePrefix, s.crd.Spec.Group, s.crd.Spec.Names.Plural, ns, name)
|
||||
resp, err := s.etcdClient.KV.Get(context.Background(), key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting storage object %s, %s from etcd at key %s: %v", ns, name, key, err)
|
||||
}
|
||||
if len(resp.Kvs) == 0 {
|
||||
return nil, fmt.Errorf("no storage object found for %s, %s in etcd for key %s", ns, name, key)
|
||||
}
|
||||
raw := resp.Kvs[0].Value
|
||||
u := &unstructured.Unstructured{Object: map[string]interface{}{}}
|
||||
if err := json.Unmarshal(raw, u); err != nil {
|
||||
return nil, fmt.Errorf("error deserializing object %s: %v", string(raw), err)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// GetEtcdClients returns an initialized clientv3.Client and clientv3.KV.
|
||||
func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: config.CertFile,
|
||||
KeyFile: config.KeyFile,
|
||||
CAFile: config.CAFile,
|
||||
}
|
||||
|
||||
tlsConfig, err := tlsInfo.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: config.ServerList,
|
||||
TLS: tlsConfig,
|
||||
}
|
||||
|
||||
c, err := clientv3.New(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return c, clientv3.NewKV(c), nil
|
||||
}
|
@ -56,9 +56,15 @@ func testingDeferOnDecodeError() {
|
||||
|
||||
func init() {
|
||||
// check to see if we are running in a test environment
|
||||
TestOnlySetFatalOnDecodeError(true)
|
||||
fatalOnDecodeError, _ = strconv.ParseBool(os.Getenv("KUBE_PANIC_WATCH_DECODE_ERROR"))
|
||||
}
|
||||
|
||||
// TestOnlySetFatalOnDecodeError should only be used for cases where decode errors are expected and need to be tested. e.g. conversion webhooks.
|
||||
func TestOnlySetFatalOnDecodeError(b bool) {
|
||||
fatalOnDecodeError = b
|
||||
}
|
||||
|
||||
type watcher struct {
|
||||
client *clientv3.Client
|
||||
codec runtime.Codec
|
||||
|
Loading…
Reference in New Issue
Block a user