From 25a6fa144f4faa61b9b8672f4609b3df817ddafa Mon Sep 17 00:00:00 2001 From: XiangNing Xia Date: Tue, 14 Jan 2025 04:19:55 +0000 Subject: [PATCH] There is no need to set the worker to nil to avoid potential timing issues. --- pkg/controller/tainteviction/timed_workers.go | 8 +- .../tainteviction/timed_workers_test.go | 84 +++++++++---------- 2 files changed, 41 insertions(+), 51 deletions(-) diff --git a/pkg/controller/tainteviction/timed_workers.go b/pkg/controller/tainteviction/timed_workers.go index 8260c86ff36..fa7615d9b9a 100644 --- a/pkg/controller/tainteviction/timed_workers.go +++ b/pkg/controller/tainteviction/timed_workers.go @@ -105,13 +105,7 @@ func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(ctx context.Con err := q.workFunc(ctx, fireAt, args) q.Lock() defer q.Unlock() - if err == nil { - // To avoid duplicated calls we keep the key in the queue, to prevent - // subsequent additions. - q.workers[key] = nil - } else { - delete(q.workers, key) - } + delete(q.workers, key) return err } } diff --git a/pkg/controller/tainteviction/timed_workers_test.go b/pkg/controller/tainteviction/timed_workers_test.go index 389ccd3ceb2..d39acaed443 100644 --- a/pkg/controller/tainteviction/timed_workers_test.go +++ b/pkg/controller/tainteviction/timed_workers_test.go @@ -28,6 +28,7 @@ import ( ) func TestExecute(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) testVal := int32(0) wg := sync.WaitGroup{} wg.Add(5) @@ -37,17 +38,11 @@ func TestExecute(t *testing.T) { return nil }) now := time.Now() - queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, now) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, now) - queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, now) - queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, now) - queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, now) - // Adding the same thing second time should be no-op - queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, now) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, now) - queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, now) - queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, now) - queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, now) + queue.AddWork(ctx, NewWorkArgs("1", "1"), now, now) + queue.AddWork(ctx, NewWorkArgs("2", "2"), now, now) + queue.AddWork(ctx, NewWorkArgs("3", "3"), now, now) + queue.AddWork(ctx, NewWorkArgs("4", "4"), now, now) + queue.AddWork(ctx, NewWorkArgs("5", "5"), now, now) wg.Wait() lastVal := atomic.LoadInt32(&testVal) if lastVal != 5 { @@ -56,6 +51,7 @@ func TestExecute(t *testing.T) { } func TestExecuteDelayed(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) testVal := int32(0) wg := sync.WaitGroup{} wg.Add(5) @@ -68,16 +64,16 @@ func TestExecuteDelayed(t *testing.T) { then := now.Add(10 * time.Second) fakeClock := testingclock.NewFakeClock(now) queue.clock = fakeClock - queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then) + queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then) + queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then) + queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then) + queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then) + queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then) + queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then) + queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then) + queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then) + queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then) + queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then) fakeClock.Step(11 * time.Second) wg.Wait() lastVal := atomic.LoadInt32(&testVal) @@ -87,6 +83,7 @@ func TestExecuteDelayed(t *testing.T) { } func TestCancel(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) testVal := int32(0) wg := sync.WaitGroup{} wg.Add(3) @@ -99,17 +96,16 @@ func TestCancel(t *testing.T) { then := now.Add(10 * time.Second) fakeClock := testingclock.NewFakeClock(now) queue.clock = fakeClock - queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then) - logger, _ := ktesting.NewTestContext(t) + queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then) + queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then) + queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then) + queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then) + queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then) + queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then) + queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then) + queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then) + queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then) + queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then) queue.CancelWork(logger, NewWorkArgs("2", "2").KeyFromWorkArgs()) queue.CancelWork(logger, NewWorkArgs("4", "4").KeyFromWorkArgs()) fakeClock.Step(11 * time.Second) @@ -121,6 +117,7 @@ func TestCancel(t *testing.T) { } func TestCancelAndReadd(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) testVal := int32(0) wg := sync.WaitGroup{} wg.Add(4) @@ -133,20 +130,19 @@ func TestCancelAndReadd(t *testing.T) { then := now.Add(10 * time.Second) fakeClock := testingclock.NewFakeClock(now) queue.clock = fakeClock - queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("1", "1"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("3", "3"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("4", "4"), now, then) - queue.AddWork(context.TODO(), NewWorkArgs("5", "5"), now, then) - logger, _ := ktesting.NewTestContext(t) + queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then) + queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then) + queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then) + queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then) + queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then) + queue.AddWork(ctx, NewWorkArgs("1", "1"), now, then) + queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then) + queue.AddWork(ctx, NewWorkArgs("3", "3"), now, then) + queue.AddWork(ctx, NewWorkArgs("4", "4"), now, then) + queue.AddWork(ctx, NewWorkArgs("5", "5"), now, then) queue.CancelWork(logger, NewWorkArgs("2", "2").KeyFromWorkArgs()) queue.CancelWork(logger, NewWorkArgs("4", "4").KeyFromWorkArgs()) - queue.AddWork(context.TODO(), NewWorkArgs("2", "2"), now, then) + queue.AddWork(ctx, NewWorkArgs("2", "2"), now, then) fakeClock.Step(11 * time.Second) wg.Wait() lastVal := atomic.LoadInt32(&testVal)