From 187c42c88498188fc72b161f2c04d77ce3716866 Mon Sep 17 00:00:00 2001 From: Rajneesh180 Date: Wed, 4 Mar 2026 16:05:26 +0530 Subject: [PATCH] client-go: guard ResultChan() close in StartEventWatcher When watch.Broadcaster.Shutdown() is called it drains all queued events then calls closeAll(), which closes every watcher's result channel. eventBroadcasterImpl.Shutdown() calls Broadcaster.Shutdown() first, then calls the cancellation context's cancel() function. Between those two steps there is a window in which the result channel is closed while the cancellation context is still live. Without the two-value channel receive the goroutine in StartEventWatcher would spin on the already-closed channel: each select iteration immediately receives the zero-value watch.Event, the type assertion fails (nil interface, ok == false), and the loop continues burning CPU until the select scheduler eventually picks the cancelationCtx.Done() case. Guard against this by reading the ok boolean from the channel receive: case watchEvent, ok := <-watcher.ResultChan(): if !ok { return } This is the correct and idiomatic Go pattern for a channel that may be closed by its producer. Note that when this return path is taken the broadcaster has already delivered every queued event (Broadcaster.Shutdown blocks until the distribute loop exits before closeAll runs), so no events are silently dropped. Add a regression test (TestStartEventWatcherExitsOnDirectShutdown) that creates a broadcaster without an external context so Shutdown() is fully synchronous, starts a watcher, and verifies the goroutine exits cleanly via goleak.VerifyNone. Signed-off-by: Rajneesh180 Kubernetes-commit: 95c15b54069922b0a66c198a064577ea0a160694 --- tools/record/event.go | 5 ++++- tools/record/event_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/tools/record/event.go b/tools/record/event.go index 58322d44d..305a92438 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -408,7 +408,10 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) w case <-e.cancelationCtx.Done(): watcher.Stop() return - case watchEvent := <-watcher.ResultChan(): + case watchEvent, ok := <-watcher.ResultChan(): + if !ok { + return + } event, ok := watchEvent.Object.(*v1.Event) if !ok { // This is all local, so there's no reason this should diff --git a/tools/record/event_test.go b/tools/record/event_test.go index c5748726f..4797f2994 100644 --- a/tools/record/event_test.go +++ b/tools/record/event_test.go @@ -135,6 +135,35 @@ func TestBroadcasterShutdown(t *testing.T) { goleak.VerifyNone(t) } +// TestStartEventWatcherExitsOnDirectShutdown is a regression test for the +// hot-loop that occurred when Broadcaster.Shutdown() closed the watcher's +// result channel before the cancellation context was cancelled. +// +// When NewBroadcaster is called without a context, Shutdown() sequentially: +// +// 1. Calls e.Broadcaster.Shutdown() → watcher result channels are closed via +// closeAll(); all queued events have already been delivered at this point. +// 2. Calls e.cancel() → cancelationCtx is cancelled. +// +// Between steps 1 and 2 there is a window where ResultChan() is closed but +// cancelationCtx.Done() has not yet fired. The goroutine inside +// StartEventWatcher must detect the closed channel (ok == false) and return +// cleanly instead of spinning. goleak.VerifyTestMain (installed in +// main_test.go) will report a leaked goroutine if the fix regresses. +func TestStartEventWatcherExitsOnDirectShutdown(t *testing.T) { + // No context → haveCtxCancelation is false, so no background Broadcaster + // shutdown goroutine; Shutdown() is fully synchronous here. + caster := NewBroadcaster(WithSleepDuration(0)) + caster.StartStructuredLogging(0) + + // Direct shutdown: closes watcher channels first, then cancels context. + caster.Shutdown() + + // goleak.VerifyTestMain checks for leaks across the whole test binary, but + // explicitly verifying here surfaces any regression in this specific path. + goleak.VerifyNone(t) +} + func TestNonRacyShutdown(t *testing.T) { // Attempt to simulate previously racy conditions, and ensure that no race // occurs: Nominally, calling "Eventf" *followed by* shutdown from the same