Fixup kube-openapi/pkg/cached changes

This commit is contained in:
Dr. Stefan Schimanski 2023-07-23 19:46:20 +02:00
parent e1ffbacb7a
commit dad1b2a430
No known key found for this signature in database
GPG Key ID: 4C68E0F19F95EC33
3 changed files with 50 additions and 51 deletions

View File

@ -66,24 +66,23 @@ type Controller struct {
// changed. crdCache is a cached.Replaceable and updates are thread // changed. crdCache is a cached.Replaceable and updates are thread
// safe. Thus, no lock is needed to protect this struct. // safe. Thus, no lock is needed to protect this struct.
type specCache struct { type specCache struct {
crdCache cached.Replaceable[*apiextensionsv1.CustomResourceDefinition] crdCache cached.LastSuccess[*apiextensionsv1.CustomResourceDefinition]
mergedVersionSpec cached.Data[*spec.Swagger] mergedVersionSpec cached.Value[*spec.Swagger]
} }
func (s *specCache) update(crd *apiextensionsv1.CustomResourceDefinition) { func (s *specCache) update(crd *apiextensionsv1.CustomResourceDefinition) {
s.crdCache.Replace(cached.NewResultOK(crd, generateCRDHash(crd))) s.crdCache.Store(cached.Static(crd, generateCRDHash(crd)))
} }
func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache { func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache {
s := specCache{} s := specCache{}
s.update(crd) s.update(crd)
s.mergedVersionSpec = cached.NewTransformer[*apiextensionsv1.CustomResourceDefinition](func(result cached.Result[*apiextensionsv1.CustomResourceDefinition]) cached.Result[*spec.Swagger] { s.mergedVersionSpec = cached.Transform[*apiextensionsv1.CustomResourceDefinition](func(crd *apiextensionsv1.CustomResourceDefinition, etag string, err error) (*spec.Swagger, string, error) {
if result.Err != nil { if err != nil {
// This should never happen, but return the err if it does. // This should never happen, but return the err if it does.
return cached.NewResultErr[*spec.Swagger](result.Err) return nil, "", err
} }
crd := result.Data
mergeSpec := &spec.Swagger{} mergeSpec := &spec.Swagger{}
for _, v := range crd.Spec.Versions { for _, v := range crd.Spec.Versions {
if !v.Served { if !v.Served {
@ -93,15 +92,15 @@ func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache {
// Defaults must be pruned here for CRDs to cleanly merge with the static // Defaults must be pruned here for CRDs to cleanly merge with the static
// spec that already has defaults pruned // spec that already has defaults pruned
if err != nil { if err != nil {
return cached.NewResultErr[*spec.Swagger](err) return nil, "", err
} }
s.Definitions = handler.PruneDefaults(s.Definitions) s.Definitions = handler.PruneDefaults(s.Definitions)
mergeSpec, err = builder.MergeSpecs(mergeSpec, s) mergeSpec, err = builder.MergeSpecs(mergeSpec, s)
if err != nil { if err != nil {
return cached.NewResultErr[*spec.Swagger](err) return nil, "", err
} }
} }
return cached.NewResultOK(mergeSpec, generateCRDHash(crd)) return mergeSpec, generateCRDHash(crd), nil
}, &s.crdCache) }, &s.crdCache)
return &s return &s
} }
@ -234,27 +233,27 @@ func (c *Controller) sync(name string) error {
// updateSpecLocked updates the cached spec graph. // updateSpecLocked updates the cached spec graph.
func (c *Controller) updateSpecLocked() { func (c *Controller) updateSpecLocked() {
specList := make([]cached.Data[*spec.Swagger], 0, len(c.specsByName)) specList := make([]cached.Value[*spec.Swagger], 0, len(c.specsByName))
for crd := range c.specsByName { for crd := range c.specsByName {
specList = append(specList, c.specsByName[crd].mergedVersionSpec) specList = append(specList, c.specsByName[crd].mergedVersionSpec)
} }
cache := cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { cache := cached.MergeList(func(results []cached.Result[*spec.Swagger]) (*spec.Swagger, string, error) {
localCRDSpec := make([]*spec.Swagger, 0, len(results)) localCRDSpec := make([]*spec.Swagger, 0, len(results))
for k := range results { for k := range results {
if results[k].Err == nil { if results[k].Err == nil {
localCRDSpec = append(localCRDSpec, results[k].Data) localCRDSpec = append(localCRDSpec, results[k].Value)
} }
} }
mergedSpec, err := builder.MergeSpecs(c.staticSpec, localCRDSpec...) mergedSpec, err := builder.MergeSpecs(c.staticSpec, localCRDSpec...)
if err != nil { if err != nil {
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to merge specs: %v", err)) return nil, "", fmt.Errorf("failed to merge specs: %v", err)
} }
// A UUID is returned for the etag because we will only // A UUID is returned for the etag because we will only
// create a new merger when a CRD has changed. A hash based // create a new merger when a CRD has changed. A hash based
// etag is more expensive because the CRDs are not // etag is more expensive because the CRDs are not
// premarshalled. // premarshalled.
return cached.NewResultOK(mergedSpec, uuid.New().String()) return mergedSpec, uuid.New().String(), nil
}, specList) }, specList)
c.openAPIService.UpdateSpecLazy(cache) c.openAPIService.UpdateSpecLazy(cache)
} }

View File

@ -63,11 +63,11 @@ const (
type openAPISpecInfo struct { type openAPISpecInfo struct {
apiService v1.APIService apiService v1.APIService
// spec is the cached OpenAPI spec // spec is the cached OpenAPI spec
spec cached.Replaceable[*spec.Swagger] spec cached.LastSuccess[*spec.Swagger]
// The downloader is used only for non-local apiservices to // The downloader is used only for non-local apiservices to
// re-update the spec every so often. // re-update the spec every so often.
downloader cached.Data[*spec.Swagger] downloader cached.Value[*spec.Swagger]
} }
type specAggregator struct { type specAggregator struct {
@ -88,7 +88,7 @@ func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggr
downloader: downloader, downloader: downloader,
specsByAPIServiceName: map[string]*openAPISpecInfo{}, specsByAPIServiceName: map[string]*openAPISpecInfo{},
} }
cachedAggregatorSpec := cached.NewResultOK(aggregatorSpec, "never-changes") cachedAggregatorSpec := cached.Static(aggregatorSpec, "never-changes")
s.addLocalSpec(fmt.Sprintf(localDelegateChainNamePattern, 0), cachedAggregatorSpec) s.addLocalSpec(fmt.Sprintf(localDelegateChainNamePattern, 0), cachedAggregatorSpec)
for i, handler := range delegationHandlers { for i, handler := range delegationHandlers {
name := fmt.Sprintf(localDelegateChainNamePattern, i+1) name := fmt.Sprintf(localDelegateChainNamePattern, i+1)
@ -132,55 +132,55 @@ func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.
return s, nil return s, nil
} }
func (s *specAggregator) addLocalSpec(name string, spec cached.Data[*spec.Swagger]) { func (s *specAggregator) addLocalSpec(name string, cachedSpec cached.Value[*spec.Swagger]) {
service := v1.APIService{} service := v1.APIService{}
service.Name = name service.Name = name
info := &openAPISpecInfo{ info := &openAPISpecInfo{
apiService: service, apiService: service,
} }
info.spec.Replace(spec) info.spec.Store(cachedSpec)
s.specsByAPIServiceName[name] = info s.specsByAPIServiceName[name] = info
} }
// buildMergeSpecLocked creates a new cached mergeSpec from the list of cached specs. // buildMergeSpecLocked creates a new cached mergeSpec from the list of cached specs.
func (s *specAggregator) buildMergeSpecLocked() cached.Data[*spec.Swagger] { func (s *specAggregator) buildMergeSpecLocked() cached.Value[*spec.Swagger] {
apiServices := make([]*v1.APIService, 0, len(s.specsByAPIServiceName)) apiServices := make([]*v1.APIService, 0, len(s.specsByAPIServiceName))
for k := range s.specsByAPIServiceName { for k := range s.specsByAPIServiceName {
apiServices = append(apiServices, &s.specsByAPIServiceName[k].apiService) apiServices = append(apiServices, &s.specsByAPIServiceName[k].apiService)
} }
sortByPriority(apiServices) sortByPriority(apiServices)
caches := make([]cached.Data[*spec.Swagger], len(apiServices)) caches := make([]cached.Value[*spec.Swagger], len(apiServices))
for i, apiService := range apiServices { for i, apiService := range apiServices {
caches[i] = &(s.specsByAPIServiceName[apiService.Name].spec) caches[i] = &(s.specsByAPIServiceName[apiService.Name].spec)
} }
return cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { return cached.MergeList(func(results []cached.Result[*spec.Swagger]) (*spec.Swagger, string, error) {
var merged *spec.Swagger var merged *spec.Swagger
etags := make([]string, 0, len(results)) etags := make([]string, 0, len(results))
for _, specInfo := range results { for _, specInfo := range results {
result := specInfo.Get() result, etag, err := specInfo.Get()
if result.Err != nil { if err != nil {
// APIService name and err message will be included in // APIService name and err message will be included in
// the error message as part of decorateError // the error message as part of decorateError
klog.Warning(result.Err) klog.Warning(err)
continue continue
} }
if merged == nil { if merged == nil {
merged = &spec.Swagger{} merged = &spec.Swagger{}
*merged = *result.Data *merged = *result
// Paths, Definitions and parameters are set by // Paths, Definitions and parameters are set by
// MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters // MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters
merged.Paths = nil merged.Paths = nil
merged.Definitions = nil merged.Definitions = nil
merged.Parameters = nil merged.Parameters = nil
} }
etags = append(etags, result.Etag) etags = append(etags, etag)
if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result.Data); err != nil { if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result); err != nil {
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to build merge specs: %v", err)) return nil, "", fmt.Errorf("failed to build merge specs: %v", err)
} }
} }
// Printing the etags list is stable because it is sorted. // Printing the etags list is stable because it is sorted.
return cached.NewResultOK(merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags))))) return merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags)))), nil
}, caches) }, caches)
} }
@ -191,15 +191,15 @@ func (s *specAggregator) updateServiceLocked(name string) error {
if !exists { if !exists {
return ErrAPIServiceNotFound return ErrAPIServiceNotFound
} }
result := specInfo.downloader.Get() result, etag, err := specInfo.downloader.Get()
filteredResult := cached.NewTransformer[*spec.Swagger](func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { filteredResult := cached.Transform[*spec.Swagger](func(result *spec.Swagger, etag string, err error) (*spec.Swagger, string, error) {
if result.Err != nil { if err != nil {
return result return nil, "", err
} }
return cached.NewResultOK(aggregator.FilterSpecByPathsWithoutSideEffects(result.Data, []string{"/apis/"}), result.Etag) return aggregator.FilterSpecByPathsWithoutSideEffects(result, []string{"/apis/"}), etag, nil
}, result) }, cached.Result[*spec.Swagger]{Value: result, Etag: etag, Err: err})
specInfo.spec.Replace(filteredResult) specInfo.spec.Store(filteredResult)
return result.Err return err
} }
// UpdateAPIServiceSpec updates the api service. It is thread safe. // UpdateAPIServiceSpec updates the api service. It is thread safe.
@ -246,11 +246,11 @@ func (s *specAggregator) RemoveAPIService(apiServiceName string) {
// decorateError creates a new cache that wraps a downloader // decorateError creates a new cache that wraps a downloader
// cache the name of the apiservice to help with debugging. // cache the name of the apiservice to help with debugging.
func decorateError(name string, cache cached.Data[*spec.Swagger]) cached.Data[*spec.Swagger] { func decorateError(name string, cache cached.Value[*spec.Swagger]) cached.Value[*spec.Swagger] {
return cached.NewTransformer(func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { return cached.Transform(func(result *spec.Swagger, etag string, err error) (*spec.Swagger, string, error) {
if result.Err != nil { if err != nil {
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", name, result.Err)) return nil, "", fmt.Errorf("failed to download %v: %v", name, err)
} }
return result return result, etag, err
}, cache) }, cache)
} }

View File

@ -37,18 +37,18 @@ type cacheableDownloader struct {
spec *spec.Swagger spec *spec.Swagger
} }
// Creates a downloader that also returns the etag, making it useful to use as a cached dependency. // NewCacheableDownloader creates a downloader that also returns the etag, making it useful to use as a cached dependency.
func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Data[*spec.Swagger] { func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Value[*spec.Swagger] {
return &cacheableDownloader{ return &cacheableDownloader{
downloader: downloader, downloader: downloader,
handler: handler, handler: handler,
} }
} }
func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] { func (d *cacheableDownloader) Get() (*spec.Swagger, string, error) {
swagger, etag, status, err := d.downloader.Download(d.handler, d.etag) swagger, etag, status, err := d.downloader.Download(d.handler, d.etag)
if err != nil { if err != nil {
return cached.NewResultErr[*spec.Swagger](err) return nil, "", err
} }
switch status { switch status {
case http.StatusNotModified: case http.StatusNotModified:
@ -61,11 +61,11 @@ func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] {
} }
fallthrough fallthrough
case http.StatusNotFound: case http.StatusNotFound:
return cached.NewResultErr[*spec.Swagger](ErrAPIServiceNotFound) return nil, "", ErrAPIServiceNotFound
default: default:
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("invalid status code: %v", status)) return nil, "", fmt.Errorf("invalid status code: %v", status)
} }
return cached.NewResultOK(d.spec, d.etag) return d.spec, d.etag, nil
} }
// Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v2 or /swagger.json endpoint. // Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v2 or /swagger.json endpoint.