From 041d56f3d0aa43c95f446903d91200406bfe0dc5 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Sun, 3 Aug 2014 15:36:36 -0700 Subject: [PATCH] finish testing client/cache --- pkg/api/jsonbase.go | 12 +-- pkg/api/jsonbase_test.go | 11 ++- pkg/client/cache/doc.go | 2 +- pkg/client/cache/fifo.go | 10 ++- pkg/client/cache/fifo_test.go | 83 +++++++++++++++++ pkg/client/cache/{getter.go => reflector.go} | 40 +++++---- pkg/client/cache/reflector_test.go | 94 ++++++++++++++++++++ pkg/client/cache/{cache.go => store.go} | 2 + pkg/client/cache/store_test.go | 4 + 9 files changed, 227 insertions(+), 31 deletions(-) create mode 100644 pkg/client/cache/fifo_test.go rename pkg/client/cache/{getter.go => reflector.go} (69%) create mode 100644 pkg/client/cache/reflector_test.go rename pkg/client/cache/{cache.go => store.go} (91%) diff --git a/pkg/api/jsonbase.go b/pkg/api/jsonbase.go index 24b12960f68..46b62418f31 100644 --- a/pkg/api/jsonbase.go +++ b/pkg/api/jsonbase.go @@ -123,20 +123,16 @@ func fieldPtr(v reflect.Value, fieldName string, dest interface{}) error { // Returns an error if this isn't the case. func newGenericJSONBase(v reflect.Value) (genericJSONBase, error) { g := genericJSONBase{} - err := fieldPtr(v, "ID", &g.id) - if err != nil { + if err := fieldPtr(v, "ID", &g.id); err != nil { return g, err } - err = fieldPtr(v, "APIVersion", &g.apiVersion) - if err != nil { + if err := fieldPtr(v, "APIVersion", &g.apiVersion); err != nil { return g, err } - err = fieldPtr(v, "Kind", &g.kind) - if err != nil { + if err := fieldPtr(v, "Kind", &g.kind); err != nil { return g, err } - err = fieldPtr(v, "ResourceVersion", &g.resourceVersion) - if err != nil { + if err := fieldPtr(v, "ResourceVersion", &g.resourceVersion); err != nil { return g, err } return g, nil diff --git a/pkg/api/jsonbase_test.go b/pkg/api/jsonbase_test.go index 1b0500f0693..413183a0a01 100644 --- a/pkg/api/jsonbase_test.go +++ b/pkg/api/jsonbase_test.go @@ -23,6 +23,7 @@ import ( func TestGenericJSONBase(t *testing.T) { j := JSONBase{ + ID: "foo", APIVersion: "a", Kind: "b", ResourceVersion: 1, @@ -31,8 +32,11 @@ func TestGenericJSONBase(t *testing.T) { if err != nil { t.Fatalf("new err: %v", err) } - // Proove g supports JSONBaseInterface. + // Prove g supports JSONBaseInterface. jbi := JSONBaseInterface(g) + if e, a := "foo", jbi.ID(); e != a { + t.Errorf("expected %v, got %v", e, a) + } if e, a := "a", jbi.APIVersion(); e != a { t.Errorf("expected %v, got %v", e, a) } @@ -43,10 +47,15 @@ func TestGenericJSONBase(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } + jbi.SetID("bar") jbi.SetAPIVersion("c") jbi.SetKind("d") jbi.SetResourceVersion(2) + // Prove that jbi changes the original object. + if e, a := "bar", j.ID; e != a { + t.Errorf("expected %v, got %v", e, a) + } if e, a := "c", j.APIVersion; e != a { t.Errorf("expected %v, got %v", e, a) } diff --git a/pkg/client/cache/doc.go b/pkg/client/cache/doc.go index 2a34439e656..b7a70d9999c 100644 --- a/pkg/client/cache/doc.go +++ b/pkg/client/cache/doc.go @@ -16,7 +16,7 @@ limitations under the License. // Package cache is a client-side caching mechanism. It is useful for // reducing the number of server calls you'd otherwise need to make. -// Getter watches a server and updates a Store. Two stores are provided; +// Reflector watches a server and updates a Store. Two stores are provided; // one that simply caches objects (for example, to allow a scheduler to // list currently available minions), and one that additionally acts as // a FIFO queue (for example, to allow a scheduler to process incoming diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 25a256b2aa0..0b24e414b39 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -20,6 +20,11 @@ import ( "sync" ) +// FIFO recieves adds and updates from a Reflector, and puts them in a queue for +// FIFO order processing. If multiple adds/updates of a single item happen while +// an item is in the queue before it has been processed, it will only be +// processed once, and when it is processed, the most recent version will be +// processed. This can't be done with a channel. type FIFO struct { lock sync.RWMutex cond sync.Cond @@ -91,13 +96,14 @@ func (f *FIFO) Pop() interface{} { // Item may have been deleted subsequently. continue } + delete(f.items, id) return item } } -// NewFIFOStore returns a Store which can be used to queue up items to +// NewFIFO returns a Store which can be used to queue up items to // process. -func NewFIFOStore() *FIFO { +func NewFIFO() *FIFO { f := &FIFO{ items: map[string]interface{}{}, queue: []string{}, diff --git a/pkg/client/cache/fifo_test.go b/pkg/client/cache/fifo_test.go new file mode 100644 index 00000000000..f86360aa327 --- /dev/null +++ b/pkg/client/cache/fifo_test.go @@ -0,0 +1,83 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + "time" +) + +func TestFIFO_basic(t *testing.T) { + f := NewFIFO() + const amount = 500 + go func() { + for i := 0; i < amount; i++ { + f.Add(string([]rune{'a', rune(i)}), i+1) + } + }() + go func() { + for u := uint(0); u < amount; u++ { + f.Add(string([]rune{'b', rune(u)}), u+1) + } + }() + + lastInt := int(0) + lastUint := uint(0) + for i := 0; i < amount*2; i++ { + switch obj := f.Pop().(type) { + case int: + if obj <= lastInt { + t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) + } + lastInt = obj + case uint: + if obj <= lastUint { + t.Errorf("got %v (uint) out of order, last was %v", obj, lastUint) + } else { + lastUint = obj + } + default: + t.Fatalf("unexpected type %#v", obj) + } + } +} + +func TestFIFO_addUpdate(t *testing.T) { + f := NewFIFO() + f.Add("foo", 10) + f.Update("foo", 15) + got := make(chan int, 2) + go func() { + for { + got <- f.Pop().(int) + } + }() + + first := <-got + if e, a := 15, first; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected) + case <-time.After(50 * time.Millisecond): + } + _, exists := f.Get("foo") + if exists { + t.Errorf("item did not get removed") + } +} diff --git a/pkg/client/cache/getter.go b/pkg/client/cache/reflector.go similarity index 69% rename from pkg/client/cache/getter.go rename to pkg/client/cache/reflector.go index fad79b581e4..bb7be90660a 100644 --- a/pkg/client/cache/getter.go +++ b/pkg/client/cache/reflector.go @@ -27,9 +27,9 @@ import ( "github.com/golang/glog" ) -// Store is a generic object storage interface. Getter knows how to watch a server -// and update a store. A generic store is provided, which allows Getter to be used -// as a local caching system, and an LRU store, which allows Getter to work like a +// Store is a generic object storage interface. Reflector knows how to watch a server +// and update a store. A generic store is provided, which allows Reflector to be used +// as a local caching system, and an LRU store, which allows Reflector to work like a // queue of items yet to be processed. type Store interface { Add(ID string, obj interface{}) @@ -39,20 +39,20 @@ type Store interface { Get(ID string) (item interface{}, exists bool) } -// Getter watches a specified resource and causes all changes to be reflected in the given store. -type Getter struct { +// Reflector watches a specified resource and causes all changes to be reflected in the given store. +type Reflector struct { kubeClient *client.Client resource string expectedType reflect.Type store Store } -// NewGetter makes a new Getter object which will keep the given store up to -// date with the server's contents for the given resource. Getter promises to +// NewReflector makes a new Reflector object which will keep the given store up to +// date with the server's contents for the given resource. Reflector promises to // only put things in the store that have the type of expectedType. // TODO: define a query so you only locally cache a subset of items. -func NewGetter(resource string, kubeClient *client.Client, expectedType interface{}, store Store) *Getter { - gc := &Getter{ +func NewReflector(resource string, kubeClient *client.Client, expectedType interface{}, store Store) *Reflector { + gc := &Reflector{ resource: resource, kubeClient: kubeClient, store: store, @@ -61,20 +61,22 @@ func NewGetter(resource string, kubeClient *client.Client, expectedType interfac return gc } -func (gc *Getter) Run() { - go util.Forever(gc.watch, 5*time.Second) +func (gc *Reflector) Run() { + go util.Forever(func() { + w, err := gc.startWatch() + if err != nil { + glog.Errorf("failed to watch %v: %v", gc.resource, err) + return + } + gc.watchHandler(w) + }, 5*time.Second) } -func (gc *Getter) watch() { - w, err := gc.kubeClient.Get().Path(gc.resource).Watch() - if err != nil { - glog.Errorf("failed to watch %v: %v", gc.resource, err) - return - } - gc.watchHandler(w) +func (gc *Reflector) startWatch() (watch.Interface, error) { + return gc.kubeClient.Get().Path(gc.resource).Path("watch").Watch() } -func (gc *Getter) watchHandler(w watch.Interface) { +func (gc *Reflector) watchHandler(w watch.Interface) { for { event, ok := <-w.ResultChan() if !ok { diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go new file mode 100644 index 00000000000..ec32a7727a7 --- /dev/null +++ b/pkg/client/cache/reflector_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func TestReflector_watchHandler(t *testing.T) { + s := NewStore() + g := NewReflector("foo", nil, &api.Pod{}, s) + fw := watch.NewFake() + s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) + s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}) + go func() { + fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}}) + fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz"}}) + fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}}) + fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) + fw.Stop() + }() + g.watchHandler(fw) + + table := []struct { + ID string + RV uint64 + exists bool + }{ + {"foo", 0, false}, + {"rejected", 0, false}, + {"bar", 55, true}, + {"baz", 0, true}, + } + for _, item := range table { + obj, exists := s.Get(item.ID) + if e, a := item.exists, exists; e != a { + t.Errorf("%v: expected %v, got %v", item.ID, e, a) + } + if !exists { + continue + } + if e, a := item.RV, obj.(*api.Pod).ResourceVersion; e != a { + t.Errorf("%v: expected %v, got %v", item.ID, e, a) + } + } +} + +func TestReflector_startWatch(t *testing.T) { + table := []struct{ resource, path string }{ + {"pods", "/api/v1beta1/pods/watch"}, + {"services", "/api/v1beta1/services/watch"}, + } + for _, testItem := range table { + got := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusNotFound) + if req.URL.Path == testItem.path { + close(got) + return + } + t.Errorf("unexpected path %v", req.URL.Path) + })) + s := NewStore() + c := client.New(srv.URL, nil) + g := NewReflector(testItem.resource, c, &api.Pod{}, s) + _, err := g.startWatch() + // We're just checking that it watches the right path. + if err == nil { + t.Errorf("unexpected non-error") + } + <-got + } +} diff --git a/pkg/client/cache/cache.go b/pkg/client/cache/store.go similarity index 91% rename from pkg/client/cache/cache.go rename to pkg/client/cache/store.go index c0bfb20a6c4..016aa18ce84 100644 --- a/pkg/client/cache/cache.go +++ b/pkg/client/cache/store.go @@ -47,6 +47,7 @@ func (c *cache) Delete(ID string, obj interface{}) { } // List returns a list of all the items. +// List is completely threadsafe as long as you treat all items as immutable. func (c *cache) List() []interface{} { c.lock.RLock() defer c.lock.RUnlock() @@ -58,6 +59,7 @@ func (c *cache) List() []interface{} { } // Get returns the requested item, or sets exists=false. +// Get is completely threadsafe as long as you treat all items as immutable. func (c *cache) Get(ID string) (item interface{}, exists bool) { c.lock.RLock() defer c.lock.RUnlock() diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index c84d2926c95..9154488c31a 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -62,3 +62,7 @@ func doTestStore(t *testing.T, store Store) { func TestCache(t *testing.T) { doTestStore(t, NewStore()) } + +func TestFIFOCache(t *testing.T) { + doTestStore(t, NewFIFO()) +}