Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-21 10:51:29 +00:00

Merge pull request #110960 from p0lyn0mial/upstream-cacher-sends-stream
cacher consistent streaming support

Commit: 0ad676fca8
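The diff below teaches the watch cache to serve a consistent initial stream: a watcher can ask for the current state first, then a bookmark marking the end of that initial data, and only afterwards live updates. As a rough illustration of the request shape this enables — a minimal sketch assuming only the storage.ListOptions fields exercised by the tests in this diff; the function and its arguments are hypothetical:

```go
package cacherexample

import (
	"context"

	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/storage"
)

// watchWithInitialState sketches how a storage-layer caller might opt in to the
// behaviour wired up in this PR. The option names (SendInitialEvents,
// Predicate.AllowWatchBookmarks, ResourceVersion) come from the diff; the
// storage.Interface value and key are assumed to exist in the caller's context.
func watchWithInitialState(ctx context.Context, s storage.Interface, key string) (watch.Interface, error) {
	sendInitialEvents := true
	opts := storage.ListOptions{Predicate: storage.Everything}
	opts.SendInitialEvents = &sendInitialEvents // deliver the current state as synthetic ADDED events first
	opts.Predicate.AllowWatchBookmarks = true   // allow the end-of-initial-events bookmark
	opts.ResourceVersion = ""                   // "" means: consistent with the most recent state in etcd
	return s.Watch(ctx, key, opts)
}
```

The hunks that follow implement the server side of this: the cacheWatcher state machine, the bookmark that closes the initial stream, and the resource-version selection in Cacher.Watch.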
@@ -19,8 +19,10 @@ package cacher
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
+	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -32,6 +34,21 @@ import (
 	"k8s.io/klog/v2"
 )
 
+// possible states of the cache watcher
+const (
+	// cacheWatcherWaitingForBookmark indicates the cacher
+	// is waiting for a bookmark event with a specific RV set
+	cacheWatcherWaitingForBookmark = iota
+
+	// cacheWatcherBookmarkReceived indicates that the cacher
+	// has received a bookmark event with required RV
+	cacheWatcherBookmarkReceived
+
+	// cacheWatcherBookmarkSent indicates that the cacher
+	// has already sent a bookmark event to a client
+	cacheWatcherBookmarkSent
+)
+
 // cacheWatcher implements watch.Interface
 // this is not thread-safe
 type cacheWatcher struct {
@@ -55,6 +72,20 @@ type cacheWatcher struct {
 	// drainInputBuffer indicates whether we should delay closing this watcher
 	// and send all event in the input buffer.
 	drainInputBuffer bool
+
+	// bookmarkAfterResourceVersion holds an RV that indicates
+	// when we should start delivering bookmark events.
+	// If this field holds the value of 0 that means
+	// we don't have any special preferences toward delivering bookmark events.
+	// Note that this field is used in conjunction with the state field.
+	// It should not be changed once the watcher has been started.
+	bookmarkAfterResourceVersion uint64
+
+	// stateMutex protects state
+	stateMutex sync.Mutex
+
+	// state holds a numeric value indicating the current state of the watcher
+	state int
 }
 
 func newCacheWatcher(
@@ -115,8 +146,17 @@ func (c *cacheWatcher) stopLocked() {
 }
 
 func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
+	// if the bookmarkAfterResourceVersion hasn't been seen
+	// we will try to deliver a bookmark event every second.
+	// the following check will discard a bookmark event
+	// if it is < than the bookmarkAfterResourceVersion
+	// so that we don't pollute the input channel
+	if event.Type == watch.Bookmark && event.ResourceVersion < c.bookmarkAfterResourceVersion {
+		return false
+	}
 	select {
 	case c.input <- event:
+		c.markBookmarkAfterRvAsReceived(event)
 		return true
 	default:
 		return false
@@ -124,6 +164,9 @@ func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
 }
 
 // Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
+//
+// Note that bookmark events are never added via the add method only via the nonblockingAdd.
+// Changing this behaviour will require moving the markBookmarkAfterRvAsReceived method
 func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
 	// Try to send the event immediately, without blocking.
 	if c.nonblockingAdd(event) {
@@ -136,7 +179,31 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
 		// we simply terminate it.
 		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
 		metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
-		c.forget(false)
+		// This means that we couldn't send event to that watcher.
+		// Since we don't want to block on it infinitely, we simply terminate it.
+
+		// we are graceful = false, when:
+		//
+		// (a) The bookmarkAfterResourceVersionReceived hasn't been received,
+		//     we can safely terminate the watcher. Because the client is waiting
+		//     for this specific bookmark, and we even haven't received one.
+		// (b) We have seen the bookmarkAfterResourceVersion, and it was sent already to the client.
+		//     We can simply terminate the watcher.
+
+		// we are graceful = true, when:
+		//
+		// (a) We have seen a bookmark, but it hasn't been sent to the client yet.
+		//     That means we should drain the input buffer which contains
+		//     the bookmarkAfterResourceVersion we want. We do that to make progress
+		//     as clients can re-establish a new watch with the given RV and receive
+		//     further notifications.
+		graceful := func() bool {
+			c.stateMutex.Lock()
+			defer c.stateMutex.Unlock()
+			return c.state == cacheWatcherBookmarkReceived
+		}()
+		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v, graceful = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result), graceful)
+		c.forget(graceful)
 	}
 
 	if timer == nil {
@@ -162,10 +229,20 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
 	//
 	// (b) roughly every minute
 	//
+	// (c) immediately when the bookmarkAfterResourceVersion wasn't confirmed
+	//     in this scenario the client have already seen (or is in the process of sending)
+	//     all initial data and is interested in seeing
+	//     a specific RV value (aka. the bookmarkAfterResourceVersion)
+	//     since we don't know when the cacher will see the RV we increase frequency
+	//
 	// (b) gives us periodicity if the watch breaks due to unexpected
 	// conditions, (a) ensures that on timeout the watcher is as close to
 	// now as possible - this covers 99% of cases.
 
+	if !c.wasBookmarkAfterRvReceived() {
+		return time.Time{}, true // schedule immediately
+	}
+
 	heartbeatTime := now.Add(bookmarkFrequency)
 	if c.deadline.IsZero() {
 		// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
@@ -182,6 +259,76 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
 	return heartbeatTime, true
 }
 
+// wasBookmarkAfterRvReceived same as wasBookmarkAfterRvReceivedLocked just acquires a lock
+func (c *cacheWatcher) wasBookmarkAfterRvReceived() bool {
+	c.stateMutex.Lock()
+	defer c.stateMutex.Unlock()
+	return c.wasBookmarkAfterRvReceivedLocked()
+}
+
+// wasBookmarkAfterRvReceivedLocked checks if the given cacheWatcher
+// have seen a bookmark event >= bookmarkAfterResourceVersion
+func (c *cacheWatcher) wasBookmarkAfterRvReceivedLocked() bool {
+	return c.state != cacheWatcherWaitingForBookmark
+}
+
+// markBookmarkAfterRvAsReceived indicates that the given cacheWatcher
+// have seen a bookmark event >= bookmarkAfterResourceVersion
+func (c *cacheWatcher) markBookmarkAfterRvAsReceived(event *watchCacheEvent) {
+	if event.Type == watch.Bookmark {
+		c.stateMutex.Lock()
+		defer c.stateMutex.Unlock()
+		if c.wasBookmarkAfterRvReceivedLocked() {
+			return
+		}
+		// bookmark events are scheduled by startDispatchingBookmarkEvents method
+		// since we received a bookmark event that means we have
+		// converged towards the expected RV and it is okay to update the state so that
+		// this cacher can be scheduler for a regular bookmark events
+		c.state = cacheWatcherBookmarkReceived
+	}
+}
+
+// wasBookmarkAfterRvSentLocked checks if a bookmark event
+// with an RV >= the bookmarkAfterResourceVersion has been sent by this watcher
+func (c *cacheWatcher) wasBookmarkAfterRvSentLocked() bool {
+	return c.state == cacheWatcherBookmarkSent
+}
+
+// wasBookmarkAfterRvSent same as wasBookmarkAfterRvSentLocked just acquires a lock
+func (c *cacheWatcher) wasBookmarkAfterRvSent() bool {
+	c.stateMutex.Lock()
+	defer c.stateMutex.Unlock()
+	return c.wasBookmarkAfterRvSentLocked()
+}
+
+// markBookmarkAfterRvSent indicates that the given cacheWatcher
+// have sent a bookmark event with an RV >= the bookmarkAfterResourceVersion
+//
+// this function relies on the fact that the nonblockingAdd method
+// won't admit a bookmark event with an RV < the bookmarkAfterResourceVersion
+// so the first received bookmark event is considered to match the bookmarkAfterResourceVersion
+func (c *cacheWatcher) markBookmarkAfterRvSent(event *watchCacheEvent) {
+	// note that bookmark events are not so common so will acquire a lock every ~60 second or so
+	if event.Type == watch.Bookmark {
+		c.stateMutex.Lock()
+		defer c.stateMutex.Unlock()
+		if !c.wasBookmarkAfterRvSentLocked() {
+			c.state = cacheWatcherBookmarkSent
+		}
+	}
+}
+
+// setBookmarkAfterResourceVersion sets the bookmarkAfterResourceVersion and the state associated with it
+func (c *cacheWatcher) setBookmarkAfterResourceVersion(bookmarkAfterResourceVersion uint64) {
+	state := cacheWatcherWaitingForBookmark
+	if bookmarkAfterResourceVersion == 0 {
+		state = cacheWatcherBookmarkSent // if no specific RV was requested we assume no-op
+	}
+	c.state = state
+	c.bookmarkAfterResourceVersion = bookmarkAfterResourceVersion
+}
+
 // setDrainInputBufferLocked if set to true indicates that we should delay closing this watcher
 // until we send all events residing in the input buffer.
 func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
@@ -216,7 +363,21 @@ func updateResourceVersion(object runtime.Object, versioner storage.Versioner, r
 
 func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
 	if event.Type == watch.Bookmark {
-		return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
+		e := &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
+		if !c.wasBookmarkAfterRvSent() {
+			objMeta, err := meta.Accessor(e.Object)
+			if err != nil {
+				utilruntime.HandleError(fmt.Errorf("error while accessing object's metadata gr: %v, identifier: %v, obj: %#v, err: %v", c.groupResource, c.identifier, e.Object, err))
+				return nil
+			}
+			objAnnotations := objMeta.GetAnnotations()
+			if objAnnotations == nil {
+				objAnnotations = map[string]string{}
+			}
+			objAnnotations["k8s.io/initial-events-end"] = "true"
+			objMeta.SetAnnotations(objAnnotations)
+		}
+		return e
 	}
 
 	curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
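The hunk above annotates the first bookmark delivered after the requested resource version, so a consumer can tell where the initial data ends. A minimal sketch of how a consumer might detect that marker — the annotation key is taken from this diff, everything else is illustrative:

```go
package cacherexample

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/watch"
)

// isEndOfInitialEvents reports whether a received event is the bookmark that
// convertToWatchEvent annotates to mark the end of the initial event stream.
func isEndOfInitialEvents(e watch.Event) bool {
	if e.Type != watch.Bookmark {
		return false
	}
	objMeta, err := meta.Accessor(e.Object)
	if err != nil {
		return false
	}
	return objMeta.GetAnnotations()["k8s.io/initial-events-end"] == "true"
}
```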
@@ -276,6 +437,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
 
 	select {
 	case c.result <- *watchEvent:
+		c.markBookmarkAfterRvSent(event)
 	case <-c.done:
 	}
 }
@@ -360,7 +522,9 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
 			return
 		}
 		// only send events newer than resourceVersion
-		if event.ResourceVersion > resourceVersion {
+		// or a bookmark event with an RV equal to resourceVersion
+		// if we haven't sent one to the client
+		if event.ResourceVersion > resourceVersion || (event.Type == watch.Bookmark && event.ResourceVersion == resourceVersion && !c.wasBookmarkAfterRvSent()) {
 			c.sendWatchCacheEvent(event)
 		}
 	case <-ctx.Done():
@@ -33,6 +33,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/storage"
+	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
 	testingclock "k8s.io/utils/clock/testing"
 )
 
@@ -291,7 +292,9 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
 	forget := func(bool) {}
 
 	newWatcher := func(deadline time.Time) *cacheWatcher {
-		return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
+		w := newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
+		w.setBookmarkAfterResourceVersion(0)
+		return w
 	}
 
 	clock := testingclock.NewFakeClock(time.Now())
@@ -412,3 +415,179 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
 		t.Fatalf("expected forget() to be called three times, because processInterval should call Stop(): %v", err)
 	}
 }
+
+// TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived verifies if the watcher will be stopped
+// when adding an item times out and the bookmarkAfterResourceVersion hasn't been received
+func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T) {
+	var lock sync.RWMutex
+	var w *cacheWatcher
+	count := 0
+	filter := func(string, labels.Set, fields.Set) bool { return true }
+	forget := func(drainWatcher bool) {
+		lock.Lock()
+		defer lock.Unlock()
+		if drainWatcher == true {
+			t.Fatalf("didn't expect drainWatcher to be set to true")
+		}
+		count++
+		w.setDrainInputBufferLocked(drainWatcher)
+		w.stopLocked()
+	}
+	initEvents := []*watchCacheEvent{
+		{Object: &v1.Pod{}},
+		{Object: &v1.Pod{}},
+	}
+	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
+	w.setBookmarkAfterResourceVersion(10)
+	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
+	if w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) {
+		t.Fatal("expected the add method to fail")
+	}
+	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
+		lock.RLock()
+		defer lock.RUnlock()
+		return count == 2, nil
+	}); err != nil {
+		t.Fatalf("expected forget() to be called twice, first call from w.add() and then from w.Stop() called from w.processInterval(): %v", err)
+	}
+
+	if !w.stopped {
+		t.Fatal("expected the watcher to be stopped but it wasn't")
+	}
+}
+
+// TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent checks if the watcher's input
+// channel is drained if the bookmarkAfterResourceVersion was received but not sent
+func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
+	makePod := func(rv uint64) *v1.Pod {
+		return &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            fmt.Sprintf("pod-%d", rv),
+				Namespace:       "ns",
+				ResourceVersion: fmt.Sprintf("%d", rv),
+				Annotations:     map[string]string{},
+			},
+		}
+	}
+	var lock sync.RWMutex
+	var w *cacheWatcher
+	watchInitializationSignal := utilflowcontrol.NewInitializationSignal()
+	ctx := utilflowcontrol.WithInitializationSignal(context.Background(), watchInitializationSignal)
+	count := 0
+	filter := func(string, labels.Set, fields.Set) bool { return true }
+	forget := func(drainWatcher bool) {
+		lock.Lock()
+		defer lock.Unlock()
+		count++
+		w.setDrainInputBufferLocked(drainWatcher)
+		w.stopLocked()
+	}
+	initEvents := []*watchCacheEvent{{Object: makePod(1)}, {Object: makePod(2)}}
+	w = newCacheWatcher(2, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
+	w.setBookmarkAfterResourceVersion(10)
+	go w.processInterval(ctx, intervalFromEvents(initEvents), 0)
+	watchInitializationSignal.Wait()
+
+	// note that we can add three events even though the chanSize is two because
+	// one event has been popped off from the input chan
+	if !w.add(&watchCacheEvent{Object: makePod(5), ResourceVersion: 5}, time.NewTimer(1*time.Second)) {
+		t.Fatal("failed adding an even to the watcher")
+	}
+	if !w.nonblockingAdd(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "10"}}}) {
+		t.Fatal("failed adding an even to the watcher")
+	}
+	if !w.add(&watchCacheEvent{Object: makePod(15), ResourceVersion: 15}, time.NewTimer(1*time.Second)) {
+		t.Fatal("failed adding an even to the watcher")
+	}
+	if w.add(&watchCacheEvent{Object: makePod(20), ResourceVersion: 20}, time.NewTimer(1*time.Second)) {
+		t.Fatal("expected the add method to fail")
+	}
+	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
+		lock.RLock()
+		defer lock.RUnlock()
+		return count == 1, nil
+	}); err != nil {
+		t.Fatalf("expected forget() to be called once, just from the w.add() method: %v", err)
+	}
+
+	if !w.stopped {
+		t.Fatal("expected the watcher to be stopped but it wasn't")
+	}
+	verifyEvents(t, w, []watch.Event{
+		{Type: watch.Added, Object: makePod(1)},
+		{Type: watch.Added, Object: makePod(2)},
+		{Type: watch.Added, Object: makePod(5)},
+		{Type: watch.Bookmark, Object: &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				ResourceVersion: "10",
+				Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
+			},
+		}},
+		{Type: watch.Added, Object: makePod(15)},
+	}, true)
+
+	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
+		lock.RLock()
+		defer lock.RUnlock()
+		return count == 2, nil
+	}); err != nil {
+		t.Fatalf("expected forget() to be called twice, the second call is from w.Stop() method called from w.processInterval(): %v", err)
+	}
+}
+
+func TestBookmarkAfterResourceVersionWatchers(t *testing.T) {
+	newWatcher := func(id string, deadline time.Time) *cacheWatcher {
+		w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
+		w.setBookmarkAfterResourceVersion(10)
+		return w
+	}
+
+	clock := testingclock.NewFakeClock(time.Now())
+	target := newTimeBucketWatchers(clock, defaultBookmarkFrequency)
+	if !target.addWatcher(newWatcher("1", clock.Now().Add(2*time.Minute))) {
+		t.Fatal("failed adding an even to the watcher")
+	}
+
+	// the watcher is immediately expired (it's waiting for bookmark, so it is scheduled immediately)
+	ret := target.popExpiredWatchers()
+	if len(ret) != 1 || len(ret[0]) != 1 {
+		t.Fatalf("expected only one watcher to be expired")
+	}
+	if !target.addWatcher(ret[0][0]) {
+		t.Fatal("failed adding an even to the watcher")
+	}
+
+	// after one second time the watcher is still expired
+	clock.Step(1 * time.Second)
+	ret = target.popExpiredWatchers()
+	if len(ret) != 1 || len(ret[0]) != 1 {
+		t.Fatalf("expected only one watcher to be expired")
+	}
+	if !target.addWatcher(ret[0][0]) {
+		t.Fatal("failed adding an even to the watcher")
+	}
+
+	// after 29 seconds the watcher is still expired
+	clock.Step(29 * time.Second)
+	ret = target.popExpiredWatchers()
+	if len(ret) != 1 || len(ret[0]) != 1 {
+		t.Fatalf("expected only one watcher to be expired")
+	}
+
+	// after confirming the watcher is not expired immediately
+	ret[0][0].markBookmarkAfterRvAsReceived(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{}})
+	if !target.addWatcher(ret[0][0]) {
+		t.Fatal("failed adding an even to the watcher")
+	}
+	clock.Step(30 * time.Second)
+	ret = target.popExpiredWatchers()
+	if len(ret) != 0 {
+		t.Fatalf("didn't expect any watchers to be expired")
+	}
+
+	clock.Step(30 * time.Second)
+	ret = target.popExpiredWatchers()
+	if len(ret) != 1 || len(ret[0]) != 1 {
+		t.Fatalf("expected only one watcher to be expired")
+	}
+}
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"net/http"
 	"reflect"
+	"strconv"
 	"sync"
 	"time"
 
@@ -34,7 +35,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/util/validation/field"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/audit"
@@ -290,6 +290,9 @@ type Cacher struct {
 	// newFunc is a function that creates new empty object storing a object of type Type.
 	newFunc func() runtime.Object
 
+	// newListFunc is a function that creates new empty list for storing objects of type Type.
+	newListFunc func() runtime.Object
+
 	// indexedTrigger is used for optimizing amount of watchers that needs to process
 	// an incoming event.
 	indexedTrigger *indexedTriggerFunc
@@ -371,6 +374,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 		groupResource:  config.GroupResource,
 		versioner:      config.Versioner,
 		newFunc:        config.NewFunc,
+		newListFunc:    config.NewListFunc,
 		indexedTrigger: indexedTrigger,
 		watcherIdx:     0,
 		watchers: indexedWatchers{
@@ -498,19 +502,18 @@ type namespacedName struct {
 
 // Watch implements storage.Interface.
 func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
-	if opts.SendInitialEvents != nil {
-		return nil, errors.NewInvalid(
-			schema.GroupKind{Group: c.groupResource.Group, Kind: c.groupResource.Resource},
-			"",
-			field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is not yet implemented by the watch cache")},
-		)
-	}
 	pred := opts.Predicate
-	// If the resourceVersion is unset, ensure that the rv
-	// from which the watch is being served, is the latest
+	// if the watch-list feature wasn't set and the resourceVersion is unset
+	// ensure that the rv from which the watch is being served, is the latest
 	// one. "latest" is ensured by serving the watch from
 	// the underlying storage.
-	if opts.ResourceVersion == "" {
+	//
+	// it should never happen due to our validation but let's just be super-safe here
+	// and disable sendingInitialEvents when the feature wasn't enabled
+	if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
+		opts.SendInitialEvents = nil
+	}
+	if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
 		return c.storage.Watch(ctx, key, opts)
 	}
 	watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
@@ -553,6 +556,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	// watchers on our watcher having a processing hiccup
 	chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
 
+	// Determine a function that computes the bookmarkAfterResourceVersion
+	bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, watchRV, opts)
+	if err != nil {
+		return newErrWatcher(err), nil
+	}
+
+	// Determine a function that computes the watchRV we should start from
+	startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, watchRV, opts)
+	if err != nil {
+		return newErrWatcher(err), nil
+	}
+
 	// Determine watch timeout('0' means deadline is not set, ignore checking)
 	deadline, _ := ctx.Deadline()
 
@@ -580,6 +595,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	// underlying watchCache is calling processEvent under its lock.
 	c.watchCache.RLock()
 	defer c.watchCache.RUnlock()
+	watchRV = startWatchResourceVersionFn()
 	cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
 	if err != nil {
 		// To match the uncached watch implementation, once we have passed authn/authz/admission,
@@ -593,6 +609,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	defer c.Unlock()
 	// Update watcher.forget function once we can compute it.
 	watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
+	// Update the bookMarkAfterResourceVersion
+	watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn())
 	c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
 
 	// Add it to the queue only when the client support watch bookmarks.
@@ -1165,6 +1183,88 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
 	return c.versioner.ParseResourceVersion(resourceVersion)
 }
 
+// getCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
+// this method issues an empty list request and reads only the ResourceVersion from the object metadata
+func (c *Cacher) getCurrentResourceVersionFromStorage(ctx context.Context) (uint64, error) {
+	if c.newListFunc == nil {
+		return 0, fmt.Errorf("newListFunction wasn't provided for %v", c.objectType)
+	}
+	emptyList := c.newListFunc()
+	pred := storage.SelectionPredicate{
+		Label: labels.Everything(),
+		Field: fields.Everything(),
+		Limit: 1, // just in case we actually hit something
+	}
+
+	err := c.storage.GetList(ctx, c.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList)
+	if err != nil {
+		return 0, err
+	}
+	emptyListAccessor, err := meta.ListAccessor(emptyList)
+	if err != nil {
+		return 0, err
+	}
+	if emptyListAccessor == nil {
+		return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
+	}
+
+	currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
+	if err != nil {
+		return 0, err
+	}
+
+	if currentResourceVersion == 0 {
+		return 0, fmt.Errorf("the current resource version must be greater than 0")
+	}
+	return uint64(currentResourceVersion), nil
+}
+
+// getBookmarkAfterResourceVersionLockedFunc returns a function that
+// spits a ResourceVersion after which the bookmark event will be delivered.
+//
+// The returned function must be called under the watchCache lock.
+func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
+	if opts.SendInitialEvents == nil || *opts.SendInitialEvents == false || !opts.Predicate.AllowWatchBookmarks {
+		return func() uint64 { return 0 }, nil
+	}
+	return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts)
+}
+
+// getStartResourceVersionForWatchLockedFunc returns a function that
+// spits a ResourceVersion the watch will be started from.
+// Depending on the input parameters the semantics of the returned ResourceVersion are:
+//   - start at Exact (return parsedWatchResourceVersion)
+//   - start at Most Recent (return an RV from etcd)
+//   - start at Any (return the current watchCache's RV)
+//
+// The returned function must be called under the watchCache lock.
+func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
+	if opts.SendInitialEvents == nil || *opts.SendInitialEvents == true {
+		return func() uint64 { return parsedWatchResourceVersion }, nil
+	}
+	return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts)
+}
+
+// getCommonResourceVersionLockedFunc a helper that simply computes a ResourceVersion
+// based on the input parameters. Please examine callers of this method to get more context.
+//
+// The returned function must be called under the watchCache lock.
+func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
+	switch {
+	case len(opts.ResourceVersion) == 0:
+		rv, err := c.getCurrentResourceVersionFromStorage(ctx)
+		if err != nil {
+			return nil, err
+		}
+		return func() uint64 { return rv }, nil
+	case parsedWatchResourceVersion == 0:
+		// here we assume that watchCache locked is already held
+		return func() uint64 { return c.watchCache.resourceVersion }, nil
+	default:
+		return func() uint64 { return parsedWatchResourceVersion }, nil
+	}
+}
+
 // cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
 type cacherListerWatcher struct {
 	storage storage.Interface
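Taken together, the helpers added above pick two resource versions for a new watcher: the RV the watch starts from and the RV after which the end-of-initial-events bookmark may be delivered. A condensed restatement of that decision logic — a sketch using plain values instead of closures and the Cacher receiver; currentEtcdRV and cacheRV stand in for getCurrentResourceVersionFromStorage and watchCache.resourceVersion:

```go
package cacherexample

// startAndBookmarkRV mirrors getStartResourceVersionForWatchLockedFunc and
// getBookmarkAfterResourceVersionLockedFunc from the hunk above.
// bookmarkAfterRV == 0 means "no special bookmark requested".
func startAndBookmarkRV(requestedRV string, parsedRV uint64, sendInitialEvents *bool, allowBookmarks bool,
	currentEtcdRV, cacheRV uint64) (startRV, bookmarkAfterRV uint64) {

	common := func() uint64 {
		switch {
		case len(requestedRV) == 0: // RV unset: be consistent with the most recent state in etcd
			return currentEtcdRV
		case parsedRV == 0: // RV="0": whatever the watch cache currently has
			return cacheRV
		default: // an exact RV was requested
			return parsedRV
		}
	}

	if sendInitialEvents != nil && *sendInitialEvents && allowBookmarks {
		bookmarkAfterRV = common()
	}

	// sendInitialEvents nil or true: start from the parsed request RV;
	// sendInitialEvents false: skip the initial data and start from common().
	if sendInitialEvents == nil || *sendInitialEvents {
		startRV = parsedRV
	} else {
		startRV = common()
	}
	return startRV, bookmarkAfterRV
}
```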
@@ -27,6 +27,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+
+	"k8s.io/apimachinery/pkg/api/apitesting"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -41,8 +44,14 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/apis/example"
 	examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
+	"k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/storage"
+	"k8s.io/apiserver/pkg/storage/etcd3"
+	etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
+	"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/utils/clock"
 )
 
@@ -76,6 +85,9 @@ func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
 	return strconv.ParseUint(version, 10, 64)
 }
 func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
+	if len(resourceVersion) == 0 {
+		return 0, nil
+	}
 	return strconv.ParseUint(resourceVersion, 10, 64)
 }
 
@@ -111,7 +123,8 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
 
 type dummyStorage struct {
 	sync.RWMutex
 	err error
+	getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
 }
 
 type dummyWatch struct {
@@ -151,10 +164,12 @@ func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _
 
 	return d.err
 }
-func (d *dummyStorage) GetList(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
+func (d *dummyStorage) GetList(ctx context.Context, resPrefix string, opts storage.ListOptions, listObj runtime.Object) error {
+	if d.getListFn != nil {
+		return d.getListFn(ctx, resPrefix, opts, listObj)
+	}
 	d.RLock()
 	defer d.RUnlock()
 
 	podList := listObj.(*example.PodList)
 	podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
 	return d.err
@@ -1082,28 +1097,68 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
 	}
 }
 
-func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event) {
+func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event, strictOrder bool) {
 	_, _, line, _ := goruntime.Caller(1)
-	for _, expectedEvent := range events {
+	actualEvents := make([]watch.Event, len(events))
+	for idx := range events {
 		select {
 		case event := <-w.ResultChan():
-			if e, a := expectedEvent.Type, event.Type; e != a {
-				t.Logf("(called from line %d)", line)
-				t.Errorf("Expected: %s, got: %s", e, a)
-			}
-			object := event.Object
-			if co, ok := object.(runtime.CacheableObject); ok {
-				object = co.GetObject()
-			}
-			if e, a := expectedEvent.Object, object; !apiequality.Semantic.DeepEqual(e, a) {
-				t.Logf("(called from line %d)", line)
-				t.Errorf("Expected: %#v, got: %#v", e, a)
-			}
+			actualEvents[idx] = event
 		case <-time.After(wait.ForeverTestTimeout):
 			t.Logf("(called from line %d)", line)
 			t.Errorf("Timed out waiting for an event")
 		}
 	}
+	validateEvents := func(expected, actual watch.Event) (bool, []string) {
+		errors := []string{}
+		if e, a := expected.Type, actual.Type; e != a {
+			errors = append(errors, fmt.Sprintf("Expected: %s, got: %s", e, a))
+		}
+		actualObject := actual.Object
+		if co, ok := actualObject.(runtime.CacheableObject); ok {
+			actualObject = co.GetObject()
+		}
+		if e, a := expected.Object, actualObject; !apiequality.Semantic.DeepEqual(e, a) {
+			errors = append(errors, fmt.Sprintf("Expected: %#v, got: %#v", e, a))
+		}
+		return len(errors) == 0, errors
+	}
+
+	if len(events) != len(actualEvents) {
+		t.Fatalf("unexpected number of events: %d, expected: %d, acutalEvents: %#v, expectedEvents:%#v", len(actualEvents), len(events), actualEvents, events)
+	}
+
+	if strictOrder {
+		for idx, expectedEvent := range events {
+			valid, errors := validateEvents(expectedEvent, actualEvents[idx])
+			if !valid {
+				t.Logf("(called from line %d)", line)
+				for _, err := range errors {
+					t.Errorf(err)
+				}
+			}
+		}
+	}
+	for _, expectedEvent := range events {
+		validated := false
+		for _, actualEvent := range actualEvents {
+			if validated, _ = validateEvents(expectedEvent, actualEvent); validated {
+				break
+			}
+		}
+		if !validated {
+			t.Fatalf("Expected: %#v but didn't find", expectedEvent)
+		}
+	}
+}
+
+func verifyNoEvents(t *testing.T, w watch.Interface) {
+	select {
+	case e := <-w.ResultChan():
+		t.Errorf("Unexpected: %#v event received, expected no events", e)
+	case <-time.After(time.Second):
+		return
+	}
 }
 
 func TestCachingDeleteEvents(t *testing.T) {
@@ -1183,9 +1238,9 @@ func TestCachingDeleteEvents(t *testing.T) {
 	cacher.watchCache.Update(pod3)
 	cacher.watchCache.Delete(pod4)
 
-	verifyEvents(t, allEventsWatcher, allEvents)
-	verifyEvents(t, fooEventsWatcher, fooEvents)
-	verifyEvents(t, barEventsWatcher, barEvents)
+	verifyEvents(t, allEventsWatcher, allEvents, true)
+	verifyEvents(t, fooEventsWatcher, fooEvents, true)
+	verifyEvents(t, barEventsWatcher, barEvents, true)
 }
 
 func testCachingObjects(t *testing.T, watchersCount int) {
@@ -1367,3 +1422,272 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
 		t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize+1, received)
 	}
 }
+
+func TestCacherWatchSemantics(t *testing.T) {
+	trueVal, falseVal := true, false
+	makePod := func(rv uint64) *example.Pod {
+		return &example.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            fmt.Sprintf("pod-%d", rv),
+				Namespace:       "ns",
+				ResourceVersion: fmt.Sprintf("%d", rv),
+				Annotations:     map[string]string{},
+			},
+		}
+	}
+
+	scenarios := []struct {
+		name                   string
+		allowWatchBookmarks    bool
+		sendInitialEvents      *bool
+		resourceVersion        string
+		storageResourceVersion string
+
+		initialPods                []*example.Pod
+		podsAfterEstablishingWatch []*example.Pod
+
+		expectedInitialEventsInStrictOrder   []watch.Event
+		expectedInitialEventsInRandomOrder   []watch.Event
+		expectedEventsAfterEstablishingWatch []watch.Event
+	}{
+		{
+			name:                               "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102",
+			allowWatchBookmarks:                true,
+			sendInitialEvents:                  &trueVal,
+			storageResourceVersion:             "102",
+			initialPods:                        []*example.Pod{makePod(101)},
+			podsAfterEstablishingWatch:         []*example.Pod{makePod(102)},
+			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
+			expectedEventsAfterEstablishingWatch: []watch.Event{
+				{Type: watch.Added, Object: makePod(102)},
+				{Type: watch.Bookmark, Object: &example.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						ResourceVersion: "102",
+						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
+					},
+				}},
+			},
+		},
+		{
+			name:                   "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105",
+			allowWatchBookmarks:    true,
+			sendInitialEvents:      &trueVal,
+			resourceVersion:        "0",
+			storageResourceVersion: "105",
+			initialPods:            []*example.Pod{makePod(101), makePod(102)},
+			expectedInitialEventsInRandomOrder: []watch.Event{
+				{Type: watch.Added, Object: makePod(101)},
+				{Type: watch.Added, Object: makePod(102)},
+			},
+			expectedInitialEventsInStrictOrder: []watch.Event{
+				{Type: watch.Bookmark, Object: &example.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						ResourceVersion: "102",
+						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
+					},
+				}},
+			},
+		},
+		{
+			name:                               "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105",
+			allowWatchBookmarks:                true,
+			sendInitialEvents:                  &trueVal,
+			resourceVersion:                    "101",
+			storageResourceVersion:             "105",
+			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
+			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
+			expectedInitialEventsInStrictOrder: []watch.Event{
+				{Type: watch.Bookmark, Object: &example.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						ResourceVersion: "102",
+						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
+					},
+				}},
+			},
+		},
+		{
+			name:                                 "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102",
+			sendInitialEvents:                    &trueVal,
+			storageResourceVersion:               "102",
+			initialPods:                          []*example.Pod{makePod(101)},
+			expectedInitialEventsInRandomOrder:   []watch.Event{{Type: watch.Added, Object: makePod(101)}},
+			podsAfterEstablishingWatch:           []*example.Pod{makePod(102)},
+			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
+		},
+		{
+			// note we set storage's RV to some future value, mustn't be used by this scenario
+			name:                               "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105",
+			sendInitialEvents:                  &trueVal,
+			resourceVersion:                    "0",
+			storageResourceVersion:             "105",
+			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
+			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
+		},
+		{
+			// note we set storage's RV to some future value, mustn't be used by this scenario
+			name:                   "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105",
+			sendInitialEvents:      &trueVal,
+			resourceVersion:        "101",
+			storageResourceVersion: "105",
+			initialPods:            []*example.Pod{makePod(101), makePod(102)},
+			// make sure we only get initial events that are > initial RV (101)
+			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
+		},
+		{
+			name:                                 "sendInitialEvents=false, RV=unset, storageRV=103",
+			sendInitialEvents:                    &falseVal,
+			storageResourceVersion:               "103",
+			initialPods:                          []*example.Pod{makePod(101), makePod(102)},
+			podsAfterEstablishingWatch:           []*example.Pod{makePod(104)},
+			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}},
+		},
+		{
+			// note we set storage's RV to some future value, mustn't be used by this scenario
+			name:                                 "sendInitialEvents=false, RV=0, storageRV=105",
+			sendInitialEvents:                    &falseVal,
+			resourceVersion:                      "0",
+			storageResourceVersion:               "105",
+			initialPods:                          []*example.Pod{makePod(101), makePod(102)},
+			podsAfterEstablishingWatch:           []*example.Pod{makePod(103)},
+			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}},
+		},
+		{
+			// note we set storage's RV to some future value, mustn't be used by this scenario
+			name:                               "legacy, RV=0, storageRV=105",
+			resourceVersion:                    "0",
+			storageResourceVersion:             "105",
+			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
+			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
+		},
+		{
+			// note we set storage's RV to some future value, mustn't be used by this scenario
+			name:                   "legacy, RV=unset, storageRV=105",
+			storageResourceVersion: "105",
+			initialPods:            []*example.Pod{makePod(101), makePod(102)},
+			// no events because the watch is delegated to the underlying storage
+		},
+	}
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			// set up env
+			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
+			storageListMetaResourceVersion := ""
+			backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
+				podList := listObj.(*example.PodList)
+				podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion}
+				return nil
+			}}
+
+			cacher, _, err := newTestCacher(backingStorage)
+			if err != nil {
+				t.Fatalf("falied to create cacher: %v", err)
+			}
+			defer cacher.Stop()
+			if err := cacher.ready.wait(context.TODO()); err != nil {
+				t.Fatalf("unexpected error waiting for the cache to be ready")
+			}
+
+			// now, run a scenario
+			// but first let's add some initial data
+			for _, obj := range scenario.initialPods {
+				err = cacher.watchCache.Add(obj)
+				require.NoError(t, err, "failed to add a pod: %v")
+			}
+			// read request params
+			opts := storage.ListOptions{Predicate: storage.Everything}
+			opts.SendInitialEvents = scenario.sendInitialEvents
+			opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks
+			if len(scenario.resourceVersion) > 0 {
+				opts.ResourceVersion = scenario.resourceVersion
+			}
+			// before starting a new watch set a storage RV to some future value
+			storageListMetaResourceVersion = scenario.storageResourceVersion
+
+			w, err := cacher.Watch(context.Background(), "pods/ns", opts)
+			require.NoError(t, err, "failed to create watch: %v")
+			defer w.Stop()
+
+			// make sure we only get initial events
+			verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false)
+			verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true)
+			verifyNoEvents(t, w)
+			// add a pod that is greater than the storage's RV when the watch was started
+			for _, obj := range scenario.podsAfterEstablishingWatch {
+				err = cacher.watchCache.Add(obj)
+				require.NoError(t, err, "failed to add a pod: %v")
+			}
+			verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true)
+			verifyNoEvents(t, w)
+		})
+	}
+}
+
+func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
+	// test data
+	newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
+		server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
+		storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, example.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
+		return server, storage
+	}
+	server, etcdStorage := newEtcdTestStorage(t, "")
+	defer server.Terminate(t)
+	podCacher, versioner, err := newTestCacher(etcdStorage)
+	if err != nil {
+		t.Fatalf("Couldn't create podCacher: %v", err)
+	}
+	defer podCacher.Stop()
+
+	makePod := func(name string) *example.Pod {
+		return &example.Pod{
+			ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
+		}
+	}
+	createPod := func(obj *example.Pod) *example.Pod {
+		key := "pods/" + obj.Namespace + "/" + obj.Name
+		out := &example.Pod{}
+		err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
+		require.NoError(t, err)
+		return out
+	}
+	getPod := func(name, ns string) *example.Pod {
+		key := "pods/" + ns + "/" + name
+		out := &example.Pod{}
+		err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out)
+		require.NoError(t, err)
+		return out
+	}
+	makeReplicaSet := func(name string) *example.ReplicaSet {
+		return &example.ReplicaSet{
+			ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
+		}
+	}
+	createReplicaSet := func(obj *example.ReplicaSet) *example.ReplicaSet {
+		key := "replicasets/" + obj.Namespace + "/" + obj.Name
+		out := &example.ReplicaSet{}
+		err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
+		require.NoError(t, err)
+		return out
+	}
+
+	// create a pod and make sure its RV is equal to the one maintained by etcd
+	pod := createPod(makePod("pod-1"))
+	currentStorageRV, err := podCacher.getCurrentResourceVersionFromStorage(context.TODO())
+	require.NoError(t, err)
+	podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion)
+	require.NoError(t, err)
+	require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")
+
+	// now create a replicaset (new resource) and make sure the target function returns global etcd RV
+	rs := createReplicaSet(makeReplicaSet("replicaset-1"))
+	currentStorageRV, err = podCacher.getCurrentResourceVersionFromStorage(context.TODO())
+	require.NoError(t, err)
+	rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion)
+	require.NoError(t, err)
+	require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV")
+
+	// ensure that the pod's RV hasn't been changed
+	currentPod := getPod(pod.Name, pod.Namespace)
+	currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion)
+	require.NoError(t, err)
+	require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
+}