There is no need to set the worker to nil to avoid potential timing issues.

This commit is contained in:
XiangNing Xia 2025-01-14 04:19:55 +00:00
parent 5e220e4041
commit 25a6fa144f
2 changed files with 41 additions and 51 deletions

View File

@ -105,13 +105,7 @@ func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(ctx context.Con
err := q.workFunc(ctx, fireAt, args)
q.Lock()
defer q.Unlock()
if err == nil {
// To avoid duplicated calls we keep the key in the queue, to prevent
// subsequent additions.
q.workers[key] = nil
} else {
delete(q.workers, key)
}
return err
}
}

View File

@ -28,6 +28,7 @@ import (
)
func TestExecute(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(5)
@ -37,17 +38,11 @@ func TestExecute(t *testing.T) {
return nil
})
now := time.Now()
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, now)
// Adding the same thing second time should be no-op
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, now)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, now)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, now)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, now)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, now)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, now)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 5 {
@ -56,6 +51,7 @@ func TestExecute(t *testing.T) {
}
func TestExecuteDelayed(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(5)
@ -68,16 +64,16 @@ func TestExecuteDelayed(t *testing.T) {
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
fakeClock.Step(11 * time.Second)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
@ -87,6 +83,7 @@ func TestExecuteDelayed(t *testing.T) {
}
func TestCancel(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(3)
@ -99,17 +96,16 @@ func TestCancel(t *testing.T) {
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
logger, _ := ktesting.NewTestContext(t)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.CancelWork(logger, NewWorkArgs("2", "2").KeyFromWorkArgs())
queue.CancelWork(logger, NewWorkArgs("4", "4").KeyFromWorkArgs())
fakeClock.Step(11 * time.Second)
@ -121,6 +117,7 @@ func TestCancel(t *testing.T) {
}
func TestCancelAndReadd(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(4)
@ -133,20 +130,19 @@ func TestCancelAndReadd(t *testing.T) {
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
logger, _ := ktesting.NewTestContext(t)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.CancelWork(logger, NewWorkArgs("2", "2").KeyFromWorkArgs())
queue.CancelWork(logger, NewWorkArgs("4", "4").KeyFromWorkArgs())
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
fakeClock.Step(11 * time.Second)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)