Merge pull request #129577 from ning0515/fix-125332

Only set worker to nil when the key exists.
This commit is contained in:
Kubernetes Prow Robot 2025-01-20 01:14:36 -08:00 committed by GitHub
commit 4766d191f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 41 additions and 51 deletions

View File

@ -105,13 +105,7 @@ func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(ctx context.Con
err := q.workFunc(ctx, fireAt, args)
q.Lock()
defer q.Unlock()
if err == nil {
// To avoid duplicated calls we keep the key in the queue, to prevent
// subsequent additions.
q.workers[key] = nil
} else {
delete(q.workers, key)
}
delete(q.workers, key)
return err
}
}

View File

@ -28,6 +28,7 @@ import (
)
func TestExecute(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(5)
@ -37,17 +38,11 @@ func TestExecute(t *testing.T) {
return nil
})
now := time.Now()
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, now)
// Adding the same thing second time should be no-op
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, now)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, now)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, now)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, now)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, now)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, now)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, now)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 5 {
@ -56,6 +51,7 @@ func TestExecute(t *testing.T) {
}
func TestExecuteDelayed(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(5)
@ -68,16 +64,16 @@ func TestExecuteDelayed(t *testing.T) {
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
fakeClock.Step(11 * time.Second)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
@ -87,6 +83,7 @@ func TestExecuteDelayed(t *testing.T) {
}
func TestCancel(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(3)
@ -99,17 +96,16 @@ func TestCancel(t *testing.T) {
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
logger, _ := ktesting.NewTestContext(t)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.CancelWork(logger, NewWorkArgs("2", "2").KeyFromWorkArgs())
queue.CancelWork(logger, NewWorkArgs("4", "4").KeyFromWorkArgs())
fakeClock.Step(11 * time.Second)
@ -121,6 +117,7 @@ func TestCancel(t *testing.T) {
}
func TestCancelAndReadd(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(4)
@ -133,20 +130,19 @@ func TestCancelAndReadd(t *testing.T) {
then := now.Add(10 * time.Second)
fakeClock := testingclock.NewFakeClock(now)
queue.clock = fakeClock
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then)
queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then)
logger, _ := ktesting.NewTestContext(t)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then)
queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then)
queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then)
queue.CancelWork(logger, NewWorkArgs("2", "2").KeyFromWorkArgs())
queue.CancelWork(logger, NewWorkArgs("4", "4").KeyFromWorkArgs())
queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then)
queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then)
fakeClock.Step(11 * time.Second)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)