From d900134a60ccc16c427262dc12c2b1db7fac53c0 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 14 Aug 2014 13:23:07 -0700 Subject: [PATCH] Add filter to watch --- pkg/watch/filter.go | 68 +++++++++++++++++++++++++++++++++ pkg/watch/filter_test.go | 82 ++++++++++++++++++++++++++++++++++++++++ pkg/watch/mux.go | 5 ++- 3 files changed, 153 insertions(+), 2 deletions(-) create mode 100644 pkg/watch/filter.go create mode 100644 pkg/watch/filter_test.go diff --git a/pkg/watch/filter.go b/pkg/watch/filter.go new file mode 100644 index 00000000000..61955ec4614 --- /dev/null +++ b/pkg/watch/filter.go @@ -0,0 +1,68 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import () + +// FilterFunc should take an event, possibly modify it in some way, and return +// the modified event. If the event should be ignored, then return keep=false. +type FilterFunc func(in Event) (out Event, keep bool) + +// Filter passes all events through f before allowing them to pass on. +// Putting a filter on a watch, as an unavoidable side-effect due to the way +// go channels work, effectively causes the watch's event channel to have its +// queue length increased by one. +func Filter(w Interface, f FilterFunc) Interface { + fw := &filteredWatch{ + incoming: w, + result: make(chan Event), + f: f, + } + go fw.loop() + return fw +} + +type filteredWatch struct { + incoming Interface + result chan Event + f FilterFunc +} + +// ResultChan returns a channel which will receive filtered events. +func (fw *filteredWatch) ResultChan() <-chan Event { + return fw.result +} + +// Stop stops the upstream watch, which will eventually stop this watch. +func (fw *filteredWatch) Stop() { + fw.incoming.Stop() +} + +// loop waits for new values, filters them, and resends them. +func (fw *filteredWatch) loop() { + defer close(fw.result) + for { + event, ok := <-fw.incoming.ResultChan() + if !ok { + break + } + filtered, keep := fw.f(event) + if keep { + fw.result <- filtered + } + } +} diff --git a/pkg/watch/filter_test.go b/pkg/watch/filter_test.go new file mode 100644 index 00000000000..a6e59ca3ed9 --- /dev/null +++ b/pkg/watch/filter_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "reflect" + "testing" +) + +func TestFilter(t *testing.T) { + table := []Event{ + {Added, "foo"}, + {Added, "bar"}, + {Added, "baz"}, + {Added, "qux"}, + {Added, "zoo"}, + } + + source := NewFake() + filtered := Filter(source, func(e Event) (Event, bool) { + return e, e.Object.(string)[0] != 'b' + }) + + go func() { + for _, item := range table { + source.Action(item.Type, item.Object) + } + source.Stop() + }() + + var got []string + for { + event, ok := <-filtered.ResultChan() + if !ok { + break + } + got = append(got, event.Object.(string)) + } + + if e, a := []string{"foo", "qux", "zoo"}, got; !reflect.DeepEqual(e, a) { + t.Errorf("got %v, wanted %v", e, a) + } +} + +func TestFilterStop(t *testing.T) { + source := NewFake() + filtered := Filter(source, func(e Event) (Event, bool) { + return e, e.Object.(string)[0] != 'b' + }) + + go func() { + source.Add("foo") + filtered.Stop() + }() + + var got []string + for { + event, ok := <-filtered.ResultChan() + if !ok { + break + } + got = append(got, event.Object.(string)) + } + + if e, a := []string{"foo"}, got; !reflect.DeepEqual(e, a) { + t.Errorf("got %v, wanted %v", e, a) + } +} diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 6ec0cec8bb6..4935e204ffe 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -20,7 +20,8 @@ import ( "sync" ) -// Mux distributes event notifications among any number of watchers. +// Mux distributes event notifications among any number of watchers. Every event +// is delivered to every watcher. type Mux struct { lock sync.Mutex @@ -33,7 +34,7 @@ type Mux struct { // NewMux creates a new Mux. queueLength is the maximum number of events to queue. // When queueLength is 0, Action will block until any prior event has been // completely distributed. It is guaranteed that events will be distibuted in the -// order in which they occurr, but the order in which a single event is distributed +// order in which they ocurr, but the order in which a single event is distributed // among all of the watchers is unspecified. func NewMux(queueLength int) *Mux { m := &Mux{