From b630c7bcb40099359263753a20e0dfe025be2aa6 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Sat, 19 Jul 2014 23:26:26 -0700 Subject: [PATCH] Implement watch.Interface via etcd. --- pkg/tools/etcd_tools.go | 157 ++++++++++++++++++++++++++++++ pkg/tools/etcd_tools_test.go | 181 +++++++++++++++++++++++++++++++++++ 2 files changed, 338 insertions(+) diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 1c6a3fc8eea..39539e1289c 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -22,7 +22,10 @@ import ( "reflect" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" ) const ( @@ -53,6 +56,7 @@ type EtcdGetSet interface { Get(key string, sort, recursive bool) (*etcd.Response, error) Set(key, value string, ttl uint64) (*etcd.Response, error) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) } // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. @@ -228,3 +232,156 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E return err } } + +// FilterFunc is a predicate which takes an API object and returns true +// iff the object should remain in the set. +type FilterFunc func(obj interface{}) bool + +// Everything is a FilterFunc which accepts all objects. +func Everything(interface{}) bool { + return true +} + +// WatchList begins watching the specified key's items. Items are decoded into +// API objects, and any items passing 'filter' are sent down the returned +// watch.Interface. +func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) { + w := newEtcdWatcher(true, filter) + go w.etcdWatch(h.Client, key) + return w, nil +} + +// Watch begins watching the specified key. Events are decoded into +// API objects and sent down the returned watch.Interface. +func (h *EtcdHelper) Watch(key string) (watch.Interface, error) { + w := newEtcdWatcher(false, nil) + go w.etcdWatch(h.Client, key) + return w, nil +} + +// etcdWatcher converts a native etcd watch to a watch.Interface. +type etcdWatcher struct { + list bool // If we're doing a recursive watch, should be true. + filter FilterFunc + + etcdIncoming chan *etcd.Response + etcdStop chan bool + + outgoing chan watch.Event + userStop chan struct{} +} + +// Returns a new etcdWatcher; if list is true, watch sub-nodes. +func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher { + w := &etcdWatcher{ + list: list, + filter: filter, + etcdIncoming: make(chan *etcd.Response), + etcdStop: make(chan bool), + outgoing: make(chan watch.Event), + userStop: make(chan struct{}), + } + go w.translate() + return w +} + +// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called +// as a goroutine. +func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) { + defer util.HandleCrash() + _, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop) + if err == etcd.ErrWatchStoppedByUser { + // etcd doesn't close the channel in this case. + close(w.etcdIncoming) + } else { + glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) + } +} + +// Pull stuff from etcd, convert, and push out the outgoing channel. Meant to be +// called as a goroutine. +func (w *etcdWatcher) translate() { + defer close(w.outgoing) + defer util.HandleCrash() + + for { + select { + case <-w.userStop: + w.etcdStop <- true + return + case res, ok := <-w.etcdIncoming: + if !ok { + return + } + w.sendResult(res) + } + } +} + +func (w *etcdWatcher) sendResult(res *etcd.Response) { + var action watch.EventType + var data []byte + var nodes etcd.Nodes + switch res.Action { + case "set": + if res.Node == nil { + glog.Errorf("unexpected nil node: %#v", res) + return + } + data = []byte(res.Node.Value) + nodes = res.Node.Nodes + // TODO: Is this conditional correct? + if res.EtcdIndex > 0 { + action = watch.Modified + } else { + action = watch.Added + } + case "delete": + if res.PrevNode == nil { + glog.Errorf("unexpected nil prev node: %#v", res) + return + } + data = []byte(res.PrevNode.Value) + nodes = res.PrevNode.Nodes + action = watch.Deleted + } + + // If listing, we're interested in sub-nodes. + if w.list { + for _, n := range nodes { + obj, err := api.Decode([]byte(n.Value)) + if err != nil { + glog.Errorf("failure to decode api object: %#v", res) + continue + } + if w.filter != nil && !w.filter(obj) { + continue + } + w.outgoing <- watch.Event{ + Type: action, + Object: obj, + } + } + return + } + + obj, err := api.Decode(data) + if err != nil { + glog.Errorf("failure to decode api object: %#v", res) + return + } + w.outgoing <- watch.Event{ + Type: action, + Object: obj, + } +} + +// ResultChannel implements watch.Interface. +func (w *etcdWatcher) ResultChan() <-chan watch.Event { + return w.outgoing +} + +// Stop implements watch.Interface. +func (w *etcdWatcher) Stop() { + close(w.userStop) +} diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 2f0bef5bae7..0796bbb8773 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -20,8 +20,11 @@ import ( "fmt" "reflect" "testing" + "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" ) @@ -150,3 +153,181 @@ func TestSetObj(t *testing.T) { t.Errorf("Wanted %v, got %v", expect, got) } } + +func TestWatchInterpretation_ListAdd(t *testing.T) { + called := false + w := newEtcdWatcher(true, func(interface{}) bool { + called = true + return true + }) + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := api.Encode(pod) + + go w.sendResult(&etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Nodes: etcd.Nodes{ + { + Value: string(podBytes), + }, + }, + }, + }) + + got := <-w.outgoing + if e, a := watch.Added, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + if !called { + t.Errorf("filter never called") + } +} + +func TestWatchInterpretation_ListDelete(t *testing.T) { + called := false + w := newEtcdWatcher(true, func(interface{}) bool { + called = true + return true + }) + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := api.Encode(pod) + + go w.sendResult(&etcd.Response{ + Action: "delete", + PrevNode: &etcd.Node{ + Nodes: etcd.Nodes{ + { + Value: string(podBytes), + }, + }, + }, + }) + + got := <-w.outgoing + if e, a := watch.Deleted, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + if !called { + t.Errorf("filter never called") + } +} + +func TestWatchInterpretation_SingleAdd(t *testing.T) { + w := newEtcdWatcher(false, func(interface{}) bool { + t.Errorf("unexpected filter call") + return true + }) + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := api.Encode(pod) + + go w.sendResult(&etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: string(podBytes), + }, + }) + + got := <-w.outgoing + if e, a := watch.Added, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } +} + +func TestWatchInterpretation_SingleDelete(t *testing.T) { + w := newEtcdWatcher(false, func(interface{}) bool { + t.Errorf("unexpected filter call") + return true + }) + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := api.Encode(pod) + + go w.sendResult(&etcd.Response{ + Action: "delete", + PrevNode: &etcd.Node{ + Value: string(podBytes), + }, + }) + + got := <-w.outgoing + if e, a := watch.Deleted, got.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } +} + +func TestWatch(t *testing.T) { + fakeEtcd := MakeFakeEtcdClient(t) + h := EtcdHelper{fakeEtcd} + + watching, err := h.Watch("/some/key") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + fakeEtcd.WaitForWatchCompletion() + + // Test normal case + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := api.Encode(pod) + fakeEtcd.WatchResponse <- &etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: string(podBytes), + }, + } + + select { + case event := <-watching.ResultChan(): + if e, a := watch.Added, event.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + case <-time.After(10 * time.Millisecond): + t.Errorf("Expected 1 call but got 0") + } + + // Test error case + fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") + + // Did everything shut down? + if _, open := <-fakeEtcd.WatchResponse; open { + t.Errorf("An injected error did not cause a graceful shutdown") + } + if _, open := <-watching.ResultChan(); open { + t.Errorf("An injected error did not cause a graceful shutdown") + } +} + +func TestWatchPurposefulShutdown(t *testing.T) { + fakeEtcd := MakeFakeEtcdClient(t) + h := EtcdHelper{fakeEtcd} + + // Test purposeful shutdown + watching, err := h.Watch("/some/key") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + fakeEtcd.WaitForWatchCompletion() + watching.Stop() + + // Did everything shut down? + if _, open := <-fakeEtcd.WatchResponse; open { + t.Errorf("A stop did not cause a graceful shutdown") + } + if _, open := <-watching.ResultChan(); open { + t.Errorf("An injected error did not cause a graceful shutdown") + } +}