diff --git a/pkg/api/jsonbase.go b/pkg/api/jsonbase.go index 4ce61709439..46b62418f31 100644 --- a/pkg/api/jsonbase.go +++ b/pkg/api/jsonbase.go @@ -44,6 +44,8 @@ func (v JSONBaseVersioning) SetResourceVersion(obj interface{}, version uint64) // JSONBase lets you work with a JSONBase from any of the versioned or // internal APIObjects. type JSONBaseInterface interface { + ID() string + SetID(ID string) APIVersion() string SetAPIVersion(version string) Kind() string @@ -53,11 +55,20 @@ type JSONBaseInterface interface { } type genericJSONBase struct { + id *string apiVersion *string kind *string resourceVersion *uint64 } +func (g genericJSONBase) ID() string { + return *g.id +} + +func (g genericJSONBase) SetID(id string) { + *g.id = id +} + func (g genericJSONBase) APIVersion() string { return *g.apiVersion } @@ -112,16 +123,16 @@ func fieldPtr(v reflect.Value, fieldName string, dest interface{}) error { // Returns an error if this isn't the case. func newGenericJSONBase(v reflect.Value) (genericJSONBase, error) { g := genericJSONBase{} - err := fieldPtr(v, "APIVersion", &g.apiVersion) - if err != nil { + if err := fieldPtr(v, "ID", &g.id); err != nil { return g, err } - err = fieldPtr(v, "Kind", &g.kind) - if err != nil { + if err := fieldPtr(v, "APIVersion", &g.apiVersion); err != nil { return g, err } - err = fieldPtr(v, "ResourceVersion", &g.resourceVersion) - if err != nil { + if err := fieldPtr(v, "Kind", &g.kind); err != nil { + return g, err + } + if err := fieldPtr(v, "ResourceVersion", &g.resourceVersion); err != nil { return g, err } return g, nil diff --git a/pkg/api/jsonbase_test.go b/pkg/api/jsonbase_test.go index 1b0500f0693..413183a0a01 100644 --- a/pkg/api/jsonbase_test.go +++ b/pkg/api/jsonbase_test.go @@ -23,6 +23,7 @@ import ( func TestGenericJSONBase(t *testing.T) { j := JSONBase{ + ID: "foo", APIVersion: "a", Kind: "b", ResourceVersion: 1, @@ -31,8 +32,11 @@ func TestGenericJSONBase(t *testing.T) { if err != nil { t.Fatalf("new err: %v", err) } - // Proove g supports JSONBaseInterface. + // Prove g supports JSONBaseInterface. jbi := JSONBaseInterface(g) + if e, a := "foo", jbi.ID(); e != a { + t.Errorf("expected %v, got %v", e, a) + } if e, a := "a", jbi.APIVersion(); e != a { t.Errorf("expected %v, got %v", e, a) } @@ -43,10 +47,15 @@ func TestGenericJSONBase(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } + jbi.SetID("bar") jbi.SetAPIVersion("c") jbi.SetKind("d") jbi.SetResourceVersion(2) + // Prove that jbi changes the original object. + if e, a := "bar", j.ID; e != a { + t.Errorf("expected %v, got %v", e, a) + } if e, a := "c", j.APIVersion; e != a { t.Errorf("expected %v, got %v", e, a) } diff --git a/pkg/client/cache/doc.go b/pkg/client/cache/doc.go new file mode 100644 index 00000000000..b7a70d9999c --- /dev/null +++ b/pkg/client/cache/doc.go @@ -0,0 +1,24 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package cache is a client-side caching mechanism. It is useful for +// reducing the number of server calls you'd otherwise need to make. +// Reflector watches a server and updates a Store. Two stores are provided; +// one that simply caches objects (for example, to allow a scheduler to +// list currently available minions), and one that additionally acts as +// a FIFO queue (for example, to allow a scheduler to process incoming +// pods). +package cache diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go new file mode 100644 index 00000000000..0b24e414b39 --- /dev/null +++ b/pkg/client/cache/fifo.go @@ -0,0 +1,113 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "sync" +) + +// FIFO recieves adds and updates from a Reflector, and puts them in a queue for +// FIFO order processing. If multiple adds/updates of a single item happen while +// an item is in the queue before it has been processed, it will only be +// processed once, and when it is processed, the most recent version will be +// processed. This can't be done with a channel. +type FIFO struct { + lock sync.RWMutex + cond sync.Cond + items map[string]interface{} + queue []string +} + +// Add inserts an item, and puts it in the queue. +func (f *FIFO) Add(ID string, obj interface{}) { + f.lock.Lock() + defer f.lock.Unlock() + f.items[ID] = obj + f.queue = append(f.queue, ID) + f.cond.Broadcast() +} + +// Update updates an item, and adds it to the queue. +func (f *FIFO) Update(ID string, obj interface{}) { + f.lock.Lock() + defer f.lock.Unlock() + f.items[ID] = obj + f.queue = append(f.queue, ID) + f.cond.Broadcast() +} + +// Delete removes an item. It doesn't add it to the queue, because +// this implementation assumes the consumer only cares about the objects, +// not the order in which they were created/added. +func (f *FIFO) Delete(ID string, obj interface{}) { + f.lock.Lock() + defer f.lock.Unlock() + delete(f.items, ID) +} + +// List returns a list of all the items. +func (f *FIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + list = append(list, item) + } + return list +} + +// Get returns the requested item, or sets exists=false. +func (f *FIFO) Get(ID string) (item interface{}, exists bool) { + f.lock.RLock() + defer f.lock.RUnlock() + item, exists = f.items[ID] + return item, exists +} + +// Pop waits until an item is ready and returns it. If multiple items are +// ready, they are returned in the order in which they were added/updated. +// The item is removed from the queue (and the store) before it is returned, +// so if you don't succesfully process it, you need to add it back with Add(). +func (f *FIFO) Pop() interface{} { + f.lock.Lock() + defer f.lock.Unlock() + for { + for len(f.queue) == 0 { + f.cond.Wait() + } + id := f.queue[0] + f.queue = f.queue[1:] + item, ok := f.items[id] + if !ok { + // Item may have been deleted subsequently. + continue + } + delete(f.items, id) + return item + } +} + +// NewFIFO returns a Store which can be used to queue up items to +// process. +func NewFIFO() *FIFO { + f := &FIFO{ + items: map[string]interface{}{}, + queue: []string{}, + } + f.cond.L = &f.lock + return f +} diff --git a/pkg/client/cache/fifo_test.go b/pkg/client/cache/fifo_test.go new file mode 100644 index 00000000000..f86360aa327 --- /dev/null +++ b/pkg/client/cache/fifo_test.go @@ -0,0 +1,83 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + "time" +) + +func TestFIFO_basic(t *testing.T) { + f := NewFIFO() + const amount = 500 + go func() { + for i := 0; i < amount; i++ { + f.Add(string([]rune{'a', rune(i)}), i+1) + } + }() + go func() { + for u := uint(0); u < amount; u++ { + f.Add(string([]rune{'b', rune(u)}), u+1) + } + }() + + lastInt := int(0) + lastUint := uint(0) + for i := 0; i < amount*2; i++ { + switch obj := f.Pop().(type) { + case int: + if obj <= lastInt { + t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) + } + lastInt = obj + case uint: + if obj <= lastUint { + t.Errorf("got %v (uint) out of order, last was %v", obj, lastUint) + } else { + lastUint = obj + } + default: + t.Fatalf("unexpected type %#v", obj) + } + } +} + +func TestFIFO_addUpdate(t *testing.T) { + f := NewFIFO() + f.Add("foo", 10) + f.Update("foo", 15) + got := make(chan int, 2) + go func() { + for { + got <- f.Pop().(int) + } + }() + + first := <-got + if e, a := 15, first; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected) + case <-time.After(50 * time.Millisecond): + } + _, exists := f.Get("foo") + if exists { + t.Errorf("item did not get removed") + } +} diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go new file mode 100644 index 00000000000..bb7be90660a --- /dev/null +++ b/pkg/client/cache/reflector.go @@ -0,0 +1,106 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "reflect" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" +) + +// Store is a generic object storage interface. Reflector knows how to watch a server +// and update a store. A generic store is provided, which allows Reflector to be used +// as a local caching system, and an LRU store, which allows Reflector to work like a +// queue of items yet to be processed. +type Store interface { + Add(ID string, obj interface{}) + Update(ID string, obj interface{}) + Delete(ID string, obj interface{}) + List() []interface{} + Get(ID string) (item interface{}, exists bool) +} + +// Reflector watches a specified resource and causes all changes to be reflected in the given store. +type Reflector struct { + kubeClient *client.Client + resource string + expectedType reflect.Type + store Store +} + +// NewReflector makes a new Reflector object which will keep the given store up to +// date with the server's contents for the given resource. Reflector promises to +// only put things in the store that have the type of expectedType. +// TODO: define a query so you only locally cache a subset of items. +func NewReflector(resource string, kubeClient *client.Client, expectedType interface{}, store Store) *Reflector { + gc := &Reflector{ + resource: resource, + kubeClient: kubeClient, + store: store, + expectedType: reflect.TypeOf(expectedType), + } + return gc +} + +func (gc *Reflector) Run() { + go util.Forever(func() { + w, err := gc.startWatch() + if err != nil { + glog.Errorf("failed to watch %v: %v", gc.resource, err) + return + } + gc.watchHandler(w) + }, 5*time.Second) +} + +func (gc *Reflector) startWatch() (watch.Interface, error) { + return gc.kubeClient.Get().Path(gc.resource).Path("watch").Watch() +} + +func (gc *Reflector) watchHandler(w watch.Interface) { + for { + event, ok := <-w.ResultChan() + if !ok { + glog.Errorf("unexpected watch close") + return + } + if e, a := gc.expectedType, reflect.TypeOf(event.Object); e != a { + glog.Errorf("expected type %v, but watch event object had type %v", e, a) + continue + } + jsonBase, err := api.FindJSONBase(event.Object) + if err != nil { + glog.Errorf("unable to understand watch event %#v", event) + continue + } + switch event.Type { + case watch.Added: + gc.store.Add(jsonBase.ID(), event.Object) + case watch.Modified: + gc.store.Update(jsonBase.ID(), event.Object) + case watch.Deleted: + gc.store.Delete(jsonBase.ID(), event.Object) + default: + glog.Errorf("unable to understand watch event %#v", event) + } + } +} diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go new file mode 100644 index 00000000000..ec32a7727a7 --- /dev/null +++ b/pkg/client/cache/reflector_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func TestReflector_watchHandler(t *testing.T) { + s := NewStore() + g := NewReflector("foo", nil, &api.Pod{}, s) + fw := watch.NewFake() + s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) + s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}) + go func() { + fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}}) + fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz"}}) + fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}}) + fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) + fw.Stop() + }() + g.watchHandler(fw) + + table := []struct { + ID string + RV uint64 + exists bool + }{ + {"foo", 0, false}, + {"rejected", 0, false}, + {"bar", 55, true}, + {"baz", 0, true}, + } + for _, item := range table { + obj, exists := s.Get(item.ID) + if e, a := item.exists, exists; e != a { + t.Errorf("%v: expected %v, got %v", item.ID, e, a) + } + if !exists { + continue + } + if e, a := item.RV, obj.(*api.Pod).ResourceVersion; e != a { + t.Errorf("%v: expected %v, got %v", item.ID, e, a) + } + } +} + +func TestReflector_startWatch(t *testing.T) { + table := []struct{ resource, path string }{ + {"pods", "/api/v1beta1/pods/watch"}, + {"services", "/api/v1beta1/services/watch"}, + } + for _, testItem := range table { + got := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusNotFound) + if req.URL.Path == testItem.path { + close(got) + return + } + t.Errorf("unexpected path %v", req.URL.Path) + })) + s := NewStore() + c := client.New(srv.URL, nil) + g := NewReflector(testItem.resource, c, &api.Pod{}, s) + _, err := g.startWatch() + // We're just checking that it watches the right path. + if err == nil { + t.Errorf("unexpected non-error") + } + <-got + } +} diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go new file mode 100644 index 00000000000..016aa18ce84 --- /dev/null +++ b/pkg/client/cache/store.go @@ -0,0 +1,73 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "sync" +) + +type cache struct { + lock sync.RWMutex + items map[string]interface{} +} + +// Add inserts an item into the cache. +func (c *cache) Add(ID string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.items[ID] = obj +} + +// Update sets an item in the cache to its updated state. +func (c *cache) Update(ID string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.items[ID] = obj +} + +// Delete removes an item from the cache. +func (c *cache) Delete(ID string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + delete(c.items, ID) +} + +// List returns a list of all the items. +// List is completely threadsafe as long as you treat all items as immutable. +func (c *cache) List() []interface{} { + c.lock.RLock() + defer c.lock.RUnlock() + list := make([]interface{}, 0, len(c.items)) + for _, item := range c.items { + list = append(list, item) + } + return list +} + +// Get returns the requested item, or sets exists=false. +// Get is completely threadsafe as long as you treat all items as immutable. +func (c *cache) Get(ID string) (item interface{}, exists bool) { + c.lock.RLock() + defer c.lock.RUnlock() + item, exists = c.items[ID] + return item, exists +} + +// NewStore returns a Store implemented simply with a map and a lock. +func NewStore() Store { + return &cache{items: map[string]interface{}{}} +} diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go new file mode 100644 index 00000000000..9154488c31a --- /dev/null +++ b/pkg/client/cache/store_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Test public interface +func doTestStore(t *testing.T, store Store) { + store.Add("foo", "bar") + if item, ok := store.Get("foo"); !ok { + t.Errorf("didn't find inserted item") + } else { + if e, a := "bar", item.(string); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } + store.Update("foo", "baz") + if item, ok := store.Get("foo"); !ok { + t.Errorf("didn't find inserted item") + } else { + if e, a := "baz", item.(string); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } + store.Delete("foo", "qux") + if _, ok := store.Get("foo"); ok { + t.Errorf("found deleted item??") + } + store.Add("a", "b") + store.Add("c", "d") + store.Add("e", "e") + found := util.StringSet{} + for _, item := range store.List() { + found.Insert(item.(string)) + } + if !found.HasAll("b", "d", "e") { + t.Errorf("missing items") + } + if len(found) != 3 { + t.Errorf("extra items") + } +} + +func TestCache(t *testing.T) { + doTestStore(t, NewStore()) +} + +func TestFIFOCache(t *testing.T) { + doTestStore(t, NewFIFO()) +}