mirror of
https://github.com/rancher/steve.git
synced 2025-09-11 12:19:53 +00:00
Attempting to fix flaky tests
Some tests which relied on timeouts were a bit flaky in CI. This PR refactors a few of them to work on a more reliable method of receiving from a channel and raises the timeout of another test.
This commit is contained in:
@@ -19,6 +19,7 @@ type DebounceableRefresher struct {
|
|||||||
// Refreshable is any type that can be refreshed. The refresh method should by protected by a mutex internally.
|
// Refreshable is any type that can be refreshed. The refresh method should by protected by a mutex internally.
|
||||||
Refreshable Refreshable
|
Refreshable Refreshable
|
||||||
current context.CancelFunc
|
current context.CancelFunc
|
||||||
|
onCancel func()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will
|
// RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will
|
||||||
@@ -39,7 +40,10 @@ func (d *DebounceableRefresher) RefreshAfter(duration time.Duration) {
|
|||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// this indicates that the context was cancelled. Do nothing.
|
// this indicates that the context was cancelled.
|
||||||
|
if d.onCancel != nil {
|
||||||
|
d.onCancel()
|
||||||
|
}
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
// note this can cause multiple refreshes to happen concurrently
|
// note this can cause multiple refreshes to happen concurrently
|
||||||
err := d.Refreshable.Refresh()
|
err := d.Refreshable.Refresh()
|
||||||
|
@@ -1,8 +1,8 @@
|
|||||||
package debounce
|
package debounce
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -10,38 +10,63 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type refreshable struct {
|
type refreshable struct {
|
||||||
wasRefreshed atomic.Bool
|
refreshChannel chan struct{}
|
||||||
retErr error
|
cancelChannel chan struct{}
|
||||||
|
retErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *refreshable) Refresh() error {
|
func (r *refreshable) Refresh() error {
|
||||||
r.wasRefreshed.Store(true)
|
r.refreshChannel <- struct{}{}
|
||||||
return r.retErr
|
return r.retErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *refreshable) onCancel() {
|
||||||
|
r.cancelChannel <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRefreshAfter(t *testing.T) {
|
func TestRefreshAfter(t *testing.T) {
|
||||||
ref := refreshable{}
|
t.Parallel()
|
||||||
|
refreshChannel := make(chan struct{}, 1)
|
||||||
|
cancelChannel := make(chan struct{}, 1)
|
||||||
|
ref := refreshable{
|
||||||
|
refreshChannel: refreshChannel,
|
||||||
|
cancelChannel: cancelChannel,
|
||||||
|
}
|
||||||
debounce := DebounceableRefresher{
|
debounce := DebounceableRefresher{
|
||||||
Refreshable: &ref,
|
Refreshable: &ref,
|
||||||
|
onCancel: ref.onCancel,
|
||||||
}
|
}
|
||||||
debounce.RefreshAfter(time.Millisecond * 2)
|
debounce.RefreshAfter(time.Millisecond * 100)
|
||||||
debounce.RefreshAfter(time.Microsecond * 2)
|
debounce.RefreshAfter(time.Millisecond * 10)
|
||||||
time.Sleep(time.Millisecond * 1)
|
err := receiveWithTimeout(cancelChannel, time.Second*5)
|
||||||
// test that the second refresh call overrode the first - Micro < Milli so this should have ran
|
require.NoError(t, err)
|
||||||
require.True(t, ref.wasRefreshed.Load())
|
err = receiveWithTimeout(refreshChannel, time.Second*5)
|
||||||
ref.wasRefreshed.Store(false)
|
require.NoError(t, err)
|
||||||
time.Sleep(time.Millisecond * 2)
|
close(refreshChannel)
|
||||||
// test that the call was debounced - though we called this twice only one refresh should be called
|
close(cancelChannel)
|
||||||
require.False(t, ref.wasRefreshed.Load())
|
|
||||||
|
|
||||||
|
// test the error case
|
||||||
|
refreshChannel = make(chan struct{}, 1)
|
||||||
|
defer close(refreshChannel)
|
||||||
ref = refreshable{
|
ref = refreshable{
|
||||||
retErr: fmt.Errorf("Some error"),
|
retErr: fmt.Errorf("Some error"),
|
||||||
|
refreshChannel: refreshChannel,
|
||||||
}
|
}
|
||||||
debounce = DebounceableRefresher{
|
debounce = DebounceableRefresher{
|
||||||
Refreshable: &ref,
|
Refreshable: &ref,
|
||||||
}
|
}
|
||||||
debounce.RefreshAfter(time.Microsecond * 2)
|
debounce.RefreshAfter(time.Millisecond * 100)
|
||||||
// test the error case
|
err = receiveWithTimeout(refreshChannel, time.Second*5)
|
||||||
time.Sleep(time.Millisecond * 1)
|
require.NoError(t, err)
|
||||||
require.True(t, ref.wasRefreshed.Load())
|
}
|
||||||
|
|
||||||
|
func receiveWithTimeout(channel chan struct{}, timeout time.Duration) error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
select {
|
||||||
|
case <-channel:
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return fmt.Errorf("channel did not recieve value in timeout %d", timeout)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -44,7 +44,7 @@ func Test_countsBuffer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// first event is not buffered, so we expect to receive it quicker than the debounce
|
// first event is not buffered, so we expect to receive it quicker than the debounce
|
||||||
_, err := receiveWithTimeout(outputChannel, time.Millisecond*1)
|
_, err := receiveWithTimeout(outputChannel, time.Second*1)
|
||||||
assert.NoError(t, err, "Expected first event to be received quickly")
|
assert.NoError(t, err, "Expected first event to be received quickly")
|
||||||
|
|
||||||
// stream our standard count events
|
// stream our standard count events
|
||||||
|
@@ -2,7 +2,7 @@ package definitions
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync/atomic"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -14,22 +14,27 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type refreshable struct {
|
type refreshable struct {
|
||||||
wasRefreshed atomic.Bool
|
refreshChannel chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *refreshable) Refresh() error {
|
func (r *refreshable) Refresh() error {
|
||||||
r.wasRefreshed.Store(true)
|
r.refreshChannel <- struct{}{}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_onChangeCRD(t *testing.T) {
|
func Test_onChangeCRD(t *testing.T) {
|
||||||
internalRefresh := refreshable{}
|
t.Parallel()
|
||||||
|
refreshChannel := make(chan struct{}, 1)
|
||||||
|
defer close(refreshChannel)
|
||||||
|
internalRefresh := refreshable{
|
||||||
|
refreshChannel: refreshChannel,
|
||||||
|
}
|
||||||
refresher := debounce.DebounceableRefresher{
|
refresher := debounce.DebounceableRefresher{
|
||||||
Refreshable: &internalRefresh,
|
Refreshable: &internalRefresh,
|
||||||
}
|
}
|
||||||
refreshHandler := refreshHandler{
|
refreshHandler := refreshHandler{
|
||||||
debounceRef: &refresher,
|
debounceRef: &refresher,
|
||||||
debounceDuration: time.Microsecond * 5,
|
debounceDuration: time.Microsecond * 2,
|
||||||
}
|
}
|
||||||
input := apiextv1.CustomResourceDefinition{
|
input := apiextv1.CustomResourceDefinition{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@@ -39,19 +44,23 @@ func Test_onChangeCRD(t *testing.T) {
|
|||||||
output, err := refreshHandler.onChangeCRD("test-crd", &input)
|
output, err := refreshHandler.onChangeCRD("test-crd", &input)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
require.Equal(t, input, *output)
|
require.Equal(t, input, *output)
|
||||||
// waiting to allow the debouncer to refresh the refreshable
|
err = receiveWithTimeout(refreshChannel, time.Second*5)
|
||||||
time.Sleep(time.Millisecond * 2)
|
require.NoError(t, err)
|
||||||
require.True(t, internalRefresh.wasRefreshed.Load())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_onChangeAPIService(t *testing.T) {
|
func Test_onChangeAPIService(t *testing.T) {
|
||||||
internalRefresh := refreshable{}
|
t.Parallel()
|
||||||
|
refreshChannel := make(chan struct{}, 1)
|
||||||
|
defer close(refreshChannel)
|
||||||
|
internalRefresh := refreshable{
|
||||||
|
refreshChannel: refreshChannel,
|
||||||
|
}
|
||||||
refresher := debounce.DebounceableRefresher{
|
refresher := debounce.DebounceableRefresher{
|
||||||
Refreshable: &internalRefresh,
|
Refreshable: &internalRefresh,
|
||||||
}
|
}
|
||||||
refreshHandler := refreshHandler{
|
refreshHandler := refreshHandler{
|
||||||
debounceRef: &refresher,
|
debounceRef: &refresher,
|
||||||
debounceDuration: time.Microsecond * 5,
|
debounceDuration: time.Microsecond * 2,
|
||||||
}
|
}
|
||||||
input := apiregv1.APIService{
|
input := apiregv1.APIService{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@@ -61,24 +70,44 @@ func Test_onChangeAPIService(t *testing.T) {
|
|||||||
output, err := refreshHandler.onChangeAPIService("test-apiservice", &input)
|
output, err := refreshHandler.onChangeAPIService("test-apiservice", &input)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
require.Equal(t, input, *output)
|
require.Equal(t, input, *output)
|
||||||
// waiting to allow the debouncer to refresh the refreshable
|
err = receiveWithTimeout(refreshChannel, time.Second*5)
|
||||||
time.Sleep(time.Millisecond * 2)
|
require.NoError(t, err)
|
||||||
require.True(t, internalRefresh.wasRefreshed.Load())
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_startBackgroundRefresh(t *testing.T) {
|
func Test_startBackgroundRefresh(t *testing.T) {
|
||||||
internalRefresh := refreshable{}
|
t.Parallel()
|
||||||
|
refreshChannel := make(chan struct{}, 1)
|
||||||
|
internalRefresh := refreshable{
|
||||||
|
refreshChannel: refreshChannel,
|
||||||
|
}
|
||||||
refresher := debounce.DebounceableRefresher{
|
refresher := debounce.DebounceableRefresher{
|
||||||
Refreshable: &internalRefresh,
|
Refreshable: &internalRefresh,
|
||||||
}
|
}
|
||||||
refreshHandler := refreshHandler{
|
refreshHandler := refreshHandler{
|
||||||
debounceRef: &refresher,
|
debounceRef: &refresher,
|
||||||
debounceDuration: time.Microsecond * 5,
|
debounceDuration: time.Microsecond * 2,
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
refreshHandler.startBackgroundRefresh(ctx, time.Microsecond*10)
|
refreshHandler.startBackgroundRefresh(ctx, time.Microsecond*2)
|
||||||
time.Sleep(time.Millisecond * 2)
|
|
||||||
require.True(t, internalRefresh.wasRefreshed.Load())
|
err := receiveWithTimeout(refreshChannel, time.Second*5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// we want to stop the refresher before closing the channel to avoid errors
|
||||||
|
// since this just stops the background refresh from asking for a new refresh, we still
|
||||||
|
// need to wait for any currently debounced refreshes to finish
|
||||||
cancel()
|
cancel()
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
close(refreshChannel)
|
||||||
|
}
|
||||||
|
|
||||||
|
func receiveWithTimeout(channel chan struct{}, timeout time.Duration) error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
select {
|
||||||
|
case <-channel:
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return fmt.Errorf("channel did not recieve value in timeout %d", timeout)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user