Added clarification to delta FIFO doc

Kubernetes-commit: c71272b0c7ac09991ba215e7bc5a8af8334c4b17
This commit is contained in:
Qing Ju 2020-05-25 19:48:10 -07:00 committed by Kubernetes Publisher
parent 72878402c8
commit 89cf2be62c

View File

@ -41,7 +41,7 @@ import (
// affects error retrying. // affects error retrying.
// NOTE: It is possible to misuse this and cause a race when using an // NOTE: It is possible to misuse this and cause a race when using an
// external known object source. // external known object source.
// Whether there is a potential race depends on how the comsumer // Whether there is a potential race depends on how the consumer
// modifies knownObjects. In Pop(), process function is called under // modifies knownObjects. In Pop(), process function is called under
// lock, so it is safe to update data structures in it that need to be // lock, so it is safe to update data structures in it that need to be
// in sync with the queue (e.g. knownObjects). // in sync with the queue (e.g. knownObjects).
@ -99,7 +99,7 @@ type DeltaFIFOOptions struct {
EmitDeltaTypeReplaced bool EmitDeltaTypeReplaced bool
} }
// NewDeltaFIFOWithOptions returns a Store which can be used process changes to // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO. // items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil { if opts.KeyFunction == nil {
@ -144,7 +144,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
// //
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they // interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas. // will always return an object of type Deltas. List() returns
// the newest objects currently in the FIFO.
// //
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities // A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in // to list Store keys and to get objects by Store key. The objects in
@ -160,14 +161,16 @@ type DeltaFIFO struct {
lock sync.RWMutex lock sync.RWMutex
cond sync.Cond cond sync.Cond
// We depend on the property that items in the set are in // `items` maps keys to Deltas.
// the queue and vice versa, and that all Deltas in this // `queue` maintains FIFO order of keys for consumption in Pop().
// map have at least one Delta. // We maintain the property that keys in the `items` and `queue` are
// strictly 1:1 mapping, and that all Deltas in `items` should have
// at least one Delta.
items map[string]Deltas items map[string]Deltas
queue []string queue []string
// populated is true if the first batch of items inserted by Replace() has been populated // populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first. // or Delete/Add/Update/AddIfNotPresent was called first.
populated bool populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace() // initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int initialPopulationCount int
@ -180,7 +183,6 @@ type DeltaFIFO struct {
// Replace(), and Resync() // Replace(), and Resync()
knownObjects KeyListerGetter knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations. // Currently, not used to gate any of CRED operations.
closed bool closed bool
@ -225,7 +227,7 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
} }
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped // or the first batch of items inserted by Replace() has been popped.
func (f *DeltaFIFO) HasSynced() bool { func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
@ -282,6 +284,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
} }
} }
// exist in items and/or KnownObjects
return f.queueActionLocked(Deleted, obj) return f.queueActionLocked(Deleted, obj)
} }
@ -332,6 +335,11 @@ func dedupDeltas(deltas Deltas) Deltas {
a := &deltas[n-1] a := &deltas[n-1]
b := &deltas[n-2] b := &deltas[n-2]
if out := isDup(a, b); out != nil { if out := isDup(a, b); out != nil {
// `a` and `b` are duplicates. Only keep the one returned from isDup().
// TODO: This extra array allocation and copy seems unnecessary if
// all we do to dedup is compare the new delta with the last element
// in `items`, which could be done by mutating `items` directly.
// Might be worth profiling and investigating if it is safe to optimize.
d := append(Deltas{}, deltas[:n-2]...) d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out) return append(d, *out)
} }
@ -456,10 +464,12 @@ func (f *DeltaFIFO) IsClosed() bool {
// added/updated. The item is removed from the queue (and the store) before it // added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back // is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent(). // with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures // process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current // may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock). // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
// process should avoid expensive I/O operation so that other queue operations, i.e.
// Add() and Get(), won't be blocked for too long.
// //
// Pop returns a 'Deltas', which has a complete list of all the things // Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue. // that happened to the object (deltas) while it was sitting in the queue.
@ -520,6 +530,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
action = Replaced action = Replaced
} }
// Add Sync/Replaced action for each new item.
for _, item := range list { for _, item := range list {
key, err := f.KeyOf(item) key, err := f.KeyOf(item)
if err != nil { if err != nil {
@ -538,6 +549,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
if keys.Has(k) { if keys.Has(k) {
continue continue
} }
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{} var deletedObj interface{}
if n := oldItem.Newest(); n != nil { if n := oldItem.Newest(); n != nil {
deletedObj = n.Object deletedObj = n.Object
@ -649,7 +663,8 @@ type KeyLister interface {
// A KeyGetter is anything that knows how to get the value stored under a given key. // A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface { type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error) // GetByKey returns the value associated with the key, or sets exists=false.
GetByKey(key string) (value interface{}, exists bool, err error)
} }
// DeltaType is the type of a change (addition, deletion, etc) // DeltaType is the type of a change (addition, deletion, etc)
@ -712,10 +727,10 @@ func copyDeltas(d Deltas) Deltas {
return d2 return d2
} }
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
// an object was deleted but the watch deletion event was missed. In this // was deleted but the watch deletion event was missed while disconnected from
// case we don't know the final "resting" state of the object, so there's // apiserver. In this case we don't know the final "resting" state of the object, so
// a chance the included `Obj` is stale. // there's a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct { type DeletedFinalStateUnknown struct {
Key string Key string
Obj interface{} Obj interface{}