mirror of
https://github.com/rancher/steve.git
synced 2025-09-23 04:19:37 +00:00
* On Reset, remove all the sqlite files, not just the main one. * Avoid resetting the database and cache when not needed. Avoid resetting when a change doesn't bring in new schema fields. Always reset when a CRD is deleted. * Force a database reset when a new CRD is added. * Improve code readability. No functional change.
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
authorizationv1 "k8s.io/api/authorization/v1"
|
||||
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
k8sapimachineryschema "k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/discovery"
|
||||
authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1"
|
||||
apiv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
@@ -31,23 +32,27 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
type SchemasHandlerFunc func(schemas *schema2.Collection) error
|
||||
type SchemasHandlerFunc func(schemas *schema2.Collection, changedSchemas map[string]*types.APISchema, deletedSomething bool) error
|
||||
|
||||
func (s SchemasHandlerFunc) OnSchemas(schemas *schema2.Collection) error {
|
||||
return s(schemas)
|
||||
func (s SchemasHandlerFunc) OnSchemas(schemas *schema2.Collection, changedSchemas map[string]*types.APISchema, forceChange bool) error {
|
||||
return s(schemas, changedSchemas, forceChange)
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
sync.Mutex
|
||||
|
||||
ctx context.Context
|
||||
toSync int32
|
||||
schemas *schema2.Collection
|
||||
client discovery.DiscoveryInterface
|
||||
cols *common.DynamicColumns
|
||||
crd apiextcontrollerv1.CustomResourceDefinitionClient
|
||||
ssar authorizationv1client.SelfSubjectAccessReviewInterface
|
||||
handler SchemasHandlerFunc
|
||||
ctx context.Context
|
||||
toSync int32
|
||||
schemas *schema2.Collection
|
||||
client discovery.DiscoveryInterface
|
||||
cols *common.DynamicColumns
|
||||
crdClient apiextcontrollerv1.CustomResourceDefinitionClient
|
||||
ssar authorizationv1client.SelfSubjectAccessReviewInterface
|
||||
handler SchemasHandlerFunc
|
||||
changedIDs map[k8sapimachineryschema.GroupVersionKind]bool
|
||||
createdCRDs map[k8sapimachineryschema.GroupVersionKind]bool
|
||||
deletedCRDs map[k8sapimachineryschema.GroupVersionKind]bool
|
||||
gvksFromKeys map[string][]k8sapimachineryschema.GroupVersionKind
|
||||
}
|
||||
|
||||
func Register(ctx context.Context,
|
||||
@@ -60,20 +65,59 @@ func Register(ctx context.Context,
|
||||
schemas *schema2.Collection) {
|
||||
|
||||
h := &handler{
|
||||
ctx: ctx,
|
||||
cols: cols,
|
||||
client: discovery,
|
||||
schemas: schemas,
|
||||
handler: schemasHandler,
|
||||
crd: crd,
|
||||
ssar: ssar,
|
||||
ctx: ctx,
|
||||
cols: cols,
|
||||
client: discovery,
|
||||
schemas: schemas,
|
||||
handler: schemasHandler,
|
||||
crdClient: crd,
|
||||
ssar: ssar,
|
||||
changedIDs: make(map[k8sapimachineryschema.GroupVersionKind]bool),
|
||||
gvksFromKeys: make(map[string][]k8sapimachineryschema.GroupVersionKind),
|
||||
createdCRDs: make(map[k8sapimachineryschema.GroupVersionKind]bool),
|
||||
deletedCRDs: make(map[k8sapimachineryschema.GroupVersionKind]bool),
|
||||
}
|
||||
|
||||
apiService.OnChange(ctx, "schema", h.OnChangeAPIService)
|
||||
crd.OnChange(ctx, "schema", h.OnChangeCRD)
|
||||
}
|
||||
|
||||
func (h *handler) handleDeletedCRD(key string, crd *apiextv1.CustomResourceDefinition) {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
gvkList, ok := h.gvksFromKeys[key]
|
||||
if !ok {
|
||||
logrus.Infof("No associated GVK for CRD key %s", key)
|
||||
return
|
||||
}
|
||||
for _, gvk := range gvkList {
|
||||
h.deletedCRDs[gvk] = true
|
||||
}
|
||||
delete(h.gvksFromKeys, key) // Don't need this anymore
|
||||
h.queueRefresh()
|
||||
}
|
||||
|
||||
func (h *handler) OnChangeCRD(key string, crd *apiextv1.CustomResourceDefinition) (*apiextv1.CustomResourceDefinition, error) {
|
||||
if crd == nil {
|
||||
h.handleDeletedCRD(key, crd)
|
||||
return crd, nil
|
||||
}
|
||||
spec := crd.Spec
|
||||
group := spec.Group
|
||||
kind := spec.Names.Kind
|
||||
gvkList := make([]k8sapimachineryschema.GroupVersionKind, len(spec.Versions))
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
for i, version := range spec.Versions {
|
||||
gvk := k8sapimachineryschema.GroupVersionKind{Group: group, Version: version.Name, Kind: kind}
|
||||
gvkList[i] = gvk
|
||||
h.changedIDs[gvk] = true
|
||||
_, ok := h.gvksFromKeys[key]
|
||||
if !ok {
|
||||
h.createdCRDs[gvk] = true
|
||||
}
|
||||
}
|
||||
h.gvksFromKeys[key] = gvkList
|
||||
h.queueRefresh()
|
||||
return crd, nil
|
||||
}
|
||||
@@ -88,7 +132,29 @@ func (h *handler) queueRefresh() {
|
||||
|
||||
go func() {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
if err := h.refreshAll(h.ctx); err != nil {
|
||||
var err error
|
||||
var changedIDs map[k8sapimachineryschema.GroupVersionKind]bool
|
||||
var deletedCRDs map[k8sapimachineryschema.GroupVersionKind]bool
|
||||
var createdCRDs map[k8sapimachineryschema.GroupVersionKind]bool
|
||||
h.Lock()
|
||||
if len(h.createdCRDs) > 0 {
|
||||
createdCRDs = h.createdCRDs
|
||||
h.createdCRDs = make(map[k8sapimachineryschema.GroupVersionKind]bool)
|
||||
}
|
||||
if len(h.deletedCRDs) > 0 {
|
||||
deletedCRDs = h.deletedCRDs
|
||||
h.deletedCRDs = make(map[k8sapimachineryschema.GroupVersionKind]bool)
|
||||
}
|
||||
if len(h.changedIDs) > 0 {
|
||||
changedIDs = h.changedIDs
|
||||
h.changedIDs = make(map[k8sapimachineryschema.GroupVersionKind]bool)
|
||||
}
|
||||
h.Unlock()
|
||||
crdNumCountChanged := len(deletedCRDs) > 0 || len(createdCRDs) > 0
|
||||
if len(changedIDs) > 0 || crdNumCountChanged {
|
||||
err = h.refreshAll(h.ctx, changedIDs, crdNumCountChanged)
|
||||
}
|
||||
if err != nil {
|
||||
logrus.Errorf("failed to sync schemas: %v", err)
|
||||
atomic.StoreInt32(&h.toSync, 1)
|
||||
}
|
||||
@@ -148,7 +214,7 @@ func (h *handler) getColumns(ctx context.Context, schemas map[string]*types.APIS
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (h *handler) refreshAll(ctx context.Context) error {
|
||||
func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachineryschema.GroupVersionKind]bool, forceChange bool) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
@@ -156,12 +222,13 @@ func (h *handler) refreshAll(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
schemas, err := converter.ToSchemas(h.crd, h.client)
|
||||
schemas, err := converter.ToSchemas(h.crdClient, h.client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
filteredSchemas := map[string]*types.APISchema{}
|
||||
changedSchemasByID := map[string]*types.APISchema{}
|
||||
for _, schema := range schemas {
|
||||
if IsListWatchable(schema) {
|
||||
if preferredTypeExists(schema, schemas) {
|
||||
@@ -181,6 +248,10 @@ func (h *handler) refreshAll(ctx context.Context) error {
|
||||
schema.PluralName = converter.GVRToPluralName(gvr)
|
||||
}
|
||||
filteredSchemas[schema.ID] = schema
|
||||
if changedGVKs[gvk] {
|
||||
// nil[x] is always false if the first-time runner called this
|
||||
changedSchemasByID[schema.ID] = schema
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.getColumns(h.ctx, filteredSchemas); err != nil {
|
||||
@@ -189,7 +260,7 @@ func (h *handler) refreshAll(ctx context.Context) error {
|
||||
|
||||
h.schemas.Reset(filteredSchemas)
|
||||
if h.handler != nil {
|
||||
return h.handler.OnSchemas(h.schemas)
|
||||
return h.handler.OnSchemas(h.schemas, changedSchemasByID, forceChange)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
Reference in New Issue
Block a user