mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Merge pull request #57883 from hzxuzhonghu/crd-handler
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. refactor customeresource handler **What this PR does / why we need it**: - fix data race bug - fix lock usage bug. - remove some redundant code **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #57882 **Special notes for your reviewer**: **Release note**: ```release-note NONE ```
This commit is contained in:
commit
9b6ac17f42
@ -1774,6 +1774,10 @@
|
||||
"ImportPath": "k8s.io/apiserver/pkg/endpoints/handlers",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apiserver/pkg/endpoints/request",
|
||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
|
@ -54,6 +54,7 @@ go_library(
|
||||
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
|
||||
|
@ -177,7 +177,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
versionDiscoveryHandler,
|
||||
groupDiscoveryHandler,
|
||||
s.GenericAPIServer.RequestContextMapper(),
|
||||
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Lister(),
|
||||
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
|
||||
delegateHandler,
|
||||
c.ExtraConfig.CRDRESTOptionsGetter,
|
||||
|
@ -44,12 +44,13 @@ import (
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
"k8s.io/client-go/discovery"
|
||||
cache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
|
||||
apiservervalidation "k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
|
||||
@ -67,6 +68,9 @@ type crdHandler struct {
|
||||
|
||||
customStorageLock sync.Mutex
|
||||
// customStorage contains a crdStorageMap
|
||||
// atomic.Value has a very good read performance compared to sync.RWMutex
|
||||
// see https://gist.github.com/dim/152e6bf80e1384ea72e17ac717a5000a
|
||||
// which is suited for most read and rarely write cases
|
||||
customStorage atomic.Value
|
||||
|
||||
requestContextMapper apirequest.RequestContextMapper
|
||||
@ -96,7 +100,6 @@ func NewCustomResourceDefinitionHandler(
|
||||
versionDiscoveryHandler *versionDiscoveryHandler,
|
||||
groupDiscoveryHandler *groupDiscoveryHandler,
|
||||
requestContextMapper apirequest.RequestContextMapper,
|
||||
crdLister listers.CustomResourceDefinitionLister,
|
||||
crdInformer informers.CustomResourceDefinitionInformer,
|
||||
delegate http.Handler,
|
||||
restOptionsGetter generic.RESTOptionsGetter,
|
||||
@ -106,7 +109,7 @@ func NewCustomResourceDefinitionHandler(
|
||||
groupDiscoveryHandler: groupDiscoveryHandler,
|
||||
customStorage: atomic.Value{},
|
||||
requestContextMapper: requestContextMapper,
|
||||
crdLister: crdLister,
|
||||
crdLister: crdInformer.Lister(),
|
||||
delegate: delegate,
|
||||
restOptionsGetter: restOptionsGetter,
|
||||
admission: admission,
|
||||
@ -120,19 +123,20 @@ func NewCustomResourceDefinitionHandler(
|
||||
})
|
||||
|
||||
ret.customStorage.Store(crdStorageMap{})
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
ctx, ok := r.requestContextMapper.Get(req)
|
||||
if !ok {
|
||||
// programmer error
|
||||
panic("missing context")
|
||||
responsewriters.InternalError(w, req, fmt.Errorf("no context found for request"))
|
||||
return
|
||||
}
|
||||
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
||||
if !ok {
|
||||
// programmer error
|
||||
panic("missing requestInfo")
|
||||
responsewriters.InternalError(w, req, fmt.Errorf("no RequestInfo found in the context"))
|
||||
return
|
||||
}
|
||||
if !requestInfo.IsResourceRequest {
|
||||
pathParts := splitPath(requestInfo.Path)
|
||||
@ -168,6 +172,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
if !apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) {
|
||||
r.delegate.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
if len(requestInfo.Subresource) > 0 {
|
||||
http.NotFound(w, req)
|
||||
@ -176,7 +181,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
terminating := apiextensions.IsCRDConditionTrue(crd, apiextensions.Terminating)
|
||||
|
||||
crdInfo, err := r.getServingInfoFor(crd)
|
||||
crdInfo, err := r.getOrCreateServingInfoFor(crd)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
@ -242,19 +247,52 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{}) {
|
||||
oldCRD := oldObj.(*apiextensions.CustomResourceDefinition)
|
||||
newCRD := newObj.(*apiextensions.CustomResourceDefinition)
|
||||
|
||||
r.customStorageLock.Lock()
|
||||
defer r.customStorageLock.Unlock()
|
||||
|
||||
storageMap := r.customStorage.Load().(crdStorageMap)
|
||||
oldInfo, found := storageMap[newCRD.UID]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
if apiequality.Semantic.DeepEqual(&newCRD.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&newCRD.Status.AcceptedNames, oldInfo.acceptedNames) {
|
||||
glog.V(6).Infof("Ignoring customresourcedefinition %s update because neither spec, nor accepted names changed", oldCRD.Name)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)
|
||||
|
||||
// Copy because we cannot write to storageMap without a race
|
||||
// as it is used without locking elsewhere.
|
||||
storageMap2 := storageMap.clone()
|
||||
if oldInfo, ok := storageMap2[types.UID(oldCRD.UID)]; ok {
|
||||
oldInfo.storage.DestroyFunc()
|
||||
delete(storageMap2, types.UID(oldCRD.UID))
|
||||
}
|
||||
|
||||
r.customStorage.Store(storageMap2)
|
||||
}
|
||||
|
||||
// removeDeadStorage removes REST storage that isn't being used
|
||||
func (r *crdHandler) removeDeadStorage() {
|
||||
// these don't have to be live. A snapshot is fine
|
||||
// if we wrongly delete, that's ok. The rest storage will be recreated on the next request
|
||||
// if we wrongly miss one, that's ok. We'll get it next time
|
||||
storageMap := r.customStorage.Load().(crdStorageMap)
|
||||
allCustomResourceDefinitions, err := r.crdLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
|
||||
for uid, s := range storageMap {
|
||||
r.customStorageLock.Lock()
|
||||
defer r.customStorageLock.Unlock()
|
||||
|
||||
storageMap := r.customStorage.Load().(crdStorageMap)
|
||||
// Copy because we cannot write to storageMap without a race
|
||||
// as it is used without locking elsewhere
|
||||
storageMap2 := storageMap.clone()
|
||||
for uid, s := range storageMap2 {
|
||||
found := false
|
||||
for _, crd := range allCustomResourceDefinitions {
|
||||
if crd.UID == uid {
|
||||
@ -265,38 +303,33 @@ func (r *crdHandler) removeDeadStorage() {
|
||||
if !found {
|
||||
glog.V(4).Infof("Removing dead CRD storage for %v", s.requestScope.Resource)
|
||||
s.storage.DestroyFunc()
|
||||
delete(storageMap, uid)
|
||||
delete(storageMap2, uid)
|
||||
}
|
||||
}
|
||||
|
||||
r.customStorageLock.Lock()
|
||||
defer r.customStorageLock.Unlock()
|
||||
|
||||
r.customStorage.Store(storageMap)
|
||||
r.customStorage.Store(storageMap2)
|
||||
}
|
||||
|
||||
// GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter for
|
||||
// the given uid, or nil if one does not exist.
|
||||
func (r *crdHandler) GetCustomResourceListerCollectionDeleter(crd *apiextensions.CustomResourceDefinition) finalizer.ListerCollectionDeleter {
|
||||
info, err := r.getServingInfoFor(crd)
|
||||
info, err := r.getOrCreateServingInfoFor(crd)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
return info.storage
|
||||
}
|
||||
|
||||
func (r *crdHandler) getServingInfoFor(crd *apiextensions.CustomResourceDefinition) (*crdInfo, error) {
|
||||
func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResourceDefinition) (*crdInfo, error) {
|
||||
storageMap := r.customStorage.Load().(crdStorageMap)
|
||||
ret, ok := storageMap[crd.UID]
|
||||
if ok {
|
||||
if ret, ok := storageMap[crd.UID]; ok {
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
r.customStorageLock.Lock()
|
||||
defer r.customStorageLock.Unlock()
|
||||
|
||||
ret, ok = storageMap[crd.UID]
|
||||
if ok {
|
||||
storageMap = r.customStorage.Load().(crdStorageMap)
|
||||
if ret, ok := storageMap[crd.UID]; ok {
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
@ -384,7 +417,7 @@ func (r *crdHandler) getServingInfoFor(crd *apiextensions.CustomResourceDefiniti
|
||||
MetaGroupVersion: metav1.SchemeGroupVersion,
|
||||
}
|
||||
|
||||
ret = &crdInfo{
|
||||
ret := &crdInfo{
|
||||
spec: &crd.Spec,
|
||||
acceptedNames: &crd.Status.AcceptedNames,
|
||||
|
||||
@ -392,16 +425,13 @@ func (r *crdHandler) getServingInfoFor(crd *apiextensions.CustomResourceDefiniti
|
||||
requestScope: requestScope,
|
||||
}
|
||||
|
||||
storageMap2 := make(crdStorageMap, len(storageMap))
|
||||
|
||||
// Copy because we cannot write to storageMap without a race
|
||||
// as it is used without locking elsewhere
|
||||
for k, v := range storageMap {
|
||||
storageMap2[k] = v
|
||||
}
|
||||
// as it is used without locking elsewhere.
|
||||
storageMap2 := storageMap.clone()
|
||||
|
||||
storageMap2[crd.UID] = ret
|
||||
r.customStorage.Store(storageMap2)
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
@ -423,39 +453,6 @@ func (c crdObjectConverter) ConvertFieldLabel(version, kind, label, value string
|
||||
}
|
||||
}
|
||||
|
||||
func (c *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{}) {
|
||||
oldCRD := oldObj.(*apiextensions.CustomResourceDefinition)
|
||||
newCRD := newObj.(*apiextensions.CustomResourceDefinition)
|
||||
|
||||
c.customStorageLock.Lock()
|
||||
defer c.customStorageLock.Unlock()
|
||||
storageMap := c.customStorage.Load().(crdStorageMap)
|
||||
|
||||
oldInfo, found := storageMap[newCRD.UID]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
if apiequality.Semantic.DeepEqual(&newCRD.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&newCRD.Status.AcceptedNames, oldInfo.acceptedNames) {
|
||||
glog.V(6).Infof("Ignoring customresourcedefinition %s update because neither spec, nor accepted names changed", oldCRD.Name)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)
|
||||
storageMap2 := make(crdStorageMap, len(storageMap))
|
||||
|
||||
// Copy because we cannot write to storageMap without a race
|
||||
// as it is used without locking elsewhere
|
||||
for k, v := range storageMap {
|
||||
if k == oldCRD.UID {
|
||||
v.storage.DestroyFunc()
|
||||
continue
|
||||
}
|
||||
storageMap2[k] = v
|
||||
}
|
||||
|
||||
c.customStorage.Store(storageMap2)
|
||||
}
|
||||
|
||||
type unstructuredNegotiatedSerializer struct {
|
||||
typer runtime.ObjectTyper
|
||||
creator runtime.ObjectCreater
|
||||
@ -578,3 +575,16 @@ func (t CRDRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (gen
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// clone returns a clone of the provided crdStorageMap.
|
||||
// The clone is a shallow copy of the map.
|
||||
func (in crdStorageMap) clone() crdStorageMap {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := make(crdStorageMap, len(in))
|
||||
for key, value := range in {
|
||||
out[key] = value
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user