From 71709ae09e2077073e823b96b2bf6166d79ea6e3 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 5 Aug 2014 16:20:50 -0700 Subject: [PATCH] Make replication controller use client --- pkg/controller/replication_controller.go | 25 ++++++++----------- pkg/controller/replication_controller_test.go | 25 ++++++++++++------- pkg/registry/etcdregistry.go | 2 +- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index d9845c59d1f..64367b7f7ed 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -85,28 +85,23 @@ func MakeReplicationManager(kubeClient client.Interface) *ReplicationManager { }, } rm.syncHandler = rm.syncReplicationController - rm.watchMaker = rm.makeAPIWatch return rm } // Run begins watching and syncing. func (rm *ReplicationManager) Run(period time.Duration) { rm.syncTime = time.Tick(period) - go util.Forever(func() { rm.watchControllers() }, period) + index := uint64(0) + go util.Forever(func() { rm.watchControllers(&index) }, period) } -// makeAPIWatch starts watching via the apiserver. -func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) { - // TODO: Fix this ugly type assertion. - return rm.kubeClient.(*client.Client). - Get(). - Path("watch"). - Path("replicationControllers"). - Watch() -} - -func (rm *ReplicationManager) watchControllers() { - watching, err := rm.watchMaker() +// index is a pointer to the resource version to use/update. +func (rm *ReplicationManager) watchControllers(index *uint64) { + watching, err := rm.kubeClient.WatchReplicationControllers( + labels.Everything(), + labels.Everything(), + *index, + ) if err != nil { glog.Errorf("Unexpected failure to watch: %v", err) time.Sleep(5 * time.Second) @@ -128,6 +123,8 @@ func (rm *ReplicationManager) watchControllers() { if rc, ok := event.Object.(*api.ReplicationController); !ok { glog.Errorf("unexpected object: %#v", event.Object) } else { + // If we get disconnected, start where we left off. + *index = rc.ResourceVersion + 1 rm.syncHandler(*rc) } } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index bebaa948202..b62d5fb70ab 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -305,7 +306,7 @@ func TestSyncronize(t *testing.T) { w.WriteHeader(http.StatusNotFound) t.Errorf("Unexpected request for %v", req.RequestURI) }) - testServer := httptest.NewTLSServer(mux) + testServer := httptest.NewServer(mux) client := client.New(testServer.URL, nil) manager := MakeReplicationManager(client) fakePodControl := FakePodControl{} @@ -316,13 +317,18 @@ func TestSyncronize(t *testing.T) { validateSyncReplication(t, &fakePodControl, 7, 0) } -func TestWatchControllers(t *testing.T) { - fakeWatcher := watch.NewFake() - manager := MakeReplicationManager(nil) - manager.watchMaker = func() (watch.Interface, error) { - return fakeWatcher, nil - } +type FakeWatcher struct { + w *watch.FakeWatcher + *client.FakeClient +} +func (fw FakeWatcher) WatchReplicationControllers(l, f labels.Selector, rv uint64) (watch.Interface, error) { + return fw.w, nil +} + +func TestWatchControllers(t *testing.T) { + client := FakeWatcher{watch.NewFake(), &client.FakeClient{}} + manager := MakeReplicationManager(client) var testControllerSpec api.ReplicationController received := make(chan struct{}) manager.syncHandler = func(controllerSpec api.ReplicationController) error { @@ -333,11 +339,12 @@ func TestWatchControllers(t *testing.T) { return nil } - go manager.watchControllers() + index := uint64(0) + go manager.watchControllers(&index) // Test normal case testControllerSpec.ID = "foo" - fakeWatcher.Add(&testControllerSpec) + client.w.Add(&testControllerSpec) select { case <-received: diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index fa50ee4353e..d4cc759fcb6 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -206,7 +206,7 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er // WatchControllers begins watching for new, changed, or deleted controllers. func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - if field.String() != "" { + if !field.Empty() { return nil, fmt.Errorf("no field selector implemented for controllers") } return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool {