Merge pull request #14881 from lavalamp/fix-14617

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot
2015-10-20 14:14:18 -07:00
6 changed files with 296 additions and 108 deletions

View File

@@ -43,15 +43,18 @@ import (
// TODO: consider merging keyLister with this object, tracking a list of
// "known" keys when Pop() is called. Have to think about how that
// affects error retrying.
// TODO(lavalamp): I believe there is a possible race only when using an
// external known object source that the above TODO would
// fix.
//
// Also see the comment on DeltaFIFO.
func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjectKeys KeyLister) *DeltaFIFO {
func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
deltaCompressor: compressor,
knownObjectKeys: knownObjectKeys,
knownObjects: knownObjects,
}
f.cond.L = &f.lock
return f
@@ -80,9 +83,8 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjectKeys K
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the puspose of figuring out which
// items have been deleted when Replace() is called. If the given KeyLister
// also satisfies the KeyGetter interface, the deleted objet will be
// included in the DeleteFinalStateUnknown markers. These objects
// items have been deleted when Replace() or Delete() are called. The deleted
// objet will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
@@ -106,10 +108,10 @@ type DeltaFIFO struct {
// deltas. It may be nil.
deltaCompressor DeltaCompressor
// knownObjectKeys list keys that are "known", for the
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() is called.
knownObjectKeys KeyLister
// when Replace() or Delete() is called.
knownObjects KeyListerGetter
}
var (
@@ -154,10 +156,30 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
return f.queueActionLocked(Updated, obj)
}
// Delete is just like Add, but makes an Deleted Delta.
// Delete is just like Add, but makes an Deleted Delta. If the item does not
// already exist, it will be ignored. (It may have already been deleted by a
// Replace (re-list), for example.
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
return nil
}
} else if _, exists, err := f.knownObjects.GetByKey(id); err == nil && !exists {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
// TODO(lavalamp): This may be racy-- we aren't properly locked
// with knownObjects.
return nil
}
return f.queueActionLocked(Deleted, obj)
}
@@ -191,14 +213,54 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
return nil
}
// re-listing and watching can deliver the same update multiple times in any
// order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
// If a & b represent the same event, returns the delta that ought to be kept.
// Otherwise, returns nil.
// TODO: is there anything other than deletions that need deduping?
func isDup(a, b *Delta) *Delta {
if out := isDeletionDup(a, b); out != nil {
return out
}
// TODO: Detect other duplicate situations? Are there any?
return nil
}
// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// Do more sophisticated checks, or is this sufficient?
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
// queueActionLocked appends to the delta list for the object, calling
// f.deltaCompressor if needed
// f.deltaCompressor if needed. Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if f.deltaCompressor != nil {
newDeltas = f.deltaCompressor.Compress(newDeltas)
}
@@ -310,52 +372,51 @@ func (f *DeltaFIFO) Pop() interface{} {
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
for _, item := range list {
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
if f.knownObjectKeys == nil {
return nil
}
keySet := make(sets.String, len(list))
keys := make(sets.String, len(list))
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keySet.Insert(key)
keys.Insert(key)
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
if f.knownObjects == nil {
// Do deletion detection against our own list.
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
return nil
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjectKeys.ListKeys()
// TODO(lavalamp): This may be racy-- we aren't properly locked
// with knownObjects. Unproven.
knownKeys := f.knownObjects.ListKeys()
for _, k := range knownKeys {
if _, exists := keySet[k]; exists {
if keys.Has(k) {
continue
}
// This key isn't in the complete set we got, so it must have been deleted.
if d, exists := f.items[k]; exists {
// Don't issue a delete delta if we have one enqueued as the most
// recent delta.
if d.Newest().Type == Deleted {
continue
}
}
var deletedObj interface{}
if keyGetter, ok := f.knownObjectKeys.(KeyGetter); ok {
var exists bool
var err error
deletedObj, exists, err = keyGetter.GetByKey(k)
if err != nil || !exists {
deletedObj = nil
if err != nil {
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else {
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
@@ -364,6 +425,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister
KeyGetter
}
// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
ListKeys() []string

View File

@@ -98,13 +98,13 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("foo", 12))
f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0")
f.Delete(mkFifoObj("foo", 15))
f.Delete(mkFifoObj("foo", 18)) // flush the last one out
f.Delete(mkFifoObj("foo", 22))
f.Add(mkFifoObj("foo", 25)) // flush the last one out
expect := []DeltaType{Added, Updated, Sync, Deleted}
if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
if e, a := (Deltas{{Deleted, mkFifoObj("foo", 18)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) {
if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) {
t.Fatalf("Expected %#v, got %#v", e, a)
}
@@ -126,7 +126,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
got := make(chan testFifoObject, 2)
go func() {
for {
got <- testPop(f)
obj := f.Pop().(Deltas).Newest().Object.(testFifoObject)
t.Logf("got a thing %#v", obj)
t.Logf("D len: %v", len(f.queue))
got <- obj
}
}()
@@ -145,10 +148,39 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
}
}
func TestDeltaFIFO_enqueueing(t *testing.T) {
func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("bar", 15))
f.Add(mkFifoObj("qux", 17))
f.Delete(mkFifoObj("qux", 18))
// This delete does not enqueue anything because baz doesn't exist.
f.Delete(mkFifoObj("baz", 20))
expectList := []int{10, 15, 18}
for _, expect := range expectList {
if e, a := expect, testPop(f).val; e != a {
t.Errorf("Didn't get updated value (%v), got %v", e, a)
}
}
if e, a := 0, len(f.items); e != a {
t.Errorf("queue unexpectedly not empty: %v != %v\n%#v", e, a, f.items)
}
}
func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
nil,
keyLookupFunc(func() []string {
return []string{"foo", "bar", "baz"}
}),
)
f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("bar", 15))
// This delete does enqueue the deletion, because "baz" is in the key lister.
f.Delete(mkFifoObj("baz", 20))
expectList := []int{10, 15, 20}