mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Watch bookmarks may contain version of objects of other types
This commit is contained in:
parent
4b24dca228
commit
0bd8104809
@ -65,12 +65,9 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -793,7 +793,19 @@ func (c *Cacher) dispatchEvents() {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
c.dispatchEvent(&event)
|
||||
// Don't dispatch bookmarks coming from the storage layer.
|
||||
// They can be very frequent (even to the level of subseconds)
|
||||
// to allow efficient watch resumption on kube-apiserver restarts,
|
||||
// and propagating them down may overload the whole system.
|
||||
//
|
||||
// TODO: If at some point we decide the performance and scalability
|
||||
// footprint is acceptable, this is the place to hook them in.
|
||||
// However, we then need to check if this was called as a result
|
||||
// of a bookmark event or regular Add/Update/Delete operation by
|
||||
// checking if resourceVersion here has changed.
|
||||
if event.Type != watch.Bookmark {
|
||||
c.dispatchEvent(&event)
|
||||
}
|
||||
lastProcessedResourceVersion = event.ResourceVersion
|
||||
case <-bookmarkTimer.C():
|
||||
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
|
||||
|
@ -41,10 +41,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -637,7 +634,6 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
@ -865,7 +861,6 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
@ -938,6 +933,71 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
// Ensure that bookmarks are sent more frequently than every 1m.
|
||||
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
|
||||
|
||||
// Wait until cacher is initialized.
|
||||
cacher.ready.wait()
|
||||
|
||||
makePod := func(i int) *examplev1.Pod {
|
||||
return &examplev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("pod-%d", i),
|
||||
Namespace: "ns",
|
||||
ResourceVersion: fmt.Sprintf("%d", i),
|
||||
},
|
||||
}
|
||||
}
|
||||
if err := cacher.watchCache.Add(makePod(1000)); err != nil {
|
||||
t.Errorf("error: %v", err)
|
||||
}
|
||||
|
||||
pred := storage.Everything
|
||||
pred.AllowWatchBookmarks = true
|
||||
|
||||
w, err := cacher.Watch(context.TODO(), "/pods/ns", storage.ListOptions{
|
||||
ResourceVersion: "1000",
|
||||
Predicate: pred,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create watch: %v", err)
|
||||
}
|
||||
|
||||
expectedRV := 2000
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
event, ok := <-w.ResultChan()
|
||||
if !ok {
|
||||
t.Fatalf("Unexpected closed channel")
|
||||
}
|
||||
rv, err := cacher.versioner.ObjectResourceVersion(event.Object)
|
||||
if err != nil {
|
||||
t.Errorf("failed to parse resource version from %#v: %v", event.Object, err)
|
||||
}
|
||||
if event.Type == watch.Bookmark && rv == uint64(expectedRV) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Simulate progress notify event.
|
||||
cacher.watchCache.UpdateResourceVersion(strconv.Itoa(expectedRV))
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
|
@ -320,8 +320,9 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
||||
}
|
||||
|
||||
// Avoid calling event handler under lock.
|
||||
// This is safe as long as there is at most one call to processEvent in flight
|
||||
// at any point in time.
|
||||
// This is safe as long as there is at most one call to Add/Update/Delete and
|
||||
// UpdateResourceVersion in flight at any point in time, which is true now,
|
||||
// because reflector calls them synchronously from its main thread.
|
||||
if w.eventHandler != nil {
|
||||
w.eventHandler(wcEvent)
|
||||
}
|
||||
@ -388,20 +389,23 @@ func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
|
||||
return
|
||||
}
|
||||
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
w.resourceVersion = rv
|
||||
func() {
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
w.resourceVersion = rv
|
||||
}()
|
||||
|
||||
// Don't dispatch bookmarks coming from the storage layer.
|
||||
// They can be very frequent (even to the level of subseconds)
|
||||
// to allow efficient watch resumption on kube-apiserver restarts,
|
||||
// and propagating them down may overload the whole system.
|
||||
//
|
||||
// TODO: If at some point we decide the performance and scalability
|
||||
// footprint is acceptable, this is the place to hook them in.
|
||||
// However, we then need to check if this was called as a result
|
||||
// of a bookmark event or regular Add/Update/Delete operation by
|
||||
// checking if resourceVersion here has changed.
|
||||
// Avoid calling event handler under lock.
|
||||
// This is safe as long as there is at most one call to Add/Update/Delete and
|
||||
// UpdateResourceVersion in flight at any point in time, which is true now,
|
||||
// because reflector calls them synchronously from its main thread.
|
||||
if w.eventHandler != nil {
|
||||
wcEvent := &watchCacheEvent{
|
||||
Type: watch.Bookmark,
|
||||
ResourceVersion: rv,
|
||||
}
|
||||
w.eventHandler(wcEvent)
|
||||
}
|
||||
}
|
||||
|
||||
// List returns list of pointers to <storeElement> objects.
|
||||
|
Loading…
Reference in New Issue
Block a user