From 374e3638f2a3f4d172ef268506b9725da987b47e Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 21 Sep 2023 10:48:01 +0000 Subject: [PATCH] workqueue unit tests to assert collapsing behaviors Change-Id: If4bf54c3af603c17de49055960f9a76e7d38aab3 Kubernetes-commit: 8a31bb6786c2d3600bade49de7a64a026802bd6d --- util/workqueue/queue_test.go | 66 ++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/util/workqueue/queue_test.go b/util/workqueue/queue_test.go index de782035..e2a33973 100644 --- a/util/workqueue/queue_test.go +++ b/util/workqueue/queue_test.go @@ -197,6 +197,72 @@ func TestReinsert(t *testing.T) { } } +func TestCollapse(t *testing.T) { + q := workqueue.New() + // Add a new one twice + q.Add("bar") + q.Add("bar") + + // It should get the new one + i, _ := q.Get() + if i != "bar" { + t.Errorf("Expected %v, got %v", "bar", i) + } + + // Finish that one up + q.Done(i) + + // There should be no more objects in the queue + if a := q.Len(); a != 0 { + t.Errorf("Expected queue to be empty. Has %v items", a) + } +} + +func TestCollapseWhileProcessing(t *testing.T) { + q := workqueue.New() + q.Add("foo") + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + // Add the same one twice + q.Add("foo") + q.Add("foo") + + waitCh := make(chan struct{}) + // simulate another worker consuming the queue + go func() { + defer close(waitCh) + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + // Finish that one up + q.Done(i) + }() + + // give the worker some head start to avoid races + // on the select statement that cause flakiness + time.Sleep(100 * time.Millisecond) + // Finish the first one to unblock the other worker + select { + case <-waitCh: + t.Errorf("worker should be blocked until we are done") + default: + q.Done("foo") + } + + // wait for the worker to consume the new object + // There should be no more objects in the queue + <-waitCh + if a := q.Len(); a != 0 { + t.Errorf("Expected queue to be empty. Has %v items", a) + } +} + func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) { q := workqueue.New()