mirror of
https://github.com/rancher/steve.git
synced 2025-08-02 07:12:36 +00:00
Improving Schema Definitions cache/refresh
Updates the schema definitions refresh method to use a new debounce method. Adds a handler which refreshes the definitions 2 seconds after a CRD or APIService changes and, in the background, every 10 minutes. Both intervals are defaults and can be overridden through environment variables.
This commit is contained in:
parent
641178e7cb
commit
2f8e64840b
51
pkg/debounce/refresher.go
Normal file
51
pkg/debounce/refresher.go
Normal file
@ -0,0 +1,51 @@
|
||||
package debounce
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Refreshable represents an object which can be refreshed.
// Implementations should protect Refresh with a mutex: DebounceableRefresher invokes it from a background
// goroutine and notes that several refreshes may run concurrently.
|
||||
type Refreshable interface {
|
||||
// Refresh performs the refresh and reports any failure.
Refresh() error
|
||||
}
|
||||
|
||||
// DebounceableRefresher is used to debounce multiple attempts to refresh a refreshable type. Each call to
// RefreshAfter cancels the request that is still pending and schedules a new one, so only the latest request
// is left waiting.
|
||||
type DebounceableRefresher struct {
|
||||
// Mutex is held by RefreshAfter while it swaps the pending request.
sync.Mutex
|
||||
// Refreshable is any type that can be refreshed. The refresh method should be protected by a mutex internally.
|
||||
Refreshable Refreshable
|
||||
// current cancels the most recently scheduled refresh. It is nil until RefreshAfter has been called once.
current context.CancelFunc
|
||||
}
|
||||
|
||||
// RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will
|
||||
// delay the requested refresh by the new duration. Note that this is a total override of the previous calls - calling
|
||||
// RefreshAfter(time.Second * 2) and then immediately calling RefreshAfter(time.Microsecond * 1) will run a refresh
|
||||
// in one microsecond
|
||||
func (d *DebounceableRefresher) RefreshAfter(duration time.Duration) {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
if d.current != nil {
|
||||
d.current()
|
||||
}
|
||||
d.current = cancel
|
||||
go func() {
|
||||
timer := time.NewTimer(duration)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// this indicates that the context was cancelled. Do nothing.
|
||||
case <-timer.C:
|
||||
// note this can cause multiple refreshes to happen concurrently
|
||||
err := d.Refreshable.Refresh()
|
||||
if err != nil {
|
||||
logrus.Errorf("failed to refresh with error: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
@ -5,13 +5,11 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rancher/apiserver/pkg/apierror"
|
||||
"github.com/rancher/apiserver/pkg/types"
|
||||
"github.com/rancher/steve/pkg/schema/converter"
|
||||
"github.com/rancher/wrangler/v2/pkg/schemas/validation"
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/kube-openapi/pkg/util/proto"
|
||||
@ -28,15 +26,11 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
// schemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas.
|
||||
// SchemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas.
|
||||
// Does not implement any method allowing a caller to list definitions for all schemas.
|
||||
type schemaDefinitionHandler struct {
|
||||
type SchemaDefinitionHandler struct {
|
||||
sync.RWMutex
|
||||
|
||||
// lastRefresh is the last time that the handler retrieved models from kubernetes.
|
||||
lastRefresh time.Time
|
||||
// refreshStale is the duration between lastRefresh and the next refresh of models.
|
||||
refreshStale time.Duration
|
||||
// client is the discovery client used to get the groups/resources/fields from kubernetes.
|
||||
client discovery.DiscoveryInterface
|
||||
// models are the cached models from the last response from kubernetes.
|
||||
@ -46,9 +40,35 @@ type schemaDefinitionHandler struct {
|
||||
schemaToModel map[string]string
|
||||
}
|
||||
|
||||
// Refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve
|
||||
// the openAPI schemas.
|
||||
func (s *SchemaDefinitionHandler) Refresh() error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
openapi, err := s.client.OpenAPISchema()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch openapi definition: %w", err)
|
||||
}
|
||||
models, err := proto.NewOpenAPIData(openapi)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to parse openapi definition into models: %w", err)
|
||||
}
|
||||
s.models = &models
|
||||
nameIndex, err := s.indexSchemaNames(models)
|
||||
// indexSchemaNames may successfully refresh some definitions, but still return an error
|
||||
// in these cases, store what we could find, but still return up an error
|
||||
if nameIndex != nil {
|
||||
s.schemaToModel = nameIndex
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to index schema name to model name: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// byIDHandler is the Handler method for a request to get the schema definition for a specific schema. Will use the
|
||||
// cached models found during the last refresh as part of this process.
|
||||
func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) {
|
||||
func (s *SchemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) {
|
||||
// pseudo-access check, designed to make sure that users have access to the schema for the definition that they
|
||||
// are accessing.
|
||||
requestSchema := request.Schemas.LookupSchema(request.Name)
|
||||
@ -56,14 +76,6 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.
|
||||
return types.APIObject{}, apierror.NewAPIError(validation.NotFound, "no such schema")
|
||||
}
|
||||
|
||||
if s.needsRefresh() {
|
||||
err := s.refresh()
|
||||
if err != nil {
|
||||
logrus.Errorf("error refreshing schemas %s", err.Error())
|
||||
return types.APIObject{}, apierror.NewAPIError(internalServerErrorCode, "error refreshing schemas")
|
||||
}
|
||||
}
|
||||
|
||||
// lock only in read-mode so that we don't read while refresh writes. Only use a read-lock - using a write lock
|
||||
// would make this endpoint only usable by one caller at a time
|
||||
s.RLock()
|
||||
@ -100,46 +112,9 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.
|
||||
}, nil
|
||||
}
|
||||
|
||||
// needsRefresh readLocks and checks if the cache needs to be refreshed.
|
||||
func (s *schemaDefinitionHandler) needsRefresh() bool {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
if s.lastRefresh.IsZero() {
|
||||
return true
|
||||
}
|
||||
return s.lastRefresh.Add(s.refreshStale).Before(time.Now())
|
||||
}
|
||||
|
||||
// refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve
|
||||
// the openAPI schemas.
|
||||
func (s *schemaDefinitionHandler) refresh() error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
openapi, err := s.client.OpenAPISchema()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch openapi definition: %w", err)
|
||||
}
|
||||
models, err := proto.NewOpenAPIData(openapi)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to parse openapi definition into models: %w", err)
|
||||
}
|
||||
s.models = &models
|
||||
nameIndex, err := s.indexSchemaNames(models)
|
||||
// indexSchemaNames may successfully refresh some definitions, but still return an error
|
||||
// in these cases, store what we could find, but still return up an error
|
||||
if nameIndex != nil {
|
||||
s.schemaToModel = nameIndex
|
||||
s.lastRefresh = time.Now()
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to index schema name to model name: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// indexSchemaNames returns a map of schemaID to the modelName for a given schema. Will use the preferred version of a
|
||||
// resource if possible. May return a map and an error if it was able to index some schemas but not others.
|
||||
func (s *schemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[string]string, error) {
|
||||
func (s *SchemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[string]string, error) {
|
||||
_, resourceLists, err := s.client.ServerGroupsAndResources()
|
||||
// this may occasionally fail to discover certain groups, but we still can refresh the others in those cases
|
||||
if _, ok := err.(*discovery.ErrGroupDiscoveryFailed); err != nil && !ok {
|
||||
|
48
pkg/schema/definitions/refresh.go
Normal file
48
pkg/schema/definitions/refresh.go
Normal file
@ -0,0 +1,48 @@
|
||||
package definitions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/rancher/steve/pkg/debounce"
|
||||
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
)
|
||||
|
||||
// refreshHandler triggers refreshes for a DebounceableRefresher after a CRD/APIService has been changed.
|
||||
// It is intended to refresh the schema definitions after a CRD has been added and is hopefully available in k8s.
|
||||
type refreshHandler struct {
|
||||
// debounceRef is the DebounceableRefresher containing the Refreshable (typically the schema definition handler).
|
||||
debounceRef *debounce.DebounceableRefresher
|
||||
// debounceDuration is the duration that the handler asks the DebounceableRefresher to wait before refreshing.
|
||||
debounceDuration time.Duration
|
||||
}
|
||||
|
||||
// onChangeCRD refreshes the debounceRef after a CRD is added/changed
|
||||
func (r *refreshHandler) onChangeCRD(key string, crd *apiextv1.CustomResourceDefinition) (*apiextv1.CustomResourceDefinition, error) {
|
||||
r.debounceRef.RefreshAfter(r.debounceDuration)
|
||||
return crd, nil
|
||||
}
|
||||
|
||||
// onChangeAPIService refreshes the debounceRef after an APIService is added/changed
|
||||
func (r *refreshHandler) onChangeAPIService(key string, api *apiregv1.APIService) (*apiregv1.APIService, error) {
|
||||
r.debounceRef.RefreshAfter(r.debounceDuration)
|
||||
return api, nil
|
||||
}
|
||||
|
||||
// startBackgroundRefresh starts a force refresh that runs for every tick of duration. Can be stopped
|
||||
// by cancelling the context
|
||||
func (r *refreshHandler) startBackgroundRefresh(ctx context.Context, duration time.Duration) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(duration)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
r.debounceRef.RefreshAfter(r.debounceDuration)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
@ -1,37 +1,30 @@
|
||||
package definitions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/rancher/apiserver/pkg/types"
|
||||
"github.com/rancher/steve/pkg/debounce"
|
||||
apiextcontrollerv1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/apiextensions.k8s.io/v1"
|
||||
v1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/apiregistration.k8s.io/v1"
|
||||
"github.com/rancher/wrangler/v2/pkg/schemas"
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/client-go/discovery"
|
||||
)
|
||||
|
||||
const (
|
||||
// gvkExtension* are string keys; presumably they address the group/version/kind extension of an openAPI
// model - their use is outside this view, confirm against the converter code.
gvkExtensionName = "x-kubernetes-group-version-kind"
|
||||
gvkExtensionGroup = "group"
|
||||
gvkExtensionVersion = "version"
|
||||
gvkExtensionKind = "kind"
|
||||
// defaultDuration is the refreshStale value given to the handler by the older Register.
defaultDuration = time.Second * 5
|
||||
// handlerKey is the name under which the CRD and APIService OnChange handlers are registered.
handlerKey = "schema-definitions"
|
||||
// delayEnvVar overrides the debounce delay applied after a CRD/APIService change. The value is an integer
// number of delayUnit; defaultDelay is used when it is unset or not an integer.
delayEnvVar = "CATTLE_CRD_REFRESH_DELAY_SECONDS"
|
||||
defaultDelay = 2
|
||||
||||
delayUnit = time.Second
|
||||
// refreshEnvVar overrides the interval of the background refresh. The value is an integer number of
// refreshUnit; defaultRefresh is used when it is unset or not an integer.
refreshEnvVar = "CATTLE_BACKGROUND_REFRESH_MINUTES"
|
||||
defaultRefresh = 10
|
||||
||||
refreshUnit = time.Minute
|
||||
)
|
||||
|
||||
// Register registers the schemaDefinition schema.
|
||||
func Register(baseSchema *types.APISchemas, client discovery.DiscoveryInterface) {
|
||||
handler := schemaDefinitionHandler{
|
||||
client: client,
|
||||
refreshStale: defaultDuration,
|
||||
}
|
||||
baseSchema.MustAddSchema(types.APISchema{
|
||||
Schema: &schemas.Schema{
|
||||
ID: "schemaDefinition",
|
||||
PluralName: "schemaDefinitions",
|
||||
ResourceMethods: []string{"GET"},
|
||||
},
|
||||
ByIDHandler: handler.byIDHandler,
|
||||
})
|
||||
}
|
||||
|
||||
type schemaDefinition struct {
|
||||
DefinitionType string `json:"definitionType"`
|
||||
Definitions map[string]definition `json:"definitions"`
|
||||
@ -49,3 +42,55 @@ type definitionField struct {
|
||||
Description string `json:"description,omitempty"`
|
||||
Required bool `json:"required,omitempty"`
|
||||
}
|
||||
|
||||
// Register registers the schemaDefinition schema.
|
||||
func Register(ctx context.Context,
|
||||
baseSchema *types.APISchemas,
|
||||
client discovery.DiscoveryInterface,
|
||||
crd apiextcontrollerv1.CustomResourceDefinitionController,
|
||||
apiService v1.APIServiceController) {
|
||||
handler := SchemaDefinitionHandler{
|
||||
client: client,
|
||||
}
|
||||
baseSchema.MustAddSchema(types.APISchema{
|
||||
Schema: &schemas.Schema{
|
||||
ID: "schemaDefinition",
|
||||
PluralName: "schemaDefinitions",
|
||||
ResourceMethods: []string{"GET"},
|
||||
},
|
||||
ByIDHandler: handler.byIDHandler,
|
||||
})
|
||||
|
||||
debounce := debounce.DebounceableRefresher{
|
||||
Refreshable: &handler,
|
||||
}
|
||||
crdDebounce := getDurationEnvVarOrDefault(delayEnvVar, defaultDelay, delayUnit)
|
||||
refHandler := refreshHandler{
|
||||
debounceRef: &debounce,
|
||||
debounceDuration: crdDebounce,
|
||||
}
|
||||
crd.OnChange(ctx, handlerKey, refHandler.onChangeCRD)
|
||||
apiService.OnChange(ctx, handlerKey, refHandler.onChangeAPIService)
|
||||
refreshFrequency := getDurationEnvVarOrDefault(refreshEnvVar, defaultRefresh, refreshUnit)
|
||||
// there's a delay between when a CRD is created and when it is available in the openapi/v2 endpoint
|
||||
// the crd/apiservice controllers use a delay of 2 seconds to account for this, but it's possible that this isn't
|
||||
// enough in certain environments, so we also use an infrequent background refresh to eventually correct any misses
|
||||
refHandler.startBackgroundRefresh(ctx, refreshFrequency)
|
||||
}
|
||||
|
||||
// getDurationEnvVarOrDefault gets the duration value for a given envVar. If not found, it returns the provided default.
|
||||
// unit is the unit of time (time.Second/time.Minute/etc.) that the returned duration should be in
|
||||
func getDurationEnvVarOrDefault(envVar string, defaultVal int, unit time.Duration) time.Duration {
|
||||
defaultDuration := time.Duration(defaultVal) * unit
|
||||
envValue, ok := os.LookupEnv(envVar)
|
||||
if !ok {
|
||||
return defaultDuration
|
||||
}
|
||||
parsed, err := strconv.Atoi(envValue)
|
||||
if err != nil {
|
||||
logrus.Errorf("Env var %s was specified, but could not be converted to an int, default of %d seconds will be used",
|
||||
envVar, int64(defaultDuration.Seconds()))
|
||||
return defaultDuration
|
||||
}
|
||||
return time.Duration(parsed) * unit
|
||||
}
|
||||
|
@ -142,7 +142,8 @@ func setup(ctx context.Context, server *Server) error {
|
||||
if err = resources.DefaultSchemas(ctx, server.BaseSchemas, ccache, server.ClientFactory, sf, server.Version); err != nil {
|
||||
return err
|
||||
}
|
||||
definitions.Register(server.BaseSchemas, server.controllers.K8s.Discovery())
|
||||
definitions.Register(ctx, server.BaseSchemas, server.controllers.K8s.Discovery(),
|
||||
server.controllers.CRD.CustomResourceDefinition(), server.controllers.API.APIService())
|
||||
|
||||
summaryCache := summarycache.New(sf, ccache)
|
||||
summaryCache.Start(ctx)
|
||||
|
Loading…
Reference in New Issue
Block a user