Etcd watcher verification

To make sure the etcd watcher works, I changed the replication
controller to use watch.Interface. I made apiserver support watches on
controllers, so replicationController can be run only off of the
apiserver. I made sure all the etcd watch testing that used to be in
replicationController is now tested on the new etcd watcher in
pkg/tools/.
This commit is contained in:
Daniel Smith 2014-07-20 12:00:52 -07:00
parent aa454fea93
commit 928092e79e
13 changed files with 187 additions and 331 deletions

View File

@ -104,7 +104,10 @@ func startComponents(manifestURL string) (apiServerURL string) {
handler.delegate = m.ConstructHandler("/api/v1beta1") handler.delegate = m.ConstructHandler("/api/v1beta1")
controllerManager := controller.MakeReplicationManager(etcdClient, cl) controllerManager := controller.MakeReplicationManager(etcdClient, cl)
controllerManager.Run(1 * time.Second)
// Prove that controllerManager's watch works by making it not sync until after this
// test is over. (Hopefully we don't take 10 minutes!)
controllerManager.Run(10 * time.Minute)
// Kubelet (localhost) // Kubelet (localhost)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
@ -192,7 +195,12 @@ func runAtomicPutTest(c *client.Client) {
for { for {
glog.Infof("Starting to update (%s, %s)", l, v) glog.Infof("Starting to update (%s, %s)", l, v)
var tmpSvc api.Service var tmpSvc api.Service
err := c.Get().Path("services").Path(svc.ID).Do().Into(&tmpSvc) err := c.Get().
Path("services").
Path(svc.ID).
PollPeriod(100 * time.Millisecond).
Do().
Into(&tmpSvc)
if err != nil { if err != nil {
glog.Errorf("Error getting atomicService: %v", err) glog.Errorf("Error getting atomicService: %v", err)
continue continue

View File

@ -43,6 +43,7 @@ type RESTStorage interface {
// ResourceWatcher should be implemented by all RESTStorage objects that // ResourceWatcher should be implemented by all RESTStorage objects that
// want to offer the ability to watch for changes through the watch api. // want to offer the ability to watch for changes through the watch api.
type ResourceWatcher interface { type ResourceWatcher interface {
// TODO: take a query, like List, to filter out unwanted events.
WatchAll() (watch.Interface, error) WatchAll() (watch.Interface, error)
WatchSingle(id string) (watch.Interface, error) WatchSingle(id string) (watch.Interface, error)
} }

View File

@ -17,8 +17,6 @@ limitations under the License.
package controller package controller
import ( import (
"encoding/json"
"fmt"
"sync" "sync"
"time" "time"
@ -27,13 +25,14 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
) )
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd // ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
// with actual running pods. // with actual running pods.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface // TODO: Allow choice of switching between etcd/apiserver watching, or remove etcd references
// from this file completely.
type ReplicationManager struct { type ReplicationManager struct {
etcdClient tools.EtcdClient etcdClient tools.EtcdClient
kubeClient client.Interface kubeClient client.Interface
@ -42,6 +41,9 @@ type ReplicationManager struct {
// To allow injection of syncReplicationController for testing. // To allow injection of syncReplicationController for testing.
syncHandler func(controllerSpec api.ReplicationController) error syncHandler func(controllerSpec api.ReplicationController) error
// To allow injection of watch creation.
watchMaker func() (watch.Interface, error)
} }
// PodControlInterface is an interface that knows how to add or delete pods // PodControlInterface is an interface that knows how to add or delete pods
@ -60,6 +62,7 @@ type RealPodControl struct {
func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) { func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) {
labels := controllerSpec.DesiredState.PodTemplate.Labels labels := controllerSpec.DesiredState.PodTemplate.Labels
// TODO: don't fail to set this label just because the map isn't created.
if labels != nil { if labels != nil {
labels["replicationController"] = controllerSpec.ID labels["replicationController"] = controllerSpec.ID
} }
@ -86,9 +89,8 @@ func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.Inter
kubeClient: kubeClient, kubeClient: kubeClient,
}, },
} }
rm.syncHandler = func(controllerSpec api.ReplicationController) error { rm.syncHandler = rm.syncReplicationController
return rm.syncReplicationController(controllerSpec) rm.watchMaker = rm.makeAPIWatch
}
return rm return rm
} }
@ -98,69 +100,49 @@ func (rm *ReplicationManager) Run(period time.Duration) {
go util.Forever(func() { rm.watchControllers() }, period) go util.Forever(func() { rm.watchControllers() }, period)
} }
func (rm *ReplicationManager) watchControllers() { // makeEtcdWatch starts watching via etcd.
watchChannel := make(chan *etcd.Response) func (rm *ReplicationManager) makeEtcdWatch() (watch.Interface, error) {
stop := make(chan bool) helper := tools.EtcdHelper{rm.etcdClient}
// Ensure that the call to watch ends. return helper.WatchList("/registry/controllers", tools.Everything)
defer close(stop) }
go func() { // makeAPIWatch starts watching via the apiserver.
defer util.HandleCrash() func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) {
_, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop) // TODO: Fix this ugly type assertion.
if err == etcd.ErrWatchStoppedByUser { return rm.kubeClient.(*client.Client).
close(watchChannel) Get().
} else { Path("watch").
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) Path("replicationControllers").
Watch()
}
func (rm *ReplicationManager) watchControllers() {
watching, err := rm.watchMaker()
if err != nil {
glog.Errorf("Unexpected failure to watch: %v", err)
time.Sleep(5 * time.Second)
return
} }
}()
for { for {
select { select {
case <-rm.syncTime: case <-rm.syncTime:
rm.synchronize() rm.synchronize()
case watchResponse, open := <-watchChannel: case event, open := <-watching.ResultChan():
if !open || watchResponse == nil { if !open {
// watchChannel has been closed, or something else went // watchChannel has been closed, or something else went
// wrong with our etcd watch call. Let the util.Forever() // wrong with our etcd watch call. Let the util.Forever()
// that called us call us again. // that called us call us again.
return return
} }
glog.Infof("Got watch: %#v", watchResponse) glog.Infof("Got watch: %#v", event)
controller, err := rm.handleWatchResponse(watchResponse) if rc, ok := event.Object.(*api.ReplicationController); !ok {
if err != nil { glog.Errorf("unexpected object: %#v", event.Object)
glog.Errorf("Error handling data: %#v, %#v", err, watchResponse) } else {
continue rm.syncHandler(*rc)
}
rm.syncHandler(*controller)
} }
} }
} }
func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*api.ReplicationController, error) {
switch response.Action {
case "set":
if response.Node == nil {
return nil, fmt.Errorf("response node is null %#v", response)
}
var controllerSpec api.ReplicationController
if err := json.Unmarshal([]byte(response.Node.Value), &controllerSpec); err != nil {
return nil, err
}
return &controllerSpec, nil
case "delete":
// Ensure that the final state of a replication controller is applied before it is deleted.
// Otherwise, a replication controller could be modified and then deleted (for example, from 3 to 0
// replicas), and it would be non-deterministic which of its pods continued to exist.
if response.PrevNode == nil {
return nil, fmt.Errorf("previous node is null %#v", response)
}
var controllerSpec api.ReplicationController
if err := json.Unmarshal([]byte(response.PrevNode.Value), &controllerSpec); err != nil {
return nil, err
}
return &controllerSpec, nil
}
return nil, nil
} }
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod { func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {

View File

@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
@ -221,142 +222,6 @@ func TestCreateReplica(t *testing.T) {
} }
} }
func TestHandleWatchResponseNotSet(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "update",
})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
func TestHandleWatchResponseNoNode(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
})
if err == nil {
t.Error("Unexpected non-error")
}
}
func TestHandleWatchResponseBadData(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: "foobar",
},
})
if err == nil {
t.Error("Unexpected non-error")
}
}
func TestHandleWatchResponse(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
controller := makeReplicationController(2)
// TODO: fixme when etcd uses Encode/Decode
data, err := json.Marshal(controller)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
controllerOut, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(data),
},
})
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(controller, *controllerOut) {
t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut)
}
}
func TestHandleWatchResponseDelete(t *testing.T) {
body, _ := api.Encode(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client)
manager.podControl = &fakePodControl
controller := makeReplicationController(2)
// TODO: fixme when etcd writing uses api.Encode
data, err := json.Marshal(controller)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
controllerOut, err := manager.handleWatchResponse(&etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Value: string(data),
},
})
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(controller, *controllerOut) {
t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut)
}
}
func TestSyncronize(t *testing.T) { func TestSyncronize(t *testing.T) {
controllerSpec1 := api.ReplicationController{ controllerSpec1 := api.ReplicationController{
JSONBase: api.JSONBase{APIVersion: "v1beta1"}, JSONBase: api.JSONBase{APIVersion: "v1beta1"},
@ -435,9 +300,14 @@ func TestSyncronize(t *testing.T) {
func TestWatchControllers(t *testing.T) { func TestWatchControllers(t *testing.T) {
fakeEtcd := tools.MakeFakeEtcdClient(t) fakeEtcd := tools.MakeFakeEtcdClient(t)
fakeWatcher := watch.NewFake()
manager := MakeReplicationManager(fakeEtcd, nil) manager := MakeReplicationManager(fakeEtcd, nil)
manager.watchMaker = func() (watch.Interface, error) {
return fakeWatcher, nil
}
var testControllerSpec api.ReplicationController var testControllerSpec api.ReplicationController
received := make(chan bool) received := make(chan struct{})
manager.syncHandler = func(controllerSpec api.ReplicationController) error { manager.syncHandler = func(controllerSpec api.ReplicationController) error {
if !reflect.DeepEqual(controllerSpec, testControllerSpec) { if !reflect.DeepEqual(controllerSpec, testControllerSpec) {
t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
@ -448,38 +318,13 @@ func TestWatchControllers(t *testing.T) {
go manager.watchControllers() go manager.watchControllers()
fakeEtcd.WaitForWatchCompletion()
// Test normal case // Test normal case
testControllerSpec.ID = "foo" testControllerSpec.ID = "foo"
fakeEtcd.WatchResponse <- &etcd.Response{ fakeWatcher.Add(&testControllerSpec)
Action: "set",
Node: &etcd.Node{
Value: util.MakeJSONString(testControllerSpec),
},
}
select { select {
case <-received: case <-received:
case <-time.After(10 * time.Millisecond): case <-time.After(10 * time.Millisecond):
t.Errorf("Expected 1 call but got 0") t.Errorf("Expected 1 call but got 0")
} }
// Test error case
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
// Test purposeful shutdown
go manager.watchControllers()
fakeEtcd.WaitForWatchCompletion()
fakeEtcd.WatchStop <- true
// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package registry package registry
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
@ -24,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
// ControllerRegistryStorage is an implementation of RESTStorage for the api server. // ControllerRegistryStorage is an implementation of RESTStorage for the api server.
@ -135,3 +137,15 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication
} }
return ctrl, nil return ctrl, nil
} }
// WatchAll returns ReplicationController events via a watch.Interface, implementing
// apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchAll() (watch.Interface, error) {
return storage.registry.WatchControllers()
}
// WatchSingle returns events for a single ReplicationController via a watch.Interface,
// implementing apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchSingle(id string) (watch.Interface, error) {
return nil, errors.New("unimplemented")
}

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
type MockControllerRegistry struct { type MockControllerRegistry struct {
@ -51,6 +52,9 @@ func (registry *MockControllerRegistry) UpdateController(controller api.Replicat
func (registry *MockControllerRegistry) DeleteController(ID string) error { func (registry *MockControllerRegistry) DeleteController(ID string) error {
return registry.err return registry.err
} }
func (registry *MockControllerRegistry) WatchControllers() (watch.Interface, error) {
return nil, registry.err
}
func TestListControllersError(t *testing.T) { func TestListControllersError(t *testing.T) {
mockRegistry := MockControllerRegistry{ mockRegistry := MockControllerRegistry{
@ -267,13 +271,13 @@ func TestControllerStorageValidatesCreate(t *testing.T) {
} }
failureCases := map[string]api.ReplicationController{ failureCases := map[string]api.ReplicationController{
"empty ID": api.ReplicationController{ "empty ID": {
JSONBase: api.JSONBase{ID: ""}, JSONBase: api.JSONBase{ID: ""},
DesiredState: api.ReplicationControllerState{ DesiredState: api.ReplicationControllerState{
ReplicaSelector: map[string]string{"bar": "baz"}, ReplicaSelector: map[string]string{"bar": "baz"},
}, },
}, },
"empty selector": api.ReplicationController{ "empty selector": {
JSONBase: api.JSONBase{ID: "abc"}, JSONBase: api.JSONBase{ID: "abc"},
DesiredState: api.ReplicationControllerState{}, DesiredState: api.ReplicationControllerState{},
}, },
@ -298,13 +302,13 @@ func TestControllerStorageValidatesUpdate(t *testing.T) {
} }
failureCases := map[string]api.ReplicationController{ failureCases := map[string]api.ReplicationController{
"empty ID": api.ReplicationController{ "empty ID": {
JSONBase: api.JSONBase{ID: ""}, JSONBase: api.JSONBase{ID: ""},
DesiredState: api.ReplicationControllerState{ DesiredState: api.ReplicationControllerState{
ReplicaSelector: map[string]string{"bar": "baz"}, ReplicaSelector: map[string]string{"bar": "baz"},
}, },
}, },
"empty selector": api.ReplicationController{ "empty selector": {
JSONBase: api.JSONBase{ID: "abc"}, JSONBase: api.JSONBase{ID: "abc"},
DesiredState: api.ReplicationControllerState{}, DesiredState: api.ReplicationControllerState{},
}, },

View File

@ -23,6 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -207,6 +208,12 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er
return controllers, err return controllers, err
} }
// WatchControllers begins watching for new, changed, or deleted controllers.
// TODO: Add id/selector parameters?
func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) {
return registry.helper().WatchList("/registry/controllers", tools.Everything)
}
func makeControllerKey(id string) string { func makeControllerKey(id string) string {
return "/registry/controllers/" + id return "/registry/controllers/" + id
} }

View File

@ -19,6 +19,7 @@ package registry
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
// PodRegistry is an interface implemented by things that know how to store Pod objects. // PodRegistry is an interface implemented by things that know how to store Pod objects.
@ -38,6 +39,7 @@ type PodRegistry interface {
// ControllerRegistry is an interface for things that know how to store ReplicationControllers. // ControllerRegistry is an interface for things that know how to store ReplicationControllers.
type ControllerRegistry interface { type ControllerRegistry interface {
ListControllers() ([]api.ReplicationController, error) ListControllers() ([]api.ReplicationController, error)
WatchControllers() (watch.Interface, error)
GetController(controllerID string) (*api.ReplicationController, error) GetController(controllerID string) (*api.ReplicationController, error)
CreateController(controller api.ReplicationController) error CreateController(controller api.ReplicationController) error
UpdateController(controller api.ReplicationController) error UpdateController(controller api.ReplicationController) error

View File

@ -17,9 +17,12 @@ limitations under the License.
package registry package registry
import ( import (
"errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
// An implementation of PodRegistry and ControllerRegistry that is backed by memory // An implementation of PodRegistry and ControllerRegistry that is backed by memory
@ -86,6 +89,10 @@ func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController,
return result, nil return result, nil
} }
func (registry *MemoryRegistry) WatchControllers() (watch.Interface, error) {
return nil, errors.New("unimplemented")
}
func (registry *MemoryRegistry) GetController(controllerID string) (*api.ReplicationController, error) { func (registry *MemoryRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
controller, found := registry.controllerData[controllerID] controller, found := registry.controllerData[controllerID]
if found { if found {

View File

@ -18,6 +18,7 @@ package tools
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -44,8 +45,15 @@ func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder {
func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) { func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) {
var got api.WatchEvent var got api.WatchEvent
err = d.decoder.Decode(&got) err = d.decoder.Decode(&got)
if err != nil {
return action, nil, err
}
switch got.Type {
case watch.Added, watch.Modified, watch.Deleted:
return got.Type, got.Object.Object, err return got.Type, got.Object.Object, err
} }
return action, nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}
// Close closes the underlying stream. // Close closes the underlying stream.
func (d *APIEventDecoder) Close() { func (d *APIEventDecoder) Close() {

View File

@ -19,6 +19,7 @@ package tools
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -269,9 +270,15 @@ type etcdWatcher struct {
etcdIncoming chan *etcd.Response etcdIncoming chan *etcd.Response
etcdStop chan bool etcdStop chan bool
etcdCallEnded chan struct{}
outgoing chan watch.Event outgoing chan watch.Event
userStop chan struct{} userStop chan struct{}
stopped bool
stopLock sync.Mutex
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
} }
// Returns a new etcdWatcher; if list is true, watch sub-nodes. // Returns a new etcdWatcher; if list is true, watch sub-nodes.
@ -281,9 +288,11 @@ func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher {
filter: filter, filter: filter,
etcdIncoming: make(chan *etcd.Response), etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool), etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
outgoing: make(chan watch.Event), outgoing: make(chan watch.Event),
userStop: make(chan struct{}), userStop: make(chan struct{}),
} }
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate() go w.translate()
return w return w
} }
@ -292,11 +301,9 @@ func newEtcdWatcher(list bool, filter FilterFunc) *etcdWatcher {
// as a goroutine. // as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) { func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) {
defer util.HandleCrash() defer util.HandleCrash()
defer close(w.etcdCallEnded)
_, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop) _, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop)
if err == etcd.ErrWatchStoppedByUser { if err != etcd.ErrWatchStoppedByUser {
// etcd doesn't close the channel in this case.
close(w.etcdIncoming)
} else {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
} }
} }
@ -309,6 +316,8 @@ func (w *etcdWatcher) translate() {
for { for {
select { select {
case <-w.etcdCallEnded:
return
case <-w.userStop: case <-w.userStop:
w.etcdStop <- true w.etcdStop <- true
return return
@ -324,7 +333,6 @@ func (w *etcdWatcher) translate() {
func (w *etcdWatcher) sendResult(res *etcd.Response) { func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType var action watch.EventType
var data []byte var data []byte
var nodes etcd.Nodes
switch res.Action { switch res.Action {
case "set": case "set":
if res.Node == nil { if res.Node == nil {
@ -332,7 +340,6 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
return return
} }
data = []byte(res.Node.Value) data = []byte(res.Node.Value)
nodes = res.Node.Nodes
// TODO: Is this conditional correct? // TODO: Is this conditional correct?
if res.EtcdIndex > 0 { if res.EtcdIndex > 0 {
action = watch.Modified action = watch.Modified
@ -345,38 +352,23 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
return return
} }
data = []byte(res.PrevNode.Value) data = []byte(res.PrevNode.Value)
nodes = res.PrevNode.Nodes
action = watch.Deleted action = watch.Deleted
} default:
glog.Errorf("unknown action: %v", res.Action)
// If listing, we're interested in sub-nodes.
if w.list {
for _, n := range nodes {
obj, err := api.Decode([]byte(n.Value))
if err != nil {
glog.Errorf("failure to decode api object: %#v", res)
continue
}
if w.filter != nil && !w.filter(obj) {
continue
}
w.outgoing <- watch.Event{
Type: action,
Object: obj,
}
}
return return
} }
obj, err := api.Decode(data) obj, err := api.Decode(data)
if err != nil { if err != nil {
glog.Errorf("failure to decode api object: %#v", res) glog.Errorf("failure to decode api object: '%v' from %#v", string(data), res)
// TODO: expose an error through watch.Interface?
w.Stop()
return return
} }
w.outgoing <- watch.Event{ w.emit(watch.Event{
Type: action, Type: action,
Object: obj, Object: obj,
} })
} }
// ResultChannel implements watch.Interface. // ResultChannel implements watch.Interface.
@ -386,5 +378,11 @@ func (w *etcdWatcher) ResultChan() <-chan watch.Event {
// Stop implements watch.Interface. // Stop implements watch.Interface.
func (w *etcdWatcher) Stop() { func (w *etcdWatcher) Stop() {
w.stopLock.Lock()
defer w.stopLock.Unlock()
// Prevent double channel closes.
if !w.stopped {
w.stopped = true
close(w.userStop) close(w.userStop)
} }
}

View File

@ -220,71 +220,7 @@ func TestAtomicUpdate(t *testing.T) {
} }
func TestWatchInterpretation_ListAdd(t *testing.T) { func TestWatchInterpretation_ListAdd(t *testing.T) {
called := false
w := newEtcdWatcher(true, func(interface{}) bool { w := newEtcdWatcher(true, func(interface{}) bool {
called = true
return true
})
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Nodes: etcd.Nodes{
{
Value: string(podBytes),
},
},
},
})
got := <-w.outgoing
if e, a := watch.Added, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
if !called {
t.Errorf("filter never called")
}
}
func TestWatchInterpretation_ListDelete(t *testing.T) {
called := false
w := newEtcdWatcher(true, func(interface{}) bool {
called = true
return true
})
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := api.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Nodes: etcd.Nodes{
{
Value: string(podBytes),
},
},
},
})
got := <-w.outgoing
if e, a := watch.Deleted, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
if !called {
t.Errorf("filter never called")
}
}
func TestWatchInterpretation_SingleAdd(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")
return true return true
}) })
@ -307,8 +243,8 @@ func TestWatchInterpretation_SingleAdd(t *testing.T) {
} }
} }
func TestWatchInterpretation_SingleDelete(t *testing.T) { func TestWatchInterpretation_Delete(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool { w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")
return true return true
}) })
@ -331,6 +267,49 @@ func TestWatchInterpretation_SingleDelete(t *testing.T) {
} }
} }
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "update",
})
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "set",
})
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
w := newEtcdWatcher(false, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: "foobar",
},
})
}
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
fakeEtcd := MakeFakeEtcdClient(t) fakeEtcd := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeEtcd} h := EtcdHelper{fakeEtcd}

View File

@ -215,6 +215,9 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool,
if receiver == nil { if receiver == nil {
return f.Get(prefix, false, recursive) return f.Get(prefix, false, recursive)
} else {
// Emulate etcd's behavior. (I think.)
defer close(receiver)
} }
f.watchCompletedChan <- true f.watchCompletedChan <- true
@ -222,8 +225,6 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool,
case <-stop: case <-stop:
return nil, etcd.ErrWatchStoppedByUser return nil, etcd.ErrWatchStoppedByUser
case err := <-injectedError: case err := <-injectedError:
// Emulate etcd's behavior.
close(receiver)
return nil, err return nil, err
} }
// Never get here. // Never get here.