mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-03 23:40:03 +00:00 
			
		
		
		
	Merge pull request #77816 from liggitt/graceful-crd
Graceful custom resource storage teardown
This commit is contained in:
		@@ -53,7 +53,9 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
 | 
			
		||||
@@ -68,6 +70,7 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -187,6 +187,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 | 
			
		||||
		c.ExtraConfig.AuthResolverWrapper,
 | 
			
		||||
		c.ExtraConfig.MasterCount,
 | 
			
		||||
		s.GenericAPIServer.Authorizer,
 | 
			
		||||
		c.GenericConfig.RequestTimeout,
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
 
 | 
			
		||||
@@ -52,6 +52,8 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
 | 
			
		||||
	"k8s.io/apiserver/pkg/admission"
 | 
			
		||||
	"k8s.io/apiserver/pkg/authorization/authorizer"
 | 
			
		||||
	"k8s.io/apiserver/pkg/endpoints/handlers"
 | 
			
		||||
@@ -62,6 +64,7 @@ import (
 | 
			
		||||
	"k8s.io/apiserver/pkg/features"
 | 
			
		||||
	"k8s.io/apiserver/pkg/registry/generic"
 | 
			
		||||
	genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
 | 
			
		||||
	genericfilters "k8s.io/apiserver/pkg/server/filters"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/storagebackend"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	"k8s.io/apiserver/pkg/util/webhook"
 | 
			
		||||
@@ -100,6 +103,9 @@ type crdHandler struct {
 | 
			
		||||
 | 
			
		||||
	// so that we can do create on update.
 | 
			
		||||
	authorizer authorizer.Authorizer
 | 
			
		||||
 | 
			
		||||
	// request timeout we should delay storage teardown for
 | 
			
		||||
	requestTimeout time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// crdInfo stores enough information to serve the storage for the custom resource
 | 
			
		||||
@@ -123,6 +129,8 @@ type crdInfo struct {
 | 
			
		||||
 | 
			
		||||
	// storageVersion is the CRD version used when storing the object in etcd.
 | 
			
		||||
	storageVersion string
 | 
			
		||||
 | 
			
		||||
	waitGroup *utilwaitgroup.SafeWaitGroup
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// crdStorageMap goes from customresourcedefinition to its storage
 | 
			
		||||
@@ -139,7 +147,8 @@ func NewCustomResourceDefinitionHandler(
 | 
			
		||||
	serviceResolver webhook.ServiceResolver,
 | 
			
		||||
	authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
 | 
			
		||||
	masterCount int,
 | 
			
		||||
	authorizer authorizer.Authorizer) (*crdHandler, error) {
 | 
			
		||||
	authorizer authorizer.Authorizer,
 | 
			
		||||
	requestTimeout time.Duration) (*crdHandler, error) {
 | 
			
		||||
	ret := &crdHandler{
 | 
			
		||||
		versionDiscoveryHandler: versionDiscoveryHandler,
 | 
			
		||||
		groupDiscoveryHandler:   groupDiscoveryHandler,
 | 
			
		||||
@@ -151,6 +160,7 @@ func NewCustomResourceDefinitionHandler(
 | 
			
		||||
		establishingController:  establishingController,
 | 
			
		||||
		masterCount:             masterCount,
 | 
			
		||||
		authorizer:              authorizer,
 | 
			
		||||
		requestTimeout:          requestTimeout,
 | 
			
		||||
	}
 | 
			
		||||
	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | 
			
		||||
		UpdateFunc: ret.updateCustomResourceDefinition,
 | 
			
		||||
@@ -169,6 +179,11 @@ func NewCustomResourceDefinitionHandler(
 | 
			
		||||
	return ret, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// watches are expected to handle storage disruption gracefully,
 | 
			
		||||
// both on the server-side (by terminating the watch connection)
 | 
			
		||||
// and on the client side (by restarting the watch)
 | 
			
		||||
var longRunningFilter = genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString())
 | 
			
		||||
 | 
			
		||||
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
	ctx := req.Context()
 | 
			
		||||
	requestInfo, ok := apirequest.RequestInfoFrom(ctx)
 | 
			
		||||
@@ -238,7 +253,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
		supportedTypes = append(supportedTypes, string(types.ApplyPatchType))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var handler http.HandlerFunc
 | 
			
		||||
	var handlerFunc http.HandlerFunc
 | 
			
		||||
	subresources, err := apiextensions.GetSubresourcesForVersion(crd, requestInfo.APIVersion)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		utilruntime.HandleError(err)
 | 
			
		||||
@@ -247,18 +262,19 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
	}
 | 
			
		||||
	switch {
 | 
			
		||||
	case subresource == "status" && subresources != nil && subresources.Status != nil:
 | 
			
		||||
		handler = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
 | 
			
		||||
		handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
 | 
			
		||||
	case subresource == "scale" && subresources != nil && subresources.Scale != nil:
 | 
			
		||||
		handler = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
 | 
			
		||||
		handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
 | 
			
		||||
	case len(subresource) == 0:
 | 
			
		||||
		handler = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
 | 
			
		||||
		handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
 | 
			
		||||
	default:
 | 
			
		||||
		http.Error(w, "the server could not find the requested resource", http.StatusNotFound)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if handler != nil {
 | 
			
		||||
		handler = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handler)
 | 
			
		||||
		handler(w, req)
 | 
			
		||||
	if handlerFunc != nil {
 | 
			
		||||
		handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handlerFunc)
 | 
			
		||||
		handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
 | 
			
		||||
		handler.ServeHTTP(w, req)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -365,18 +381,18 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})
 | 
			
		||||
 | 
			
		||||
	klog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)
 | 
			
		||||
 | 
			
		||||
	// Copy because we cannot write to storageMap without a race
 | 
			
		||||
	// as it is used without locking elsewhere.
 | 
			
		||||
	storageMap2 := storageMap.clone()
 | 
			
		||||
	if oldInfo, ok := storageMap2[types.UID(oldCRD.UID)]; ok {
 | 
			
		||||
		for _, storage := range oldInfo.storages {
 | 
			
		||||
			// destroy only the main storage. Those for the subresources share cacher and etcd clients.
 | 
			
		||||
			storage.CustomResource.DestroyFunc()
 | 
			
		||||
		}
 | 
			
		||||
		delete(storageMap2, types.UID(oldCRD.UID))
 | 
			
		||||
	}
 | 
			
		||||
	if oldInfo, ok := storageMap[types.UID(oldCRD.UID)]; ok {
 | 
			
		||||
		// Copy because we cannot write to storageMap without a race
 | 
			
		||||
		// as it is used without locking elsewhere.
 | 
			
		||||
		storageMap2 := storageMap.clone()
 | 
			
		||||
 | 
			
		||||
	r.customStorage.Store(storageMap2)
 | 
			
		||||
		// Remove from the CRD info map and store the map
 | 
			
		||||
		delete(storageMap2, types.UID(oldCRD.UID))
 | 
			
		||||
		r.customStorage.Store(storageMap2)
 | 
			
		||||
 | 
			
		||||
		// Tear down the old storage
 | 
			
		||||
		go r.tearDown(oldInfo)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// removeDeadStorage removes REST storage that isn't being used
 | 
			
		||||
@@ -390,6 +406,7 @@ func (r *crdHandler) removeDeadStorage() {
 | 
			
		||||
	r.customStorageLock.Lock()
 | 
			
		||||
	defer r.customStorageLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	oldInfos := []*crdInfo{}
 | 
			
		||||
	storageMap := r.customStorage.Load().(crdStorageMap)
 | 
			
		||||
	// Copy because we cannot write to storageMap without a race
 | 
			
		||||
	// as it is used without locking elsewhere
 | 
			
		||||
@@ -404,14 +421,38 @@ func (r *crdHandler) removeDeadStorage() {
 | 
			
		||||
		}
 | 
			
		||||
		if !found {
 | 
			
		||||
			klog.V(4).Infof("Removing dead CRD storage for %s/%s", s.spec.Group, s.spec.Names.Kind)
 | 
			
		||||
			for _, storage := range s.storages {
 | 
			
		||||
				// destroy only the main storage. Those for the subresources share cacher and etcd clients.
 | 
			
		||||
				storage.CustomResource.DestroyFunc()
 | 
			
		||||
			}
 | 
			
		||||
			oldInfos = append(oldInfos, s)
 | 
			
		||||
			delete(storageMap2, uid)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	r.customStorage.Store(storageMap2)
 | 
			
		||||
 | 
			
		||||
	for _, s := range oldInfos {
 | 
			
		||||
		go r.tearDown(s)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Wait up to a minute for requests to drain, then tear down storage
 | 
			
		||||
func (r *crdHandler) tearDown(oldInfo *crdInfo) {
 | 
			
		||||
	requestsDrained := make(chan struct{})
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(requestsDrained)
 | 
			
		||||
		// Allow time for in-flight requests with a handle to the old info to register themselves
 | 
			
		||||
		time.Sleep(time.Second)
 | 
			
		||||
		// Wait for in-flight requests to drain
 | 
			
		||||
		oldInfo.waitGroup.Wait()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case <-time.After(r.requestTimeout * 2):
 | 
			
		||||
		klog.Warningf("timeout waiting for requests to drain for %s/%s, tearing down storage", oldInfo.spec.Group, oldInfo.spec.Names.Kind)
 | 
			
		||||
	case <-requestsDrained:
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, storage := range oldInfo.storages {
 | 
			
		||||
		// destroy only the main storage. Those for the subresources share cacher and etcd clients.
 | 
			
		||||
		storage.CustomResource.DestroyFunc()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter of
 | 
			
		||||
@@ -622,6 +663,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResource
 | 
			
		||||
		scaleRequestScopes:  scaleScopes,
 | 
			
		||||
		statusRequestScopes: statusScopes,
 | 
			
		||||
		storageVersion:      storageVersion,
 | 
			
		||||
		waitGroup:           &utilwaitgroup.SafeWaitGroup{},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Copy because we cannot write to storageMap without a race
 | 
			
		||||
 
 | 
			
		||||
@@ -11,6 +11,7 @@ go_test(
 | 
			
		||||
    srcs = [
 | 
			
		||||
        "apply_test.go",
 | 
			
		||||
        "basic_test.go",
 | 
			
		||||
        "change_test.go",
 | 
			
		||||
        "finalization_test.go",
 | 
			
		||||
        "objectmeta_test.go",
 | 
			
		||||
        "registration_test.go",
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,125 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2019 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package integration
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 | 
			
		||||
	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 | 
			
		||||
	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/client-go/dynamic"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestChangeCRD(t *testing.T) {
 | 
			
		||||
	tearDown, config, _, err := fixtures.StartDefaultServer(t)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	defer tearDown()
 | 
			
		||||
	config.QPS = 1000
 | 
			
		||||
	config.Burst = 1000
 | 
			
		||||
	apiExtensionsClient, err := clientset.NewForConfig(config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	dynamicClient, err := dynamic.NewForConfig(config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	noxuDefinition := fixtures.NewNoxuCustomResourceDefinition(apiextensionsv1beta1.NamespaceScoped)
 | 
			
		||||
	noxuDefinition, err = fixtures.CreateNewCustomResourceDefinition(noxuDefinition, apiExtensionsClient, dynamicClient)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ns := "default"
 | 
			
		||||
	noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
 | 
			
		||||
 | 
			
		||||
	stopChan := make(chan struct{})
 | 
			
		||||
 | 
			
		||||
	wg := &sync.WaitGroup{}
 | 
			
		||||
 | 
			
		||||
	// Set up loop to modify CRD in the background
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer wg.Done()
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case <-stopChan:
 | 
			
		||||
				return
 | 
			
		||||
			default:
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(noxuDefinition.Name, metav1.GetOptions{})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
			if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
 | 
			
		||||
				v2 := noxuDefinitionToUpdate.Spec.Versions[0]
 | 
			
		||||
				v2.Name = "v2"
 | 
			
		||||
				v2.Served = true
 | 
			
		||||
				v2.Storage = false
 | 
			
		||||
				noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
 | 
			
		||||
			} else {
 | 
			
		||||
				noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
 | 
			
		||||
			}
 | 
			
		||||
			if _, err := apiExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(noxuDefinitionToUpdate); err != nil && !apierrors.IsConflict(err) {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
			time.Sleep(10 * time.Millisecond)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Set up 100 loops creating and reading custom resources
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func(i int) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			noxuInstanceToCreate := fixtures.NewNoxuInstance(ns, fmt.Sprintf("foo-%d", i))
 | 
			
		||||
			if _, err := noxuNamespacedResourceClient.Create(noxuInstanceToCreate, metav1.CreateOptions{}); err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
			for {
 | 
			
		||||
				select {
 | 
			
		||||
				case <-stopChan:
 | 
			
		||||
					return
 | 
			
		||||
				default:
 | 
			
		||||
					if _, err := noxuNamespacedResourceClient.Get(noxuInstanceToCreate.GetName(), metav1.GetOptions{}); err != nil {
 | 
			
		||||
						t.Fatal(err)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				time.Sleep(10 * time.Millisecond)
 | 
			
		||||
			}
 | 
			
		||||
		}(i)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Let all the established get request loops soak
 | 
			
		||||
	time.Sleep(5 * time.Second)
 | 
			
		||||
 | 
			
		||||
	// Tear down
 | 
			
		||||
	close(stopChan)
 | 
			
		||||
 | 
			
		||||
	// Let loops drain
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user