From a47b65bf8a74ab00f249c6d8939e721543e47471 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Sun, 3 Aug 2014 00:00:42 -0700 Subject: [PATCH] Add cache package. --- pkg/client/cache/cache.go | 71 ++++++++++++++++++++++ pkg/client/cache/doc.go | 24 ++++++++ pkg/client/cache/fifo.go | 107 +++++++++++++++++++++++++++++++++ pkg/client/cache/getter.go | 104 ++++++++++++++++++++++++++++++++ pkg/client/cache/store_test.go | 64 ++++++++++++++++++++ 5 files changed, 370 insertions(+) create mode 100644 pkg/client/cache/cache.go create mode 100644 pkg/client/cache/doc.go create mode 100644 pkg/client/cache/fifo.go create mode 100644 pkg/client/cache/getter.go create mode 100644 pkg/client/cache/store_test.go diff --git a/pkg/client/cache/cache.go b/pkg/client/cache/cache.go new file mode 100644 index 00000000000..c0bfb20a6c4 --- /dev/null +++ b/pkg/client/cache/cache.go @@ -0,0 +1,71 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "sync" +) + +type cache struct { + lock sync.RWMutex + items map[string]interface{} +} + +// Add inserts an item into the cache. +func (c *cache) Add(ID string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.items[ID] = obj +} + +// Update sets an item in the cache to its updated state. +func (c *cache) Update(ID string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.items[ID] = obj +} + +// Delete removes an item from the cache. +func (c *cache) Delete(ID string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + delete(c.items, ID) +} + +// List returns a list of all the items. +func (c *cache) List() []interface{} { + c.lock.RLock() + defer c.lock.RUnlock() + list := make([]interface{}, 0, len(c.items)) + for _, item := range c.items { + list = append(list, item) + } + return list +} + +// Get returns the requested item, or sets exists=false. +func (c *cache) Get(ID string) (item interface{}, exists bool) { + c.lock.RLock() + defer c.lock.RUnlock() + item, exists = c.items[ID] + return item, exists +} + +// NewStore returns a Store implemented simply with a map and a lock. +func NewStore() Store { + return &cache{items: map[string]interface{}{}} +} diff --git a/pkg/client/cache/doc.go b/pkg/client/cache/doc.go new file mode 100644 index 00000000000..2a34439e656 --- /dev/null +++ b/pkg/client/cache/doc.go @@ -0,0 +1,24 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package cache is a client-side caching mechanism. It is useful for +// reducing the number of server calls you'd otherwise need to make. +// Getter watches a server and updates a Store. Two stores are provided; +// one that simply caches objects (for example, to allow a scheduler to +// list currently available minions), and one that additionally acts as +// a FIFO queue (for example, to allow a scheduler to process incoming +// pods). +package cache diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go new file mode 100644 index 00000000000..25a256b2aa0 --- /dev/null +++ b/pkg/client/cache/fifo.go @@ -0,0 +1,107 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "sync" +) + +type FIFO struct { + lock sync.RWMutex + cond sync.Cond + items map[string]interface{} + queue []string +} + +// Add inserts an item, and puts it in the queue. +func (f *FIFO) Add(ID string, obj interface{}) { + f.lock.Lock() + defer f.lock.Unlock() + f.items[ID] = obj + f.queue = append(f.queue, ID) + f.cond.Broadcast() +} + +// Update updates an item, and adds it to the queue. +func (f *FIFO) Update(ID string, obj interface{}) { + f.lock.Lock() + defer f.lock.Unlock() + f.items[ID] = obj + f.queue = append(f.queue, ID) + f.cond.Broadcast() +} + +// Delete removes an item. It doesn't add it to the queue, because +// this implementation assumes the consumer only cares about the objects, +// not the order in which they were created/added. +func (f *FIFO) Delete(ID string, obj interface{}) { + f.lock.Lock() + defer f.lock.Unlock() + delete(f.items, ID) +} + +// List returns a list of all the items. +func (f *FIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + list = append(list, item) + } + return list +} + +// Get returns the requested item, or sets exists=false. +func (f *FIFO) Get(ID string) (item interface{}, exists bool) { + f.lock.RLock() + defer f.lock.RUnlock() + item, exists = f.items[ID] + return item, exists +} + +// Pop waits until an item is ready and returns it. If multiple items are +// ready, they are returned in the order in which they were added/updated. +// The item is removed from the queue (and the store) before it is returned, +// so if you don't succesfully process it, you need to add it back with Add(). +func (f *FIFO) Pop() interface{} { + f.lock.Lock() + defer f.lock.Unlock() + for { + for len(f.queue) == 0 { + f.cond.Wait() + } + id := f.queue[0] + f.queue = f.queue[1:] + item, ok := f.items[id] + if !ok { + // Item may have been deleted subsequently. + continue + } + return item + } +} + +// NewFIFOStore returns a Store which can be used to queue up items to +// process. +func NewFIFOStore() *FIFO { + f := &FIFO{ + items: map[string]interface{}{}, + queue: []string{}, + } + f.cond.L = &f.lock + return f +} diff --git a/pkg/client/cache/getter.go b/pkg/client/cache/getter.go new file mode 100644 index 00000000000..fad79b581e4 --- /dev/null +++ b/pkg/client/cache/getter.go @@ -0,0 +1,104 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "reflect" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" +) + +// Store is a generic object storage interface. Getter knows how to watch a server +// and update a store. A generic store is provided, which allows Getter to be used +// as a local caching system, and an LRU store, which allows Getter to work like a +// queue of items yet to be processed. +type Store interface { + Add(ID string, obj interface{}) + Update(ID string, obj interface{}) + Delete(ID string, obj interface{}) + List() []interface{} + Get(ID string) (item interface{}, exists bool) +} + +// Getter watches a specified resource and causes all changes to be reflected in the given store. +type Getter struct { + kubeClient *client.Client + resource string + expectedType reflect.Type + store Store +} + +// NewGetter makes a new Getter object which will keep the given store up to +// date with the server's contents for the given resource. Getter promises to +// only put things in the store that have the type of expectedType. +// TODO: define a query so you only locally cache a subset of items. +func NewGetter(resource string, kubeClient *client.Client, expectedType interface{}, store Store) *Getter { + gc := &Getter{ + resource: resource, + kubeClient: kubeClient, + store: store, + expectedType: reflect.TypeOf(expectedType), + } + return gc +} + +func (gc *Getter) Run() { + go util.Forever(gc.watch, 5*time.Second) +} + +func (gc *Getter) watch() { + w, err := gc.kubeClient.Get().Path(gc.resource).Watch() + if err != nil { + glog.Errorf("failed to watch %v: %v", gc.resource, err) + return + } + gc.watchHandler(w) +} + +func (gc *Getter) watchHandler(w watch.Interface) { + for { + event, ok := <-w.ResultChan() + if !ok { + glog.Errorf("unexpected watch close") + return + } + if e, a := gc.expectedType, reflect.TypeOf(event.Object); e != a { + glog.Errorf("expected type %v, but watch event object had type %v", e, a) + continue + } + jsonBase, err := api.FindJSONBase(event.Object) + if err != nil { + glog.Errorf("unable to understand watch event %#v", event) + continue + } + switch event.Type { + case watch.Added: + gc.store.Add(jsonBase.ID(), event.Object) + case watch.Modified: + gc.store.Update(jsonBase.ID(), event.Object) + case watch.Deleted: + gc.store.Delete(jsonBase.ID(), event.Object) + default: + glog.Errorf("unable to understand watch event %#v", event) + } + } +} diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go new file mode 100644 index 00000000000..c84d2926c95 --- /dev/null +++ b/pkg/client/cache/store_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Test public interface +func doTestStore(t *testing.T, store Store) { + store.Add("foo", "bar") + if item, ok := store.Get("foo"); !ok { + t.Errorf("didn't find inserted item") + } else { + if e, a := "bar", item.(string); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } + store.Update("foo", "baz") + if item, ok := store.Get("foo"); !ok { + t.Errorf("didn't find inserted item") + } else { + if e, a := "baz", item.(string); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } + store.Delete("foo", "qux") + if _, ok := store.Get("foo"); ok { + t.Errorf("found deleted item??") + } + store.Add("a", "b") + store.Add("c", "d") + store.Add("e", "e") + found := util.StringSet{} + for _, item := range store.List() { + found.Insert(item.(string)) + } + if !found.HasAll("b", "d", "e") { + t.Errorf("missing items") + } + if len(found) != 3 { + t.Errorf("extra items") + } +} + +func TestCache(t *testing.T) { + doTestStore(t, NewStore()) +}