From cf97607be556a83608b72779a2f8f57bfd88640d Mon Sep 17 00:00:00 2001 From: Tom Lebreux Date: Tue, 10 Jun 2025 15:29:42 -0600 Subject: [PATCH] Return `resourceversion too old` error to UI instead of logging (#667) * Return watch error instead of logging it The UI needs to know about watch error like `resourceversion too old` so we need to return it. * Sort by resourceVersion as number The UI makes some assumption on resourceVersion. It assumes they are a number and they are ordered by the number value. We'll want to fix this at some point most likely but for now let's give something in a way that UI wants. * Remove -d suffix After much testing, a delete of an object seems to have its own resourceVersion so we don't need the -d suffix, we can simply use the new resourceVersion. --- pkg/sqlcache/informer/informer.go | 14 +++- pkg/sqlcache/informer/listoption_indexer.go | 7 -- .../informer/listoption_indexer_test.go | 83 +++++++++++++++---- pkg/stores/sqlproxy/proxy_store.go | 6 +- 4 files changed, 84 insertions(+), 26 deletions(-) diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index 026b49ee..64461123 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -6,13 +6,16 @@ package informer import ( "context" + "errors" "sort" + "strconv" "time" "github.com/rancher/steve/pkg/sqlcache/db" "github.com/rancher/steve/pkg/sqlcache/partition" "github.com/rancher/steve/pkg/sqlcache/sqltypes" sqlStore "github.com/rancher/steve/pkg/sqlcache/store" + "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -67,7 +70,16 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [ a, err := client.List(ctx, options) // We want the list to be consistent when there are going to be relists sort.SliceStable(a.Items, func(i int, j int) bool { - return a.Items[i].GetResourceVersion() < a.Items[j].GetResourceVersion() + var err error + rvI, err1 := strconv.Atoi(a.Items[i].GetResourceVersion()) + err = errors.Join(err, err1) + rvJ, err2 := strconv.Atoi(a.Items[j].GetResourceVersion()) + err = errors.Join(err, err2) + if err != nil { + logrus.Debug("ResourceVersion not a number, falling back to string comparison") + return a.Items[i].GetResourceVersion() < a.Items[j].GetResourceVersion() + } + return rvI < rvJ }) return a, err }, diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index bdef66cc..5549389b 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -451,13 +451,6 @@ func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, o } latestRV := acc.GetResourceVersion() - // Append a -d suffix because the RV might be the same as the previous object - // in the following case: - // - Add obj1 with RV 100 - // - Delete obj1 with RV 100 - if eventType == watch.Deleted { - latestRV = latestRV + "-d" - } _, err = tx.Stmt(l.upsertEventsStmt).Exec(latestRV, eventType, toBytes(obj)) if err != nil { return &db.QueryError{QueryString: l.upsertEventsQuery, Err: err} diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index d70a4b09..490f0f9e 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -2190,13 +2190,11 @@ func TestWatchResourceVersion(t *testing.T) { "app": "bar", }) - barNew := &unstructured.Unstructured{} - barNew.SetResourceVersion("160") - barNew.SetName("bar") - barNew.SetNamespace("bar") - barNew.SetLabels(map[string]string{ - "app": "bar", - }) + barDeleted := bar.DeepCopy() + barDeleted.SetResourceVersion("160") + + barNew := bar.DeepCopy() + barNew.SetResourceVersion("170") parentCtx := context.Background() @@ -2225,7 +2223,7 @@ func TestWatchResourceVersion(t *testing.T) { assert.NoError(t, err) rv3 := getRV(t) - err = loi.Delete(bar) + err = loi.Delete(barDeleted) assert.NoError(t, err) rv4 := getRV(t) @@ -2246,7 +2244,7 @@ func TestWatchResourceVersion(t *testing.T) { expectedEvents: []watch.Event{ {Type: watch.Modified, Object: fooUpdated}, {Type: watch.Added, Object: bar}, - {Type: watch.Deleted, Object: bar}, + {Type: watch.Deleted, Object: barDeleted}, {Type: watch.Added, Object: barNew}, }, }, @@ -2254,14 +2252,14 @@ func TestWatchResourceVersion(t *testing.T) { rv: rv2, expectedEvents: []watch.Event{ {Type: watch.Added, Object: bar}, - {Type: watch.Deleted, Object: bar}, + {Type: watch.Deleted, Object: barDeleted}, {Type: watch.Added, Object: barNew}, }, }, { rv: rv3, expectedEvents: []watch.Event{ - {Type: watch.Deleted, Object: bar}, + {Type: watch.Deleted, Object: barDeleted}, {Type: watch.Added, Object: barNew}, }, }, @@ -2344,9 +2342,11 @@ func TestWatchGarbageCollection(t *testing.T) { bar.SetResourceVersion("150") bar.SetName("bar") - barNew := &unstructured.Unstructured{} - barNew.SetResourceVersion("160") - barNew.SetName("bar") + barDeleted := bar.DeepCopy() + barDeleted.SetResourceVersion("160") + + barNew := bar.DeepCopy() + barNew.SetResourceVersion("170") parentCtx := context.Background() @@ -2375,7 +2375,7 @@ func TestWatchGarbageCollection(t *testing.T) { assert.NoError(t, err) rv3 := getRV(t) - err = loi.Delete(bar) + err = loi.Delete(barDeleted) assert.NoError(t, err) rv4 := getRV(t) @@ -2394,7 +2394,7 @@ func TestWatchGarbageCollection(t *testing.T) { { rv: rv3, expectedEvents: []watch.Event{ - {Type: watch.Deleted, Object: bar}, + {Type: watch.Deleted, Object: barDeleted}, }, }, { @@ -2449,3 +2449,54 @@ func TestWatchGarbageCollection(t *testing.T) { assert.NoError(t, err) } } + +func TestNonNumberResourceVersion(t *testing.T) { + ctx := context.Background() + + opts := ListOptionIndexerOptions{ + Fields: [][]string{{"metadata", "somefield"}}, + IsNamespaced: true, + } + loi, err := makeListOptionIndexer(ctx, opts) + assert.NoError(t, err) + + foo := &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "name": "foo", + }, + }, + } + foo.SetResourceVersion("a") + foo2 := foo.DeepCopy() + foo2.SetResourceVersion("b") + foo2.SetLabels(map[string]string{ + "hello": "world", + }) + bar := &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "name": "bar", + }, + }, + } + bar.SetResourceVersion("c") + err = loi.Add(foo) + assert.NoError(t, err) + err = loi.Update(foo2) + assert.NoError(t, err) + err = loi.Add(bar) + assert.NoError(t, err) + + expectedUnstructured := &unstructured.Unstructured{ + Object: map[string]any{ + "items": []any{bar.Object, foo2.Object}, + }, + } + expectedList, err := expectedUnstructured.ToList() + require.NoError(t, err) + + list, _, _, err := loi.ListByOptions(ctx, &sqltypes.ListOptions{}, []partition.Partition{{All: true}}, "") + assert.NoError(t, err) + assert.Equal(t, expectedList.Items, list.Items) +} diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index 41ecc956..7af34581 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -564,6 +564,8 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types. result := make(chan watch.Event) go func() { + defer close(result) + ctx := apiOp.Context() idNamespace, _ := kv.RSplit(w.ID, "/") if idNamespace == "" { @@ -580,10 +582,10 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types. } err := inf.ByOptionsLister.Watch(ctx, opts, result) if err != nil { - logrus.Error(err) + returnErr(err, result) + return } logrus.Debugf("closing watcher for %s", schema.ID) - close(result) }() return result, nil }