mirror of
https://github.com/rancher/steve.git
synced 2025-09-13 05:49:16 +00:00
Hard-wire external associations: 3 sections in, this one is 4/7 (#645)
* Continue rebasing. * Wrote unit tests for external associations. * Fix the generated SQL. Some syntactic sugar (capitalizing the keywords), but use the 'ON' syntax on JOINs. * whitespace fix post rebase * We want "management.cattle.io.projects:spec.displayName" not "...spec.clusterName" * Fix the database calls: drop the key * Fix breakage during automatic rebase merging gone wrong. * ws fix - NFC * Post rebase-merge fixes * Fix rebase-driven merge. * Fix rebase breakage. * go-uber v0.5.2 prefers real arg names to '<argN>'
This commit is contained in:
@@ -12,6 +12,9 @@ import (
|
||||
"github.com/rancher/lasso/pkg/log"
|
||||
"github.com/rancher/steve/pkg/sqlcache/db"
|
||||
"github.com/rancher/steve/pkg/sqlcache/db/transaction"
|
||||
"github.com/rancher/steve/pkg/sqlcache/sqltypes"
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
// needed for drivers
|
||||
@@ -37,11 +40,13 @@ const (
|
||||
type Store struct {
|
||||
db.Client
|
||||
|
||||
ctx context.Context
|
||||
name string
|
||||
typ reflect.Type
|
||||
keyFunc cache.KeyFunc
|
||||
shouldEncrypt bool
|
||||
ctx context.Context
|
||||
gvk schema.GroupVersionKind
|
||||
name string
|
||||
externalUpdateInfo *sqltypes.ExternalGVKUpdates
|
||||
typ reflect.Type
|
||||
keyFunc cache.KeyFunc
|
||||
shouldEncrypt bool
|
||||
|
||||
upsertQuery string
|
||||
deleteQuery string
|
||||
@@ -67,18 +72,20 @@ type Store struct {
|
||||
var _ cache.Store = (*Store)(nil)
|
||||
|
||||
// NewStore creates a SQLite-backed cache.Store for objects of the given example type
|
||||
func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Client, shouldEncrypt bool, name string) (*Store, error) {
|
||||
func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Client, shouldEncrypt bool, gvk schema.GroupVersionKind, name string, externalUpdateInfo *sqltypes.ExternalGVKUpdates) (*Store, error) {
|
||||
s := &Store{
|
||||
ctx: ctx,
|
||||
name: name,
|
||||
typ: reflect.TypeOf(example),
|
||||
Client: c,
|
||||
keyFunc: keyFunc,
|
||||
shouldEncrypt: shouldEncrypt,
|
||||
afterAdd: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterUpdate: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterDelete: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterDeleteAll: []func(tx transaction.Client) error{},
|
||||
ctx: ctx,
|
||||
name: name,
|
||||
gvk: gvk,
|
||||
externalUpdateInfo: externalUpdateInfo,
|
||||
typ: reflect.TypeOf(example),
|
||||
Client: c,
|
||||
keyFunc: keyFunc,
|
||||
shouldEncrypt: shouldEncrypt,
|
||||
afterAdd: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterUpdate: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterDelete: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterDeleteAll: []func(tx transaction.Client) error{},
|
||||
}
|
||||
|
||||
dbName := db.Sanitize(s.name)
|
||||
@@ -114,7 +121,103 @@ func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Clie
|
||||
return s, nil
|
||||
}
|
||||
|
||||
/* Core methods */
|
||||
func (s *Store) checkUpdateExternalInfo(key string) error {
|
||||
if s.externalUpdateInfo == nil {
|
||||
return nil
|
||||
}
|
||||
return s.WithTransaction(s.ctx, true, func(tx transaction.Client) error {
|
||||
if err := s.updateExternalInfo(tx, key, s.externalUpdateInfo); err != nil {
|
||||
// Just report and ignore errors
|
||||
logrus.Errorf("Error updating external info %v: %s", s.externalUpdateInfo, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) updateExternalInfo(tx transaction.Client, key string, externalUpdateInfo *sqltypes.ExternalGVKUpdates) error {
|
||||
for _, labelDep := range externalUpdateInfo.ExternalLabelDependencies {
|
||||
rawGetStmt := fmt.Sprintf(`SELECT DISTINCT f.key, ex2."%s" FROM "%s_fields" f
|
||||
LEFT OUTER JOIN "%s_labels" lt1 ON f.key = lt1.key
|
||||
JOIN "%s_fields" ex2 ON lt1.value = ex2."%s"
|
||||
WHERE lt1.label = ? AND f."%s" != ex2."%s"`,
|
||||
labelDep.TargetFinalFieldName,
|
||||
labelDep.SourceGVK,
|
||||
labelDep.SourceGVK,
|
||||
labelDep.TargetGVK,
|
||||
labelDep.TargetKeyFieldName,
|
||||
labelDep.TargetFinalFieldName,
|
||||
labelDep.TargetFinalFieldName,
|
||||
)
|
||||
getStmt := s.Prepare(rawGetStmt)
|
||||
rows, err := s.QueryForRows(s.ctx, getStmt, labelDep.SourceLabelName)
|
||||
if err != nil {
|
||||
logrus.Infof("Error getting external info for table %s, key %s: %v", labelDep.TargetGVK, key, &db.QueryError{QueryString: rawGetStmt, Err: err})
|
||||
continue
|
||||
}
|
||||
result, err := s.ReadStrings2(rows)
|
||||
if err != nil {
|
||||
logrus.Infof("Error reading objects for table %s, key %s: %s", labelDep.TargetGVK, key, err)
|
||||
continue
|
||||
}
|
||||
if len(result) == 0 {
|
||||
continue
|
||||
}
|
||||
for _, innerResult := range result {
|
||||
sourceKey := innerResult[0]
|
||||
finalTargetValue := innerResult[1]
|
||||
rawStmt := fmt.Sprintf(`UPDATE "%s_fields" SET "%s" = ? WHERE key = ?`,
|
||||
labelDep.SourceGVK, labelDep.TargetFinalFieldName)
|
||||
preparedStmt := s.Prepare(rawStmt)
|
||||
_, err = tx.Stmt(preparedStmt).Exec(finalTargetValue, sourceKey)
|
||||
if err != nil {
|
||||
logrus.Infof("Error running %s(%s, %s): %s", rawStmt, finalTargetValue, sourceKey, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, nonLabelDep := range externalUpdateInfo.ExternalDependencies {
|
||||
rawGetStmt := fmt.Sprintf(`SELECT f.key, ex2."%s"
|
||||
FROM "%s_fields" f JOIN "%s_fields" ex2 ON f."%s" = ex2."%s"
|
||||
WHERE f."%s" != ex2."%s"`,
|
||||
nonLabelDep.TargetFinalFieldName,
|
||||
nonLabelDep.SourceGVK,
|
||||
nonLabelDep.TargetGVK,
|
||||
nonLabelDep.SourceFieldName,
|
||||
nonLabelDep.TargetKeyFieldName,
|
||||
nonLabelDep.TargetFinalFieldName,
|
||||
nonLabelDep.TargetFinalFieldName)
|
||||
// TODO: Try to fold the two blocks together
|
||||
|
||||
getStmt := s.Prepare(rawGetStmt)
|
||||
rows, err := s.QueryForRows(s.ctx, getStmt, nonLabelDep.SourceFieldName)
|
||||
if err != nil {
|
||||
logrus.Infof("Error getting external info for table %s, key %s: %v", nonLabelDep.TargetGVK, key, &db.QueryError{QueryString: rawGetStmt, Err: err})
|
||||
continue
|
||||
}
|
||||
result, err := s.ReadStrings2(rows)
|
||||
if err != nil {
|
||||
logrus.Infof("Error reading objects for table %s, key %s: %s", nonLabelDep.TargetGVK, key, err)
|
||||
continue
|
||||
}
|
||||
if len(result) == 0 {
|
||||
continue
|
||||
}
|
||||
for _, innerResult := range result {
|
||||
sourceKey := innerResult[0]
|
||||
finalTargetValue := innerResult[1]
|
||||
rawStmt := fmt.Sprintf(`UPDATE "%s_fields" SET "%s" = ? WHERE key = ?`,
|
||||
nonLabelDep.SourceGVK, nonLabelDep.TargetFinalFieldName)
|
||||
preparedStmt := s.Prepare(rawStmt)
|
||||
_, err = tx.Stmt(preparedStmt).Exec(finalTargetValue, sourceKey)
|
||||
if err != nil {
|
||||
logrus.Infof("Error running %s(%s, %s): %s", rawStmt, finalTargetValue, sourceKey, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// deleteByKey deletes the object associated with key, if it exists in this Store
|
||||
func (s *Store) deleteByKey(key string, obj any) error {
|
||||
@@ -153,6 +256,8 @@ func (s *Store) GetByKey(key string) (item any, exists bool, err error) {
|
||||
|
||||
/* Satisfy cache.Store */
|
||||
|
||||
/* Core methods */
|
||||
|
||||
// Add saves an obj, or updates it if it exists in this Store
|
||||
func (s *Store) Add(obj any) error {
|
||||
key, err := s.keyFunc(obj)
|
||||
@@ -177,7 +282,7 @@ func (s *Store) Add(obj any) error {
|
||||
log.Errorf("Error in Store.Add for type %v: %v", s.name, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return s.checkUpdateExternalInfo(key)
|
||||
}
|
||||
|
||||
// Update saves an obj, or updates it if it exists in this Store
|
||||
@@ -204,7 +309,7 @@ func (s *Store) Update(obj any) error {
|
||||
log.Errorf("Error in Store.Update for type %v: %v", s.name, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return s.checkUpdateExternalInfo(key)
|
||||
}
|
||||
|
||||
// Delete deletes the given object, if it exists in this Store
|
||||
|
Reference in New Issue
Block a user