From ee215ba705500c864f207668a0ba1309e0880cfb Mon Sep 17 00:00:00 2001
From: Jordan Liggitt
Date: Mon, 13 May 2019 10:58:54 -0400
Subject: [PATCH] Graceful custom resource storage teardown

---
 .../pkg/apiserver/BUILD                       |   3 +
 .../pkg/apiserver/apiserver.go                |   1 +
 .../pkg/apiserver/customresource_handler.go   |  88 ++++++++----
 .../test/integration/BUILD                    |   1 +
 .../test/integration/change_test.go           | 130 ++++++++++++++++++
 5 files changed, 200 insertions(+), 23 deletions(-)
 create mode 100644 staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go

diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD
index d624136ac96..60686c95c91 100644
--- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD
+++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/BUILD
@@ -53,7 +53,9 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
@@ -68,6 +70,7 @@ go_library(
         "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
index 1a1496ec8f6..9883471c647 100644
--- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
+++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
@@ -187,6 +187,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 		c.ExtraConfig.AuthResolverWrapper,
 		c.ExtraConfig.MasterCount,
 		s.GenericAPIServer.Authorizer,
+		c.GenericConfig.RequestTimeout,
 	)
 	if err != nil {
 		return nil, err
diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
index a06070c5724..0cfbe187295 100644
--- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
+++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
@@ -52,6 +52,8 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
 	"k8s.io/apiserver/pkg/admission"
 	"k8s.io/apiserver/pkg/authorization/authorizer"
 	"k8s.io/apiserver/pkg/endpoints/handlers"
@@ -62,6 +64,7 @@ import (
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" + genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/storage/storagebackend" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/webhook" @@ -100,6 +103,9 @@ type crdHandler struct { // so that we can do create on update. authorizer authorizer.Authorizer + + // request timeout we should delay storage teardown for + requestTimeout time.Duration } // crdInfo stores enough information to serve the storage for the custom resource @@ -123,6 +129,8 @@ type crdInfo struct { // storageVersion is the CRD version used when storing the object in etcd. storageVersion string + + waitGroup *utilwaitgroup.SafeWaitGroup } // crdStorageMap goes from customresourcedefinition to its storage @@ -139,7 +147,8 @@ func NewCustomResourceDefinitionHandler( serviceResolver webhook.ServiceResolver, authResolverWrapper webhook.AuthenticationInfoResolverWrapper, masterCount int, - authorizer authorizer.Authorizer) (*crdHandler, error) { + authorizer authorizer.Authorizer, + requestTimeout time.Duration) (*crdHandler, error) { ret := &crdHandler{ versionDiscoveryHandler: versionDiscoveryHandler, groupDiscoveryHandler: groupDiscoveryHandler, @@ -151,6 +160,7 @@ func NewCustomResourceDefinitionHandler( establishingController: establishingController, masterCount: masterCount, authorizer: authorizer, + requestTimeout: requestTimeout, } crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: ret.updateCustomResourceDefinition, @@ -169,6 +179,11 @@ func NewCustomResourceDefinitionHandler( return ret, nil } +// watches are expected to handle storage disruption gracefully, +// both on the server-side (by terminating the watch connection) +// and on the client side (by restarting the watch) +var longRunningFilter = genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()) + func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { ctx := req.Context() requestInfo, ok := apirequest.RequestInfoFrom(ctx) @@ -238,7 +253,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { supportedTypes = append(supportedTypes, string(types.ApplyPatchType)) } - var handler http.HandlerFunc + var handlerFunc http.HandlerFunc subresources, err := apiextensions.GetSubresourcesForVersion(crd, requestInfo.APIVersion) if err != nil { utilruntime.HandleError(err) @@ -247,18 +262,19 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } switch { case subresource == "status" && subresources != nil && subresources.Status != nil: - handler = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes) + handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes) case subresource == "scale" && subresources != nil && subresources.Scale != nil: - handler = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes) + handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes) case len(subresource) == 0: - handler = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes) + handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes) default: http.Error(w, "the server could not find the requested resource", http.StatusNotFound) } - if handler != nil { - handler = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, 
-		handler(w, req)
+	if handlerFunc != nil {
+		handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handlerFunc)
+		handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
+		handler.ServeHTTP(w, req)
 		return
 	}
 }
@@ -365,18 +381,18 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})
 
 	klog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)
 
-	// Copy because we cannot write to storageMap without a race
-	// as it is used without locking elsewhere.
-	storageMap2 := storageMap.clone()
-	if oldInfo, ok := storageMap2[types.UID(oldCRD.UID)]; ok {
-		for _, storage := range oldInfo.storages {
-			// destroy only the main storage. Those for the subresources share cacher and etcd clients.
-			storage.CustomResource.DestroyFunc()
-		}
-		delete(storageMap2, types.UID(oldCRD.UID))
-	}
+	if oldInfo, ok := storageMap[types.UID(oldCRD.UID)]; ok {
+		// Copy because we cannot write to storageMap without a race
+		// as it is used without locking elsewhere.
+		storageMap2 := storageMap.clone()
 
-	r.customStorage.Store(storageMap2)
+		// Remove from the CRD info map and store the map
+		delete(storageMap2, types.UID(oldCRD.UID))
+		r.customStorage.Store(storageMap2)
+
+		// Tear down the old storage
+		go r.tearDown(oldInfo)
+	}
 }
 
 // removeDeadStorage removes REST storage that isn't being used
@@ -390,6 +406,7 @@ func (r *crdHandler) removeDeadStorage() {
 	r.customStorageLock.Lock()
 	defer r.customStorageLock.Unlock()
 
+	oldInfos := []*crdInfo{}
 	storageMap := r.customStorage.Load().(crdStorageMap)
 	// Copy because we cannot write to storageMap without a race
 	// as it is used without locking elsewhere
@@ -404,14 +421,38 @@ func (r *crdHandler) removeDeadStorage() {
 		}
 		if !found {
 			klog.V(4).Infof("Removing dead CRD storage for %s/%s", s.spec.Group, s.spec.Names.Kind)
-			for _, storage := range s.storages {
-				// destroy only the main storage. Those for the subresources share cacher and etcd clients.
-				storage.CustomResource.DestroyFunc()
-			}
+			oldInfos = append(oldInfos, s)
 			delete(storageMap2, uid)
 		}
 	}
 	r.customStorage.Store(storageMap2)
+
+	for _, s := range oldInfos {
+		go r.tearDown(s)
+	}
+}
+
+// Wait up to twice the request timeout for requests to drain, then tear down storage
+func (r *crdHandler) tearDown(oldInfo *crdInfo) {
+	requestsDrained := make(chan struct{})
+	go func() {
+		defer close(requestsDrained)
+		// Allow time for in-flight requests with a handle to the old info to register themselves
+		time.Sleep(time.Second)
+		// Wait for in-flight requests to drain
+		oldInfo.waitGroup.Wait()
+	}()
+
+	select {
+	case <-time.After(r.requestTimeout * 2):
+		klog.Warningf("timeout waiting for requests to drain for %s/%s, tearing down storage", oldInfo.spec.Group, oldInfo.spec.Names.Kind)
+	case <-requestsDrained:
+	}
+
+	for _, storage := range oldInfo.storages {
+		// destroy only the main storage. Those for the subresources share cacher and etcd clients.
+		storage.CustomResource.DestroyFunc()
+	}
 }
 
 // GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter of
@@ -622,6 +663,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResource
 		scaleRequestScopes:  scaleScopes,
 		statusRequestScopes: statusScopes,
 		storageVersion:      storageVersion,
+		waitGroup:           &utilwaitgroup.SafeWaitGroup{},
 	}
 
 	// Copy because we cannot write to storageMap without a race
diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD b/staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD
index 546aa4920fe..eebb1c7511b 100644
--- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD
+++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/BUILD
@@ -11,6 +11,7 @@ go_test(
     srcs = [
         "apply_test.go",
         "basic_test.go",
+        "change_test.go",
        "finalization_test.go",
         "objectmeta_test.go",
         "registration_test.go",
diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go
new file mode 100644
index 00000000000..554a1c80691
--- /dev/null
+++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go
@@ -0,0 +1,130 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/dynamic"
+)
+
+func TestChangeCRD(t *testing.T) {
+	tearDown, config, _, err := fixtures.StartDefaultServer(t)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer tearDown()
+	config.QPS = 1000
+	config.Burst = 1000
+	apiExtensionsClient, err := clientset.NewForConfig(config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	dynamicClient, err := dynamic.NewForConfig(config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	noxuDefinition := fixtures.NewNoxuCustomResourceDefinition(apiextensionsv1beta1.NamespaceScoped)
+	noxuDefinition, err = fixtures.CreateNewCustomResourceDefinition(noxuDefinition, apiExtensionsClient, dynamicClient)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ns := "default"
+	noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
+
+	stopChan := make(chan struct{})
+
+	wg := &sync.WaitGroup{}
+
+	// Set up loop to modify CRD in the background
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-stopChan:
+				return
+			default:
+			}
+
+			noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(noxuDefinition.Name, metav1.GetOptions{})
+			if err != nil {
+				// t.Fatal must only be called from the goroutine running the test
+				t.Error(err)
+				return
+			}
+			if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
+				v2 := noxuDefinitionToUpdate.Spec.Versions[0]
+				v2.Name = "v2"
+				v2.Served = true
+				v2.Storage = false
+				noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
+			} else {
+				noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
+			}
+			if _, err := apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(noxuDefinitionToUpdate); err != nil && !apierrors.IsConflict(err) {
+				t.Error(err)
+				return
+			}
+			time.Sleep(10 * time.Millisecond)
+		}
+	}()
+
+	// Set up 100 loops creating and reading custom resources
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			noxuInstanceToCreate := fixtures.NewNoxuInstance(ns, fmt.Sprintf("foo-%d", i))
+			if _, err := noxuNamespacedResourceClient.Create(noxuInstanceToCreate, metav1.CreateOptions{}); err != nil {
+				t.Error(err)
+				return
+			}
+			for {
+				select {
+				case <-stopChan:
+					return
+				default:
+					if _, err := noxuNamespacedResourceClient.Get(noxuInstanceToCreate.GetName(), metav1.GetOptions{}); err != nil {
+						t.Error(err)
+						return
+					}
+				}
+				time.Sleep(10 * time.Millisecond)
+			}
+		}(i)
+	}
+
+	// Let all the established get request loops soak
+	time.Sleep(5 * time.Second)
+
+	// Tear down
+	close(stopChan)
+
+	// Let loops drain
+	wg.Wait()
+}
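
The heart of the patch is a drain-before-destroy pattern: every non-long-running request registers itself with the serving crdInfo's SafeWaitGroup (watches are exempted via longRunningFilter, so an open watch cannot block teardown), and tearDown waits for that group, bounded by twice the request timeout, before calling DestroyFunc. The sketch below reproduces the pattern with only the standard library; storageShim, serve, and drainTimeout are illustrative names, not identifiers from the patch. The real code uses SafeWaitGroup rather than a plain sync.WaitGroup so that Add fails cleanly, rather than racing, once Wait has begun; the sketch sidesteps that by letting all requests register before teardown starts.

package main

import (
	"fmt"
	"sync"
	"time"
)

// storageShim stands in for a crdInfo: live requests register with wg,
// and tearDown waits for them, bounded by a timeout, before destroying.
type storageShim struct {
	wg sync.WaitGroup
}

// serve simulates one non-long-running request against the old storage.
func (s *storageShim) serve(id int) {
	s.wg.Add(1) // register the in-flight request
	defer s.wg.Done()
	time.Sleep(50 * time.Millisecond) // simulated request work
	fmt.Printf("request %d served\n", id)
}

// tearDown mirrors crdHandler.tearDown: wait for in-flight requests to
// drain, but give up after drainTimeout and destroy anyway.
func (s *storageShim) tearDown(drainTimeout time.Duration, destroy func()) {
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		s.wg.Wait()
	}()

	select {
	case <-drained:
		fmt.Println("all requests drained")
	case <-time.After(drainTimeout):
		fmt.Println("timeout waiting for requests to drain")
	}
	destroy()
}

func main() {
	s := &storageShim{}
	for i := 0; i < 3; i++ {
		go s.serve(i)
	}
	// Give requests that already hold the old info a moment to register;
	// the patch does the same with its one-second sleep before Wait.
	time.Sleep(10 * time.Millisecond)
	s.tearDown(time.Second, func() { fmt.Println("storage destroyed") })
}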
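
Both updateCustomResourceDefinition and removeDeadStorage follow the same copy-on-write discipline around r.customStorage: readers Load the current map without locking, writers clone it under customStorageLock, mutate the clone, Store it atomically, and only then hand the detached crdInfo to tearDown. A minimal sketch of that discipline, under assumed names (storeByUID, handler, get, and remove are illustrative, not from the patch):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeByUID mimics crdStorageMap: an immutable map swapped atomically.
type storeByUID map[string]string

type handler struct {
	mu    sync.Mutex   // serializes writers only
	store atomic.Value // holds a storeByUID; readers never lock
}

// get is the hot path: a lock-free load of the current map.
func (h *handler) get(uid string) (string, bool) {
	m := h.store.Load().(storeByUID)
	v, ok := m[uid]
	return v, ok
}

// remove clones the map, deletes the entry, and publishes the clone,
// mirroring how the patch detaches a CRD's storage before tearing it down.
func (h *handler) remove(uid string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	old := h.store.Load().(storeByUID)
	clone := make(storeByUID, len(old))
	for k, v := range old {
		clone[k] = v
	}
	delete(clone, uid)
	h.store.Store(clone)
	// The old entry is now unreachable to new readers; in the patch this
	// is the point where the detached crdInfo goes to `go r.tearDown(...)`.
}

func main() {
	h := &handler{}
	h.store.Store(storeByUID{"uid-1": "noxu storage"})
	fmt.Println(h.get("uid-1"))
	h.remove("uid-1")
	fmt.Println(h.get("uid-1"))
}

Publishing the clone before tearing anything down guarantees that new requests can no longer reach the old storage; the only requests still touching it are in-flight ones, which are exactly what the wait group in the previous sketch accounts for.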