mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #7559 from fgrzadkowski/perf
Add a simple cache for objects stored in etcd.
This commit is contained in:
commit
e637cd66e8
@ -31,7 +31,8 @@ func TestGetServersToValidate(t *testing.T) {
|
|||||||
config := Config{}
|
config := Config{}
|
||||||
fakeClient := tools.NewFakeEtcdClient(t)
|
fakeClient := tools.NewFakeEtcdClient(t)
|
||||||
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
|
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
|
||||||
config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil, etcdtest.PathPrefix()}
|
config.EtcdHelper = tools.NewEtcdHelper(fakeClient, latest.Codec, etcdtest.PathPrefix())
|
||||||
|
config.EtcdHelper.Versioner = nil
|
||||||
|
|
||||||
master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
|
master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
@ -42,6 +43,16 @@ type EtcdHelper struct {
|
|||||||
Versioner EtcdVersioner
|
Versioner EtcdVersioner
|
||||||
// prefix for all etcd keys
|
// prefix for all etcd keys
|
||||||
PathPrefix string
|
PathPrefix string
|
||||||
|
|
||||||
|
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
|
||||||
|
// to resourceVersion.
|
||||||
|
// This depends on etcd's indexes being globally unique across all objects/types. This will
|
||||||
|
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
|
||||||
|
// support multi-object transaction that will result in many objects with the same index.
|
||||||
|
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
|
||||||
|
// TODO: Measure how much this cache helps after the conversion code is optimized.
|
||||||
|
cache map[uint64]runtime.Object
|
||||||
|
mutex *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEtcdHelper creates a helper that works against objects that use the internal
|
// NewEtcdHelper creates a helper that works against objects that use the internal
|
||||||
@ -52,6 +63,8 @@ func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHe
|
|||||||
Codec: codec,
|
Codec: codec,
|
||||||
Versioner: APIObjectVersioner{},
|
Versioner: APIObjectVersioner{},
|
||||||
PathPrefix: prefix,
|
PathPrefix: prefix,
|
||||||
|
cache: make(map[uint64]runtime.Object),
|
||||||
|
mutex: new(sync.RWMutex),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,19 +134,74 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
obj := reflect.New(v.Type().Elem())
|
if obj, found := h.getFromCache(node.ModifiedIndex); found {
|
||||||
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||||
return err
|
} else {
|
||||||
|
obj := reflect.New(v.Type().Elem())
|
||||||
|
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if h.Versioner != nil {
|
||||||
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
|
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
|
||||||
|
}
|
||||||
|
v.Set(reflect.Append(v, obj.Elem()))
|
||||||
|
if node.ModifiedIndex != 0 {
|
||||||
|
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if h.Versioner != nil {
|
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
|
||||||
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
|
|
||||||
}
|
|
||||||
v.Set(reflect.Append(v, obj.Elem()))
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
|
||||||
|
// their Node.ModifiedIndex, which is unique across all types.
|
||||||
|
// All implementations must be thread-safe.
|
||||||
|
type etcdCache interface {
|
||||||
|
getFromCache(index uint64) (runtime.Object, bool)
|
||||||
|
addToCache(index uint64, obj runtime.Object)
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxEtcdCacheEntries int = 50000
|
||||||
|
|
||||||
|
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
|
||||||
|
var obj runtime.Object
|
||||||
|
func() {
|
||||||
|
h.mutex.RLock()
|
||||||
|
defer h.mutex.RUnlock()
|
||||||
|
obj = h.cache[index]
|
||||||
|
}()
|
||||||
|
if obj != nil {
|
||||||
|
// We should not return the object itself to avoid poluting the cache if someone
|
||||||
|
// modifies returned values.
|
||||||
|
objCopy, err := conversion.DeepCopy(obj)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return objCopy.(runtime.Object), true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
|
||||||
|
objCopy, err := conversion.DeepCopy(obj)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h.mutex.Lock()
|
||||||
|
defer h.mutex.Unlock()
|
||||||
|
h.cache[index] = objCopy.(runtime.Object)
|
||||||
|
if len(h.cache) > maxEtcdCacheEntries {
|
||||||
|
var randomKey uint64
|
||||||
|
for randomKey = range h.cache {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
delete(h.cache, randomKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
|
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
|
||||||
// definition) and extracts a go object per etcd node into a slice with the resource version.
|
// definition) and extracts a go object per etcd node into a slice with the resource version.
|
||||||
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
|
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
|
||||||
|
@ -174,7 +174,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
|
|||||||
Key: "/baz",
|
Key: "/baz",
|
||||||
Value: getEncodedPod("baz"),
|
Value: getEncodedPod("baz"),
|
||||||
Dir: false,
|
Dir: false,
|
||||||
ModifiedIndex: 1,
|
ModifiedIndex: 3,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -199,7 +199,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
|
|||||||
Items: []api.Pod{
|
Items: []api.Pod{
|
||||||
// We expect list to be sorted by directory (e.g. namespace) first, then by name.
|
// We expect list to be sorted by directory (e.g. namespace) first, then by name.
|
||||||
{
|
{
|
||||||
ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "1"},
|
ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "3"},
|
||||||
Spec: api.PodSpec{
|
Spec: api.PodSpec{
|
||||||
RestartPolicy: api.RestartPolicyAlways,
|
RestartPolicy: api.RestartPolicyAlways,
|
||||||
DNSPolicy: api.DNSClusterFirst,
|
DNSPolicy: api.DNSClusterFirst,
|
||||||
@ -482,7 +482,8 @@ func TestSetObjWithVersion(t *testing.T) {
|
|||||||
func TestSetObjWithoutResourceVersioner(t *testing.T) {
|
func TestSetObjWithoutResourceVersioner(t *testing.T) {
|
||||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
helper := EtcdHelper{fakeClient, testapi.Codec(), nil, etcdtest.PathPrefix()}
|
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||||
|
helper.Versioner = nil
|
||||||
returnedObj := &api.Pod{}
|
returnedObj := &api.Pod{}
|
||||||
err := helper.SetObj("/some/key", obj, returnedObj, 3)
|
err := helper.SetObj("/some/key", obj, returnedObj, 3)
|
||||||
key := etcdtest.AddPrefix("/some/key")
|
key := etcdtest.AddPrefix("/some/key")
|
||||||
@ -509,7 +510,8 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
|
|||||||
func TestSetObjNilOutParam(t *testing.T) {
|
func TestSetObjNilOutParam(t *testing.T) {
|
||||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
helper := EtcdHelper{fakeClient, testapi.Codec(), nil, etcdtest.PathPrefix()}
|
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||||
|
helper.Versioner = nil
|
||||||
err := helper.SetObj("/some/key", obj, nil, 3)
|
err := helper.SetObj("/some/key", obj, nil, 3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
@ -72,7 +72,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
|||||||
// watching (e.g., for reconnecting without missing any updates).
|
// watching (e.g., for reconnecting without missing any updates).
|
||||||
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||||
key = h.PrefixEtcdKey(key)
|
key = h.PrefixEtcdKey(key)
|
||||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil)
|
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h)
|
||||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
@ -82,7 +82,7 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
|
|||||||
// Errors will be sent down the channel.
|
// Errors will be sent down the channel.
|
||||||
func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||||
key = h.PrefixEtcdKey(key)
|
key = h.PrefixEtcdKey(key)
|
||||||
w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil)
|
w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h)
|
||||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
@ -105,7 +105,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc
|
|||||||
// Errors will be sent down the channel.
|
// Errors will be sent down the channel.
|
||||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
||||||
key = h.PrefixEtcdKey(key)
|
key = h.PrefixEtcdKey(key)
|
||||||
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform)
|
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h)
|
||||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
@ -145,6 +145,8 @@ type etcdWatcher struct {
|
|||||||
|
|
||||||
// Injectable for testing. Send the event down the outgoing channel.
|
// Injectable for testing. Send the event down the outgoing channel.
|
||||||
emit func(watch.Event)
|
emit func(watch.Event)
|
||||||
|
|
||||||
|
cache etcdCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchWaitDuration is the amount of time to wait for an error from watch.
|
// watchWaitDuration is the amount of time to wait for an error from watch.
|
||||||
@ -152,7 +154,7 @@ const watchWaitDuration = 100 * time.Millisecond
|
|||||||
|
|
||||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
||||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
||||||
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc) *etcdWatcher {
|
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
|
||||||
w := &etcdWatcher{
|
w := &etcdWatcher{
|
||||||
encoding: encoding,
|
encoding: encoding,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
@ -165,6 +167,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding
|
|||||||
etcdStop: make(chan bool),
|
etcdStop: make(chan bool),
|
||||||
outgoing: make(chan watch.Event),
|
outgoing: make(chan watch.Event),
|
||||||
userStop: make(chan struct{}),
|
userStop: make(chan struct{}),
|
||||||
|
cache: cache,
|
||||||
}
|
}
|
||||||
w.emit = func(e watch.Event) { w.outgoing <- e }
|
w.emit = func(e watch.Event) { w.outgoing <- e }
|
||||||
go w.translate()
|
go w.translate()
|
||||||
@ -256,6 +259,10 @@ func (w *etcdWatcher) translate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
||||||
|
if obj, found := w.cache.getFromCache(node.ModifiedIndex); found {
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
|
||||||
obj, err := w.encoding.Decode([]byte(node.Value))
|
obj, err := w.encoding.Decode([]byte(node.Value))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -277,6 +284,9 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if node.ModifiedIndex != 0 {
|
||||||
|
w.cache.addToCache(node.ModifiedIndex, obj)
|
||||||
|
}
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,6 +32,18 @@ import (
|
|||||||
|
|
||||||
var versioner = APIObjectVersioner{}
|
var versioner = APIObjectVersioner{}
|
||||||
|
|
||||||
|
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
|
||||||
|
type fakeEtcdCache struct{}
|
||||||
|
|
||||||
|
func (f *fakeEtcdCache) getFromCache(index uint64) (runtime.Object, bool) {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ etcdCache = &fakeEtcdCache{}
|
||||||
|
|
||||||
func TestWatchInterpretations(t *testing.T) {
|
func TestWatchInterpretations(t *testing.T) {
|
||||||
codec := latest.Codec
|
codec := latest.Codec
|
||||||
// Declare some pods to make the test cases compact.
|
// Declare some pods to make the test cases compact.
|
||||||
@ -115,7 +127,7 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
|
|
||||||
for name, item := range table {
|
for name, item := range table {
|
||||||
for _, action := range item.actions {
|
for _, action := range item.actions {
|
||||||
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil)
|
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
|
||||||
emitCalled := false
|
emitCalled := false
|
||||||
w.emit = func(event watch.Event) {
|
w.emit = func(event watch.Event) {
|
||||||
emitCalled = true
|
emitCalled = true
|
||||||
@ -153,7 +165,7 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
||||||
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
|
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -167,7 +179,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
|||||||
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
||||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
|
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -181,7 +193,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
|||||||
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
||||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
|
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -206,7 +218,7 @@ func TestWatchEtcdError(t *testing.T) {
|
|||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
||||||
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
|
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.Watch("/some/key", 4, Everything)
|
watching, err := h.Watch("/some/key", 4, Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -236,7 +248,7 @@ func TestWatch(t *testing.T) {
|
|||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
prefixedKey := etcdtest.AddPrefix(key)
|
prefixedKey := etcdtest.AddPrefix(key)
|
||||||
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
|
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.Watch(key, 0, Everything)
|
watching, err := h.Watch(key, 0, Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -412,7 +424,7 @@ func TestWatchEtcdState(t *testing.T) {
|
|||||||
fakeClient.Data[key] = value
|
fakeClient.Data[key] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
watching, err := h.Watch(baseKey, testCase.From, Everything)
|
watching, err := h.Watch(baseKey, testCase.From, Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
@ -485,7 +497,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
|||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
prefixedKey := etcdtest.AddPrefix(key)
|
prefixedKey := etcdtest.AddPrefix(key)
|
||||||
fakeClient.Data[prefixedKey] = testCase.Response
|
fakeClient.Data[prefixedKey] = testCase.Response
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.Watch(key, 0, Everything)
|
watching, err := h.Watch(key, 0, Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -546,7 +558,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
|
|||||||
EtcdIndex: 3,
|
EtcdIndex: 3,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.WatchList(key, 0, Everything)
|
watching, err := h.WatchList(key, 0, Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -586,7 +598,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
|
|||||||
prefixedKey := etcdtest.AddPrefix(key)
|
prefixedKey := etcdtest.AddPrefix(key)
|
||||||
|
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.WatchList(key, 1, Everything)
|
watching, err := h.WatchList(key, 1, Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -639,7 +651,7 @@ func TestWatchFromNotFound(t *testing.T) {
|
|||||||
ErrorCode: 100,
|
ErrorCode: 100,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.Watch(key, 0, Everything)
|
watching, err := h.Watch(key, 0, Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -666,7 +678,8 @@ func TestWatchFromOtherError(t *testing.T) {
|
|||||||
ErrorCode: 101,
|
ErrorCode: 101,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.Watch(key, 0, Everything)
|
watching, err := h.Watch(key, 0, Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
@ -696,7 +709,8 @@ func TestWatchFromOtherError(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatchPurposefulShutdown(t *testing.T) {
|
func TestWatchPurposefulShutdown(t *testing.T) {
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
|
||||||
|
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
prefixedKey := etcdtest.AddPrefix(key)
|
prefixedKey := etcdtest.AddPrefix(key)
|
||||||
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
|
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
|
||||||
|
Loading…
Reference in New Issue
Block a user