client-go/reflector: warns when the bookmark event for initial events hasn't been received

Kubernetes-commit: 93960f489069a744afda1be42f82349e25d7e4d7
This commit is contained in:
Lukasz Szaszkiewicz 2024-04-29 15:18:54 +02:00 committed by Kubernetes Publisher
parent 62f959700d
commit 0280901a4d
2 changed files with 161 additions and 0 deletions

View File

@ -732,6 +732,8 @@ func watchHandler(start time.Time,
stopCh <-chan struct{},
) error {
eventCount := 0
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnInitialEventsEndBookmark != nil)
defer initialEventsEndBookmarkWarningTicker.Stop()
if exitOnInitialEventsEndBookmark != nil {
// set it to false just in case somebody
// made it positive
@ -809,6 +811,9 @@ loop:
klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
return nil
}
initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now())
case <-initialEventsEndBookmarkWarningTicker.C():
initialEventsEndBookmarkWarningTicker.warnIfExpired()
}
}
@ -929,3 +934,88 @@ func isWatchErrorRetriable(err error) bool {
}
return false
}
// initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event
// which marks the end of the watch stream, has not been received within the defined tick interval.
//
// Note:
// The methods exposed by this type are not thread-safe.
type initialEventsEndBookmarkTicker struct {
clock.Ticker
clock clock.Clock
name string
watchStart time.Time
tickInterval time.Duration
lastEventObserveTime time.Time
}
// newInitialEventsEndBookmarkTicker returns a noop ticker if exitOnInitialEventsEndBookmarkRequested is false.
// Otherwise, it returns a ticker that exposes a method producing a warning if the bookmark event,
// which marks the end of the watch stream, has not been received within the defined tick interval.
//
// Note that the caller controls whether to call t.C() and t.Stop().
//
// In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method.
func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker {
return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnInitialEventsEndBookmarkRequested)
}
func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker {
clockWithTicker, ok := c.(clock.WithTicker)
if !ok || !exitOnInitialEventsEndBookmarkRequested {
if exitOnInitialEventsEndBookmarkRequested {
klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested")
}
return &initialEventsEndBookmarkTicker{
Ticker: &noopTicker{},
}
}
return &initialEventsEndBookmarkTicker{
Ticker: clockWithTicker.NewTicker(tickInterval),
clock: c,
name: name,
watchStart: watchStart,
tickInterval: tickInterval,
}
}
func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObserveTime time.Time) {
t.lastEventObserveTime = lastEventObserveTime
}
func (t *initialEventsEndBookmarkTicker) warnIfExpired() {
if err := t.produceWarningIfExpired(); err != nil {
klog.Warning(err)
}
}
// produceWarningIfExpired returns an error that represents a warning when
// the time elapsed since the last received event exceeds the tickInterval.
//
// Note that this method should be called when t.C() yields a value.
func (t *initialEventsEndBookmarkTicker) produceWarningIfExpired() error {
if _, ok := t.Ticker.(*noopTicker); ok {
return nil /*noop ticker*/
}
if t.lastEventObserveTime.IsZero() {
return fmt.Errorf("%s: awaiting required bookmark event for initial events stream, no events received for %v", t.name, t.clock.Since(t.watchStart))
}
elapsedTime := t.clock.Now().Sub(t.lastEventObserveTime)
hasBookmarkTimerExpired := elapsedTime >= t.tickInterval
if !hasBookmarkTimerExpired {
return nil
}
return fmt.Errorf("%s: hasn't received required bookmark event marking the end of initial events stream, received last event %v ago", t.name, elapsedTime)
}
var _ clock.Ticker = &noopTicker{}
// TODO(#115478): move to k8s/utils repo
type noopTicker struct{}
func (t *noopTicker) C() <-chan time.Time { return nil }
func (t *noopTicker) Stop() {}

View File

@ -17,13 +17,16 @@ limitations under the License.
package cache
import (
"errors"
"fmt"
"sort"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -32,10 +35,78 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)
func TestInitialEventsEndBookmarkTicker(t *testing.T) {
assertNoEvents := func(t *testing.T, c <-chan time.Time) {
select {
case e := <-c:
t.Errorf("Unexpected: %#v event received, expected no events", e)
default:
return
}
}
t.Run("testing NoopInitialEventsEndBookmarkTicker", func(t *testing.T) {
clock := testingclock.NewFakeClock(time.Now())
target := newInitialEventsEndBookmarkTickerInternal("testName", clock, clock.Now(), time.Second, false)
clock.Step(30 * time.Second)
assertNoEvents(t, target.C())
actualWarning := target.produceWarningIfExpired()
require.Empty(t, actualWarning, "didn't expect any warning")
// validate if the other methods don't produce panic
target.warnIfExpired()
target.observeLastEventTimeStamp(clock.Now())
// make sure that after calling the other methods
// nothing hasn't changed
actualWarning = target.produceWarningIfExpired()
require.Empty(t, actualWarning, "didn't expect any warning")
assertNoEvents(t, target.C())
target.Stop()
})
t.Run("testing InitialEventsEndBookmarkTicker backed by a fake clock", func(t *testing.T) {
clock := testingclock.NewFakeClock(time.Now())
target := newInitialEventsEndBookmarkTickerInternal("testName", clock, clock.Now(), time.Second, true)
clock.Step(500 * time.Millisecond)
assertNoEvents(t, target.C())
clock.Step(500 * time.Millisecond)
<-target.C()
actualWarning := target.produceWarningIfExpired()
require.Equal(t, errors.New("testName: awaiting required bookmark event for initial events stream, no events received for 1s"), actualWarning)
clock.Step(time.Second)
<-target.C()
actualWarning = target.produceWarningIfExpired()
require.Equal(t, errors.New("testName: awaiting required bookmark event for initial events stream, no events received for 2s"), actualWarning)
target.observeLastEventTimeStamp(clock.Now())
clock.Step(500 * time.Millisecond)
assertNoEvents(t, target.C())
clock.Step(500 * time.Millisecond)
<-target.C()
actualWarning = target.produceWarningIfExpired()
require.Equal(t, errors.New("testName: hasn't received required bookmark event marking the end of initial events stream, received last event 1s ago"), actualWarning)
clock.Step(time.Second)
<-target.C()
actualWarning = target.produceWarningIfExpired()
require.Equal(t, errors.New("testName: hasn't received required bookmark event marking the end of initial events stream, received last event 2s ago"), actualWarning)
target.Stop()
assertNoEvents(t, target.C())
})
}
func TestWatchList(t *testing.T) {
scenarios := []struct {
name string