Merge pull request #136509 from pohly/client-go-informer-deadlock

client-go informers: fix potential deadlock

Kubernetes-commit: cc0fdc1fe2cfada52875a0c5fa115f69b4dd477a
Author: Kubernetes Publisher
Date: 2026-01-30 01:22:14 +05:30
2 changed files with 108 additions and 32 deletions

tools/cache/shared_informer.go

@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"k8s.io/apimachinery/pkg/api/meta"
@@ -302,9 +303,12 @@ func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defa
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
realClock := &clock.RealClock{}
processor := &sharedProcessor{clock: realClock}
processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker())
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
processor: &sharedProcessor{clock: realClock},
processor: processor,
listerWatcher: lw,
objectType: exampleObject,
objectDescription: options.ObjectDescription,
@@ -799,6 +803,7 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listenersRCond *sync.Cond // Caller of Wait must hold a read lock on listenersLock.
// Map from listeners to whether or not they are currently syncing
listeners map[*processorListener]bool
clock clock.Clock
@@ -868,6 +873,14 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// Before we start blocking on writes to the listeners' channels,
// ensure that they all have been started. If the processor stops,
// p.listeners gets cleared; in that case we also stop waiting here
// and return without doing anything.
for !p.listenersStarted && len(p.listeners) > 0 {
p.listenersRCond.Wait()
}
for listener, isSyncing := range p.listeners {
switch {
case !sync:
@@ -882,15 +895,25 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
}
}
// sharedProcessorRunHook can be used inside tests to execute additional code
// at the start of sharedProcessor.run.
var sharedProcessorRunHook atomic.Pointer[func()]
func (p *sharedProcessor) run(ctx context.Context) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
hook := sharedProcessorRunHook.Load()
if hook != nil {
(*hook)()
}
// Changing listenersStarted needs a write lock.
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
for listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
p.listenersRCond.Signal()
}()
<-ctx.Done()
@@ -907,6 +930,9 @@ func (p *sharedProcessor) run(ctx context.Context) {
// Reset to false since no listeners are running
p.listenersStarted = false
// Wake up sharedProcessor.distribute.
p.listenersRCond.Signal()
p.wg.Wait() // Wait for all .pop() and .run() to stop
}

tools/cache/shared_informer_test.go

@@ -25,6 +25,7 @@ import (
"strings"
"sync"
"testing"
"testing/synctest"
"time"
"github.com/google/go-cmp/cmp"
@@ -46,6 +47,7 @@ import (
)
type testListener struct {
printlnFunc func(string) // Some, but not all, tests use this for per-test output.
lock sync.RWMutex
resyncPeriod time.Duration
expectedItemNames sets.Set[string]
@@ -73,9 +75,18 @@ func (l *testListener) OnUpdate(old, new interface{}) {
func (l *testListener) OnDelete(obj interface{}) {
}
func (l *testListener) println(msg string) {
msg = l.name + ": " + msg
if l.printlnFunc != nil {
l.printlnFunc(msg)
} else {
fmt.Println(msg)
}
}
func (l *testListener) handle(obj interface{}) {
key, _ := MetaNamespaceKeyFunc(obj)
fmt.Printf("%s: handle: %v\n", l.name, key)
l.println(fmt.Sprintf("handle: %v", key))
l.lock.Lock()
defer l.lock.Unlock()
objectMeta, _ := meta.Accessor(obj)
@@ -83,7 +94,7 @@ func (l *testListener) handle(obj interface{}) {
}
func (l *testListener) ok() bool {
fmt.Println("polling")
l.println("polling")
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
if l.satisfiedExpectations() {
return true, nil
@@ -95,9 +106,9 @@ func (l *testListener) ok() bool {
}
// wait just a bit to allow any unexpected stragglers to come in
fmt.Println("sleeping")
l.println("sleeping")
time.Sleep(1 * time.Second)
fmt.Println("final check")
l.println("final check")
return l.satisfiedExpectations()
}
@@ -108,6 +119,12 @@ func (l *testListener) satisfiedExpectations() bool {
return sets.New(l.receivedItemNames...).Equal(l.expectedItemNames)
}
func (l *testListener) reset() {
l.lock.Lock()
defer l.lock.Unlock()
l.receivedItemNames = nil
}
func eventHandlerCount(i SharedInformer) int {
s := i.(*sharedIndexInformer)
s.startedLock.Lock()
@@ -208,6 +225,34 @@ func TestIndexer(t *testing.T) {
}
func TestListenerResyncPeriods(t *testing.T) {
synctest.Test(t, func(t *testing.T) { testListenerResyncPeriods(t, 0) })
}
func TestListenerResyncPeriodsDelayed(t *testing.T) {
// sharedProcessor used to deadlock when sharedProcessor.run happened
// to be reached after the first sharedProcessor.distribute.
// This artificial delay simulates that situation.
//
// Must not run in parallel to other tests, but it doesn't need to:
// it doesn't sleep in real-world time and therefore completes quickly.
synctest.Test(t, func(t *testing.T) { testListenerResyncPeriods(t, time.Second) })
}
func testListenerResyncPeriods(t *testing.T, startupDelay time.Duration) {
start := time.Now()
println := func(msg string) {
delta := time.Since(start)
t.Logf("%s: %s", delta, msg)
}
if startupDelay > 0 {
hook := func() {
time.Sleep(startupDelay)
}
sharedProcessorRunHook.Store(&hook)
defer sharedProcessorRunHook.Store(nil)
}
// source simulates an apiserver object endpoint.
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
@@ -216,20 +261,19 @@ func TestListenerResyncPeriods(t *testing.T) {
// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
clock := testingclock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock
// listener 1, never resync
listener1 := newTestListener("listener1", 0, "pod1", "pod2")
listener1.printlnFunc = println
informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
// listener 2, resync every 2s
listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2")
listener2.printlnFunc = println
informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
// listener 3, resync every 3s
listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2")
listener3.printlnFunc = println
informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
listeners := []*testListener{listener1, listener2, listener3}
@@ -241,59 +285,65 @@ func TestListenerResyncPeriods(t *testing.T) {
wg.Wait()
}()
// ensure all listeners got the initial List
// We must potentially advance time to unblock sharedProcessor.run,
// otherwise synctest.Wait() below returns without anything
// being done.
time.Sleep(startupDelay)
// Ensure all listeners got the initial list after the initial processing, at the initial start time.
//
// synctest.Wait doesn't detect deadlocks involving mutexes: they are considered not durably
// blocking, so in case of such a deadlock it'll just keep waiting until the entire unit test
// times out. The backtraces then show the deadlock.
synctest.Wait()
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
if !listener.satisfiedExpectations() {
t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
// reset
for _, listener := range listeners {
listener.receivedItemNames = []string{}
listener.reset()
}
// advance so listener2 gets a resync
clock.Step(2 * time.Second)
time.Sleep(2*time.Second - startupDelay)
synctest.Wait()
// make sure listener2 got the resync
if !listener2.ok() {
t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
if !listener2.satisfiedExpectations() {
t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
}
// wait a bit to give errant items a chance to go to 1 and 3
time.Sleep(1 * time.Second)
// make sure listeners 1 and 3 got nothing
if len(listener1.receivedItemNames) != 0 {
t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
t.Errorf("%s: listener1: should not have resynced (got %d)", time.Since(start), len(listener1.receivedItemNames))
}
if len(listener3.receivedItemNames) != 0 {
t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames))
t.Errorf("%s: listener3: should not have resynced (got %d)", time.Since(start), len(listener3.receivedItemNames))
}
// reset
for _, listener := range listeners {
listener.receivedItemNames = []string{}
listener.reset()
}
// advance so listener3 gets a resync
clock.Step(1 * time.Second)
time.Sleep(1 * time.Second)
synctest.Wait()
// make sure listener3 got the resync
if !listener3.ok() {
t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
if !listener3.satisfiedExpectations() {
t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
}
// wait a bit to give errant items a chance to go to 1 and 2
time.Sleep(1 * time.Second)
// make sure listeners 1 and 2 got nothing
if len(listener1.receivedItemNames) != 0 {
t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
t.Errorf("%s: listener1: should not have resynced (got %d)", time.Since(start), len(listener1.receivedItemNames))
}
if len(listener2.receivedItemNames) != 0 {
t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames))
t.Errorf("%s: listener2: should not have resynced (got %d)", time.Since(start), len(listener2.receivedItemNames))
}
}