Merge pull request #123347 from zhouhaibing089/abstract-queue

workqueue: make queue as configurable

Kubernetes-commit: 301eb8d47a794393a85a9449ab6b29d902ed06bd
This commit is contained in:
Kubernetes Publisher
2024-04-18 00:00:42 -07:00
5 changed files with 106 additions and 20 deletions

4
go.mod
View File

@@ -24,8 +24,8 @@ require (
golang.org/x/term v0.18.0 golang.org/x/term v0.18.0
golang.org/x/time v0.3.0 golang.org/x/time v0.3.0
google.golang.org/protobuf v1.33.0 google.golang.org/protobuf v1.33.0
k8s.io/api v0.0.0-20240404035423-5e7d566356d1 k8s.io/api v0.0.0-20240418013359-a819b1d9bd16
k8s.io/apimachinery v0.0.0-20240404035254-e696ec55a32e k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7
k8s.io/klog/v2 v2.120.1 k8s.io/klog/v2 v2.120.1
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340
k8s.io/utils v0.0.0-20230726121419-3b25d923346b k8s.io/utils v0.0.0-20230726121419-3b25d923346b

8
go.sum
View File

@@ -153,10 +153,10 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20240404035423-5e7d566356d1 h1:tUkP151p85IMjkPt1+gdSJ4a7HTp6atyw0BPaOl43AI= k8s.io/api v0.0.0-20240418013359-a819b1d9bd16 h1:DXJla1ulezom5N0QIRlZetozcxtRPdS7U+muHDJuiO4=
k8s.io/api v0.0.0-20240404035423-5e7d566356d1/go.mod h1:hpltBotDO81r+TzqESp+1COe04YlRTmdCzAysBBM8CU= k8s.io/api v0.0.0-20240418013359-a819b1d9bd16/go.mod h1:a1YU16kjsAapUzg1LYaOqTnbMlo87NXy9bSeWjRmfoo=
k8s.io/apimachinery v0.0.0-20240404035254-e696ec55a32e h1:QDMqQVyH8eAEDzaa0HcUsmoJE2goz2xNXb2SKkcU3Lw= k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7 h1:SydhMcp6AJkjqqVcd0o0uz7ntTcs/QyIgIHAFYfIm7E=
k8s.io/apimachinery v0.0.0-20240404035254-e696ec55a32e/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=

View File

@@ -41,7 +41,7 @@ func TestMetricShutdown(t *testing.T) {
updateCalled: ch, updateCalled: ch,
} }
c := testingclock.NewFakeClock(time.Now()) c := testingclock.NewFakeClock(time.Now())
q := newQueue(c, m, time.Millisecond) q := newQueue(c, DefaultQueue(), m, time.Millisecond)
for !c.HasWaiters() { for !c.HasWaiters() {
// Wait for the go routine to call NewTicker() // Wait for the go routine to call NewTicker()
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)

View File

@@ -33,6 +33,48 @@ type Interface interface {
ShuttingDown() bool ShuttingDown() bool
} }
// Queue is the underlying storage for items. The functions below are always
// called from the same goroutine.
type Queue interface {
// Touch can be hooked when an existing item is added again. This may be
// useful if the implementation allows priority change for the given item.
Touch(item interface{})
// Push adds a new item.
Push(item interface{})
// Len tells the total number of items.
Len() int
// Pop retrieves an item.
Pop() (item interface{})
}
// DefaultQueue is a slice based FIFO queue.
func DefaultQueue() Queue {
return new(queue)
}
// queue is a slice which implements Queue.
type queue []interface{}
func (q *queue) Touch(item interface{}) {}
func (q *queue) Push(item interface{}) {
*q = append(*q, item)
}
func (q *queue) Len() int {
return len(*q)
}
func (q *queue) Pop() (item interface{}) {
item = (*q)[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
(*q)[0] = nil
*q = (*q)[1:]
return item
}
// QueueConfig specifies optional configurations to customize an Interface. // QueueConfig specifies optional configurations to customize an Interface.
type QueueConfig struct { type QueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered. // Name for the queue. If unnamed, the metrics will not be registered.
@@ -44,6 +86,9 @@ type QueueConfig struct {
// Clock ability to inject real or fake clock for testing purposes. // Clock ability to inject real or fake clock for testing purposes.
Clock clock.WithTicker Clock clock.WithTicker
// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
Queue Queue
} }
// New constructs a new work queue (see the package comment). // New constructs a new work queue (see the package comment).
@@ -83,16 +128,22 @@ func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
config.Clock = clock.RealClock{} config.Clock = clock.RealClock{}
} }
if config.Queue == nil {
config.Queue = DefaultQueue()
}
return newQueue( return newQueue(
config.Clock, config.Clock,
config.Queue,
metricsFactory.newQueueMetrics(config.Name, config.Clock), metricsFactory.newQueueMetrics(config.Name, config.Clock),
updatePeriod, updatePeriod,
) )
} }
func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type { func newQueue(c clock.WithTicker, queue Queue, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{ t := &Type{
clock: c, clock: c,
queue: queue,
dirty: set{}, dirty: set{},
processing: set{}, processing: set{},
cond: sync.NewCond(&sync.Mutex{}), cond: sync.NewCond(&sync.Mutex{}),
@@ -116,7 +167,7 @@ type Type struct {
// queue defines the order in which we will work on items. Every // queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the // element of queue should be in the dirty set and not in the
// processing set. // processing set.
queue []t queue Queue
// dirty defines all of the items that need to be processed. // dirty defines all of the items that need to be processed.
dirty set dirty set
@@ -167,6 +218,11 @@ func (q *Type) Add(item interface{}) {
return return
} }
if q.dirty.has(item) { if q.dirty.has(item) {
// the same item is added again before it is processed, call the Touch
// function if the queue cares about it (for e.g, reset its priority)
if !q.processing.has(item) {
q.queue.Touch(item)
}
return return
} }
@@ -177,7 +233,7 @@ func (q *Type) Add(item interface{}) {
return return
} }
q.queue = append(q.queue, item) q.queue.Push(item)
q.cond.Signal() q.cond.Signal()
} }
@@ -187,7 +243,7 @@ func (q *Type) Add(item interface{}) {
func (q *Type) Len() int { func (q *Type) Len() int {
q.cond.L.Lock() q.cond.L.Lock()
defer q.cond.L.Unlock() defer q.cond.L.Unlock()
return len(q.queue) return q.queue.Len()
} }
// Get blocks until it can return an item to be processed. If shutdown = true, // Get blocks until it can return an item to be processed. If shutdown = true,
@@ -196,18 +252,15 @@ func (q *Type) Len() int {
func (q *Type) Get() (item interface{}, shutdown bool) { func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock() q.cond.L.Lock()
defer q.cond.L.Unlock() defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown { for q.queue.Len() == 0 && !q.shuttingDown {
q.cond.Wait() q.cond.Wait()
} }
if len(q.queue) == 0 { if q.queue.Len() == 0 {
// We must be shutting down. // We must be shutting down.
return nil, true return nil, true
} }
item = q.queue[0] item = q.queue.Pop()
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]
q.metrics.get(item) q.metrics.get(item)
@@ -228,7 +281,7 @@ func (q *Type) Done(item interface{}) {
q.processing.delete(item) q.processing.delete(item)
if q.dirty.has(item) { if q.dirty.has(item) {
q.queue = append(q.queue, item) q.queue.Push(item)
q.cond.Signal() q.cond.Signal()
} else if q.processing.len() == 0 { } else if q.processing.len() == 0 {
q.cond.Signal() q.cond.Signal()

View File

@@ -27,6 +27,23 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
// traceQueue traces whether items are touched
type traceQueue struct {
workqueue.Queue
touched map[interface{}]struct{}
}
func (t *traceQueue) Touch(item interface{}) {
t.Queue.Touch(item)
if t.touched == nil {
t.touched = make(map[interface{}]struct{})
}
t.touched[item] = struct{}{}
}
var _ workqueue.Queue = &traceQueue{}
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
tests := []struct { tests := []struct {
queue *workqueue.Type queue *workqueue.Type
@@ -198,7 +215,11 @@ func TestReinsert(t *testing.T) {
} }
func TestCollapse(t *testing.T) { func TestCollapse(t *testing.T) {
q := workqueue.New() tq := &traceQueue{Queue: workqueue.DefaultQueue()}
q := workqueue.NewWithConfig(workqueue.QueueConfig{
Name: "",
Queue: tq,
})
// Add a new one twice // Add a new one twice
q.Add("bar") q.Add("bar")
q.Add("bar") q.Add("bar")
@@ -216,10 +237,18 @@ func TestCollapse(t *testing.T) {
if a := q.Len(); a != 0 { if a := q.Len(); a != 0 {
t.Errorf("Expected queue to be empty. Has %v items", a) t.Errorf("Expected queue to be empty. Has %v items", a)
} }
if _, ok := tq.touched["bar"]; !ok {
t.Errorf("Expected bar to be Touched")
}
} }
func TestCollapseWhileProcessing(t *testing.T) { func TestCollapseWhileProcessing(t *testing.T) {
q := workqueue.New() tq := &traceQueue{Queue: workqueue.DefaultQueue()}
q := workqueue.NewWithConfig(workqueue.QueueConfig{
Name: "",
Queue: tq,
})
q.Add("foo") q.Add("foo")
// Start processing // Start processing
@@ -261,6 +290,10 @@ func TestCollapseWhileProcessing(t *testing.T) {
if a := q.Len(); a != 0 { if a := q.Len(); a != 0 {
t.Errorf("Expected queue to be empty. Has %v items", a) t.Errorf("Expected queue to be empty. Has %v items", a)
} }
if _, ok := tq.touched["foo"]; ok {
t.Errorf("Unexpected Touch")
}
} }
func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) { func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {