Watch delivers current state for resourceVersion=0

Allows clients to get the current state without having to execute
a get followed by a watch.  Makes integration with action loops
much cleaner.
This commit is contained in:
Clayton Coleman 2014-08-11 00:08:06 -04:00
parent c71866164f
commit c5630a9567
3 changed files with 281 additions and 16 deletions

View File

@ -106,12 +106,21 @@ func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err
}
// Returns true iff err is an etcd error, whose errorCode matches errorCode
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*etcd.EtcdError)
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
}
// etcdErrorIndex returns the index associated with the error message and whether the
// index was available.
func etcdErrorIndex(err error) (uint64, bool) {
if etcdError, ok := err.(*etcd.EtcdError); ok {
return etcdError.Index, true
}
return 0, false
}
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) {
result, err := h.Client.Get(key, false, true)
if err != nil {
@ -297,9 +306,9 @@ func Everything(interface{}) bool {
// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updateds).
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Codec)
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
@ -307,14 +316,38 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec)
return h.WatchAndTransform(key, resourceVersion, nil)
}
// WatchAndTransform begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface. If the transform
// function is provided, the value decoded from etcd will be passed to the function
// prior to being returned.
//
// The transform function can be used to populate data not available to etcd, or to
// change or wrap the serialized etcd object.
//
// startTime := time.Now()
// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) {
// value := input.(TimeAwareValue)
// value.Since = startTime
// return value, nil
// })
//
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
// TransformFunc attempts to convert an object to another object for use with a watcher
type TransformFunc func(interface{}) (interface{}, error)
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding Codec
encoding Codec
versioner ResourceVersioner
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
filter FilterFunc
@ -332,10 +365,13 @@ type etcdWatcher struct {
emit func(watch.Event)
}
// Returns a new etcdWatcher; if list is true, watch sub-nodes.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
@ -354,13 +390,55 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok {
return
}
resourceVersion = latest
}
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
}
}
// Pull stuff from etcd, convert, and push out the outgoing channel. Meant to be
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) {
success = true
resp, err := client.Get(key, false, recursive)
if err != nil {
if !IsEtcdNotFound(err) {
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
success = false
return
}
if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index
}
return
}
resourceVersion = resp.EtcdIndex
convertRecursiveResponse(resp.Node, resp, incoming)
return
}
// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
// by copying the original response. This emulates the behavior of a recursive watch.
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
if node.Dir {
for i := range node.Nodes {
convertRecursiveResponse(node.Nodes[i], response, incoming)
}
return
}
copied := *response
copied.Node = node
incoming <- &copied
}
// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer close(w.outgoing)
@ -385,6 +463,7 @@ func (w *etcdWatcher) translate() {
func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
var index uint64
switch res.Action {
case "create":
if res.Node == nil {
@ -392,13 +471,15 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Added
case "set":
case "set", "compareAndSwap", "get":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
index = res.Node.ModifiedIndex
action = watch.Modified
case "delete":
if res.PrevNode == nil {
@ -406,6 +487,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
return
}
data = []byte(res.PrevNode.Value)
index = res.PrevNode.ModifiedIndex
action = watch.Deleted
default:
glog.Errorf("unknown action: %v", res.Action)
@ -419,6 +501,25 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
w.Stop()
return
}
// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.SetResourceVersion(obj, index); err != nil {
glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err)
}
}
// perform any necessary transformation
if w.transform != nil {
obj, err = w.transform(obj)
if err != nil {
glog.Errorf("failure to transform api object %#v: %v", obj, err)
// TODO: expose an error through watch.Interface?
w.Stop()
return
}
}
w.emit(watch.Event{
Type: action,
Object: obj,

View File

@ -21,6 +21,7 @@ import (
"reflect"
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
@ -329,7 +330,7 @@ func TestWatchInterpretation_ListCreate(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec)
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
@ -353,7 +354,7 @@ func TestWatchInterpretation_ListAdd(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec)
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
@ -377,7 +378,7 @@ func TestWatchInterpretation_Delete(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec)
}, codec, versioner, nil)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
@ -401,7 +402,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec)
}, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@ -415,7 +416,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec)
}, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@ -428,7 +429,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec)
}, codec, versioner, nil)
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@ -442,6 +443,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
func TestWatch(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
@ -450,6 +452,10 @@ func TestWatch(t *testing.T) {
}
fakeClient.WaitForWatchCompletion()
// when no get can be done AND the server doesn't provide an index, the Watch is 0 (from now)
if fakeClient.WatchIndex != 0 {
t.Errorf("Expected client to be at index %d, got %#v", 0, fakeClient)
}
// Test normal case
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
@ -481,9 +487,165 @@ func TestWatch(t *testing.T) {
}
}
func TestWatchFromZeroIndex(t *testing.T) {
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 1,
},
Action: "compareAndSwap",
EtcdIndex: 2,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// the existing node is detected and the index set
event := <-watching.ResultChan()
if e, a := watch.Modified, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("expected a pod, got %#v", event.Object)
}
if actualPod.ResourceVersion != 1 {
t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod)
}
pod.ResourceVersion = 1
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
watching.Stop()
}
func TestWatchListFromZeroIndex(t *testing.T) {
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Dir: true,
Nodes: etcd.Nodes{
&etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 1,
Nodes: etcd.Nodes{},
},
&etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 2,
Nodes: etcd.Nodes{},
},
},
},
Action: "get",
EtcdIndex: 3,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.WatchList("/some/key", 0, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// the existing node is detected and the index set
event := <-watching.ResultChan()
for i := 0; i < 2; i++ {
if e, a := watch.Modified, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("expected a pod, got %#v", event.Object)
}
if actualPod.ResourceVersion != 1 {
t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod)
}
pod.ResourceVersion = 1
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
fakeClient.WaitForWatchCompletion()
watching.Stop()
}
func TestWatchFromNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 100,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
if fakeClient.WatchIndex != 2 {
t.Errorf("Expected client to wait for %d, got %#v", 2, fakeClient)
}
watching.Stop()
}
func TestWatchFromOtherError(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 101,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
select {
case _, ok := <-watching.ResultChan():
if ok {
t.Fatalf("expected result channel to be closed")
}
case <-time.After(1 * time.Millisecond):
t.Fatalf("watch should have closed channel: %#v", watching)
}
if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 {
t.Fatalf("Watch should not have been invoked: %#v", fakeClient)
}
}
func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeClient, codec, versioner}
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
// Test purposeful shutdown
watching, err := h.Watch("/some/key", 0)

View File

@ -51,6 +51,7 @@ type FakeEtcdClient struct {
// Will become valid after Watch is called; tester may write to it. Tester may
// also read from it to verify that it's closed after injecting an error.
WatchResponse chan *etcd.Response
WatchIndex uint64
// Write to this to prematurely stop a Watch that is running in a goroutine.
WatchInjectError chan<- error
WatchStop chan<- bool
@ -229,6 +230,7 @@ func (f *FakeEtcdClient) WaitForWatchCompletion() {
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
f.WatchResponse = receiver
f.WatchStop = stop
f.WatchIndex = waitIndex
injectedError := make(chan error)
defer close(injectedError)