From 45a3b0781645f9f2865603a44ef63d9d07c3888d Mon Sep 17 00:00:00 2001 From: Tom Lebreux Date: Wed, 10 Sep 2025 15:09:25 -0400 Subject: [PATCH] Fix send to closed channel panic in watch (#817) --- pkg/sqlcache/informer/listoption_indexer.go | 5 ++ .../informer/listoption_indexer_test.go | 65 +++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index 972c6e23..588b266c 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -399,6 +399,11 @@ func (l *ListOptionIndexer) Watch(ctx context.Context, opts WatchOptions, events return nil }) if err != nil { + // We might have added a watcher but the transaction failed in + // which case we still want to remove the watcher + if key != nil { + l.removeWatcher(key) + } return err } diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index 10d37393..ee8fe393 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -3505,3 +3505,68 @@ func TestNonNumberResourceVersion(t *testing.T) { require.NoError(t, err) assert.Equal(t, expectedList.Items, list.Items) } + +// Test that we don't panic in case the transaction fails but stil manages to add a watcher +func TestWatchCancel(t *testing.T) { + startWatcher := func(ctx context.Context, loi *ListOptionIndexer, rv string) (chan watch.Event, chan error) { + eventsCh := make(chan watch.Event, 1) + errCh := make(chan error, 1) + go func() { + watchErr := loi.Watch(ctx, WatchOptions{ResourceVersion: rv}, eventsCh) + errCh <- watchErr + close(eventsCh) + }() + time.Sleep(100 * time.Millisecond) + return eventsCh, errCh + } + + ctx := context.Background() + + opts := ListOptionIndexerOptions{ + Fields: [][]string{{"metadata", "somefield"}}, + IsNamespaced: true, + } + loi, dbPath, err := makeListOptionIndexer(ctx, opts, false, emptyNamespaceList) + defer cleanTempFiles(dbPath) + assert.NoError(t, err) + + foo := &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "name": "foo", + }, + }, + } + foo.SetResourceVersion("100") + + foo2 := foo.DeepCopy() + foo2.SetResourceVersion("200") + + foo3 := foo.DeepCopy() + foo3.SetResourceVersion("300") + + err = loi.Add(foo) + assert.NoError(t, err) + loi.Add(foo2) + assert.NoError(t, err) + loi.Add(foo3) + assert.NoError(t, err) + + watchCtx, watchCancel := context.WithCancel(ctx) + + eventsCh, errCh := startWatcher(watchCtx, loi, "100") + + <-eventsCh + + watchCancel() + + <-eventsCh + + go func() { + foo4 := foo.DeepCopy() + foo4.SetResourceVersion("400") + loi.Add(foo4) + }() + <-errCh + time.Sleep(1 * time.Second) +}