From 928092e79eddfb396c09e056dbf6a0a7cc00a9d3 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Sun, 20 Jul 2014 12:00:52 -0700 Subject: [PATCH] Etcd watcher verification To make sure the etcd watcher works, I changed the replication controller to use watch.Interface. I made apiserver support watches on controllers, so replicationController can be run only off of the apiserver. I made sure all the etcd watch testing that used to be in replicationController is now tested on the new etcd watcher in pkg/tools/. --- cmd/integration/integration.go | 12 +- pkg/apiserver/interfaces.go | 1 + pkg/controller/replication_controller.go | 94 ++++------ pkg/controller/replication_controller_test.go | 171 +----------------- pkg/registry/controller_registry.go | 14 ++ pkg/registry/controller_registry_test.go | 12 +- pkg/registry/etcd_registry.go | 7 + pkg/registry/interfaces.go | 2 + pkg/registry/memory_registry.go | 7 + pkg/tools/decoder.go | 10 +- pkg/tools/etcd_tools.go | 72 ++++---- pkg/tools/etcd_tools_test.go | 111 +++++------- pkg/tools/fake_etcd_client.go | 5 +- 13 files changed, 187 insertions(+), 331 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 0818aa7beab..616eb8015cf 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -104,7 +104,10 @@ func startComponents(manifestURL string) (apiServerURL string) { handler.delegate = m.ConstructHandler("/api/v1beta1") controllerManager := controller.MakeReplicationManager(etcdClient, cl) - controllerManager.Run(1 * time.Second) + + // Prove that controllerManager's watch works by making it not sync until after this + // test is over. (Hopefully we don't take 10 minutes!) + controllerManager.Run(10 * time.Minute) // Kubelet (localhost) cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) @@ -192,7 +195,12 @@ func runAtomicPutTest(c *client.Client) { for { glog.Infof("Starting to update (%s, %s)", l, v) var tmpSvc api.Service - err := c.Get().Path("services").Path(svc.ID).Do().Into(&tmpSvc) + err := c.Get(). + Path("services"). + Path(svc.ID). + PollPeriod(100 * time.Millisecond). + Do(). + Into(&tmpSvc) if err != nil { glog.Errorf("Error getting atomicService: %v", err) continue diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index c268d532a69..318d09f6e8a 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -43,6 +43,7 @@ type RESTStorage interface { // ResourceWatcher should be implemented by all RESTStorage objects that // want to offer the ability to watch for changes through the watch api. type ResourceWatcher interface { + // TODO: take a query, like List, to filter out unwanted events. WatchAll() (watch.Interface, error) WatchSingle(id string) (watch.Interface, error) } diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index a9c448147c1..37abec12921 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -17,8 +17,6 @@ limitations under the License. package controller import ( - "encoding/json" - "fmt" "sync" "time" @@ -27,13 +25,14 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/coreos/go-etcd/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) // ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd // with actual running pods. -// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface +// TODO: Allow choice of switching between etcd/apiserver watching, or remove etcd references +// from this file completely. type ReplicationManager struct { etcdClient tools.EtcdClient kubeClient client.Interface @@ -42,6 +41,9 @@ type ReplicationManager struct { // To allow injection of syncReplicationController for testing. syncHandler func(controllerSpec api.ReplicationController) error + + // To allow injection of watch creation. + watchMaker func() (watch.Interface, error) } // PodControlInterface is an interface that knows how to add or delete pods @@ -60,6 +62,7 @@ type RealPodControl struct { func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) { labels := controllerSpec.DesiredState.PodTemplate.Labels + // TODO: don't fail to set this label just because the map isn't created. if labels != nil { labels["replicationController"] = controllerSpec.ID } @@ -86,9 +89,8 @@ func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.Inter kubeClient: kubeClient, }, } - rm.syncHandler = func(controllerSpec api.ReplicationController) error { - return rm.syncReplicationController(controllerSpec) - } + rm.syncHandler = rm.syncReplicationController + rm.watchMaker = rm.makeAPIWatch return rm } @@ -98,71 +100,51 @@ func (rm *ReplicationManager) Run(period time.Duration) { go util.Forever(func() { rm.watchControllers() }, period) } -func (rm *ReplicationManager) watchControllers() { - watchChannel := make(chan *etcd.Response) - stop := make(chan bool) - // Ensure that the call to watch ends. - defer close(stop) +// makeEtcdWatch starts watching via etcd. +func (rm *ReplicationManager) makeEtcdWatch() (watch.Interface, error) { + helper := tools.EtcdHelper{rm.etcdClient} + return helper.WatchList("/registry/controllers", tools.Everything) +} - go func() { - defer util.HandleCrash() - _, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop) - if err == etcd.ErrWatchStoppedByUser { - close(watchChannel) - } else { - glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) - } - }() +// makeAPIWatch starts watching via the apiserver. +func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) { + // TODO: Fix this ugly type assertion. + return rm.kubeClient.(*client.Client). + Get(). + Path("watch"). + Path("replicationControllers"). + Watch() +} + +func (rm *ReplicationManager) watchControllers() { + watching, err := rm.watchMaker() + if err != nil { + glog.Errorf("Unexpected failure to watch: %v", err) + time.Sleep(5 * time.Second) + return + } for { select { case <-rm.syncTime: rm.synchronize() - case watchResponse, open := <-watchChannel: - if !open || watchResponse == nil { + case event, open := <-watching.ResultChan(): + if !open { // watchChannel has been closed, or something else went // wrong with our etcd watch call. Let the util.Forever() // that called us call us again. return } - glog.Infof("Got watch: %#v", watchResponse) - controller, err := rm.handleWatchResponse(watchResponse) - if err != nil { - glog.Errorf("Error handling data: %#v, %#v", err, watchResponse) - continue + glog.Infof("Got watch: %#v", event) + if rc, ok := event.Object.(*api.ReplicationController); !ok { + glog.Errorf("unexpected object: %#v", event.Object) + } else { + rm.syncHandler(*rc) } - rm.syncHandler(*controller) } } } -func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*api.ReplicationController, error) { - switch response.Action { - case "set": - if response.Node == nil { - return nil, fmt.Errorf("response node is null %#v", response) - } - var controllerSpec api.ReplicationController - if err := json.Unmarshal([]byte(response.Node.Value), &controllerSpec); err != nil { - return nil, err - } - return &controllerSpec, nil - case "delete": - // Ensure that the final state of a replication controller is applied before it is deleted. - // Otherwise, a replication controller could be modified and then deleted (for example, from 3 to 0 - // replicas), and it would be non-deterministic which of its pods continued to exist. - if response.PrevNode == nil { - return nil, fmt.Errorf("previous node is null %#v", response) - } - var controllerSpec api.ReplicationController - if err := json.Unmarshal([]byte(response.PrevNode.Value), &controllerSpec); err != nil { - return nil, err - } - return &controllerSpec, nil - } - return nil, nil -} - func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod { var result []api.Pod for _, value := range pods { diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index fbd660eccb6..e2845f852a8 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" ) @@ -221,142 +222,6 @@ func TestCreateReplica(t *testing.T) { } } -func TestHandleWatchResponseNotSet(t *testing.T) { - body, _ := api.Encode(makePodList(2)) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), - } - testServer := httptest.NewTLSServer(&fakeHandler) - client := client.New(testServer.URL, nil) - - fakePodControl := FakePodControl{} - - manager := MakeReplicationManager(nil, client) - manager.podControl = &fakePodControl - _, err := manager.handleWatchResponse(&etcd.Response{ - Action: "update", - }) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } -} - -func TestHandleWatchResponseNoNode(t *testing.T) { - body, _ := api.Encode(makePodList(2)) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), - } - testServer := httptest.NewTLSServer(&fakeHandler) - client := client.New(testServer.URL, nil) - - fakePodControl := FakePodControl{} - - manager := MakeReplicationManager(nil, client) - manager.podControl = &fakePodControl - _, err := manager.handleWatchResponse(&etcd.Response{ - Action: "set", - }) - if err == nil { - t.Error("Unexpected non-error") - } -} - -func TestHandleWatchResponseBadData(t *testing.T) { - body, _ := api.Encode(makePodList(2)) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), - } - testServer := httptest.NewTLSServer(&fakeHandler) - client := client.New(testServer.URL, nil) - - fakePodControl := FakePodControl{} - - manager := MakeReplicationManager(nil, client) - manager.podControl = &fakePodControl - _, err := manager.handleWatchResponse(&etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: "foobar", - }, - }) - if err == nil { - t.Error("Unexpected non-error") - } -} - -func TestHandleWatchResponse(t *testing.T) { - body, _ := api.Encode(makePodList(2)) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), - } - testServer := httptest.NewTLSServer(&fakeHandler) - client := client.New(testServer.URL, nil) - - fakePodControl := FakePodControl{} - - manager := MakeReplicationManager(nil, client) - manager.podControl = &fakePodControl - - controller := makeReplicationController(2) - - // TODO: fixme when etcd uses Encode/Decode - data, err := json.Marshal(controller) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - controllerOut, err := manager.handleWatchResponse(&etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(data), - }, - }) - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } - if !reflect.DeepEqual(controller, *controllerOut) { - t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut) - } -} - -func TestHandleWatchResponseDelete(t *testing.T) { - body, _ := api.Encode(makePodList(2)) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), - } - testServer := httptest.NewTLSServer(&fakeHandler) - client := client.New(testServer.URL, nil) - - fakePodControl := FakePodControl{} - - manager := MakeReplicationManager(nil, client) - manager.podControl = &fakePodControl - - controller := makeReplicationController(2) - - // TODO: fixme when etcd writing uses api.Encode - data, err := json.Marshal(controller) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - controllerOut, err := manager.handleWatchResponse(&etcd.Response{ - Action: "delete", - PrevNode: &etcd.Node{ - Value: string(data), - }, - }) - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } - if !reflect.DeepEqual(controller, *controllerOut) { - t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut) - } -} - func TestSyncronize(t *testing.T) { controllerSpec1 := api.ReplicationController{ JSONBase: api.JSONBase{APIVersion: "v1beta1"}, @@ -435,9 +300,14 @@ func TestSyncronize(t *testing.T) { func TestWatchControllers(t *testing.T) { fakeEtcd := tools.MakeFakeEtcdClient(t) + fakeWatcher := watch.NewFake() manager := MakeReplicationManager(fakeEtcd, nil) + manager.watchMaker = func() (watch.Interface, error) { + return fakeWatcher, nil + } + var testControllerSpec api.ReplicationController - received := make(chan bool) + received := make(chan struct{}) manager.syncHandler = func(controllerSpec api.ReplicationController) error { if !reflect.DeepEqual(controllerSpec, testControllerSpec) { t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) @@ -448,38 +318,13 @@ func TestWatchControllers(t *testing.T) { go manager.watchControllers() - fakeEtcd.WaitForWatchCompletion() - // Test normal case testControllerSpec.ID = "foo" - fakeEtcd.WatchResponse <- &etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: util.MakeJSONString(testControllerSpec), - }, - } + fakeWatcher.Add(&testControllerSpec) select { case <-received: case <-time.After(10 * time.Millisecond): t.Errorf("Expected 1 call but got 0") } - - // Test error case - fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") - - // Did everything shut down? - if _, open := <-fakeEtcd.WatchResponse; open { - t.Errorf("An injected error did not cause a graceful shutdown") - } - - // Test purposeful shutdown - go manager.watchControllers() - fakeEtcd.WaitForWatchCompletion() - fakeEtcd.WatchStop <- true - - // Did everything shut down? - if _, open := <-fakeEtcd.WatchResponse; open { - t.Errorf("A stop did not cause a graceful shutdown") - } } diff --git a/pkg/registry/controller_registry.go b/pkg/registry/controller_registry.go index 39fd1ebedf4..de93c69d92f 100644 --- a/pkg/registry/controller_registry.go +++ b/pkg/registry/controller_registry.go @@ -17,6 +17,7 @@ limitations under the License. package registry import ( + "errors" "fmt" "time" @@ -24,6 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // ControllerRegistryStorage is an implementation of RESTStorage for the api server. @@ -135,3 +137,15 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication } return ctrl, nil } + +// WatchAll returns ReplicationController events via a watch.Interface, implementing +// apiserver.ResourceWatcher. +func (storage *ControllerRegistryStorage) WatchAll() (watch.Interface, error) { + return storage.registry.WatchControllers() +} + +// WatchSingle returns events for a single ReplicationController via a watch.Interface, +// implementing apiserver.ResourceWatcher. +func (storage *ControllerRegistryStorage) WatchSingle(id string) (watch.Interface, error) { + return nil, errors.New("unimplemented") +} diff --git a/pkg/registry/controller_registry_test.go b/pkg/registry/controller_registry_test.go index 8e9a46160df..b097dc1e895 100644 --- a/pkg/registry/controller_registry_test.go +++ b/pkg/registry/controller_registry_test.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) type MockControllerRegistry struct { @@ -51,6 +52,9 @@ func (registry *MockControllerRegistry) UpdateController(controller api.Replicat func (registry *MockControllerRegistry) DeleteController(ID string) error { return registry.err } +func (registry *MockControllerRegistry) WatchControllers() (watch.Interface, error) { + return nil, registry.err +} func TestListControllersError(t *testing.T) { mockRegistry := MockControllerRegistry{ @@ -267,13 +271,13 @@ func TestControllerStorageValidatesCreate(t *testing.T) { } failureCases := map[string]api.ReplicationController{ - "empty ID": api.ReplicationController{ + "empty ID": { JSONBase: api.JSONBase{ID: ""}, DesiredState: api.ReplicationControllerState{ ReplicaSelector: map[string]string{"bar": "baz"}, }, }, - "empty selector": api.ReplicationController{ + "empty selector": { JSONBase: api.JSONBase{ID: "abc"}, DesiredState: api.ReplicationControllerState{}, }, @@ -298,13 +302,13 @@ func TestControllerStorageValidatesUpdate(t *testing.T) { } failureCases := map[string]api.ReplicationController{ - "empty ID": api.ReplicationController{ + "empty ID": { JSONBase: api.JSONBase{ID: ""}, DesiredState: api.ReplicationControllerState{ ReplicaSelector: map[string]string{"bar": "baz"}, }, }, - "empty selector": api.ReplicationController{ + "empty selector": { JSONBase: api.JSONBase{ID: "abc"}, DesiredState: api.ReplicationControllerState{}, }, diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index 1c8075eea0c..f64c7eb544d 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -23,6 +23,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -207,6 +208,12 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er return controllers, err } +// WatchControllers begins watching for new, changed, or deleted controllers. +// TODO: Add id/selector parameters? +func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) { + return registry.helper().WatchList("/registry/controllers", tools.Everything) +} + func makeControllerKey(id string) string { return "/registry/controllers/" + id } diff --git a/pkg/registry/interfaces.go b/pkg/registry/interfaces.go index 4c52195625d..0dda241197d 100644 --- a/pkg/registry/interfaces.go +++ b/pkg/registry/interfaces.go @@ -19,6 +19,7 @@ package registry import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // PodRegistry is an interface implemented by things that know how to store Pod objects. @@ -38,6 +39,7 @@ type PodRegistry interface { // ControllerRegistry is an interface for things that know how to store ReplicationControllers. type ControllerRegistry interface { ListControllers() ([]api.ReplicationController, error) + WatchControllers() (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) CreateController(controller api.ReplicationController) error UpdateController(controller api.ReplicationController) error diff --git a/pkg/registry/memory_registry.go b/pkg/registry/memory_registry.go index 51d7c5f34c6..59ed64d89ec 100644 --- a/pkg/registry/memory_registry.go +++ b/pkg/registry/memory_registry.go @@ -17,9 +17,12 @@ limitations under the License. package registry import ( + "errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // An implementation of PodRegistry and ControllerRegistry that is backed by memory @@ -86,6 +89,10 @@ func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController, return result, nil } +func (registry *MemoryRegistry) WatchControllers() (watch.Interface, error) { + return nil, errors.New("unimplemented") +} + func (registry *MemoryRegistry) GetController(controllerID string) (*api.ReplicationController, error) { controller, found := registry.controllerData[controllerID] if found { diff --git a/pkg/tools/decoder.go b/pkg/tools/decoder.go index 5f50a1ece27..96bd73ab496 100644 --- a/pkg/tools/decoder.go +++ b/pkg/tools/decoder.go @@ -18,6 +18,7 @@ package tools import ( "encoding/json" + "fmt" "io" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -44,7 +45,14 @@ func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder { func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) { var got api.WatchEvent err = d.decoder.Decode(&got) - return got.Type, got.Object.Object, err + if err != nil { + return action, nil, err + } + switch got.Type { + case watch.Added, watch.Modified, watch.Deleted: + return got.Type, got.Object.Object, err + } + return action, nil, fmt.Errorf("got invalid watch event type: %v", got.Type) } // Close closes the underlying stream. diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 2f7bce4ef5b..4753c9127bd 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -19,6 +19,7 @@ package tools import ( "fmt" "reflect" + "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -267,23 +268,31 @@ type etcdWatcher struct { list bool // If we're doing a recursive watch, should be true. filter FilterFunc - etcdIncoming chan *etcd.Response - etcdStop chan bool + etcdIncoming chan *etcd.Response + etcdStop chan bool + etcdCallEnded chan struct{} outgoing chan watch.Event userStop chan struct{} + stopped bool + stopLock sync.Mutex + + // Injectable for testing. Send the event down the outgoing channel. + emit func(watch.Event) } // Returns a new etcdWatcher; if list is true, watch sub-nodes. func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher { w := &etcdWatcher{ - list: list, - filter: filter, - etcdIncoming: make(chan *etcd.Response), - etcdStop: make(chan bool), - outgoing: make(chan watch.Event), - userStop: make(chan struct{}), + list: list, + filter: filter, + etcdIncoming: make(chan *etcd.Response), + etcdStop: make(chan bool), + etcdCallEnded: make(chan struct{}), + outgoing: make(chan watch.Event), + userStop: make(chan struct{}), } + w.emit = func(e watch.Event) { w.outgoing <- e } go w.translate() return w } @@ -292,11 +301,9 @@ func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher { // as a goroutine. func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) { defer util.HandleCrash() + defer close(w.etcdCallEnded) _, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop) - if err == etcd.ErrWatchStoppedByUser { - // etcd doesn't close the channel in this case. - close(w.etcdIncoming) - } else { + if err != etcd.ErrWatchStoppedByUser { glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) } } @@ -309,6 +316,8 @@ func (w *etcdWatcher) translate() { for { select { + case <-w.etcdCallEnded: + return case <-w.userStop: w.etcdStop <- true return @@ -324,7 +333,6 @@ func (w *etcdWatcher) translate() { func (w *etcdWatcher) sendResult(res *etcd.Response) { var action watch.EventType var data []byte - var nodes etcd.Nodes switch res.Action { case "set": if res.Node == nil { @@ -332,7 +340,6 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { return } data = []byte(res.Node.Value) - nodes = res.Node.Nodes // TODO: Is this conditional correct? if res.EtcdIndex > 0 { action = watch.Modified @@ -345,38 +352,23 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { return } data = []byte(res.PrevNode.Value) - nodes = res.PrevNode.Nodes action = watch.Deleted - } - - // If listing, we're interested in sub-nodes. - if w.list { - for _, n := range nodes { - obj, err := api.Decode([]byte(n.Value)) - if err != nil { - glog.Errorf("failure to decode api object: %#v", res) - continue - } - if w.filter != nil && !w.filter(obj) { - continue - } - w.outgoing <- watch.Event{ - Type: action, - Object: obj, - } - } + default: + glog.Errorf("unknown action: %v", res.Action) return } obj, err := api.Decode(data) if err != nil { - glog.Errorf("failure to decode api object: %#v", res) + glog.Errorf("failure to decode api object: '%v' from %#v", string(data), res) + // TODO: expose an error through watch.Interface? + w.Stop() return } - w.outgoing <- watch.Event{ + w.emit(watch.Event{ Type: action, Object: obj, - } + }) } // ResultChannel implements watch.Interface. @@ -386,5 +378,11 @@ func (w *etcdWatcher) ResultChan() <-chan watch.Event { // Stop implements watch.Interface. func (w *etcdWatcher) Stop() { - close(w.userStop) + w.stopLock.Lock() + defer w.stopLock.Unlock() + // Prevent double channel closes. + if !w.stopped { + w.stopped = true + close(w.userStop) + } } diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 7b210b6fc5b..45babe45536 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -220,71 +220,7 @@ func TestAtomicUpdate(t *testing.T) { } func TestWatchInterpretation_ListAdd(t *testing.T) { - called := false w := newEtcdWatcher(true, func(interface{}) bool { - called = true - return true - }) - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := api.Encode(pod) - - go w.sendResult(&etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Nodes: etcd.Nodes{ - { - Value: string(podBytes), - }, - }, - }, - }) - - got := <-w.outgoing - if e, a := watch.Added, got.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - if !called { - t.Errorf("filter never called") - } -} - -func TestWatchInterpretation_ListDelete(t *testing.T) { - called := false - w := newEtcdWatcher(true, func(interface{}) bool { - called = true - return true - }) - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := api.Encode(pod) - - go w.sendResult(&etcd.Response{ - Action: "delete", - PrevNode: &etcd.Node{ - Nodes: etcd.Nodes{ - { - Value: string(podBytes), - }, - }, - }, - }) - - got := <-w.outgoing - if e, a := watch.Deleted, got.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - if !called { - t.Errorf("filter never called") - } -} - -func TestWatchInterpretation_SingleAdd(t *testing.T) { - w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true }) @@ -307,8 +243,8 @@ func TestWatchInterpretation_SingleAdd(t *testing.T) { } } -func TestWatchInterpretation_SingleDelete(t *testing.T) { - w := newEtcdWatcher(false, func(interface{}) bool { +func TestWatchInterpretation_Delete(t *testing.T) { + w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true }) @@ -331,6 +267,49 @@ func TestWatchInterpretation_SingleDelete(t *testing.T) { } } +func TestWatchInterpretation_ResponseNotSet(t *testing.T) { + w := newEtcdWatcher(false, func(interface{}) bool { + t.Errorf("unexpected filter call") + return true + }) + w.emit = func(e watch.Event) { + t.Errorf("Unexpected emit: %v", e) + } + + w.sendResult(&etcd.Response{ + Action: "update", + }) +} + +func TestWatchInterpretation_ResponseNoNode(t *testing.T) { + w := newEtcdWatcher(false, func(interface{}) bool { + t.Errorf("unexpected filter call") + return true + }) + w.emit = func(e watch.Event) { + t.Errorf("Unexpected emit: %v", e) + } + w.sendResult(&etcd.Response{ + Action: "set", + }) +} + +func TestWatchInterpretation_ResponseBadData(t *testing.T) { + w := newEtcdWatcher(false, func(interface{}) bool { + t.Errorf("unexpected filter call") + return true + }) + w.emit = func(e watch.Event) { + t.Errorf("Unexpected emit: %v", e) + } + w.sendResult(&etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: "foobar", + }, + }) +} + func TestWatch(t *testing.T) { fakeEtcd := MakeFakeEtcdClient(t) h := EtcdHelper{fakeEtcd} diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 0bd88f53164..feac29b72f7 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -215,6 +215,9 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, if receiver == nil { return f.Get(prefix, false, recursive) + } else { + // Emulate etcd's behavior. (I think.) + defer close(receiver) } f.watchCompletedChan <- true @@ -222,8 +225,6 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, case <-stop: return nil, etcd.ErrWatchStoppedByUser case err := <-injectedError: - // Emulate etcd's behavior. - close(receiver) return nil, err } // Never get here.