mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-04 08:35:10 +00:00
manually sync with k8s.io/kubernetest at 17375fc59fff39135af63bd1750bb07c36ef873b, k8s.io/apimachinery at d90aa2c8531f13b0ca734845934c10dcb6a56ca7
This commit is contained in:
37
tools/cache/fifo.go
vendored
37
tools/cache/fifo.go
vendored
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
@@ -33,6 +34,8 @@ type ErrRequeue struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue")
|
||||
|
||||
func (e ErrRequeue) Error() string {
|
||||
if e.Err == nil {
|
||||
return "the popped item should be requeued without returning an error"
|
||||
@@ -58,6 +61,9 @@ type Queue interface {
|
||||
|
||||
// Return true if the first batch of items has been popped
|
||||
HasSynced() bool
|
||||
|
||||
// Close queue
|
||||
Close()
|
||||
}
|
||||
|
||||
// Helper function for popping from Queue.
|
||||
@@ -100,12 +106,26 @@ type FIFO struct {
|
||||
// keyFunc is used to make the key used for queued item insertion and retrieval, and
|
||||
// should be deterministic.
|
||||
keyFunc KeyFunc
|
||||
|
||||
// Indication the queue is closed.
|
||||
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
|
||||
// Currently, not used to gate any of CRED operations.
|
||||
closed bool
|
||||
closedLock sync.Mutex
|
||||
}
|
||||
|
||||
var (
|
||||
_ = Queue(&FIFO{}) // FIFO is a Queue
|
||||
)
|
||||
|
||||
// Close the queue.
|
||||
func (f *FIFO) Close() {
|
||||
f.closedLock.Lock()
|
||||
defer f.closedLock.Unlock()
|
||||
f.closed = true
|
||||
f.cond.Broadcast()
|
||||
}
|
||||
|
||||
// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
|
||||
// or an Update called first but the first batch of items inserted by Replace() has been popped
|
||||
func (f *FIFO) HasSynced() bool {
|
||||
@@ -222,6 +242,16 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
|
||||
return item, exists, nil
|
||||
}
|
||||
|
||||
// Checks if the queue is closed
|
||||
func (f *FIFO) IsClosed() bool {
|
||||
f.closedLock.Lock()
|
||||
defer f.closedLock.Unlock()
|
||||
if f.closed {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Pop waits until an item is ready and processes it. If multiple items are
|
||||
// ready, they are returned in the order in which they were added/updated.
|
||||
// The item is removed from the queue (and the store) before it is processed,
|
||||
@@ -233,6 +263,13 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
||||
defer f.lock.Unlock()
|
||||
for {
|
||||
for len(f.queue) == 0 {
|
||||
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
|
||||
// When Close() is called, the f.closed is set and the condition is broadcasted.
|
||||
// Which causes this loop to continue and return from the Pop().
|
||||
if f.IsClosed() {
|
||||
return nil, FIFOClosedError
|
||||
}
|
||||
|
||||
f.cond.Wait()
|
||||
}
|
||||
id := f.queue[0]
|
||||
|
Reference in New Issue
Block a user