Use sets.Set instead of a local impl

Kubernetes-commit: d193ffe57ffbbbc07dd49993e4f06ab3b50b2317
This commit is contained in:
Mikhail Mazurskiy 2025-07-07 19:21:39 +10:00 committed by Kubernetes Publisher
parent 879be6242f
commit 76df3ebd2e

View File

@ -20,6 +20,7 @@ import (
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock" "k8s.io/utils/clock"
) )
@ -163,8 +164,8 @@ func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMet
t := &Typed[T]{ t := &Typed[T]{
clock: c, clock: c,
queue: queue, queue: queue,
dirty: set[T]{}, dirty: sets.Set[T]{},
processing: set[T]{}, processing: sets.Set[T]{},
cond: sync.NewCond(&sync.Mutex{}), cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics, metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod, unfinishedWorkUpdatePeriod: updatePeriod,
@ -192,13 +193,13 @@ type Typed[t comparable] struct {
queue Queue[t] queue Queue[t]
// dirty defines all of the items that need to be processed. // dirty defines all of the items that need to be processed.
dirty set[t] dirty sets.Set[t]
// Things that are currently being processed are in the processing set. // Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish // These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if // processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue. // it's in the dirty set, and if so, add it to the queue.
processing set[t] processing sets.Set[t]
cond *sync.Cond cond *sync.Cond
@ -211,26 +212,6 @@ type Typed[t comparable] struct {
clock clock.WithTicker clock clock.WithTicker
} }
type empty struct{}
type set[t comparable] map[t]empty
func (s set[t]) has(item t) bool {
_, exists := s[item]
return exists
}
func (s set[t]) insert(item t) {
s[item] = empty{}
}
func (s set[t]) delete(item t) {
delete(s, item)
}
func (s set[t]) len() int {
return len(s)
}
// Add marks item as needing processing. // Add marks item as needing processing.
func (q *Typed[T]) Add(item T) { func (q *Typed[T]) Add(item T) {
q.cond.L.Lock() q.cond.L.Lock()
@ -238,10 +219,10 @@ func (q *Typed[T]) Add(item T) {
if q.shuttingDown { if q.shuttingDown {
return return
} }
if q.dirty.has(item) { if q.dirty.Has(item) {
// the same item is added again before it is processed, call the Touch // the same item is added again before it is processed, call the Touch
// function if the queue cares about it (for e.g, reset its priority) // function if the queue cares about it (for e.g, reset its priority)
if !q.processing.has(item) { if !q.processing.Has(item) {
q.queue.Touch(item) q.queue.Touch(item)
} }
return return
@ -249,8 +230,8 @@ func (q *Typed[T]) Add(item T) {
q.metrics.add(item) q.metrics.add(item)
q.dirty.insert(item) q.dirty.Insert(item)
if q.processing.has(item) { if q.processing.Has(item) {
return return
} }
@ -285,8 +266,8 @@ func (q *Typed[T]) Get() (item T, shutdown bool) {
q.metrics.get(item) q.metrics.get(item)
q.processing.insert(item) q.processing.Insert(item)
q.dirty.delete(item) q.dirty.Delete(item)
return item, false return item, false
} }
@ -300,11 +281,11 @@ func (q *Typed[T]) Done(item T) {
q.metrics.done(item) q.metrics.done(item)
q.processing.delete(item) q.processing.Delete(item)
if q.dirty.has(item) { if q.dirty.Has(item) {
q.queue.Push(item) q.queue.Push(item)
q.cond.Signal() q.cond.Signal()
} else if q.processing.len() == 0 { } else if q.processing.Len() == 0 {
q.cond.Signal() q.cond.Signal()
} }
} }
@ -337,7 +318,7 @@ func (q *Typed[T]) ShutDownWithDrain() {
q.shuttingDown = true q.shuttingDown = true
q.cond.Broadcast() q.cond.Broadcast()
for q.processing.len() != 0 && q.drain { for q.processing.Len() != 0 && q.drain {
q.cond.Wait() q.cond.Wait()
} }
} }