mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-11 22:20:18 +00:00
Fix cache to use the "List then Watch" pattern.
This commit is contained in:
17
pkg/client/cache/fifo.go
vendored
17
pkg/client/cache/fifo.go
vendored
@@ -116,6 +116,23 @@ func (f *FIFO) Pop() interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
// Replace will delete the contents of 'f', using instead the given map.
|
||||
// 'f' takes ownersip of the map, you should not reference the map again
|
||||
// after calling this function. f's queue is reset, too; upon return, it
|
||||
// will contain the items in the map, in no particular order.
|
||||
func (f *FIFO) Replace(idToObj map[string]interface{}) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.items = idToObj
|
||||
f.queue = f.queue[:0]
|
||||
for id := range idToObj {
|
||||
f.queue = append(f.queue, id)
|
||||
}
|
||||
if len(f.queue) > 0 {
|
||||
f.cond.Broadcast()
|
||||
}
|
||||
}
|
||||
|
||||
// NewFIFO returns a Store which can be used to queue up items to
|
||||
// process.
|
||||
func NewFIFO() *FIFO {
|
||||
|
26
pkg/client/cache/fifo_test.go
vendored
26
pkg/client/cache/fifo_test.go
vendored
@@ -81,3 +81,29 @@ func TestFIFO_addUpdate(t *testing.T) {
|
||||
t.Errorf("item did not get removed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFIFO_addReplace(t *testing.T) {
|
||||
f := NewFIFO()
|
||||
f.Add("foo", 10)
|
||||
f.Replace(map[string]interface{}{"foo": 15})
|
||||
got := make(chan int, 2)
|
||||
go func() {
|
||||
for {
|
||||
got <- f.Pop().(int)
|
||||
}
|
||||
}()
|
||||
|
||||
first := <-got
|
||||
if e, a := 15, first; e != a {
|
||||
t.Errorf("Didn't get updated value (%v), got %v", e, a)
|
||||
}
|
||||
select {
|
||||
case unexpected := <-got:
|
||||
t.Errorf("Got second value %v", unexpected)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
_, exists := f.Get("foo")
|
||||
if exists {
|
||||
t.Errorf("item did not get removed")
|
||||
}
|
||||
}
|
||||
|
89
pkg/client/cache/reflector.go
vendored
89
pkg/client/cache/reflector.go
vendored
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
@@ -26,58 +27,106 @@ import (
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
||||
type ListerWatcher interface {
|
||||
// List should return a list type object; the Items field will be extracted, and the
|
||||
// ResourceVersion field will be used to start the watch in the right place.
|
||||
List() (runtime.Object, error)
|
||||
// Watch should begin a watch at the specified version.
|
||||
Watch(resourceVersion uint64) (watch.Interface, error)
|
||||
}
|
||||
|
||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
||||
type Reflector struct {
|
||||
// The type of object we expect to place in the store.
|
||||
expectedType reflect.Type
|
||||
// The destination to sync up with the watch source
|
||||
store Store
|
||||
// watchFactory is called to initiate watches.
|
||||
watchFactory WatchFactory
|
||||
// listerWatcher is used to perform lists and watches.
|
||||
listerWatcher ListerWatcher
|
||||
// period controls timing between one watch ending and
|
||||
// the beginning of the next one.
|
||||
period time.Duration
|
||||
}
|
||||
|
||||
// WatchFactory should begin a watch at the specified version.
|
||||
type WatchFactory func(resourceVersion uint64) (watch.Interface, error)
|
||||
|
||||
// NewReflector creates a new Reflector object which will keep the given store up to
|
||||
// date with the server's contents for the given resource. Reflector promises to
|
||||
// only put things in the store that have the type of expectedType.
|
||||
func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Store) *Reflector {
|
||||
gc := &Reflector{
|
||||
watchFactory: watchFactory,
|
||||
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store) *Reflector {
|
||||
r := &Reflector{
|
||||
listerWatcher: lw,
|
||||
store: store,
|
||||
expectedType: reflect.TypeOf(expectedType),
|
||||
period: time.Second,
|
||||
}
|
||||
return gc
|
||||
return r
|
||||
}
|
||||
|
||||
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||
// Run starts a goroutine and returns immediately.
|
||||
func (gc *Reflector) Run() {
|
||||
func (r *Reflector) Run() {
|
||||
go util.Forever(func() { r.listAndWatch() }, r.period)
|
||||
}
|
||||
|
||||
func (r *Reflector) listAndWatch() {
|
||||
var resourceVersion uint64
|
||||
go util.Forever(func() {
|
||||
w, err := gc.watchFactory(resourceVersion)
|
||||
|
||||
list, err := r.listerWatcher.List()
|
||||
if err != nil {
|
||||
glog.Errorf("failed to watch %v: %v", gc.expectedType, err)
|
||||
glog.Errorf("Failed to list %v: %v", r.expectedType, err)
|
||||
return
|
||||
}
|
||||
gc.watchHandler(w, &resourceVersion)
|
||||
}, gc.period)
|
||||
jsonBase, err := runtime.FindJSONBase(list)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to understand list result %#v", list)
|
||||
return
|
||||
}
|
||||
resourceVersion = jsonBase.ResourceVersion()
|
||||
items, err := runtime.ExtractList(list)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to understand list result %#v (%v)", list, err)
|
||||
return
|
||||
}
|
||||
err = r.syncWith(items)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to sync list result: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
w, err := r.listerWatcher.Watch(resourceVersion)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
|
||||
return
|
||||
}
|
||||
r.watchHandler(w, &resourceVersion)
|
||||
}
|
||||
}
|
||||
|
||||
// syncWith replaces the store's items with the given list.
|
||||
func (r *Reflector) syncWith(items []runtime.Object) error {
|
||||
found := map[string]interface{}{}
|
||||
for _, item := range items {
|
||||
jsonBase, err := runtime.FindJSONBase(item)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unexpected item in list: %v", err)
|
||||
}
|
||||
found[jsonBase.ID()] = item
|
||||
}
|
||||
|
||||
r.store.Replace(found)
|
||||
return nil
|
||||
}
|
||||
|
||||
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||
func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
||||
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
||||
for {
|
||||
event, ok := <-w.ResultChan()
|
||||
if !ok {
|
||||
glog.Errorf("unexpected watch close")
|
||||
return
|
||||
}
|
||||
if e, a := gc.expectedType, reflect.TypeOf(event.Object); e != a {
|
||||
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
||||
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
||||
continue
|
||||
}
|
||||
@@ -88,14 +137,14 @@ func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
||||
}
|
||||
switch event.Type {
|
||||
case watch.Added:
|
||||
gc.store.Add(jsonBase.ID(), event.Object)
|
||||
r.store.Add(jsonBase.ID(), event.Object)
|
||||
case watch.Modified:
|
||||
gc.store.Update(jsonBase.ID(), event.Object)
|
||||
r.store.Update(jsonBase.ID(), event.Object)
|
||||
case watch.Deleted:
|
||||
// TODO: Will any consumers need access to the "last known
|
||||
// state", which is passed in event.Object? If so, may need
|
||||
// to change this.
|
||||
gc.store.Delete(jsonBase.ID())
|
||||
r.store.Delete(jsonBase.ID())
|
||||
default:
|
||||
glog.Errorf("unable to understand watch event %#v", event)
|
||||
}
|
||||
|
129
pkg/client/cache/reflector_test.go
vendored
129
pkg/client/cache/reflector_test.go
vendored
@@ -17,15 +17,27 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
type testLW struct {
|
||||
ListFunc func() (runtime.Object, error)
|
||||
WatchFunc func(resourceVersion uint64) (watch.Interface, error)
|
||||
}
|
||||
|
||||
func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() }
|
||||
func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) {
|
||||
return t.WatchFunc(resourceVersion)
|
||||
}
|
||||
|
||||
func TestReflector_watchHandler(t *testing.T) {
|
||||
s := NewStore()
|
||||
g := NewReflector(nil, &api.Pod{}, s)
|
||||
g := NewReflector(&testLW{}, &api.Pod{}, s)
|
||||
fw := watch.NewFake()
|
||||
s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
|
||||
s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}})
|
||||
@@ -68,13 +80,15 @@ func TestReflector_watchHandler(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflector_Run(t *testing.T) {
|
||||
func TestReflector_listAndWatch(t *testing.T) {
|
||||
createdFakes := make(chan *watch.FakeWatcher)
|
||||
|
||||
// Expect our starter to get called at the beginning of the watch with 0, and again with 3 when we
|
||||
// inject an error at 2.
|
||||
expectedRVs := []uint64{0, 3}
|
||||
watchStarter := func(rv uint64) (watch.Interface, error) {
|
||||
// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
|
||||
// to get called at the beginning of the watch with 1, and again with 4 when we
|
||||
// inject an error at 3.
|
||||
expectedRVs := []uint64{1, 4}
|
||||
lw := &testLW{
|
||||
WatchFunc: func(rv uint64) (watch.Interface, error) {
|
||||
fw := watch.NewFake()
|
||||
if e, a := expectedRVs[0], rv; e != a {
|
||||
t.Errorf("Expected rv %v, but got %v", e, a)
|
||||
@@ -84,11 +98,14 @@ func TestReflector_Run(t *testing.T) {
|
||||
// we don't want to block here, so report the new fake via a go routine.
|
||||
go func() { createdFakes <- fw }()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return &api.PodList{JSONBase: api.JSONBase{ResourceVersion: 1}}, nil
|
||||
},
|
||||
}
|
||||
s := NewFIFO()
|
||||
r := NewReflector(watchStarter, &api.Pod{}, s)
|
||||
r.period = 0
|
||||
r.Run()
|
||||
r := NewReflector(lw, &api.Pod{}, s)
|
||||
go r.listAndWatch()
|
||||
|
||||
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
|
||||
var fw *watch.FakeWatcher
|
||||
@@ -96,9 +113,9 @@ func TestReflector_Run(t *testing.T) {
|
||||
if fw == nil {
|
||||
fw = <-createdFakes
|
||||
}
|
||||
sendingRV := uint64(i + 1)
|
||||
sendingRV := uint64(i + 2)
|
||||
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: sendingRV}})
|
||||
if sendingRV == 2 {
|
||||
if sendingRV == 3 {
|
||||
// Inject a failure.
|
||||
fw.Stop()
|
||||
fw = nil
|
||||
@@ -111,7 +128,7 @@ func TestReflector_Run(t *testing.T) {
|
||||
if e, a := id, pod.ID; e != a {
|
||||
t.Errorf("%v: Expected %v, got %v", i, e, a)
|
||||
}
|
||||
if e, a := uint64(i+1), pod.ResourceVersion; e != a {
|
||||
if e, a := uint64(i+2), pod.ResourceVersion; e != a {
|
||||
t.Errorf("%v: Expected %v, got %v", i, e, a)
|
||||
}
|
||||
}
|
||||
@@ -120,3 +137,91 @@ func TestReflector_Run(t *testing.T) {
|
||||
t.Error("called watchStarter an unexpected number of times")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflector_listAndWatchWithErrors(t *testing.T) {
|
||||
mkPod := func(id string, rv uint64) *api.Pod {
|
||||
return &api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: rv}}
|
||||
}
|
||||
mkList := func(rv uint64, pods ...*api.Pod) *api.PodList {
|
||||
list := &api.PodList{JSONBase: api.JSONBase{ResourceVersion: rv}}
|
||||
for _, pod := range pods {
|
||||
list.Items = append(list.Items, *pod)
|
||||
}
|
||||
return list
|
||||
}
|
||||
table := []struct {
|
||||
list *api.PodList
|
||||
listErr error
|
||||
events []watch.Event
|
||||
watchErr error
|
||||
}{
|
||||
{
|
||||
list: mkList(1),
|
||||
events: []watch.Event{
|
||||
{watch.Added, mkPod("foo", 2)},
|
||||
{watch.Added, mkPod("bar", 3)},
|
||||
},
|
||||
}, {
|
||||
list: mkList(3, mkPod("foo", 2), mkPod("bar", 3)),
|
||||
events: []watch.Event{
|
||||
{watch.Deleted, mkPod("foo", 4)},
|
||||
{watch.Added, mkPod("qux", 5)},
|
||||
},
|
||||
}, {
|
||||
listErr: fmt.Errorf("a list error"),
|
||||
}, {
|
||||
list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)),
|
||||
watchErr: fmt.Errorf("a watch error"),
|
||||
}, {
|
||||
list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)),
|
||||
events: []watch.Event{
|
||||
{watch.Added, mkPod("baz", 6)},
|
||||
},
|
||||
}, {
|
||||
list: mkList(6, mkPod("bar", 3), mkPod("qux", 5), mkPod("baz", 6)),
|
||||
},
|
||||
}
|
||||
|
||||
s := NewFIFO()
|
||||
for line, item := range table {
|
||||
if item.list != nil {
|
||||
// Test that the list is what currently exists in the store.
|
||||
current := s.List()
|
||||
checkMap := map[string]uint64{}
|
||||
for _, item := range current {
|
||||
pod := item.(*api.Pod)
|
||||
checkMap[pod.ID] = pod.ResourceVersion
|
||||
}
|
||||
for _, pod := range item.list.Items {
|
||||
if e, a := pod.ResourceVersion, checkMap[pod.ID]; e != a {
|
||||
t.Errorf("%v: expected %v, got %v for pod %v", line, e, a, pod.ID)
|
||||
}
|
||||
}
|
||||
if e, a := len(item.list.Items), len(checkMap); e != a {
|
||||
t.Errorf("%v: expected %v, got %v", line, e, a)
|
||||
}
|
||||
}
|
||||
watchRet, watchErr := item.events, item.watchErr
|
||||
lw := &testLW{
|
||||
WatchFunc: func(rv uint64) (watch.Interface, error) {
|
||||
if watchErr != nil {
|
||||
return nil, watchErr
|
||||
}
|
||||
watchErr = fmt.Errorf("second watch")
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
for _, e := range watchRet {
|
||||
fw.Action(e.Type, e.Object)
|
||||
}
|
||||
fw.Stop()
|
||||
}()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return item.list, item.listErr
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &api.Pod{}, s)
|
||||
r.listAndWatch()
|
||||
}
|
||||
}
|
||||
|
14
pkg/client/cache/store.go
vendored
14
pkg/client/cache/store.go
vendored
@@ -33,6 +33,11 @@ type Store interface {
|
||||
List() []interface{}
|
||||
Contains() util.StringSet
|
||||
Get(id string) (item interface{}, exists bool)
|
||||
|
||||
// Replace will delete the contents of the store, using instead the
|
||||
// given map. Store takes ownership of the map, you should not reference
|
||||
// it after calling this function.
|
||||
Replace(idToObj map[string]interface{})
|
||||
}
|
||||
|
||||
type cache struct {
|
||||
@@ -95,6 +100,15 @@ func (c *cache) Get(id string) (item interface{}, exists bool) {
|
||||
return item, exists
|
||||
}
|
||||
|
||||
// Replace will delete the contents of 'c', using instead the given map.
|
||||
// 'c' takes ownership of the map, you should not reference the map again
|
||||
// after calling this function.
|
||||
func (c *cache) Replace(idToObj map[string]interface{}) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
c.items = idToObj
|
||||
}
|
||||
|
||||
// NewStore returns a Store implemented simply with a map and a lock.
|
||||
func NewStore() Store {
|
||||
return &cache{items: map[string]interface{}{}}
|
||||
|
32
pkg/client/cache/store_test.go
vendored
32
pkg/client/cache/store_test.go
vendored
@@ -45,10 +45,11 @@ func doTestStore(t *testing.T, store Store) {
|
||||
t.Errorf("found deleted item??")
|
||||
}
|
||||
|
||||
// Test List
|
||||
// Test List.
|
||||
store.Add("a", "b")
|
||||
store.Add("c", "d")
|
||||
store.Add("e", "e")
|
||||
{
|
||||
found := util.StringSet{}
|
||||
for _, item := range store.List() {
|
||||
found.Insert(item.(string))
|
||||
@@ -68,6 +69,35 @@ func doTestStore(t *testing.T, store Store) {
|
||||
if len(ids) != 3 {
|
||||
t.Errorf("extra items")
|
||||
}
|
||||
}
|
||||
|
||||
// Test Replace.
|
||||
store.Replace(map[string]interface{}{
|
||||
"foo": "foo",
|
||||
"bar": "bar",
|
||||
})
|
||||
|
||||
{
|
||||
found := util.StringSet{}
|
||||
for _, item := range store.List() {
|
||||
found.Insert(item.(string))
|
||||
}
|
||||
if !found.HasAll("foo", "bar") {
|
||||
t.Errorf("missing items")
|
||||
}
|
||||
if len(found) != 2 {
|
||||
t.Errorf("extra items")
|
||||
}
|
||||
|
||||
// Check that ID list is correct.
|
||||
ids := store.Contains()
|
||||
if !ids.HasAll("foo", "bar") {
|
||||
t.Errorf("missing items")
|
||||
}
|
||||
if len(ids) != 2 {
|
||||
t.Errorf("extra items")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
|
Reference in New Issue
Block a user