From 3079798d030a13d292bf3f87727ac6ed6ddfccf6 Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Wed, 6 Mar 2019 20:35:38 -0800 Subject: [PATCH] kube-aggregator: periodically resync local specs Co-authored-by: Dr. Stefan Schimanski --- .../openapi/aggregator/aggregator.go | 29 ++++++++++++++++++- .../pkg/controllers/openapi/controller.go | 19 +++++++++--- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index ca36b8e7902..c360e4b0f57 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -19,6 +19,7 @@ package aggregator import ( "fmt" "net/http" + "strings" "sync" "time" @@ -40,17 +41,35 @@ type SpecAggregator interface { UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error RemoveAPIServiceSpec(apiServiceName string) error GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) + GetAPIServiceNames() []string } const ( aggregatorUser = "system:aggregator" specDownloadTimeout = 60 * time.Second - localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d" + localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_" + localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d" // A randomly generated UUID to differentiate local and remote eTags. locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-" ) +// IsLocalAPIService returns true for local specs from delegates. +func IsLocalAPIService(apiServiceName string) bool { + return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix) +} + +// GetAPIServicesName returns the names of APIServices recorded in specAggregator.openAPISpecs. +// We use this function to pass the names of local APIServices to the controller in this package, +// so that the controller can periodically sync the OpenAPI spec from delegation API servers. +func (s *specAggregator) GetAPIServiceNames() []string { + names := make([]string, len(s.openAPISpecs)) + for key := range s.openAPISpecs { + names = append(names, key) + } + return names +} + // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService, config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) { @@ -161,6 +180,7 @@ func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err err return nil, err } } + return specToReturn, nil } @@ -179,7 +199,14 @@ func (s *specAggregator) updateOpenAPISpec() error { // tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact // if the update fails. func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error { + if specInfo == nil { + return fmt.Errorf("invalid input: specInfo must be non-nil") + } orgSpecInfo, exists := s.openAPISpecs[specInfo.apiService.Name] + // Skip aggregation if OpenAPI spec didn't change + if exists && orgSpecInfo != nil && orgSpecInfo.etag == specInfo.etag { + return nil + } s.openAPISpecs[specInfo.apiService.Name] = specInfo if err := s.updateOpenAPISpec(); err != nil { if exists { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go index 764d7b8f3ab..ea8de12151f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go @@ -30,8 +30,9 @@ import ( ) const ( - successfulUpdateDelay = time.Minute - failedUpdateMaxExpDelay = time.Hour + successfulUpdateDelay = time.Minute + successfulUpdateDelayLocal = time.Second + failedUpdateMaxExpDelay = time.Hour ) type syncAction int @@ -64,6 +65,11 @@ func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregat c.syncHandler = c.sync + // update each service at least once, also those which are not coming from APIServices, namely local services + for _, name := range openAPIAggregationManager.GetAPIServiceNames() { + c.queue.AddAfter(name, time.Second) + } + return c } @@ -104,8 +110,13 @@ func (c *AggregationController) processNextWorkItem() bool { switch action { case syncRequeue: - klog.Infof("OpenAPI AggregationController: action for item %s: Requeue.", key) - c.queue.AddAfter(key, successfulUpdateDelay) + if aggregator.IsLocalAPIService(key.(string)) { + klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal) + c.queue.AddAfter(key, successfulUpdateDelayLocal) + } else { + klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key) + c.queue.AddAfter(key, successfulUpdateDelay) + } case syncRequeueRateLimited: klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key) c.queue.AddRateLimited(key)