diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index 808a854556e..fcf2c313717 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -43,15 +43,18 @@ import ( // TODO: consider merging keyLister with this object, tracking a list of // "known" keys when Pop() is called. Have to think about how that // affects error retrying. +// TODO(lavalamp): I believe there is a possible race only when using an +// external known object source that the above TODO would +// fix. // // Also see the comment on DeltaFIFO. -func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjectKeys KeyLister) *DeltaFIFO { +func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO { f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: keyFunc, deltaCompressor: compressor, - knownObjectKeys: knownObjectKeys, + knownObjects: knownObjects, } f.cond.L = &f.lock return f @@ -80,9 +83,8 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjectKeys K // // A note on the KeyLister used by the DeltaFIFO: It's main purpose is // to list keys that are "known", for the puspose of figuring out which -// items have been deleted when Replace() is called. If the given KeyLister -// also satisfies the KeyGetter interface, the deleted objet will be -// included in the DeleteFinalStateUnknown markers. These objects +// items have been deleted when Replace() or Delete() are called. The deleted +// objet will be included in the DeleteFinalStateUnknown markers. These objects // could be stale. // // You may provide a function to compress deltas (e.g., represent a @@ -106,10 +108,10 @@ type DeltaFIFO struct { // deltas. It may be nil. deltaCompressor DeltaCompressor - // knownObjectKeys list keys that are "known", for the + // knownObjects list keys that are "known", for the // purpose of figuring out which items have been deleted - // when Replace() is called. - knownObjectKeys KeyLister + // when Replace() or Delete() is called. + knownObjects KeyListerGetter } var ( @@ -154,10 +156,30 @@ func (f *DeltaFIFO) Update(obj interface{}) error { return f.queueActionLocked(Updated, obj) } -// Delete is just like Add, but makes an Deleted Delta. +// Delete is just like Add, but makes an Deleted Delta. If the item does not +// already exist, it will be ignored. (It may have already been deleted by a +// Replace (re-list), for example. func (f *DeltaFIFO) Delete(obj interface{}) error { + id, err := f.KeyOf(obj) + if err != nil { + return KeyError{obj, err} + } f.lock.Lock() defer f.lock.Unlock() + if f.knownObjects == nil { + if _, exists := f.items[id]; !exists { + // Presumably, this was deleted when a relist happened. + // Don't provide a second report of the same deletion. + return nil + } + } else if _, exists, err := f.knownObjects.GetByKey(id); err == nil && !exists { + // Presumably, this was deleted when a relist happened. + // Don't provide a second report of the same deletion. + // TODO(lavalamp): This may be racy-- we aren't properly locked + // with knownObjects. + return nil + } + return f.queueActionLocked(Deleted, obj) } @@ -191,14 +213,54 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { return nil } +// re-listing and watching can deliver the same update multiple times in any +// order. This will combine the most recent two deltas if they are the same. +func dedupDeltas(deltas Deltas) Deltas { + n := len(deltas) + if n < 2 { + return deltas + } + a := &deltas[n-1] + b := &deltas[n-2] + if out := isDup(a, b); out != nil { + d := append(Deltas{}, deltas[:n-2]...) + return append(d, *out) + } + return deltas +} + +// If a & b represent the same event, returns the delta that ought to be kept. +// Otherwise, returns nil. +// TODO: is there anything other than deletions that need deduping? +func isDup(a, b *Delta) *Delta { + if out := isDeletionDup(a, b); out != nil { + return out + } + // TODO: Detect other duplicate situations? Are there any? + return nil +} + +// keep the one with the most information if both are deletions. +func isDeletionDup(a, b *Delta) *Delta { + if b.Type != Deleted || a.Type != Deleted { + return nil + } + // Do more sophisticated checks, or is this sufficient? + if _, ok := b.Object.(DeletedFinalStateUnknown); ok { + return a + } + return b +} + // queueActionLocked appends to the delta list for the object, calling -// f.deltaCompressor if needed +// f.deltaCompressor if needed. Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } newDeltas := append(f.items[id], Delta{actionType, obj}) + newDeltas = dedupDeltas(newDeltas) if f.deltaCompressor != nil { newDeltas = f.deltaCompressor.Compress(newDeltas) } @@ -310,52 +372,51 @@ func (f *DeltaFIFO) Pop() interface{} { func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() - for _, item := range list { - if err := f.queueActionLocked(Sync, item); err != nil { - return fmt.Errorf("couldn't enqueue object: %v", err) - } - } - if f.knownObjectKeys == nil { - return nil - } - - keySet := make(sets.String, len(list)) + keys := make(sets.String, len(list)) for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } - keySet.Insert(key) + keys.Insert(key) + if err := f.queueActionLocked(Sync, item); err != nil { + return fmt.Errorf("couldn't enqueue object: %v", err) + } + } + + if f.knownObjects == nil { + // Do deletion detection against our own list. + for k, oldItem := range f.items { + if keys.Has(k) { + continue + } + var deletedObj interface{} + if n := oldItem.Newest(); n != nil { + deletedObj = n.Object + } + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + return nil } // Detect deletions not already in the queue. - knownKeys := f.knownObjectKeys.ListKeys() + // TODO(lavalamp): This may be racy-- we aren't properly locked + // with knownObjects. Unproven. + knownKeys := f.knownObjects.ListKeys() for _, k := range knownKeys { - if _, exists := keySet[k]; exists { + if keys.Has(k) { continue } - // This key isn't in the complete set we got, so it must have been deleted. - if d, exists := f.items[k]; exists { - // Don't issue a delete delta if we have one enqueued as the most - // recent delta. - if d.Newest().Type == Deleted { - continue - } - } - var deletedObj interface{} - if keyGetter, ok := f.knownObjectKeys.(KeyGetter); ok { - var exists bool - var err error - deletedObj, exists, err = keyGetter.GetByKey(k) - if err != nil || !exists { - deletedObj = nil - if err != nil { - glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) - } else { - glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) - } - } + deletedObj, exists, err := f.knownObjects.GetByKey(k) + if err != nil { + deletedObj = nil + glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) + } else if !exists { + deletedObj = nil + glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err @@ -364,6 +425,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { return nil } +// A KeyListerGetter is anything that knows how to list its keys and look up by key. +type KeyListerGetter interface { + KeyLister + KeyGetter +} + // A KeyLister is anything that knows how to list its keys. type KeyLister interface { ListKeys() []string diff --git a/pkg/client/cache/delta_fifo_test.go b/pkg/client/cache/delta_fifo_test.go index 1b2288ab006..6adec860571 100644 --- a/pkg/client/cache/delta_fifo_test.go +++ b/pkg/client/cache/delta_fifo_test.go @@ -98,13 +98,13 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) { f.Add(mkFifoObj("foo", 10)) f.Update(mkFifoObj("foo", 12)) f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0") - f.Delete(mkFifoObj("foo", 15)) - f.Delete(mkFifoObj("foo", 18)) // flush the last one out + f.Delete(mkFifoObj("foo", 22)) + f.Add(mkFifoObj("foo", 25)) // flush the last one out expect := []DeltaType{Added, Updated, Sync, Deleted} if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } - if e, a := (Deltas{{Deleted, mkFifoObj("foo", 18)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) { + if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) { t.Fatalf("Expected %#v, got %#v", e, a) } @@ -126,7 +126,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { got := make(chan testFifoObject, 2) go func() { for { - got <- testPop(f) + obj := f.Pop().(Deltas).Newest().Object.(testFifoObject) + t.Logf("got a thing %#v", obj) + t.Logf("D len: %v", len(f.queue)) + got <- obj } }() @@ -145,10 +148,39 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { } } -func TestDeltaFIFO_enqueueing(t *testing.T) { +func TestDeltaFIFO_enqueueingNoLister(t *testing.T) { f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) f.Add(mkFifoObj("foo", 10)) f.Update(mkFifoObj("bar", 15)) + f.Add(mkFifoObj("qux", 17)) + f.Delete(mkFifoObj("qux", 18)) + + // This delete does not enqueue anything because baz doesn't exist. + f.Delete(mkFifoObj("baz", 20)) + + expectList := []int{10, 15, 18} + for _, expect := range expectList { + if e, a := expect, testPop(f).val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + } + if e, a := 0, len(f.items); e != a { + t.Errorf("queue unexpectedly not empty: %v != %v\n%#v", e, a, f.items) + } +} + +func TestDeltaFIFO_enqueueingWithLister(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + nil, + keyLookupFunc(func() []string { + return []string{"foo", "bar", "baz"} + }), + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("bar", 15)) + + // This delete does enqueue the deletion, because "baz" is in the key lister. f.Delete(mkFifoObj("baz", 20)) expectList := []int{10, 15, 20} diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index d5fb51871fe..c01714a6ef6 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -311,30 +311,6 @@ func TestUpdate(t *testing.T) { pair{FROM, FROM}: true, } - var testDoneWG sync.WaitGroup - - // Make a controller that deletes things once it observes an update. - // It calls Done() on the wait group on deletions so we can tell when - // everything we've added has been deleted. - _, controller := framework.NewInformer( - source, - &api.Pod{}, - time.Millisecond*1, - framework.ResourceEventHandlerFuncs{ - UpdateFunc: func(oldObj, newObj interface{}) { - o, n := oldObj.(*api.Pod), newObj.(*api.Pod) - from, to := o.Labels["check"], n.Labels["check"] - if !allowedTransitions[pair{from, to}] { - t.Errorf("observed transition %q -> %q for %v", from, to, n.Name) - } - source.Delete(n) - }, - DeleteFunc: func(obj interface{}) { - testDoneWG.Done() - }, - }, - ) - pod := func(name, check string) *api.Pod { return &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -368,8 +344,32 @@ func TestUpdate(t *testing.T) { } const threads = 3 + + var testDoneWG sync.WaitGroup testDoneWG.Add(threads * len(tests)) + // Make a controller that deletes things once it observes an update. + // It calls Done() on the wait group on deletions so we can tell when + // everything we've added has been deleted. + _, controller := framework.NewInformer( + source, + &api.Pod{}, + time.Millisecond*1, + framework.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + o, n := oldObj.(*api.Pod), newObj.(*api.Pod) + from, to := o.Labels["check"], n.Labels["check"] + if !allowedTransitions[pair{from, to}] { + t.Errorf("observed transition %q -> %q for %v", from, to, n.Name) + } + source.Delete(n) + }, + DeleteFunc: func(obj interface{}) { + testDoneWG.Done() + }, + }, + ) + // Run the controller and run it until we close stop. // Once Run() is called, calls to testDoneWG.Done() might start, so // all testDoneWG.Add() calls must happen before this point diff --git a/pkg/controller/framework/fake_controller_source.go b/pkg/controller/framework/fake_controller_source.go index d4b99bd096d..986346e6457 100644 --- a/pkg/controller/framework/fake_controller_source.go +++ b/pkg/controller/framework/fake_controller_source.go @@ -103,7 +103,7 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { panic(err) // this is test code only } - resourceVersion := len(f.changes) + resourceVersion := len(f.changes) + 1 objMeta.ResourceVersion = strconv.Itoa(resourceVersion) f.changes = append(f.changes, e) key := f.key(objMeta) @@ -127,7 +127,7 @@ func (f *FakeControllerSource) List() (runtime.Object, error) { for _, obj := range f.items { // Must make a copy to allow clients to modify the object. // Otherwise, if they make a change and write it back, they - // will inadvertently change the our canonical copy (in + // will inadvertently change our canonical copy (in // addition to racing with other clients). objCopy, err := api.Scheme.DeepCopy(obj) if err != nil { @@ -157,7 +157,6 @@ func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, e if err != nil { return nil, err } - rc++ // Don't re-send them a change they already have. if rc < len(f.changes) { changes := []watch.Event{} for _, c := range f.changes[rc:] { @@ -178,3 +177,11 @@ func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, e } return f.broadcaster.Watch(), nil } + +// Shutdown closes the underlying broadcaster, waiting for events to be +// delivered. It's an error to call any method after calling shutdown. This is +// enforced by Shutdown() leaving f locked. +func (f *FakeControllerSource) Shutdown() { + f.lock.Lock() // Purposely no unlock. + f.broadcaster.Shutdown() +} diff --git a/pkg/controller/framework/fake_controller_source_test.go b/pkg/controller/framework/fake_controller_source_test.go index 409b73c8377..fcf618fb6ff 100644 --- a/pkg/controller/framework/fake_controller_source_test.go +++ b/pkg/controller/framework/fake_controller_source_test.go @@ -17,11 +17,36 @@ limitations under the License. package framework import ( + "sync" "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/watch" ) +// ensure the watch delivers the requested and only the requested items. +func consume(t *testing.T, w watch.Interface, rvs []string, done *sync.WaitGroup) { + defer done.Done() + for _, rv := range rvs { + got, ok := <-w.ResultChan() + if !ok { + t.Errorf("%#v: unexpected channel close, wanted %v", rvs, rv) + return + } + gotRV := got.Object.(*api.Pod).ObjectMeta.ResourceVersion + if e, a := rv, gotRV; e != a { + t.Errorf("wanted %v, got %v", e, a) + } else { + t.Logf("Got %v as expected", gotRV) + } + } + // We should not get anything else. + got, open := <-w.ResultChan() + if open { + t.Errorf("%#v: unwanted object %#v", rvs, got) + } +} + func TestRCNumber(t *testing.T) { pod := func(name string) *api.Pod { return &api.Pod{ @@ -31,6 +56,9 @@ func TestRCNumber(t *testing.T) { } } + wg := &sync.WaitGroup{} + wg.Add(3) + source := NewFakeControllerSource() source.Add(pod("foo")) source.Modify(pod("foo")) @@ -40,9 +68,27 @@ func TestRCNumber(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer w.Stop() - got := <-w.ResultChan() - if e, a := "2", got.Object.(*api.Pod).ObjectMeta.ResourceVersion; e != a { + go consume(t, w, []string{"2", "3"}, wg) + + list, err := source.List() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if e, a := "3", list.(*api.List).ResourceVersion; e != a { t.Errorf("wanted %v, got %v", e, a) } + + w2, err := source.Watch("2") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + go consume(t, w2, []string{"3"}, wg) + + w3, err := source.Watch("3") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + go consume(t, w3, []string{}, wg) + source.Shutdown() + wg.Wait() } diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index c1cb40f6a8e..c123968d819 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -39,6 +39,8 @@ const incomingQueueLength = 25 // Broadcaster distributes event notifications among any number of watchers. Every event // is delivered to every watcher. type Broadcaster struct { + // TODO: see if this lock is needed now that new watchers go through + // the incoming channel. lock sync.Mutex watchers map[int64]*broadcasterWatcher @@ -73,21 +75,48 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B return m } +const internalRunFunctionMarker = "internal-do-function" + +// a function type we can shoehorn into the queue. +type functionFakeRuntimeObject func() + +func (functionFakeRuntimeObject) IsAnAPIObject() {} + +// Execute f, blocking the incoming queue (and waiting for it to drain first). +// The purpose of this terrible hack is so that watchers added after an event +// won't ever see that event, and will always see any event after they are +// added. +func (b *Broadcaster) blockQueue(f func()) { + var wg sync.WaitGroup + wg.Add(1) + b.incoming <- Event{ + Type: internalRunFunctionMarker, + Object: functionFakeRuntimeObject(func() { + defer wg.Done() + f() + }), + } + wg.Wait() +} + // Watch adds a new watcher to the list and returns an Interface for it. // Note: new watchers will only receive new events. They won't get an entire history // of previous events. func (m *Broadcaster) Watch() Interface { - m.lock.Lock() - defer m.lock.Unlock() - id := m.nextWatcher - m.nextWatcher++ - w := &broadcasterWatcher{ - result: make(chan Event, m.watchQueueLength), - stopped: make(chan struct{}), - id: id, - m: m, - } - m.watchers[id] = w + var w *broadcasterWatcher + m.blockQueue(func() { + m.lock.Lock() + defer m.lock.Unlock() + id := m.nextWatcher + m.nextWatcher++ + w = &broadcasterWatcher{ + result: make(chan Event, m.watchQueueLength), + stopped: make(chan struct{}), + id: id, + m: m, + } + m.watchers[id] = w + }) return w } @@ -96,24 +125,27 @@ func (m *Broadcaster) Watch() Interface { // The returned watch will have a queue length that is at least large enough to accommodate // all of the items in queuedEvents. func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface { - m.lock.Lock() - defer m.lock.Unlock() - id := m.nextWatcher - m.nextWatcher++ - length := m.watchQueueLength - if n := len(queuedEvents) + 1; n > length { - length = n - } - w := &broadcasterWatcher{ - result: make(chan Event, length), - stopped: make(chan struct{}), - id: id, - m: m, - } - m.watchers[id] = w - for _, e := range queuedEvents { - w.result <- e - } + var w *broadcasterWatcher + m.blockQueue(func() { + m.lock.Lock() + defer m.lock.Unlock() + id := m.nextWatcher + m.nextWatcher++ + length := m.watchQueueLength + if n := len(queuedEvents) + 1; n > length { + length = n + } + w = &broadcasterWatcher{ + result: make(chan Event, length), + stopped: make(chan struct{}), + id: id, + m: m, + } + m.watchers[id] = w + for _, e := range queuedEvents { + w.result <- e + } + }) return w } @@ -167,6 +199,10 @@ func (m *Broadcaster) loop() { if !ok { break } + if event.Type == internalRunFunctionMarker { + event.Object.(functionFakeRuntimeObject)() + continue + } m.distribute(event) } m.closeAll()