From a8184e81bc47e2b08d3ab9c50009d34b15dc4381 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 5 Mar 2015 13:38:31 -0800 Subject: [PATCH 1/4] Update FIFO documentation --- pkg/client/cache/fifo.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 7ff644ad0d2..e778980d6f9 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -21,11 +21,25 @@ import ( "sync" ) +// Queue is exactly like a Store, but has a Pop() method too. +type Queue interface { + Store + // Pop blocks until it has something to return. + Pop() interface{} +} + // FIFO receives adds and updates from a Reflector, and puts them in a queue for // FIFO order processing. If multiple adds/updates of a single item happen while // an item is in the queue before it has been processed, it will only be // processed once, and when it is processed, the most recent version will be // processed. This can't be done with a channel. +// +// FIFO solves this use case: +// * You want to process every object (exactly) once. +// * You want to process the most recent version of the object when you process it. +// * You do not want to process deleted objects, they should be removed from the queue. +// * You do not want to periodically reprocess objects. +// Compare with DeltaFIFO for other use cases. type FIFO struct { lock sync.RWMutex cond sync.Cond @@ -37,6 +51,10 @@ type FIFO struct { keyFunc KeyFunc } +var ( + _ = Queue(&FIFO{}) // FIFO is a Queue +) + // Add inserts an item, and puts it in the queue. The item is only enqueued // if it doesn't already exist in the set. func (f *FIFO) Add(obj interface{}) error { @@ -127,7 +145,8 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { // Pop waits until an item is ready and returns it. If multiple items are // ready, they are returned in the order in which they were added/updated. // The item is removed from the queue (and the store) before it is returned, -// so if you don't succesfully process it, you need to add it back with Add(). +// so if you don't succesfully process it, you need to add it back with +// AddIfNotPresent(). func (f *FIFO) Pop() interface{} { f.lock.Lock() defer f.lock.Unlock() From e8cc126a568fe7fe3aaa3ca64f0e7b2d3a3d0fff Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 9 Mar 2015 14:48:59 -0700 Subject: [PATCH 2/4] Add ListKeys() to FIFO & test --- pkg/client/cache/fifo.go | 19 ++++++++++ pkg/client/cache/fifo_test.go | 67 ++++++++++++++++------------------- 2 files changed, 49 insertions(+), 37 deletions(-) diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index e778980d6f9..685c98fade6 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -24,8 +24,15 @@ import ( // Queue is exactly like a Store, but has a Pop() method too. type Queue interface { Store + // Pop blocks until it has something to return. Pop() interface{} + + // AddIfNotPresent adds a value previously + // returned by Pop back into the queue as long + // as nothing else (presumably more recent) + // has since been added. + AddIfNotPresent(interface{}) error } // FIFO receives adds and updates from a Reflector, and puts them in a queue for @@ -125,6 +132,18 @@ func (f *FIFO) List() []interface{} { return list } +// ListKeys returns a list of all the keys of the objects currently +// in the FIFO. +func (f *FIFO) ListKeys() []string { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]string, 0, len(f.items)) + for key := range f.items { + list = append(list, key) + } + return list +} + // Get returns the requested item, or sets exists=false. func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { key, err := f.keyFunc(obj) diff --git a/pkg/client/cache/fifo_test.go b/pkg/client/cache/fifo_test.go index f96ea14849b..1db91740a33 100644 --- a/pkg/client/cache/fifo_test.go +++ b/pkg/client/cache/fifo_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "reflect" "testing" "time" ) @@ -30,21 +31,21 @@ type testFifoObject struct { val interface{} } -func TestFIFO_basic(t *testing.T) { - mkObj := func(name string, val interface{}) testFifoObject { - return testFifoObject{name: name, val: val} - } +func mkFifoObj(name string, val interface{}) testFifoObject { + return testFifoObject{name: name, val: val} +} +func TestFIFO_basic(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) const amount = 500 go func() { for i := 0; i < amount; i++ { - f.Add(mkObj(string([]rune{'a', rune(i)}), i+1)) + f.Add(mkFifoObj(string([]rune{'a', rune(i)}), i+1)) } }() go func() { for u := uint64(0); u < amount; u++ { - f.Add(mkObj(string([]rune{'b', rune(u)}), u+1)) + f.Add(mkFifoObj(string([]rune{'b', rune(u)}), u+1)) } }() @@ -70,13 +71,17 @@ func TestFIFO_basic(t *testing.T) { } func TestFIFO_addUpdate(t *testing.T) { - mkObj := func(name string, val interface{}) testFifoObject { - return testFifoObject{name: name, val: val} + f := NewFIFO(testFifoObjectKeyFunc) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("foo", 15)) + + if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) } - f := NewFIFO(testFifoObjectKeyFunc) - f.Add(mkObj("foo", 10)) - f.Update(mkObj("foo", 15)) got := make(chan testFifoObject, 2) go func() { for { @@ -93,20 +98,16 @@ func TestFIFO_addUpdate(t *testing.T) { t.Errorf("Got second value %v", unexpected.val) case <-time.After(50 * time.Millisecond): } - _, exists, _ := f.Get(mkObj("foo", "")) + _, exists, _ := f.Get(mkFifoObj("foo", "")) if exists { t.Errorf("item did not get removed") } } func TestFIFO_addReplace(t *testing.T) { - mkObj := func(name string, val interface{}) testFifoObject { - return testFifoObject{name: name, val: val} - } - f := NewFIFO(testFifoObjectKeyFunc) - f.Add(mkObj("foo", 10)) - f.Replace([]interface{}{mkObj("foo", 15)}) + f.Add(mkFifoObj("foo", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 15)}) got := make(chan testFifoObject, 2) go func() { for { @@ -123,30 +124,26 @@ func TestFIFO_addReplace(t *testing.T) { t.Errorf("Got second value %v", unexpected.val) case <-time.After(50 * time.Millisecond): } - _, exists, _ := f.Get(mkObj("foo", "")) + _, exists, _ := f.Get(mkFifoObj("foo", "")) if exists { t.Errorf("item did not get removed") } } func TestFIFO_detectLineJumpers(t *testing.T) { - mkObj := func(name string, val interface{}) testFifoObject { - return testFifoObject{name: name, val: val} - } - f := NewFIFO(testFifoObjectKeyFunc) - f.Add(mkObj("foo", 10)) - f.Add(mkObj("bar", 1)) - f.Add(mkObj("foo", 11)) - f.Add(mkObj("foo", 13)) - f.Add(mkObj("zab", 30)) + f.Add(mkFifoObj("foo", 10)) + f.Add(mkFifoObj("bar", 1)) + f.Add(mkFifoObj("foo", 11)) + f.Add(mkFifoObj("foo", 13)) + f.Add(mkFifoObj("zab", 30)) if e, a := 13, f.Pop().(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) } - f.Add(mkObj("foo", 14)) // ensure foo doesn't jump back in line + f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line if e, a := 1, f.Pop().(testFifoObject).val; a != e { t.Fatalf("expected %d, got %d", e, a) @@ -162,16 +159,12 @@ func TestFIFO_detectLineJumpers(t *testing.T) { } func TestFIFO_addIfNotPresent(t *testing.T) { - mkObj := func(name string, val interface{}) testFifoObject { - return testFifoObject{name: name, val: val} - } - f := NewFIFO(testFifoObjectKeyFunc) - f.Add(mkObj("a", 1)) - f.Add(mkObj("b", 2)) - f.AddIfNotPresent(mkObj("b", 3)) - f.AddIfNotPresent(mkObj("c", 4)) + f.Add(mkFifoObj("a", 1)) + f.Add(mkFifoObj("b", 2)) + f.AddIfNotPresent(mkFifoObj("b", 3)) + f.AddIfNotPresent(mkFifoObj("c", 4)) if e, a := 3, len(f.items); a != e { t.Fatalf("expected queue length %d, got %d", e, a) From af870b87727d00685daabccca0d9830b1ad09530 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 26 Mar 2015 13:28:05 -0700 Subject: [PATCH 3/4] Add proper key error type --- pkg/client/cache/fifo.go | 11 +++++------ pkg/client/cache/store.go | 22 +++++++++++++++++----- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 685c98fade6..eccafd8e064 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "fmt" "sync" ) @@ -67,7 +66,7 @@ var ( func (f *FIFO) Add(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { - return fmt.Errorf("couldn't create key for object: %v", err) + return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() @@ -88,7 +87,7 @@ func (f *FIFO) Add(obj interface{}) error { func (f *FIFO) AddIfNotPresent(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { - return fmt.Errorf("couldn't create key for object: %v", err) + return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() @@ -113,7 +112,7 @@ func (f *FIFO) Update(obj interface{}) error { func (f *FIFO) Delete(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { - return fmt.Errorf("couldn't create key for object: %v", err) + return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() @@ -148,7 +147,7 @@ func (f *FIFO) ListKeys() []string { func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { key, err := f.keyFunc(obj) if err != nil { - return nil, false, fmt.Errorf("couldn't create key for object: %v", err) + return nil, false, KeyError{obj, err} } return f.GetByKey(key) } @@ -194,7 +193,7 @@ func (f *FIFO) Replace(list []interface{}) error { for _, item := range list { key, err := f.keyFunc(item) if err != nil { - return fmt.Errorf("couldn't create key for object: %v", err) + return KeyError{item, err} } items[key] = item } diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 828a1224ba3..47d70971c0b 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -49,6 +49,18 @@ type Store interface { // KeyFunc knows how to make a key from an object. Implementations should be deterministic. type KeyFunc func(obj interface{}) (string, error) +// KeyError will be returned any time a KeyFunc gives an error; it includes the object +// at fault. +type KeyError struct { + Obj interface{} + Err error +} + +// Error gives a human-readable description of the error. +func (k KeyError) Error() string { + return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err) +} + // MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make // keys for API objects which implement meta.Interface. // The key uses the format: / @@ -79,7 +91,7 @@ type cache struct { func (c *cache) Add(obj interface{}) error { key, err := c.keyFunc(obj) if err != nil { - return fmt.Errorf("couldn't create key for object: %v", err) + return KeyError{obj, err} } // keep a pointer to whatever could have been there previously c.lock.Lock() @@ -148,7 +160,7 @@ func (c *cache) deleteFromIndices(obj interface{}) error { func (c *cache) Update(obj interface{}) error { key, err := c.keyFunc(obj) if err != nil { - return fmt.Errorf("couldn't create key for object: %v", err) + return KeyError{obj, err} } c.lock.Lock() defer c.lock.Unlock() @@ -162,7 +174,7 @@ func (c *cache) Update(obj interface{}) error { func (c *cache) Delete(obj interface{}) error { key, err := c.keyFunc(obj) if err != nil { - return fmt.Errorf("couldn't create key for object: %v", err) + return KeyError{obj, err} } c.lock.Lock() defer c.lock.Unlock() @@ -212,7 +224,7 @@ func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) { key, _ := c.keyFunc(obj) if err != nil { - return nil, false, fmt.Errorf("couldn't create key for object: %v", err) + return nil, false, KeyError{obj, err} } return c.GetByKey(key) } @@ -234,7 +246,7 @@ func (c *cache) Replace(list []interface{}) error { for _, item := range list { key, err := c.keyFunc(item) if err != nil { - return fmt.Errorf("couldn't create key for object: %v", err) + return KeyError{item, err} } items[key] = item } From 68287713a0b56e95ab5684cf600ca4fa0fd7f216 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 5 Mar 2015 14:42:01 -0800 Subject: [PATCH 4/4] Add a "delta fifo", which queues up object changes Test coverage for module at 80%. --- pkg/client/cache/delta_fifo.go | 434 ++++++++++++++++++++++++++++ pkg/client/cache/delta_fifo_test.go | 277 ++++++++++++++++++ 2 files changed, 711 insertions(+) create mode 100644 pkg/client/cache/delta_fifo.go create mode 100644 pkg/client/cache/delta_fifo_test.go diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go new file mode 100644 index 00000000000..1f5fa245ea7 --- /dev/null +++ b/pkg/client/cache/delta_fifo.go @@ -0,0 +1,434 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "errors" + "fmt" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// NewDeltaFIFO returns a Store which can be used process changes to items. +// +// keyFunc is used to figure out what key an object should have. (It's +// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) +// +// 'compressor' may compress as many or as few items as it wants +// (including returning an empty slice), but it should do what it +// does quickly since it is called while the queue is locked. +// 'compressor' may be nil if you don't want any delta compression. +// +// 'keyLister' is expected to return a list of keys that the consumer of +// this queue "knows about". It is used to decide which items are missing +// when Replace() is called; 'Deleted' deltas are produced for these items. +// It may be nil if you don't need to detect all deletions. +// TODO: consider merging keyLister with this object, tracking a list of +// "known" keys when Pop() is called. Have to think about how that +// affects error retrying. +// +// Also see the comment on DeltaFIFO. +func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjectKeys KeyLister) *DeltaFIFO { + f := &DeltaFIFO{ + items: map[string]Deltas{}, + queue: []string{}, + keyFunc: keyFunc, + deltaCompressor: compressor, + knownObjectKeys: knownObjectKeys, + } + f.cond.L = &f.lock + return f +} + +// DeltaFIFO is like FIFO, but allows you to process deletes. +// +// DeltaFIFO is a producer-consumer queue, where a Reflector is +// indended to be the producer, and the consumer is whatever calls +// the Pop() method. +// +// DeltaFIFO solves this use case: +// * You want to process every object change (delta) at most once. +// * When you process an object, you want to see everything +// that's happened to it since you last processed it. +// * You want to process the deletion of objects. +// * You might want to periodically reprocess objects. +// +// DeltaFIFO's Pop(), Get(), and GetByKey() methods return +// interface{} to satisfy the Store/Queue interfaces, but it +// will always return an object of type Deltas. +// +// A note on threading: If you call Pop() in parallel from multiple +// threads, you could end up with multiple threads processing slightly +// different versions of the same object. +// +// You may provide a function to compress deltas (e.g., represent a +// series of Updates as a single Update). +type DeltaFIFO struct { + // lock/cond protects access to 'items' and 'queue'. + lock sync.RWMutex + cond sync.Cond + + // We depend on the property that items in the set are in + // the queue and vice versa, and that all Deltas in this + // map have at least one Delta. + items map[string]Deltas + queue []string + + // keyFunc is used to make the key used for queued item + // insertion and retrieval, and should be deterministic. + keyFunc KeyFunc + + // deltaCompressor tells us how to combine two or more + // deltas. It may be nil. + deltaCompressor DeltaCompressor + + // knownObjectKeys list keys that are "known", for the + // purpose of figuring out which items have been deleted + // when Replace() is called. + knownObjectKeys KeyLister +} + +var ( + _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue +) + +var ( + // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas + // object with zero length is encountered (should be impossible, + // even if such an object is accidentally produced by a DeltaCompressor-- + // but included for completeness). + ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key") +) + +// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or +// DeletedFinalStateUnknown objects. +func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { + if d, ok := obj.(Deltas); ok { + if len(d) == 0 { + return "", KeyError{obj, ErrZeroLengthDeltasObject} + } + obj = d.Newest().Object + } + if d, ok := obj.(DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return f.keyFunc(obj) +} + +// Add inserts an item, and puts it in the queue. The item is only enqueued +// if it doesn't already exist in the set. +func (f *DeltaFIFO) Add(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + return f.queueActionLocked(Added, obj) +} + +// Update is just like Add, but makes an Updated Delta. +func (f *DeltaFIFO) Update(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + return f.queueActionLocked(Updated, obj) +} + +// Delete is just like Add, but makes an Deleted Delta. +func (f *DeltaFIFO) Delete(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + return f.queueActionLocked(Deleted, obj) +} + +// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already +// present in the set, it is neither enqueued nor added to the set. +// +// This is useful in a single producer/consumer scenario so that the consumer can +// safely retry items without contending with the producer and potentially enqueueing +// stale items. +// +// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is +// different from the Add/Update/Delete functions. +func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { + deltas, ok := obj.(Deltas) + if !ok { + return fmt.Errorf("object must be of type deltas, but got: %#v", obj) + } + id, err := f.KeyOf(deltas.Newest().Object) + if err != nil { + return KeyError{obj, err} + } + f.lock.Lock() + defer f.lock.Unlock() + if _, exists := f.items[id]; exists { + return nil + } + + f.queue = append(f.queue, id) + f.items[id] = deltas + f.cond.Broadcast() + return nil +} + +// queueActionLocked appends to the delta list for the object, calling +// f.deltaCompressor if needed +func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { + id, err := f.KeyOf(obj) + if err != nil { + return KeyError{obj, err} + } + newDeltas := append(f.items[id], Delta{actionType, obj}) + if f.deltaCompressor != nil { + newDeltas = f.deltaCompressor.Compress(newDeltas) + } + + _, exists := f.items[id] + if len(newDeltas) > 0 { + if !exists { + f.queue = append(f.queue, id) + } + f.items[id] = newDeltas + f.cond.Broadcast() + } else if exists { + // The compression step removed all deltas, so + // we need to remove this from our map (extra items + // in the queue are ignored if they are not in the + // map). + delete(f.items, id) + } + return nil +} + +// List returns a list of all the items; it returns the object +// from the most recent Delta. +// You should treat the items returned inside the deltas as immutable. +func (f *DeltaFIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + // Copy item's slice so operations on this slice (delta + // compression) won't interfere with the object we return. + item = copyDeltas(item) + list = append(list, item.Newest().Object) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the FIFO. +func (f *DeltaFIFO) ListKeys() []string { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]string, 0, len(f.items)) + for key := range f.items { + list = append(list, key) + } + return list +} + +// Get returns the complete list of deltas for the requested item, +// or sets exists=false. +// You should treat the items returned inside the deltas as immutable. +func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { + key, err := f.KeyOf(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + return f.GetByKey(key) +} + +// GetByKey returns the complete list of deltas for the requested item, +// setting exists=false if that list is empty. +// You should treat the items returned inside the deltas as immutable. +func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { + f.lock.RLock() + defer f.lock.RUnlock() + d, exists := f.items[key] + if exists { + // Copy item's slice so operations on this slice (delta + // compression) won't interfere with the object we return. + d = copyDeltas(d) + } + return d, exists, nil +} + +// Pop blocks until an item is added to the queue, and then returns it. If +// multiple items are ready, they are returned in the order in which they were +// added/updated. The item is removed from the queue (and the store) before it +// is returned, so if you don't succesfully process it, you need to add it back +// with AddIfNotPresent(). +// +// Pop returns a 'Deltas', which has a complete list of all the things +// that happened to the object (deltas) while it was sitting in the queue. +func (f *DeltaFIFO) Pop() interface{} { + f.lock.Lock() + defer f.lock.Unlock() + for { + for len(f.queue) == 0 { + f.cond.Wait() + } + id := f.queue[0] + f.queue = f.queue[1:] + item, ok := f.items[id] + if !ok { + // Item may have been deleted subsequently. + continue + } + delete(f.items, id) + // Don't need to copyDeltas here, because we're transferring + // ownership to the caller. + return item + } +} + +// Replace will delete the contents of 'f', using instead the given map. +// 'f' takes ownersip of the map, you should not reference the map again +// after calling this function. f's queue is reset, too; upon return, it +// will contain the items in the map, in no particular order. +func (f *DeltaFIFO) Replace(list []interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + for _, item := range list { + if err := f.queueActionLocked(Sync, item); err != nil { + return fmt.Errorf("couldn't enqueue object: %v", err) + } + } + if f.knownObjectKeys == nil { + return nil + } + + keySet := make(util.StringSet, len(list)) + for _, item := range list { + key, err := f.KeyOf(item) + if err != nil { + return KeyError{item, err} + } + keySet.Insert(key) + } + + // Detect deletions not already in the queue. + knownKeys := f.knownObjectKeys.ListKeys() + for _, k := range knownKeys { + if _, exists := keySet[k]; exists { + continue + } + + // This key isn't in the complete set we got, so it must have been deleted. + if d, exists := f.items[k]; exists { + // Don't issue a delete delta if we have one enqueued as the most + // recent delta. + if d.Newest().Type == Deleted { + continue + } + } + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k}); err != nil { + return err + } + } + return nil +} + +// A KeyLister is anything that knows how to list its keys. +type KeyLister interface { + ListKeys() []string +} + +// KeyListerFunc adapts a raw function to be a KeyLister. +type KeyListerFunc func() []string + +// ListKeys just calls kl. +func (kl KeyListerFunc) ListKeys() []string { + return kl() +} + +// DeltaCompressor is an algorithm that removes redundant changes. +type DeltaCompressor interface { + Compress(Deltas) Deltas +} + +// DeltaCompressorFunc should remove redundant changes; but changes that +// are redundant depend on one's desired semantics, so this is an +// injectable function. +// +// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor. +type DeltaCompressorFunc func(Deltas) Deltas + +// Compress just calls dc. +func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas { + return dc(d) +} + +// DeltaType is the type of a change (addition, deletion, etc) +type DeltaType string + +const ( + Added DeltaType = "Added" + Updated DeltaType = "Updated" + Deleted DeltaType = "Deleted" + // The other types are obvious. You'll get Sync deltas when: + // * A watch expires/errors out and a new list/watch cycle is started. + // * You've turned on periodic syncs. + // (Anything that trigger's DeltaFIFO's Replace() method.) + Sync DeltaType = "Sync" +) + +// Delta is the type stored by a DeltaFIFO. It tells you what change +// happened, and the object's state after* that change. +// +// [*] Unless the change is a deletion, and then you'll get the final +// state of the object before it was deleted. +type Delta struct { + Type DeltaType + Object interface{} +} + +// Deltas is a list of one or more 'Delta's to an individual object. +// The oldest delta is at index 0, the newest delta is the last one. +type Deltas []Delta + +// Oldest is a convenience function that returns the oldest delta, or +// nil if there are no deltas. +func (d Deltas) Oldest() *Delta { + if len(d) > 0 { + return &d[0] + } + return nil +} + +// Newest is a convenience function that returns the newest delta, or +// nil if there are no deltas. +func (d Deltas) Newest() *Delta { + if n := len(d); n > 0 { + return &d[n-1] + } + return nil +} + +// copyDeltas returns a shallow copy of d; that is, it copies the slice but not +// the objects in the slice. This allows Get/List to return an object that we +// know won't be clobbered by a subsequent call to a delta compressor. +func copyDeltas(d Deltas) Deltas { + d2 := make(Deltas, len(d)) + copy(d2, d) + return d2 +} + +// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where +// an object was deleted but the watch deletion event was was missed. +// In this case we don't know the final "resting" state of the object. +type DeletedFinalStateUnknown struct { + Key string +} diff --git a/pkg/client/cache/delta_fifo_test.go b/pkg/client/cache/delta_fifo_test.go new file mode 100644 index 00000000000..7bee133e055 --- /dev/null +++ b/pkg/client/cache/delta_fifo_test.go @@ -0,0 +1,277 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "reflect" + "testing" + "time" +) + +// helper function to reduce stuttering +func testPop(f *DeltaFIFO) testFifoObject { + return f.Pop().(Deltas).Newest().Object.(testFifoObject) +} + +func TestDeltaFIFO_basic(t *testing.T) { + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + const amount = 500 + go func() { + for i := 0; i < amount; i++ { + f.Add(mkFifoObj(string([]rune{'a', rune(i)}), i+1)) + } + }() + go func() { + for u := uint64(0); u < amount; u++ { + f.Add(mkFifoObj(string([]rune{'b', rune(u)}), u+1)) + } + }() + + lastInt := int(0) + lastUint := uint64(0) + for i := 0; i < amount*2; i++ { + switch obj := testPop(f).val.(type) { + case int: + if obj <= lastInt { + t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) + } + lastInt = obj + case uint64: + if obj <= lastUint { + t.Errorf("got %v (uint) out of order, last was %v", obj, lastUint) + } else { + lastUint = obj + } + default: + t.Fatalf("unexpected type %#v", obj) + } + } +} + +func TestDeltaFIFO_compressorWorks(t *testing.T) { + oldestTypes := []DeltaType{} + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + // This function just keeps the most recent delta + // and puts deleted ones in the list. + DeltaCompressorFunc(func(d Deltas) Deltas { + if n := len(d); n > 1 { + oldestTypes = append(oldestTypes, d[0].Type) + d = d[1:] + } + return d + }), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("foo", 12)) + f.Replace([]interface{}{mkFifoObj("foo", 20)}) + f.Delete(mkFifoObj("foo", 15)) + f.Delete(mkFifoObj("foo", 18)) // flush the last one out + expect := []DeltaType{Added, Updated, Sync, Deleted} + if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + if e, a := (Deltas{{Deleted, mkFifoObj("foo", 18)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) { + t.Fatalf("Expected %#v, got %#v", e, a) + } + +} + +func TestDeltaFIFO_addUpdate(t *testing.T) { + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("foo", 12)) + f.Delete(mkFifoObj("foo", 15)) + + if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + + got := make(chan testFifoObject, 2) + go func() { + for { + got <- testPop(f) + } + }() + + first := <-got + if e, a := 15, first.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected.val) + case <-time.After(50 * time.Millisecond): + } + _, exists, _ := f.Get(mkFifoObj("foo", "")) + if exists { + t.Errorf("item did not get removed") + } +} + +func TestDeltaFIFO_enqueueing(t *testing.T) { + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("bar", 15)) + f.Delete(mkFifoObj("baz", 20)) + + expectList := []int{10, 15, 20} + for _, expect := range expectList { + if e, a := expect, testPop(f).val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + } + if e, a := 0, len(f.items); e != a { + t.Errorf("queue unexpectedly not empty: %v != %v", e, a) + } +} + +func TestDeltaFIFO_addReplace(t *testing.T) { + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + f.Add(mkFifoObj("foo", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 15)}) + got := make(chan testFifoObject, 2) + go func() { + for { + got <- testPop(f) + } + }() + + first := <-got + if e, a := 15, first.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected.val) + case <-time.After(50 * time.Millisecond): + } + _, exists, _ := f.Get(mkFifoObj("foo", "")) + if exists { + t.Errorf("item did not get removed") + } +} + +func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + nil, + KeyListerFunc(func() []string { + return []string{"foo", "bar", "baz"} + }), + ) + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}) + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + {{Sync, mkFifoObj("foo", 5)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "bar"}}}, + } + + for _, expected := range expectedList { + cur := f.Pop().(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +func TestDeltaFIFO_detectLineJumpers(t *testing.T) { + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + + f.Add(mkFifoObj("foo", 10)) + f.Add(mkFifoObj("bar", 1)) + f.Add(mkFifoObj("foo", 11)) + f.Add(mkFifoObj("foo", 13)) + f.Add(mkFifoObj("zab", 30)) + + if e, a := 13, testPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + + f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line + + if e, a := 1, testPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + + if e, a := 30, testPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + + if e, a := 14, testPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +func TestDeltaFIFO_addIfNotPresent(t *testing.T) { + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + + f.Add(mkFifoObj("b", 3)) + b3 := f.Pop() + f.Add(mkFifoObj("c", 4)) + c4 := f.Pop() + if e, a := 0, len(f.items); e != a { + t.Fatalf("Expected %v, got %v items in queue", e, a) + } + + f.Add(mkFifoObj("a", 1)) + f.Add(mkFifoObj("b", 2)) + f.AddIfNotPresent(b3) + f.AddIfNotPresent(c4) + + if e, a := 3, len(f.items); a != e { + t.Fatalf("expected queue length %d, got %d", e, a) + } + + expectedValues := []int{1, 2, 4} + for _, expected := range expectedValues { + if actual := testPop(f).val; actual != expected { + t.Fatalf("expected value %d, got %d", expected, actual) + } + } +} + +func TestDeltaFIFO_KeyOf(t *testing.T) { + f := DeltaFIFO{keyFunc: testFifoObjectKeyFunc} + + table := []struct { + obj interface{} + key string + }{ + {obj: testFifoObject{name: "A"}, key: "A"}, + {obj: DeletedFinalStateUnknown{Key: "B"}, key: "B"}, + {obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"}, + {obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D"}}}, key: "D"}, + } + + for _, item := range table { + got, err := f.KeyOf(item.obj) + if err != nil { + t.Errorf("Unexpected error for %q: %v", item.obj, err) + continue + } + if e, a := item.key, got; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +}