diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go index ee9c0afb0fa..b2ffe6b3db8 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go @@ -66,24 +66,23 @@ type Controller struct { // changed. crdCache is a cached.Replaceable and updates are thread // safe. Thus, no lock is needed to protect this struct. type specCache struct { - crdCache cached.Replaceable[*apiextensionsv1.CustomResourceDefinition] - mergedVersionSpec cached.Data[*spec.Swagger] + crdCache cached.LastSuccess[*apiextensionsv1.CustomResourceDefinition] + mergedVersionSpec cached.Value[*spec.Swagger] } func (s *specCache) update(crd *apiextensionsv1.CustomResourceDefinition) { - s.crdCache.Replace(cached.NewResultOK(crd, generateCRDHash(crd))) + s.crdCache.Store(cached.Static(crd, generateCRDHash(crd))) } func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache { s := specCache{} s.update(crd) - s.mergedVersionSpec = cached.NewTransformer[*apiextensionsv1.CustomResourceDefinition](func(result cached.Result[*apiextensionsv1.CustomResourceDefinition]) cached.Result[*spec.Swagger] { - if result.Err != nil { + s.mergedVersionSpec = cached.Transform[*apiextensionsv1.CustomResourceDefinition](func(crd *apiextensionsv1.CustomResourceDefinition, etag string, err error) (*spec.Swagger, string, error) { + if err != nil { // This should never happen, but return the err if it does. - return cached.NewResultErr[*spec.Swagger](result.Err) + return nil, "", err } - crd := result.Data mergeSpec := &spec.Swagger{} for _, v := range crd.Spec.Versions { if !v.Served { @@ -93,15 +92,15 @@ func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache { // Defaults must be pruned here for CRDs to cleanly merge with the static // spec that already has defaults pruned if err != nil { - return cached.NewResultErr[*spec.Swagger](err) + return nil, "", err } s.Definitions = handler.PruneDefaults(s.Definitions) mergeSpec, err = builder.MergeSpecs(mergeSpec, s) if err != nil { - return cached.NewResultErr[*spec.Swagger](err) + return nil, "", err } } - return cached.NewResultOK(mergeSpec, generateCRDHash(crd)) + return mergeSpec, generateCRDHash(crd), nil }, &s.crdCache) return &s } @@ -234,27 +233,27 @@ func (c *Controller) sync(name string) error { // updateSpecLocked updates the cached spec graph. func (c *Controller) updateSpecLocked() { - specList := make([]cached.Data[*spec.Swagger], 0, len(c.specsByName)) + specList := make([]cached.Value[*spec.Swagger], 0, len(c.specsByName)) for crd := range c.specsByName { specList = append(specList, c.specsByName[crd].mergedVersionSpec) } - cache := cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { + cache := cached.MergeList(func(results []cached.Result[*spec.Swagger]) (*spec.Swagger, string, error) { localCRDSpec := make([]*spec.Swagger, 0, len(results)) for k := range results { if results[k].Err == nil { - localCRDSpec = append(localCRDSpec, results[k].Data) + localCRDSpec = append(localCRDSpec, results[k].Value) } } mergedSpec, err := builder.MergeSpecs(c.staticSpec, localCRDSpec...) if err != nil { - return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to merge specs: %v", err)) + return nil, "", fmt.Errorf("failed to merge specs: %v", err) } // A UUID is returned for the etag because we will only // create a new merger when a CRD has changed. A hash based // etag is more expensive because the CRDs are not // premarshalled. - return cached.NewResultOK(mergedSpec, uuid.New().String()) + return mergedSpec, uuid.New().String(), nil }, specList) c.openAPIService.UpdateSpecLazy(cache) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index 8491293e725..88e4ed71fbd 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -63,11 +63,11 @@ const ( type openAPISpecInfo struct { apiService v1.APIService // spec is the cached OpenAPI spec - spec cached.Replaceable[*spec.Swagger] + spec cached.LastSuccess[*spec.Swagger] // The downloader is used only for non-local apiservices to // re-update the spec every so often. - downloader cached.Data[*spec.Swagger] + downloader cached.Value[*spec.Swagger] } type specAggregator struct { @@ -88,7 +88,7 @@ func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggr downloader: downloader, specsByAPIServiceName: map[string]*openAPISpecInfo{}, } - cachedAggregatorSpec := cached.NewResultOK(aggregatorSpec, "never-changes") + cachedAggregatorSpec := cached.Static(aggregatorSpec, "never-changes") s.addLocalSpec(fmt.Sprintf(localDelegateChainNamePattern, 0), cachedAggregatorSpec) for i, handler := range delegationHandlers { name := fmt.Sprintf(localDelegateChainNamePattern, i+1) @@ -132,55 +132,55 @@ func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server. return s, nil } -func (s *specAggregator) addLocalSpec(name string, spec cached.Data[*spec.Swagger]) { +func (s *specAggregator) addLocalSpec(name string, cachedSpec cached.Value[*spec.Swagger]) { service := v1.APIService{} service.Name = name info := &openAPISpecInfo{ apiService: service, } - info.spec.Replace(spec) + info.spec.Store(cachedSpec) s.specsByAPIServiceName[name] = info } // buildMergeSpecLocked creates a new cached mergeSpec from the list of cached specs. -func (s *specAggregator) buildMergeSpecLocked() cached.Data[*spec.Swagger] { +func (s *specAggregator) buildMergeSpecLocked() cached.Value[*spec.Swagger] { apiServices := make([]*v1.APIService, 0, len(s.specsByAPIServiceName)) for k := range s.specsByAPIServiceName { apiServices = append(apiServices, &s.specsByAPIServiceName[k].apiService) } sortByPriority(apiServices) - caches := make([]cached.Data[*spec.Swagger], len(apiServices)) + caches := make([]cached.Value[*spec.Swagger], len(apiServices)) for i, apiService := range apiServices { caches[i] = &(s.specsByAPIServiceName[apiService.Name].spec) } - return cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { + return cached.MergeList(func(results []cached.Result[*spec.Swagger]) (*spec.Swagger, string, error) { var merged *spec.Swagger etags := make([]string, 0, len(results)) for _, specInfo := range results { - result := specInfo.Get() - if result.Err != nil { + result, etag, err := specInfo.Get() + if err != nil { // APIService name and err message will be included in // the error message as part of decorateError - klog.Warning(result.Err) + klog.Warning(err) continue } if merged == nil { merged = &spec.Swagger{} - *merged = *result.Data + *merged = *result // Paths, Definitions and parameters are set by // MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters merged.Paths = nil merged.Definitions = nil merged.Parameters = nil } - etags = append(etags, result.Etag) - if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result.Data); err != nil { - return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to build merge specs: %v", err)) + etags = append(etags, etag) + if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result); err != nil { + return nil, "", fmt.Errorf("failed to build merge specs: %v", err) } } // Printing the etags list is stable because it is sorted. - return cached.NewResultOK(merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags))))) + return merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags)))), nil }, caches) } @@ -191,15 +191,15 @@ func (s *specAggregator) updateServiceLocked(name string) error { if !exists { return ErrAPIServiceNotFound } - result := specInfo.downloader.Get() - filteredResult := cached.NewTransformer[*spec.Swagger](func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { - if result.Err != nil { - return result + result, etag, err := specInfo.downloader.Get() + filteredResult := cached.Transform[*spec.Swagger](func(result *spec.Swagger, etag string, err error) (*spec.Swagger, string, error) { + if err != nil { + return nil, "", err } - return cached.NewResultOK(aggregator.FilterSpecByPathsWithoutSideEffects(result.Data, []string{"/apis/"}), result.Etag) - }, result) - specInfo.spec.Replace(filteredResult) - return result.Err + return aggregator.FilterSpecByPathsWithoutSideEffects(result, []string{"/apis/"}), etag, nil + }, cached.Result[*spec.Swagger]{Value: result, Etag: etag, Err: err}) + specInfo.spec.Store(filteredResult) + return err } // UpdateAPIServiceSpec updates the api service. It is thread safe. @@ -246,11 +246,11 @@ func (s *specAggregator) RemoveAPIService(apiServiceName string) { // decorateError creates a new cache that wraps a downloader // cache the name of the apiservice to help with debugging. -func decorateError(name string, cache cached.Data[*spec.Swagger]) cached.Data[*spec.Swagger] { - return cached.NewTransformer(func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { - if result.Err != nil { - return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", name, result.Err)) +func decorateError(name string, cache cached.Value[*spec.Swagger]) cached.Value[*spec.Swagger] { + return cached.Transform(func(result *spec.Swagger, etag string, err error) (*spec.Swagger, string, error) { + if err != nil { + return nil, "", fmt.Errorf("failed to download %v: %v", name, err) } - return result + return result, etag, err }, cache) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go index 3098f593e24..c8898630e78 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go @@ -37,18 +37,18 @@ type cacheableDownloader struct { spec *spec.Swagger } -// Creates a downloader that also returns the etag, making it useful to use as a cached dependency. -func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Data[*spec.Swagger] { +// NewCacheableDownloader creates a downloader that also returns the etag, making it useful to use as a cached dependency. +func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Value[*spec.Swagger] { return &cacheableDownloader{ downloader: downloader, handler: handler, } } -func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] { +func (d *cacheableDownloader) Get() (*spec.Swagger, string, error) { swagger, etag, status, err := d.downloader.Download(d.handler, d.etag) if err != nil { - return cached.NewResultErr[*spec.Swagger](err) + return nil, "", err } switch status { case http.StatusNotModified: @@ -61,11 +61,11 @@ func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] { } fallthrough case http.StatusNotFound: - return cached.NewResultErr[*spec.Swagger](ErrAPIServiceNotFound) + return nil, "", ErrAPIServiceNotFound default: - return cached.NewResultErr[*spec.Swagger](fmt.Errorf("invalid status code: %v", status)) + return nil, "", fmt.Errorf("invalid status code: %v", status) } - return cached.NewResultOK(d.spec, d.etag) + return d.spec, d.etag, nil } // Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v2 or /swagger.json endpoint.