From ba59e14d442be8b6b42f707913df3e879c3e34c7 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Tue, 30 May 2017 20:24:12 -0700 Subject: [PATCH] Add TPR to CRD migration helper. --- cmd/kube-apiserver/app/BUILD | 1 + cmd/kube-apiserver/app/server.go | 7 +- hack/make-rules/test-cmd-util.sh | 109 +++++++++++++- pkg/master/master.go | 6 +- pkg/master/master_openapi_test.go | 2 +- pkg/master/master_test.go | 4 +- pkg/master/thirdparty/BUILD | 7 + pkg/master/thirdparty/thirdparty.go | 136 ++++++++++++++++-- .../thirdpartyresourcedata/storage/BUILD | 5 + .../thirdpartyresourcedata/storage/storage.go | 56 +++++++- .../etcd/etcd_storage_path_test.go | 2 +- test/integration/examples/apiserver_test.go | 2 +- test/integration/framework/master_utils.go | 2 +- 13 files changed, 313 insertions(+), 26 deletions(-) diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index aa343ba063c..46a7a00e941 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -82,6 +82,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", "//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 2e92c5fe14a..f827d4677ef 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -47,6 +47,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authorization/authorizer" + genericregistry "k8s.io/apiserver/pkg/registry/generic" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/filters" serverstorage "k8s.io/apiserver/pkg/server/storage" @@ -117,7 +118,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { return err } - kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers) + kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, apiExtensionsConfig.CRDRESTOptionsGetter) if err != nil { return err } @@ -161,8 +162,8 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { } // CreateKubeAPIServer creates and wires a workable kube-apiserver -func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory) (*master.Master, error) { - kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer) +func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, crdRESTOptionsGetter genericregistry.RESTOptionsGetter) (*master.Master, error) { + kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer, crdRESTOptionsGetter) if err != nil { return nil, err } diff --git a/hack/make-rules/test-cmd-util.sh b/hack/make-rules/test-cmd-util.sh index 87fc834188b..e3618809214 100644 --- a/hack/make-rules/test-cmd-util.sh +++ b/hack/make-rules/test-cmd-util.sh @@ -179,6 +179,7 @@ function kubectl-with-retry() # wait-for-pods-with-label "app=foo" "nginx-0nginx-1" function wait-for-pods-with-label() { + local i for i in $(seq 1 10); do kubeout=`kubectl get po -l $1 --template '{{range.items}}{{.metadata.name}}{{end}}' --sort-by metadata.name "${kube_flags[@]}"` if [[ $kubeout = $2 ]]; then @@ -1413,6 +1414,101 @@ __EOF__ kubectl delete thirdpartyresources/bar.company.com "${kube_flags[@]}" } +run_tpr_migration_tests() { + local i tries + create_and_use_new_namespace + + # Create CRD first. This is sort of backwards so we can create a marker below. + kubectl "${kube_flags_with_token[@]}" create -f - << __EOF__ +{ + "kind": "CustomResourceDefinition", + "apiVersion": "apiextensions.k8s.io/v1beta1", + "metadata": { + "name": "foos.company.crd" + }, + "spec": { + "group": "company.crd", + "version": "v1", + "names": { + "plural": "foos", + "kind": "Foo" + } + } +} +__EOF__ + # Wait for API to become available. + tries=0 + until kubectl "${kube_flags[@]}" get foos.company.crd || [ $tries -gt 10 ]; do + tries=$((tries+1)) + sleep ${tries} + done + kube::test::get_object_assert foos.company.crd '{{len .items}}' '0' + + # Create a marker that only exists in CRD so we know when CRD is active vs. TPR. + kubectl "${kube_flags[@]}" create -f - << __EOF__ +{ + "kind": "Foo", + "apiVersion": "company.crd/v1", + "metadata": { + "name": "crd-marker" + }, + "testValue": "only exists in CRD" +} +__EOF__ + kube::test::get_object_assert foos.company.crd '{{len .items}}' '1' + + # Now create a TPR that sits in front of the CRD and hides it. + kubectl "${kube_flags[@]}" create -f - << __EOF__ +{ + "kind": "ThirdPartyResource", + "apiVersion": "extensions/v1beta1", + "metadata": { + "name": "foo.company.crd" + }, + "versions": [ + { + "name": "v1" + } + ] +} +__EOF__ + # The marker should disappear. + kube::test::wait_object_assert foos.company.crd '{{len .items}}' '0' + + # Add some items to the TPR. + for i in {1..10}; do + kubectl "${kube_flags[@]}" create -f - << __EOF__ +{ + "kind": "Foo", + "apiVersion": "company.crd/v1", + "metadata": { + "name": "tpr-${i}" + }, + "testValue": "migrate-${i}" +} +__EOF__ + done + kube::test::get_object_assert foos.company.crd '{{len .items}}' '10' + + # Delete the TPR and wait for the CRD to take over. + kubectl "${kube_flags[@]}" delete thirdpartyresource/foo.company.crd + tries=0 + until kubectl "${kube_flags[@]}" get foos.company.crd/crd-marker || [ $tries -gt 10 ]; do + tries=$((tries+1)) + sleep ${tries} + done + kube::test::get_object_assert foos.company.crd/crd-marker '{{.testValue}}' 'only exists in CRD' + + # Check if the TPR items were migrated to CRD. + kube::test::get_object_assert foos.company.crd '{{len .items}}' '11' + for i in {1..10}; do + kube::test::get_object_assert foos.company.crd/tpr-${i} '{{.testValue}}' "migrate-${i}" + done + + # teardown + kubectl delete customresourcedefinitions/foos.company.crd "${kube_flags_with_token[@]}" +} + kube::util::non_native_resources() { local times @@ -2951,12 +3047,12 @@ runTests() { kube::log::status "Checking kubectl version" kubectl version - i=0 + ns_num=0 create_and_use_new_namespace() { - i=$(($i+1)) - kube::log::status "Creating namespace namespace${i}" - kubectl create namespace "namespace${i}" - kubectl config set-context "${CONTEXT}" --namespace="namespace${i}" + ns_num=$(($ns_num+1)) + kube::log::status "Creating namespace namespace${ns_num}" + kubectl create namespace "namespace${ns_num}" + kubectl config set-context "${CONTEXT}" --namespace="namespace${ns_num}" } kube_flags=( @@ -3290,6 +3386,9 @@ runTests() { if kube::test::if_supports_resource "${thirdpartyresources}" ; then run_tpr_tests + if kube::test::if_supports_resource "${customresourcedefinitions}" ; then + run_tpr_migration_tests + fi fi ################# diff --git a/pkg/master/master.go b/pkg/master/master.go index 1c42ae1c79a..7574a657df0 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -28,6 +28,7 @@ import ( utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/endpoints/discovery" "k8s.io/apiserver/pkg/registry/generic" + genericregistry "k8s.io/apiserver/pkg/registry/generic" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" serverstorage "k8s.io/apiserver/pkg/server/storage" @@ -210,7 +211,7 @@ func (c *Config) SkipComplete() completedConfig { // Certain config fields will be set to a default value if unset. // Certain config fields must be specified, including: // KubeletClientConfig -func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) { +func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget, crdRESTOptionsGetter genericregistry.RESTOptionsGetter) (*Master, error) { if reflect.DeepEqual(c.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) { return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig") } @@ -254,7 +255,8 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) autoscalingrest.RESTStorageProvider{}, batchrest.RESTStorageProvider{}, certificatesrest.RESTStorageProvider{}, - extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, s.DiscoveryGroupManager, c.StorageFactory)}, + // TODO(enisoc): Remove crdRESTOptionsGetter input argument when TPR code is removed. + extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, s.DiscoveryGroupManager, c.StorageFactory, crdRESTOptionsGetter)}, networkingrest.RESTStorageProvider{}, policyrest.RESTStorageProvider{}, rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer}, diff --git a/pkg/master/master_openapi_test.go b/pkg/master/master_openapi_test.go index 4629e4159eb..7e9f0c78635 100644 --- a/pkg/master/master_openapi_test.go +++ b/pkg/master/master_openapi_test.go @@ -54,7 +54,7 @@ func TestValidOpenAPISpec(t *testing.T) { } config.GenericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig() - master, err := config.Complete().New(genericapiserver.EmptyDelegate) + master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil) if err != nil { t.Fatalf("Error in bringing up the master: %v", err) } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 25d7ba010b2..b6bf30b1803 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -115,7 +115,7 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) { etcdserver, config, assert := setUp(t) - master, err := config.Complete().New(genericapiserver.EmptyDelegate) + master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil) if err != nil { t.Fatalf("Error in bringing up the master: %v", err) } @@ -141,7 +141,7 @@ func limitedAPIResourceConfigSource() *serverstorage.ResourceConfig { func newLimitedMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) { etcdserver, config, assert := setUp(t) config.APIResourceConfigSource = limitedAPIResourceConfigSource() - master, err := config.Complete().New(genericapiserver.EmptyDelegate) + master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil) if err != nil { t.Fatalf("Error in bringing up the master: %v", err) } diff --git a/pkg/master/thirdparty/BUILD b/pkg/master/thirdparty/BUILD index 80d93b2121c..c32b4824273 100644 --- a/pkg/master/thirdparty/BUILD +++ b/pkg/master/thirdparty/BUILD @@ -27,9 +27,12 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints:go_default_library", @@ -40,12 +43,16 @@ go_library( "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions:go_default_library", + "//vendor/k8s.io/kube-apiextensions-server/pkg/apiserver:go_default_library", + "//vendor/k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion:go_default_library", "//vendor/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library", "//vendor/k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion:go_default_library", + "//vendor/k8s.io/kube-apiextensions-server/pkg/registry/customresource:go_default_library", ], ) diff --git a/pkg/master/thirdparty/thirdparty.go b/pkg/master/thirdparty/thirdparty.go index 9a70bf81541..50df6f763c8 100644 --- a/pkg/master/thirdparty/thirdparty.go +++ b/pkg/master/thirdparty/thirdparty.go @@ -25,16 +25,26 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/json" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" genericapi "k8s.io/apiserver/pkg/endpoints" "k8s.io/apiserver/pkg/endpoints/discovery" + "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" serverstorgage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" + discoveryclient "k8s.io/client-go/discovery" + "k8s.io/kube-apiextensions-server/pkg/apis/apiextensions" + apiextensionsserver "k8s.io/kube-apiextensions-server/pkg/apiserver" + apiextensionsclient "k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion" + "k8s.io/kube-apiextensions-server/pkg/registry/customresource" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest" @@ -71,13 +81,16 @@ type ThirdPartyResourceServer struct { // Useful for reliable testing. Shouldn't be used otherwise. disableThirdPartyControllerForTesting bool + + crdRESTOptionsGetter generic.RESTOptionsGetter } -func NewThirdPartyResourceServer(genericAPIServer *genericapiserver.GenericAPIServer, availableGroupManager discovery.GroupManager, storageFactory serverstorgage.StorageFactory) *ThirdPartyResourceServer { +func NewThirdPartyResourceServer(genericAPIServer *genericapiserver.GenericAPIServer, availableGroupManager discovery.GroupManager, storageFactory serverstorgage.StorageFactory, crdRESTOptionsGetter generic.RESTOptionsGetter) *ThirdPartyResourceServer { ret := &ThirdPartyResourceServer{ genericAPIServer: genericAPIServer, thirdPartyResources: map[string]*thirdPartyEntry{}, availableGroupManager: availableGroupManager, + crdRESTOptionsGetter: crdRESTOptionsGetter, } var err error @@ -130,7 +143,7 @@ func (m *ThirdPartyResourceServer) removeThirdPartyStorage(path, resource string if !found { return nil } - if err := m.removeAllThirdPartyResources(storage); err != nil { + if err := m.removeThirdPartyResourceData(&entry.group, resource, storage); err != nil { return err } delete(entry.storage, resource) @@ -166,25 +179,132 @@ func (m *ThirdPartyResourceServer) RemoveThirdPartyResource(path string) error { return nil } -func (m *ThirdPartyResourceServer) removeAllThirdPartyResources(registry *thirdpartyresourcedatastore.REST) error { - ctx := genericapirequest.NewDefaultContext() +func (m *ThirdPartyResourceServer) removeThirdPartyResourceData(group *metav1.APIGroup, resource string, registry *thirdpartyresourcedatastore.REST) error { + // Freeze TPR data to prevent new writes via this apiserver process. + // Other apiservers can still write. This is best-effort because there + // are worse problems with TPR data than the possibility of going back + // in time when migrating to CRD [citation needed]. + registry.Freeze() + + ctx := genericapirequest.NewContext() existingData, err := registry.List(ctx, nil) if err != nil { return err } list, ok := existingData.(*extensions.ThirdPartyResourceDataList) if !ok { - return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %#v", list) + return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %T", existingData) } - for ix := range list.Items { - item := &list.Items[ix] - if _, _, err := registry.Delete(ctx, item.Name, nil); err != nil { + + // Migrate TPR data to CRD if requested. + gvk := schema.GroupVersionKind{Group: group.Name, Version: group.PreferredVersion.Version, Kind: registry.Kind()} + migrationRequested, err := m.migrateThirdPartyResourceData(gvk, resource, list) + if err != nil { + // Migration is best-effort. Log and continue. + utilruntime.HandleError(fmt.Errorf("failed to migrate TPR data: %v", err)) + } + + // Skip deletion of TPR data if migration was requested (whether or not it succeeded). + // This leaves the etcd data around for rollback, and to avoid sending DELETE watch events. + if migrationRequested { + return nil + } + + for i := range list.Items { + item := &list.Items[i] + + // Use registry.Store.Delete() to bypass the frozen registry.Delete(). + if _, _, err := registry.Store.Delete(genericapirequest.WithNamespace(ctx, item.Namespace), item.Name, nil); err != nil { return err } } return nil } +func (m *ThirdPartyResourceServer) findMatchingCRD(gvk schema.GroupVersionKind, resource string) (*apiextensions.CustomResourceDefinition, error) { + // CustomResourceDefinitionList does not implement the protobuf marshalling interface. + config := *m.genericAPIServer.LoopbackClientConfig + config.ContentType = "application/json" + crdClient, err := apiextensionsclient.NewForConfig(&config) + if err != nil { + return nil, fmt.Errorf("can't create apiextensions client: %v", err) + } + crdList, err := crdClient.CustomResourceDefinitions().List(metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("can't list CustomResourceDefinitions: %v", err) + } + for i := range crdList.Items { + item := &crdList.Items[i] + if item.Spec.Scope == apiextensions.NamespaceScoped && + item.Spec.Group == gvk.Group && item.Spec.Version == gvk.Version && + item.Status.AcceptedNames.Kind == gvk.Kind && item.Status.AcceptedNames.Plural == resource { + return item, nil + } + } + return nil, nil +} + +func (m *ThirdPartyResourceServer) migrateThirdPartyResourceData(gvk schema.GroupVersionKind, resource string, dataList *extensions.ThirdPartyResourceDataList) (bool, error) { + // A matching CustomResourceDefinition implies migration is requested. + crd, err := m.findMatchingCRD(gvk, resource) + if err != nil { + return false, fmt.Errorf("can't determine if TPR should migrate: %v", err) + } + if crd == nil { + // No migration requested. + return false, nil + } + + // Talk directly to CustomResource storage. + // We have to bypass the API server because TPR is shadowing CRD at this point. + storage := customresource.NewREST( + schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}, + schema.GroupVersionKind{Group: crd.Spec.Group, Version: crd.Spec.Version, Kind: crd.Spec.Names.ListKind}, + apiextensionsserver.UnstructuredCopier{}, + customresource.NewStrategy(discoveryclient.NewUnstructuredObjectTyper(nil), true), + m.crdRESTOptionsGetter, + ) + + // Copy TPR data to CustomResource. + var errs []error + ctx := request.NewContext() + for i := range dataList.Items { + item := &dataList.Items[i] + + // Convert TPR data to Unstructured. + objMap := make(map[string]interface{}) + if err := json.Unmarshal(item.Data, &objMap); err != nil { + errs = append(errs, fmt.Errorf("can't unmarshal TPR data %q: %v", item.Name, err)) + continue + } + + // Convert metadata to Unstructured and merge with data. + // cf. thirdpartyresourcedata.encodeToJSON() + metaMap := make(map[string]interface{}) + buf, err := json.Marshal(&item.ObjectMeta) + if err != nil { + errs = append(errs, fmt.Errorf("can't marshal metadata for TPR data %q: %v", item.Name, err)) + continue + } + if err := json.Unmarshal(buf, &metaMap); err != nil { + errs = append(errs, fmt.Errorf("can't unmarshal TPR data %q: %v", item.Name, err)) + continue + } + // resourceVersion cannot be set when creating objects. + delete(metaMap, "resourceVersion") + objMap["metadata"] = metaMap + + // Store CustomResource. + obj := &unstructured.Unstructured{Object: objMap} + createCtx := request.WithNamespace(ctx, obj.GetNamespace()) + if _, err := storage.Create(createCtx, obj); err != nil { + errs = append(errs, fmt.Errorf("can't create CustomResource for TPR data %q: %v", item.Name, err)) + continue + } + } + return true, utilerrors.NewAggregate(errs) +} + // ListThirdPartyResources lists all currently installed third party resources // The format is / func (m *ThirdPartyResourceServer) ListThirdPartyResources() []string { diff --git a/pkg/registry/extensions/thirdpartyresourcedata/storage/BUILD b/pkg/registry/extensions/thirdpartyresourcedata/storage/BUILD index 2b1cf854388..c1300d6b0a6 100644 --- a/pkg/registry/extensions/thirdpartyresourcedata/storage/BUILD +++ b/pkg/registry/extensions/thirdpartyresourcedata/storage/BUILD @@ -35,9 +35,14 @@ go_library( "//pkg/apis/extensions:go_default_library", "//pkg/registry/cachesize:go_default_library", "//pkg/registry/extensions/thirdpartyresourcedata:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", ], ) diff --git a/pkg/registry/extensions/thirdpartyresourcedata/storage/storage.go b/pkg/registry/extensions/thirdpartyresourcedata/storage/storage.go index b19fbb5a47a..4680e735de9 100644 --- a/pkg/registry/extensions/thirdpartyresourcedata/storage/storage.go +++ b/pkg/registry/extensions/thirdpartyresourcedata/storage/storage.go @@ -18,20 +18,72 @@ package storage import ( "strings" + "sync/atomic" + "k8s.io/apimachinery/pkg/api/errors" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/registry/cachesize" "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata" ) -// REST implements a RESTStorage for ThirdPartyResourceData +// errFrozen is a transient error to indicate that clients should retry with backoff. +var errFrozen = errors.NewServiceUnavailable("TPR data is temporarily frozen") + +// REST implements a RESTStorage for ThirdPartyResourceData. type REST struct { *genericregistry.Store - kind string + kind string + frozen atomic.Value +} + +// Freeze causes all future calls to Create/Update/Delete/DeleteCollection to return a transient error. +// This is irreversible and meant for use when the TPR data is being deleted or migrated/abandoned. +func (r *REST) Freeze() { + r.frozen.Store(true) +} + +func (r *REST) isFrozen() bool { + return r.frozen.Load() != nil +} + +// Create is a wrapper to support Freeze. +func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + if r.isFrozen() { + return nil, errFrozen + } + return r.Store.Create(ctx, obj) +} + +// Update is a wrapper to support Freeze. +func (r *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) { + if r.isFrozen() { + return nil, false, errFrozen + } + return r.Store.Update(ctx, name, objInfo) +} + +// Delete is a wrapper to support Freeze. +func (r *REST) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) { + if r.isFrozen() { + return nil, false, errFrozen + } + return r.Store.Delete(ctx, name, options) +} + +// DeleteCollection is a wrapper to support Freeze. +func (r *REST) DeleteCollection(ctx genericapirequest.Context, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { + if r.isFrozen() { + return nil, errFrozen + } + return r.Store.DeleteCollection(ctx, options, listOptions) } // NewREST returns a registry which will store ThirdPartyResourceData in the given helper diff --git a/test/integration/etcd/etcd_storage_path_test.go b/test/integration/etcd/etcd_storage_path_test.go index e2712ab98bb..936ac26b153 100644 --- a/test/integration/etcd/etcd_storage_path_test.go +++ b/test/integration/etcd/etcd_storage_path_test.go @@ -607,7 +607,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV kubeAPIServerConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources - kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers) + kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, nil) if err != nil { t.Fatal(err) } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 7766282e7d2..87f401701f4 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -118,7 +118,7 @@ func TestAggregatedAPIServer(t *testing.T) { } kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig) - kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers) + kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, nil) if err != nil { t.Fatal(err) } diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 499a8424936..7adc063a708 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -253,7 +253,7 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv } masterConfig.GenericConfig.SharedInformerFactory = extinformers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout) - m, err = masterConfig.Complete().New(genericapiserver.EmptyDelegate) + m, err = masterConfig.Complete().New(genericapiserver.EmptyDelegate, nil) if err != nil { closeFn() glog.Fatalf("error in bringing up the master: %v", err)