diff --git a/pkg/debounce/refresher.go b/pkg/debounce/refresher.go index fd7843c4..4439ab1d 100644 --- a/pkg/debounce/refresher.go +++ b/pkg/debounce/refresher.go @@ -19,6 +19,7 @@ type DebounceableRefresher struct { // Refreshable is any type that can be refreshed. The refresh method should by protected by a mutex internally. Refreshable Refreshable current context.CancelFunc + onCancel func() } // RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will @@ -39,7 +40,10 @@ func (d *DebounceableRefresher) RefreshAfter(duration time.Duration) { defer timer.Stop() select { case <-ctx.Done(): - // this indicates that the context was cancelled. Do nothing. + // this indicates that the context was cancelled. + if d.onCancel != nil { + d.onCancel() + } case <-timer.C: // note this can cause multiple refreshes to happen concurrently err := d.Refreshable.Refresh() diff --git a/pkg/debounce/refresher_test.go b/pkg/debounce/refresher_test.go index 50ad896e..5c4f17dd 100644 --- a/pkg/debounce/refresher_test.go +++ b/pkg/debounce/refresher_test.go @@ -1,8 +1,8 @@ package debounce import ( + "context" "fmt" - "sync/atomic" "testing" "time" @@ -10,38 +10,63 @@ import ( ) type refreshable struct { - wasRefreshed atomic.Bool - retErr error + refreshChannel chan struct{} + cancelChannel chan struct{} + retErr error } func (r *refreshable) Refresh() error { - r.wasRefreshed.Store(true) + r.refreshChannel <- struct{}{} return r.retErr } +func (r *refreshable) onCancel() { + r.cancelChannel <- struct{}{} +} + func TestRefreshAfter(t *testing.T) { - ref := refreshable{} + t.Parallel() + refreshChannel := make(chan struct{}, 1) + cancelChannel := make(chan struct{}, 1) + ref := refreshable{ + refreshChannel: refreshChannel, + cancelChannel: cancelChannel, + } debounce := DebounceableRefresher{ Refreshable: &ref, + onCancel: ref.onCancel, } - debounce.RefreshAfter(time.Millisecond * 2) - debounce.RefreshAfter(time.Microsecond * 2) - time.Sleep(time.Millisecond * 1) - // test that the second refresh call overrode the first - Micro < Milli so this should have ran - require.True(t, ref.wasRefreshed.Load()) - ref.wasRefreshed.Store(false) - time.Sleep(time.Millisecond * 2) - // test that the call was debounced - though we called this twice only one refresh should be called - require.False(t, ref.wasRefreshed.Load()) + debounce.RefreshAfter(time.Millisecond * 100) + debounce.RefreshAfter(time.Millisecond * 10) + err := receiveWithTimeout(cancelChannel, time.Second*5) + require.NoError(t, err) + err = receiveWithTimeout(refreshChannel, time.Second*5) + require.NoError(t, err) + close(refreshChannel) + close(cancelChannel) + // test the error case + refreshChannel = make(chan struct{}, 1) + defer close(refreshChannel) ref = refreshable{ - retErr: fmt.Errorf("Some error"), + retErr: fmt.Errorf("Some error"), + refreshChannel: refreshChannel, } debounce = DebounceableRefresher{ Refreshable: &ref, } - debounce.RefreshAfter(time.Microsecond * 2) - // test the error case - time.Sleep(time.Millisecond * 1) - require.True(t, ref.wasRefreshed.Load()) + debounce.RefreshAfter(time.Millisecond * 100) + err = receiveWithTimeout(refreshChannel, time.Second*5) + require.NoError(t, err) +} + +func receiveWithTimeout(channel chan struct{}, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + select { + case <-channel: + return nil + case <-ctx.Done(): + return fmt.Errorf("channel did not recieve value in timeout %d", timeout) + } } diff --git a/pkg/resources/counts/buffer_test.go b/pkg/resources/counts/buffer_test.go index 381b70a3..5c2d71c4 100644 --- a/pkg/resources/counts/buffer_test.go +++ b/pkg/resources/counts/buffer_test.go @@ -44,7 +44,7 @@ func Test_countsBuffer(t *testing.T) { } // first event is not buffered, so we expect to receive it quicker than the debounce - _, err := receiveWithTimeout(outputChannel, time.Millisecond*1) + _, err := receiveWithTimeout(outputChannel, time.Second*1) assert.NoError(t, err, "Expected first event to be received quickly") // stream our standard count events diff --git a/pkg/schema/definitions/refresh_test.go b/pkg/schema/definitions/refresh_test.go index 627aecc8..ed5e44d6 100644 --- a/pkg/schema/definitions/refresh_test.go +++ b/pkg/schema/definitions/refresh_test.go @@ -2,7 +2,7 @@ package definitions import ( "context" - "sync/atomic" + "fmt" "testing" "time" @@ -14,22 +14,27 @@ import ( ) type refreshable struct { - wasRefreshed atomic.Bool + refreshChannel chan struct{} } func (r *refreshable) Refresh() error { - r.wasRefreshed.Store(true) + r.refreshChannel <- struct{}{} return nil } func Test_onChangeCRD(t *testing.T) { - internalRefresh := refreshable{} + t.Parallel() + refreshChannel := make(chan struct{}, 1) + defer close(refreshChannel) + internalRefresh := refreshable{ + refreshChannel: refreshChannel, + } refresher := debounce.DebounceableRefresher{ Refreshable: &internalRefresh, } refreshHandler := refreshHandler{ debounceRef: &refresher, - debounceDuration: time.Microsecond * 5, + debounceDuration: time.Microsecond * 2, } input := apiextv1.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{ @@ -39,19 +44,23 @@ func Test_onChangeCRD(t *testing.T) { output, err := refreshHandler.onChangeCRD("test-crd", &input) require.Nil(t, err) require.Equal(t, input, *output) - // waiting to allow the debouncer to refresh the refreshable - time.Sleep(time.Millisecond * 2) - require.True(t, internalRefresh.wasRefreshed.Load()) + err = receiveWithTimeout(refreshChannel, time.Second*5) + require.NoError(t, err) } func Test_onChangeAPIService(t *testing.T) { - internalRefresh := refreshable{} + t.Parallel() + refreshChannel := make(chan struct{}, 1) + defer close(refreshChannel) + internalRefresh := refreshable{ + refreshChannel: refreshChannel, + } refresher := debounce.DebounceableRefresher{ Refreshable: &internalRefresh, } refreshHandler := refreshHandler{ debounceRef: &refresher, - debounceDuration: time.Microsecond * 5, + debounceDuration: time.Microsecond * 2, } input := apiregv1.APIService{ ObjectMeta: metav1.ObjectMeta{ @@ -61,24 +70,44 @@ func Test_onChangeAPIService(t *testing.T) { output, err := refreshHandler.onChangeAPIService("test-apiservice", &input) require.Nil(t, err) require.Equal(t, input, *output) - // waiting to allow the debouncer to refresh the refreshable - time.Sleep(time.Millisecond * 2) - require.True(t, internalRefresh.wasRefreshed.Load()) + err = receiveWithTimeout(refreshChannel, time.Second*5) + require.NoError(t, err) } func Test_startBackgroundRefresh(t *testing.T) { - internalRefresh := refreshable{} + t.Parallel() + refreshChannel := make(chan struct{}, 1) + internalRefresh := refreshable{ + refreshChannel: refreshChannel, + } refresher := debounce.DebounceableRefresher{ Refreshable: &internalRefresh, } refreshHandler := refreshHandler{ debounceRef: &refresher, - debounceDuration: time.Microsecond * 5, + debounceDuration: time.Microsecond * 2, } ctx, cancel := context.WithCancel(context.Background()) - refreshHandler.startBackgroundRefresh(ctx, time.Microsecond*10) - time.Sleep(time.Millisecond * 2) - require.True(t, internalRefresh.wasRefreshed.Load()) + refreshHandler.startBackgroundRefresh(ctx, time.Microsecond*2) + + err := receiveWithTimeout(refreshChannel, time.Second*5) + require.NoError(t, err) + // we want to stop the refresher before closing the channel to avoid errors + // since this just stops the background refresh from asking for a new refresh, we still + // need to wait for any currently debounced refreshes to finish cancel() + time.Sleep(time.Second * 1) + close(refreshChannel) +} + +func receiveWithTimeout(channel chan struct{}, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + select { + case <-channel: + return nil + case <-ctx.Done(): + return fmt.Errorf("channel did not recieve value in timeout %d", timeout) + } }