diff --git a/cmd/controller-manager/controller-manager.go b/cmd/controller-manager/controller-manager.go index 8781876bca5..7b24f1a4290 100644 --- a/cmd/controller-manager/controller-manager.go +++ b/cmd/controller-manager/controller-manager.go @@ -28,8 +28,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/coreos/go-etcd/etcd" ) @@ -48,12 +47,11 @@ func main() { // Set up logger for etcd client etcd.SetLogger(log.New(os.Stderr, "etcd ", log.LstdFlags)) - controllerManager := registry.MakeReplicationManager(etcd.NewClient([]string{*etcd_servers}), + controllerManager := controller.MakeReplicationManager(etcd.NewClient([]string{*etcd_servers}), client.Client{ Host: "http://" + *master, }) - go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second) - go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second) + controllerManager.Run(10 * time.Second) select {} } diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index ac3967ad1ba..62661145779 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" "github.com/coreos/go-etcd/etcd" ) @@ -48,13 +49,12 @@ func main() { }, "/api/v1beta1") server := httptest.NewServer(apiserver) - controllerManager := registry.MakeReplicationManager(etcd.NewClient(servers), + controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.Client{ Host: server.URL, }) - go controllerManager.Synchronize() - go controllerManager.WatchControllers() + controllerManager.Run(10 * time.Second) // Ok. we're good to go. log.Printf("API Server started on %s", server.URL) diff --git a/cmd/localkube/localkube.go b/cmd/localkube/localkube.go index 6bd25e9d81e..5f201162194 100644 --- a/cmd/localkube/localkube.go +++ b/cmd/localkube/localkube.go @@ -29,10 +29,9 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" ) @@ -86,13 +85,12 @@ func api_server() { // Starts up a controller manager. Never returns. func controller_manager() { - controllerManager := registry.MakeReplicationManager(etcd.NewClient([]string{*etcd_server}), + controllerManager := controller.MakeReplicationManager(etcd.NewClient([]string{*etcd_server}), client.Client{ Host: fmt.Sprintf("http://%s:%d", *master_address, *master_port), }) - go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second) - go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second) + controllerManager.Run(20 * time.Second) select {} } diff --git a/pkg/cloudcfg/cloudcfg_test.go b/pkg/cloudcfg/cloudcfg_test.go index 7b0142e7282..de23e61216b 100644 --- a/pkg/cloudcfg/cloudcfg_test.go +++ b/pkg/cloudcfg/cloudcfg_test.go @@ -166,7 +166,7 @@ func TestDoRequest(t *testing.T) { if body != expectedBody { t.Errorf("Expected body: '%s', saw: '%s'", expectedBody, body) } - fakeHandler.ValidateRequest(t, "/foo/bar", "GET", &fakeHandler.ResponseBody) + fakeHandler.ValidateRequest(t, "/foo/bar", "GET", nil) } func TestRunController(t *testing.T) { diff --git a/pkg/controller/doc.go b/pkg/controller/doc.go new file mode 100644 index 00000000000..8a708ad5d1c --- /dev/null +++ b/pkg/controller/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package controller contains logic for watching and synchronizing +// replicationControllers. +package controller diff --git a/pkg/registry/replication_controller.go b/pkg/controller/replication_controller.go similarity index 64% rename from pkg/registry/replication_controller.go rename to pkg/controller/replication_controller.go index 3cdd2518609..39691cfbdc0 100644 --- a/pkg/registry/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -13,7 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package registry + +package controller import ( "encoding/json" @@ -21,7 +22,6 @@ import ( "log" "math/rand" "strings" - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -34,10 +34,13 @@ import ( // with actual running pods. // TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface type ReplicationManager struct { - etcdClient *etcd.Client + etcdClient util.EtcdClient kubeClient client.ClientInterface podControl PodControlInterface - updateLock sync.Mutex + syncTime <-chan time.Time + + // To allow injection of syncReplicationController for testing. + syncHandler func(controllerSpec api.ReplicationController) error } // An interface that knows how to add or delete pods @@ -73,44 +76,63 @@ func (r RealPodControl) deletePod(podID string) error { return r.kubeClient.DeletePod(podID) } -func MakeReplicationManager(etcdClient *etcd.Client, kubeClient client.ClientInterface) *ReplicationManager { - return &ReplicationManager{ +func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager { + rm := &ReplicationManager{ kubeClient: kubeClient, etcdClient: etcdClient, podControl: RealPodControl{ kubeClient: kubeClient, }, } + rm.syncHandler = func(controllerSpec api.ReplicationController) error { + return rm.syncReplicationController(controllerSpec) + } + return rm } -func (rm *ReplicationManager) WatchControllers() { +// Begin watching and syncing. +func (rm *ReplicationManager) Run(period time.Duration) { + rm.syncTime = time.Tick(period) + go util.Forever(func() { rm.watchControllers() }, period) +} + +func (rm *ReplicationManager) watchControllers() { watchChannel := make(chan *etcd.Response) + stop := make(chan bool) + defer func() { + // Ensure that the call to watch ends. + close(stop) + }() go func() { defer util.HandleCrash() defer func() { close(watchChannel) }() - rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil) + _, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop) + if err != etcd.ErrWatchStoppedByUser { + log.Printf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err) + } }() for { - watchResponse, ok := <-watchChannel - if !ok { - // watchChannel has been closed. Let the util.Forever() that - // called us call us again. - return + select { + case <-rm.syncTime: + rm.synchronize() + case watchResponse, open := <-watchChannel: + if !open || watchResponse == nil { + // watchChannel has been closed, or something else went + // wrong with our etcd watch call. Let the util.Forever() + // that called us call us again. + return + } + log.Printf("Got watch: %#v", watchResponse) + controller, err := rm.handleWatchResponse(watchResponse) + if err != nil { + log.Printf("Error handling data: %#v, %#v", err, watchResponse) + continue + } + rm.syncHandler(*controller) } - if watchResponse == nil { - time.Sleep(time.Second * 10) - continue - } - log.Printf("Got watch: %#v", watchResponse) - controller, err := rm.handleWatchResponse(watchResponse) - if err != nil { - log.Printf("Error handling data: %#v, %#v", err, watchResponse) - continue - } - rm.syncReplicationController(*controller) } } @@ -141,7 +163,6 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod { } func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error { - rm.updateLock.Lock() podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet) if err != nil { return err @@ -161,38 +182,21 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli rm.podControl.deletePod(filteredList[i].ID) } } - rm.updateLock.Unlock() return nil } -func (rm *ReplicationManager) Synchronize() { - for { - response, err := rm.etcdClient.Get("/registry/controllers", false, false) +func (rm *ReplicationManager) synchronize() { + var controllerSpecs []api.ReplicationController + helper := util.EtcdHelper{rm.etcdClient} + err := helper.ExtractList("/registry/controllers", &controllerSpecs) + if err != nil { + log.Printf("Synchronization error: %v (%#v)", err, err) + return + } + for _, controllerSpec := range controllerSpecs { + err = rm.syncHandler(controllerSpec) if err != nil { - log.Printf("Synchronization error %#v", err) + log.Printf("Error synchronizing: %#v", err) } - // TODO(bburns): There is a race here, if we get a version of the controllers, and then it is - // updated, its possible that the watch will pick up the change first, and then we will execute - // using the old version of the controller. - // Probably the correct thing to do is to use the version number in etcd to detect when - // we are stale. - // Punting on this for now, but this could lead to some nasty bugs, so we should really fix it - // sooner rather than later. - if response != nil && response.Node != nil && response.Node.Nodes != nil { - for _, value := range response.Node.Nodes { - var controllerSpec api.ReplicationController - err := json.Unmarshal([]byte(value.Value), &controllerSpec) - if err != nil { - log.Printf("Unexpected error: %#v", err) - continue - } - log.Printf("Synchronizing %s\n", controllerSpec.ID) - err = rm.syncReplicationController(controllerSpec) - if err != nil { - log.Printf("Error synchronizing: %#v", err) - } - } - } - time.Sleep(10 * time.Second) } } diff --git a/pkg/registry/replication_controller_test.go b/pkg/controller/replication_controller_test.go similarity index 68% rename from pkg/registry/replication_controller_test.go rename to pkg/controller/replication_controller_test.go index 2bbbced8ba8..a7e6f621b80 100644 --- a/pkg/registry/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -13,7 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package registry + +package controller import ( "encoding/json" @@ -21,6 +22,7 @@ import ( "net/http/httptest" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -31,6 +33,13 @@ import ( // TODO: Move this to a common place, it's needed in multiple tests. var apiPath = "/api/v1beta1" +// TODO: This doesn't reduce typing enough to make it worth the less readable errors. Remove. +func expectNoError(t *testing.T, err error) { + if err != nil { + t.Errorf("Unexpected error: %#v", err) + } +} + func makeUrl(suffix string) string { return apiPath + suffix } @@ -309,3 +318,152 @@ func TestHandleWatchResponse(t *testing.T) { t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut) } } + +func TestSyncronize(t *testing.T) { + controllerSpec1 := api.ReplicationController{ + DesiredState: api.ReplicationControllerState{ + Replicas: 4, + PodTemplate: api.PodTemplate{ + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo/bar", + }, + }, + }, + }, + Labels: map[string]string{ + "name": "foo", + "type": "production", + }, + }, + }, + } + controllerSpec2 := api.ReplicationController{ + DesiredState: api.ReplicationControllerState{ + Replicas: 3, + PodTemplate: api.PodTemplate{ + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "bar/baz", + }, + }, + }, + }, + Labels: map[string]string{ + "name": "bar", + "type": "production", + }, + }, + }, + } + + fakeEtcd := util.MakeFakeEtcdClient(t) + fakeEtcd.Data["/registry/controllers"] = util.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: util.MakeJSONString(controllerSpec1), + }, + { + Value: util.MakeJSONString(controllerSpec2), + }, + }, + }, + }, + } + + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: "{}", + T: t, + } + testServer := httptest.NewTLSServer(&fakeHandler) + client := client.Client{ + Host: testServer.URL, + } + manager := MakeReplicationManager(fakeEtcd, client) + fakePodControl := FakePodControl{} + manager.podControl = &fakePodControl + + manager.synchronize() + + validateSyncReplication(t, &fakePodControl, 7, 0) +} + +type asyncTimeout struct { + doneChan chan bool +} + +func beginTimeout(d time.Duration) *asyncTimeout { + a := &asyncTimeout{doneChan: make(chan bool)} + go func() { + select { + case <-a.doneChan: + return + case <-time.After(d): + panic("Timeout expired!") + } + }() + return a +} + +func (a *asyncTimeout) done() { + close(a.doneChan) +} + +func TestWatchControllers(t *testing.T) { + defer beginTimeout(20 * time.Second).done() + fakeEtcd := util.MakeFakeEtcdClient(t) + manager := MakeReplicationManager(fakeEtcd, nil) + var testControllerSpec api.ReplicationController + receivedCount := 0 + manager.syncHandler = func(controllerSpec api.ReplicationController) error { + if !reflect.DeepEqual(controllerSpec, testControllerSpec) { + t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) + } + receivedCount++ + return nil + } + + go manager.watchControllers() + time.Sleep(10 * time.Millisecond) + + // Test normal case + testControllerSpec.ID = "foo" + fakeEtcd.WatchResponse <- &etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: util.MakeJSONString(testControllerSpec), + }, + } + + time.Sleep(10 * time.Millisecond) + if receivedCount != 1 { + t.Errorf("Expected 1 call but got %v", receivedCount) + } + + // Test error case + fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") + time.Sleep(10 * time.Millisecond) + + // Did everything shut down? + if _, open := <-fakeEtcd.WatchResponse; open { + t.Errorf("An injected error did not cause a graceful shutdown") + } + + // Test purposeful shutdown + go manager.watchControllers() + time.Sleep(10 * time.Millisecond) + fakeEtcd.WatchStop <- true + time.Sleep(10 * time.Millisecond) + + // Did everything shut down? + if _, open := <-fakeEtcd.WatchResponse; open { + t.Errorf("A stop did not cause a graceful shutdown") + } +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0167af40310..620c99df7e5 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -33,7 +33,6 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" @@ -62,7 +61,7 @@ type DockerInterface interface { // The main kubelet implementation type Kubelet struct { Hostname string - Client registry.EtcdClient + Client util.EtcdClient DockerClient DockerInterface FileCheckFrequency time.Duration SyncFrequency time.Duration diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 46d8472639b..6eea97987ec 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -27,7 +27,6 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" @@ -398,13 +397,13 @@ func (cr *channelReader) GetList() [][]api.ContainerManifest { } func TestGetKubeletStateFromEtcdNoData(t *testing.T) { - fakeClient := registry.MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) kubelet := Kubelet{ Client: fakeClient, } channel := make(chan []api.ContainerManifest) reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{}, E: nil, } @@ -420,13 +419,13 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) { } func TestGetKubeletStateFromEtcd(t *testing.T) { - fakeClient := registry.MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) kubelet := Kubelet{ Client: fakeClient, } channel := make(chan []api.ContainerManifest) reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: util.MakeJSONString([]api.Container{}), @@ -444,13 +443,13 @@ func TestGetKubeletStateFromEtcd(t *testing.T) { } func TestGetKubeletStateFromEtcdNotFound(t *testing.T) { - fakeClient := registry.MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) kubelet := Kubelet{ Client: fakeClient, } channel := make(chan []api.ContainerManifest) reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ ErrorCode: 100, @@ -466,13 +465,13 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) { } func TestGetKubeletStateFromEtcdError(t *testing.T) { - fakeClient := registry.MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) kubelet := Kubelet{ Client: fakeClient, } channel := make(chan []api.ContainerManifest) reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ ErrorCode: 200, // non not found error @@ -554,7 +553,7 @@ func TestSyncManifestsDeletes(t *testing.T) { } func TestEventWriting(t *testing.T) { - fakeEtcd := registry.MakeFakeEtcdClient(t) + fakeEtcd := util.MakeFakeEtcdClient(t) kubelet := &Kubelet{ Client: fakeEtcd, } @@ -581,7 +580,7 @@ func TestEventWriting(t *testing.T) { } func TestEventWritingError(t *testing.T) { - fakeEtcd := registry.MakeFakeEtcdClient(t) + fakeEtcd := util.MakeFakeEtcdClient(t) kubelet := &Kubelet{ Client: fakeEtcd, } diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index fcf5ee064c6..389e4b98ecc 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -13,38 +13,25 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package registry import ( "encoding/json" "fmt" "log" - "reflect" - - "github.com/coreos/go-etcd/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // kubelet (and vice versa) -// EtcdClient is an injectable interface for testing. -type EtcdClient interface { - AddChild(key, data string, ttl uint64) (*etcd.Response, error) - Get(key string, sort, recursive bool) (*etcd.Response, error) - Set(key, value string, ttl uint64) (*etcd.Response, error) - Create(key, value string, ttl uint64) (*etcd.Response, error) - Delete(key string, recursive bool) (*etcd.Response, error) - // I'd like to use directional channels here (e.g. <-chan) but this interface mimics - // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. - Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) -} - // EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd. type EtcdRegistry struct { - etcdClient EtcdClient + etcdClient util.EtcdClient machines []string manifestFactory ManifestFactory } @@ -53,7 +40,7 @@ type EtcdRegistry struct { // 'client' is the connection to etcd // 'machines' is the list of machines // 'scheduler' is the scheduling algorithm to use. -func MakeEtcdRegistry(client EtcdClient, machines []string) *EtcdRegistry { +func MakeEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry { registry := &EtcdRegistry{ etcdClient: client, machines: machines, @@ -68,11 +55,15 @@ func makePodKey(machine, podID string) string { return "/registry/hosts/" + machine + "/pods/" + podID } +func (registry *EtcdRegistry) helper() *util.EtcdHelper { + return &util.EtcdHelper{registry.etcdClient} +} + func (registry *EtcdRegistry) ListPods(query labels.Query) ([]api.Pod, error) { pods := []api.Pod{} for _, machine := range registry.machines { var machinePods []api.Pod - err := registry.extractList("/registry/hosts/"+machine+"/pods", &machinePods) + err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods) if err != nil { return pods, err } @@ -86,80 +77,6 @@ func (registry *EtcdRegistry) ListPods(query labels.Query) ([]api.Pod, error) { return pods, nil } -func (registry *EtcdRegistry) listEtcdNode(key string) ([]*etcd.Node, error) { - result, err := registry.etcdClient.Get(key, false, true) - if err != nil { - nodes := make([]*etcd.Node, 0) - if isEtcdNotFound(err) { - return nodes, nil - } else { - return nodes, err - } - } - return result.Node.Nodes, nil -} - -// Extract a go object per etcd node into a slice. -func (r *EtcdRegistry) extractList(key string, slicePtr interface{}) error { - nodes, err := r.listEtcdNode(key) - if err != nil { - return err - } - pv := reflect.ValueOf(slicePtr) - if pv.Type().Kind() != reflect.Ptr || pv.Type().Elem().Kind() != reflect.Slice { - // This should not happen at runtime. - panic("need ptr to slice") - } - v := pv.Elem() - for _, node := range nodes { - obj := reflect.New(v.Type().Elem()) - err = json.Unmarshal([]byte(node.Value), obj.Interface()) - if err != nil { - return err - } - v.Set(reflect.Append(v, obj.Elem())) - } - return nil -} - -// Unmarshals json found at key into objPtr. On a not found error, will either return -// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats -// empty responses and nil response nodes exactly like a not found error. -func (r *EtcdRegistry) extractObj(key string, objPtr interface{}, ignoreNotFound bool) error { - response, err := r.etcdClient.Get(key, false, false) - returnZero := false - if err != nil { - if ignoreNotFound && isEtcdNotFound(err) { - returnZero = true - } else { - return err - } - } - if !returnZero && (response.Node == nil || len(response.Node.Value) == 0) { - if ignoreNotFound { - returnZero = true - } else { - return fmt.Errorf("key '%v' found no nodes field: %#v", key, response) - } - } - if returnZero { - pv := reflect.ValueOf(objPtr) - pv.Elem().Set(reflect.Zero(pv.Type().Elem())) - return nil - } - return json.Unmarshal([]byte(response.Node.Value), objPtr) -} - -// json marshals obj, and stores under key. -func (r *EtcdRegistry) setObj(key string, obj interface{}) error { - data, err := json.Marshal(obj) - if err != nil { - return err - } - _, err = r.etcdClient.Set(key, string(data), 0) - return err -} - func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) { pod, _, err := registry.findPod(podID) return &pod, err @@ -170,12 +87,12 @@ func makeContainerKey(machine string) string { } func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, err error) { - err = registry.extractObj(makeContainerKey(machine), &manifests, true) + err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true) return } func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest) error { - return registry.setObj(makeContainerKey(machine), manifests) + return registry.helper().SetObj(makeContainerKey(machine), manifests) } func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { @@ -249,7 +166,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { key := makePodKey(machine, podID) - err = registry.extractObj(key, &pod, false) + err = registry.helper().ExtractObj(key, &pod, false) if err != nil { return } @@ -267,26 +184,9 @@ func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) { return api.Pod{}, "", fmt.Errorf("pod not found %s", podID) } -func isEtcdNotFound(err error) bool { - if err == nil { - return false - } - switch err.(type) { - case *etcd.EtcdError: - etcdError := err.(*etcd.EtcdError) - if etcdError == nil { - return false - } - if etcdError.ErrorCode == 100 { - return true - } - } - return false -} - func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) { var controllers []api.ReplicationController - err := registry.extractList("/registry/controllers", &controllers) + err := registry.helper().ExtractList("/registry/controllers", &controllers) return controllers, err } @@ -297,7 +197,7 @@ func makeControllerKey(id string) string { func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) - err := registry.extractObj(key, &controller, false) + err := registry.helper().ExtractObj(key, &controller, false) if err != nil { return nil, err } @@ -310,7 +210,7 @@ func (registry *EtcdRegistry) CreateController(controller api.ReplicationControl } func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error { - return registry.setObj(makeControllerKey(controller.ID), controller) + return registry.helper().SetObj(makeControllerKey(controller.ID), controller) } func (registry *EtcdRegistry) DeleteController(controllerID string) error { @@ -325,18 +225,18 @@ func makeServiceKey(name string) string { func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) { var list api.ServiceList - err := registry.extractList("/registry/services/specs", &list.Items) + err := registry.helper().ExtractList("/registry/services/specs", &list.Items) return list, err } func (registry *EtcdRegistry) CreateService(svc api.Service) error { - return registry.setObj(makeServiceKey(svc.ID), svc) + return registry.helper().SetObj(makeServiceKey(svc.ID), svc) } func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service - err := registry.extractObj(key, &svc, false) + err := registry.helper().ExtractObj(key, &svc, false) if err != nil { return nil, err } @@ -359,5 +259,5 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error { } func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { - return registry.setObj("/registry/services/endpoints/"+e.Name, e) + return registry.helper().SetObj("/registry/services/endpoints/"+e.Name, e) } diff --git a/pkg/registry/etcd_registry_test.go b/pkg/registry/etcd_registry_test.go index 4e13dc9ab12..85676d80f88 100644 --- a/pkg/registry/etcd_registry_test.go +++ b/pkg/registry/etcd_registry_test.go @@ -26,8 +26,16 @@ import ( "github.com/coreos/go-etcd/etcd" ) +func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry { + registry := MakeEtcdRegistry(client, machines) + registry.manifestFactory = &BasicManifestFactory{ + serviceRegistry: &MockServiceRegistry{}, + } + return registry +} + func TestEtcdGetPod(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) pod, err := registry.GetPod("foo") @@ -38,8 +46,8 @@ func TestEtcdGetPod(t *testing.T) { } func TestEtcdGetPodNotFound(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -55,8 +63,8 @@ func TestEtcdGetPodNotFound(t *testing.T) { } func TestEtcdCreatePod(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -97,8 +105,8 @@ func TestEtcdCreatePod(t *testing.T) { } func TestEtcdCreatePodAlreadyExisting(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), @@ -118,14 +126,14 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { } func TestEtcdCreatePodWithContainersError(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, E: &etcd.EtcdError{ErrorCode: 100}, } - fakeClient.Data["/registry/hosts/machine/kubelet"] = EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -150,14 +158,14 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { } func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, E: &etcd.EtcdError{ErrorCode: 100}, } - fakeClient.Data["/registry/hosts/machine/kubelet"] = EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -198,8 +206,8 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { } func TestEtcdCreatePodWithExistingContainers(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -245,7 +253,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { } func TestEtcdDeletePod(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods/foo" fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ @@ -256,11 +264,11 @@ func TestEtcdDeletePod(t *testing.T) { registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.DeletePod("foo") expectNoError(t, err) - if len(fakeClient.deletedKeys) != 1 { - t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) + if len(fakeClient.DeletedKeys) != 1 { + t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) } - if fakeClient.deletedKeys[0] != key { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) + if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) if response.Node.Value != "[]" { @@ -269,7 +277,7 @@ func TestEtcdDeletePod(t *testing.T) { } func TestEtcdDeletePodMultipleContainers(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods/foo" fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ @@ -279,11 +287,11 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.DeletePod("foo") expectNoError(t, err) - if len(fakeClient.deletedKeys) != 1 { - t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) + if len(fakeClient.DeletedKeys) != 1 { + t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) } - if fakeClient.deletedKeys[0] != key { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) + if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) var manifests []api.ContainerManifest @@ -297,9 +305,9 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { } func TestEtcdEmptyListPods(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods" - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{}, @@ -316,9 +324,9 @@ func TestEtcdEmptyListPods(t *testing.T) { } func TestEtcdListPodsNotFound(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods" - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = util.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ErrorCode: 100}, } @@ -331,9 +339,9 @@ func TestEtcdListPodsNotFound(t *testing.T) { } func TestEtcdListPods(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods" - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ @@ -361,9 +369,9 @@ func TestEtcdListPods(t *testing.T) { } func TestEtcdListControllersNotFound(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/controllers" - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = util.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ErrorCode: 100}, } @@ -376,9 +384,9 @@ func TestEtcdListControllersNotFound(t *testing.T) { } func TestEtcdListServicesNotFound(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/services/specs" - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = util.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ErrorCode: 100}, } @@ -391,9 +399,9 @@ func TestEtcdListServicesNotFound(t *testing.T) { } func TestEtcdListControllers(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/controllers" - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ @@ -417,7 +425,7 @@ func TestEtcdListControllers(t *testing.T) { } func TestEtcdGetController(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) ctrl, err := registry.GetController("foo") @@ -428,8 +436,8 @@ func TestEtcdGetController(t *testing.T) { } func TestEtcdGetControllerNotFound(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/controllers/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/controllers/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -448,21 +456,21 @@ func TestEtcdGetControllerNotFound(t *testing.T) { } func TestEtcdDeleteController(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.DeleteController("foo") expectNoError(t, err) - if len(fakeClient.deletedKeys) != 1 { - t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) + if len(fakeClient.DeletedKeys) != 1 { + t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) } key := "/registry/controllers/foo" - if fakeClient.deletedKeys[0] != key { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) + if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } } func TestEtcdCreateController(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.CreateController(api.ReplicationController{ JSONBase: api.JSONBase{ @@ -481,7 +489,7 @@ func TestEtcdCreateController(t *testing.T) { } func TestEtcdUpdateController(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.UpdateController(api.ReplicationController{ @@ -498,9 +506,9 @@ func TestEtcdUpdateController(t *testing.T) { } func TestEtcdListServices(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) key := "/registry/services/specs" - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ @@ -524,8 +532,8 @@ func TestEtcdListServices(t *testing.T) { } func TestEtcdCreateService(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/services/specs/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -547,7 +555,7 @@ func TestEtcdCreateService(t *testing.T) { } func TestEtcdGetService(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) service, err := registry.GetService("foo") @@ -558,8 +566,8 @@ func TestEtcdGetService(t *testing.T) { } func TestEtcdGetServiceNotFound(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.Data["/registry/services/specs/foo"] = EtcdResponseWithError{ + fakeClient := util.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -575,25 +583,25 @@ func TestEtcdGetServiceNotFound(t *testing.T) { } func TestEtcdDeleteService(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.DeleteService("foo") expectNoError(t, err) - if len(fakeClient.deletedKeys) != 2 { - t.Errorf("Expected 2 delete, found %#v", fakeClient.deletedKeys) + if len(fakeClient.DeletedKeys) != 2 { + t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys) } key := "/registry/services/specs/foo" - if fakeClient.deletedKeys[0] != key { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) + if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } key = "/registry/services/endpoints/foo" - if fakeClient.deletedKeys[1] != key { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[1], key) + if fakeClient.DeletedKeys[1] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], key) } } func TestEtcdUpdateService(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.UpdateService(api.Service{ @@ -610,7 +618,7 @@ func TestEtcdUpdateService(t *testing.T) { } func TestEtcdUpdateEndpoints(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) + fakeClient := util.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) endpoints := api.Endpoints{ Name: "foo", diff --git a/pkg/util/etcd_tools.go b/pkg/util/etcd_tools.go new file mode 100644 index 00000000000..e97903c4aec --- /dev/null +++ b/pkg/util/etcd_tools.go @@ -0,0 +1,122 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "encoding/json" + "fmt" + "reflect" + + "github.com/coreos/go-etcd/etcd" +) + +// Interface exposing only the etcd operations needed by EtcdHelper. +type EtcdGetSet interface { + Get(key string, sort, recursive bool) (*etcd.Response, error) + Set(key, value string, ttl uint64) (*etcd.Response, error) +} + +// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. +type EtcdHelper struct { + Client EtcdGetSet +} + +// Returns true iff err is an etcd not found error. +func IsEtcdNotFound(err error) bool { + if err == nil { + return false + } + switch err.(type) { + case *etcd.EtcdError: + etcdError := err.(*etcd.EtcdError) + if etcdError == nil { + return false + } + if etcdError.ErrorCode == 100 { + return true + } + } + return false +} + +func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) { + result, err := h.Client.Get(key, false, true) + if err != nil { + nodes := make([]*etcd.Node, 0) + if IsEtcdNotFound(err) { + return nodes, nil + } else { + return nodes, err + } + } + return result.Node.Nodes, nil +} + +// Extract a go object per etcd node into a slice. +func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { + nodes, err := h.listEtcdNode(key) + if err != nil { + return err + } + pv := reflect.ValueOf(slicePtr) + if pv.Type().Kind() != reflect.Ptr || pv.Type().Elem().Kind() != reflect.Slice { + // This should not happen at runtime. + panic("need ptr to slice") + } + v := pv.Elem() + for _, node := range nodes { + obj := reflect.New(v.Type().Elem()) + err = json.Unmarshal([]byte(node.Value), obj.Interface()) + if err != nil { + return err + } + v.Set(reflect.Append(v, obj.Elem())) + } + return nil +} + +// Unmarshals json found at key into objPtr. On a not found error, will either return +// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats +// empty responses and nil response nodes exactly like a not found error. +func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { + response, err := h.Client.Get(key, false, false) + + if err != nil && !IsEtcdNotFound(err) { + return err + } + if err != nil || response.Node == nil || len(response.Node.Value) == 0 { + if ignoreNotFound { + pv := reflect.ValueOf(objPtr) + pv.Elem().Set(reflect.Zero(pv.Type().Elem())) + return nil + } else if err != nil { + return err + } + return fmt.Errorf("key '%v' found no nodes field: %#v", key, response) + } + return json.Unmarshal([]byte(response.Node.Value), objPtr) +} + +// SetObj marshals obj via json, and stores under key. +func (h *EtcdHelper) SetObj(key string, obj interface{}) error { + data, err := json.Marshal(obj) + if err != nil { + return err + } + _, err = h.Client.Set(key, string(data), 0) + return err +} diff --git a/pkg/util/etcd_tools_test.go b/pkg/util/etcd_tools_test.go new file mode 100644 index 00000000000..ebf11e835c7 --- /dev/null +++ b/pkg/util/etcd_tools_test.go @@ -0,0 +1,135 @@ +package util + +import ( + "fmt" + "reflect" + "testing" + + "github.com/coreos/go-etcd/etcd" +) + +type fakeEtcdGetSet struct { + get func(key string, sort, recursive bool) (*etcd.Response, error) + set func(key, value string, ttl uint64) (*etcd.Response, error) +} + +func TestIsNotFoundErr(t *testing.T) { + try := func(err error, isNotFound bool) { + if IsEtcdNotFound(err) != isNotFound { + t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound) + } + } + try(&etcd.EtcdError{ErrorCode: 100}, true) + try(&etcd.EtcdError{ErrorCode: 101}, false) + try(nil, false) + try(fmt.Errorf("some other kind of error"), false) +} + +type testMarshalType struct { + ID string `json:"id"` +} + +func TestExtractList(t *testing.T) { + fakeClient := MakeFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: `{"id":"foo"}`, + }, + { + Value: `{"id":"bar"}`, + }, + { + Value: `{"id":"baz"}`, + }, + }, + }, + }, + } + expect := []testMarshalType{ + {"foo"}, + {"bar"}, + {"baz"}, + } + var got []testMarshalType + helper := EtcdHelper{fakeClient} + err := helper.ExtractList("/some/key", &got) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + if !reflect.DeepEqual(got, expect) { + t.Errorf("Wanted %#v, got %#v", expect, got) + } +} + +func TestExtractObj(t *testing.T) { + fakeClient := MakeFakeEtcdClient(t) + expect := testMarshalType{ID: "foo"} + fakeClient.Set("/some/key", MakeJSONString(expect), 0) + helper := EtcdHelper{fakeClient} + var got testMarshalType + err := helper.ExtractObj("/some/key", &got, false) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + if !reflect.DeepEqual(got, expect) { + t.Errorf("Wanted %#v, got %#v", expect, got) + } +} + +func TestExtractObjNotFoundErr(t *testing.T) { + fakeClient := MakeFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ + ErrorCode: 100, + }, + } + fakeClient.Data["/some/key2"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + } + fakeClient.Data["/some/key3"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: "", + }, + }, + } + helper := EtcdHelper{fakeClient} + try := func(key string) { + var got testMarshalType + err := helper.ExtractObj(key, &got, false) + if err == nil { + t.Errorf("%s: wanted error but didn't get one", key) + } + err = helper.ExtractObj(key, &got, true) + if err != nil { + t.Errorf("%s: didn't want error but got %#v", key, err) + } + } + + try("/some/key") + try("/some/key2") + try("/some/key3") +} + +func TestSetObj(t *testing.T) { + obj := testMarshalType{ID: "foo"} + fakeClient := MakeFakeEtcdClient(t) + helper := EtcdHelper{fakeClient} + err := helper.SetObj("/some/key", obj) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + expect := MakeJSONString(obj) + got := fakeClient.Data["/some/key"].R.Node.Value + if expect != got { + t.Errorf("Wanted %v, got %v", expect, got) + } +} diff --git a/pkg/registry/fake_etcd_client.go b/pkg/util/fake_etcd_client.go similarity index 59% rename from pkg/registry/fake_etcd_client.go rename to pkg/util/fake_etcd_client.go index d5a54bda1bd..56e023fa859 100644 --- a/pkg/registry/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package registry +package util import ( "fmt" @@ -22,6 +22,18 @@ import ( "github.com/coreos/go-etcd/etcd" ) +// EtcdClient is an injectable interface for testing. +type EtcdClient interface { + AddChild(key, data string, ttl uint64) (*etcd.Response, error) + Get(key string, sort, recursive bool) (*etcd.Response, error) + Set(key, value string, ttl uint64) (*etcd.Response, error) + Create(key, value string, ttl uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + // I'd like to use directional channels here (e.g. <-chan) but this interface mimics + // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) +} + type EtcdResponseWithError struct { R *etcd.Response E error @@ -29,10 +41,17 @@ type EtcdResponseWithError struct { type FakeEtcdClient struct { Data map[string]EtcdResponseWithError - deletedKeys []string + DeletedKeys []string Err error t *testing.T Ix int + + // Will become valid after Watch is called; tester may write to it. Tester may + // also read from it to verify that it's closed after injecting an error. + WatchResponse chan *etcd.Response + // Write to this to prematurely stop a Watch that is running in a goroutine. + WatchInjectError chan<- error + WatchStop chan<- bool } func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { @@ -71,18 +90,22 @@ func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, return f.Set(key, value, ttl) } func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { - f.deletedKeys = append(f.deletedKeys, key) + f.DeletedKeys = append(f.DeletedKeys, key) return &etcd.Response{}, f.Err } func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { - return nil, fmt.Errorf("unimplemented") -} - -func MakeTestEtcdRegistry(client EtcdClient, machines []string) *EtcdRegistry { - registry := MakeEtcdRegistry(client, machines) - registry.manifestFactory = &BasicManifestFactory{ - serviceRegistry: &MockServiceRegistry{}, + f.WatchResponse = receiver + f.WatchStop = stop + injectedError := make(chan error) + defer close(injectedError) + f.WatchInjectError = injectedError + select { + case <-stop: + return nil, etcd.ErrWatchStoppedByUser + case err := <-injectedError: + return nil, err } - return registry + // Never get here. + return nil, nil } diff --git a/pkg/util/fake_handler.go b/pkg/util/fake_handler.go index f6b4adb7253..94936a69993 100644 --- a/pkg/util/fake_handler.go +++ b/pkg/util/fake_handler.go @@ -32,6 +32,7 @@ type LogInterface interface { // FakeHandler is to assist in testing HTTP requests. type FakeHandler struct { RequestReceived *http.Request + RequestBody string StatusCode int ResponseBody string // For logging - you can use a *testing.T @@ -48,7 +49,7 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ if err != nil && f.T != nil { f.T.Logf("Received read error: %#v", err) } - f.ResponseBody = string(bodyReceived) + f.RequestBody = string(bodyReceived) } func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMethod string, body *string) { @@ -59,8 +60,8 @@ func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMeth t.Errorf("Unexpected method: %s", f.RequestReceived.Method) } if body != nil { - if *body != f.ResponseBody { - t.Errorf("Received body:\n%s\n Doesn't match expected body:\n%s", f.ResponseBody, *body) + if *body != f.RequestBody { + t.Errorf("Received body:\n%s\n Doesn't match expected body:\n%s", f.RequestBody, *body) } } }