From 15e9bfd9ae5d09ea3dc7f663c2d877f14d6b4d9a Mon Sep 17 00:00:00 2001 From: Filip Grzadkowski Date: Fri, 24 Apr 2015 13:07:32 +0200 Subject: [PATCH] Add a simple cache for objects stored in etcd. --- pkg/master/master_test.go | 3 +- pkg/tools/etcd_helper.go | 84 ++++++++++++++++++++++++++--- pkg/tools/etcd_helper_test.go | 10 ++-- pkg/tools/etcd_helper_watch.go | 18 +++++-- pkg/tools/etcd_helper_watch_test.go | 40 +++++++++----- 5 files changed, 125 insertions(+), 30 deletions(-) diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 217d1c73225..727f50c8127 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -31,7 +31,8 @@ func TestGetServersToValidate(t *testing.T) { config := Config{} fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"} - config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil, etcdtest.PathPrefix()} + config.EtcdHelper = tools.NewEtcdHelper(fakeClient, latest.Codec, etcdtest.PathPrefix()) + config.EtcdHelper.Versioner = nil master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{}) diff --git a/pkg/tools/etcd_helper.go b/pkg/tools/etcd_helper.go index 7cc4cbf4dc4..aca33554a65 100644 --- a/pkg/tools/etcd_helper.go +++ b/pkg/tools/etcd_helper.go @@ -26,6 +26,7 @@ import ( "path" "reflect" "strings" + "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -42,6 +43,16 @@ type EtcdHelper struct { Versioner EtcdVersioner // prefix for all etcd keys PathPrefix string + + // We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent + // to resourceVersion. + // This depends on etcd's indexes being globally unique across all objects/types. This will + // have to revisited if we decide to do things like multiple etcd clusters, or etcd will + // support multi-object transaction that will result in many objects with the same index. + // Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant. + // TODO: Measure how much this cache helps after the conversion code is optimized. + cache map[uint64]runtime.Object + mutex *sync.RWMutex } // NewEtcdHelper creates a helper that works against objects that use the internal @@ -52,6 +63,8 @@ func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHe Codec: codec, Versioner: APIObjectVersioner{}, PathPrefix: prefix, + cache: make(map[uint64]runtime.Object), + mutex: new(sync.RWMutex), } } @@ -121,19 +134,74 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er } continue } - obj := reflect.New(v.Type().Elem()) - if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil { - return err + if obj, found := h.getFromCache(node.ModifiedIndex); found { + v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) + } else { + obj := reflect.New(v.Type().Elem()) + if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil { + return err + } + if h.Versioner != nil { + // being unable to set the version does not prevent the object from being extracted + _ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node) + } + v.Set(reflect.Append(v, obj.Elem())) + if node.ModifiedIndex != 0 { + h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object)) + } } - if h.Versioner != nil { - // being unable to set the version does not prevent the object from being extracted - _ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node) - } - v.Set(reflect.Append(v, obj.Elem())) } return nil } +// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by +// their Node.ModifiedIndex, which is unique across all types. +// All implementations must be thread-safe. +type etcdCache interface { + getFromCache(index uint64) (runtime.Object, bool) + addToCache(index uint64, obj runtime.Object) +} + +const maxEtcdCacheEntries int = 50000 + +func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) { + var obj runtime.Object + func() { + h.mutex.RLock() + defer h.mutex.RUnlock() + obj = h.cache[index] + }() + if obj != nil { + // We should not return the object itself to avoid poluting the cache if someone + // modifies returned values. + objCopy, err := conversion.DeepCopy(obj) + if err != nil { + glog.Errorf("Error during DeepCopy of cached object: %q", err) + return nil, false + } + return objCopy.(runtime.Object), true + } + return nil, false +} + +func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) { + objCopy, err := conversion.DeepCopy(obj) + if err != nil { + glog.Errorf("Error during DeepCopy of cached object: %q", err) + return + } + h.mutex.Lock() + defer h.mutex.Unlock() + h.cache[index] = objCopy.(runtime.Object) + if len(h.cache) > maxEtcdCacheEntries { + var randomKey uint64 + for randomKey = range h.cache { + break + } + delete(h.cache, randomKey) + } +} + // ExtractToList works on a *List api object (an object that satisfies the runtime.IsList // definition) and extracts a go object per etcd node into a slice with the resource version. func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error { diff --git a/pkg/tools/etcd_helper_test.go b/pkg/tools/etcd_helper_test.go index aeb3dcf8ac5..f35fe48b1bd 100644 --- a/pkg/tools/etcd_helper_test.go +++ b/pkg/tools/etcd_helper_test.go @@ -174,7 +174,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) { Key: "/baz", Value: getEncodedPod("baz"), Dir: false, - ModifiedIndex: 1, + ModifiedIndex: 3, }, }, }, @@ -199,7 +199,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) { Items: []api.Pod{ // We expect list to be sorted by directory (e.g. namespace) first, then by name. { - ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "1"}, + ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "3"}, Spec: api.PodSpec{ RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, @@ -482,7 +482,8 @@ func TestSetObjWithVersion(t *testing.T) { func TestSetObjWithoutResourceVersioner(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := NewFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, testapi.Codec(), nil, etcdtest.PathPrefix()} + helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) + helper.Versioner = nil returnedObj := &api.Pod{} err := helper.SetObj("/some/key", obj, returnedObj, 3) key := etcdtest.AddPrefix("/some/key") @@ -509,7 +510,8 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) { func TestSetObjNilOutParam(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} fakeClient := NewFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, testapi.Codec(), nil, etcdtest.PathPrefix()} + helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) + helper.Versioner = nil err := helper.SetObj("/some/key", obj, nil, 3) if err != nil { t.Errorf("Unexpected error %#v", err) diff --git a/pkg/tools/etcd_helper_watch.go b/pkg/tools/etcd_helper_watch.go index 56262ad6b63..35e69132169 100644 --- a/pkg/tools/etcd_helper_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -72,7 +72,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { // watching (e.g., for reconnecting without missing any updates). func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { key = h.PrefixEtcdKey(key) - w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil) + w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -82,7 +82,7 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter // Errors will be sent down the channel. func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { key = h.PrefixEtcdKey(key) - w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil) + w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -105,7 +105,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc // Errors will be sent down the channel. func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { key = h.PrefixEtcdKey(key) - w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform) + w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h) go w.etcdWatch(h.Client, key, resourceVersion) return w } @@ -145,6 +145,8 @@ type etcdWatcher struct { // Injectable for testing. Send the event down the outgoing channel. emit func(watch.Event) + + cache etcdCache } // watchWaitDuration is the amount of time to wait for an error from watch. @@ -152,7 +154,7 @@ const watchWaitDuration = 100 * time.Millisecond // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc) *etcdWatcher { +func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, @@ -165,6 +167,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding etcdStop: make(chan bool), outgoing: make(chan watch.Event), userStop: make(chan struct{}), + cache: cache, } w.emit = func(e watch.Event) { w.outgoing <- e } go w.translate() @@ -256,6 +259,10 @@ func (w *etcdWatcher) translate() { } func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { + if obj, found := w.cache.getFromCache(node.ModifiedIndex); found { + return obj, nil + } + obj, err := w.encoding.Decode([]byte(node.Value)) if err != nil { return nil, err @@ -277,6 +284,9 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { } } + if node.ModifiedIndex != 0 { + w.cache.addToCache(node.ModifiedIndex, obj) + } return obj, nil } diff --git a/pkg/tools/etcd_helper_watch_test.go b/pkg/tools/etcd_helper_watch_test.go index e575ec4f4e6..e3571a59581 100644 --- a/pkg/tools/etcd_helper_watch_test.go +++ b/pkg/tools/etcd_helper_watch_test.go @@ -32,6 +32,18 @@ import ( var versioner = APIObjectVersioner{} +// Implements etcdCache interface as empty methods (i.e. does not cache any objects) +type fakeEtcdCache struct{} + +func (f *fakeEtcdCache) getFromCache(index uint64) (runtime.Object, bool) { + return nil, false +} + +func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) { +} + +var _ etcdCache = &fakeEtcdCache{} + func TestWatchInterpretations(t *testing.T) { codec := latest.Codec // Declare some pods to make the test cases compact. @@ -115,7 +127,7 @@ func TestWatchInterpretations(t *testing.T) { for name, item := range table { for _, action := range item.actions { - w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil) + w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{}) emitCalled := false w.emit = func(event watch.Event) { emitCalled = true @@ -153,7 +165,7 @@ func TestWatchInterpretations(t *testing.T) { } func TestWatchInterpretation_ResponseNotSet(t *testing.T) { - w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil) + w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -167,7 +179,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) { func TestWatchInterpretation_ResponseNoNode(t *testing.T) { actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil) + w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -181,7 +193,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { func TestWatchInterpretation_ResponseBadData(t *testing.T) { actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil) + w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -206,7 +218,7 @@ func TestWatchEtcdError(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} fakeClient.WatchImmediateError = fmt.Errorf("immediate error") - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) watching, err := h.Watch("/some/key", 4, Everything) if err != nil { @@ -236,7 +248,7 @@ func TestWatch(t *testing.T) { key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{} - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) watching, err := h.Watch(key, 0, Everything) if err != nil { @@ -412,7 +424,7 @@ func TestWatchEtcdState(t *testing.T) { fakeClient.Data[key] = value } - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) watching, err := h.Watch(baseKey, testCase.From, Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -485,7 +497,7 @@ func TestWatchFromZeroIndex(t *testing.T) { key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) fakeClient.Data[prefixedKey] = testCase.Response - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) watching, err := h.Watch(key, 0, Everything) if err != nil { @@ -546,7 +558,7 @@ func TestWatchListFromZeroIndex(t *testing.T) { EtcdIndex: 3, }, } - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) watching, err := h.WatchList(key, 0, Everything) if err != nil { @@ -586,7 +598,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) { prefixedKey := etcdtest.AddPrefix(key) fakeClient := NewFakeEtcdClient(t) - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) watching, err := h.WatchList(key, 1, Everything) if err != nil { @@ -639,7 +651,7 @@ func TestWatchFromNotFound(t *testing.T) { ErrorCode: 100, }, } - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) watching, err := h.Watch(key, 0, Everything) if err != nil { @@ -666,7 +678,8 @@ func TestWatchFromOtherError(t *testing.T) { ErrorCode: 101, }, } - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) + watching, err := h.Watch(key, 0, Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -696,7 +709,8 @@ func TestWatchFromOtherError(t *testing.T) { func TestWatchPurposefulShutdown(t *testing.T) { fakeClient := NewFakeEtcdClient(t) - h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()} + + h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}