mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #124642 from wojtek-t/resilient_watchcache_initialization
Implement ResilientWatchCacheInitialization
This commit is contained in:
commit
eef6c6082d
@ -1236,6 +1236,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
|||||||
|
|
||||||
genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
||||||
|
|
||||||
|
genericfeatures.ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta},
|
||||||
|
|
||||||
genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
|
genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
|
||||||
|
|
||||||
genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
|
genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
|
||||||
|
@ -59,9 +59,74 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
ns := "default"
|
ns := "default"
|
||||||
noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
|
noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
|
||||||
|
|
||||||
stopChan := make(chan struct{})
|
updateCRD := func() {
|
||||||
|
noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
|
||||||
|
v2 := noxuDefinitionToUpdate.Spec.Versions[0]
|
||||||
|
v2.Name = "v2"
|
||||||
|
v2.Served = true
|
||||||
|
v2.Storage = false
|
||||||
|
noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
|
||||||
|
} else {
|
||||||
|
noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
|
||||||
|
}
|
||||||
|
if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up 10 watchers for custom resource.
|
||||||
|
// We can't exercise them in a loop the same way as get requests, as watchcache
|
||||||
|
// can reject them with 429 and Retry-After: 1 if it is uninitialized and even
|
||||||
|
// though 429 is automatically retried, with frequent watchcache terminations and
|
||||||
|
// reinitializations they could either end-up being rejected N times and fail or
|
||||||
|
// or not initialize until the last watchcache reinitialization and then not be
|
||||||
|
// terminated. Thus we exercise their termination explicitly at the beginning.
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error establishing watch: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for event := range w.ResultChan() {
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added, watch.Modified, watch.Deleted:
|
||||||
|
// all expected
|
||||||
|
default:
|
||||||
|
t.Errorf("unexpected watch event: %#v", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let all the established watches soak request loops soak
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
// Update CRD and ensure that all watches are gracefully terminated.
|
||||||
|
updateCRD()
|
||||||
|
|
||||||
|
drained := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(drained)
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-drained:
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Fatal("timed out waiting for watchers to be terminated")
|
||||||
|
}
|
||||||
|
|
||||||
|
stopChan := make(chan struct{})
|
||||||
|
|
||||||
// Set up loop to modify CRD in the background
|
// Set up loop to modify CRD in the background
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -76,28 +141,11 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{})
|
updateCRD()
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
|
|
||||||
v2 := noxuDefinitionToUpdate.Spec.Versions[0]
|
|
||||||
v2.Name = "v2"
|
|
||||||
v2.Served = true
|
|
||||||
v2.Storage = false
|
|
||||||
noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
|
|
||||||
} else {
|
|
||||||
noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
|
|
||||||
}
|
|
||||||
if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
|
|
||||||
t.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Set up 10 loops creating and reading and watching custom resources
|
// Set up 10 loops creating and reading custom resources
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
@ -120,32 +168,6 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func(i int) {
|
|
||||||
defer wg.Done()
|
|
||||||
for {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
select {
|
|
||||||
case <-stopChan:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{})
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error establishing watch: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for event := range w.ResultChan() {
|
|
||||||
switch event.Type {
|
|
||||||
case watch.Added, watch.Modified, watch.Deleted:
|
|
||||||
// all expected
|
|
||||||
default:
|
|
||||||
t.Errorf("unexpected watch event: %#v", event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let all the established get request loops soak
|
// Let all the established get request loops soak
|
||||||
@ -155,7 +177,7 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
close(stopChan)
|
close(stopChan)
|
||||||
|
|
||||||
// Let loops drain
|
// Let loops drain
|
||||||
drained := make(chan struct{})
|
drained = make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(drained)
|
defer close(drained)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -164,6 +186,6 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
select {
|
select {
|
||||||
case <-drained:
|
case <-drained:
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
t.Error("timed out waiting for clients to complete")
|
t.Fatal("timed out waiting for clients to complete")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -173,6 +173,13 @@ const (
|
|||||||
// to a chunking list request.
|
// to a chunking list request.
|
||||||
RemainingItemCount featuregate.Feature = "RemainingItemCount"
|
RemainingItemCount featuregate.Feature = "RemainingItemCount"
|
||||||
|
|
||||||
|
// owner: @wojtek-t
|
||||||
|
// beta: v1.31
|
||||||
|
//
|
||||||
|
// Enables resilient watchcache initialization to avoid controlplane
|
||||||
|
// overload.
|
||||||
|
ResilientWatchCacheInitialization featuregate.Feature = "ResilientWatchCacheInitialization"
|
||||||
|
|
||||||
// owner: @serathius
|
// owner: @serathius
|
||||||
// beta: v1.30
|
// beta: v1.30
|
||||||
//
|
//
|
||||||
@ -353,6 +360,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
|||||||
|
|
||||||
RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
|
RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
|
||||||
|
|
||||||
|
ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta},
|
||||||
|
|
||||||
RetryGenerateName: {Default: true, PreRelease: featuregate.Beta},
|
RetryGenerateName: {Default: true, PreRelease: featuregate.Beta},
|
||||||
|
|
||||||
SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
|
SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
|
||||||
|
@ -532,9 +532,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
readyGeneration, err := c.ready.waitAndReadGeneration(ctx)
|
var readyGeneration int
|
||||||
if err != nil {
|
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
return nil, errors.NewServiceUnavailable(err.Error())
|
var ok bool
|
||||||
|
readyGeneration, ok = c.ready.checkAndReadGeneration()
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.NewServiceUnavailable(err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
|
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
|
||||||
@ -676,6 +685,14 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
|
|||||||
return c.storage.Get(ctx, key, opts, objPtr)
|
return c.storage.Get(ctx, key, opts, objPtr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
|
if !c.ready.check() {
|
||||||
|
// If Cache is not initialized, delegate Get requests to storage
|
||||||
|
// as described in https://kep.k8s.io/4568
|
||||||
|
return c.storage.Get(ctx, key, opts, objPtr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If resourceVersion is specified, serve it from cache.
|
// If resourceVersion is specified, serve it from cache.
|
||||||
// It's guaranteed that the returned value is at least that
|
// It's guaranteed that the returned value is at least that
|
||||||
// fresh as the given resourceVersion.
|
// fresh as the given resourceVersion.
|
||||||
@ -684,16 +701,18 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if getRV == 0 && !c.ready.check() {
|
|
||||||
// If Cacher is not yet initialized and we don't require any specific
|
|
||||||
// minimal resource version, simply forward the request to storage.
|
|
||||||
return c.storage.Get(ctx, key, opts, objPtr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do not create a trace - it's not for free and there are tons
|
// Do not create a trace - it's not for free and there are tons
|
||||||
// of Get requests. We can add it if it will be really needed.
|
// of Get requests. We can add it if it will be really needed.
|
||||||
if err := c.ready.wait(ctx); err != nil {
|
|
||||||
return errors.NewServiceUnavailable(err.Error())
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
|
if getRV == 0 && !c.ready.check() {
|
||||||
|
// If Cacher is not yet initialized and we don't require any specific
|
||||||
|
// minimal resource version, simply forward the request to storage.
|
||||||
|
return c.storage.Get(ctx, key, opts, objPtr)
|
||||||
|
}
|
||||||
|
if err := c.ready.wait(ctx); err != nil {
|
||||||
|
return errors.NewServiceUnavailable(err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objVal, err := conversion.EnforcePtr(objPtr)
|
objVal, err := conversion.EnforcePtr(objPtr)
|
||||||
@ -743,6 +762,14 @@ func shouldDelegateList(opts storage.ListOptions) bool {
|
|||||||
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
|
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
|
||||||
|
pred := opts.Predicate
|
||||||
|
noLabelSelector := pred.Label == nil || pred.Label.Empty()
|
||||||
|
noFieldSelector := pred.Field == nil || pred.Field.Empty()
|
||||||
|
hasLimit := pred.Limit > 0
|
||||||
|
return noLabelSelector && noFieldSelector && hasLimit
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
|
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
|
||||||
if !recursive {
|
if !recursive {
|
||||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
|
||||||
@ -770,10 +797,19 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if listRV == 0 && !c.ready.check() {
|
|
||||||
// If Cacher is not yet initialized and we don't require any specific
|
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
// minimal resource version, simply forward the request to storage.
|
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
|
||||||
return c.storage.GetList(ctx, key, opts, listObj)
|
// If Cacher is not initialized, delegate List requests to storage
|
||||||
|
// as described in https://kep.k8s.io/4568
|
||||||
|
return c.storage.GetList(ctx, key, opts, listObj)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if listRV == 0 && !c.ready.check() {
|
||||||
|
// If Cacher is not yet initialized and we don't require any specific
|
||||||
|
// minimal resource version, simply forward the request to storage.
|
||||||
|
return c.storage.GetList(ctx, key, opts, listObj)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||||
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
|
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
|
||||||
@ -788,8 +824,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
attribute.Stringer("type", c.groupResource))
|
attribute.Stringer("type", c.groupResource))
|
||||||
defer span.End(500 * time.Millisecond)
|
defer span.End(500 * time.Millisecond)
|
||||||
|
|
||||||
if err := c.ready.wait(ctx); err != nil {
|
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
return errors.NewServiceUnavailable(err.Error())
|
if !c.ready.check() {
|
||||||
|
// If Cacher is not initialized, reject List requests
|
||||||
|
// as described in https://kep.k8s.io/4568
|
||||||
|
return errors.NewTooManyRequests("storage is (re)initializing", 1)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := c.ready.wait(ctx); err != nil {
|
||||||
|
return errors.NewServiceUnavailable(err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
span.AddEvent("Ready")
|
span.AddEvent("Ready")
|
||||||
|
|
||||||
|
@ -464,14 +464,25 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context
|
|||||||
t.Fatalf("Failed to inject list errors: %v", err)
|
t.Fatalf("Failed to inject list errors: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
|
// The tests assume that Get/GetList/Watch calls shouldn't fail.
|
||||||
|
// However, 429 error can now be returned if watchcache is under initialization.
|
||||||
|
// To avoid rewriting all tests, we wait for watcache to initialize.
|
||||||
|
if err := cacher.ready.wait(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ctx, cacher, server, terminate
|
return ctx, cacher, server, terminate
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
|
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
|
||||||
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
|
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
|
||||||
|
|
||||||
if err := cacher.ready.wait(context.TODO()); err != nil {
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
if err := cacher.ready.wait(context.TODO()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return &createWrapper{Cacher: cacher}, tearDown
|
return &createWrapper{Cacher: cacher}, tearDown
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ import (
|
|||||||
"k8s.io/utils/pointer"
|
"k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
|
func newTestCacherWithoutSyncing(s storage.Interface) (*Cacher, storage.Versioner, error) {
|
||||||
prefix := "pods"
|
prefix := "pods"
|
||||||
config := Config{
|
config := Config{
|
||||||
Storage: s,
|
Storage: s,
|
||||||
@ -79,9 +79,27 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
|
|||||||
Clock: clock.RealClock{},
|
Clock: clock.RealClock{},
|
||||||
}
|
}
|
||||||
cacher, err := NewCacherFromConfig(config)
|
cacher, err := NewCacherFromConfig(config)
|
||||||
|
|
||||||
return cacher, storage.APIObjectVersioner{}, err
|
return cacher, storage.APIObjectVersioner{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
|
||||||
|
cacher, versioner, err := newTestCacherWithoutSyncing(s)
|
||||||
|
if err != nil {
|
||||||
|
return nil, versioner, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
|
// The tests assume that Get/GetList/Watch calls shouldn't fail.
|
||||||
|
// However, 429 error can now be returned if watchcache is under initialization.
|
||||||
|
// To avoid rewriting all tests, we wait for watcache to initialize.
|
||||||
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
|
return nil, storage.APIObjectVersioner{}, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cacher, versioner, nil
|
||||||
|
}
|
||||||
|
|
||||||
type dummyStorage struct {
|
type dummyStorage struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
err error
|
err error
|
||||||
@ -222,10 +240,12 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
|
|||||||
|
|
||||||
result := &example.PodList{}
|
result := &example.PodList{}
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
currentResourceVersion := "42"
|
currentResourceVersion := "42"
|
||||||
@ -267,9 +287,10 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
result := &example.PodList{}
|
result := &example.PodList{}
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
@ -301,9 +322,10 @@ func TestGetCacheBypass(t *testing.T) {
|
|||||||
|
|
||||||
result := &example.Pod{}
|
result := &example.Pod{}
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
@ -333,9 +355,10 @@ func TestWatchCacheBypass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||||
@ -375,6 +398,43 @@ func TestWatchCacheBypass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTooManyRequestsNotReturned(t *testing.T) {
|
||||||
|
// Ensure that with ResilientWatchCacheInitialization feature disabled, we don't return 429
|
||||||
|
// errors when watchcache is not initialized.
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResilientWatchCacheInitialization, false)
|
||||||
|
|
||||||
|
dummyErr := fmt.Errorf("dummy")
|
||||||
|
backingStorage := &dummyStorage{err: dummyErr}
|
||||||
|
cacher, _, err := newTestCacherWithoutSyncing(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
opts := storage.ListOptions{
|
||||||
|
ResourceVersion: "0",
|
||||||
|
Predicate: storage.Everything,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel the request so that it doesn't hang forever.
|
||||||
|
listCtx, listCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
|
||||||
|
defer listCancel()
|
||||||
|
|
||||||
|
result := &example.PodList{}
|
||||||
|
err = cacher.GetList(listCtx, "/pods/ns", opts, result)
|
||||||
|
if err != nil && apierrors.IsTooManyRequests(err) {
|
||||||
|
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for List")
|
||||||
|
}
|
||||||
|
|
||||||
|
watchCtx, watchCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
|
||||||
|
defer watchCancel()
|
||||||
|
|
||||||
|
_, err = cacher.Watch(watchCtx, "/pods/ns", opts)
|
||||||
|
if err != nil && apierrors.IsTooManyRequests(err) {
|
||||||
|
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for Watch")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestEmptyWatchEventCache(t *testing.T) {
|
func TestEmptyWatchEventCache(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
@ -471,7 +531,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) {
|
|||||||
// constantly failing lists to the underlying storage.
|
// constantly failing lists to the underlying storage.
|
||||||
dummyErr := fmt.Errorf("dummy")
|
dummyErr := fmt.Errorf("dummy")
|
||||||
backingStorage := &dummyStorage{err: dummyErr}
|
backingStorage := &dummyStorage{err: dummyErr}
|
||||||
cacher, _, err := newTestCacher(backingStorage)
|
cacher, _, err := newTestCacherWithoutSyncing(backingStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Couldn't create cacher: %v", err)
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
}
|
}
|
||||||
@ -489,8 +549,14 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) {
|
|||||||
// Ensure that it terminates when its context is cancelled
|
// Ensure that it terminates when its context is cancelled
|
||||||
// (e.g. the request is terminated for whatever reason).
|
// (e.g. the request is terminated for whatever reason).
|
||||||
_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0"})
|
_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0"})
|
||||||
if err == nil || err.Error() != apierrors.NewServiceUnavailable(context.Canceled.Error()).Error() {
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
t.Errorf("Unexpected error: %#v", err)
|
if err == nil || err.Error() != apierrors.NewServiceUnavailable(context.Canceled.Error()).Error() {
|
||||||
|
t.Errorf("Unexpected error: %#v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err == nil || err.Error() != apierrors.NewTooManyRequests("storage is (re)initializing", 1).Error() {
|
||||||
|
t.Errorf("Unexpected error: %#v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -502,9 +568,10 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure there is some budget for slowing down processing.
|
// Ensure there is some budget for slowing down processing.
|
||||||
@ -588,9 +655,10 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
|
|||||||
t.Fatalf("Couldn't create cacher: %v", err)
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
@ -623,17 +691,32 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
|
|||||||
IgnoreNotFound: true,
|
IgnoreNotFound: true,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
}, result)
|
}, result)
|
||||||
if err == nil {
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
t.Fatalf("Success to create Get: %v", err)
|
if err == nil {
|
||||||
|
t.Fatalf("Success to create Get: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get object: %v:", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
listResult := &example.PodList{}
|
listResult := &example.PodList{}
|
||||||
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Recursive: true,
|
Recursive: true,
|
||||||
|
Predicate: storage.SelectionPredicate{
|
||||||
|
Limit: 500,
|
||||||
|
},
|
||||||
}, listResult)
|
}, listResult)
|
||||||
if err == nil {
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
t.Fatalf("Success to create GetList: %v", err)
|
if err == nil {
|
||||||
|
t.Fatalf("Success to create GetList: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to list objects: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -762,10 +845,12 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pred := storage.Everything
|
pred := storage.Everything
|
||||||
pred.AllowWatchBookmarks = true
|
pred.AllowWatchBookmarks = true
|
||||||
|
|
||||||
@ -841,9 +926,10 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pred := storage.Everything
|
pred := storage.Everything
|
||||||
pred.AllowWatchBookmarks = allowWatchBookmarks
|
pred.AllowWatchBookmarks = allowWatchBookmarks
|
||||||
@ -941,9 +1027,10 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
|
|||||||
// resolution how frequency we recompute.
|
// resolution how frequency we recompute.
|
||||||
cacher.bookmarkWatchers.bookmarkFrequency = time.Second
|
cacher.bookmarkWatchers.bookmarkFrequency = time.Second
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pred := storage.Everything
|
pred := storage.Everything
|
||||||
pred.AllowWatchBookmarks = true
|
pred.AllowWatchBookmarks = true
|
||||||
@ -1011,9 +1098,10 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure there is some budget for slowing down processing.
|
// Ensure there is some budget for slowing down processing.
|
||||||
@ -1089,9 +1177,10 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
|
|||||||
// Ensure that bookmarks are sent more frequently than every 1m.
|
// Ensure that bookmarks are sent more frequently than every 1m.
|
||||||
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
|
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
makePod := func(i int) *examplev1.Pod {
|
makePod := func(i int) *examplev1.Pod {
|
||||||
@ -1167,9 +1256,10 @@ func TestStartingResourceVersion(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure there is some budget for slowing down processing.
|
// Ensure there is some budget for slowing down processing.
|
||||||
@ -1247,9 +1337,10 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure there is some budget for slowing down processing.
|
// Ensure there is some budget for slowing down processing.
|
||||||
@ -1389,9 +1480,10 @@ func TestCachingDeleteEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fooPredicate := storage.SelectionPredicate{
|
fooPredicate := storage.SelectionPredicate{
|
||||||
@ -1471,9 +1563,10 @@ func testCachingObjects(t *testing.T, watchersCount int) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatchedEvents := []*watchCacheEvent{}
|
dispatchedEvents := []*watchCacheEvent{}
|
||||||
@ -1567,10 +1660,12 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure there is enough budget for slow processing since
|
// Ensure there is enough budget for slow processing since
|
||||||
// the entire watch cache is going to be served through the
|
// the entire watch cache is going to be served through the
|
||||||
// interval and events won't be popped from the cacheWatcher's
|
// interval and events won't be popped from the cacheWatcher's
|
||||||
@ -1754,8 +1849,11 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
|
|||||||
t.Fatalf("Couldn't create cacher: %v", err)
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
}
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := cacher.Watch(context.Background(), "pods/ns", scenario.opts)
|
w, err := cacher.Watch(context.Background(), "pods/ns", scenario.opts)
|
||||||
@ -1911,9 +2009,10 @@ func TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived(t *testing.T) {
|
|||||||
require.NoError(t, err, "failed to create cacher")
|
require.NoError(t, err, "failed to create cacher")
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pred := storage.Everything
|
pred := storage.Everything
|
||||||
@ -1942,9 +2041,10 @@ func TestForgetWatcher(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// wait until cacher is initialized.
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertCacherInternalState := func(expectedWatchersCounter, expectedValueWatchersCounter int) {
|
assertCacherInternalState := func(expectedWatchersCounter, expectedValueWatchersCounter int) {
|
||||||
@ -2334,8 +2434,11 @@ func TestGetBookmarkAfterResourceVersionLockedFunc(t *testing.T) {
|
|||||||
require.NoError(t, err, "couldn't create cacher")
|
require.NoError(t, err, "couldn't create cacher")
|
||||||
|
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cacher.watchCache.UpdateResourceVersion(fmt.Sprintf("%d", scenario.watchCacheResourceVersion))
|
cacher.watchCache.UpdateResourceVersion(fmt.Sprintf("%d", scenario.watchCacheResourceVersion))
|
||||||
@ -2395,8 +2498,11 @@ func TestWatchStreamSeparation(t *testing.T) {
|
|||||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)
|
||||||
_, cacher, _, terminate := testSetupWithEtcdServer(t)
|
_, cacher, _, terminate := testSetupWithEtcdServer(t)
|
||||||
t.Cleanup(terminate)
|
t.Cleanup(terminate)
|
||||||
if err := cacher.ready.wait(context.TODO()); err != nil {
|
|
||||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
|
if err := cacher.ready.wait(context.TODO()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
getCacherRV := func() uint64 {
|
getCacherRV := func() uint64 {
|
||||||
|
@ -697,13 +697,59 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
r.ListAndWatch(stopCh)
|
if err := r.ListAndWatch(stopCh); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
if bm.calls != 2 {
|
if bm.calls != 2 {
|
||||||
t.Errorf("unexpected watch backoff calls: %d", bm.calls)
|
t.Errorf("unexpected watch backoff calls: %d", bm.calls)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNoRelistOnTooManyRequests(t *testing.T) {
|
||||||
|
err := apierrors.NewTooManyRequests("too many requests", 1)
|
||||||
|
clock := &clock.RealClock{}
|
||||||
|
bm := &fakeBackoff{clock: clock}
|
||||||
|
listCalls, watchCalls := 0, 0
|
||||||
|
|
||||||
|
lw := &testLW{
|
||||||
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
listCalls++
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
|
||||||
|
},
|
||||||
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
watchCalls++
|
||||||
|
if watchCalls < 5 {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
w := watch.NewFake()
|
||||||
|
w.Stop()
|
||||||
|
return w, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
r := &Reflector{
|
||||||
|
name: "test-reflector",
|
||||||
|
listerWatcher: lw,
|
||||||
|
store: NewFIFO(MetaNamespaceKeyFunc),
|
||||||
|
backoffManager: bm,
|
||||||
|
clock: clock,
|
||||||
|
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
||||||
|
}
|
||||||
|
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
if err := r.ListAndWatch(stopCh); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
close(stopCh)
|
||||||
|
if listCalls != 1 {
|
||||||
|
t.Errorf("unexpected list calls: %d", listCalls)
|
||||||
|
}
|
||||||
|
if watchCalls != 5 {
|
||||||
|
t.Errorf("unexpected watch calls: %d", watchCalls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRetryInternalError(t *testing.T) {
|
func TestRetryInternalError(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
name string
|
name string
|
||||||
|
Loading…
Reference in New Issue
Block a user