diff --git a/pkg/debounce/refresher.go b/pkg/debounce/refresher.go new file mode 100644 index 00000000..fd7843c4 --- /dev/null +++ b/pkg/debounce/refresher.go @@ -0,0 +1,51 @@ +package debounce + +import ( + "context" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// Refreshable represents an object which can be refreshed. Refresh should be protected by a mutex, since it may be called concurrently. +type Refreshable interface { + Refresh() error +} + +// DebounceableRefresher is used to debounce multiple attempts to refresh a refreshable type. +type DebounceableRefresher struct { + sync.Mutex + // Refreshable is any type that can be refreshed. The refresh method should be protected by a mutex internally. + Refreshable Refreshable + current context.CancelFunc +} + +// RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will cancel a +// refresh that is still pending and schedule a new one after the new duration. Note that this is a total override of the previous calls - calling +// RefreshAfter(time.Second * 2) and then immediately calling RefreshAfter(time.Microsecond * 1) will run a refresh +// in one microsecond. +func (d *DebounceableRefresher) RefreshAfter(duration time.Duration) { + d.Lock() + defer d.Unlock() + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + if d.current != nil { + d.current() + } + d.current = cancel + go func() { + timer := time.NewTimer(duration) + defer timer.Stop() + select { + case <-ctx.Done(): + // a later RefreshAfter call cancelled this pending refresh. Do nothing.
+ case <-timer.C: + // note: once the timer has fired this refresh can no longer be cancelled, so a RefreshAfter call made while it is still running can cause multiple refreshes to happen concurrently + err := d.Refreshable.Refresh() + if err != nil { + logrus.Errorf("failed to refresh with error: %v", err) + } + } + }() +} diff --git a/pkg/schema/definitions/handler.go b/pkg/schema/definitions/handler.go index 094cb7ea..155950d9 100644 --- a/pkg/schema/definitions/handler.go +++ b/pkg/schema/definitions/handler.go @@ -5,13 +5,11 @@ import ( "fmt" "net/http" "sync" - "time" "github.com/rancher/apiserver/pkg/apierror" "github.com/rancher/apiserver/pkg/types" "github.com/rancher/steve/pkg/schema/converter" "github.com/rancher/wrangler/v2/pkg/schemas/validation" - "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/kube-openapi/pkg/util/proto" @@ -28,15 +26,11 @@ var ( } ) -// schemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas. +// SchemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas. // Does not implement any method allowing a caller to list definitions for all schemas. -type schemaDefinitionHandler struct { +type SchemaDefinitionHandler struct { sync.RWMutex - // lastRefresh is the last time that the handler retrieved models from kubernetes. - lastRefresh time.Time - // refreshStale is the duration between lastRefresh and the next refresh of models. - refreshStale time.Duration // client is the discovery client used to get the groups/resources/fields from kubernetes. client discovery.DiscoveryInterface // models are the cached models from the last response from kubernetes. @@ -46,9 +40,35 @@ type schemaDefinitionHandler struct { schemaToModel map[string]string } +// Refresh write-locks the handler and updates the cached models and schema-name index. Will result in a call to kubernetes to retrieve +// the openAPI schemas. If fetching or parsing them fails, the existing cache is left untouched.
+func (s *SchemaDefinitionHandler) Refresh() error { + s.Lock() + defer s.Unlock() + openapi, err := s.client.OpenAPISchema() + if err != nil { + return fmt.Errorf("unable to fetch openapi definition: %w", err) + } + models, err := proto.NewOpenAPIData(openapi) + if err != nil { + return fmt.Errorf("unable to parse openapi definition into models: %w", err) + } + s.models = &models + nameIndex, err := s.indexSchemaNames(models) + // indexSchemaNames may successfully index some definitions but still return an error; + // in these cases, store what we could find, but still return the error to the caller + if nameIndex != nil { + s.schemaToModel = nameIndex + } + if err != nil { + return fmt.Errorf("unable to index schema name to model name: %w", err) + } + return nil +} + // byIDHandler is the Handler method for a request to get the schema definition for a specifc schema. Will use the // cached models found during the last refresh as part of this process. -func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) { +func (s *SchemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) { // pseudo-access check, designed to make sure that users have access to the schema for the definition that they // are accessing. requestSchema := request.Schemas.LookupSchema(request.Name) @@ -56,14 +76,6 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types. return types.APIObject{}, apierror.NewAPIError(validation.NotFound, "no such schema") } - if s.needsRefresh() { - err := s.refresh() - if err != nil { - logrus.Errorf("error refreshing schemas %s", err.Error()) - return types.APIObject{}, apierror.NewAPIError(internalServerErrorCode, "error refreshing schemas") - } - } - // lock only in read-mode so that we don't read while refresh writes.
Only use a read-lock - using a write lock // would make this endpoint only usable by one caller at a time s.RLock() @@ -100,46 +112,9 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types. }, nil } -// needsRefresh readLocks and checks if the cache needs to be refreshed. -func (s *schemaDefinitionHandler) needsRefresh() bool { - s.RLock() - defer s.RUnlock() - if s.lastRefresh.IsZero() { - return true - } - return s.lastRefresh.Add(s.refreshStale).Before(time.Now()) -} - -// refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve -// the openAPI schemas. -func (s *schemaDefinitionHandler) refresh() error { - s.Lock() - defer s.Unlock() - openapi, err := s.client.OpenAPISchema() - if err != nil { - return fmt.Errorf("unable to fetch openapi definition: %w", err) - } - models, err := proto.NewOpenAPIData(openapi) - if err != nil { - return fmt.Errorf("unable to parse openapi definition into models: %w", err) - } - s.models = &models - nameIndex, err := s.indexSchemaNames(models) - // indexSchemaNames may successfully refresh some definitions, but still return an error - // in these cases, store what we could find, but still return up an error - if nameIndex != nil { - s.schemaToModel = nameIndex - s.lastRefresh = time.Now() - } - if err != nil { - return fmt.Errorf("unable to index schema name to model name: %w", err) - } - return nil -} - // indexSchemaNames returns a map of schemaID to the modelName for a given schema. Will use the preferred version of a // resource if possible. May return a map and an error if it was able to index some schemas but not others. 
-func (s *schemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[string]string, error) { +func (s *SchemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[string]string, error) { _, resourceLists, err := s.client.ServerGroupsAndResources() // this may occasionally fail to discover certain groups, but we still can refresh the others in those cases if _, ok := err.(*discovery.ErrGroupDiscoveryFailed); err != nil && !ok { diff --git a/pkg/schema/definitions/refresh.go b/pkg/schema/definitions/refresh.go new file mode 100644 index 00000000..394873cb --- /dev/null +++ b/pkg/schema/definitions/refresh.go @@ -0,0 +1,48 @@ +package definitions + +import ( + "context" + "time" + + "github.com/rancher/steve/pkg/debounce" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" +) + +// refreshHandler triggers refreshes on a DebounceableRefresher after a CRD/APIService has been changed. It is +// intended to refresh the schema definitions after a CRD has been added and is hopefully available in k8s.
+type refreshHandler struct { + // debounceRef is the DebounceableRefresher containing the Refreshable (typically the schema definition handler) + debounceRef *debounce.DebounceableRefresher + // debounceDuration is the duration that the handler should ask the DebounceableRefresher to wait before refreshing + debounceDuration time.Duration +} + +// onChangeCRD requests a debounced refresh of the debounceRef after a CRD is added/changed and returns the CRD unmodified. +func (r *refreshHandler) onChangeCRD(key string, crd *apiextv1.CustomResourceDefinition) (*apiextv1.CustomResourceDefinition, error) { + r.debounceRef.RefreshAfter(r.debounceDuration) + return crd, nil +} + +// onChangeAPIService requests a debounced refresh of the debounceRef after an APIService is added/changed and returns the APIService unmodified. +func (r *refreshHandler) onChangeAPIService(key string, api *apiregv1.APIService) (*apiregv1.APIService, error) { + r.debounceRef.RefreshAfter(r.debounceDuration) + return api, nil +} + +// startBackgroundRefresh starts a goroutine that requests a debounced refresh (via RefreshAfter) on every tick of duration. Can be stopped +// by cancelling the context. NOTE(review): time.NewTicker panics if duration is zero or negative - confirm callers always pass a positive value. +func (r *refreshHandler) startBackgroundRefresh(ctx context.Context, duration time.Duration) { + go func() { + ticker := time.NewTicker(duration) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.debounceRef.RefreshAfter(r.debounceDuration) + } + } + }() +} diff --git a/pkg/schema/definitions/schema.go b/pkg/schema/definitions/schema.go index b6ddf44c..b9ba1ebc 100644 --- a/pkg/schema/definitions/schema.go +++ b/pkg/schema/definitions/schema.go @@ -1,37 +1,30 @@ package definitions import ( + "context" + "os" + "strconv" "time" "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/debounce" + apiextcontrollerv1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/apiextensions.k8s.io/v1" + v1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/apiregistration.k8s.io/v1" "github.com/rancher/wrangler/v2/pkg/schemas" + "github.com/sirupsen/logrus" "k8s.io/client-go/discovery" )
const ( - gvkExtensionName = "x-kubernetes-group-version-kind" - gvkExtensionGroup = "group" - gvkExtensionVersion = "version" - gvkExtensionKind = "kind" - defaultDuration = time.Second * 5 + handlerKey = "schema-definitions" + delayEnvVar = "CATTLE_CRD_REFRESH_DELAY_SECONDS" + defaultDelay = 2 + delayUnit = time.Second + refreshEnvVar = "CATTLE_BACKGROUND_REFRESH_MINUTES" + defaultRefresh = 10 + refreshUnit = time.Minute ) -// Register registers the schemaDefinition schema. -func Register(baseSchema *types.APISchemas, client discovery.DiscoveryInterface) { - handler := schemaDefinitionHandler{ - client: client, - refreshStale: defaultDuration, - } - baseSchema.MustAddSchema(types.APISchema{ - Schema: &schemas.Schema{ - ID: "schemaDefinition", - PluralName: "schemaDefinitions", - ResourceMethods: []string{"GET"}, - }, - ByIDHandler: handler.byIDHandler, - }) -} - type schemaDefinition struct { DefinitionType string `json:"definitionType"` Definitions map[string]definition `json:"definitions"` @@ -49,3 +42,55 @@ type definitionField struct { Description string `json:"description,omitempty"` Required bool `json:"required,omitempty"` } + +// Register registers the schemaDefinition schema and sets up the CRD/APIService-triggered and periodic background refreshes of its handler.
+func Register(ctx context.Context, + baseSchema *types.APISchemas, + client discovery.DiscoveryInterface, + crd apiextcontrollerv1.CustomResourceDefinitionController, + apiService v1.APIServiceController) { + handler := SchemaDefinitionHandler{ + client: client, + } + baseSchema.MustAddSchema(types.APISchema{ + Schema: &schemas.Schema{ + ID: "schemaDefinition", + PluralName: "schemaDefinitions", + ResourceMethods: []string{"GET"}, + }, + ByIDHandler: handler.byIDHandler, + }) + + debounce := debounce.DebounceableRefresher{ + Refreshable: &handler, + } + crdDebounce := getDurationEnvVarOrDefault(delayEnvVar, defaultDelay, delayUnit) + refHandler := refreshHandler{ + debounceRef: &debounce, + debounceDuration: crdDebounce, + } + crd.OnChange(ctx, handlerKey, refHandler.onChangeCRD) + apiService.OnChange(ctx, handlerKey, refHandler.onChangeAPIService) + refreshFrequency := getDurationEnvVarOrDefault(refreshEnvVar, defaultRefresh, refreshUnit) + // there's a delay between when a CRD is created and when it is available in the openapi/v2 endpoint. + // the crd/apiservice handlers use a delay (2 seconds by default) to account for this, but it's possible that this isn't + // enough in certain environments, so we also use an infrequent background refresh to eventually correct any misses + refHandler.startBackgroundRefresh(ctx, refreshFrequency) +} + +// getDurationEnvVarOrDefault gets the duration value for a given envVar. If the envVar is not set or is not an integer, it returns the provided default; +// zero and negative integers are accepted as-is. unit is the unit of time (time.Second/time.Minute/etc.)
that the returned duration should be in +func getDurationEnvVarOrDefault(envVar string, defaultVal int, unit time.Duration) time.Duration { + defaultDuration := time.Duration(defaultVal) * unit + envValue, ok := os.LookupEnv(envVar) + if !ok { + return defaultDuration + } + parsed, err := strconv.Atoi(envValue) + if err != nil { + logrus.Errorf("Env var %s was specified, but could not be converted to an int, default of %d seconds will be used", + envVar, int64(defaultDuration.Seconds())) + return defaultDuration + } + return time.Duration(parsed) * unit +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 88ee2da4..2097aea2 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -142,7 +142,8 @@ func setup(ctx context.Context, server *Server) error { if err = resources.DefaultSchemas(ctx, server.BaseSchemas, ccache, server.ClientFactory, sf, server.Version); err != nil { return err } - definitions.Register(server.BaseSchemas, server.controllers.K8s.Discovery()) + definitions.Register(ctx, server.BaseSchemas, server.controllers.K8s.Discovery(), + server.controllers.CRD.CustomResourceDefinition(), server.controllers.API.APIService()) summaryCache := summarycache.New(sf, ccache) summaryCache.Start(ctx)