diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index 3cec1768..aaef3c5d 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/clock" ) @@ -163,8 +164,8 @@ func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMet t := &Typed[T]{ clock: c, queue: queue, - dirty: set[T]{}, - processing: set[T]{}, + dirty: sets.Set[T]{}, + processing: sets.Set[T]{}, cond: sync.NewCond(&sync.Mutex{}), metrics: metrics, unfinishedWorkUpdatePeriod: updatePeriod, @@ -192,13 +193,13 @@ type Typed[t comparable] struct { queue Queue[t] // dirty defines all of the items that need to be processed. - dirty set[t] + dirty sets.Set[t] // Things that are currently being processed are in the processing set. // These things may be simultaneously in the dirty set. When we finish // processing something and remove it from this set, we'll check if // it's in the dirty set, and if so, add it to the queue. - processing set[t] + processing sets.Set[t] cond *sync.Cond @@ -211,26 +212,6 @@ type Typed[t comparable] struct { clock clock.WithTicker } -type empty struct{} -type set[t comparable] map[t]empty - -func (s set[t]) has(item t) bool { - _, exists := s[item] - return exists -} - -func (s set[t]) insert(item t) { - s[item] = empty{} -} - -func (s set[t]) delete(item t) { - delete(s, item) -} - -func (s set[t]) len() int { - return len(s) -} - // Add marks item as needing processing. func (q *Typed[T]) Add(item T) { q.cond.L.Lock() @@ -238,10 +219,10 @@ func (q *Typed[T]) Add(item T) { if q.shuttingDown { return } - if q.dirty.has(item) { + if q.dirty.Has(item) { // the same item is added again before it is processed, call the Touch // function if the queue cares about it (for e.g, reset its priority) - if !q.processing.has(item) { + if !q.processing.Has(item) { q.queue.Touch(item) } return @@ -249,8 +230,8 @@ func (q *Typed[T]) Add(item T) { q.metrics.add(item) - q.dirty.insert(item) - if q.processing.has(item) { + q.dirty.Insert(item) + if q.processing.Has(item) { return } @@ -285,8 +266,8 @@ func (q *Typed[T]) Get() (item T, shutdown bool) { q.metrics.get(item) - q.processing.insert(item) - q.dirty.delete(item) + q.processing.Insert(item) + q.dirty.Delete(item) return item, false } @@ -300,11 +281,11 @@ func (q *Typed[T]) Done(item T) { q.metrics.done(item) - q.processing.delete(item) - if q.dirty.has(item) { + q.processing.Delete(item) + if q.dirty.Has(item) { q.queue.Push(item) q.cond.Signal() - } else if q.processing.len() == 0 { + } else if q.processing.Len() == 0 { q.cond.Signal() } } @@ -337,7 +318,7 @@ func (q *Typed[T]) ShutDownWithDrain() { q.shuttingDown = true q.cond.Broadcast() - for q.processing.len() != 0 && q.drain { + for q.processing.Len() != 0 && q.drain { q.cond.Wait() } }