cacher_watcher: Add support for consistent streaming

Design details: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
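For orientation, here is a minimal client-side sketch of the stream this commit enables on the server. This is an illustration, not part of the diff: SendInitialEvents reached metav1.ListOptions in a later release than this commit, and the clientset and namespace below are placeholders.

package watchsketch

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

// watchWithConsistentInitialEvents consumes a watch-list stream and returns
// once the consistent initial snapshot has been fully delivered, i.e. once
// a bookmark carrying the "k8s.io/initial-events-end" annotation arrives.
func watchWithConsistentInitialEvents(ctx context.Context, client kubernetes.Interface) error {
	sendInitialEvents := true
	w, err := client.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{
		SendInitialEvents:    &sendInitialEvents,
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
		AllowWatchBookmarks:  true,
	})
	if err != nil {
		return err
	}
	defer w.Stop()
	for ev := range w.ResultChan() {
		if ev.Type == watch.Bookmark {
			if m, err := meta.Accessor(ev.Object); err == nil &&
				m.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
				// the initial, consistent list has been fully streamed;
				// regular watch events follow from here on
				return nil
			}
			continue
		}
		fmt.Printf("initial event: %v\n", ev.Type)
	}
	return fmt.Errorf("watch closed before initial events ended")
}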
Lukasz Szaszkiewicz 2023-02-27 13:32:49 +01:00
parent 7c7e773305
commit 52ce41a293
2 changed files with 347 additions and 4 deletions


@@ -19,8 +19,10 @@ package cacher
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -32,6 +34,21 @@ import (
"k8s.io/klog/v2"
)
// possible states of the cache watcher
const (
// cacheWatcherWaitingForBookmark indicates the cacher
// is waiting for a bookmark event with a specific RV set
cacheWatcherWaitingForBookmark = iota
// cacheWatcherBookmarkReceived indicates that the cacher
// has received a bookmark event with required RV
cacheWatcherBookmarkReceived
// cacheWatcherBookmarkSent indicates that the cacher
// has already sent a bookmark event to a client
cacheWatcherBookmarkSent
)
// cacheWatcher implements watch.Interface
// this is not thread-safe
type cacheWatcher struct {
@@ -55,6 +72,20 @@ type cacheWatcher struct {
// drainInputBuffer indicates whether we should delay closing this watcher
// and send all events in the input buffer.
drainInputBuffer bool
// bookmarkAfterResourceVersion holds an RV that indicates
// when we should start delivering bookmark events.
// If this field holds the value of 0, it means
// we don't have any special preference for delivering bookmark events.
// Note that this field is used in conjunction with the state field.
// It should not be changed once the watcher has been started.
bookmarkAfterResourceVersion uint64
// stateMutex protects state
stateMutex sync.Mutex
// state holds a numeric value indicating the current state of the watcher
state int
}
func newCacheWatcher(
@@ -115,8 +146,17 @@ func (c *cacheWatcher) stopLocked() {
}
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
// if the bookmarkAfterResourceVersion hasn't been seen
// we will try to deliver a bookmark event every second.
// the following check will discard a bookmark event
// if its RV is lower than the bookmarkAfterResourceVersion
// so that we don't pollute the input channel
if event.Type == watch.Bookmark && event.ResourceVersion < c.bookmarkAfterResourceVersion {
return false
}
select {
case c.input <- event:
c.markBookmarkAfterRvAsReceived(event)
return true
default:
return false
@@ -124,6 +164,9 @@ func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
}
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
//
// Note that bookmark events are never added via the add method, only via nonblockingAdd.
// Changing this behaviour will require moving the markBookmarkAfterRvAsReceived method.
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// Try to send the event immediately, without blocking.
if c.nonblockingAdd(event) {
@@ -136,7 +179,31 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely, we simply terminate it.
//
// we are graceful = false, when:
//
// (a) the bookmarkAfterResourceVersion hasn't been received,
// we can safely terminate the watcher, because the client is waiting
// for this specific bookmark and we haven't even received one.
// (b) we have seen the bookmarkAfterResourceVersion and it was already sent to the client.
// we can simply terminate the watcher.
//
// we are graceful = true, when:
//
// (a) we have seen a bookmark, but it hasn't been sent to the client yet.
// that means we should drain the input buffer, which contains
// the bookmarkAfterResourceVersion we want. We do that to make progress,
// as clients can re-establish a new watch with the given RV and receive
// further notifications.
graceful := func() bool {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.state == cacheWatcherBookmarkReceived
}()
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v, graceful = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result), graceful)
metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
c.forget(graceful)
}
if timer == nil {
@@ -162,10 +229,20 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
//
// (b) roughly every minute
//
// (c) immediately when the bookmarkAfterResourceVersion wasn't confirmed
// in this scenario the client has already seen (or is still receiving)
// all initial data and is interested in seeing
// a specific RV value (aka. the bookmarkAfterResourceVersion)
// since we don't know when the cacher will see the RV we increase the frequency
//
// (b) gives us periodicity if the watch breaks due to unexpected
// conditions, (a) ensures that on timeout the watcher is as close to
// now as possible - this covers 99% of cases.
if !c.wasBookmarkAfterRvReceived() {
return time.Time{}, true // schedule immediately
}
heartbeatTime := now.Add(bookmarkFrequency)
if c.deadline.IsZero() {
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
@@ -182,6 +259,76 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
return heartbeatTime, true
}
// wasBookmarkAfterRvReceived is the same as wasBookmarkAfterRvReceivedLocked, just acquires a lock
func (c *cacheWatcher) wasBookmarkAfterRvReceived() bool {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.wasBookmarkAfterRvReceivedLocked()
}
// wasBookmarkAfterRvReceivedLocked checks if the given cacheWatcher
// has seen a bookmark event >= bookmarkAfterResourceVersion
func (c *cacheWatcher) wasBookmarkAfterRvReceivedLocked() bool {
return c.state != cacheWatcherWaitingForBookmark
}
// markBookmarkAfterRvAsReceived indicates that the given cacheWatcher
// has seen a bookmark event >= bookmarkAfterResourceVersion
func (c *cacheWatcher) markBookmarkAfterRvAsReceived(event *watchCacheEvent) {
if event.Type == watch.Bookmark {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
if c.wasBookmarkAfterRvReceivedLocked() {
return
}
// bookmark events are scheduled by the startDispatchingBookmarkEvents method
// since we received a bookmark event that means we have
// converged towards the expected RV and it is okay to update the state so that
// this watcher can be scheduled for regular bookmark events
c.state = cacheWatcherBookmarkReceived
}
}
// wasBookmarkAfterRvSentLocked checks if a bookmark event
// with an RV >= the bookmarkAfterResourceVersion has been sent by this watcher
func (c *cacheWatcher) wasBookmarkAfterRvSentLocked() bool {
return c.state == cacheWatcherBookmarkSent
}
// wasBookmarkAfterRvSent is the same as wasBookmarkAfterRvSentLocked, just acquires a lock
func (c *cacheWatcher) wasBookmarkAfterRvSent() bool {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.wasBookmarkAfterRvSentLocked()
}
// markBookmarkAfterRvSent indicates that the given cacheWatcher
// has sent a bookmark event with an RV >= the bookmarkAfterResourceVersion
//
// this function relies on the fact that the nonblockingAdd method
// won't admit a bookmark event with an RV < the bookmarkAfterResourceVersion,
// so the first received bookmark event is considered to match the bookmarkAfterResourceVersion
func (c *cacheWatcher) markBookmarkAfterRvSent(event *watchCacheEvent) {
// note that bookmark events are not so common, so we will acquire the lock only every ~60 seconds or so
if event.Type == watch.Bookmark {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
if !c.wasBookmarkAfterRvSentLocked() {
c.state = cacheWatcherBookmarkSent
}
}
}
// setBookmarkAfterResourceVersion sets the bookmarkAfterResourceVersion and the state associated with it
func (c *cacheWatcher) setBookmarkAfterResourceVersion(bookmarkAfterResourceVersion uint64) {
state := cacheWatcherWaitingForBookmark
if bookmarkAfterResourceVersion == 0 {
state = cacheWatcherBookmarkSent // if no specific RV was requested we assume no-op
}
c.state = state
c.bookmarkAfterResourceVersion = bookmarkAfterResourceVersion
}
// setDrainInputBufferLocked if set to true indicates that we should delay closing this watcher
// until we send all events residing in the input buffer.
func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
@@ -216,7 +363,21 @@ func updateResourceVersion(object runtime.Object, versioner storage.Versioner, r
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
if event.Type == watch.Bookmark {
e := &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
if !c.wasBookmarkAfterRvSent() {
objMeta, err := meta.Accessor(e.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error while accessing object's metadata gr: %v, identifier: %v, obj: %#v, err: %v", c.groupResource, c.identifier, e.Object, err))
return nil
}
objAnnotations := objMeta.GetAnnotations()
if objAnnotations == nil {
objAnnotations = map[string]string{}
}
objAnnotations["k8s.io/initial-events-end"] = "true"
objMeta.SetAnnotations(objAnnotations)
}
return e
}
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
@@ -276,6 +437,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
select {
case c.result <- *watchEvent:
c.markBookmarkAfterRvSent(event)
case <-c.done:
}
}
@@ -360,7 +522,9 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
return
}
// only send events newer than resourceVersion
// or a bookmark event with an RV equal to resourceVersion
// if we haven't sent one to the client
if event.ResourceVersion > resourceVersion || (event.Type == watch.Bookmark && event.ResourceVersion == resourceVersion && !c.wasBookmarkAfterRvSent()) {
c.sendWatchCacheEvent(event)
}
case <-ctx.Done():
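Taken together, setBookmarkAfterResourceVersion, markBookmarkAfterRvAsReceived, and markBookmarkAfterRvSent implement a small three-state machine. A standalone sketch of the intended transitions (hypothetical names, for illustration only, not part of the commit):

package sketch

// bookmarkState mirrors the three watcher states above (hypothetical names).
type bookmarkState int

const (
	waitingForBookmark bookmarkState = iota // no qualifying bookmark buffered yet
	bookmarkReceived                        // a bookmark with RV >= target sits in the input buffer
	bookmarkSent                            // that bookmark has been delivered to the client
)

// initialState mirrors setBookmarkAfterResourceVersion: a target RV of 0
// means no special preference, so the watcher starts as if already done.
func initialState(targetRV uint64) bookmarkState {
	if targetRV == 0 {
		return bookmarkSent
	}
	return waitingForBookmark
}

// onBuffered mirrors markBookmarkAfterRvAsReceived: only a bookmark moves
// the state forward, and only if we are still waiting. (nonblockingAdd
// guarantees any buffered bookmark already has RV >= target.)
func onBuffered(s bookmarkState, isBookmark bool) bookmarkState {
	if isBookmark && s == waitingForBookmark {
		return bookmarkReceived
	}
	return s
}

// onDelivered mirrors markBookmarkAfterRvSent: the first bookmark that
// reaches the client finalizes the state machine.
func onDelivered(s bookmarkState, isBookmark bool) bookmarkState {
	if isBookmark && s != bookmarkSent {
		return bookmarkSent
	}
	return s
}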


@@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
testingclock "k8s.io/utils/clock/testing"
)
@@ -291,7 +292,9 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
forget := func(bool) {}
newWatcher := func(deadline time.Time) *cacheWatcher {
w := newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
w.setBookmarkAfterResourceVersion(0)
return w
}
clock := testingclock.NewFakeClock(time.Now())
@@ -412,3 +415,179 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
t.Fatalf("expected forget() to be called three times, because processInterval should call Stop(): %v", err)
}
}
// TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived verifies if the watcher will be stopped
// when adding an item times out and the bookmarkAfterResourceVersion hasn't been received
func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T) {
var lock sync.RWMutex
var w *cacheWatcher
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
forget := func(drainWatcher bool) {
lock.Lock()
defer lock.Unlock()
if drainWatcher {
t.Fatal("didn't expect drainWatcher to be set to true")
}
count++
w.setDrainInputBufferLocked(drainWatcher)
w.stopLocked()
}
initEvents := []*watchCacheEvent{
{Object: &v1.Pod{}},
{Object: &v1.Pod{}},
}
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
w.setBookmarkAfterResourceVersion(10)
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
if w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) {
t.Fatal("expected the add method to fail")
}
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
defer lock.RUnlock()
return count == 2, nil
}); err != nil {
t.Fatalf("expected forget() to be called twice, first call from w.add() and then from w.Stop() called from w.processInterval(): %v", err)
}
if !w.stopped {
t.Fatal("expected the watcher to be stopped but it wasn't")
}
}
// TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent checks if the watcher's input
// channel is drained if the bookmarkAfterResourceVersion was received but not sent
func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
makePod := func(rv uint64) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{},
},
}
}
var lock sync.RWMutex
var w *cacheWatcher
watchInitializationSignal := utilflowcontrol.NewInitializationSignal()
ctx := utilflowcontrol.WithInitializationSignal(context.Background(), watchInitializationSignal)
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
forget := func(drainWatcher bool) {
lock.Lock()
defer lock.Unlock()
count++
w.setDrainInputBufferLocked(drainWatcher)
w.stopLocked()
}
initEvents := []*watchCacheEvent{{Object: makePod(1)}, {Object: makePod(2)}}
w = newCacheWatcher(2, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
w.setBookmarkAfterResourceVersion(10)
go w.processInterval(ctx, intervalFromEvents(initEvents), 0)
watchInitializationSignal.Wait()
// note that we can add three events even though the chanSize is two because
// one event has been popped off from the input chan
if !w.add(&watchCacheEvent{Object: makePod(5), ResourceVersion: 5}, time.NewTimer(1*time.Second)) {
t.Fatal("failed adding an even to the watcher")
}
if !w.nonblockingAdd(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "10"}}}) {
t.Fatal("failed adding an even to the watcher")
}
if !w.add(&watchCacheEvent{Object: makePod(15), ResourceVersion: 15}, time.NewTimer(1*time.Second)) {
t.Fatal("failed adding an even to the watcher")
}
if w.add(&watchCacheEvent{Object: makePod(20), ResourceVersion: 20}, time.NewTimer(1*time.Second)) {
t.Fatal("expected the add method to fail")
}
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
defer lock.RUnlock()
return count == 1, nil
}); err != nil {
t.Fatalf("expected forget() to be called once, just from the w.add() method: %v", err)
}
if !w.stopped {
t.Fatal("expected the watcher to be stopped but it wasn't")
}
verifyEvents(t, w, []watch.Event{
{Type: watch.Added, Object: makePod(1)},
{Type: watch.Added, Object: makePod(2)},
{Type: watch.Added, Object: makePod(5)},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "10",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
{Type: watch.Added, Object: makePod(15)},
}, true)
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
defer lock.RUnlock()
return count == 2, nil
}); err != nil {
t.Fatalf("expected forget() to be called twice, the second call is from w.Stop() method called from w.processInterval(): %v", err)
}
}
func TestBookmarkAfterResourceVersionWatchers(t *testing.T) {
newWatcher := func(id string, deadline time.Time) *cacheWatcher {
w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
w.setBookmarkAfterResourceVersion(10)
return w
}
clock := testingclock.NewFakeClock(time.Now())
target := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
if !target.addWatcher(newWatcher("1", clock.Now().Add(2*time.Minute))) {
t.Fatal("failed adding an even to the watcher")
}
// the watcher is immediately expired (it's waiting for bookmark, so it is scheduled immediately)
ret := target.popExpiredWatchers()
if len(ret) != 1 || len(ret[0]) != 1 {
t.Fatalf("expected only one watcher to be expired")
}
if !target.addWatcher(ret[0][0]) {
t.Fatal("failed adding an even to the watcher")
}
// after one second the watcher is still expired
clock.Step(1 * time.Second)
ret = target.popExpiredWatchers()
if len(ret) != 1 || len(ret[0]) != 1 {
t.Fatalf("expected only one watcher to be expired")
}
if !target.addWatcher(ret[0][0]) {
t.Fatal("failed adding an even to the watcher")
}
// after 29 seconds the watcher is still expired
clock.Step(29 * time.Second)
ret = target.popExpiredWatchers()
if len(ret) != 1 || len(ret[0]) != 1 {
t.Fatalf("expected only one watcher to be expired")
}
// once the bookmark is confirmed, the watcher is no longer expired immediately
ret[0][0].markBookmarkAfterRvAsReceived(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{}})
if !target.addWatcher(ret[0][0]) {
t.Fatal("failed adding an even to the watcher")
}
clock.Step(30 * time.Second)
ret = target.popExpiredWatchers()
if len(ret) != 0 {
t.Fatalf("didn't expect any watchers to be expired")
}
clock.Step(30 * time.Second)
ret = target.popExpiredWatchers()
if len(ret) != 1 || len(ret[0]) != 1 {
t.Fatalf("expected only one watcher to be expired")
}
}
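The new tests can be exercised in isolation (assuming the upstream repository layout; the package path is an assumption since file names are not shown in this view): go test k8s.io/apiserver/pkg/storage/cacher -run 'TestCacheWatcherDraining|TestBookmarkAfterResourceVersionWatchers' -v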