diff --git a/pkg/controller/doc.go b/pkg/controller/doc.go
index 1b67b41ca82..8a708ad5d1c 100644
--- a/pkg/controller/doc.go
+++ b/pkg/controller/doc.go
@@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package controller contains logic for watching and syncronizing
+// Package controller contains logic for watching and synchronizing
 // replicationControllers.
 package controller
diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go
index 90479489f01..39691cfbdc0 100644
--- a/pkg/controller/replication_controller.go
+++ b/pkg/controller/replication_controller.go
@@ -38,6 +38,9 @@ type ReplicationManager struct {
 	kubeClient client.ClientInterface
 	podControl PodControlInterface
 	syncTime   <-chan time.Time
+
+	// syncHandler syncs a single controller; it is a field so tests can inject a fake.
+	syncHandler func(controllerSpec api.ReplicationController) error
 }
 
 // An interface that knows how to add or delete pods
@@ -74,13 +77,19 @@ func (r RealPodControl) deletePod(podID string) error {
 }
 
+// MakeReplicationManager creates a ReplicationManager whose pods are managed
+// through kubeClient and whose syncHandler is syncReplicationController.
 func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager {
-	return &ReplicationManager{
+	rm := &ReplicationManager{
 		kubeClient: kubeClient,
 		etcdClient: etcdClient,
 		podControl: RealPodControl{
 			kubeClient: kubeClient,
 		},
 	}
+	rm.syncHandler = func(controllerSpec api.ReplicationController) error {
+		return rm.syncReplicationController(controllerSpec)
+	}
+	return rm
 }
 
 // Begin watching and syncing.
@@ -91,35 +100,42 @@ func (rm *ReplicationManager) Run(period time.Duration) {
 
+// watchControllers syncs on every rm.syncTime tick and on every change etcd
+// reports under /registry/controllers. It returns once the watch has ended.
 func (rm *ReplicationManager) watchControllers() {
 	watchChannel := make(chan *etcd.Response)
+	stop := make(chan bool)
+	// Closing stop on return ensures that the call to Watch ends.
+	defer close(stop)
 	go func() {
 		defer util.HandleCrash()
 		defer func() {
 			close(watchChannel)
 		}()
-		rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil)
+		_, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop)
+		if err != etcd.ErrWatchStoppedByUser {
+			log.Printf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
+		}
 	}()
 
 	for {
 		select {
 		case <-rm.syncTime:
 			rm.synchronize()
-		case watchResponse, ok := <-watchChannel:
-			if !ok {
-				// watchChannel has been closed. Let the util.Forever() that
-				// called us call us again.
+		case watchResponse, open := <-watchChannel:
+			if !open || watchResponse == nil {
+				// watchChannel has been closed, or something else went
+				// wrong with our etcd watch call. Let the util.Forever()
+				// that called us call us again.
 				return
 			}
-			if watchResponse == nil {
-				time.Sleep(time.Second * 10)
-				continue
-			}
 			log.Printf("Got watch: %#v", watchResponse)
 			controller, err := rm.handleWatchResponse(watchResponse)
 			if err != nil {
 				log.Printf("Error handling data: %#v, %#v", err, watchResponse)
 				continue
 			}
-			rm.syncReplicationController(*controller)
+			if err := rm.syncHandler(*controller); err != nil {
+				log.Printf("Error synchronizing: %#v", err)
+			}
 		}
 	}
 }
@@ -179,9 +195,10 @@ func (rm *ReplicationManager) synchronize() {
 	err := helper.ExtractList("/registry/controllers", &controllerSpecs)
 	if err != nil {
 		log.Printf("Synchronization error: %v (%#v)", err, err)
+		return
 	}
 	for _, controllerSpec := range controllerSpecs {
-		err = rm.syncReplicationController(controllerSpec)
+		err = rm.syncHandler(controllerSpec)
 		if err != nil {
 			log.Printf("Error synchronizing: %#v", err)
 		}
diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go
index 696af15960a..a7e6f621b80 100644
--- a/pkg/controller/replication_controller_test.go
+++ b/pkg/controller/replication_controller_test.go
@@ -22,6 +22,7 @@ import (
 	"net/http/httptest"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@@ -393,3 +394,86 @@ func TestSyncronize(t *testing.T) {
 
 	validateSyncReplication(t, &fakePodControl, 7, 0)
 }
+
+// asyncTimeout panics if done is not called before its deadline, so that a
+// test blocked on a channel fails loudly instead of hanging.
+type asyncTimeout struct {
+	doneChan chan bool
+}
+
+// beginTimeout starts a timer that panics after d unless done is called first.
+func beginTimeout(d time.Duration) *asyncTimeout {
+	a := &asyncTimeout{doneChan: make(chan bool)}
+	go func() {
+		select {
+		case <-a.doneChan:
+			return
+		case <-time.After(d):
+			panic("Timeout expired!")
+		}
+	}()
+	return a
+}
+
+// done cancels the timeout. It closes doneChan, so call it at most once.
+func (a *asyncTimeout) done() {
+	close(a.doneChan)
+}
+
+// TestWatchControllers checks that watchControllers hands watched controllers
+// to syncHandler and returns when the etcd watch fails or is stopped.
+func TestWatchControllers(t *testing.T) {
+	defer beginTimeout(20 * time.Second).done()
+	fakeEtcd := util.MakeFakeEtcdClient(t)
+	manager := MakeReplicationManager(fakeEtcd, nil)
+	var testControllerSpec api.ReplicationController
+	// The handler runs on the watchControllers goroutine, so it reports calls
+	// over a channel rather than through a shared counter (a data race).
+	received := make(chan bool, 1)
+	manager.syncHandler = func(controllerSpec api.ReplicationController) error {
+		if !reflect.DeepEqual(controllerSpec, testControllerSpec) {
+			t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
+		}
+		received <- true
+		return nil
+	}
+
+	go manager.watchControllers()
+	// Give the fake Watch time to start and publish its channels.
+	time.Sleep(10 * time.Millisecond)
+
+	// Test normal case
+	testControllerSpec.ID = "foo"
+	fakeEtcd.WatchResponse <- &etcd.Response{
+		Action: "set",
+		Node: &etcd.Node{
+			Value: util.MakeJSONString(testControllerSpec),
+		},
+	}
+
+	select {
+	case <-received:
+	case <-time.After(time.Second):
+		t.Errorf("Expected 1 call but got none")
+	}
+
+	// Test error case
+	fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
+	time.Sleep(10 * time.Millisecond)
+
+	// Did everything shut down?
+	if _, open := <-fakeEtcd.WatchResponse; open {
+		t.Errorf("An injected error did not cause a graceful shutdown")
+	}
+
+	// Test purposeful shutdown
+	go manager.watchControllers()
+	time.Sleep(10 * time.Millisecond)
+	fakeEtcd.WatchStop <- true
+	time.Sleep(10 * time.Millisecond)
+
+	// Did everything shut down?
+	if _, open := <-fakeEtcd.WatchResponse; open {
+		t.Errorf("A stop did not cause a graceful shutdown")
+	}
+}
diff --git a/pkg/util/etcd_tools.go b/pkg/util/etcd_tools.go
index 55a4dd897b1..e97903c4aec 100644
--- a/pkg/util/etcd_tools.go
+++ b/pkg/util/etcd_tools.go
@@ -94,25 +94,21 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
 // empty responses and nil response nodes exactly like a not found error.
 func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
 	response, err := h.Client.Get(key, false, false)
-	returnZero := false
-	if err != nil {
-		if ignoreNotFound && IsEtcdNotFound(err) {
-			returnZero = true
-		} else {
+	// Errors other than "not found" are always returned to the caller.
+	if err != nil && !IsEtcdNotFound(err) {
+		return err
+	}
+	// A not-found error, a nil node and an empty value are all treated alike.
+	if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
+		if ignoreNotFound {
+			pv := reflect.ValueOf(objPtr)
+			pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
+			return nil
+		}
+		if err != nil {
 			return err
 		}
-	}
-	if !returnZero && (response.Node == nil || len(response.Node.Value) == 0) {
-		if ignoreNotFound {
-			returnZero = true
-		} else {
-			return fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
-		}
-	}
-	if returnZero {
-		pv := reflect.ValueOf(objPtr)
-		pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
-		return nil
+		return fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
 	}
 	return json.Unmarshal([]byte(response.Node.Value), objPtr)
 }
diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go
index d98bb500c24..56e023fa859 100644
--- a/pkg/util/fake_etcd_client.go
+++ b/pkg/util/fake_etcd_client.go
@@ -45,6 +45,18 @@ type FakeEtcdClient struct {
 	Err         error
 	t           *testing.T
 	Ix          int
+
+	// The Watch* fields are (re)assigned by every call to Watch, without any
+	// locking; testers must let Watch start before touching them.
+	//
+	// WatchResponse is the receiver passed to Watch. Tester may write to it, and
+	// may read from it to verify that the caller of Watch closed it afterwards.
+	WatchResponse chan *etcd.Response
+	// Write to either of these to end a Watch that is running in a goroutine:
+	// WatchInjectError makes it return the error written, WatchStop makes it
+	// return etcd.ErrWatchStoppedByUser.
+	WatchInjectError chan<- error
+	WatchStop        chan<- bool
 }
 
 func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
@@ -88,5 +100,17 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
 }
 
+// Watch records receiver and stop on f, then blocks until stop fires (a send
+// or a close) or an error is injected. It never writes to receiver itself.
 func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
-	return nil, fmt.Errorf("unimplemented")
+	f.WatchResponse = receiver
+	f.WatchStop = stop
+	injectedError := make(chan error)
+	defer close(injectedError)
+	f.WatchInjectError = injectedError
+	select {
+	case <-stop:
+		return nil, etcd.ErrWatchStoppedByUser
+	case err := <-injectedError:
+		return nil, err
+	}
 }