mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Add filter to watch
This commit is contained in:
parent
ca55bfb29c
commit
d900134a60
68
pkg/watch/filter.go
Normal file
68
pkg/watch/filter.go
Normal file
@ -0,0 +1,68 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
|
||||
import ()
|
||||
|
||||
// FilterFunc should take an event, possibly modify it in some way, and return
|
||||
// the modified event. If the event should be ignored, then return keep=false.
|
||||
type FilterFunc func(in Event) (out Event, keep bool)
|
||||
|
||||
// Filter passes all events through f before allowing them to pass on.
|
||||
// Putting a filter on a watch, as an unavoidable side-effect due to the way
|
||||
// go channels work, effectively causes the watch's event channel to have its
|
||||
// queue length increased by one.
|
||||
func Filter(w Interface, f FilterFunc) Interface {
|
||||
fw := &filteredWatch{
|
||||
incoming: w,
|
||||
result: make(chan Event),
|
||||
f: f,
|
||||
}
|
||||
go fw.loop()
|
||||
return fw
|
||||
}
|
||||
|
||||
type filteredWatch struct {
|
||||
incoming Interface
|
||||
result chan Event
|
||||
f FilterFunc
|
||||
}
|
||||
|
||||
// ResultChan returns a channel which will receive filtered events.
|
||||
func (fw *filteredWatch) ResultChan() <-chan Event {
|
||||
return fw.result
|
||||
}
|
||||
|
||||
// Stop stops the upstream watch, which will eventually stop this watch.
|
||||
func (fw *filteredWatch) Stop() {
|
||||
fw.incoming.Stop()
|
||||
}
|
||||
|
||||
// loop waits for new values, filters them, and resends them.
|
||||
func (fw *filteredWatch) loop() {
|
||||
defer close(fw.result)
|
||||
for {
|
||||
event, ok := <-fw.incoming.ResultChan()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
filtered, keep := fw.f(event)
|
||||
if keep {
|
||||
fw.result <- filtered
|
||||
}
|
||||
}
|
||||
}
|
82
pkg/watch/filter_test.go
Normal file
82
pkg/watch/filter_test.go
Normal file
@ -0,0 +1,82 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFilter(t *testing.T) {
|
||||
table := []Event{
|
||||
{Added, "foo"},
|
||||
{Added, "bar"},
|
||||
{Added, "baz"},
|
||||
{Added, "qux"},
|
||||
{Added, "zoo"},
|
||||
}
|
||||
|
||||
source := NewFake()
|
||||
filtered := Filter(source, func(e Event) (Event, bool) {
|
||||
return e, e.Object.(string)[0] != 'b'
|
||||
})
|
||||
|
||||
go func() {
|
||||
for _, item := range table {
|
||||
source.Action(item.Type, item.Object)
|
||||
}
|
||||
source.Stop()
|
||||
}()
|
||||
|
||||
var got []string
|
||||
for {
|
||||
event, ok := <-filtered.ResultChan()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
got = append(got, event.Object.(string))
|
||||
}
|
||||
|
||||
if e, a := []string{"foo", "qux", "zoo"}, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("got %v, wanted %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterStop(t *testing.T) {
|
||||
source := NewFake()
|
||||
filtered := Filter(source, func(e Event) (Event, bool) {
|
||||
return e, e.Object.(string)[0] != 'b'
|
||||
})
|
||||
|
||||
go func() {
|
||||
source.Add("foo")
|
||||
filtered.Stop()
|
||||
}()
|
||||
|
||||
var got []string
|
||||
for {
|
||||
event, ok := <-filtered.ResultChan()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
got = append(got, event.Object.(string))
|
||||
}
|
||||
|
||||
if e, a := []string{"foo"}, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("got %v, wanted %v", e, a)
|
||||
}
|
||||
}
|
@ -20,7 +20,8 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Mux distributes event notifications among any number of watchers.
|
||||
// Mux distributes event notifications among any number of watchers. Every event
|
||||
// is delivered to every watcher.
|
||||
type Mux struct {
|
||||
lock sync.Mutex
|
||||
|
||||
@ -33,7 +34,7 @@ type Mux struct {
|
||||
// NewMux creates a new Mux. queueLength is the maximum number of events to queue.
|
||||
// When queueLength is 0, Action will block until any prior event has been
|
||||
// completely distributed. It is guaranteed that events will be distibuted in the
|
||||
// order in which they occurr, but the order in which a single event is distributed
|
||||
// order in which they ocurr, but the order in which a single event is distributed
|
||||
// among all of the watchers is unspecified.
|
||||
func NewMux(queueLength int) *Mux {
|
||||
m := &Mux{
|
||||
|
Loading…
Reference in New Issue
Block a user