mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-26 11:07:45 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			601 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			601 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2014 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package cache
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 
 | |
| 	"k8s.io/kubernetes/pkg/util/sets"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| )
 | |
| 
 | |
| // NewDeltaFIFO returns a Store which can be used process changes to items.
 | |
| //
 | |
| // keyFunc is used to figure out what key an object should have. (It's
 | |
| // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
 | |
| //
 | |
| // 'compressor' may compress as many or as few items as it wants
 | |
| // (including returning an empty slice), but it should do what it
 | |
| // does quickly since it is called while the queue is locked.
 | |
| // 'compressor' may be nil if you don't want any delta compression.
 | |
| //
 | |
| // 'keyLister' is expected to return a list of keys that the consumer of
 | |
| // this queue "knows about". It is used to decide which items are missing
 | |
| // when Replace() is called; 'Deleted' deltas are produced for these items.
 | |
| // It may be nil if you don't need to detect all deletions.
 | |
| // TODO: consider merging keyLister with this object, tracking a list of
 | |
| //       "known" keys when Pop() is called. Have to think about how that
 | |
| //       affects error retrying.
 | |
| // TODO(lavalamp): I believe there is a possible race only when using an
 | |
| //                 external known object source that the above TODO would
 | |
| //                 fix.
 | |
| //
 | |
| // Also see the comment on DeltaFIFO.
 | |
| func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
 | |
| 	f := &DeltaFIFO{
 | |
| 		items:           map[string]Deltas{},
 | |
| 		queue:           []string{},
 | |
| 		keyFunc:         keyFunc,
 | |
| 		deltaCompressor: compressor,
 | |
| 		knownObjects:    knownObjects,
 | |
| 	}
 | |
| 	f.cond.L = &f.lock
 | |
| 	return f
 | |
| }
 | |
| 
 | |
| // DeltaFIFO is like FIFO, but allows you to process deletes.
 | |
| //
 | |
| // DeltaFIFO is a producer-consumer queue, where a Reflector is
 | |
| // intended to be the producer, and the consumer is whatever calls
 | |
| // the Pop() method.
 | |
| //
 | |
| // DeltaFIFO solves this use case:
 | |
| //  * You want to process every object change (delta) at most once.
 | |
| //  * When you process an object, you want to see everything
 | |
| //    that's happened to it since you last processed it.
 | |
| //  * You want to process the deletion of objects.
 | |
| //  * You might want to periodically reprocess objects.
 | |
| //
 | |
| // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
 | |
| // interface{} to satisfy the Store/Queue interfaces, but it
 | |
| // will always return an object of type Deltas.
 | |
| //
 | |
| // A note on threading: If you call Pop() in parallel from multiple
 | |
| // threads, you could end up with multiple threads processing slightly
 | |
| // different versions of the same object.
 | |
| //
 | |
| // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
 | |
| // to list keys that are "known", for the purpose of figuring out which
 | |
| // items have been deleted when Replace() or Delete() are called. The deleted
 | |
| // object will be included in the DeleteFinalStateUnknown markers. These objects
 | |
| // could be stale.
 | |
| //
 | |
| // You may provide a function to compress deltas (e.g., represent a
 | |
| // series of Updates as a single Update).
 | |
| type DeltaFIFO struct {
 | |
| 	// lock/cond protects access to 'items' and 'queue'.
 | |
| 	lock sync.RWMutex
 | |
| 	cond sync.Cond
 | |
| 
 | |
| 	// We depend on the property that items in the set are in
 | |
| 	// the queue and vice versa, and that all Deltas in this
 | |
| 	// map have at least one Delta.
 | |
| 	items map[string]Deltas
 | |
| 	queue []string
 | |
| 
 | |
| 	// populated is true if the first batch of items inserted by Replace() has been populated
 | |
| 	// or Delete/Add/Update was called first.
 | |
| 	populated bool
 | |
| 	// initialPopulationCount is the number of items inserted by the first call of Replace()
 | |
| 	initialPopulationCount int
 | |
| 
 | |
| 	// keyFunc is used to make the key used for queued item
 | |
| 	// insertion and retrieval, and should be deterministic.
 | |
| 	keyFunc KeyFunc
 | |
| 
 | |
| 	// deltaCompressor tells us how to combine two or more
 | |
| 	// deltas. It may be nil.
 | |
| 	deltaCompressor DeltaCompressor
 | |
| 
 | |
| 	// knownObjects list keys that are "known", for the
 | |
| 	// purpose of figuring out which items have been deleted
 | |
| 	// when Replace() or Delete() is called.
 | |
| 	knownObjects KeyListerGetter
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
 | |
| )
 | |
| 
 | |
// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas object
// with zero length is encountered. This should be impossible, even if such
// an object is accidentally produced by a DeltaCompressor, but the case is
// covered for completeness.
var ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
 | |
| 
 | |
| // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
 | |
| // DeletedFinalStateUnknown objects.
 | |
| func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
 | |
| 	if d, ok := obj.(Deltas); ok {
 | |
| 		if len(d) == 0 {
 | |
| 			return "", KeyError{obj, ErrZeroLengthDeltasObject}
 | |
| 		}
 | |
| 		obj = d.Newest().Object
 | |
| 	}
 | |
| 	if d, ok := obj.(DeletedFinalStateUnknown); ok {
 | |
| 		return d.Key, nil
 | |
| 	}
 | |
| 	return f.keyFunc(obj)
 | |
| }
 | |
| 
 | |
| // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
 | |
| // or an Update called first but the first batch of items inserted by Replace() has been popped
 | |
| func (f *DeltaFIFO) HasSynced() bool {
 | |
| 	f.lock.Lock()
 | |
| 	defer f.lock.Unlock()
 | |
| 	return f.populated && f.initialPopulationCount == 0
 | |
| }
 | |
| 
 | |
| // Add inserts an item, and puts it in the queue. The item is only enqueued
 | |
| // if it doesn't already exist in the set.
 | |
| func (f *DeltaFIFO) Add(obj interface{}) error {
 | |
| 	f.lock.Lock()
 | |
| 	defer f.lock.Unlock()
 | |
| 	f.populated = true
 | |
| 	return f.queueActionLocked(Added, obj)
 | |
| }
 | |
| 
 | |
| // Update is just like Add, but makes an Updated Delta.
 | |
| func (f *DeltaFIFO) Update(obj interface{}) error {
 | |
| 	f.lock.Lock()
 | |
| 	defer f.lock.Unlock()
 | |
| 	f.populated = true
 | |
| 	return f.queueActionLocked(Updated, obj)
 | |
| }
 | |
| 
 | |
| // Delete is just like Add, but makes an Deleted Delta. If the item does not
 | |
| // already exist, it will be ignored. (It may have already been deleted by a
 | |
| // Replace (re-list), for example.
 | |
| func (f *DeltaFIFO) Delete(obj interface{}) error {
 | |
| 	id, err := f.KeyOf(obj)
 | |
| 	if err != nil {
 | |
| 		return KeyError{obj, err}
 | |
| 	}
 | |
| 	f.lock.Lock()
 | |
| 	defer f.lock.Unlock()
 | |
| 	f.populated = true
 | |
| 	if f.knownObjects == nil {
 | |
| 		if _, exists := f.items[id]; !exists {
 | |
| 			// Presumably, this was deleted when a relist happened.
 | |
| 			// Don't provide a second report of the same deletion.
 | |
| 			return nil
 | |
| 		}
 | |
| 	} else {
 | |
| 		// We only want to skip the "deletion" action if the object doesn't
 | |
| 		// exist in knownObjects and it doesn't have corresponding item in items.
 | |
| 		// Note that even if there is a "deletion" action in items, we can ignore it,
 | |
| 		// because it will be deduped automatically in "queueActionLocked"
 | |
| 		_, exists, err := f.knownObjects.GetByKey(id)
 | |
| 		_, itemsExist := f.items[id]
 | |
| 		if err == nil && !exists && !itemsExist {
 | |
| 			// Presumably, this was deleted when a relist happened.
 | |
| 			// Don't provide a second report of the same deletion.
 | |
| 			// TODO(lavalamp): This may be racy-- we aren't properly locked
 | |
| 			// with knownObjects.
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return f.queueActionLocked(Deleted, obj)
 | |
| }
 | |
| 
 | |
| // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
 | |
| // present in the set, it is neither enqueued nor added to the set.
 | |
| //
 | |
| // This is useful in a single producer/consumer scenario so that the consumer can
 | |
| // safely retry items without contending with the producer and potentially enqueueing
 | |
| // stale items.
 | |
| //
 | |
| // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
 | |
| // different from the Add/Update/Delete functions.
 | |
| func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
 | |
| 	deltas, ok := obj.(Deltas)
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
 | |
| 	}
 | |
| 	id, err := f.KeyOf(deltas.Newest().Object)
 | |
| 	if err != nil {
 | |
| 		return KeyError{obj, err}
 | |
| 	}
 | |
| 	f.lock.Lock()
 | |
| 	defer f.lock.Unlock()
 | |
| 	f.populated = true
 | |
| 	if _, exists := f.items[id]; exists {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	f.queue = append(f.queue, id)
 | |
| 	f.items[id] = deltas
 | |
| 	f.cond.Broadcast()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // re-listing and watching can deliver the same update multiple times in any
 | |
| // order. This will combine the most recent two deltas if they are the same.
 | |
| func dedupDeltas(deltas Deltas) Deltas {
 | |
| 	n := len(deltas)
 | |
| 	if n < 2 {
 | |
| 		return deltas
 | |
| 	}
 | |
| 	a := &deltas[n-1]
 | |
| 	b := &deltas[n-2]
 | |
| 	if out := isDup(a, b); out != nil {
 | |
| 		d := append(Deltas{}, deltas[:n-2]...)
 | |
| 		return append(d, *out)
 | |
| 	}
 | |
| 	return deltas
 | |
| }
 | |
| 
 | |
| // If a & b represent the same event, returns the delta that ought to be kept.
 | |
| // Otherwise, returns nil.
 | |
| // TODO: is there anything other than deletions that need deduping?
 | |
| func isDup(a, b *Delta) *Delta {
 | |
| 	if out := isDeletionDup(a, b); out != nil {
 | |
| 		return out
 | |
| 	}
 | |
| 	// TODO: Detect other duplicate situations? Are there any?
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // keep the one with the most information if both are deletions.
 | |
| func isDeletionDup(a, b *Delta) *Delta {
 | |
| 	if b.Type != Deleted || a.Type != Deleted {
 | |
| 		return nil
 | |
| 	}
 | |
| 	// Do more sophisticated checks, or is this sufficient?
 | |
| 	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
 | |
| 		return a
 | |
| 	}
 | |
| 	return b
 | |
| }
 | |
| 
 | |
| // willObjectBeDeletedLocked returns true only if the last delta for the
 | |
| // given object is Delete. Caller must lock first.
 | |
| func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
 | |
| 	deltas := f.items[id]
 | |
| 	return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
 | |
| }
 | |
| 
 | |
| // queueActionLocked appends to the delta list for the object, calling
 | |
| // f.deltaCompressor if needed. Caller must lock first.
 | |
| func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
 | |
| 	id, err := f.KeyOf(obj)
 | |
| 	if err != nil {
 | |
| 		return KeyError{obj, err}
 | |
| 	}
 | |
| 
 | |
| 	// If object is supposed to be deleted (last event is Deleted),
 | |
| 	// then we should ignore Sync events, because it would result in
 | |
| 	// recreation of this object.
 | |
| 	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	newDeltas := append(f.items[id], Delta{actionType, obj})
 | |
| 	newDeltas = dedupDeltas(newDeltas)
 | |
| 	if f.deltaCompressor != nil {
 | |
| 		newDeltas = f.deltaCompressor.Compress(newDeltas)
 | |
| 	}
 | |
| 
 | |
| 	_, exists := f.items[id]
 | |
| 	if len(newDeltas) > 0 {
 | |
| 		if !exists {
 | |
| 			f.queue = append(f.queue, id)
 | |
| 		}
 | |
| 		f.items[id] = newDeltas
 | |
| 		f.cond.Broadcast()
 | |
| 	} else if exists {
 | |
| 		// The compression step removed all deltas, so
 | |
| 		// we need to remove this from our map (extra items
 | |
| 		// in the queue are ignored if they are not in the
 | |
| 		// map).
 | |
| 		delete(f.items, id)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // List returns a list of all the items; it returns the object
 | |
| // from the most recent Delta.
 | |
| // You should treat the items returned inside the deltas as immutable.
 | |
| func (f *DeltaFIFO) List() []interface{} {
 | |
| 	f.lock.RLock()
 | |
| 	defer f.lock.RUnlock()
 | |
| 	return f.listLocked()
 | |
| }
 | |
| 
 | |
| func (f *DeltaFIFO) listLocked() []interface{} {
 | |
| 	list := make([]interface{}, 0, len(f.items))
 | |
| 	for _, item := range f.items {
 | |
| 		// Copy item's slice so operations on this slice (delta
 | |
| 		// compression) won't interfere with the object we return.
 | |
| 		item = copyDeltas(item)
 | |
| 		list = append(list, item.Newest().Object)
 | |
| 	}
 | |
| 	return list
 | |
| }
 | |
| 
 | |
| // ListKeys returns a list of all the keys of the objects currently
 | |
| // in the FIFO.
 | |
| func (f *DeltaFIFO) ListKeys() []string {
 | |
| 	f.lock.RLock()
 | |
| 	defer f.lock.RUnlock()
 | |
| 	list := make([]string, 0, len(f.items))
 | |
| 	for key := range f.items {
 | |
| 		list = append(list, key)
 | |
| 	}
 | |
| 	return list
 | |
| }
 | |
| 
 | |
| // Get returns the complete list of deltas for the requested item,
 | |
| // or sets exists=false.
 | |
| // You should treat the items returned inside the deltas as immutable.
 | |
| func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
 | |
| 	key, err := f.KeyOf(obj)
 | |
| 	if err != nil {
 | |
| 		return nil, false, KeyError{obj, err}
 | |
| 	}
 | |
| 	return f.GetByKey(key)
 | |
| }
 | |
| 
 | |
| // GetByKey returns the complete list of deltas for the requested item,
 | |
| // setting exists=false if that list is empty.
 | |
| // You should treat the items returned inside the deltas as immutable.
 | |
| func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
 | |
| 	f.lock.RLock()
 | |
| 	defer f.lock.RUnlock()
 | |
| 	d, exists := f.items[key]
 | |
| 	if exists {
 | |
| 		// Copy item's slice so operations on this slice (delta
 | |
| 		// compression) won't interfere with the object we return.
 | |
| 		d = copyDeltas(d)
 | |
| 	}
 | |
| 	return d, exists, nil
 | |
| }
 | |
| 
 | |
| // Pop blocks until an item is added to the queue, and then returns it.  If
 | |
| // multiple items are ready, they are returned in the order in which they were
 | |
| // added/updated. The item is removed from the queue (and the store) before it
 | |
| // is returned, so if you don't successfully process it, you need to add it back
 | |
| // with AddIfNotPresent().
 | |
| // process function is called under lock, so it is safe update data structures
 | |
| // in it that need to be in sync with the queue (e.g. knownKeys).
 | |
| //
 | |
| // Pop returns a 'Deltas', which has a complete list of all the things
 | |
| // that happened to the object (deltas) while it was sitting in the queue.
 | |
| func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
 | |
| 	f.lock.Lock()
 | |
| 	defer f.lock.Unlock()
 | |
| 	for {
 | |
| 		for len(f.queue) == 0 {
 | |
| 			f.cond.Wait()
 | |
| 		}
 | |
| 		id := f.queue[0]
 | |
| 		f.queue = f.queue[1:]
 | |
| 		item, ok := f.items[id]
 | |
| 		if f.initialPopulationCount > 0 {
 | |
| 			f.initialPopulationCount--
 | |
| 		}
 | |
| 		if !ok {
 | |
| 			// Item may have been deleted subsequently.
 | |
| 			continue
 | |
| 		}
 | |
| 		delete(f.items, id)
 | |
| 		// Don't need to copyDeltas here, because we're transferring
 | |
| 		// ownership to the caller.
 | |
| 		return item, process(item)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Replace will delete the contents of 'f', using instead the given map.
 | |
| // 'f' takes ownership of the map, you should not reference the map again
 | |
| // after calling this function. f's queue is reset, too; upon return, it
 | |
| // will contain the items in the map, in no particular order.
 | |
| func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
 | |
| 	f.lock.Lock()
 | |
| 	defer f.lock.Unlock()
 | |
| 	keys := make(sets.String, len(list))
 | |
| 
 | |
| 	if !f.populated {
 | |
| 		f.populated = true
 | |
| 		f.initialPopulationCount = len(list)
 | |
| 	}
 | |
| 
 | |
| 	for _, item := range list {
 | |
| 		key, err := f.KeyOf(item)
 | |
| 		if err != nil {
 | |
| 			return KeyError{item, err}
 | |
| 		}
 | |
| 		keys.Insert(key)
 | |
| 		if err := f.queueActionLocked(Sync, item); err != nil {
 | |
| 			return fmt.Errorf("couldn't enqueue object: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if f.knownObjects == nil {
 | |
| 		// Do deletion detection against our own list.
 | |
| 		for k, oldItem := range f.items {
 | |
| 			if keys.Has(k) {
 | |
| 				continue
 | |
| 			}
 | |
| 			var deletedObj interface{}
 | |
| 			if n := oldItem.Newest(); n != nil {
 | |
| 				deletedObj = n.Object
 | |
| 			}
 | |
| 			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Detect deletions not already in the queue.
 | |
| 	// TODO(lavalamp): This may be racy-- we aren't properly locked
 | |
| 	// with knownObjects. Unproven.
 | |
| 	knownKeys := f.knownObjects.ListKeys()
 | |
| 	for _, k := range knownKeys {
 | |
| 		if keys.Has(k) {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		deletedObj, exists, err := f.knownObjects.GetByKey(k)
 | |
| 		if err != nil {
 | |
| 			deletedObj = nil
 | |
| 			glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
 | |
| 		} else if !exists {
 | |
| 			deletedObj = nil
 | |
| 			glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
 | |
| 		}
 | |
| 		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Resync will send a sync event for each item
 | |
| func (f *DeltaFIFO) Resync() error {
 | |
| 	f.lock.RLock()
 | |
| 	defer f.lock.RUnlock()
 | |
| 	for _, k := range f.knownObjects.ListKeys() {
 | |
| 		obj, exists, err := f.knownObjects.GetByKey(k)
 | |
| 		if err != nil {
 | |
| 			glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, k)
 | |
| 			continue
 | |
| 		} else if !exists {
 | |
| 			glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", k)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if err := f.queueActionLocked(Sync, obj); err != nil {
 | |
| 			return fmt.Errorf("couldn't queue object: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// A KeyListerGetter is anything that knows how to list its keys and look up
// by key.
type KeyListerGetter interface {
	KeyLister
	KeyGetter
}

// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
	// ListKeys returns the keys the implementation holds.
	ListKeys() (keys []string)
}

// A KeyGetter is anything that knows how to get the value stored under a
// given key.
type KeyGetter interface {
	// GetByKey looks up the value stored under key.
	GetByKey(key string) (value interface{}, exists bool, err error)
}
 | |
| 
 | |
| // DeltaCompressor is an algorithm that removes redundant changes.
 | |
| type DeltaCompressor interface {
 | |
| 	Compress(Deltas) Deltas
 | |
| }
 | |
| 
 | |
| // DeltaCompressorFunc should remove redundant changes; but changes that
 | |
| // are redundant depend on one's desired semantics, so this is an
 | |
| // injectable function.
 | |
| //
 | |
| // DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
 | |
| type DeltaCompressorFunc func(Deltas) Deltas
 | |
| 
 | |
| // Compress just calls dc.
 | |
| func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
 | |
| 	return dc(d)
 | |
| }
 | |
| 
 | |
// DeltaType is the type of a change (addition, deletion, etc).
type DeltaType string

const (
	// Added marks the delta queued by Add().
	Added DeltaType = "Added"
	// Updated marks the delta queued by Update().
	Updated DeltaType = "Updated"
	// Deleted marks the delta queued by Delete(), and the deletion markers
	// queued by Replace().
	Deleted DeltaType = "Deleted"
	// Sync deltas show up when:
	//  * A watch expires/errors out and a new list/watch cycle is started.
	//  * You've turned on periodic syncs.
	// (Anything that triggers DeltaFIFO's Replace() or Resync() method.)
	Sync DeltaType = "Sync"
)
 | |
| 
 | |
| // Delta is the type stored by a DeltaFIFO. It tells you what change
 | |
| // happened, and the object's state after* that change.
 | |
| //
 | |
| // [*] Unless the change is a deletion, and then you'll get the final
 | |
| //     state of the object before it was deleted.
 | |
| type Delta struct {
 | |
| 	Type   DeltaType
 | |
| 	Object interface{}
 | |
| }
 | |
| 
 | |
| // Deltas is a list of one or more 'Delta's to an individual object.
 | |
| // The oldest delta is at index 0, the newest delta is the last one.
 | |
| type Deltas []Delta
 | |
| 
 | |
| // Oldest is a convenience function that returns the oldest delta, or
 | |
| // nil if there are no deltas.
 | |
| func (d Deltas) Oldest() *Delta {
 | |
| 	if len(d) > 0 {
 | |
| 		return &d[0]
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Newest is a convenience function that returns the newest delta, or
 | |
| // nil if there are no deltas.
 | |
| func (d Deltas) Newest() *Delta {
 | |
| 	if n := len(d); n > 0 {
 | |
| 		return &d[n-1]
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
 | |
| // the objects in the slice. This allows Get/List to return an object that we
 | |
| // know won't be clobbered by a subsequent call to a delta compressor.
 | |
| func copyDeltas(d Deltas) Deltas {
 | |
| 	d2 := make(Deltas, len(d))
 | |
| 	copy(d2, d)
 | |
| 	return d2
 | |
| }
 | |
| 
 | |
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an
// object was deleted but the watch deletion event was missed. In this case
// we don't know the final "resting" state of the object, so there's a
// chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
	// Key is the key of the deleted object.
	Key string
	// Obj is the last known state of the object; it may be nil or stale.
	Obj interface{}
}
 |