mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #109914 from stevekuznetsov/skuznets/generic-watch-cancel-test
storage/testing: move cancelled watch test to generic package
This commit is contained in:
commit
ccb7118c22
@ -137,23 +137,7 @@ func TestWatchError(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatchContextCancel(t *testing.T) {
|
func TestWatchContextCancel(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
canceledCtx, cancel := context.WithCancel(ctx)
|
storagetesting.RunTestWatchContextCancel(ctx, t, store)
|
||||||
cancel()
|
|
||||||
// When we watch with a canceled context, we should detect that it's context canceled.
|
|
||||||
// We won't take it as error and also close the watcher.
|
|
||||||
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, false, storage.Everything)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case _, ok := <-w.ResultChan():
|
|
||||||
if ok {
|
|
||||||
t.Error("ResultChan() should be closed")
|
|
||||||
}
|
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
|
||||||
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||||
|
@ -148,6 +148,29 @@ func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.I
|
|||||||
TestCheckResult(t, watch.Modified, w, out)
|
TestCheckResult(t, watch.Modified, w, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
canceledCtx, cancel := context.WithCancel(ctx)
|
||||||
|
cancel()
|
||||||
|
// When we watch with a canceled context, we should detect that it's context canceled.
|
||||||
|
// We won't take it as error and also close the watcher.
|
||||||
|
w, err := store.Watch(canceledCtx, "/abc", storage.ListOptions{
|
||||||
|
ResourceVersion: "0",
|
||||||
|
Predicate: storage.Everything,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _, ok := <-w.ResultChan():
|
||||||
|
if ok {
|
||||||
|
t.Error("ResultChan() should be closed")
|
||||||
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
ctx, _ = context.WithTimeout(ctx, 5*time.Second)
|
ctx, _ = context.WithTimeout(ctx, 5*time.Second)
|
||||||
initSignal := utilflowcontrol.NewInitializationSignal()
|
initSignal := utilflowcontrol.NewInitializationSignal()
|
||||||
|
Loading…
Reference in New Issue
Block a user