diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index 8d34368b251..c35ac49094c 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -67,7 +67,9 @@ type openAPISpecInfo struct { // The downloader is used only for non-local apiservices to // re-update the spec every so often. - downloader cached.Value[*spec.Swagger] + // Calling Get() is not thread safe and should only be called by a single + // thread via the openapi controller. + downloader CacheableDownloader } type specAggregator struct { @@ -93,8 +95,7 @@ func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggr for i, handler := range delegationHandlers { name := fmt.Sprintf(localDelegateChainNamePattern, i+1) - spec := NewCacheableDownloader(downloader, handler) - spec = decorateError(name, spec) + spec := NewCacheableDownloader(name, downloader, handler) s.addLocalSpec(name, spec) } @@ -218,14 +219,18 @@ func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler s.mutex.Lock() defer s.mutex.Unlock() - _, exists := s.specsByAPIServiceName[apiService.Name] + existingSpec, exists := s.specsByAPIServiceName[apiService.Name] if !exists { - s.specsByAPIServiceName[apiService.Name] = &openAPISpecInfo{ + specInfo := &openAPISpecInfo{ apiService: *apiService, - downloader: decorateError(apiService.Name, NewCacheableDownloader(s.downloader, handler)), + downloader: NewCacheableDownloader(apiService.Name, s.downloader, handler), } - s.specByAPIServiceName[apiService.Name].spec.Store(cached.Result[*spec.Swagger]{Err: fmt.Errorf("spec for apiservice %s is not yet available", apiService.Name)}) + specInfo.spec.Store(cached.Result[*spec.Swagger]{Err: fmt.Errorf("spec for apiservice %s is not yet available", apiService.Name)}) + s.specsByAPIServiceName[apiService.Name] = specInfo s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) + } else { + existingSpec.apiService = *apiService + existingSpec.downloader.UpdateHandler(handler) } return nil @@ -244,14 +249,3 @@ func (s *specAggregator) RemoveAPIService(apiServiceName string) { // Re-create the mergeSpec for the new list of apiservices s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) } - -// decorateError creates a new cache that wraps a downloader -// cache the name of the apiservice to help with debugging. -func decorateError(name string, cache cached.Value[*spec.Swagger]) cached.Value[*spec.Swagger] { - return cached.Transform(func(result *spec.Swagger, etag string, err error) (*spec.Swagger, string, error) { - if err != nil { - return nil, "", fmt.Errorf("failed to download %v: %v", name, err) - } - return result, etag, err - }, cache) -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go index c8898630e78..03721365805 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go @@ -21,32 +21,53 @@ import ( "fmt" "net/http" "strings" + "sync/atomic" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/kube-openapi/pkg/cached" "k8s.io/kube-openapi/pkg/validation/spec" ) +type CacheableDownloader interface { + UpdateHandler(http.Handler) + Get() (*spec.Swagger, string, error) +} + // cacheableDownloader is a downloader that will always return the data // and the etag. type cacheableDownloader struct { + name string downloader *Downloader - handler http.Handler - etag string - spec *spec.Swagger + // handler is the http Handler for the apiservice that can be replaced + handler atomic.Pointer[http.Handler] + etag string + spec *spec.Swagger } // NewCacheableDownloader creates a downloader that also returns the etag, making it useful to use as a cached dependency. -func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Value[*spec.Swagger] { - return &cacheableDownloader{ +func NewCacheableDownloader(apiServiceName string, downloader *Downloader, handler http.Handler) CacheableDownloader { + c := &cacheableDownloader{ + name: apiServiceName, downloader: downloader, - handler: handler, } + c.handler.Store(&handler) + return c +} +func (d *cacheableDownloader) UpdateHandler(handler http.Handler) { + d.handler.Store(&handler) } func (d *cacheableDownloader) Get() (*spec.Swagger, string, error) { - swagger, etag, status, err := d.downloader.Download(d.handler, d.etag) + spec, etag, err := d.get() + if err != nil { + return spec, etag, fmt.Errorf("failed to download %v: %v", d.name, err) + } + return spec, etag, err +} + +func (d *cacheableDownloader) get() (*spec.Swagger, string, error) { + h := *d.handler.Load() + swagger, etag, status, err := d.downloader.Download(h, d.etag) if err != nil { return nil, "", err }