diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 2b3e46fec7c..e1fe2840143 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -106,12 +106,21 @@ func IsEtcdWatchStoppedByUser(err error) bool { return etcd.ErrWatchStoppedByUser == err } -// Returns true iff err is an etcd error, whose errorCode matches errorCode +// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode func isEtcdErrorNum(err error, errorCode int) bool { etcdError, ok := err.(*etcd.EtcdError) return ok && etcdError != nil && etcdError.ErrorCode == errorCode } +// etcdErrorIndex returns the index associated with the error message and whether the +// index was available. +func etcdErrorIndex(err error) (uint64, bool) { + if etcdError, ok := err.(*etcd.EtcdError); ok { + return etcdError.Index, true + } + return 0, false +} + func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) { result, err := h.Client.Get(key, false, true) if err != nil { @@ -297,9 +306,9 @@ func Everything(interface{}) bool { // WatchList begins watching the specified key's items. Items are decoded into // API objects, and any items passing 'filter' are sent down the returned // watch.Interface. resourceVersion may be used to specify what version to begin -// watching (e.g., for reconnecting without missing any updateds). +// watching (e.g., for reconnecting without missing any updates). func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - w := newEtcdWatcher(true, filter, h.Codec) + w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -307,14 +316,38 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { - w := newEtcdWatcher(false, nil, h.Codec) + return h.WatchAndTransform(key, resourceVersion, nil) +} + +// WatchAndTransform begins watching the specified key. Events are decoded into +// API objects and sent down the returned watch.Interface. If the transform +// function is provided, the value decoded from etcd will be passed to the function +// prior to being returned. +// +// The transform function can be used to populate data not available to etcd, or to +// change or wrap the serialized etcd object. +// +// startTime := time.Now() +// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) { +// value := input.(TimeAwareValue) +// value.Since = startTime +// return value, nil +// }) +// +func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { + w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } +// TransformFunc attempts to convert an object to another object for use with a watcher +type TransformFunc func(interface{}) (interface{}, error) + // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { - encoding Codec + encoding Codec + versioner ResourceVersioner + transform TransformFunc list bool // If we're doing a recursive watch, should be true. filter FilterFunc @@ -332,10 +365,13 @@ type etcdWatcher struct { emit func(watch.Event) } -// Returns a new etcdWatcher; if list is true, watch sub-nodes. -func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher { +// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform +// and a versioner, the versioner must be able to handle the objects that transform creates. +func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, + versioner: versioner, + transform: transform, list: list, filter: filter, etcdIncoming: make(chan *etcd.Response), @@ -354,13 +390,55 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher { func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { defer util.HandleCrash() defer close(w.etcdCallEnded) + if resourceVersion == 0 { + latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) + if !ok { + return + } + resourceVersion = latest + } _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) if err != etcd.ErrWatchStoppedByUser { - glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) + glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key) } } -// Pull stuff from etcd, convert, and push out the outgoing channel. Meant to be +// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent +func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) { + success = true + + resp, err := client.Get(key, false, recursive) + if err != nil { + if !IsEtcdNotFound(err) { + glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key) + success = false + return + } + if index, ok := etcdErrorIndex(err); ok { + resourceVersion = index + } + return + } + resourceVersion = resp.EtcdIndex + convertRecursiveResponse(resp.Node, resp, incoming) + return +} + +// convertRecursiveResponse turns a recursive get response from etcd into individual response objects +// by copying the original response. This emulates the behavior of a recursive watch. +func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) { + if node.Dir { + for i := range node.Nodes { + convertRecursiveResponse(node.Nodes[i], response, incoming) + } + return + } + copied := *response + copied.Node = node + incoming <- &copied +} + +// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be // called as a goroutine. func (w *etcdWatcher) translate() { defer close(w.outgoing) @@ -385,6 +463,7 @@ func (w *etcdWatcher) translate() { func (w *etcdWatcher) sendResult(res *etcd.Response) { var action watch.EventType var data []byte + var index uint64 switch res.Action { case "create": if res.Node == nil { @@ -392,13 +471,15 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { return } data = []byte(res.Node.Value) + index = res.Node.ModifiedIndex action = watch.Added - case "set": + case "set", "compareAndSwap", "get": if res.Node == nil { glog.Errorf("unexpected nil node: %#v", res) return } data = []byte(res.Node.Value) + index = res.Node.ModifiedIndex action = watch.Modified case "delete": if res.PrevNode == nil { @@ -406,6 +487,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { return } data = []byte(res.PrevNode.Value) + index = res.PrevNode.ModifiedIndex action = watch.Deleted default: glog.Errorf("unknown action: %v", res.Action) @@ -419,6 +501,25 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { w.Stop() return } + + // ensure resource version is set on the object we load from etcd + if w.versioner != nil { + if err := w.versioner.SetResourceVersion(obj, index); err != nil { + glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err) + } + } + + // perform any necessary transformation + if w.transform != nil { + obj, err = w.transform(obj) + if err != nil { + glog.Errorf("failure to transform api object %#v: %v", obj, err) + // TODO: expose an error through watch.Interface? + w.Stop() + return + } + } + w.emit(watch.Event{ Type: action, Object: obj, diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index fec310c81fd..c7398e031e7 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -21,6 +21,7 @@ import ( "reflect" "sync" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" @@ -329,7 +330,7 @@ func TestWatchInterpretation_ListCreate(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, codec) + }, codec, versioner, nil) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} podBytes, _ := codec.Encode(pod) @@ -353,7 +354,7 @@ func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, codec) + }, codec, versioner, nil) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} podBytes, _ := codec.Encode(pod) @@ -377,7 +378,7 @@ func TestWatchInterpretation_Delete(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, codec) + }, codec, versioner, nil) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} podBytes, _ := codec.Encode(pod) @@ -401,7 +402,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, codec) + }, codec, versioner, nil) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -415,7 +416,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, codec) + }, codec, versioner, nil) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -428,7 +429,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, codec) + }, codec, versioner, nil) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -442,6 +443,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { func TestWatch(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) + fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} h := EtcdHelper{fakeClient, codec, versioner} watching, err := h.Watch("/some/key", 0) @@ -450,6 +452,10 @@ func TestWatch(t *testing.T) { } fakeClient.WaitForWatchCompletion() + // when no get can be done AND the server doesn't provide an index, the Watch is 0 (from now) + if fakeClient.WatchIndex != 0 { + t.Errorf("Expected client to be at index %d, got %#v", 0, fakeClient) + } // Test normal case pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} @@ -481,9 +487,165 @@ func TestWatch(t *testing.T) { } } +func TestWatchFromZeroIndex(t *testing.T) { + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + + fakeClient := MakeFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: api.EncodeOrDie(pod), + ModifiedIndex: 1, + }, + Action: "compareAndSwap", + EtcdIndex: 2, + }, + } + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.Watch("/some/key", 0) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + fakeClient.WaitForWatchCompletion() + + // the existing node is detected and the index set + event := <-watching.ResultChan() + if e, a := watch.Modified, event.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + actualPod, ok := event.Object.(*api.Pod) + if !ok { + t.Fatalf("expected a pod, got %#v", event.Object) + } + if actualPod.ResourceVersion != 1 { + t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod) + } + pod.ResourceVersion = 1 + if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + watching.Stop() +} + +func TestWatchListFromZeroIndex(t *testing.T) { + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + + fakeClient := MakeFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Dir: true, + Nodes: etcd.Nodes{ + &etcd.Node{ + Value: api.EncodeOrDie(pod), + ModifiedIndex: 1, + Nodes: etcd.Nodes{}, + }, + &etcd.Node{ + Value: api.EncodeOrDie(pod), + ModifiedIndex: 2, + Nodes: etcd.Nodes{}, + }, + }, + }, + Action: "get", + EtcdIndex: 3, + }, + } + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.WatchList("/some/key", 0, nil) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // the existing node is detected and the index set + event := <-watching.ResultChan() + for i := 0; i < 2; i++ { + if e, a := watch.Modified, event.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + actualPod, ok := event.Object.(*api.Pod) + if !ok { + t.Fatalf("expected a pod, got %#v", event.Object) + } + if actualPod.ResourceVersion != 1 { + t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod) + } + pod.ResourceVersion = 1 + if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + } + + fakeClient.WaitForWatchCompletion() + watching.Stop() +} + +func TestWatchFromNotFound(t *testing.T) { + fakeClient := MakeFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ + Index: 2, + ErrorCode: 100, + }, + } + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.Watch("/some/key", 0) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + fakeClient.WaitForWatchCompletion() + if fakeClient.WatchIndex != 2 { + t.Errorf("Expected client to wait for %d, got %#v", 2, fakeClient) + } + + watching.Stop() +} + +func TestWatchFromOtherError(t *testing.T) { + fakeClient := MakeFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ + Index: 2, + ErrorCode: 101, + }, + } + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.Watch("/some/key", 0) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + select { + case _, ok := <-watching.ResultChan(): + if ok { + t.Fatalf("expected result channel to be closed") + } + case <-time.After(1 * time.Millisecond): + t.Fatalf("watch should have closed channel: %#v", watching) + } + + if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 { + t.Fatalf("Watch should not have been invoked: %#v", fakeClient) + } +} + func TestWatchPurposefulShutdown(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) h := EtcdHelper{fakeClient, codec, versioner} + fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} // Test purposeful shutdown watching, err := h.Watch("/some/key", 0) diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 39a0c320783..2379837a2aa 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -51,6 +51,7 @@ type FakeEtcdClient struct { // Will become valid after Watch is called; tester may write to it. Tester may // also read from it to verify that it's closed after injecting an error. WatchResponse chan *etcd.Response + WatchIndex uint64 // Write to this to prematurely stop a Watch that is running in a goroutine. WatchInjectError chan<- error WatchStop chan<- bool @@ -229,6 +230,7 @@ func (f *FakeEtcdClient) WaitForWatchCompletion() { func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { f.WatchResponse = receiver f.WatchStop = stop + f.WatchIndex = waitIndex injectedError := make(chan error) defer close(injectedError)