Merge pull request #120799 from aojea/workqueue_test

workqueue unit tests to assert collapsing behaviors

Kubernetes-commit: cde6a46a483a0cc7d3f2404db506e1c24ecd4e9d
This commit is contained in:
Kubernetes Publisher 2023-09-21 08:44:27 -07:00
commit 34d1b64e67
3 changed files with 70 additions and 4 deletions

4
go.mod
View File

@ -24,7 +24,7 @@ require (
golang.org/x/term v0.10.0
golang.org/x/time v0.3.0
google.golang.org/protobuf v1.31.0
k8s.io/api v0.0.0-20230915221828-1cac0b1ef7e3
k8s.io/api v0.0.0-20230920101735-cdfdc4766232
k8s.io/apimachinery v0.0.0-20230918221506-a017454658b6
k8s.io/klog/v2 v2.100.1
k8s.io/kube-openapi v0.0.0-20230905202853-d090da108d2f
@ -61,6 +61,6 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20230915221828-1cac0b1ef7e3
k8s.io/api => k8s.io/api v0.0.0-20230920101735-cdfdc4766232
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230918221506-a017454658b6
)

4
go.sum
View File

@ -148,8 +148,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20230915221828-1cac0b1ef7e3 h1:C2ejpTGej8JB8kVw+3usPL1xzm0PiVyfsV0ck/MvHm4=
k8s.io/api v0.0.0-20230915221828-1cac0b1ef7e3/go.mod h1:pj88kBF1wwtwtgNStnyZDc5tCq74jZGLG1D6W9a9QIc=
k8s.io/api v0.0.0-20230920101735-cdfdc4766232 h1:ZSAL3A6g4esdkNgOgsY7HcFfrsxvFzNm3zOor02JbrI=
k8s.io/api v0.0.0-20230920101735-cdfdc4766232/go.mod h1:EqRAohQGiJxWbXjwnVEXTvuskYU4bGYDV7RYaXQcEAU=
k8s.io/apimachinery v0.0.0-20230918221506-a017454658b6 h1:JW6faJsf+BgjA3p5w3AxMAy5dyXDx/F4Uzy0km1Kgks=
k8s.io/apimachinery v0.0.0-20230918221506-a017454658b6/go.mod h1:ITRsvhyE2eLGBxgwRxs79z49RNNQh7HUqBvHCNIgEZc=
k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=

View File

@ -197,6 +197,72 @@ func TestReinsert(t *testing.T) {
}
}
func TestCollapse(t *testing.T) {
q := workqueue.New()
// Add a new one twice
q.Add("bar")
q.Add("bar")
// It should get the new one
i, _ := q.Get()
if i != "bar" {
t.Errorf("Expected %v, got %v", "bar", i)
}
// Finish that one up
q.Done(i)
// There should be no more objects in the queue
if a := q.Len(); a != 0 {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}
func TestCollapseWhileProcessing(t *testing.T) {
q := workqueue.New()
q.Add("foo")
// Start processing
i, _ := q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
// Add the same one twice
q.Add("foo")
q.Add("foo")
waitCh := make(chan struct{})
// simulate another worker consuming the queue
go func() {
defer close(waitCh)
i, _ := q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
// Finish that one up
q.Done(i)
}()
// give the worker some head start to avoid races
// on the select statement that cause flakiness
time.Sleep(100 * time.Millisecond)
// Finish the first one to unblock the other worker
select {
case <-waitCh:
t.Errorf("worker should be blocked until we are done")
default:
q.Done("foo")
}
// wait for the worker to consume the new object
// There should be no more objects in the queue
<-waitCh
if a := q.Len(); a != 0 {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}
func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {
q := workqueue.New()