From 496a6f89687e80b324b4c511306c8931268765dd Mon Sep 17 00:00:00 2001 From: Eric Promislow Date: Wed, 25 Jun 2025 16:10:48 -0700 Subject: [PATCH] Hard-wire external associations: 3 sections in, this one is 4/7 (#645) * Continue rebasing. * Wrote unit tests for external associations. * Fix the generated SQL. Some syntactic sugar (capitalizing the keywords), but use the 'ON' syntax on JOINs. * whitespace fix post rebase * We want "management.cattle.io.projects:spec.displayName" not "...spec.clusterName" * Fix the database calls: drop the key * Fix breakage during automatic rebase merging gone wrong. * ws fix - NFC * Post rebase-merge fixes * Fix rebase-driven merge. * Fix rebase breakage. * go-uber v0.5.2 prefers real arg names to '' --- pkg/sqlcache/db/client.go | 29 ++++ pkg/sqlcache/informer/db_mocks_test.go | 15 ++ .../informer/factory/db_mocks_test.go | 15 ++ .../informer/factory/informer_factory.go | 7 +- .../informer/factory/informer_factory_test.go | 49 +++--- pkg/sqlcache/informer/informer.go | 4 +- pkg/sqlcache/informer/informer_test.go | 12 +- .../informer/listoption_indexer_test.go | 2 +- pkg/sqlcache/informer/sql_mocks_test.go | 15 ++ pkg/sqlcache/integration_test.go | 2 +- pkg/sqlcache/sqltypes/types.go | 32 ++++ pkg/sqlcache/store/db_mocks_test.go | 15 ++ pkg/sqlcache/store/store.go | 143 +++++++++++++++--- pkg/sqlcache/store/store_test.go | 131 +++++++++++++++- pkg/stores/sqlproxy/proxy_mocks_test.go | 8 +- pkg/stores/sqlproxy/proxy_store.go | 31 +++- pkg/stores/sqlproxy/proxy_store_test.go | 18 +-- 17 files changed, 456 insertions(+), 72 deletions(-) diff --git a/pkg/sqlcache/db/client.go b/pkg/sqlcache/db/client.go index 8a5554a7..4e680c22 100644 --- a/pkg/sqlcache/db/client.go +++ b/pkg/sqlcache/db/client.go @@ -44,6 +44,7 @@ type Client interface { QueryForRows(ctx context.Context, stmt transaction.Stmt, params ...any) (*sql.Rows, error) ReadObjects(rows Rows, typ reflect.Type, shouldDecrypt bool) ([]any, error) ReadStrings(rows Rows) ([]string, error) + ReadStrings2(rows Rows) ([][]string, error) ReadInt(rows Rows) (int, error) Upsert(tx transaction.Client, stmt *sql.Stmt, key string, obj any, shouldEncrypt bool) error CloseStmt(closable Closable) error @@ -265,6 +266,34 @@ func (c *client) ReadStrings(rows Rows) ([]string, error) { return result, nil } +// ReadStrings2 scans the given rows into pairs of strings, and then returns the strings as a slice. +func (c *client) ReadStrings2(rows Rows) ([][]string, error) { + c.connLock.RLock() + defer c.connLock.RUnlock() + + var result [][]string + for rows.Next() { + var key1, key2 string + err := rows.Scan(&key1, &key2) + if err != nil { + return nil, closeRowsOnError(rows, err) + } + + result = append(result, []string{key1, key2}) + } + err := rows.Err() + if err != nil { + return nil, closeRowsOnError(rows, err) + } + + err = rows.Close() + if err != nil { + return nil, err + } + + return result, nil +} + // ReadInt scans the first of the given rows into a single int (eg. for COUNT() queries) func (c *client) ReadInt(rows Rows) (int, error) { c.connLock.RLock() diff --git a/pkg/sqlcache/informer/db_mocks_test.go b/pkg/sqlcache/informer/db_mocks_test.go index 172f5ee5..702dda4d 100644 --- a/pkg/sqlcache/informer/db_mocks_test.go +++ b/pkg/sqlcache/informer/db_mocks_test.go @@ -235,6 +235,21 @@ func (mr *MockClientMockRecorder) ReadStrings(rows any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadStrings", reflect.TypeOf((*MockClient)(nil).ReadStrings), rows) } +// ReadStrings2 mocks base method. +func (m *MockClient) ReadStrings2(rows db.Rows) ([][]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadStrings2", rows) + ret0, _ := ret[0].([][]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadStrings2 indicates an expected call of ReadStrings2. +func (mr *MockClientMockRecorder) ReadStrings2(rows any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadStrings2", reflect.TypeOf((*MockClient)(nil).ReadStrings2), rows) +} + // Upsert mocks base method. func (m *MockClient) Upsert(tx transaction.Client, stmt *sql.Stmt, key string, obj any, shouldEncrypt bool) error { m.ctrl.T.Helper() diff --git a/pkg/sqlcache/informer/factory/db_mocks_test.go b/pkg/sqlcache/informer/factory/db_mocks_test.go index 396396c1..0fae0191 100644 --- a/pkg/sqlcache/informer/factory/db_mocks_test.go +++ b/pkg/sqlcache/informer/factory/db_mocks_test.go @@ -151,6 +151,21 @@ func (mr *MockClientMockRecorder) ReadStrings(rows any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadStrings", reflect.TypeOf((*MockClient)(nil).ReadStrings), rows) } +// ReadStrings2 mocks base method. +func (m *MockClient) ReadStrings2(rows db.Rows) ([][]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadStrings2", rows) + ret0, _ := ret[0].([][]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadStrings2 indicates an expected call of ReadStrings2. +func (mr *MockClientMockRecorder) ReadStrings2(rows any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadStrings2", reflect.TypeOf((*MockClient)(nil).ReadStrings2), rows) +} + // Upsert mocks base method. func (m *MockClient) Upsert(tx transaction.Client, stmt *sql.Stmt, key string, obj any, shouldEncrypt bool) error { m.ctrl.T.Helper() diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go index a1fb6b69..dd29f883 100644 --- a/pkg/sqlcache/informer/factory/informer_factory.go +++ b/pkg/sqlcache/informer/factory/informer_factory.go @@ -14,6 +14,7 @@ import ( "github.com/rancher/steve/pkg/sqlcache/db" "github.com/rancher/steve/pkg/sqlcache/encryption" "github.com/rancher/steve/pkg/sqlcache/informer" + "github.com/rancher/steve/pkg/sqlcache/sqltypes" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" @@ -47,7 +48,7 @@ type guardedInformer struct { mutex *sync.Mutex } -type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, maxEventsCount int) (*informer.Informer, error) +type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, maxEventsCount int) (*informer.Informer, error) type Cache struct { informer.ByOptionsLister @@ -108,7 +109,7 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) { // CacheFor returns an informer for given GVK, using sql store indexed with fields, using the specified client. For virtual fields, they must be added by the transform function // and specified by fields to be used for later fields. -func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (Cache, error) { +func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (Cache, error) { // First of all block Reset() until we are done f.mutex.RLock() defer f.mutex.RUnlock() @@ -145,7 +146,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transfor _, encryptResourceAlways := defaultEncryptedResourceTypes[gvk] shouldEncrypt := f.encryptAll || encryptResourceAlways maxEventsCount := f.getMaximumEventsCount(gvk) - i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, maxEventsCount) + i, err := f.newInformer(ctx, client, fields, externalUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, maxEventsCount) if err != nil { return Cache{}, err } diff --git a/pkg/sqlcache/informer/factory/informer_factory_test.go b/pkg/sqlcache/informer/factory/informer_factory_test.go index f9f5f153..92d30510 100644 --- a/pkg/sqlcache/informer/factory/informer_factory_test.go +++ b/pkg/sqlcache/informer/factory/informer_factory_test.go @@ -8,6 +8,7 @@ import ( "github.com/rancher/steve/pkg/sqlcache/db" "github.com/rancher/steve/pkg/sqlcache/informer" + "github.com/rancher/steve/pkg/sqlcache/sqltypes" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" @@ -74,13 +75,14 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) assert.Equal(t, 0, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ @@ -97,12 +99,12 @@ func TestCacheFor(t *testing.T) { }() var c Cache var err error - c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + c, err = f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) // this sleep is critical to the test. It ensure there has been enough time for expected function like Run to be invoked in their go routines. time.Sleep(1 * time.Second) - c2, err := f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + c2, err := f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, c, c2) }}) @@ -120,13 +122,14 @@ func TestCacheFor(t *testing.T) { // need to set this so Run function is not nil SharedIndexInformer: sii, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) assert.Equal(t, 0, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return expectedI, nil } f := &CacheFactory{ @@ -141,7 +144,7 @@ func TestCacheFor(t *testing.T) { close(f.stopCh) }() var err error - _, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + _, err = f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.NotNil(t, err) time.Sleep(2 * time.Second) }}) @@ -163,13 +166,14 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) assert.Equal(t, 0, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ @@ -182,7 +186,7 @@ func TestCacheFor(t *testing.T) { close(f.stopCh) var c Cache var err error - c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + c, err = f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -203,13 +207,14 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) assert.Equal(t, 0, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ @@ -226,7 +231,7 @@ func TestCacheFor(t *testing.T) { }() var c Cache var err error - c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + c, err = f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -252,13 +257,14 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) assert.Equal(t, 0, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ @@ -275,7 +281,7 @@ func TestCacheFor(t *testing.T) { }() var c Cache var err error - c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + c, err = f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -300,13 +306,14 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) assert.Equal(t, 0, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ @@ -323,7 +330,7 @@ func TestCacheFor(t *testing.T) { }() var c Cache var err error - c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + c, err = f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -348,7 +355,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { // we can't test func == func, so instead we check if the output was as expected input := "someinput" ouput, err := transform(input) @@ -363,6 +370,7 @@ func TestCacheFor(t *testing.T) { assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) assert.Equal(t, 0, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ @@ -379,7 +387,7 @@ func TestCacheFor(t *testing.T) { }() var c Cache var err error - c, err = f.CacheFor(context.Background(), fields, transformFunc, dynamicClient, expectedGVK, false, true) + c, err = f.CacheFor(context.Background(), fields, nil, transformFunc, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -400,13 +408,14 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) assert.Equal(t, 10, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ @@ -424,7 +433,8 @@ func TestCacheFor(t *testing.T) { }() var c Cache var err error - c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + // CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) + c, err = f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -449,13 +459,14 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) assert.Equal(t, 10, maxEventsCount) + assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ @@ -476,7 +487,7 @@ func TestCacheFor(t *testing.T) { }() var c Cache var err error - c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + c, err = f.CacheFor(context.Background(), fields, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index 40c8b75f..cb46427f 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -55,7 +55,7 @@ var newInformer = cache.NewSharedIndexInformer // NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form // using the specified client -func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*Informer, error) { +func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*Informer, error) { watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { return client.Watch(ctx, options) } @@ -112,7 +112,7 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [ name := informerNameFromGVK(gvk) - s, err := sqlStore.NewStore(ctx, example, cache.DeletionHandlingMetaNamespaceKeyFunc, db, shouldEncrypt, name) + s, err := sqlStore.NewStore(ctx, example, cache.DeletionHandlingMetaNamespaceKeyFunc, db, shouldEncrypt, gvk, name, externalUpdateInfo) if err != nil { return nil, err } diff --git a/pkg/sqlcache/informer/informer_test.go b/pkg/sqlcache/informer/informer_test.go index 8d455052..d7f5ed30 100644 --- a/pkg/sqlcache/informer/informer_test.go +++ b/pkg/sqlcache/informer/informer_test.go @@ -81,7 +81,7 @@ func TestNewInformer(t *testing.T) { } }) - informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0) + informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, gvk, dbClient, false, true, true, 0) assert.Nil(t, err) assert.NotNil(t, informer.ByOptionsLister) assert.NotNil(t, informer.SharedIndexInformer) @@ -105,7 +105,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, gvk, dbClient, false, true, true, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) { @@ -140,7 +140,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, gvk, dbClient, false, true, true, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) { @@ -193,7 +193,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, gvk, dbClient, false, true, true, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) { @@ -257,7 +257,7 @@ func TestNewInformer(t *testing.T) { transformFunc := func(input interface{}) (interface{}, error) { return "someoutput", nil } - informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true, 0) + informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, transformFunc, gvk, dbClient, false, true, true, 0) assert.Nil(t, err) assert.NotNil(t, informer.ByOptionsLister) assert.NotNil(t, informer.SharedIndexInformer) @@ -293,7 +293,7 @@ func TestNewInformer(t *testing.T) { transformFunc := func(input interface{}) (interface{}, error) { return "someoutput", nil } - _, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true, 0) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, transformFunc, gvk, dbClient, false, true, true, 0) assert.Error(t, err) newInformer = cache.NewSharedIndexInformer }}) diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index 02792de3..79a08fe8 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -50,7 +50,7 @@ func makeListOptionIndexer(ctx context.Context, opts ListOptionIndexerOptions) ( return nil, "", err } - s, err := store.NewStore(ctx, example, cache.DeletionHandlingMetaNamespaceKeyFunc, db, false, name) + s, err := store.NewStore(ctx, example, cache.DeletionHandlingMetaNamespaceKeyFunc, db, false, gvk, name, nil) if err != nil { return nil, "", err } diff --git a/pkg/sqlcache/informer/sql_mocks_test.go b/pkg/sqlcache/informer/sql_mocks_test.go index df43b201..70016975 100644 --- a/pkg/sqlcache/informer/sql_mocks_test.go +++ b/pkg/sqlcache/informer/sql_mocks_test.go @@ -281,6 +281,21 @@ func (mr *MockStoreMockRecorder) ReadStrings(rows any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadStrings", reflect.TypeOf((*MockStore)(nil).ReadStrings), rows) } +// ReadStrings2 mocks base method. +func (m *MockStore) ReadStrings2(rows db.Rows) ([][]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadStrings2", rows) + ret0, _ := ret[0].([][]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadStrings2 indicates an expected call of ReadStrings2. +func (mr *MockStoreMockRecorder) ReadStrings2(rows any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadStrings2", reflect.TypeOf((*MockStore)(nil).ReadStrings2), rows) +} + // RegisterAfterAdd mocks base method. func (m *MockStore) RegisterAfterAdd(f func(string, any, transaction.Client) error) { m.ctrl.T.Helper() diff --git a/pkg/sqlcache/integration_test.go b/pkg/sqlcache/integration_test.go index e9c36e8e..eb9e599c 100644 --- a/pkg/sqlcache/integration_test.go +++ b/pkg/sqlcache/integration_test.go @@ -323,7 +323,7 @@ func (i *IntegrationSuite) createCacheAndFactory(fields [][]string, transformFun Resource: "configmaps", } dynamicResource := dynamicClient.Resource(configMapGVR).Namespace(testNamespace) - cache, err := cacheFactory.CacheFor(context.Background(), fields, transformFunc, dynamicResource, configMapGVK, true, true) + cache, err := cacheFactory.CacheFor(context.Background(), fields, nil, transformFunc, dynamicResource, configMapGVK, true, true) if err != nil { return nil, nil, fmt.Errorf("unable to make cache: %w", err) } diff --git a/pkg/sqlcache/sqltypes/types.go b/pkg/sqlcache/sqltypes/types.go index eca0cc44..66d230ec 100644 --- a/pkg/sqlcache/sqltypes/types.go +++ b/pkg/sqlcache/sqltypes/types.go @@ -1,5 +1,7 @@ package sqltypes +import "k8s.io/apimachinery/pkg/runtime/schema" + type Op string const ( @@ -69,6 +71,36 @@ type Pagination struct { Page int } +type ExternalDependency struct { + SourceGVK string + SourceFieldName string + TargetGVK string + TargetKeyFieldName string + TargetFinalFieldName string +} + +type ExternalLabelDependency struct { + SourceGVK string + SourceLabelName string + TargetGVK string + TargetKeyFieldName string + TargetFinalFieldName string +} + +type ExternalGVKUpdates struct { + AffectedGVK schema.GroupVersionKind + ExternalDependencies []ExternalDependency + ExternalLabelDependencies []ExternalLabelDependency +} + +type ExternalGVKDependency map[schema.GroupVersionKind]*ExternalGVKUpdates + +type ExternalInfoPacket struct { + ExternalDependencies []ExternalDependency + ExternalLabelDependencies []ExternalLabelDependency + ExternalGVKDependencies ExternalGVKDependency +} + func NewSortList() *SortList { return &SortList{ SortDirectives: []Sort{}, diff --git a/pkg/sqlcache/store/db_mocks_test.go b/pkg/sqlcache/store/db_mocks_test.go index f76331a9..c6ed7b07 100644 --- a/pkg/sqlcache/store/db_mocks_test.go +++ b/pkg/sqlcache/store/db_mocks_test.go @@ -235,6 +235,21 @@ func (mr *MockClientMockRecorder) ReadStrings(rows any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadStrings", reflect.TypeOf((*MockClient)(nil).ReadStrings), rows) } +// ReadStrings2 mocks base method. +func (m *MockClient) ReadStrings2(rows db.Rows) ([][]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadStrings2", rows) + ret0, _ := ret[0].([][]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadStrings2 indicates an expected call of ReadStrings2. +func (mr *MockClientMockRecorder) ReadStrings2(rows any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadStrings2", reflect.TypeOf((*MockClient)(nil).ReadStrings2), rows) +} + // Upsert mocks base method. func (m *MockClient) Upsert(tx transaction.Client, stmt *sql.Stmt, key string, obj any, shouldEncrypt bool) error { m.ctrl.T.Helper() diff --git a/pkg/sqlcache/store/store.go b/pkg/sqlcache/store/store.go index d73b7354..6472c29e 100644 --- a/pkg/sqlcache/store/store.go +++ b/pkg/sqlcache/store/store.go @@ -12,6 +12,9 @@ import ( "github.com/rancher/lasso/pkg/log" "github.com/rancher/steve/pkg/sqlcache/db" "github.com/rancher/steve/pkg/sqlcache/db/transaction" + "github.com/rancher/steve/pkg/sqlcache/sqltypes" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" // needed for drivers @@ -37,11 +40,13 @@ const ( type Store struct { db.Client - ctx context.Context - name string - typ reflect.Type - keyFunc cache.KeyFunc - shouldEncrypt bool + ctx context.Context + gvk schema.GroupVersionKind + name string + externalUpdateInfo *sqltypes.ExternalGVKUpdates + typ reflect.Type + keyFunc cache.KeyFunc + shouldEncrypt bool upsertQuery string deleteQuery string @@ -67,18 +72,20 @@ type Store struct { var _ cache.Store = (*Store)(nil) // NewStore creates a SQLite-backed cache.Store for objects of the given example type -func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Client, shouldEncrypt bool, name string) (*Store, error) { +func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Client, shouldEncrypt bool, gvk schema.GroupVersionKind, name string, externalUpdateInfo *sqltypes.ExternalGVKUpdates) (*Store, error) { s := &Store{ - ctx: ctx, - name: name, - typ: reflect.TypeOf(example), - Client: c, - keyFunc: keyFunc, - shouldEncrypt: shouldEncrypt, - afterAdd: []func(key string, obj any, tx transaction.Client) error{}, - afterUpdate: []func(key string, obj any, tx transaction.Client) error{}, - afterDelete: []func(key string, obj any, tx transaction.Client) error{}, - afterDeleteAll: []func(tx transaction.Client) error{}, + ctx: ctx, + name: name, + gvk: gvk, + externalUpdateInfo: externalUpdateInfo, + typ: reflect.TypeOf(example), + Client: c, + keyFunc: keyFunc, + shouldEncrypt: shouldEncrypt, + afterAdd: []func(key string, obj any, tx transaction.Client) error{}, + afterUpdate: []func(key string, obj any, tx transaction.Client) error{}, + afterDelete: []func(key string, obj any, tx transaction.Client) error{}, + afterDeleteAll: []func(tx transaction.Client) error{}, } dbName := db.Sanitize(s.name) @@ -114,7 +121,103 @@ func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Clie return s, nil } -/* Core methods */ +func (s *Store) checkUpdateExternalInfo(key string) error { + if s.externalUpdateInfo == nil { + return nil + } + return s.WithTransaction(s.ctx, true, func(tx transaction.Client) error { + if err := s.updateExternalInfo(tx, key, s.externalUpdateInfo); err != nil { + // Just report and ignore errors + logrus.Errorf("Error updating external info %v: %s", s.externalUpdateInfo, err) + } + return nil + }) +} + +func (s *Store) updateExternalInfo(tx transaction.Client, key string, externalUpdateInfo *sqltypes.ExternalGVKUpdates) error { + for _, labelDep := range externalUpdateInfo.ExternalLabelDependencies { + rawGetStmt := fmt.Sprintf(`SELECT DISTINCT f.key, ex2."%s" FROM "%s_fields" f + LEFT OUTER JOIN "%s_labels" lt1 ON f.key = lt1.key + JOIN "%s_fields" ex2 ON lt1.value = ex2."%s" + WHERE lt1.label = ? AND f."%s" != ex2."%s"`, + labelDep.TargetFinalFieldName, + labelDep.SourceGVK, + labelDep.SourceGVK, + labelDep.TargetGVK, + labelDep.TargetKeyFieldName, + labelDep.TargetFinalFieldName, + labelDep.TargetFinalFieldName, + ) + getStmt := s.Prepare(rawGetStmt) + rows, err := s.QueryForRows(s.ctx, getStmt, labelDep.SourceLabelName) + if err != nil { + logrus.Infof("Error getting external info for table %s, key %s: %v", labelDep.TargetGVK, key, &db.QueryError{QueryString: rawGetStmt, Err: err}) + continue + } + result, err := s.ReadStrings2(rows) + if err != nil { + logrus.Infof("Error reading objects for table %s, key %s: %s", labelDep.TargetGVK, key, err) + continue + } + if len(result) == 0 { + continue + } + for _, innerResult := range result { + sourceKey := innerResult[0] + finalTargetValue := innerResult[1] + rawStmt := fmt.Sprintf(`UPDATE "%s_fields" SET "%s" = ? WHERE key = ?`, + labelDep.SourceGVK, labelDep.TargetFinalFieldName) + preparedStmt := s.Prepare(rawStmt) + _, err = tx.Stmt(preparedStmt).Exec(finalTargetValue, sourceKey) + if err != nil { + logrus.Infof("Error running %s(%s, %s): %s", rawStmt, finalTargetValue, sourceKey, err) + continue + } + } + } + for _, nonLabelDep := range externalUpdateInfo.ExternalDependencies { + rawGetStmt := fmt.Sprintf(`SELECT f.key, ex2."%s" + FROM "%s_fields" f JOIN "%s_fields" ex2 ON f."%s" = ex2."%s" + WHERE f."%s" != ex2."%s"`, + nonLabelDep.TargetFinalFieldName, + nonLabelDep.SourceGVK, + nonLabelDep.TargetGVK, + nonLabelDep.SourceFieldName, + nonLabelDep.TargetKeyFieldName, + nonLabelDep.TargetFinalFieldName, + nonLabelDep.TargetFinalFieldName) + // TODO: Try to fold the two blocks together + + getStmt := s.Prepare(rawGetStmt) + rows, err := s.QueryForRows(s.ctx, getStmt, nonLabelDep.SourceFieldName) + if err != nil { + logrus.Infof("Error getting external info for table %s, key %s: %v", nonLabelDep.TargetGVK, key, &db.QueryError{QueryString: rawGetStmt, Err: err}) + continue + } + result, err := s.ReadStrings2(rows) + if err != nil { + logrus.Infof("Error reading objects for table %s, key %s: %s", nonLabelDep.TargetGVK, key, err) + continue + } + if len(result) == 0 { + continue + } + for _, innerResult := range result { + sourceKey := innerResult[0] + finalTargetValue := innerResult[1] + rawStmt := fmt.Sprintf(`UPDATE "%s_fields" SET "%s" = ? WHERE key = ?`, + nonLabelDep.SourceGVK, nonLabelDep.TargetFinalFieldName) + preparedStmt := s.Prepare(rawStmt) + _, err = tx.Stmt(preparedStmt).Exec(finalTargetValue, sourceKey) + if err != nil { + logrus.Infof("Error running %s(%s, %s): %s", rawStmt, finalTargetValue, sourceKey, err) + continue + } + } + } + return nil + +} // deleteByKey deletes the object associated with key, if it exists in this Store func (s *Store) deleteByKey(key string, obj any) error { @@ -153,6 +256,8 @@ func (s *Store) GetByKey(key string) (item any, exists bool, err error) { /* Satisfy cache.Store */ +/* Core methods */ + // Add saves an obj, or updates it if it exists in this Store func (s *Store) Add(obj any) error { key, err := s.keyFunc(obj) @@ -177,7 +282,7 @@ func (s *Store) Add(obj any) error { log.Errorf("Error in Store.Add for type %v: %v", s.name, err) return err } - return nil + return s.checkUpdateExternalInfo(key) } // Update saves an obj, or updates it if it exists in this Store @@ -204,7 +309,7 @@ func (s *Store) Update(obj any) error { log.Errorf("Error in Store.Update for type %v: %v", s.name, err) return err } - return nil + return s.checkUpdateExternalInfo(key) } // Delete deletes the given object, if it exists in this Store diff --git a/pkg/sqlcache/store/store_test.go b/pkg/sqlcache/store/store_test.go index 0e16dc70..97c96474 100644 --- a/pkg/sqlcache/store/store_test.go +++ b/pkg/sqlcache/store/store_test.go @@ -14,7 +14,10 @@ import ( "context" "database/sql" "fmt" + "github.com/rancher/steve/pkg/sqlcache/sqltypes" "reflect" + "regexp" + "strings" "testing" "github.com/rancher/steve/pkg/sqlcache/db" @@ -22,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" + "k8s.io/apimachinery/pkg/runtime/schema" ) func testStoreKeyFunc(obj interface{}) (string, error) { @@ -670,6 +674,96 @@ func TestResync(t *testing.T) { } } +type StringMatcher struct { + expected string +} + +var ptn = regexp.MustCompile(`\s\s+`) + +func dropWhiteSpace(s string) string { + s1 := strings.TrimSpace(s) + s2 := strings.ReplaceAll(s1, "\n", " ") + s3 := strings.ReplaceAll(s2, "\r", " ") + return ptn.ReplaceAllString(s3, " ") +} + +func (m StringMatcher) Matches(x any) bool { + s, ok := x.(string) + if !ok { + return false + } + return dropWhiteSpace(s) == m.expected +} + +func (m StringMatcher) String() string { + return m.expected +} + +func WSIgnoringMatcher(expected string) gomock.Matcher { + return StringMatcher{ + expected: dropWhiteSpace(expected), + } +} + +func TestAddWithExternalUpdates(t *testing.T) { + type testCase struct { + description string + test func(t *testing.T) + } + testObject := testStoreObject{Id: "testStoreObject", Val: "a"} + var tests []testCase + tests = append(tests, testCase{description: "Add with no DB client errors", test: func(t *testing.T) { + c, txC := SetupMockDB(t) + stmts := NewMockStmt(gomock.NewController(t)) + store := SetupStoreWithExternalDependencies(t, c) + + c.EXPECT().Upsert(txC, store.upsertStmt, "testStoreObject", testObject, store.shouldEncrypt).Return(nil) + c.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(nil).Do( + func(ctx context.Context, shouldEncrypt bool, f db.WithTransactionFunction) { + err := f(txC) + if err != nil { + t.Fail() + } + }).Times(2) + rawStmt := `SELECT DISTINCT f.key, ex2."spec.displayName" FROM "_v1_Namespace_fields" f + LEFT OUTER JOIN "_v1_Namespace_labels" lt1 ON f.key = lt1.key + JOIN "management.cattle.io_v3_Project_fields" ex2 ON lt1.value = ex2."metadata.name" + WHERE lt1.label = ? AND f."spec.displayName" != ex2."spec.displayName"` + c.EXPECT().Prepare(WSIgnoringMatcher(rawStmt)) + results1 := []any{"field.cattle.io/projectId"} + c.EXPECT().QueryForRows(gomock.Any(), gomock.Any(), results1) + c.EXPECT().ReadStrings2(gomock.Any()).Return([][]string{{"lego.cattle.io/fields1", "moose1"}}, nil) + rawStmt2 := `UPDATE "_v1_Namespace_fields" SET "spec.displayName" = ? WHERE key = ?` + c.EXPECT().Prepare(rawStmt2) + txC.EXPECT().Stmt(gomock.Any()).Return(stmts) + stmts.EXPECT().Exec("moose1", "lego.cattle.io/fields1") + + rawStmt3 := `SELECT f.key, ex2."spec.projectName" FROM "_v1_Pods_fields" f + JOIN "provisioner.cattle.io_v3_Cluster_fields" ex2 ON f."field.cattle.io/fixer" = ex2."metadata.name" + WHERE f."spec.projectName" != ex2."spec.projectName"` + c.EXPECT().Prepare(WSIgnoringMatcher(rawStmt3)) + results2 := []any{"field.cattle.io/fixer"} + c.EXPECT().QueryForRows(gomock.Any(), gomock.Any(), results2) + + c.EXPECT().ReadStrings2(gomock.Any()).Return([][]string{{"lego.cattle.io/fields2", "moose2"}}, nil) + rawStmt4 := `UPDATE "_v1_Pods_fields" SET "spec.projectName" = ? WHERE key = ?` + c.EXPECT().Prepare(rawStmt4) + txC.EXPECT().Stmt(gomock.Any()).Return(stmts) + stmts.EXPECT().Exec("moose2", "lego.cattle.io/fields2") + + err := store.Add(testObject) + assert.Nil(t, err) + }, + }) + + t.Parallel() + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + test.test(t) + }) + } +} + func SetupMockDB(t *testing.T) (*MockClient, *MockTXClient) { dbC := NewMockClient(gomock.NewController(t)) // add functionality once store expectation are known txC := NewMockTXClient(gomock.NewController(t)) @@ -693,7 +787,42 @@ func SetupMockDB(t *testing.T) (*MockClient, *MockTXClient) { return dbC, txC } func SetupStore(t *testing.T, client *MockClient, shouldEncrypt bool) *Store { - store, err := NewStore(context.Background(), testStoreObject{}, testStoreKeyFunc, client, shouldEncrypt, "testStoreObject") + name := "testStoreObject" + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: name} + store, err := NewStore(context.Background(), testStoreObject{}, testStoreKeyFunc, client, shouldEncrypt, gvk, name, nil) + if err != nil { + t.Error(err) + } + return store +} + +func gvkKey(group, version, kind string) string { + return group + "_" + version + "_" + kind +} + +func SetupStoreWithExternalDependencies(t *testing.T, client *MockClient) *Store { + name := "testStoreObject" + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: name} + namespaceProjectLabelDep := sqltypes.ExternalLabelDependency{ + SourceGVK: gvkKey("", "v1", "Namespace"), + SourceLabelName: "field.cattle.io/projectId", + TargetGVK: gvkKey("management.cattle.io", "v3", "Project"), + TargetKeyFieldName: "metadata.name", + TargetFinalFieldName: "spec.displayName", + } + namespaceNonLabelDep := sqltypes.ExternalDependency{ + SourceGVK: gvkKey("", "v1", "Pods"), + SourceFieldName: "field.cattle.io/fixer", + TargetGVK: gvkKey("provisioner.cattle.io", "v3", "Cluster"), + TargetKeyFieldName: "metadata.name", + TargetFinalFieldName: "spec.projectName", + } + updateInfo := sqltypes.ExternalGVKUpdates{ + AffectedGVK: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}, + ExternalDependencies: []sqltypes.ExternalDependency{namespaceNonLabelDep}, + ExternalLabelDependencies: []sqltypes.ExternalLabelDependency{namespaceProjectLabelDep}, + } + store, err := NewStore(context.Background(), testStoreObject{}, testStoreKeyFunc, client, false, gvk, name, &updateInfo) if err != nil { t.Error(err) } diff --git a/pkg/stores/sqlproxy/proxy_mocks_test.go b/pkg/stores/sqlproxy/proxy_mocks_test.go index b0279a9e..ac8a403e 100644 --- a/pkg/stores/sqlproxy/proxy_mocks_test.go +++ b/pkg/stores/sqlproxy/proxy_mocks_test.go @@ -267,18 +267,18 @@ func (m *MockCacheFactory) EXPECT() *MockCacheFactoryMockRecorder { } // CacheFor mocks base method. -func (m *MockCacheFactory) CacheFor(ctx context.Context, fields [][]string, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced, watchable bool) (factory.Cache, error) { +func (m *MockCacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced, watchable bool) (factory.Cache, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CacheFor", ctx, fields, transform, client, gvk, namespaced, watchable) + ret := m.ctrl.Call(m, "CacheFor", ctx, fields, externalUpdateInfo, transform, client, gvk, namespaced, watchable) ret0, _ := ret[0].(factory.Cache) ret1, _ := ret[1].(error) return ret0, ret1 } // CacheFor indicates an expected call of CacheFor. -func (mr *MockCacheFactoryMockRecorder) CacheFor(ctx, fields, transform, client, gvk, namespaced, watchable any) *gomock.Call { +func (mr *MockCacheFactoryMockRecorder) CacheFor(ctx, fields, externalUpdateInfo, transform, client, gvk, namespaced, watchable any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CacheFor", reflect.TypeOf((*MockCacheFactory)(nil).CacheFor), ctx, fields, transform, client, gvk, namespaced, watchable) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CacheFor", reflect.TypeOf((*MockCacheFactory)(nil).CacheFor), ctx, fields, externalUpdateInfo, transform, client, gvk, namespaced, watchable) } // Reset mocks base method. diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index d642cfdc..c782ff78 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -75,7 +75,9 @@ var ( {"reason"}, }, gvkKey("", "v1", "Namespace"): { - {"metadata", "labels", "field.cattle.io/projectId"}}, + {"metadata", "labels", "field.cattle.io/projectId"}, + {"spec", "displayName"}, + }, gvkKey("", "v1", "Node"): { {"status", "nodeInfo", "kubeletVersion"}, {"status", "nodeInfo", "operatingSystem"}}, @@ -154,7 +156,9 @@ var ( gvkKey("management.cattle.io", "v3", "NodeTemplate"): { {"spec", "clusterName"}}, gvkKey("management.cattle.io", "v3", "Project"): { - {"spec", "clusterName"}}, + {"spec", "displayName"}, + {"spec", "clusterName"}, + }, gvkKey("networking.k8s.io", "v1", "Ingress"): { {"spec", "rules", "host"}, {"spec", "ingressClassName"}, @@ -188,6 +192,20 @@ var ( }, }, } + namespaceProjectLabelDep = sqltypes.ExternalLabelDependency{ + SourceGVK: gvkKey("", "v1", "Namespace"), + SourceLabelName: "field.cattle.io/projectId", + TargetGVK: gvkKey("management.cattle.io", "v3", "Project"), + TargetKeyFieldName: "metadata.name", + TargetFinalFieldName: "spec.displayName", + } + externalGVKDependencies = sqltypes.ExternalGVKDependency{ + schema.GroupVersionKind{Group: "management.cattle.io", Version: "v3", Kind: "Project"}: &sqltypes.ExternalGVKUpdates{ + AffectedGVK: schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}, + ExternalDependencies: nil, + ExternalLabelDependencies: []sqltypes.ExternalLabelDependency{namespaceProjectLabelDep}, + }, + } ) func init() { @@ -260,7 +278,7 @@ type Store struct { type CacheFactoryInitializer func() (CacheFactory, error) type CacheFactory interface { - CacheFor(ctx context.Context, fields [][]string, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (factory.Cache, error) + CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (factory.Cache, error) Reset() error } @@ -342,8 +360,7 @@ func (s *Store) initializeNamespaceCache() error { // get the ns informer tableClient := &tablelistconvert.Client{ResourceInterface: client} - attrs := attributes.GVK(&nsSchema) - nsInformer, err := s.cacheFactory.CacheFor(s.ctx, fields, transformFunc, tableClient, attrs, false, true) + nsInformer, err := s.cacheFactory.CacheFor(s.ctx, fields, externalGVKDependencies[gvk], transformFunc, tableClient, gvk, false, true) if err != nil { return err } @@ -550,7 +567,7 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types. tableClient := &tablelistconvert.Client{ResourceInterface: client} attrs := attributes.GVK(schema) ns := attributes.Namespaced(schema) - inf, err := s.cacheFactory.CacheFor(s.ctx, fields, transformFunc, tableClient, attrs, ns, controllerschema.IsListWatchable(schema)) + inf, err := s.cacheFactory.CacheFor(s.ctx, fields, externalGVKDependencies[gvk], transformFunc, tableClient, attrs, ns, controllerschema.IsListWatchable(schema)) if err != nil { return nil, err } @@ -763,7 +780,7 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, apiSchema *types.APISc tableClient := &tablelistconvert.Client{ResourceInterface: client} attrs := attributes.GVK(apiSchema) ns := attributes.Namespaced(apiSchema) - inf, err := s.cacheFactory.CacheFor(s.ctx, fields, transformFunc, tableClient, attrs, ns, controllerschema.IsListWatchable(apiSchema)) + inf, err := s.cacheFactory.CacheFor(s.ctx, fields, externalGVKDependencies[gvk], transformFunc, tableClient, attrs, ns, controllerschema.IsListWatchable(apiSchema)) if err != nil { return nil, 0, "", err } diff --git a/pkg/stores/sqlproxy/proxy_store_test.go b/pkg/stores/sqlproxy/proxy_store_test.go index ed40875a..8074552c 100644 --- a/pkg/stores/sqlproxy/proxy_store_test.go +++ b/pkg/stores/sqlproxy/proxy_store_test.go @@ -84,7 +84,7 @@ func TestNewProxyStore(t *testing.T) { nsSchema := baseNSSchema scc.EXPECT().SetColumns(context.Background(), &nsSchema).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) - cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"metadata", "labels", "field.cattle.io/projectId"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(c, nil) + cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"metadata", "labels", "field.cattle.io/projectId"}, {"spec", "displayName"}}, nil, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(c, nil) s, err := NewProxyStore(context.Background(), scc, cg, rn, nil, cf) assert.Nil(t, err) @@ -151,7 +151,7 @@ func TestNewProxyStore(t *testing.T) { nsSchema := baseNSSchema scc.EXPECT().SetColumns(context.Background(), &nsSchema).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) - cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"metadata", "labels", "field.cattle.io/projectId"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(factory.Cache{}, fmt.Errorf("error")) + cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"metadata", "labels", "field.cattle.io/projectId"}, {"spec", "displayName"}}, nil, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(factory.Cache{}, fmt.Errorf("error")) s, err := NewProxyStore(context.Background(), scc, cg, rn, nil, cf) assert.Nil(t, err) @@ -244,7 +244,7 @@ func TestListByPartitions(t *testing.T) { assert.Nil(t, err) cg.EXPECT().TableAdminClient(req, schema, "", &WarningBuffer{}).Return(ri, nil) // This tests that fields are being extracted from schema columns and the type specific fields map - cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(c, nil) + cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, nil, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(c, nil) tb.EXPECT().GetTransformFunc(attributes.GVK(schema), []common.ColumnDefinition{{Field: "some.field"}}).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) bloi.EXPECT().ListByOptions(req.Context(), &opts, partitions, req.Namespace).Return(listToReturn, len(listToReturn.Items), "", nil) list, total, contToken, err := s.ListByPartitions(req, schema, partitions) @@ -460,7 +460,7 @@ func TestListByPartitions(t *testing.T) { // This tests that fields are being extracted from schema columns and the type specific fields map // note also the watchable bool is expected to be false - cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), false).Return(c, nil) + cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, nil, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), false).Return(c, nil) tb.EXPECT().GetTransformFunc(attributes.GVK(schema), []common.ColumnDefinition{{Field: "some.field"}}).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) bloi.EXPECT().ListByOptions(req.Context(), &opts, partitions, req.Namespace).Return(listToReturn, len(listToReturn.Items), "", nil) @@ -535,7 +535,7 @@ func TestListByPartitions(t *testing.T) { cg.EXPECT().TableAdminClient(req, schema, "", &WarningBuffer{}).Return(ri, nil) // This tests that fields are being extracted from schema columns and the type specific fields map tb.EXPECT().GetTransformFunc(attributes.GVK(schema), gomock.Any()).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) - cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(factory.Cache{}, fmt.Errorf("error")) + cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, nil, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(factory.Cache{}, fmt.Errorf("error")) _, _, _, err = s.ListByPartitions(req, schema, partitions) assert.NotNil(t, err) @@ -611,7 +611,7 @@ func TestListByPartitions(t *testing.T) { assert.Nil(t, err) cg.EXPECT().TableAdminClient(req, schema, "", &WarningBuffer{}).Return(ri, nil) // This tests that fields are being extracted from schema columns and the type specific fields map - cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(c, nil) + cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, nil, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(c, nil) bloi.EXPECT().ListByOptions(req.Context(), &opts, partitions, req.Namespace).Return(nil, 0, "", fmt.Errorf("error")) tb.EXPECT().GetTransformFunc(attributes.GVK(schema), gomock.Any()).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) @@ -724,7 +724,7 @@ func TestListByPartitionWithUserAccess(t *testing.T) { } attributes.SetGVK(theSchema, gvk) cg.EXPECT().TableAdminClient(apiOp, theSchema, "", &WarningBuffer{}).Return(ri, nil) - cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {"id"}, {"metadata", "state", "name"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(theSchema), attributes.Namespaced(theSchema), true).Return(c, nil) + cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {"id"}, {"metadata", "state", "name"}}, gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(theSchema), attributes.Namespaced(theSchema), true).Return(c, nil) tb.EXPECT().GetTransformFunc(attributes.GVK(theSchema), gomock.Any()).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) listToReturn := &unstructured.UnstructuredList{ @@ -766,7 +766,7 @@ func TestReset(t *testing.T) { cf.EXPECT().Reset().Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) - cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"metadata", "labels", "field.cattle.io/projectId"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nsc2, nil) + cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"metadata", "labels", "field.cattle.io/projectId"}, {"spec", "displayName"}}, nil, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nsc2, nil) tb.EXPECT().GetTransformFunc(attributes.GVK(&nsSchema), gomock.Any()).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) err := s.Reset() assert.Nil(t, err) @@ -873,7 +873,7 @@ func TestReset(t *testing.T) { cf.EXPECT().Reset().Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) - cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"metadata", "labels", "field.cattle.io/projectId"}}, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(factory.Cache{}, fmt.Errorf("error")) + cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"metadata", "labels", "field.cattle.io/projectId"}, {"spec", "displayName"}}, nil, gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(factory.Cache{}, fmt.Errorf("error")) tb.EXPECT().GetTransformFunc(attributes.GVK(&nsSchema), gomock.Any()).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) err := s.Reset() assert.NotNil(t, err)