mirror of
https://github.com/rancher/steve.git
synced 2025-09-08 02:39:26 +00:00
Split RegisterAfterUpsert into RegisterAfterAdd and RegisterAfterUpdate (#644)
* Split RegisterAfterUpsert into two We're going to need to be able to differentiate between Add and Update for storing events in the _events table. * Update mocks
This commit is contained in:
@@ -54,7 +54,8 @@ type Store struct {
|
||||
listStmt *sql.Stmt
|
||||
listKeysStmt *sql.Stmt
|
||||
|
||||
afterUpsert []func(key string, obj any, tx transaction.Client) error
|
||||
afterAdd []func(key string, obj any, tx transaction.Client) error
|
||||
afterUpdate []func(key string, obj any, tx transaction.Client) error
|
||||
afterDelete []func(key string, tx transaction.Client) error
|
||||
}
|
||||
|
||||
@@ -70,7 +71,8 @@ func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Clie
|
||||
Client: c,
|
||||
keyFunc: keyFunc,
|
||||
shouldEncrypt: shouldEncrypt,
|
||||
afterUpsert: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterAdd: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterUpdate: []func(key string, obj any, tx transaction.Client) error{},
|
||||
afterDelete: []func(key string, tx transaction.Client) error{},
|
||||
}
|
||||
|
||||
@@ -106,22 +108,6 @@ func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Clie
|
||||
}
|
||||
|
||||
/* Core methods */
|
||||
// upsert saves an obj with its key, or updates key with obj if it exists in this Store
|
||||
func (s *Store) upsert(key string, obj any) error {
|
||||
return s.WithTransaction(s.ctx, true, func(tx transaction.Client) error {
|
||||
err := s.Upsert(tx, s.upsertStmt, key, obj, s.shouldEncrypt)
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: s.upsertQuery, Err: err}
|
||||
}
|
||||
|
||||
err = s.runAfterUpsert(key, obj, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// deleteByKey deletes the object associated with key, if it exists in this Store
|
||||
func (s *Store) deleteByKey(key string) error {
|
||||
@@ -167,7 +153,19 @@ func (s *Store) Add(obj any) error {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.upsert(key, obj)
|
||||
err = s.WithTransaction(s.ctx, true, func(tx transaction.Client) error {
|
||||
err := s.Upsert(tx, s.upsertStmt, key, obj, s.shouldEncrypt)
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: s.upsertQuery, Err: err}
|
||||
}
|
||||
|
||||
err = s.runAfterAdd(key, obj, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("Error in Store.Add for type %v: %v", s.name, err)
|
||||
return err
|
||||
@@ -177,7 +175,29 @@ func (s *Store) Add(obj any) error {
|
||||
|
||||
// Update saves an obj, or updates it if it exists in this Store
|
||||
func (s *Store) Update(obj any) error {
|
||||
return s.Add(obj)
|
||||
key, err := s.keyFunc(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.WithTransaction(s.ctx, true, func(tx transaction.Client) error {
|
||||
err := s.Upsert(tx, s.upsertStmt, key, obj, s.shouldEncrypt)
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: s.upsertQuery, Err: err}
|
||||
}
|
||||
|
||||
err = s.runAfterUpdate(key, obj, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("Error in Store.Update for type %v: %v", s.name, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes the given object, if it exists in this Store
|
||||
@@ -279,7 +299,7 @@ func (s *Store) replaceByKey(objects map[string]any) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.runAfterUpsert(key, obj, txC)
|
||||
err = s.runAfterAdd(key, obj, txC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -296,11 +316,6 @@ func (s *Store) Resync() error {
|
||||
|
||||
/* Utilities */
|
||||
|
||||
// RegisterAfterUpsert registers a func to be called after each upsert
|
||||
func (s *Store) RegisterAfterUpsert(f func(key string, obj any, txC transaction.Client) error) {
|
||||
s.afterUpsert = append(s.afterUpsert, f)
|
||||
}
|
||||
|
||||
func (s *Store) GetName() string {
|
||||
return s.name
|
||||
}
|
||||
@@ -313,10 +328,24 @@ func (s *Store) GetType() reflect.Type {
|
||||
return s.typ
|
||||
}
|
||||
|
||||
// keep
|
||||
// runAfterUpsert executes functions registered to run after upsert
|
||||
func (s *Store) runAfterUpsert(key string, obj any, txC transaction.Client) error {
|
||||
for _, f := range s.afterUpsert {
|
||||
// RegisterAfterAdd registers a func to be called after each add event
|
||||
func (s *Store) RegisterAfterAdd(f func(key string, obj any, txC transaction.Client) error) {
|
||||
s.afterAdd = append(s.afterAdd, f)
|
||||
}
|
||||
|
||||
// RegisterAfterUpdate registers a func to be called after each update event
|
||||
func (s *Store) RegisterAfterUpdate(f func(key string, obj any, txC transaction.Client) error) {
|
||||
s.afterUpdate = append(s.afterUpdate, f)
|
||||
}
|
||||
|
||||
// RegisterAfterDelete registers a func to be called after each deletion
|
||||
func (s *Store) RegisterAfterDelete(f func(key string, txC transaction.Client) error) {
|
||||
s.afterDelete = append(s.afterDelete, f)
|
||||
}
|
||||
|
||||
// runAfterAdd executes functions registered to run after add event
|
||||
func (s *Store) runAfterAdd(key string, obj any, txC transaction.Client) error {
|
||||
for _, f := range s.afterAdd {
|
||||
err := f(key, obj, txC)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -325,13 +354,18 @@ func (s *Store) runAfterUpsert(key string, obj any, txC transaction.Client) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterAfterDelete registers a func to be called after each deletion
|
||||
func (s *Store) RegisterAfterDelete(f func(key string, txC transaction.Client) error) {
|
||||
s.afterDelete = append(s.afterDelete, f)
|
||||
// runAfterUpdate executes functions registered to run after update event
|
||||
func (s *Store) runAfterUpdate(key string, obj any, txC transaction.Client) error {
|
||||
for _, f := range s.afterUpdate {
|
||||
err := f(key, obj, txC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// keep
|
||||
// runAfterDelete executes functions registered to run after upsert
|
||||
// runAfterDelete executes functions registered to run after delete event
|
||||
func (s *Store) runAfterDelete(key string, txC transaction.Client) error {
|
||||
for _, f := range s.afterDelete {
|
||||
err := f(key, txC)
|
||||
|
@@ -62,7 +62,7 @@ func TestAdd(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
tests = append(tests, testCase{description: "Add with no DB client errors and an afterUpsert function", test: func(t *testing.T, shouldEncrypt bool) {
|
||||
tests = append(tests, testCase{description: "Add with no DB client errors and an afterAdd function", test: func(t *testing.T, shouldEncrypt bool) {
|
||||
c, txC := SetupMockDB(t)
|
||||
store := SetupStore(t, c, shouldEncrypt)
|
||||
|
||||
@@ -76,7 +76,7 @@ func TestAdd(t *testing.T) {
|
||||
})
|
||||
|
||||
var count int
|
||||
store.afterUpsert = append(store.afterUpsert, func(key string, object any, tx transaction.Client) error {
|
||||
store.afterAdd = append(store.afterAdd, func(key string, object any, tx transaction.Client) error {
|
||||
count++
|
||||
return nil
|
||||
})
|
||||
@@ -86,7 +86,7 @@ func TestAdd(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
tests = append(tests, testCase{description: "Add with no DB client errors and an afterUpsert function that returns error", test: func(t *testing.T, shouldEncrypt bool) {
|
||||
tests = append(tests, testCase{description: "Add with no DB client errors and an afterAdd function that returns error", test: func(t *testing.T, shouldEncrypt bool) {
|
||||
c, txC := SetupMockDB(t)
|
||||
store := SetupStore(t, c, shouldEncrypt)
|
||||
|
||||
@@ -99,7 +99,7 @@ func TestAdd(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
store.afterUpsert = append(store.afterUpsert, func(key string, object any, txC transaction.Client) error {
|
||||
store.afterAdd = append(store.afterAdd, func(key string, object any, txC transaction.Client) error {
|
||||
return fmt.Errorf("error")
|
||||
})
|
||||
err := store.Add(testObject)
|
||||
@@ -184,7 +184,7 @@ func TestUpdate(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
tests = append(tests, testCase{description: "Update with no DB client errors and an afterUpsert function", test: func(t *testing.T, shouldEncrypt bool) {
|
||||
tests = append(tests, testCase{description: "Update with no DB client errors and an afterUpdate function", test: func(t *testing.T, shouldEncrypt bool) {
|
||||
c, txC := SetupMockDB(t)
|
||||
store := SetupStore(t, c, shouldEncrypt)
|
||||
|
||||
@@ -198,7 +198,7 @@ func TestUpdate(t *testing.T) {
|
||||
})
|
||||
|
||||
var count int
|
||||
store.afterUpsert = append(store.afterUpsert, func(key string, object any, txC transaction.Client) error {
|
||||
store.afterUpdate = append(store.afterUpdate, func(key string, object any, txC transaction.Client) error {
|
||||
count++
|
||||
return nil
|
||||
})
|
||||
@@ -208,7 +208,7 @@ func TestUpdate(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
tests = append(tests, testCase{description: "Update with no DB client errors and an afterUpsert function that returns error", test: func(t *testing.T, shouldEncrypt bool) {
|
||||
tests = append(tests, testCase{description: "Update with no DB client errors and an afterUpdate function that returns error", test: func(t *testing.T, shouldEncrypt bool) {
|
||||
c, txC := SetupMockDB(t)
|
||||
store := SetupStore(t, c, shouldEncrypt)
|
||||
|
||||
@@ -222,7 +222,7 @@ func TestUpdate(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
store.afterUpsert = append(store.afterUpsert, func(key string, object any, txC transaction.Client) error {
|
||||
store.afterUpdate = append(store.afterUpdate, func(key string, object any, txC transaction.Client) error {
|
||||
return fmt.Errorf("error")
|
||||
})
|
||||
err := store.Update(testObject)
|
||||
|
Reference in New Issue
Block a user