Merge pull request #144 from lavalamp/master

Refactor controller manager.
This commit is contained in:
brendandburns 2014-06-18 13:26:41 -07:00
commit dbdb326eaf
15 changed files with 641 additions and 277 deletions

View File

@ -28,8 +28,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
@ -48,12 +47,11 @@ func main() {
// Set up logger for etcd client // Set up logger for etcd client
etcd.SetLogger(log.New(os.Stderr, "etcd ", log.LstdFlags)) etcd.SetLogger(log.New(os.Stderr, "etcd ", log.LstdFlags))
controllerManager := registry.MakeReplicationManager(etcd.NewClient([]string{*etcd_servers}), controllerManager := controller.MakeReplicationManager(etcd.NewClient([]string{*etcd_servers}),
client.Client{ client.Client{
Host: "http://" + *master, Host: "http://" + *master,
}) })
go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second) controllerManager.Run(10 * time.Second)
go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second)
select {} select {}
} }

View File

@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
@ -48,13 +49,12 @@ func main() {
}, "/api/v1beta1") }, "/api/v1beta1")
server := httptest.NewServer(apiserver) server := httptest.NewServer(apiserver)
controllerManager := registry.MakeReplicationManager(etcd.NewClient(servers), controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers),
client.Client{ client.Client{
Host: server.URL, Host: server.URL,
}) })
go controllerManager.Synchronize() controllerManager.Run(10 * time.Second)
go controllerManager.WatchControllers()
// Ok. we're good to go. // Ok. we're good to go.
log.Printf("API Server started on %s", server.URL) log.Printf("API Server started on %s", server.URL)

View File

@ -29,10 +29,9 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
) )
@ -86,13 +85,12 @@ func api_server() {
// Starts up a controller manager. Never returns. // Starts up a controller manager. Never returns.
func controller_manager() { func controller_manager() {
controllerManager := registry.MakeReplicationManager(etcd.NewClient([]string{*etcd_server}), controllerManager := controller.MakeReplicationManager(etcd.NewClient([]string{*etcd_server}),
client.Client{ client.Client{
Host: fmt.Sprintf("http://%s:%d", *master_address, *master_port), Host: fmt.Sprintf("http://%s:%d", *master_address, *master_port),
}) })
go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second) controllerManager.Run(20 * time.Second)
go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second)
select {} select {}
} }

View File

@ -166,7 +166,7 @@ func TestDoRequest(t *testing.T) {
if body != expectedBody { if body != expectedBody {
t.Errorf("Expected body: '%s', saw: '%s'", expectedBody, body) t.Errorf("Expected body: '%s', saw: '%s'", expectedBody, body)
} }
fakeHandler.ValidateRequest(t, "/foo/bar", "GET", &fakeHandler.ResponseBody) fakeHandler.ValidateRequest(t, "/foo/bar", "GET", nil)
} }
func TestRunController(t *testing.T) { func TestRunController(t *testing.T) {

19
pkg/controller/doc.go Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package controller contains logic for watching and synchronizing
// replicationControllers.
package controller

View File

@ -13,7 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package registry
package controller
import ( import (
"encoding/json" "encoding/json"
@ -21,7 +22,6 @@ import (
"log" "log"
"math/rand" "math/rand"
"strings" "strings"
"sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -34,10 +34,13 @@ import (
// with actual running pods. // with actual running pods.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface // TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct { type ReplicationManager struct {
etcdClient *etcd.Client etcdClient util.EtcdClient
kubeClient client.ClientInterface kubeClient client.ClientInterface
podControl PodControlInterface podControl PodControlInterface
updateLock sync.Mutex syncTime <-chan time.Time
// To allow injection of syncReplicationController for testing.
syncHandler func(controllerSpec api.ReplicationController) error
} }
// An interface that knows how to add or delete pods // An interface that knows how to add or delete pods
@ -73,44 +76,63 @@ func (r RealPodControl) deletePod(podID string) error {
return r.kubeClient.DeletePod(podID) return r.kubeClient.DeletePod(podID)
} }
func MakeReplicationManager(etcdClient *etcd.Client, kubeClient client.ClientInterface) *ReplicationManager { func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager {
return &ReplicationManager{ rm := &ReplicationManager{
kubeClient: kubeClient, kubeClient: kubeClient,
etcdClient: etcdClient, etcdClient: etcdClient,
podControl: RealPodControl{ podControl: RealPodControl{
kubeClient: kubeClient, kubeClient: kubeClient,
}, },
} }
rm.syncHandler = func(controllerSpec api.ReplicationController) error {
return rm.syncReplicationController(controllerSpec)
}
return rm
} }
func (rm *ReplicationManager) WatchControllers() { // Begin watching and syncing.
func (rm *ReplicationManager) Run(period time.Duration) {
rm.syncTime = time.Tick(period)
go util.Forever(func() { rm.watchControllers() }, period)
}
func (rm *ReplicationManager) watchControllers() {
watchChannel := make(chan *etcd.Response) watchChannel := make(chan *etcd.Response)
stop := make(chan bool)
defer func() {
// Ensure that the call to watch ends.
close(stop)
}()
go func() { go func() {
defer util.HandleCrash() defer util.HandleCrash()
defer func() { defer func() {
close(watchChannel) close(watchChannel)
}() }()
rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil) _, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop)
if err != etcd.ErrWatchStoppedByUser {
log.Printf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
}() }()
for { for {
watchResponse, ok := <-watchChannel select {
if !ok { case <-rm.syncTime:
// watchChannel has been closed. Let the util.Forever() that rm.synchronize()
// called us call us again. case watchResponse, open := <-watchChannel:
if !open || watchResponse == nil {
// watchChannel has been closed, or something else went
// wrong with our etcd watch call. Let the util.Forever()
// that called us call us again.
return return
} }
if watchResponse == nil {
time.Sleep(time.Second * 10)
continue
}
log.Printf("Got watch: %#v", watchResponse) log.Printf("Got watch: %#v", watchResponse)
controller, err := rm.handleWatchResponse(watchResponse) controller, err := rm.handleWatchResponse(watchResponse)
if err != nil { if err != nil {
log.Printf("Error handling data: %#v, %#v", err, watchResponse) log.Printf("Error handling data: %#v, %#v", err, watchResponse)
continue continue
} }
rm.syncReplicationController(*controller) rm.syncHandler(*controller)
}
} }
} }
@ -141,7 +163,6 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
} }
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error { func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
rm.updateLock.Lock()
podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet) podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet)
if err != nil { if err != nil {
return err return err
@ -161,38 +182,21 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
rm.podControl.deletePod(filteredList[i].ID) rm.podControl.deletePod(filteredList[i].ID)
} }
} }
rm.updateLock.Unlock()
return nil return nil
} }
func (rm *ReplicationManager) Synchronize() { func (rm *ReplicationManager) synchronize() {
for { var controllerSpecs []api.ReplicationController
response, err := rm.etcdClient.Get("/registry/controllers", false, false) helper := util.EtcdHelper{rm.etcdClient}
err := helper.ExtractList("/registry/controllers", &controllerSpecs)
if err != nil { if err != nil {
log.Printf("Synchronization error %#v", err) log.Printf("Synchronization error: %v (%#v)", err, err)
return
} }
// TODO(bburns): There is a race here, if we get a version of the controllers, and then it is for _, controllerSpec := range controllerSpecs {
// updated, its possible that the watch will pick up the change first, and then we will execute err = rm.syncHandler(controllerSpec)
// using the old version of the controller.
// Probably the correct thing to do is to use the version number in etcd to detect when
// we are stale.
// Punting on this for now, but this could lead to some nasty bugs, so we should really fix it
// sooner rather than later.
if response != nil && response.Node != nil && response.Node.Nodes != nil {
for _, value := range response.Node.Nodes {
var controllerSpec api.ReplicationController
err := json.Unmarshal([]byte(value.Value), &controllerSpec)
if err != nil {
log.Printf("Unexpected error: %#v", err)
continue
}
log.Printf("Synchronizing %s\n", controllerSpec.ID)
err = rm.syncReplicationController(controllerSpec)
if err != nil { if err != nil {
log.Printf("Error synchronizing: %#v", err) log.Printf("Error synchronizing: %#v", err)
} }
} }
} }
time.Sleep(10 * time.Second)
}
}

View File

@ -13,7 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package registry
package controller
import ( import (
"encoding/json" "encoding/json"
@ -21,6 +22,7 @@ import (
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -31,6 +33,13 @@ import (
// TODO: Move this to a common place, it's needed in multiple tests. // TODO: Move this to a common place, it's needed in multiple tests.
var apiPath = "/api/v1beta1" var apiPath = "/api/v1beta1"
// TODO: This doesn't reduce typing enough to make it worth the less readable errors. Remove.
func expectNoError(t *testing.T, err error) {
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
}
func makeUrl(suffix string) string { func makeUrl(suffix string) string {
return apiPath + suffix return apiPath + suffix
} }
@ -309,3 +318,152 @@ func TestHandleWatchResponse(t *testing.T) {
t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut) t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut)
} }
} }
func TestSyncronize(t *testing.T) {
controllerSpec1 := api.ReplicationController{
DesiredState: api.ReplicationControllerState{
Replicas: 4,
PodTemplate: api.PodTemplate{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo/bar",
},
},
},
},
Labels: map[string]string{
"name": "foo",
"type": "production",
},
},
},
}
controllerSpec2 := api.ReplicationController{
DesiredState: api.ReplicationControllerState{
Replicas: 3,
PodTemplate: api.PodTemplate{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "bar/baz",
},
},
},
},
Labels: map[string]string{
"name": "bar",
"type": "production",
},
},
},
}
fakeEtcd := util.MakeFakeEtcdClient(t)
fakeEtcd.Data["/registry/controllers"] = util.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: util.MakeJSONString(controllerSpec1),
},
{
Value: util.MakeJSONString(controllerSpec2),
},
},
},
},
}
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: "{}",
T: t,
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
manager := MakeReplicationManager(fakeEtcd, client)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.synchronize()
validateSyncReplication(t, &fakePodControl, 7, 0)
}
type asyncTimeout struct {
doneChan chan bool
}
func beginTimeout(d time.Duration) *asyncTimeout {
a := &asyncTimeout{doneChan: make(chan bool)}
go func() {
select {
case <-a.doneChan:
return
case <-time.After(d):
panic("Timeout expired!")
}
}()
return a
}
func (a *asyncTimeout) done() {
close(a.doneChan)
}
func TestWatchControllers(t *testing.T) {
defer beginTimeout(20 * time.Second).done()
fakeEtcd := util.MakeFakeEtcdClient(t)
manager := MakeReplicationManager(fakeEtcd, nil)
var testControllerSpec api.ReplicationController
receivedCount := 0
manager.syncHandler = func(controllerSpec api.ReplicationController) error {
if !reflect.DeepEqual(controllerSpec, testControllerSpec) {
t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
}
receivedCount++
return nil
}
go manager.watchControllers()
time.Sleep(10 * time.Millisecond)
// Test normal case
testControllerSpec.ID = "foo"
fakeEtcd.WatchResponse <- &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: util.MakeJSONString(testControllerSpec),
},
}
time.Sleep(10 * time.Millisecond)
if receivedCount != 1 {
t.Errorf("Expected 1 call but got %v", receivedCount)
}
// Test error case
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
time.Sleep(10 * time.Millisecond)
// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
// Test purposeful shutdown
go manager.watchControllers()
time.Sleep(10 * time.Millisecond)
fakeEtcd.WatchStop <- true
time.Sleep(10 * time.Millisecond)
// Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
}

View File

@ -33,7 +33,6 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
@ -62,7 +61,7 @@ type DockerInterface interface {
// The main kubelet implementation // The main kubelet implementation
type Kubelet struct { type Kubelet struct {
Hostname string Hostname string
Client registry.EtcdClient Client util.EtcdClient
DockerClient DockerInterface DockerClient DockerInterface
FileCheckFrequency time.Duration FileCheckFrequency time.Duration
SyncFrequency time.Duration SyncFrequency time.Duration

View File

@ -27,7 +27,6 @@ import (
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
@ -398,13 +397,13 @@ func (cr *channelReader) GetList() [][]api.ContainerManifest {
} }
func TestGetKubeletStateFromEtcdNoData(t *testing.T) { func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
fakeClient := registry.MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan []api.ContainerManifest)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: nil, E: nil,
} }
@ -420,13 +419,13 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
} }
func TestGetKubeletStateFromEtcd(t *testing.T) { func TestGetKubeletStateFromEtcd(t *testing.T) {
fakeClient := registry.MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan []api.ContainerManifest)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: util.MakeJSONString([]api.Container{}), Value: util.MakeJSONString([]api.Container{}),
@ -444,13 +443,13 @@ func TestGetKubeletStateFromEtcd(t *testing.T) {
} }
func TestGetKubeletStateFromEtcdNotFound(t *testing.T) { func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
fakeClient := registry.MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan []api.ContainerManifest)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ E: &etcd.EtcdError{
ErrorCode: 100, ErrorCode: 100,
@ -466,13 +465,13 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
} }
func TestGetKubeletStateFromEtcdError(t *testing.T) { func TestGetKubeletStateFromEtcdError(t *testing.T) {
fakeClient := registry.MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan []api.ContainerManifest)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ E: &etcd.EtcdError{
ErrorCode: 200, // non not found error ErrorCode: 200, // non not found error
@ -554,7 +553,7 @@ func TestSyncManifestsDeletes(t *testing.T) {
} }
func TestEventWriting(t *testing.T) { func TestEventWriting(t *testing.T) {
fakeEtcd := registry.MakeFakeEtcdClient(t) fakeEtcd := util.MakeFakeEtcdClient(t)
kubelet := &Kubelet{ kubelet := &Kubelet{
Client: fakeEtcd, Client: fakeEtcd,
} }
@ -581,7 +580,7 @@ func TestEventWriting(t *testing.T) {
} }
func TestEventWritingError(t *testing.T) { func TestEventWritingError(t *testing.T) {
fakeEtcd := registry.MakeFakeEtcdClient(t) fakeEtcd := util.MakeFakeEtcdClient(t)
kubelet := &Kubelet{ kubelet := &Kubelet{
Client: fakeEtcd, Client: fakeEtcd,
} }

View File

@ -13,38 +13,25 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package registry package registry
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"reflect"
"github.com/coreos/go-etcd/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
// kubelet (and vice versa) // kubelet (and vice versa)
// EtcdClient is an injectable interface for testing.
type EtcdClient interface {
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
Create(key, value string, ttl uint64) (*etcd.Response, error)
Delete(key string, recursive bool) (*etcd.Response, error)
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
// EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd. // EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd.
type EtcdRegistry struct { type EtcdRegistry struct {
etcdClient EtcdClient etcdClient util.EtcdClient
machines []string machines []string
manifestFactory ManifestFactory manifestFactory ManifestFactory
} }
@ -53,7 +40,7 @@ type EtcdRegistry struct {
// 'client' is the connection to etcd // 'client' is the connection to etcd
// 'machines' is the list of machines // 'machines' is the list of machines
// 'scheduler' is the scheduling algorithm to use. // 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client EtcdClient, machines []string) *EtcdRegistry { func MakeEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry {
registry := &EtcdRegistry{ registry := &EtcdRegistry{
etcdClient: client, etcdClient: client,
machines: machines, machines: machines,
@ -68,11 +55,15 @@ func makePodKey(machine, podID string) string {
return "/registry/hosts/" + machine + "/pods/" + podID return "/registry/hosts/" + machine + "/pods/" + podID
} }
func (registry *EtcdRegistry) helper() *util.EtcdHelper {
return &util.EtcdHelper{registry.etcdClient}
}
func (registry *EtcdRegistry) ListPods(query labels.Query) ([]api.Pod, error) { func (registry *EtcdRegistry) ListPods(query labels.Query) ([]api.Pod, error) {
pods := []api.Pod{} pods := []api.Pod{}
for _, machine := range registry.machines { for _, machine := range registry.machines {
var machinePods []api.Pod var machinePods []api.Pod
err := registry.extractList("/registry/hosts/"+machine+"/pods", &machinePods) err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
if err != nil { if err != nil {
return pods, err return pods, err
} }
@ -86,80 +77,6 @@ func (registry *EtcdRegistry) ListPods(query labels.Query) ([]api.Pod, error) {
return pods, nil return pods, nil
} }
func (registry *EtcdRegistry) listEtcdNode(key string) ([]*etcd.Node, error) {
result, err := registry.etcdClient.Get(key, false, true)
if err != nil {
nodes := make([]*etcd.Node, 0)
if isEtcdNotFound(err) {
return nodes, nil
} else {
return nodes, err
}
}
return result.Node.Nodes, nil
}
// Extract a go object per etcd node into a slice.
func (r *EtcdRegistry) extractList(key string, slicePtr interface{}) error {
nodes, err := r.listEtcdNode(key)
if err != nil {
return err
}
pv := reflect.ValueOf(slicePtr)
if pv.Type().Kind() != reflect.Ptr || pv.Type().Elem().Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
v := pv.Elem()
for _, node := range nodes {
obj := reflect.New(v.Type().Elem())
err = json.Unmarshal([]byte(node.Value), obj.Interface())
if err != nil {
return err
}
v.Set(reflect.Append(v, obj.Elem()))
}
return nil
}
// Unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error.
func (r *EtcdRegistry) extractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
response, err := r.etcdClient.Get(key, false, false)
returnZero := false
if err != nil {
if ignoreNotFound && isEtcdNotFound(err) {
returnZero = true
} else {
return err
}
}
if !returnZero && (response.Node == nil || len(response.Node.Value) == 0) {
if ignoreNotFound {
returnZero = true
} else {
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
}
}
if returnZero {
pv := reflect.ValueOf(objPtr)
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
return nil
}
return json.Unmarshal([]byte(response.Node.Value), objPtr)
}
// json marshals obj, and stores under key.
func (r *EtcdRegistry) setObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
_, err = r.etcdClient.Set(key, string(data), 0)
return err
}
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) { func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
pod, _, err := registry.findPod(podID) pod, _, err := registry.findPod(podID)
return &pod, err return &pod, err
@ -170,12 +87,12 @@ func makeContainerKey(machine string) string {
} }
func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, err error) { func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, err error) {
err = registry.extractObj(makeContainerKey(machine), &manifests, true) err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
return return
} }
func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest) error { func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest) error {
return registry.setObj(makeContainerKey(machine), manifests) return registry.helper().SetObj(makeContainerKey(machine), manifests)
} }
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
@ -249,7 +166,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
key := makePodKey(machine, podID) key := makePodKey(machine, podID)
err = registry.extractObj(key, &pod, false) err = registry.helper().ExtractObj(key, &pod, false)
if err != nil { if err != nil {
return return
} }
@ -267,26 +184,9 @@ func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
return api.Pod{}, "", fmt.Errorf("pod not found %s", podID) return api.Pod{}, "", fmt.Errorf("pod not found %s", podID)
} }
func isEtcdNotFound(err error) bool {
if err == nil {
return false
}
switch err.(type) {
case *etcd.EtcdError:
etcdError := err.(*etcd.EtcdError)
if etcdError == nil {
return false
}
if etcdError.ErrorCode == 100 {
return true
}
}
return false
}
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) { func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
var controllers []api.ReplicationController var controllers []api.ReplicationController
err := registry.extractList("/registry/controllers", &controllers) err := registry.helper().ExtractList("/registry/controllers", &controllers)
return controllers, err return controllers, err
} }
@ -297,7 +197,7 @@ func makeControllerKey(id string) string {
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
var controller api.ReplicationController var controller api.ReplicationController
key := makeControllerKey(controllerID) key := makeControllerKey(controllerID)
err := registry.extractObj(key, &controller, false) err := registry.helper().ExtractObj(key, &controller, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -310,7 +210,7 @@ func (registry *EtcdRegistry) CreateController(controller api.ReplicationControl
} }
func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error { func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error {
return registry.setObj(makeControllerKey(controller.ID), controller) return registry.helper().SetObj(makeControllerKey(controller.ID), controller)
} }
func (registry *EtcdRegistry) DeleteController(controllerID string) error { func (registry *EtcdRegistry) DeleteController(controllerID string) error {
@ -325,18 +225,18 @@ func makeServiceKey(name string) string {
func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) { func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) {
var list api.ServiceList var list api.ServiceList
err := registry.extractList("/registry/services/specs", &list.Items) err := registry.helper().ExtractList("/registry/services/specs", &list.Items)
return list, err return list, err
} }
func (registry *EtcdRegistry) CreateService(svc api.Service) error { func (registry *EtcdRegistry) CreateService(svc api.Service) error {
return registry.setObj(makeServiceKey(svc.ID), svc) return registry.helper().SetObj(makeServiceKey(svc.ID), svc)
} }
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
key := makeServiceKey(name) key := makeServiceKey(name)
var svc api.Service var svc api.Service
err := registry.extractObj(key, &svc, false) err := registry.helper().ExtractObj(key, &svc, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -359,5 +259,5 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error {
} }
func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error {
return registry.setObj("/registry/services/endpoints/"+e.Name, e) return registry.helper().SetObj("/registry/services/endpoints/"+e.Name, e)
} }

View File

@ -26,8 +26,16 @@ import (
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry {
registry := MakeEtcdRegistry(client, machines)
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: &MockServiceRegistry{},
}
return registry
}
func TestEtcdGetPod(t *testing.T) { func TestEtcdGetPod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
pod, err := registry.GetPod("foo") pod, err := registry.GetPod("foo")
@ -38,8 +46,8 @@ func TestEtcdGetPod(t *testing.T) {
} }
func TestEtcdGetPodNotFound(t *testing.T) { func TestEtcdGetPodNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -55,8 +63,8 @@ func TestEtcdGetPodNotFound(t *testing.T) {
} }
func TestEtcdCreatePod(t *testing.T) { func TestEtcdCreatePod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -97,8 +105,8 @@ func TestEtcdCreatePod(t *testing.T) {
} }
func TestEtcdCreatePodAlreadyExisting(t *testing.T) { func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
@ -118,14 +126,14 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
} }
func TestEtcdCreatePodWithContainersError(t *testing.T) { func TestEtcdCreatePodWithContainersError(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
fakeClient.Data["/registry/hosts/machine/kubelet"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -150,14 +158,14 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
} }
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
fakeClient.Data["/registry/hosts/machine/kubelet"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -198,8 +206,8 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
} }
func TestEtcdCreatePodWithExistingContainers(t *testing.T) { func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -245,7 +253,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
} }
func TestEtcdDeletePod(t *testing.T) { func TestEtcdDeletePod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods/foo" key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{
@ -256,11 +264,11 @@ func TestEtcdDeletePod(t *testing.T) {
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeletePod("foo") err := registry.DeletePod("foo")
expectNoError(t, err) expectNoError(t, err)
if len(fakeClient.deletedKeys) != 1 { if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
} }
if fakeClient.deletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if response.Node.Value != "[]" { if response.Node.Value != "[]" {
@ -269,7 +277,7 @@ func TestEtcdDeletePod(t *testing.T) {
} }
func TestEtcdDeletePodMultipleContainers(t *testing.T) { func TestEtcdDeletePodMultipleContainers(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods/foo" key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{
@ -279,11 +287,11 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeletePod("foo") err := registry.DeletePod("foo")
expectNoError(t, err) expectNoError(t, err)
if len(fakeClient.deletedKeys) != 1 { if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
} }
if fakeClient.deletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
var manifests []api.ContainerManifest var manifests []api.ContainerManifest
@ -297,9 +305,9 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
} }
func TestEtcdEmptyListPods(t *testing.T) { func TestEtcdEmptyListPods(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods" key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Nodes: []*etcd.Node{}, Nodes: []*etcd.Node{},
@ -316,9 +324,9 @@ func TestEtcdEmptyListPods(t *testing.T) {
} }
func TestEtcdListPodsNotFound(t *testing.T) { func TestEtcdListPodsNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods" key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
@ -331,9 +339,9 @@ func TestEtcdListPodsNotFound(t *testing.T) {
} }
func TestEtcdListPods(t *testing.T) { func TestEtcdListPods(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods" key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Nodes: []*etcd.Node{ Nodes: []*etcd.Node{
@ -361,9 +369,9 @@ func TestEtcdListPods(t *testing.T) {
} }
func TestEtcdListControllersNotFound(t *testing.T) { func TestEtcdListControllersNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/controllers" key := "/registry/controllers"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
@ -376,9 +384,9 @@ func TestEtcdListControllersNotFound(t *testing.T) {
} }
func TestEtcdListServicesNotFound(t *testing.T) { func TestEtcdListServicesNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/services/specs" key := "/registry/services/specs"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
@ -391,9 +399,9 @@ func TestEtcdListServicesNotFound(t *testing.T) {
} }
func TestEtcdListControllers(t *testing.T) { func TestEtcdListControllers(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/controllers" key := "/registry/controllers"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Nodes: []*etcd.Node{ Nodes: []*etcd.Node{
@ -417,7 +425,7 @@ func TestEtcdListControllers(t *testing.T) {
} }
func TestEtcdGetController(t *testing.T) { func TestEtcdGetController(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
ctrl, err := registry.GetController("foo") ctrl, err := registry.GetController("foo")
@ -428,8 +436,8 @@ func TestEtcdGetController(t *testing.T) {
} }
func TestEtcdGetControllerNotFound(t *testing.T) { func TestEtcdGetControllerNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/controllers/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/controllers/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -448,21 +456,21 @@ func TestEtcdGetControllerNotFound(t *testing.T) {
} }
func TestEtcdDeleteController(t *testing.T) { func TestEtcdDeleteController(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeleteController("foo") err := registry.DeleteController("foo")
expectNoError(t, err) expectNoError(t, err)
if len(fakeClient.deletedKeys) != 1 { if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
} }
key := "/registry/controllers/foo" key := "/registry/controllers/foo"
if fakeClient.deletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
} }
func TestEtcdCreateController(t *testing.T) { func TestEtcdCreateController(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateController(api.ReplicationController{ err := registry.CreateController(api.ReplicationController{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
@ -481,7 +489,7 @@ func TestEtcdCreateController(t *testing.T) {
} }
func TestEtcdUpdateController(t *testing.T) { func TestEtcdUpdateController(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.UpdateController(api.ReplicationController{ err := registry.UpdateController(api.ReplicationController{
@ -498,9 +506,9 @@ func TestEtcdUpdateController(t *testing.T) {
} }
func TestEtcdListServices(t *testing.T) { func TestEtcdListServices(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/services/specs" key := "/registry/services/specs"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Nodes: []*etcd.Node{ Nodes: []*etcd.Node{
@ -524,8 +532,8 @@ func TestEtcdListServices(t *testing.T) {
} }
func TestEtcdCreateService(t *testing.T) { func TestEtcdCreateService(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -547,7 +555,7 @@ func TestEtcdCreateService(t *testing.T) {
} }
func TestEtcdGetService(t *testing.T) { func TestEtcdGetService(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
service, err := registry.GetService("foo") service, err := registry.GetService("foo")
@ -558,8 +566,8 @@ func TestEtcdGetService(t *testing.T) {
} }
func TestEtcdGetServiceNotFound(t *testing.T) { func TestEtcdGetServiceNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -575,25 +583,25 @@ func TestEtcdGetServiceNotFound(t *testing.T) {
} }
func TestEtcdDeleteService(t *testing.T) { func TestEtcdDeleteService(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeleteService("foo") err := registry.DeleteService("foo")
expectNoError(t, err) expectNoError(t, err)
if len(fakeClient.deletedKeys) != 2 { if len(fakeClient.DeletedKeys) != 2 {
t.Errorf("Expected 2 delete, found %#v", fakeClient.deletedKeys) t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys)
} }
key := "/registry/services/specs/foo" key := "/registry/services/specs/foo"
if fakeClient.deletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
key = "/registry/services/endpoints/foo" key = "/registry/services/endpoints/foo"
if fakeClient.deletedKeys[1] != key { if fakeClient.DeletedKeys[1] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[1], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], key)
} }
} }
func TestEtcdUpdateService(t *testing.T) { func TestEtcdUpdateService(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.UpdateService(api.Service{ err := registry.UpdateService(api.Service{
@ -610,7 +618,7 @@ func TestEtcdUpdateService(t *testing.T) {
} }
func TestEtcdUpdateEndpoints(t *testing.T) { func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints := api.Endpoints{ endpoints := api.Endpoints{
Name: "foo", Name: "foo",

122
pkg/util/etcd_tools.go Normal file
View File

@ -0,0 +1,122 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"fmt"
"reflect"
"github.com/coreos/go-etcd/etcd"
)
// Interface exposing only the etcd operations needed by EtcdHelper.
type EtcdGetSet interface {
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
}
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
type EtcdHelper struct {
Client EtcdGetSet
}
// Returns true iff err is an etcd not found error.
func IsEtcdNotFound(err error) bool {
if err == nil {
return false
}
switch err.(type) {
case *etcd.EtcdError:
etcdError := err.(*etcd.EtcdError)
if etcdError == nil {
return false
}
if etcdError.ErrorCode == 100 {
return true
}
}
return false
}
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) {
result, err := h.Client.Get(key, false, true)
if err != nil {
nodes := make([]*etcd.Node, 0)
if IsEtcdNotFound(err) {
return nodes, nil
} else {
return nodes, err
}
}
return result.Node.Nodes, nil
}
// Extract a go object per etcd node into a slice.
func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
nodes, err := h.listEtcdNode(key)
if err != nil {
return err
}
pv := reflect.ValueOf(slicePtr)
if pv.Type().Kind() != reflect.Ptr || pv.Type().Elem().Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
v := pv.Elem()
for _, node := range nodes {
obj := reflect.New(v.Type().Elem())
err = json.Unmarshal([]byte(node.Value), obj.Interface())
if err != nil {
return err
}
v.Set(reflect.Append(v, obj.Elem()))
}
return nil
}
// Unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error.
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
response, err := h.Client.Get(key, false, false)
if err != nil && !IsEtcdNotFound(err) {
return err
}
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
if ignoreNotFound {
pv := reflect.ValueOf(objPtr)
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
return nil
} else if err != nil {
return err
}
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
}
return json.Unmarshal([]byte(response.Node.Value), objPtr)
}
// SetObj marshals obj via json, and stores under key.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
_, err = h.Client.Set(key, string(data), 0)
return err
}

135
pkg/util/etcd_tools_test.go Normal file
View File

@ -0,0 +1,135 @@
package util
import (
"fmt"
"reflect"
"testing"
"github.com/coreos/go-etcd/etcd"
)
type fakeEtcdGetSet struct {
get func(key string, sort, recursive bool) (*etcd.Response, error)
set func(key, value string, ttl uint64) (*etcd.Response, error)
}
func TestIsNotFoundErr(t *testing.T) {
try := func(err error, isNotFound bool) {
if IsEtcdNotFound(err) != isNotFound {
t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound)
}
}
try(&etcd.EtcdError{ErrorCode: 100}, true)
try(&etcd.EtcdError{ErrorCode: 101}, false)
try(nil, false)
try(fmt.Errorf("some other kind of error"), false)
}
type testMarshalType struct {
ID string `json:"id"`
}
func TestExtractList(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: `{"id":"foo"}`,
},
{
Value: `{"id":"bar"}`,
},
{
Value: `{"id":"baz"}`,
},
},
},
},
}
expect := []testMarshalType{
{"foo"},
{"bar"},
{"baz"},
}
var got []testMarshalType
helper := EtcdHelper{fakeClient}
err := helper.ExtractList("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if !reflect.DeepEqual(got, expect) {
t.Errorf("Wanted %#v, got %#v", expect, got)
}
}
func TestExtractObj(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
expect := testMarshalType{ID: "foo"}
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
helper := EtcdHelper{fakeClient}
var got testMarshalType
err := helper.ExtractObj("/some/key", &got, false)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if !reflect.DeepEqual(got, expect) {
t.Errorf("Wanted %#v, got %#v", expect, got)
}
}
func TestExtractObjNotFoundErr(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
ErrorCode: 100,
},
}
fakeClient.Data["/some/key2"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
}
fakeClient.Data["/some/key3"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: "",
},
},
}
helper := EtcdHelper{fakeClient}
try := func(key string) {
var got testMarshalType
err := helper.ExtractObj(key, &got, false)
if err == nil {
t.Errorf("%s: wanted error but didn't get one", key)
}
err = helper.ExtractObj(key, &got, true)
if err != nil {
t.Errorf("%s: didn't want error but got %#v", key, err)
}
}
try("/some/key")
try("/some/key2")
try("/some/key3")
}
func TestSetObj(t *testing.T) {
obj := testMarshalType{ID: "foo"}
fakeClient := MakeFakeEtcdClient(t)
helper := EtcdHelper{fakeClient}
err := helper.SetObj("/some/key", obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect := MakeJSONString(obj)
got := fakeClient.Data["/some/key"].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
}

View File

@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package registry package util
import ( import (
"fmt" "fmt"
@ -22,6 +22,18 @@ import (
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
// EtcdClient is an injectable interface for testing.
type EtcdClient interface {
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
Create(key, value string, ttl uint64) (*etcd.Response, error)
Delete(key string, recursive bool) (*etcd.Response, error)
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
type EtcdResponseWithError struct { type EtcdResponseWithError struct {
R *etcd.Response R *etcd.Response
E error E error
@ -29,10 +41,17 @@ type EtcdResponseWithError struct {
type FakeEtcdClient struct { type FakeEtcdClient struct {
Data map[string]EtcdResponseWithError Data map[string]EtcdResponseWithError
deletedKeys []string DeletedKeys []string
Err error Err error
t *testing.T t *testing.T
Ix int Ix int
// Will become valid after Watch is called; tester may write to it. Tester may
// also read from it to verify that it's closed after injecting an error.
WatchResponse chan *etcd.Response
// Write to this to prematurely stop a Watch that is running in a goroutine.
WatchInjectError chan<- error
WatchStop chan<- bool
} }
func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
@ -71,18 +90,22 @@ func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response,
return f.Set(key, value, ttl) return f.Set(key, value, ttl)
} }
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
f.deletedKeys = append(f.deletedKeys, key) f.DeletedKeys = append(f.DeletedKeys, key)
return &etcd.Response{}, f.Err return &etcd.Response{}, f.Err
} }
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
return nil, fmt.Errorf("unimplemented") f.WatchResponse = receiver
f.WatchStop = stop
injectedError := make(chan error)
defer close(injectedError)
f.WatchInjectError = injectedError
select {
case <-stop:
return nil, etcd.ErrWatchStoppedByUser
case err := <-injectedError:
return nil, err
} }
// Never get here.
func MakeTestEtcdRegistry(client EtcdClient, machines []string) *EtcdRegistry { return nil, nil
registry := MakeEtcdRegistry(client, machines)
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: &MockServiceRegistry{},
}
return registry
} }

View File

@ -32,6 +32,7 @@ type LogInterface interface {
// FakeHandler is to assist in testing HTTP requests. // FakeHandler is to assist in testing HTTP requests.
type FakeHandler struct { type FakeHandler struct {
RequestReceived *http.Request RequestReceived *http.Request
RequestBody string
StatusCode int StatusCode int
ResponseBody string ResponseBody string
// For logging - you can use a *testing.T // For logging - you can use a *testing.T
@ -48,7 +49,7 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ
if err != nil && f.T != nil { if err != nil && f.T != nil {
f.T.Logf("Received read error: %#v", err) f.T.Logf("Received read error: %#v", err)
} }
f.ResponseBody = string(bodyReceived) f.RequestBody = string(bodyReceived)
} }
func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMethod string, body *string) { func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMethod string, body *string) {
@ -59,8 +60,8 @@ func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMeth
t.Errorf("Unexpected method: %s", f.RequestReceived.Method) t.Errorf("Unexpected method: %s", f.RequestReceived.Method)
} }
if body != nil { if body != nil {
if *body != f.ResponseBody { if *body != f.RequestBody {
t.Errorf("Received body:\n%s\n Doesn't match expected body:\n%s", f.ResponseBody, *body) t.Errorf("Received body:\n%s\n Doesn't match expected body:\n%s", f.RequestBody, *body)
} }
} }
} }