diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 157436da..67e6feb9 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -732,6 +732,8 @@ func watchHandler(start time.Time, stopCh <-chan struct{}, ) error { eventCount := 0 + initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnInitialEventsEndBookmark != nil) + defer initialEventsEndBookmarkWarningTicker.Stop() if exitOnInitialEventsEndBookmark != nil { // set it to false just in case somebody // made it positive @@ -809,6 +811,9 @@ loop: klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) return nil } + initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now()) + case <-initialEventsEndBookmarkWarningTicker.C(): + initialEventsEndBookmarkWarningTicker.warnIfExpired() } } @@ -929,3 +934,88 @@ func isWatchErrorRetriable(err error) bool { } return false } + +// initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event +// which marks the end of the watch stream, has not been received within the defined tick interval. +// +// Note: +// The methods exposed by this type are not thread-safe. +type initialEventsEndBookmarkTicker struct { + clock.Ticker + clock clock.Clock + name string + + watchStart time.Time + tickInterval time.Duration + lastEventObserveTime time.Time +} + +// newInitialEventsEndBookmarkTicker returns a noop ticker if exitOnInitialEventsEndBookmarkRequested is false. +// Otherwise, it returns a ticker that exposes a method producing a warning if the bookmark event, +// which marks the end of the watch stream, has not been received within the defined tick interval. +// +// Note that the caller controls whether to call t.C() and t.Stop(). +// +// In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method. +func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker { + return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnInitialEventsEndBookmarkRequested) +} + +func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker { + clockWithTicker, ok := c.(clock.WithTicker) + if !ok || !exitOnInitialEventsEndBookmarkRequested { + if exitOnInitialEventsEndBookmarkRequested { + klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") + } + return &initialEventsEndBookmarkTicker{ + Ticker: &noopTicker{}, + } + } + + return &initialEventsEndBookmarkTicker{ + Ticker: clockWithTicker.NewTicker(tickInterval), + clock: c, + name: name, + watchStart: watchStart, + tickInterval: tickInterval, + } +} + +func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObserveTime time.Time) { + t.lastEventObserveTime = lastEventObserveTime +} + +func (t *initialEventsEndBookmarkTicker) warnIfExpired() { + if err := t.produceWarningIfExpired(); err != nil { + klog.Warning(err) + } +} + +// produceWarningIfExpired returns an error that represents a warning when +// the time elapsed since the last received event exceeds the tickInterval. +// +// Note that this method should be called when t.C() yields a value. +func (t *initialEventsEndBookmarkTicker) produceWarningIfExpired() error { + if _, ok := t.Ticker.(*noopTicker); ok { + return nil /*noop ticker*/ + } + if t.lastEventObserveTime.IsZero() { + return fmt.Errorf("%s: awaiting required bookmark event for initial events stream, no events received for %v", t.name, t.clock.Since(t.watchStart)) + } + elapsedTime := t.clock.Now().Sub(t.lastEventObserveTime) + hasBookmarkTimerExpired := elapsedTime >= t.tickInterval + + if !hasBookmarkTimerExpired { + return nil + } + return fmt.Errorf("%s: hasn't received required bookmark event marking the end of initial events stream, received last event %v ago", t.name, elapsedTime) +} + +var _ clock.Ticker = &noopTicker{} + +// TODO(#115478): move to k8s/utils repo +type noopTicker struct{} + +func (t *noopTicker) C() <-chan time.Time { return nil } + +func (t *noopTicker) Stop() {} diff --git a/tools/cache/reflector_watchlist_test.go b/tools/cache/reflector_watchlist_test.go index a2eec919..c2fbeca8 100644 --- a/tools/cache/reflector_watchlist_test.go +++ b/tools/cache/reflector_watchlist_test.go @@ -17,13 +17,16 @@ limitations under the License. package cache import ( + "errors" "fmt" "sort" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -32,10 +35,78 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" "k8s.io/utils/ptr" ) +func TestInitialEventsEndBookmarkTicker(t *testing.T) { + assertNoEvents := func(t *testing.T, c <-chan time.Time) { + select { + case e := <-c: + t.Errorf("Unexpected: %#v event received, expected no events", e) + default: + return + } + } + + t.Run("testing NoopInitialEventsEndBookmarkTicker", func(t *testing.T) { + clock := testingclock.NewFakeClock(time.Now()) + target := newInitialEventsEndBookmarkTickerInternal("testName", clock, clock.Now(), time.Second, false) + + clock.Step(30 * time.Second) + assertNoEvents(t, target.C()) + actualWarning := target.produceWarningIfExpired() + + require.Empty(t, actualWarning, "didn't expect any warning") + // validate if the other methods don't produce panic + target.warnIfExpired() + target.observeLastEventTimeStamp(clock.Now()) + + // make sure that after calling the other methods + // nothing hasn't changed + actualWarning = target.produceWarningIfExpired() + require.Empty(t, actualWarning, "didn't expect any warning") + assertNoEvents(t, target.C()) + + target.Stop() + }) + + t.Run("testing InitialEventsEndBookmarkTicker backed by a fake clock", func(t *testing.T) { + clock := testingclock.NewFakeClock(time.Now()) + target := newInitialEventsEndBookmarkTickerInternal("testName", clock, clock.Now(), time.Second, true) + clock.Step(500 * time.Millisecond) + assertNoEvents(t, target.C()) + + clock.Step(500 * time.Millisecond) + <-target.C() + actualWarning := target.produceWarningIfExpired() + require.Equal(t, errors.New("testName: awaiting required bookmark event for initial events stream, no events received for 1s"), actualWarning) + + clock.Step(time.Second) + <-target.C() + actualWarning = target.produceWarningIfExpired() + require.Equal(t, errors.New("testName: awaiting required bookmark event for initial events stream, no events received for 2s"), actualWarning) + + target.observeLastEventTimeStamp(clock.Now()) + clock.Step(500 * time.Millisecond) + assertNoEvents(t, target.C()) + + clock.Step(500 * time.Millisecond) + <-target.C() + actualWarning = target.produceWarningIfExpired() + require.Equal(t, errors.New("testName: hasn't received required bookmark event marking the end of initial events stream, received last event 1s ago"), actualWarning) + + clock.Step(time.Second) + <-target.C() + actualWarning = target.produceWarningIfExpired() + require.Equal(t, errors.New("testName: hasn't received required bookmark event marking the end of initial events stream, received last event 2s ago"), actualWarning) + + target.Stop() + assertNoEvents(t, target.C()) + }) +} + func TestWatchList(t *testing.T) { scenarios := []struct { name string