mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-26 15:12:06 +00:00
Merge pull request #91435 from jqmichael/deltaFifoDoc
Updated delta FIFO doc Kubernetes-commit: 5a784d4faaf2d589ed61c4c386f3060b358740f6
This commit is contained in:
commit
be97aaa976
45
tools/cache/delta_fifo.go
vendored
45
tools/cache/delta_fifo.go
vendored
@ -41,7 +41,7 @@ import (
|
||||
// affects error retrying.
|
||||
// NOTE: It is possible to misuse this and cause a race when using an
|
||||
// external known object source.
|
||||
// Whether there is a potential race depends on how the comsumer
|
||||
// Whether there is a potential race depends on how the consumer
|
||||
// modifies knownObjects. In Pop(), process function is called under
|
||||
// lock, so it is safe to update data structures in it that need to be
|
||||
// in sync with the queue (e.g. knownObjects).
|
||||
@ -99,7 +99,7 @@ type DeltaFIFOOptions struct {
|
||||
EmitDeltaTypeReplaced bool
|
||||
}
|
||||
|
||||
// NewDeltaFIFOWithOptions returns a Store which can be used process changes to
|
||||
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
|
||||
// items. See also the comment on DeltaFIFO.
|
||||
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
if opts.KeyFunction == nil {
|
||||
@ -144,7 +144,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
//
|
||||
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
|
||||
// interface{} to satisfy the Store/Queue interfaces, but they
|
||||
// will always return an object of type Deltas.
|
||||
// will always return an object of type Deltas. List() returns
|
||||
// the newest objects currently in the FIFO.
|
||||
//
|
||||
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
|
||||
// to list Store keys and to get objects by Store key. The objects in
|
||||
@ -160,14 +161,16 @@ type DeltaFIFO struct {
|
||||
lock sync.RWMutex
|
||||
cond sync.Cond
|
||||
|
||||
// We depend on the property that items in the set are in
|
||||
// the queue and vice versa, and that all Deltas in this
|
||||
// map have at least one Delta.
|
||||
// `items` maps keys to Deltas.
|
||||
// `queue` maintains FIFO order of keys for consumption in Pop().
|
||||
// We maintain the property that keys in the `items` and `queue` are
|
||||
// strictly 1:1 mapping, and that all Deltas in `items` should have
|
||||
// at least one Delta.
|
||||
items map[string]Deltas
|
||||
queue []string
|
||||
|
||||
// populated is true if the first batch of items inserted by Replace() has been populated
|
||||
// or Delete/Add/Update was called first.
|
||||
// or Delete/Add/Update/AddIfNotPresent was called first.
|
||||
populated bool
|
||||
// initialPopulationCount is the number of items inserted by the first call of Replace()
|
||||
initialPopulationCount int
|
||||
@ -180,7 +183,6 @@ type DeltaFIFO struct {
|
||||
// Replace(), and Resync()
|
||||
knownObjects KeyListerGetter
|
||||
|
||||
// Indication the queue is closed.
|
||||
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
|
||||
// Currently, not used to gate any of CRED operations.
|
||||
closed bool
|
||||
@ -225,7 +227,7 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
|
||||
}
|
||||
|
||||
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
|
||||
// or an Update called first but the first batch of items inserted by Replace() has been popped
|
||||
// or the first batch of items inserted by Replace() has been popped.
|
||||
func (f *DeltaFIFO) HasSynced() bool {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
@ -282,6 +284,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
|
||||
}
|
||||
}
|
||||
|
||||
// exist in items and/or KnownObjects
|
||||
return f.queueActionLocked(Deleted, obj)
|
||||
}
|
||||
|
||||
@ -332,6 +335,11 @@ func dedupDeltas(deltas Deltas) Deltas {
|
||||
a := &deltas[n-1]
|
||||
b := &deltas[n-2]
|
||||
if out := isDup(a, b); out != nil {
|
||||
// `a` and `b` are duplicates. Only keep the one returned from isDup().
|
||||
// TODO: This extra array allocation and copy seems unnecessary if
|
||||
// all we do to dedup is compare the new delta with the last element
|
||||
// in `items`, which could be done by mutating `items` directly.
|
||||
// Might be worth profiling and investigating if it is safe to optimize.
|
||||
d := append(Deltas{}, deltas[:n-2]...)
|
||||
return append(d, *out)
|
||||
}
|
||||
@ -456,10 +464,12 @@ func (f *DeltaFIFO) IsClosed() bool {
|
||||
// added/updated. The item is removed from the queue (and the store) before it
|
||||
// is returned, so if you don't successfully process it, you need to add it back
|
||||
// with AddIfNotPresent().
|
||||
// process function is called under lock, so it is safe update data structures
|
||||
// process function is called under lock, so it is safe to update data structures
|
||||
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
|
||||
// may return an instance of ErrRequeue with a nested error to indicate the current
|
||||
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
|
||||
// process should avoid expensive I/O operation so that other queue operations, i.e.
|
||||
// Add() and Get(), won't be blocked for too long.
|
||||
//
|
||||
// Pop returns a 'Deltas', which has a complete list of all the things
|
||||
// that happened to the object (deltas) while it was sitting in the queue.
|
||||
@ -520,6 +530,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||
action = Replaced
|
||||
}
|
||||
|
||||
// Add Sync/Replaced action for each new item.
|
||||
for _, item := range list {
|
||||
key, err := f.KeyOf(item)
|
||||
if err != nil {
|
||||
@ -538,6 +549,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||
if keys.Has(k) {
|
||||
continue
|
||||
}
|
||||
// Delete pre-existing items not in the new list.
|
||||
// This could happen if watch deletion event was missed while
|
||||
// disconnected from apiserver.
|
||||
var deletedObj interface{}
|
||||
if n := oldItem.Newest(); n != nil {
|
||||
deletedObj = n.Object
|
||||
@ -649,7 +663,8 @@ type KeyLister interface {
|
||||
|
||||
// A KeyGetter is anything that knows how to get the value stored under a given key.
|
||||
type KeyGetter interface {
|
||||
GetByKey(key string) (interface{}, bool, error)
|
||||
// GetByKey returns the value associated with the key, or sets exists=false.
|
||||
GetByKey(key string) (value interface{}, exists bool, err error)
|
||||
}
|
||||
|
||||
// DeltaType is the type of a change (addition, deletion, etc)
|
||||
@ -712,10 +727,10 @@ func copyDeltas(d Deltas) Deltas {
|
||||
return d2
|
||||
}
|
||||
|
||||
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
|
||||
// an object was deleted but the watch deletion event was missed. In this
|
||||
// case we don't know the final "resting" state of the object, so there's
|
||||
// a chance the included `Obj` is stale.
|
||||
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
|
||||
// was deleted but the watch deletion event was missed while disconnected from
|
||||
// apiserver. In this case we don't know the final "resting" state of the object, so
|
||||
// there's a chance the included `Obj` is stale.
|
||||
type DeletedFinalStateUnknown struct {
|
||||
Key string
|
||||
Obj interface{}
|
||||
|
Loading…
Reference in New Issue
Block a user