mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
restore event Gone test
This commit is contained in:
parent
0f7de876a5
commit
f3cbfc3f7e
@ -57,6 +57,11 @@ var (
|
|||||||
codecs = serializer.NewCodecFactory(scheme)
|
codecs = serializer.NewCodecFactory(scheme)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
|
||||||
|
watchCacheDefaultCapacity = 100
|
||||||
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
||||||
utilruntime.Must(example.AddToScheme(scheme))
|
utilruntime.Must(example.AddToScheme(scheme))
|
||||||
@ -101,6 +106,10 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) {
|
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) {
|
||||||
|
return newTestCacherWithClock(s, cap, clock.RealClock{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestCacherWithClock(s storage.Interface, cap int, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
|
||||||
prefix := "pods"
|
prefix := "pods"
|
||||||
v := etcd3.APIObjectVersioner{}
|
v := etcd3.APIObjectVersioner{}
|
||||||
config := cacherstorage.Config{
|
config := cacherstorage.Config{
|
||||||
@ -113,7 +122,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
|
|||||||
NewFunc: func() runtime.Object { return &example.Pod{} },
|
NewFunc: func() runtime.Object { return &example.Pod{} },
|
||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||||
Clock: clock.RealClock{},
|
Clock: clock,
|
||||||
}
|
}
|
||||||
cacher, err := cacherstorage.NewCacherFromConfig(config)
|
cacher, err := cacherstorage.NewCacherFromConfig(config)
|
||||||
return cacher, v, err
|
return cacher, v, err
|
||||||
@ -396,7 +405,8 @@ func TestWatch(t *testing.T) {
|
|||||||
// Inject one list error to make sure we test the relist case.
|
// Inject one list error to make sure we test the relist case.
|
||||||
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _, err := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
|
fakeClock := clock.NewFakeClock(time.Now())
|
||||||
|
cacher, _, err := newTestCacherWithClock(etcdStorage, watchCacheDefaultCapacity, fakeClock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Couldn't create cacher: %v", err)
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
}
|
}
|
||||||
@ -439,15 +449,6 @@ func TestWatch(t *testing.T) {
|
|||||||
verifyWatchEvent(t, watcher, watch.Added, podFoo)
|
verifyWatchEvent(t, watcher, watch.Added, podFoo)
|
||||||
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
||||||
|
|
||||||
// Check whether we get too-old error via the watch channel
|
|
||||||
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Expected no direct error, got %v", err)
|
|
||||||
}
|
|
||||||
defer tooOldWatcher.Stop()
|
|
||||||
// Events happens in eventFreshDuration, cache expand without event "Gone".
|
|
||||||
verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo)
|
|
||||||
|
|
||||||
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
|
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
@ -468,6 +469,25 @@ func TestWatch(t *testing.T) {
|
|||||||
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
|
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
|
||||||
|
|
||||||
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
|
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
|
||||||
|
|
||||||
|
// Add watchCacheDefaultCapacity events to make current watch cache full.
|
||||||
|
// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
|
||||||
|
for i := 0; i < watchCacheDefaultCapacity; i++ {
|
||||||
|
fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
|
||||||
|
podFoo := makeTestPod(fmt.Sprintf("foo-%d", i))
|
||||||
|
updatePod(t, etcdStorage, podFoo, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check whether we get too-old error via the watch channel
|
||||||
|
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no direct error, got %v", err)
|
||||||
|
}
|
||||||
|
defer tooOldWatcher.Stop()
|
||||||
|
|
||||||
|
// Ensure we get a "Gone" error.
|
||||||
|
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
|
||||||
|
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatcherTimeout(t *testing.T) {
|
func TestWatcherTimeout(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user