Merge pull request #110772 from p0lyn0mial/upstream-reflector-gets-stream

client-go: Add support for API streaming to the reflector
Kubernetes Prow Robot, 2023-03-10 05:34:39 -08:00, committed by GitHub
commit 90c3232de7
5 changed files with 702 additions and 12 deletions


@@ -415,6 +415,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
// We don't want to terminate all watchers as recreating all watchers puts high load on api-server.
// In most cases, the leader is reelected within a few cycles.
reflector.MaxInternalErrorRetryDuration = time.Second * 30
// since the watch-list is provided by the watch cache, instruct
// the reflector to issue a regular LIST against the store
reflector.UseWatchList = false
cacher.watchCache = watchCache
cacher.reflector = reflector


@@ -18,6 +18,7 @@ package cache
import (
"errors"
"os"
"sync"
"time"
@@ -147,6 +148,9 @@ func (c *controller) Run(stopCh <-chan struct{}) {
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
r.UseWatchList = true
}
c.reflectorMutex.Lock()
c.reflector = r
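
For context, opting in from an application looks roughly like the sketch below. Everything here is standard client-go informer wiring and illustrative rather than part of this change; client is assumed to be an existing clientset.

package main

import (
	"os"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

func startInformers(client kubernetes.Interface) {
	// Assumption: set before any informer starts; controller.Run above
	// reads this env var and flips UseWatchList on its reflector.
	os.Setenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA", "true")

	stopCh := make(chan struct{})
	factory := informers.NewSharedInformerFactory(client, 0)
	factory.Core().V1().Pods().Informer() // register a pod informer
	factory.Start(stopCh)                 // its reflector now streams via watch-list
	factory.WaitForCacheSync(stopCh)
}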


@@ -41,6 +41,7 @@ import (
"k8s.io/client-go/tools/pager"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
"k8s.io/utils/trace"
)
@@ -99,6 +100,15 @@ type Reflector struct {
ShouldResync func() bool
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
MaxInternalErrorRetryDuration time.Duration
// UseWatchList, if turned on, instructs the reflector to open a stream to bring data from the API server.
// Streaming has the primary advantage of using fewer of the server's resources to fetch data.
//
// The old behaviour establishes a LIST request which gets data in chunks.
// A paginated list is less efficient and, depending on the actual size of objects,
// might result in increased memory consumption of the APIServer.
//
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
UseWatchList bool
}
// ResourceVersionUpdater is an interface that allows store implementation to
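
To make the new knob concrete, driving a Reflector directly with streaming enabled would look roughly like this sketch; NewListWatchFromClient is a standard client-go helper, and client/stopCh are assumed to exist.

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

func startPodReflector(client kubernetes.Interface, stopCh <-chan struct{}) cache.Store {
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	r.UseWatchList = true // stream the initial state instead of issuing a paginated LIST
	go r.Run(stopCh)
	return store
}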
@@ -311,17 +321,39 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error
var w watch.Interface
fallbackToList := !r.UseWatchList
if r.UseWatchList {
w, err = r.watchList(stopCh)
if w == nil && err == nil {
// stopCh was closed
return nil
}
if err != nil {
if !apierrors.IsInvalid(err) {
return err
}
klog.Warning("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantics")
fallbackToList = true
// Ensure that we won't accidentally pass some garbage down the watch.
w = nil
}
}
if fallbackToList {
err = r.list(stopCh)
if err != nil {
return err
}
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(w, stopCh, resyncerrc)
}
// startResync periodically calls r.store.Resync() method.
@@ -392,8 +424,9 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
}
}
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
// Ensure that watch will not be reused across iterations.
w.Stop()
w = nil
retry.After(err)
if err != nil {
@@ -528,6 +561,114 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
return nil
}
// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
var w watch.Interface
var err error
var temporaryStore Store
var resourceVersion string
// TODO(#115478): see if this function could be turned
// into a method and see if error handling
// could be unified with the r.watch method
isErrorRetriableWithSideEffectsFn := func(err error) bool {
if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
<-r.initConnBackoffManager.Backoff().C()
return true
}
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
// we tried to re-establish a watch request, but the provided RV
// has either expired or is greater than the server knows about.
// In that case we reset the RV and
// try to get a consistent snapshot from the watch cache (case 1)
r.setIsLastSyncResourceVersionUnavailable(true)
return true
}
return false
}
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
for {
select {
case <-stopCh:
return nil, nil
default:
}
resourceVersion = ""
lastKnownRV := r.rewatchResourceVersion()
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
// TODO(#115478): large "list", slow clients, slow network, p&f
// might slow down streaming and eventually fail.
// maybe in such a case we should retry with an increased timeout?
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: lastKnownRV,
AllowWatchBookmarks: true,
SendInitialEvents: pointer.Bool(true),
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: &timeoutSeconds,
}
start := r.clock.Now()
w, err = r.listerWatcher.Watch(options)
if err != nil {
if isErrorRetriableWithSideEffectsFn(err) {
continue
}
return nil, err
}
bookmarkReceived := pointer.Bool(false)
err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string) { resourceVersion = rv },
bookmarkReceived,
r.clock, make(chan error), stopCh)
if err != nil {
w.Stop() // stop and retry with clean state
if err == errorStopRequested {
return nil, nil
}
if isErrorRetriableWithSideEffectsFn(err) {
continue
}
return nil, err
}
if *bookmarkReceived {
break
}
}
// We successfully got initial state from watch-list confirmed by the
// "k8s.io/initial-events-end" bookmark.
initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
r.setIsLastSyncResourceVersionUnavailable(false)
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
return w, nil
}
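
Tying the two cases above together: the options differ only in the resource version passed in (rewatchResourceVersion below supplies it). A minimal sketch, where the helper name is made up and "42" is a hypothetical last-synced RV:

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

// watchListOptions builds the watch-list request options.
// lastSyncedRV == "" selects case 1 (most recent consistent snapshot);
// lastSyncedRV == "42" selects case 2 (resume at that version or newer).
func watchListOptions(lastSyncedRV string) metav1.ListOptions {
	return metav1.ListOptions{
		ResourceVersion:      lastSyncedRV,
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
		SendInitialEvents:    pointer.Bool(true),
		AllowWatchBookmarks:  true,
	}
}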
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
@@ -546,15 +687,17 @@ func watchHandler(start time.Time,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnInitialEventsEndBookmark *bool,
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
if exitOnInitialEventsEndBookmark != nil {
// set it to false just in case somebody
// set it to true earlier
*exitOnInitialEventsEndBookmark = false
}
loop:
for {
@@ -609,6 +752,11 @@
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
@@ -617,6 +765,11 @@
rvu.UpdateResourceVersion(resourceVersion)
}
eventCount++
if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
watchDuration := clock.Since(start)
klog.V(4).Infof("exiting %v Watch because the bookmark marking the end of the initial events stream was received; total %v items received in %v", name, eventCount, watchDuration)
return nil
}
}
}
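
For a consumer reading a watch stream directly rather than through watchHandler, detecting this synchronization point would look roughly like the sketch below; the function name is made up, while meta.Accessor is standard apimachinery.

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/watch"
)

// waitForInitialEventsEnd drains events until the bookmark that closes
// the initial stream arrives. Illustrative only.
func waitForInitialEventsEnd(w watch.Interface) {
	for event := range w.ResultChan() {
		if event.Type != watch.Bookmark {
			continue // Added/Modified/Deleted would be handled elsewhere
		}
		obj, err := meta.Accessor(event.Object)
		if err != nil {
			continue
		}
		if _, ok := obj.GetAnnotations()["k8s.io/initial-events-end"]; ok {
			return // initial state fully streamed; the watch remains usable
		}
	}
}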
@@ -665,6 +818,18 @@ func (r *Reflector) relistResourceVersion() string {
return r.lastSyncResourceVersion
}
// rewatchResourceVersion determines the resource version the reflector should start streaming from.
func (r *Reflector) rewatchResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionUnavailable {
// The initial stream should return data at the most recent resource version.
// The returned data must be consistent, i.e. as if served from etcd via a quorum read.
return ""
}
return r.lastSyncResourceVersion
}
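
A quick illustration of how this drives the two cases in watchList; the resource versions are hypothetical and the calls assume code inside the cache package:

r.setLastSyncResourceVersion("42")
r.rewatchResourceVersion() // "42": case 2, resume at that RV
r.setIsLastSyncResourceVersionUnavailable(true)
r.rewatchResourceVersion() // "": case 1, consistent snapshot from scratch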
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
// "expired" or "too large resource version" error.
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {


@@ -138,7 +138,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
go func() {
fw.Stop()
}()
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
if err == nil {
t.Errorf("unexpected non-error")
}
@@ -157,7 +157,7 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
fw.Stop()
}()
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
if err != nil {
t.Errorf("unexpected error %v", err)
}
@@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
fw := watch.NewFake()
stopWatch := make(chan struct{}, 1)
stopWatch <- struct{}{}
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch)
if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err)
}


@@ -0,0 +1,518 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sort"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/utils/pointer"
)
func TestWatchList(t *testing.T) {
scenarios := []struct {
name string
disableUseWatchList bool
// closes listWatcher after sending the specified number of watch events
closeAfterWatchEvents int
// closes listWatcher after getting the specified number of watch requests
closeAfterWatchRequests int
// closes listWatcher after getting the specified number of list requests
closeAfterListRequests int
// stops Watcher after sending the specified number of watch events
stopAfterWatchEvents int
watchOptionsPredicate func(options metav1.ListOptions) error
watchEvents []watch.Event
podList *v1.PodList
expectedRequestOptions []metav1.ListOptions
expectedWatchRequests int
expectedListRequests int
expectedStoreContent []v1.Pod
expectedError error
}{
{
name: "the reflector won't be synced if the bookmark event hasn't been received",
watchEvents: []watch.Event{{Type: watch.Added, Object: makePod("p1", "1")}},
closeAfterWatchEvents: 1,
expectedWatchRequests: 1,
expectedRequestOptions: []metav1.ListOptions{{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
}},
},
{
name: "the reflector uses the old LIST/WATCH semantics if UseWatchList is turned off",
disableUseWatchList: true,
closeAfterWatchRequests: 1,
podList: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
Items: []v1.Pod{*makePod("p1", "1")},
},
expectedWatchRequests: 1,
expectedListRequests: 1,
expectedRequestOptions: []metav1.ListOptions{
{
ResourceVersion: "0",
Limit: 500,
},
{
AllowWatchBookmarks: true,
ResourceVersion: "1",
TimeoutSeconds: pointer.Int64(1),
}},
expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
},
{
name: "returning any error other than apierrors.NewInvalid stops the reflector and reports the error",
watchOptionsPredicate: func(options metav1.ListOptions) error {
return fmt.Errorf("dummy error")
},
expectedError: fmt.Errorf("dummy error"),
expectedWatchRequests: 1,
expectedRequestOptions: []metav1.ListOptions{{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
}},
},
{
name: "the reflector can fall back to old LIST/WATCH semantics when a server doesn't support streaming",
watchOptionsPredicate: func(options metav1.ListOptions) error {
if options.SendInitialEvents != nil && *options.SendInitialEvents {
return apierrors.NewInvalid(schema.GroupKind{}, "streaming is not allowed", nil)
}
return nil
},
podList: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
Items: []v1.Pod{*makePod("p1", "1")},
},
closeAfterWatchEvents: 1,
watchEvents: []watch.Event{{Type: watch.Added, Object: makePod("p2", "2")}},
expectedWatchRequests: 2,
expectedListRequests: 1,
expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
expectedRequestOptions: []metav1.ListOptions{
{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
},
{
ResourceVersion: "0",
Limit: 500,
},
{
AllowWatchBookmarks: true,
ResourceVersion: "1",
TimeoutSeconds: pointer.Int64(1),
},
},
},
{
name: "prove that the reflector is synced after receiving a bookmark event",
closeAfterWatchEvents: 3,
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod("p1", "1")},
{Type: watch.Added, Object: makePod("p2", "2")},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
expectedWatchRequests: 1,
expectedRequestOptions: []metav1.ListOptions{{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
}},
expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
},
{
name: "check if Update and Delete events are propagated during streaming (until the bookmark is received)",
closeAfterWatchEvents: 6,
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod("p1", "1")},
{Type: watch.Added, Object: makePod("p2", "2")},
{Type: watch.Modified, Object: func() runtime.Object {
p1 := makePod("p1", "3")
p1.Spec.ActiveDeadlineSeconds = pointer.Int64(12)
return p1
}()},
{Type: watch.Added, Object: makePod("p3", "4")},
{Type: watch.Deleted, Object: makePod("p3", "5")},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "5",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
expectedWatchRequests: 1,
expectedRequestOptions: []metav1.ListOptions{{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
}},
expectedStoreContent: []v1.Pod{
*makePod("p2", "2"),
func() v1.Pod {
p1 := *makePod("p1", "3")
p1.Spec.ActiveDeadlineSeconds = pointer.Int64(12)
return p1
}(),
},
},
{
name: "checks if the reflector retries 429",
watchOptionsPredicate: func() func(options metav1.ListOptions) error {
counter := 1
return func(options metav1.ListOptions) error {
if counter < 3 {
counter++
return apierrors.NewTooManyRequests("busy, check again later", 1)
}
return nil
}
}(),
closeAfterWatchEvents: 2,
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod("p1", "1")},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
expectedWatchRequests: 3,
expectedRequestOptions: []metav1.ListOptions{
{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
},
{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
},
{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
},
},
expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
},
{
name: "check if stopping a watcher before sync results in creating a new watch-list request",
stopAfterWatchEvents: 1,
closeAfterWatchEvents: 3,
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod("p1", "1")},
// second request
{Type: watch.Added, Object: makePod("p1", "1")},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
},
expectedWatchRequests: 2,
expectedRequestOptions: []metav1.ListOptions{
{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
},
{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
},
},
expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
},
{
name: "stopping a watcher after synchronization results in creating a new watch request",
stopAfterWatchEvents: 4,
closeAfterWatchEvents: 5,
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod("p1", "1")},
{Type: watch.Added, Object: makePod("p2", "2")},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
{Type: watch.Added, Object: makePod("p3", "3")},
// second request
{Type: watch.Added, Object: makePod("p4", "4")},
},
expectedWatchRequests: 2,
expectedRequestOptions: []metav1.ListOptions{
{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
},
{
AllowWatchBookmarks: true,
ResourceVersion: "3",
TimeoutSeconds: pointer.Int64(1),
},
},
expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3"), *makePod("p4", "4")},
},
{
name: "expiring an established watcher results in returning an error from the reflector",
watchOptionsPredicate: func() func(options metav1.ListOptions) error {
counter := 0
return func(options metav1.ListOptions) error {
counter++
if counter == 2 {
return apierrors.NewResourceExpired("rv already expired")
}
return nil
}
}(),
stopAfterWatchEvents: 3,
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod("p1", "1")},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
{Type: watch.Added, Object: makePod("p3", "3")},
},
expectedWatchRequests: 2,
expectedRequestOptions: []metav1.ListOptions{
{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
},
{
AllowWatchBookmarks: true,
ResourceVersion: "3",
TimeoutSeconds: pointer.Int64(1),
},
},
expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p3", "3")},
expectedError: apierrors.NewResourceExpired("rv already expired"),
},
}
for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
scenario := s // capture as local variable
listWatcher, store, reflector, stopCh := testData()
go func() {
for i, e := range scenario.watchEvents {
listWatcher.fakeWatcher.Action(e.Type, e.Object)
if i+1 == scenario.stopAfterWatchEvents {
listWatcher.StopAndRecreateWatch()
continue
}
if i+1 == scenario.closeAfterWatchEvents {
close(stopCh)
}
}
}()
listWatcher.watchOptionsPredicate = scenario.watchOptionsPredicate
listWatcher.closeAfterWatchRequests = scenario.closeAfterWatchRequests
listWatcher.customListResponse = scenario.podList
listWatcher.closeAfterListRequests = scenario.closeAfterListRequests
if scenario.disableUseWatchList {
reflector.UseWatchList = false
}
err := reflector.ListAndWatch(stopCh)
if scenario.expectedError != nil && err == nil {
t.Fatalf("expected error %q, got nil", scenario.expectedError)
}
if scenario.expectedError == nil && err != nil {
t.Fatalf("unexpected error: %v", err)
}
if scenario.expectedError != nil && err.Error() != scenario.expectedError.Error() {
t.Fatalf("expected error %q, got %q", scenario.expectedError, err.Error())
}
verifyWatchCounter(t, listWatcher, scenario.expectedWatchRequests)
verifyListCounter(t, listWatcher, scenario.expectedListRequests)
verifyRequestOptions(t, listWatcher, scenario.expectedRequestOptions)
verifyStore(t, store, scenario.expectedStoreContent)
})
}
}
func verifyRequestOptions(t *testing.T, lw *fakeListWatcher, expectedRequestOptions []metav1.ListOptions) {
if len(lw.requestOptions) != len(expectedRequestOptions) {
t.Fatalf("expected to receive exactly %v requests, got %v", len(expectedRequestOptions), len(lw.requestOptions))
}
for index, expectedRequestOption := range expectedRequestOptions {
actualRequestOption := lw.requestOptions[index]
if actualRequestOption.TimeoutSeconds == nil && expectedRequestOption.TimeoutSeconds != nil {
t.Fatalf("expected the request to specify TimeoutSeconds option but it didn't, actual = %#v, expected = %#v", actualRequestOption, expectedRequestOption)
}
if actualRequestOption.TimeoutSeconds != nil && expectedRequestOption.TimeoutSeconds == nil {
t.Fatalf("unexpected TimeoutSeconds option specified, actual = %#v, expected = %#v", actualRequestOption, expectedRequestOption)
}
// ignore actual values
actualRequestOption.TimeoutSeconds = nil
expectedRequestOption.TimeoutSeconds = nil
if !cmp.Equal(actualRequestOption, expectedRequestOption) {
t.Fatalf("expected %#v, got %#v", expectedRequestOption, actualRequestOption)
}
}
}
func verifyListCounter(t *testing.T, lw *fakeListWatcher, expectedListCounter int) {
if lw.listCounter != expectedListCounter {
t.Fatalf("unexpected number of LIST requests, got: %v, expected: %v", lw.listCounter, expectedListCounter)
}
}
func verifyWatchCounter(t *testing.T, lw *fakeListWatcher, expectedWatchCounter int) {
if lw.watchCounter != expectedWatchCounter {
t.Fatalf("unexpected number of WATCH requests, got: %v, expected: %v", lw.watchCounter, expectedWatchCounter)
}
}
type byName []v1.Pod
func (a byName) Len() int { return len(a) }
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func verifyStore(t *testing.T, s Store, expectedPods []v1.Pod) {
rawPods := s.List()
actualPods := []v1.Pod{}
for _, p := range rawPods {
actualPods = append(actualPods, *p.(*v1.Pod))
}
sort.Sort(byName(actualPods))
sort.Sort(byName(expectedPods))
if !cmp.Equal(actualPods, expectedPods, cmpopts.EquateEmpty()) {
t.Fatalf("unexpected store content, diff: %s", cmp.Diff(actualPods, expectedPods))
}
}
func makePod(name, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv}}
}
func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) {
s := NewStore(MetaNamespaceKeyFunc)
stopCh := make(chan struct{})
lw := &fakeListWatcher{
fakeWatcher: watch.NewFake(),
stop: func() {
close(stopCh)
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
r.UseWatchList = true
return lw, s, r, stopCh
}
type fakeListWatcher struct {
lock sync.Mutex
fakeWatcher *watch.FakeWatcher
listCounter int
watchCounter int
closeAfterWatchRequests int
closeAfterListRequests int
stop func()
requestOptions []metav1.ListOptions
customListResponse *v1.PodList
watchOptionsPredicate func(options metav1.ListOptions) error
}
func (lw *fakeListWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
lw.listCounter++
lw.requestOptions = append(lw.requestOptions, options)
if lw.listCounter == lw.closeAfterListRequests {
lw.stop()
}
if lw.customListResponse != nil {
return lw.customListResponse, nil
}
return nil, fmt.Errorf("not implemented")
}
func (lw *fakeListWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
lw.watchCounter++
lw.requestOptions = append(lw.requestOptions, options)
if lw.watchCounter == lw.closeAfterWatchRequests {
lw.stop()
}
if lw.watchOptionsPredicate != nil {
if err := lw.watchOptionsPredicate(options); err != nil {
return nil, err
}
}
lw.lock.Lock()
defer lw.lock.Unlock()
return lw.fakeWatcher, nil
}
func (lw *fakeListWatcher) StopAndRecreateWatch() {
lw.lock.Lock()
defer lw.lock.Unlock()
lw.fakeWatcher.Stop()
lw.fakeWatcher = watch.NewFake()
}