1
0
mirror of https://github.com/rancher/steve.git synced 2025-08-17 13:56:28 +00:00

Merge pull request #161 from MbolotSuse/resource-schema-improved-cache

Resource schema improved cache
This commit is contained in:
Michael Bolot 2024-03-14 09:57:06 -05:00 committed by GitHub
commit 870824dc8f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 751 additions and 302 deletions

51
pkg/debounce/refresher.go Normal file
View File

@ -0,0 +1,51 @@
package debounce
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
)
// Refreshable represents an object which can be refreshed. This should be protected by a mutex for concurrent operation.
type Refreshable interface {
Refresh() error
}
// DebounceableRefresher is used to debounce multiple attempts to refresh a refreshable type.
type DebounceableRefresher struct {
sync.Mutex
// Refreshable is any type that can be refreshed. The refresh method should by protected by a mutex internally.
Refreshable Refreshable
current context.CancelFunc
}
// RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will
// delay the requested refresh by the new duration. Note that this is a total override of the previous calls - calling
// RefreshAfter(time.Second * 2) and then immediately calling RefreshAfter(time.Microsecond * 1) will run a refresh
// in one microsecond
func (d *DebounceableRefresher) RefreshAfter(duration time.Duration) {
d.Lock()
defer d.Unlock()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
if d.current != nil {
d.current()
}
d.current = cancel
go func() {
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-ctx.Done():
// this indicates that the context was cancelled. Do nothing.
case <-timer.C:
// note this can cause multiple refreshes to happen concurrently
err := d.Refreshable.Refresh()
if err != nil {
logrus.Errorf("failed to refresh with error: %v", err)
}
}
}()
}

View File

@ -0,0 +1,47 @@
package debounce
import (
"fmt"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
type refreshable struct {
wasRefreshed atomic.Bool
retErr error
}
func (r *refreshable) Refresh() error {
r.wasRefreshed.Store(true)
return r.retErr
}
func TestRefreshAfter(t *testing.T) {
ref := refreshable{}
debounce := DebounceableRefresher{
Refreshable: &ref,
}
debounce.RefreshAfter(time.Millisecond * 2)
debounce.RefreshAfter(time.Microsecond * 2)
time.Sleep(time.Millisecond * 1)
// test that the second refresh call overrode the first - Micro < Milli so this should have ran
require.True(t, ref.wasRefreshed.Load())
ref.wasRefreshed.Store(false)
time.Sleep(time.Millisecond * 2)
// test that the call was debounced - though we called this twice only one refresh should be called
require.False(t, ref.wasRefreshed.Load())
ref = refreshable{
retErr: fmt.Errorf("Some error"),
}
debounce = DebounceableRefresher{
Refreshable: &ref,
}
debounce.RefreshAfter(time.Microsecond * 2)
// test the error case
time.Sleep(time.Millisecond * 1)
require.True(t, ref.wasRefreshed.Load())
}

View File

@ -1,18 +1,15 @@
package definitions package definitions
import ( import (
"errors"
"fmt" "fmt"
"net/http" "net/http"
"sync" "sync"
"time"
"github.com/rancher/apiserver/pkg/apierror" "github.com/rancher/apiserver/pkg/apierror"
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/schema/converter" "github.com/rancher/steve/pkg/schema/converter"
"github.com/rancher/wrangler/v2/pkg/schemas/validation" "github.com/rancher/wrangler/v2/pkg/schemas/validation"
"github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/kube-openapi/pkg/util/proto" "k8s.io/kube-openapi/pkg/util/proto"
) )
@ -28,15 +25,11 @@ var (
} }
) )
// schemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas. // SchemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas.
// Does not implement any method allowing a caller to list definitions for all schemas. // Does not implement any method allowing a caller to list definitions for all schemas.
type schemaDefinitionHandler struct { type SchemaDefinitionHandler struct {
sync.RWMutex sync.RWMutex
// lastRefresh is the last time that the handler retrieved models from kubernetes.
lastRefresh time.Time
// refreshStale is the duration between lastRefresh and the next refresh of models.
refreshStale time.Duration
// client is the discovery client used to get the groups/resources/fields from kubernetes. // client is the discovery client used to get the groups/resources/fields from kubernetes.
client discovery.DiscoveryInterface client discovery.DiscoveryInterface
// models are the cached models from the last response from kubernetes. // models are the cached models from the last response from kubernetes.
@ -46,9 +39,32 @@ type schemaDefinitionHandler struct {
schemaToModel map[string]string schemaToModel map[string]string
} }
// Refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve
// the openAPI schemas.
func (s *SchemaDefinitionHandler) Refresh() error {
openapi, err := s.client.OpenAPISchema()
if err != nil {
return fmt.Errorf("unable to fetch openapi definition: %w", err)
}
models, err := proto.NewOpenAPIData(openapi)
if err != nil {
return fmt.Errorf("unable to parse openapi definition into models: %w", err)
}
groups, err := s.client.ServerGroups()
if err != nil {
return fmt.Errorf("unable to retrieve groups: %w", err)
}
s.Lock()
defer s.Unlock()
nameIndex := s.indexSchemaNames(models, groups)
s.schemaToModel = nameIndex
s.models = &models
return nil
}
// byIDHandler is the Handler method for a request to get the schema definition for a specifc schema. Will use the // byIDHandler is the Handler method for a request to get the schema definition for a specifc schema. Will use the
// cached models found during the last refresh as part of this process. // cached models found during the last refresh as part of this process.
func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) { func (s *SchemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) {
// pseudo-access check, designed to make sure that users have access to the schema for the definition that they // pseudo-access check, designed to make sure that users have access to the schema for the definition that they
// are accessing. // are accessing.
requestSchema := request.Schemas.LookupSchema(request.Name) requestSchema := request.Schemas.LookupSchema(request.Name)
@ -56,14 +72,6 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.
return types.APIObject{}, apierror.NewAPIError(validation.NotFound, "no such schema") return types.APIObject{}, apierror.NewAPIError(validation.NotFound, "no such schema")
} }
if s.needsRefresh() {
err := s.refresh()
if err != nil {
logrus.Errorf("error refreshing schemas %s", err.Error())
return types.APIObject{}, apierror.NewAPIError(internalServerErrorCode, "error refreshing schemas")
}
}
// lock only in read-mode so that we don't read while refresh writes. Only use a read-lock - using a write lock // lock only in read-mode so that we don't read while refresh writes. Only use a read-lock - using a write lock
// would make this endpoint only usable by one caller at a time // would make this endpoint only usable by one caller at a time
s.RLock() s.RLock()
@ -100,72 +108,13 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.
}, nil }, nil
} }
// needsRefresh readLocks and checks if the cache needs to be refreshed.
func (s *schemaDefinitionHandler) needsRefresh() bool {
s.RLock()
defer s.RUnlock()
if s.lastRefresh.IsZero() {
return true
}
return s.lastRefresh.Add(s.refreshStale).Before(time.Now())
}
// refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve
// the openAPI schemas.
func (s *schemaDefinitionHandler) refresh() error {
s.Lock()
defer s.Unlock()
openapi, err := s.client.OpenAPISchema()
if err != nil {
return fmt.Errorf("unable to fetch openapi definition: %w", err)
}
models, err := proto.NewOpenAPIData(openapi)
if err != nil {
return fmt.Errorf("unable to parse openapi definition into models: %w", err)
}
s.models = &models
nameIndex, err := s.indexSchemaNames(models)
// indexSchemaNames may successfully refresh some definitions, but still return an error
// in these cases, store what we could find, but still return up an error
if nameIndex != nil {
s.schemaToModel = nameIndex
s.lastRefresh = time.Now()
}
if err != nil {
return fmt.Errorf("unable to index schema name to model name: %w", err)
}
return nil
}
// indexSchemaNames returns a map of schemaID to the modelName for a given schema. Will use the preferred version of a // indexSchemaNames returns a map of schemaID to the modelName for a given schema. Will use the preferred version of a
// resource if possible. May return a map and an error if it was able to index some schemas but not others. // resource if possible. Can return an error if unable to find groups.
func (s *schemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[string]string, error) { func (s *SchemaDefinitionHandler) indexSchemaNames(models proto.Models, groups *metav1.APIGroupList) map[string]string {
_, resourceLists, err := s.client.ServerGroupsAndResources() preferredResourceVersions := map[string]string{}
// this may occasionally fail to discover certain groups, but we still can refresh the others in those cases if groups != nil {
if _, ok := err.(*discovery.ErrGroupDiscoveryFailed); err != nil && !ok { for _, group := range groups.Groups {
return nil, fmt.Errorf("unable to retrieve groups and resources: %w", err) preferredResourceVersions[group.Name] = group.PreferredVersion.Version
}
preferredResourceVersions := map[schema.GroupKind]string{}
for _, resourceList := range resourceLists {
if resourceList == nil {
continue
}
groupVersion, gvErr := schema.ParseGroupVersion(resourceList.GroupVersion)
// we may fail to parse the GV of one group, but can still parse out the others
if gvErr != nil {
err = errors.Join(err, fmt.Errorf("unable to parse group version %s: %w", resourceList.GroupVersion, gvErr))
continue
}
for _, resource := range resourceList.APIResources {
gk := schema.GroupKind{
Group: groupVersion.Group,
Kind: resource.Kind,
}
// per the resource docs, if the resource.Version is empty, the preferred version for
// this resource is the version of the APIResourceList it is in
if resource.Version == "" || resource.Version == groupVersion.Version {
preferredResourceVersions[gk] = groupVersion.Version
}
} }
} }
schemaToModel := map[string]string{} schemaToModel := map[string]string{}
@ -181,17 +130,13 @@ func (s *schemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[str
// we can safely continue // we can safely continue
continue continue
} }
gk := schema.GroupKind{ prefVersion := preferredResourceVersions[gvk.Group]
Group: gvk.Group,
Kind: gvk.Kind,
}
prefVersion, ok := preferredResourceVersions[gk]
// if we don't have a known preferred version for this group or we are the preferred version // if we don't have a known preferred version for this group or we are the preferred version
// add this as the model name for the schema // add this as the model name for the schema
if !ok || prefVersion == gvk.Version { if prefVersion == "" || prefVersion == gvk.Version {
schemaID := converter.GVKToSchemaID(*gvk) schemaID := converter.GVKToSchemaID(*gvk)
schemaToModel[schemaID] = modelName schemaToModel[schemaID] = modelName
} }
} }
return schemaToModel, err return schemaToModel
} }

View File

@ -3,7 +3,6 @@ package definitions
import ( import (
"fmt" "fmt"
"testing" "testing"
"time"
openapi_v2 "github.com/google/gnostic-models/openapiv2" openapi_v2 "github.com/google/gnostic-models/openapiv2"
"github.com/rancher/apiserver/pkg/apierror" "github.com/rancher/apiserver/pkg/apierror"
@ -11,85 +10,105 @@ import (
wschemas "github.com/rancher/wrangler/v2/pkg/schemas" wschemas "github.com/rancher/wrangler/v2/pkg/schemas"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/version" "k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/openapi" "k8s.io/client-go/openapi"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/kube-openapi/pkg/util/proto"
) )
var globalRoleObject = types.APIObject{ func TestRefresh(t *testing.T) {
ID: "management.cattle.io.globalrole", defaultDocument, err := openapi_v2.ParseDocument([]byte(openapi_raw))
Type: "schemaDefinition", require.NoError(t, err)
Object: schemaDefinition{ defaultModels, err := proto.NewOpenAPIData(defaultDocument)
DefinitionType: "io.cattle.management.v2.GlobalRole", require.NoError(t, err)
Definitions: map[string]definition{ defaultSchemaToModel := map[string]string{
"io.cattle.management.v2.GlobalRole": { "management.cattle.io.globalrole": "io.cattle.management.v1.GlobalRole",
ResourceFields: map[string]definitionField{ "noversion.cattle.io.resource": "io.cattle.noversion.v2.Resource",
"apiVersion": { "missinggroup.cattle.io.resource": "io.cattle.missinggroup.v2.Resource",
Type: "string", }
Description: "The APIVersion of this resource", tests := []struct {
}, name string
"kind": { openapiError error
Type: "string", serverGroupsErr error
Description: "The kind", useBadOpenApiDoc bool
}, nilGroups bool
"metadata": { wantModels *proto.Models
Type: "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta", wantSchemaToModel map[string]string
Description: "The metadata", wantError bool
}, }{
"spec": { {
Type: "io.cattle.management.v2.GlobalRole.spec", Description: "The spec for the project", name: "success",
}, wantModels: &defaultModels,
}, wantSchemaToModel: defaultSchemaToModel,
Type: "io.cattle.management.v2.GlobalRole", },
Description: "A Global Role V2 provides Global Permissions in Rancher", {
}, name: "error - openapi doc unavailable",
"io.cattle.management.v2.GlobalRole.spec": { openapiError: fmt.Errorf("server unavailable"),
ResourceFields: map[string]definitionField{ wantError: true,
"clusterName": { },
Type: "string", {
Description: "The name of the cluster", name: "error - unable to parse openapi doc",
Required: true, useBadOpenApiDoc: true,
}, wantError: true,
"displayName": { },
Type: "string", {
Description: "The UI readable name", name: "error - unable to retrieve groups and resources",
Required: true, serverGroupsErr: fmt.Errorf("server not available"),
}, wantError: true,
"newField": { },
Type: "string", {
Description: "A new field not present in v1", name: "no groups or error from server",
}, nilGroups: true,
"notRequired": { wantModels: &defaultModels,
Type: "boolean", wantSchemaToModel: map[string]string{
Description: "Some field that isn't required", "management.cattle.io.globalrole": "io.cattle.management.v2.GlobalRole",
}, "noversion.cattle.io.resource": "io.cattle.noversion.v2.Resource",
}, "missinggroup.cattle.io.resource": "io.cattle.missinggroup.v2.Resource",
Type: "io.cattle.management.v2.GlobalRole.spec",
Description: "The spec for the project",
},
"io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta": {
ResourceFields: map[string]definitionField{
"annotations": {
Type: "map",
SubType: "string",
Description: "annotations of the resource",
},
"name": {
Type: "string",
SubType: "",
Description: "name of the resource",
},
},
Type: "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta",
Description: "Object Metadata",
}, },
}, },
}, }
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
client, err := buildDefaultDiscovery()
client.DocumentErr = test.openapiError
client.GroupsErr = test.serverGroupsErr
if test.useBadOpenApiDoc {
schema := client.Document.Definitions.AdditionalProperties[0]
schema.Value.Type = &openapi_v2.TypeItem{
Value: []string{"multiple", "entries"},
}
}
if test.nilGroups {
client.Groups = nil
}
require.Nil(t, err)
handler := SchemaDefinitionHandler{
client: client,
}
err = handler.Refresh()
if test.wantError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, test.wantModels, handler.models)
require.Equal(t, test.wantSchemaToModel, handler.schemaToModel)
})
}
} }
func TestByID(t *testing.T) { func Test_byID(t *testing.T) {
defaultDocument, err := openapi_v2.ParseDocument([]byte(openapi_raw))
require.NoError(t, err)
defaultModels, err := proto.NewOpenAPIData(defaultDocument)
require.NoError(t, err)
defaultSchemaToModel := map[string]string{
"management.cattle.io.globalrole": "io.cattle.management.v2.GlobalRole",
}
schemas := types.EmptyAPISchemas() schemas := types.EmptyAPISchemas()
addBaseSchema := func(names ...string) { addBaseSchema := func(names ...string) {
for _, name := range names { for _, name := range names {
@ -107,121 +126,134 @@ func TestByID(t *testing.T) {
return &input return &input
} }
addBaseSchema("management.cattle.io.globalrole", "management.cattle.io.missingfrommodel") addBaseSchema("management.cattle.io.globalrole", "management.cattle.io.missingfrommodel", "management.cattle.io.notakind")
tests := []struct { tests := []struct {
name string name string
schemaName string schemaName string
needsRefresh bool models *proto.Models
openapiError error schemaToModel map[string]string
serverGroupsResourcesErr error wantObject *types.APIObject
useBadOpenApiDoc bool wantError bool
unparseableGV bool wantErrorCode *int
wantObject *types.APIObject
wantError bool
wantErrorCode *int
}{ }{
{ {
name: "global role definition", name: "global role definition",
schemaName: "management.cattle.io.globalrole", schemaName: "management.cattle.io.globalrole",
needsRefresh: true, models: &defaultModels,
wantObject: &globalRoleObject, schemaToModel: defaultSchemaToModel,
wantObject: &types.APIObject{
ID: "management.cattle.io.globalrole",
Type: "schemaDefinition",
Object: schemaDefinition{
DefinitionType: "io.cattle.management.v2.GlobalRole",
Definitions: map[string]definition{
"io.cattle.management.v2.GlobalRole": {
ResourceFields: map[string]definitionField{
"apiVersion": {
Type: "string",
Description: "The APIVersion of this resource",
},
"kind": {
Type: "string",
Description: "The kind",
},
"metadata": {
Type: "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta",
Description: "The metadata",
},
"spec": {
Type: "io.cattle.management.v2.GlobalRole.spec", Description: "The spec for the project",
},
},
Type: "io.cattle.management.v2.GlobalRole",
Description: "A Global Role V2 provides Global Permissions in Rancher",
},
"io.cattle.management.v2.GlobalRole.spec": {
ResourceFields: map[string]definitionField{
"clusterName": {
Type: "string",
Description: "The name of the cluster",
Required: true,
},
"displayName": {
Type: "string",
Description: "The UI readable name",
Required: true,
},
"newField": {
Type: "string",
Description: "A new field not present in v1",
},
"notRequired": {
Type: "boolean",
Description: "Some field that isn't required",
},
},
Type: "io.cattle.management.v2.GlobalRole.spec",
Description: "The spec for the project",
},
"io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta": {
ResourceFields: map[string]definitionField{
"annotations": {
Type: "map",
SubType: "string",
Description: "annotations of the resource",
},
"name": {
Type: "string",
SubType: "",
Description: "name of the resource",
},
},
Type: "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta",
Description: "Object Metadata",
},
},
},
},
}, },
{ {
name: "missing definition", name: "missing definition",
schemaName: "management.cattle.io.cluster", schemaName: "management.cattle.io.cluster",
needsRefresh: true, models: &defaultModels,
schemaToModel: defaultSchemaToModel,
wantError: true, wantError: true,
wantErrorCode: intPtr(404), wantErrorCode: intPtr(404),
}, },
{ {
name: "not refreshed", name: "not refreshed",
schemaName: "management.cattle.io.globalrole", schemaName: "management.cattle.io.globalrole",
needsRefresh: false,
wantError: true, wantError: true,
wantErrorCode: intPtr(503), wantErrorCode: intPtr(503),
}, },
{ {
name: "missing from model", name: "has schema, missing from model",
schemaName: "management.cattle.io.missingfrommodel", schemaName: "management.cattle.io.missingfrommodel",
needsRefresh: true, models: &defaultModels,
schemaToModel: defaultSchemaToModel,
wantError: true, wantError: true,
wantErrorCode: intPtr(503), wantErrorCode: intPtr(503),
}, },
{ {
name: "refresh error - openapi doc unavailable", name: "has schema, model is not a kind",
schemaName: "management.cattle.io.globalrole", schemaName: "management.cattle.io.notakind",
needsRefresh: true, models: &defaultModels,
openapiError: fmt.Errorf("server unavailable"), schemaToModel: map[string]string{
wantError: true, "management.cattle.io.notakind": "io.management.cattle.NotAKind",
wantErrorCode: intPtr(500),
},
{
name: "refresh error - unable to parse openapi doc",
schemaName: "management.cattle.io.globalrole",
needsRefresh: true,
useBadOpenApiDoc: true,
wantError: true,
wantErrorCode: intPtr(500),
},
{
name: "refresh error - unable to retrieve groups and resources",
schemaName: "management.cattle.io.globalrole",
needsRefresh: true,
serverGroupsResourcesErr: fmt.Errorf("server not available"),
wantError: true,
wantErrorCode: intPtr(500),
},
{
name: "refresh error - unable to retrieve all groups and resources",
schemaName: "management.cattle.io.globalrole",
needsRefresh: true,
serverGroupsResourcesErr: &discovery.ErrGroupDiscoveryFailed{
Groups: map[schema.GroupVersion]error{
{
Group: "other.cattle.io",
Version: "v1",
}: fmt.Errorf("some group error"),
},
}, },
wantError: true, wantError: true,
wantErrorCode: intPtr(500), wantErrorCode: intPtr(500),
}, },
{
name: "refresh error - unparesable gv",
schemaName: "management.cattle.io.globalrole",
needsRefresh: true,
unparseableGV: true,
wantError: true,
wantErrorCode: intPtr(500),
},
} }
for _, test := range tests { for _, test := range tests {
test := test test := test
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
t.Parallel() t.Parallel()
client, err := buildDefaultDiscovery() handler := SchemaDefinitionHandler{
client.DocumentErr = test.openapiError models: test.models,
client.GroupResourcesErr = test.serverGroupsResourcesErr schemaToModel: test.schemaToModel,
if test.useBadOpenApiDoc {
schema := client.Document.Definitions.AdditionalProperties[0]
schema.Value.Type = &openapi_v2.TypeItem{
Value: []string{"multiple", "entries"},
}
}
if test.unparseableGV {
client.Resources = append(client.Resources, &metav1.APIResourceList{
GroupVersion: "not/parse/able",
})
}
require.Nil(t, err)
handler := schemaDefinitionHandler{
client: client,
}
if !test.needsRefresh {
handler.lastRefresh = time.Now()
handler.refreshStale = time.Minute * 1
} }
request := types.APIRequest{ request := types.APIRequest{
Schemas: schemas, Schemas: schemas,
@ -248,66 +280,64 @@ func buildDefaultDiscovery() (*fakeDiscovery, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to parse openapi document %w", err) return nil, fmt.Errorf("unable to parse openapi document %w", err)
} }
groups := []*metav1.APIGroup{ groups := []metav1.APIGroup{
{ {
Name: "management.cattle.io", Name: "management.cattle.io",
PreferredVersion: metav1.GroupVersionForDiscovery{ PreferredVersion: metav1.GroupVersionForDiscovery{
Version: "v2", GroupVersion: "management.cattle.io/v2",
Version: "v1",
}, },
}, Versions: []metav1.GroupVersionForDiscovery{
}
resources := []*metav1.APIResourceList{
{
GroupVersion: schema.GroupVersion{
Group: "management.cattle.io",
Version: "v2",
}.String(),
APIResources: []metav1.APIResource{
{ {
Group: "management.cattle.io", GroupVersion: "management.cattle.io/v1",
Kind: "GlobalRole", Version: "v1",
Version: "v2", },
{
GroupVersion: "management.cattle.io/v2",
Version: "v2",
}, },
}, },
}, },
{ {
GroupVersion: schema.GroupVersion{ Name: "noversion.cattle.io",
Group: "management.cattle.io", Versions: []metav1.GroupVersionForDiscovery{
Version: "v1",
}.String(),
APIResources: []metav1.APIResource{
{ {
Group: "management.cattle.io", GroupVersion: "noversion.cattle.io/v1",
Kind: "GlobalRole", Version: "v1",
Version: "v2", },
{
GroupVersion: "noversion.cattle.io/v2",
Version: "v2",
}, },
}, },
}, },
nil,
} }
return &fakeDiscovery{ return &fakeDiscovery{
Groups: groups, Groups: &metav1.APIGroupList{
Resources: resources, Groups: groups,
Document: document, },
Document: document,
}, nil }, nil
} }
type fakeDiscovery struct { type fakeDiscovery struct {
Groups []*metav1.APIGroup Groups *metav1.APIGroupList
Resources []*metav1.APIResourceList Document *openapi_v2.Document
Document *openapi_v2.Document GroupsErr error
GroupResourcesErr error DocumentErr error
DocumentErr error
} }
// ServerGroupsAndResources is the only method we actually need for the test - just returns what is on the struct // ServerGroups is the only method that needs to be mocked
func (f *fakeDiscovery) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) { func (f *fakeDiscovery) ServerGroups() (*metav1.APIGroupList, error) {
return f.Groups, f.Resources, f.GroupResourcesErr return f.Groups, f.GroupsErr
} }
// The rest of these methods are just here to conform to discovery.DiscoveryInterface // The rest of these methods are just here to conform to discovery.DiscoveryInterface
func (f *fakeDiscovery) RESTClient() restclient.Interface { return nil } func (f *fakeDiscovery) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
func (f *fakeDiscovery) ServerGroups() (*metav1.APIGroupList, error) { return nil, nil } return nil, nil, nil
}
func (f *fakeDiscovery) RESTClient() restclient.Interface { return nil }
func (f *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { func (f *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
return nil, nil return nil, nil
} }

View File

@ -82,7 +82,129 @@ definitions:
- group: "management.cattle.io" - group: "management.cattle.io"
version: "v2" version: "v2"
kind: "GlobalRole" kind: "GlobalRole"
io.management.cattle.NotAKind: io.cattle.noversion.v2.Resource:
description: "A No Version V2 resource is for a group with no preferred version"
type: "object"
properties:
apiVersion:
description: "The APIVersion of this resource"
type: "string"
kind:
description: "The kind"
type: "string"
metadata:
description: "The metadata"
$ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"
spec:
description: "The spec for the resource"
type: "object"
required:
- "name"
properties:
name:
description: "The name of the resource"
type: "string"
notRequired:
description: "Some field that isn't required"
type: "boolean"
newField:
description: "A new field not present in v1"
type: "string"
x-kubernetes-group-version-kind:
- group: "noversion.cattle.io"
version: "v2"
kind: "Resource"
io.cattle.noversion.v1.Resource:
description: "A No Version V1 resource is for a group with no preferred version"
type: "object"
properties:
apiVersion:
description: "The APIVersion of this resource"
type: "string"
kind:
description: "The kind"
type: "string"
metadata:
description: "The metadata"
$ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"
spec:
description: "The spec for the resource"
type: "object"
required:
- "name"
properties:
name:
description: "The name of the resource"
type: "string"
notRequired:
description: "Some field that isn't required"
type: "boolean"
x-kubernetes-group-version-kind:
- group: "noversion.cattle.io"
version: "v1"
kind: "Resource"
io.cattle.missinggroup.v2.Resource:
description: "A Missing Group V2 resource is for a group not listed by server groups"
type: "object"
properties:
apiVersion:
description: "The APIVersion of this resource"
type: "string"
kind:
description: "The kind"
type: "string"
metadata:
description: "The metadata"
$ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"
spec:
description: "The spec for the resource"
type: "object"
required:
- "name"
properties:
name:
description: "The name of the resource"
type: "string"
notRequired:
description: "Some field that isn't required"
type: "boolean"
newField:
description: "A new field not present in v1"
type: "string"
x-kubernetes-group-version-kind:
- group: "missinggroup.cattle.io"
version: "v2"
kind: "Resource"
io.cattle.missinggroup.v1.Resource:
description: "A Missing Group V1 resource is for a group not listed by server groups"
type: "object"
properties:
apiVersion:
description: "The APIVersion of this resource"
type: "string"
kind:
description: "The kind"
type: "string"
metadata:
description: "The metadata"
$ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"
spec:
description: "The spec for the resource"
type: "object"
required:
- "name"
properties:
name:
description: "The name of the resource"
type: "string"
notRequired:
description: "Some field that isn't required"
type: "boolean"
x-kubernetes-group-version-kind:
- group: "missinggroup.cattle.io"
version: "v1"
kind: "Resource"
io.cattle.management.NotAKind:
type: "string" type: "string"
description: "Some string which isn't a kind" description: "Some string which isn't a kind"
io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta: io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta:

View File

@ -0,0 +1,48 @@
package definitions
import (
"context"
"time"
"github.com/rancher/steve/pkg/debounce"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)
// refreshHandler triggers refreshes for a Debounceable refresher after a CRD/APIService has been changed
// intended to refresh the schema definitions after a CRD has been added and is hopefully available in k8s.
type refreshHandler struct {
// debounceRef is the debounceableRefresher containing the Refreshable (typically the schema definition handler)
debounceRef *debounce.DebounceableRefresher
// debounceDuration is the duration that the handler should ask the DebounceableRefresher to wait before refreshing
debounceDuration time.Duration
}
// onChangeCRD refreshes the debounceRef after a CRD is added/changed
func (r *refreshHandler) onChangeCRD(key string, crd *apiextv1.CustomResourceDefinition) (*apiextv1.CustomResourceDefinition, error) {
r.debounceRef.RefreshAfter(r.debounceDuration)
return crd, nil
}
// onChangeAPIService refreshes the debounceRef after an APIService is added/changed
func (r *refreshHandler) onChangeAPIService(key string, api *apiregv1.APIService) (*apiregv1.APIService, error) {
r.debounceRef.RefreshAfter(r.debounceDuration)
return api, nil
}
// startBackgroundRefresh starts a force refresh that runs for every tick of duration. Can be stopped
// by cancelling the context
func (r *refreshHandler) startBackgroundRefresh(ctx context.Context, duration time.Duration) {
go func() {
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.debounceRef.RefreshAfter(r.debounceDuration)
}
}
}()
}

View File

@ -0,0 +1,84 @@
package definitions
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/rancher/steve/pkg/debounce"
"github.com/stretchr/testify/require"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)
type refreshable struct {
wasRefreshed atomic.Bool
}
func (r *refreshable) Refresh() error {
r.wasRefreshed.Store(true)
return nil
}
func Test_onChangeCRD(t *testing.T) {
internalRefresh := refreshable{}
refresher := debounce.DebounceableRefresher{
Refreshable: &internalRefresh,
}
refreshHandler := refreshHandler{
debounceRef: &refresher,
debounceDuration: time.Microsecond * 5,
}
input := apiextv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "test-crd",
},
}
output, err := refreshHandler.onChangeCRD("test-crd", &input)
require.Nil(t, err)
require.Equal(t, input, *output)
// waiting to allow the debouncer to refresh the refreshable
time.Sleep(time.Millisecond * 2)
require.True(t, internalRefresh.wasRefreshed.Load())
}
func Test_onChangeAPIService(t *testing.T) {
internalRefresh := refreshable{}
refresher := debounce.DebounceableRefresher{
Refreshable: &internalRefresh,
}
refreshHandler := refreshHandler{
debounceRef: &refresher,
debounceDuration: time.Microsecond * 5,
}
input := apiregv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "test-apiservice",
},
}
output, err := refreshHandler.onChangeAPIService("test-apiservice", &input)
require.Nil(t, err)
require.Equal(t, input, *output)
// waiting to allow the debouncer to refresh the refreshable
time.Sleep(time.Millisecond * 2)
require.True(t, internalRefresh.wasRefreshed.Load())
}
func Test_startBackgroundRefresh(t *testing.T) {
internalRefresh := refreshable{}
refresher := debounce.DebounceableRefresher{
Refreshable: &internalRefresh,
}
refreshHandler := refreshHandler{
debounceRef: &refresher,
debounceDuration: time.Microsecond * 5,
}
ctx, cancel := context.WithCancel(context.Background())
refreshHandler.startBackgroundRefresh(ctx, time.Microsecond*10)
time.Sleep(time.Millisecond * 2)
require.True(t, internalRefresh.wasRefreshed.Load())
cancel()
}

View File

@ -1,37 +1,30 @@
package definitions package definitions
import ( import (
"context"
"os"
"strconv"
"time" "time"
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/debounce"
apiextcontrollerv1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/apiextensions.k8s.io/v1"
v1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/apiregistration.k8s.io/v1"
"github.com/rancher/wrangler/v2/pkg/schemas" "github.com/rancher/wrangler/v2/pkg/schemas"
"github.com/sirupsen/logrus"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
) )
const ( const (
gvkExtensionName = "x-kubernetes-group-version-kind" handlerKey = "schema-definitions"
gvkExtensionGroup = "group" delayEnvVar = "CATTLE_CRD_REFRESH_DELAY_SECONDS"
gvkExtensionVersion = "version" defaultDelay = 2
gvkExtensionKind = "kind" delayUnit = time.Second
defaultDuration = time.Second * 5 refreshEnvVar = "CATTLE_BACKGROUND_REFRESH_MINUTES"
defaultRefresh = 10
refreshUnit = time.Minute
) )
// Register registers the schemaDefinition schema.
func Register(baseSchema *types.APISchemas, client discovery.DiscoveryInterface) {
handler := schemaDefinitionHandler{
client: client,
refreshStale: defaultDuration,
}
baseSchema.MustAddSchema(types.APISchema{
Schema: &schemas.Schema{
ID: "schemaDefinition",
PluralName: "schemaDefinitions",
ResourceMethods: []string{"GET"},
},
ByIDHandler: handler.byIDHandler,
})
}
type schemaDefinition struct { type schemaDefinition struct {
DefinitionType string `json:"definitionType"` DefinitionType string `json:"definitionType"`
Definitions map[string]definition `json:"definitions"` Definitions map[string]definition `json:"definitions"`
@ -49,3 +42,55 @@ type definitionField struct {
Description string `json:"description,omitempty"` Description string `json:"description,omitempty"`
Required bool `json:"required,omitempty"` Required bool `json:"required,omitempty"`
} }
// Register registers the schemaDefinition schema.
func Register(ctx context.Context,
baseSchema *types.APISchemas,
client discovery.DiscoveryInterface,
crd apiextcontrollerv1.CustomResourceDefinitionController,
apiService v1.APIServiceController) {
handler := SchemaDefinitionHandler{
client: client,
}
baseSchema.MustAddSchema(types.APISchema{
Schema: &schemas.Schema{
ID: "schemaDefinition",
PluralName: "schemaDefinitions",
ResourceMethods: []string{"GET"},
},
ByIDHandler: handler.byIDHandler,
})
debounce := debounce.DebounceableRefresher{
Refreshable: &handler,
}
crdDebounce := getDurationEnvVarOrDefault(delayEnvVar, defaultDelay, delayUnit)
refHandler := refreshHandler{
debounceRef: &debounce,
debounceDuration: crdDebounce,
}
crd.OnChange(ctx, handlerKey, refHandler.onChangeCRD)
apiService.OnChange(ctx, handlerKey, refHandler.onChangeAPIService)
refreshFrequency := getDurationEnvVarOrDefault(refreshEnvVar, defaultRefresh, refreshUnit)
// there's a delay between when a CRD is created and when it is available in the openapi/v2 endpoint
// the crd/apiservice controllers use a delay of 2 seconds to account for this, but it's possible that this isn't
// enough in certain environments, so we also use an infrequent background refresh to eventually correct any misses
refHandler.startBackgroundRefresh(ctx, refreshFrequency)
}
// getDurationEnvVarOrDefault gets the duration value for a given envVar. If not found, it returns the provided default.
// unit is the unit of time (time.Second/time.Minute/etc.) that the returned duration should be in
func getDurationEnvVarOrDefault(envVar string, defaultVal int, unit time.Duration) time.Duration {
defaultDuration := time.Duration(defaultVal) * unit
envValue, ok := os.LookupEnv(envVar)
if !ok {
return defaultDuration
}
parsed, err := strconv.Atoi(envValue)
if err != nil {
logrus.Errorf("Env var %s was specified, but could not be converted to an int, default of %d seconds will be used",
envVar, int64(defaultDuration.Seconds()))
return defaultDuration
}
return time.Duration(parsed) * unit
}

View File

@ -0,0 +1,76 @@
package definitions
import (
"context"
"os"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/wrangler/v2/pkg/generic/fake"
"github.com/stretchr/testify/require"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)
func TestRegister(t *testing.T) {
schemas := types.EmptyAPISchemas()
client := fakeDiscovery{}
ctrl := gomock.NewController(t)
crdController := fake.NewMockNonNamespacedControllerInterface[*apiextv1.CustomResourceDefinition, *apiextv1.CustomResourceDefinitionList](ctrl)
apisvcController := fake.NewMockNonNamespacedControllerInterface[*apiregv1.APIService, *apiregv1.APIServiceList](ctrl)
ctx, cancel := context.WithCancel(context.Background())
crdController.EXPECT().OnChange(ctx, handlerKey, gomock.Any())
apisvcController.EXPECT().OnChange(ctx, handlerKey, gomock.Any())
Register(ctx, schemas, &client, crdController, apisvcController)
registeredSchema := schemas.LookupSchema("schemaDefinition")
require.NotNil(t, registeredSchema)
require.Len(t, registeredSchema.ResourceMethods, 1)
require.Equal(t, registeredSchema.ResourceMethods[0], "GET")
require.NotNil(t, registeredSchema.ByIDHandler)
// Register will spawn a background thread, so we want to stop that to not impact other tests
cancel()
}
func Test_getDurationEnvVarOrDefault(t *testing.T) {
os.Setenv("VALID", "1")
os.Setenv("INVALID", "NOTANUMBER")
tests := []struct {
name string
envVar string
defaultValue int
unit time.Duration
wantDuration time.Duration
}{
{
name: "not found, use default",
envVar: "NOT_FOUND",
defaultValue: 12,
unit: time.Second,
wantDuration: time.Second * 12,
},
{
name: "found but not an int",
envVar: "INVALID",
defaultValue: 24,
unit: time.Minute,
wantDuration: time.Minute * 24,
},
{
name: "found and valid int",
envVar: "VALID",
defaultValue: 30,
unit: time.Hour,
wantDuration: time.Hour * 1,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
got := getDurationEnvVarOrDefault(test.envVar, test.defaultValue, test.unit)
require.Equal(t, test.wantDuration, got)
})
}
}

View File

@ -142,7 +142,8 @@ func setup(ctx context.Context, server *Server) error {
if err = resources.DefaultSchemas(ctx, server.BaseSchemas, ccache, server.ClientFactory, sf, server.Version); err != nil { if err = resources.DefaultSchemas(ctx, server.BaseSchemas, ccache, server.ClientFactory, sf, server.Version); err != nil {
return err return err
} }
definitions.Register(server.BaseSchemas, server.controllers.K8s.Discovery()) definitions.Register(ctx, server.BaseSchemas, server.controllers.K8s.Discovery(),
server.controllers.CRD.CustomResourceDefinition(), server.controllers.API.APIService())
summaryCache := summarycache.New(sf, ccache) summaryCache := summarycache.New(sf, ccache)
summaryCache.Start(ctx) summaryCache.Start(ctx)