diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 099eeabb..7893f5f6 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -397,6 +397,11 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { case <-stopCh: + // we can only end up here when the stopCh + // was closed after a successful watchlist or list request + if w != nil { + w.Stop() + } return nil default: } diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index b26fe345..611357b7 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -28,6 +28,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -124,6 +126,26 @@ func TestReflectorResyncChan(t *testing.T) { } } +// TestEstablishedWatchStoppedAfterStopCh ensures that +// an established watch will be closed right after +// the StopCh was also closed. +func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) { + ctx, ctxCancel := context.WithCancel(context.TODO()) + ctxCancel() + w := watch.NewFake() + require.False(t, w.IsStopped()) + + // w is stopped when the stopCh is closed + target := NewReflector(nil, &v1.Pod{}, nil, 0) + err := target.watch(w, ctx.Done(), nil) + require.NoError(t, err) + require.True(t, w.IsStopped()) + + // noop when the w is nil and the ctx is closed + err = target.watch(nil, ctx.Done(), nil) + require.NoError(t, err) +} + func BenchmarkReflectorResyncChanMany(b *testing.B) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond)