Merge pull request #90560 from wojtek-t/frequent_bookmarks

Send watch bookmarks every minute
This commit is contained in:
Kubernetes Prow Robot 2020-05-01 00:41:09 -07:00 committed by GitHub
commit 3546d6267c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 22 deletions

View File

@ -51,6 +51,9 @@ const (
// storageWatchListPageSize is the cacher's request chunk size of
// initial and resync watch lists to storage.
storageWatchListPageSize = int64(10000)
// defaultBookmarkFrequency defines how frequently watch bookmarks should be send
// in addition to sending a bookmark right before watch deadline
defaultBookmarkFrequency = time.Minute
)
// Config contains the configuration for a given Cache.
@ -154,24 +157,26 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache
// second in a bucket, and pop up them once at the timeout. To be more specific,
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
type watcherBookmarkTimeBuckets struct {
lock sync.Mutex
watchersBuckets map[int64][]*cacheWatcher
startBucketID int64
clock clock.Clock
lock sync.Mutex
watchersBuckets map[int64][]*cacheWatcher
startBucketID int64
clock clock.Clock
bookmarkFrequency time.Duration
}
func newTimeBucketWatchers(clock clock.Clock) *watcherBookmarkTimeBuckets {
func newTimeBucketWatchers(clock clock.Clock, bookmarkFrequency time.Duration) *watcherBookmarkTimeBuckets {
return &watcherBookmarkTimeBuckets{
watchersBuckets: make(map[int64][]*cacheWatcher),
startBucketID: clock.Now().Unix(),
clock: clock,
watchersBuckets: make(map[int64][]*cacheWatcher),
startBucketID: clock.Now().Unix(),
clock: clock,
bookmarkFrequency: bookmarkFrequency,
}
}
// adds a watcher to the bucket, if the deadline is before the start, it will be
// added to the first one.
func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
nextTime, ok := w.nextBookmarkTime(t.clock.Now())
nextTime, ok := w.nextBookmarkTime(t.clock.Now(), t.bookmarkFrequency)
if !ok {
return false
}
@ -339,7 +344,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
stopCh: stopCh,
clock: clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock),
bookmarkWatchers: newTimeBucketWatchers(clock, defaultBookmarkFrequency),
}
// Ensure that timer is stopped.
@ -914,9 +919,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
continue
}
c.watchersBuffer = append(c.watchersBuffer, watcher)
// Given that we send bookmark event once at deadline-2s, never push again
// after the watcher pops up from the buckets. Once we decide to change the
// strategy to more sophisticated, we may need it here.
// Requeue the watcher for the next bookmark if needed.
c.bookmarkWatchers.addWatcher(watcher)
}
}
}
@ -1219,13 +1223,28 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
}
}
func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
// For now we return 2s before deadline (and maybe +infinity is now already passed this time)
// but it gives us extensibility for the future(false when deadline is not set).
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
// We try to send bookmarks:
// (a) roughly every minute
// (b) right before the watcher timeout - for now we simply set it 2s before
// the deadline
// The former gives us periodicity if the watch breaks due to unexpected
// conditions, the later ensures that on timeout the watcher is as close to
// now as possible - this covers 99% of cases.
heartbeatTime := now.Add(bookmarkFrequency)
if c.deadline.IsZero() {
return c.deadline, false
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
// apiserver if properly configured. So this shoudln't happen in practice.
return heartbeatTime, true
}
return c.deadline.Add(-2 * time.Second), true
if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) {
heartbeatTime = pretimeoutTime
}
if heartbeatTime.Before(now) {
return time.Time{}, false
}
return heartbeatTime, true
}
func getEventObject(object runtime.Object) runtime.Object {

View File

@ -558,7 +558,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
}
clock := clock.NewFakeClock(time.Now())
watchers := newTimeBucketWatchers(clock)
watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
now := clock.Now()
watchers.addWatcher(newWatcher(now.Add(10 * time.Second)))
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
@ -746,6 +746,77 @@ func TestCacherSendBookmarkEvents(t *testing.T) {
}
}
func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 1000)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Update bookmarkFrequency to speed up test.
// Note that the frequency lower than 1s doesn't change much due to
// resolution how frequency we recompute.
cacher.bookmarkWatchers.bookmarkFrequency = time.Second
// Wait until cacher is initialized.
cacher.ready.wait()
pred := storage.Everything
pred.AllowWatchBookmarks = true
makePod := func(index int) *examplev1.Pod {
return &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", index),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%v", 100+index),
},
}
}
// Create pod to initialize watch cache.
if err := cacher.watchCache.Add(makePod(0)); err != nil {
t.Fatalf("failed to add a pod: %v", err)
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "100", pred)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
// Create one more pod, to ensure that current RV is higher and thus
// bookmarks will be delievere (events are delivered for RV higher
// than the max from init events).
if err := cacher.watchCache.Add(makePod(1)); err != nil {
t.Fatalf("failed to add a pod: %v", err)
}
timeoutCh := time.After(5 * time.Second)
lastObservedRV := uint64(0)
// Ensure that a watcher gets two bookmarks.
for observedBookmarks := 0; observedBookmarks < 2; {
select {
case event, ok := <-w.ResultChan():
if !ok {
t.Fatal("Unexpected closed")
}
rv, err := cacher.versioner.ObjectResourceVersion(event.Object)
if err != nil {
t.Errorf("failed to parse resource version from %#v: %v", event.Object, err)
}
if event.Type == watch.Bookmark {
observedBookmarks++
if rv < lastObservedRV {
t.Errorf("Unexpected bookmark event resource version %v (last %v)", rv, lastObservedRV)
}
}
lastObservedRV = rv
case <-timeoutCh:
t.Fatal("Unexpected timeout to receive bookmark events")
}
}
}
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
backingStorage := &dummyStorage{}

View File

@ -855,12 +855,12 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
allowWatchBookmark bool
}{
{ // test old client won't get Bookmark event
timeout: 2 * time.Second,
timeout: 3 * time.Second,
expected: false,
allowWatchBookmark: false,
},
{
timeout: 2 * time.Second,
timeout: 3 * time.Second,
expected: true,
allowWatchBookmark: true,
},
@ -909,7 +909,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
watcher, err := cacher.WatchList(ctx, "pods/ns", "0", pred)
if err != nil {
t.Fatalf("Unexpected error: %v", err)