refactor customresource handler

This commit is contained in:
hzxuzhonghu 2018-01-05 16:13:51 +08:00
parent 17ffacc37f
commit 86ffa59d34
2 changed files with 75 additions and 66 deletions

View File

@ -177,7 +177,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
versionDiscoveryHandler, versionDiscoveryHandler,
groupDiscoveryHandler, groupDiscoveryHandler,
s.GenericAPIServer.RequestContextMapper(), s.GenericAPIServer.RequestContextMapper(),
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Lister(),
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
delegateHandler, delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter, c.ExtraConfig.CRDRESTOptionsGetter,

View File

@ -44,12 +44,13 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/endpoints/handlers" "k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
cache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
apiservervalidation "k8s.io/apiextensions-apiserver/pkg/apiserver/validation" apiservervalidation "k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
@ -67,6 +68,9 @@ type crdHandler struct {
customStorageLock sync.Mutex customStorageLock sync.Mutex
// customStorage contains a crdStorageMap // customStorage contains a crdStorageMap
// atomic.Value has a very good read performance compared to sync.RWMutex
// see https://gist.github.com/dim/152e6bf80e1384ea72e17ac717a5000a
// which is suited for most read and rarely write cases
customStorage atomic.Value customStorage atomic.Value
requestContextMapper apirequest.RequestContextMapper requestContextMapper apirequest.RequestContextMapper
@ -96,7 +100,6 @@ func NewCustomResourceDefinitionHandler(
versionDiscoveryHandler *versionDiscoveryHandler, versionDiscoveryHandler *versionDiscoveryHandler,
groupDiscoveryHandler *groupDiscoveryHandler, groupDiscoveryHandler *groupDiscoveryHandler,
requestContextMapper apirequest.RequestContextMapper, requestContextMapper apirequest.RequestContextMapper,
crdLister listers.CustomResourceDefinitionLister,
crdInformer informers.CustomResourceDefinitionInformer, crdInformer informers.CustomResourceDefinitionInformer,
delegate http.Handler, delegate http.Handler,
restOptionsGetter generic.RESTOptionsGetter, restOptionsGetter generic.RESTOptionsGetter,
@ -106,7 +109,7 @@ func NewCustomResourceDefinitionHandler(
groupDiscoveryHandler: groupDiscoveryHandler, groupDiscoveryHandler: groupDiscoveryHandler,
customStorage: atomic.Value{}, customStorage: atomic.Value{},
requestContextMapper: requestContextMapper, requestContextMapper: requestContextMapper,
crdLister: crdLister, crdLister: crdInformer.Lister(),
delegate: delegate, delegate: delegate,
restOptionsGetter: restOptionsGetter, restOptionsGetter: restOptionsGetter,
admission: admission, admission: admission,
@ -120,19 +123,20 @@ func NewCustomResourceDefinitionHandler(
}) })
ret.customStorage.Store(crdStorageMap{}) ret.customStorage.Store(crdStorageMap{})
return ret return ret
} }
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx, ok := r.requestContextMapper.Get(req) ctx, ok := r.requestContextMapper.Get(req)
if !ok { if !ok {
// programmer error responsewriters.InternalError(w, req, fmt.Errorf("no context found for request"))
panic("missing context") return
} }
requestInfo, ok := apirequest.RequestInfoFrom(ctx) requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok { if !ok {
// programmer error responsewriters.InternalError(w, req, fmt.Errorf("no RequestInfo found in the context"))
panic("missing requestInfo") return
} }
if !requestInfo.IsResourceRequest { if !requestInfo.IsResourceRequest {
pathParts := splitPath(requestInfo.Path) pathParts := splitPath(requestInfo.Path)
@ -168,6 +172,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
if !apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) { if !apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) {
r.delegate.ServeHTTP(w, req) r.delegate.ServeHTTP(w, req)
return
} }
if len(requestInfo.Subresource) > 0 { if len(requestInfo.Subresource) > 0 {
http.NotFound(w, req) http.NotFound(w, req)
@ -176,7 +181,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
terminating := apiextensions.IsCRDConditionTrue(crd, apiextensions.Terminating) terminating := apiextensions.IsCRDConditionTrue(crd, apiextensions.Terminating)
crdInfo, err := r.getServingInfoFor(crd) crdInfo, err := r.getOrCreateServingInfoFor(crd)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -242,19 +247,52 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
} }
func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{}) {
oldCRD := oldObj.(*apiextensions.CustomResourceDefinition)
newCRD := newObj.(*apiextensions.CustomResourceDefinition)
r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()
storageMap := r.customStorage.Load().(crdStorageMap)
oldInfo, found := storageMap[newCRD.UID]
if !found {
return
}
if apiequality.Semantic.DeepEqual(&newCRD.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&newCRD.Status.AcceptedNames, oldInfo.acceptedNames) {
glog.V(6).Infof("Ignoring customresourcedefinition %s update because neither spec, nor accepted names changed", oldCRD.Name)
return
}
glog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere.
storageMap2 := storageMap.clone()
if oldInfo, ok := storageMap2[types.UID(oldCRD.UID)]; ok {
oldInfo.storage.DestroyFunc()
delete(storageMap2, types.UID(oldCRD.UID))
}
r.customStorage.Store(storageMap2)
}
// removeDeadStorage removes REST storage that isn't being used // removeDeadStorage removes REST storage that isn't being used
func (r *crdHandler) removeDeadStorage() { func (r *crdHandler) removeDeadStorage() {
// these don't have to be live. A snapshot is fine
// if we wrongly delete, that's ok. The rest storage will be recreated on the next request
// if we wrongly miss one, that's ok. We'll get it next time
storageMap := r.customStorage.Load().(crdStorageMap)
allCustomResourceDefinitions, err := r.crdLister.List(labels.Everything()) allCustomResourceDefinitions, err := r.crdLister.List(labels.Everything())
if err != nil { if err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
return return
} }
for uid, s := range storageMap { r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()
storageMap := r.customStorage.Load().(crdStorageMap)
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere
storageMap2 := storageMap.clone()
for uid, s := range storageMap2 {
found := false found := false
for _, crd := range allCustomResourceDefinitions { for _, crd := range allCustomResourceDefinitions {
if crd.UID == uid { if crd.UID == uid {
@ -265,38 +303,33 @@ func (r *crdHandler) removeDeadStorage() {
if !found { if !found {
glog.V(4).Infof("Removing dead CRD storage for %v", s.requestScope.Resource) glog.V(4).Infof("Removing dead CRD storage for %v", s.requestScope.Resource)
s.storage.DestroyFunc() s.storage.DestroyFunc()
delete(storageMap, uid) delete(storageMap2, uid)
} }
} }
r.customStorage.Store(storageMap2)
r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()
r.customStorage.Store(storageMap)
} }
// GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter for // GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter for
// the given uid, or nil if one does not exist. // the given uid, or nil if one does not exist.
func (r *crdHandler) GetCustomResourceListerCollectionDeleter(crd *apiextensions.CustomResourceDefinition) finalizer.ListerCollectionDeleter { func (r *crdHandler) GetCustomResourceListerCollectionDeleter(crd *apiextensions.CustomResourceDefinition) finalizer.ListerCollectionDeleter {
info, err := r.getServingInfoFor(crd) info, err := r.getOrCreateServingInfoFor(crd)
if err != nil { if err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
} }
return info.storage return info.storage
} }
func (r *crdHandler) getServingInfoFor(crd *apiextensions.CustomResourceDefinition) (*crdInfo, error) { func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResourceDefinition) (*crdInfo, error) {
storageMap := r.customStorage.Load().(crdStorageMap) storageMap := r.customStorage.Load().(crdStorageMap)
ret, ok := storageMap[crd.UID] if ret, ok := storageMap[crd.UID]; ok {
if ok {
return ret, nil return ret, nil
} }
r.customStorageLock.Lock() r.customStorageLock.Lock()
defer r.customStorageLock.Unlock() defer r.customStorageLock.Unlock()
ret, ok = storageMap[crd.UID] storageMap = r.customStorage.Load().(crdStorageMap)
if ok { if ret, ok := storageMap[crd.UID]; ok {
return ret, nil return ret, nil
} }
@ -384,7 +417,7 @@ func (r *crdHandler) getServingInfoFor(crd *apiextensions.CustomResourceDefiniti
MetaGroupVersion: metav1.SchemeGroupVersion, MetaGroupVersion: metav1.SchemeGroupVersion,
} }
ret = &crdInfo{ ret := &crdInfo{
spec: &crd.Spec, spec: &crd.Spec,
acceptedNames: &crd.Status.AcceptedNames, acceptedNames: &crd.Status.AcceptedNames,
@ -392,16 +425,13 @@ func (r *crdHandler) getServingInfoFor(crd *apiextensions.CustomResourceDefiniti
requestScope: requestScope, requestScope: requestScope,
} }
storageMap2 := make(crdStorageMap, len(storageMap))
// Copy because we cannot write to storageMap without a race // Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere // as it is used without locking elsewhere.
for k, v := range storageMap { storageMap2 := storageMap.clone()
storageMap2[k] = v
}
storageMap2[crd.UID] = ret storageMap2[crd.UID] = ret
r.customStorage.Store(storageMap2) r.customStorage.Store(storageMap2)
return ret, nil return ret, nil
} }
@ -423,39 +453,6 @@ func (c crdObjectConverter) ConvertFieldLabel(version, kind, label, value string
} }
} }
func (c *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{}) {
oldCRD := oldObj.(*apiextensions.CustomResourceDefinition)
newCRD := newObj.(*apiextensions.CustomResourceDefinition)
c.customStorageLock.Lock()
defer c.customStorageLock.Unlock()
storageMap := c.customStorage.Load().(crdStorageMap)
oldInfo, found := storageMap[newCRD.UID]
if !found {
return
}
if apiequality.Semantic.DeepEqual(&newCRD.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&newCRD.Status.AcceptedNames, oldInfo.acceptedNames) {
glog.V(6).Infof("Ignoring customresourcedefinition %s update because neither spec, nor accepted names changed", oldCRD.Name)
return
}
glog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)
storageMap2 := make(crdStorageMap, len(storageMap))
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere
for k, v := range storageMap {
if k == oldCRD.UID {
v.storage.DestroyFunc()
continue
}
storageMap2[k] = v
}
c.customStorage.Store(storageMap2)
}
type unstructuredNegotiatedSerializer struct { type unstructuredNegotiatedSerializer struct {
typer runtime.ObjectTyper typer runtime.ObjectTyper
creator runtime.ObjectCreater creator runtime.ObjectCreater
@ -578,3 +575,16 @@ func (t CRDRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (gen
} }
return ret, nil return ret, nil
} }
// clone returns a clone of the provided crdStorageMap.
// The clone is a shallow copy of the map.
func (in crdStorageMap) clone() crdStorageMap {
if in == nil {
return nil
}
out := make(crdStorageMap, len(in))
for key, value := range in {
out[key] = value
}
return out
}