diff --git a/pkg/master/master.go b/pkg/master/master.go index a01070ad418..e3cafe59636 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -25,10 +25,17 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/memory" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/coreos/go-etcd/etcd" + + goetcd "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) @@ -46,10 +53,10 @@ type Config struct { // Master contains state for a Kubernetes cluster master/api server. type Master struct { - podRegistry registry.PodRegistry - controllerRegistry registry.ControllerRegistry - serviceRegistry registry.ServiceRegistry - minionRegistry registry.MinionRegistry + podRegistry pod.Registry + controllerRegistry controller.Registry + serviceRegistry service.Registry + minionRegistry minion.Registry storage map[string]apiserver.RESTStorage client *client.Client } @@ -57,10 +64,10 @@ type Master struct { // NewMemoryServer returns a new instance of Master backed with memory (not etcd). func NewMemoryServer(c *Config) *Master { m := &Master{ - podRegistry: registry.MakeMemoryRegistry(), - controllerRegistry: registry.MakeMemoryRegistry(), - serviceRegistry: registry.MakeMemoryRegistry(), - minionRegistry: registry.MakeMinionRegistry(c.Minions), + podRegistry: memory.NewRegistry(), + controllerRegistry: memory.NewRegistry(), + serviceRegistry: memory.NewRegistry(), + minionRegistry: minion.NewRegistry(c.Minions), client: c.Client, } m.init(c.Cloud, c.PodInfoGetter) @@ -69,12 +76,12 @@ func NewMemoryServer(c *Config) *Master { // New returns a new instance of Master connected to the given etcdServer. func New(c *Config) *Master { - etcdClient := etcd.NewClient(c.EtcdServers) + etcdClient := goetcd.NewClient(c.EtcdServers) minionRegistry := minionRegistryMaker(c) m := &Master{ - podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), - controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), - serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), + podRegistry: etcd.NewRegistry(etcdClient, minionRegistry), + controllerRegistry: etcd.NewRegistry(etcdClient, minionRegistry), + serviceRegistry: etcd.NewRegistry(etcdClient, minionRegistry), minionRegistry: minionRegistry, client: c.Client, } @@ -82,23 +89,23 @@ func New(c *Config) *Master { return m } -func minionRegistryMaker(c *Config) registry.MinionRegistry { - var minionRegistry registry.MinionRegistry +func minionRegistryMaker(c *Config) minion.Registry { + var minionRegistry minion.Registry if c.Cloud != nil && len(c.MinionRegexp) > 0 { var err error - minionRegistry, err = registry.MakeCloudMinionRegistry(c.Cloud, c.MinionRegexp) + minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp) if err != nil { glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err) } } if minionRegistry == nil { - minionRegistry = registry.MakeMinionRegistry(c.Minions) + minionRegistry = minion.NewRegistry(c.Minions) } if c.HealthCheckMinions { - minionRegistry = registry.NewHealthyMinionRegistry(minionRegistry, &http.Client{}) + minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{}) } if c.MinionCacheTTL > 0 { - cachingMinionRegistry, err := registry.NewCachingMinionRegistry(minionRegistry, c.MinionCacheTTL) + cachingMinionRegistry, err := minion.NewCachingRegistry(minionRegistry, c.MinionCacheTTL) if err != nil { glog.Errorf("Failed to initialize caching layer, ignoring cache.") } else { @@ -112,17 +119,23 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30) go podCache.Loop() - endpoints := registry.MakeEndpointController(m.serviceRegistry, m.client) + endpoints := endpoint.NewEndpointController(m.serviceRegistry, m.client) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) random := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) s := scheduler.NewRandomFitScheduler(m.podRegistry, random) m.storage = map[string]apiserver.RESTStorage{ - "pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache), - "replicationControllers": registry.NewControllerRegistryStorage(m.controllerRegistry, m.podRegistry), - "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), - "minions": registry.MakeMinionRegistryStorage(m.minionRegistry), - "bindings": registry.MakeBindingStorage(m.podRegistry), + "pods": pod.NewRegistryStorage(&pod.RegistryStorageConfig{ + CloudProvider: cloud, + MinionLister: m.minionRegistry, + PodCache: podCache, + PodInfoGetter: podInfoGetter, + Registry: m.podRegistry, + Scheduler: s, + }), + "replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry), + "services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), + "minions": minion.NewRegistryStorage(m.minionRegistry), } } diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index a4d0ff24b7c..1d7a36cef68 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -23,8 +23,9 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" ) @@ -32,7 +33,7 @@ import ( // that cache up to date. type PodCache struct { containerInfo client.PodInfoGetter - pods registry.PodRegistry + pods pod.Registry // This is a map of pod id to a map of container name to the podInfo map[string]api.PodInfo period time.Duration @@ -40,7 +41,7 @@ type PodCache struct { } // NewPodCache returns a new PodCache which watches container information registered in the given PodRegistry. -func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache { +func NewPodCache(info client.PodInfoGetter, pods pod.Registry, period time.Duration) *PodCache { return &PodCache{ containerInfo: info, pods: pods, diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go index f2eb86c1d8e..d176c31c485 100644 --- a/pkg/master/pod_cache_test.go +++ b/pkg/master/pod_cache_test.go @@ -22,7 +22,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/fsouza/go-dockerclient" ) @@ -97,7 +97,7 @@ func TestPodUpdateAllContainers(t *testing.T) { } pods := []api.Pod{pod} - mockRegistry := registry.MakeMockPodRegistry(pods) + mockRegistry := registrytest.NewPodRegistry(pods) expected := api.PodInfo{"foo": docker.Container{ID: "foo"}} fake := FakePodInfoGetter{ diff --git a/pkg/registry/bindingstorage.go b/pkg/registry/binding/storage.go similarity index 93% rename from pkg/registry/bindingstorage.go rename to pkg/registry/binding/storage.go index 74900fdfbca..14926f6c57a 100644 --- a/pkg/registry/bindingstorage.go +++ b/pkg/registry/binding/storage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package binding import ( "fmt" @@ -22,17 +22,18 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" ) // BindingStorage implements the RESTStorage interface. When bindings are written, it // changes the location of the affected pods. This information is eventually reflected // in the pod's CurrentState.Host field. type BindingStorage struct { - podRegistry PodRegistry + podRegistry pod.Registry } // MakeBindingStorage makes a new BindingStorage backed by the given PodRegistry. -func MakeBindingStorage(podRegistry PodRegistry) *BindingStorage { +func MakeBindingStorage(podRegistry pod.Registry) *BindingStorage { return &BindingStorage{ podRegistry: podRegistry, } diff --git a/pkg/registry/bindingstorage_test.go b/pkg/registry/binding/storage_test.go similarity index 98% rename from pkg/registry/bindingstorage_test.go rename to pkg/registry/binding/storage_test.go index a4371e8bfed..e60afcf6dc8 100644 --- a/pkg/registry/bindingstorage_test.go +++ b/pkg/registry/binding/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package binding import ( "reflect" diff --git a/pkg/registry/interfaces.go b/pkg/registry/controller/registry.go similarity index 52% rename from pkg/registry/interfaces.go rename to pkg/registry/controller/registry.go index dfc3cd479e4..0765b2188bf 100644 --- a/pkg/registry/interfaces.go +++ b/pkg/registry/controller/registry.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package controller import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -22,22 +22,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -// PodRegistry is an interface implemented by things that know how to store Pod objects. -type PodRegistry interface { - // ListPods obtains a list of pods that match selector. - ListPods(selector labels.Selector) ([]api.Pod, error) - // Get a specific pod - GetPod(podID string) (*api.Pod, error) - // Create a pod based on a specification, schedule it onto a specific machine. - CreatePod(machine string, pod api.Pod) error - // Update an existing pod - UpdatePod(pod api.Pod) error - // Delete an existing pod - DeletePod(podID string) error -} - -// ControllerRegistry is an interface for things that know how to store ReplicationControllers. -type ControllerRegistry interface { +// Registry is an interface for things that know how to store ReplicationControllers. +type Registry interface { ListControllers() ([]api.ReplicationController, error) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) @@ -45,13 +31,3 @@ type ControllerRegistry interface { UpdateController(controller api.ReplicationController) error DeleteController(controllerID string) error } - -// ServiceRegistry is an interface for things that know how to store services. -type ServiceRegistry interface { - ListServices() (api.ServiceList, error) - CreateService(svc api.Service) error - GetService(name string) (*api.Service, error) - DeleteService(name string) error - UpdateService(svc api.Service) error - UpdateEndpoints(e api.Endpoints) error -} diff --git a/pkg/registry/controllerstorage.go b/pkg/registry/controller/storage.go similarity index 57% rename from pkg/registry/controllerstorage.go rename to pkg/registry/controller/storage.go index 458977d2a12..cd3c05c06a3 100644 --- a/pkg/registry/controllerstorage.go +++ b/pkg/registry/controller/storage.go @@ -14,39 +14,82 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package controller import ( "fmt" "time" - "code.google.com/p/go-uuid/uuid" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "code.google.com/p/go-uuid/uuid" ) -// ControllerRegistryStorage is an implementation of RESTStorage for the api server. -type ControllerRegistryStorage struct { - registry ControllerRegistry - podRegistry PodRegistry - // Period in between polls when waiting for a controller to complete - pollPeriod time.Duration +// RegistryStorage stores data for the replication controller service. +// It implements apiserver.RESTStorage. +type RegistryStorage struct { + registry Registry + podRegistry pod.Registry + pollPeriod time.Duration } -func NewControllerRegistryStorage(registry ControllerRegistry, podRegistry PodRegistry) apiserver.RESTStorage { - return &ControllerRegistryStorage{ +// NewRegistryStorage returns a new apiserver.RESTStorage for the given +// registry and podRegistry. +func NewRegistryStorage(registry Registry, podRegistry pod.Registry) apiserver.RESTStorage { + return &RegistryStorage{ registry: registry, podRegistry: podRegistry, pollPeriod: time.Second * 10, } } +// Create registers then given ReplicationController. +func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + controller, ok := obj.(*api.ReplicationController) + if !ok { + return nil, fmt.Errorf("not a replication controller: %#v", obj) + } + if len(controller.ID) == 0 { + controller.ID = uuid.NewUUID().String() + } + // Pod Manifest ID should be assigned by the pod API + controller.DesiredState.PodTemplate.DesiredState.Manifest.ID = "" + if errs := api.ValidateReplicationController(controller); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + err := rs.registry.CreateController(*controller) + if err != nil { + return nil, err + } + return rs.waitForController(*controller) + }), nil +} + +// Delete asynchronously deletes the ReplicationController specified by its id. +func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id) + }), nil +} + +// Get obtains the ReplicationController specified by its id. +func (rs *RegistryStorage) Get(id string) (interface{}, error) { + controller, err := rs.registry.GetController(id) + if err != nil { + return nil, err + } + return controller, err +} + // List obtains a list of ReplicationControllers that match selector. -func (storage *ControllerRegistryStorage) List(selector labels.Selector) (interface{}, error) { +func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { result := api.ReplicationControllerList{} - controllers, err := storage.registry.ListControllers() + controllers, err := rs.registry.ListControllers() if err == nil { for _, controller := range controllers { if selector.Matches(labels.Set(controller.Labels)) { @@ -57,54 +100,14 @@ func (storage *ControllerRegistryStorage) List(selector labels.Selector) (interf return result, err } -// Get obtains the ReplicationController specified by its id. -func (storage *ControllerRegistryStorage) Get(id string) (interface{}, error) { - controller, err := storage.registry.GetController(id) - if err != nil { - return nil, err - } - return controller, err -} - -// Delete asynchronously deletes the ReplicationController specified by its id. -func (storage *ControllerRegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() (interface{}, error) { - return &api.Status{Status: api.StatusSuccess}, storage.registry.DeleteController(id) - }), nil -} - -// New creates a new ReplicationController for use with Create and Update -func (storage *ControllerRegistryStorage) New() interface{} { +// New creates a new ReplicationController for use with Create and Update. +func (rs RegistryStorage) New() interface{} { return &api.ReplicationController{} } -// Create registers a given new ReplicationController instance to storage.registry. -func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { - controller, ok := obj.(*api.ReplicationController) - if !ok { - return nil, fmt.Errorf("not a replication controller: %#v", obj) - } - if len(controller.ID) == 0 { - controller.ID = uuid.NewUUID().String() - } - // Pod Manifest ID should be assigned by the pod API - controller.DesiredState.PodTemplate.DesiredState.Manifest.ID = "" - - if errs := api.ValidateReplicationController(controller); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - - return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.registry.CreateController(*controller) - if err != nil { - return nil, err - } - return storage.waitForController(*controller) - }), nil -} - -// Update replaces a given ReplicationController instance with an existing instance in storage.registry. -func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { +// Update replaces a given ReplicationController instance with an existing +// instance in storage.registry. +func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { controller, ok := obj.(*api.ReplicationController) if !ok { return nil, fmt.Errorf("not a replication controller: %#v", obj) @@ -113,30 +116,30 @@ func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interf return nil, fmt.Errorf("Validation errors: %v", errs) } return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.registry.UpdateController(*controller) + err := rs.registry.UpdateController(*controller) if err != nil { return nil, err } - return storage.waitForController(*controller) + return rs.waitForController(*controller) }), nil } -func (storage *ControllerRegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { +// Watch returns ReplicationController events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.WatchControllers(label, field, resourceVersion) +} + +func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { for { - pods, err := storage.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) + pods, err := rs.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) if err != nil { return ctrl, err } if len(pods) == ctrl.DesiredState.Replicas { break } - time.Sleep(storage.pollPeriod) + time.Sleep(rs.pollPeriod) } return ctrl, nil } - -// WatchAll returns ReplicationController events via a watch.Interface, implementing -// apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return storage.registry.WatchControllers(label, field, resourceVersion) -} diff --git a/pkg/registry/controllerstorage_test.go b/pkg/registry/controller/storage_test.go similarity index 78% rename from pkg/registry/controllerstorage_test.go rename to pkg/registry/controller/storage_test.go index 9dd699ca5f7..9447126b462 100644 --- a/pkg/registry/controllerstorage_test.go +++ b/pkg/registry/controller/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package controller import ( "encoding/json" @@ -26,50 +26,20 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) -// TODO: Why do we have this AND MemoryRegistry? -type MockControllerRegistry struct { - err error - controllers []api.ReplicationController -} - -func (registry *MockControllerRegistry) ListControllers() ([]api.ReplicationController, error) { - return registry.controllers, registry.err -} - -func (registry *MockControllerRegistry) GetController(ID string) (*api.ReplicationController, error) { - return &api.ReplicationController{}, registry.err -} - -func (registry *MockControllerRegistry) CreateController(controller api.ReplicationController) error { - return registry.err -} - -func (registry *MockControllerRegistry) UpdateController(controller api.ReplicationController) error { - return registry.err -} - -func (registry *MockControllerRegistry) DeleteController(ID string) error { - return registry.err -} - -func (registry *MockControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return nil, registry.err -} - func TestListControllersError(t *testing.T) { - mockRegistry := MockControllerRegistry{ - err: fmt.Errorf("test error"), + mockRegistry := registrytest.ControllerRegistry{ + Err: fmt.Errorf("test error"), } - storage := ControllerRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } controllersObj, err := storage.List(nil) controllers := controllersObj.(api.ReplicationControllerList) - if err != mockRegistry.err { - t.Errorf("Expected %#v, Got %#v", mockRegistry.err, err) + if err != mockRegistry.Err { + t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err) } if len(controllers.Items) != 0 { t.Errorf("Unexpected non-zero ctrl list: %#v", controllers) @@ -77,8 +47,8 @@ func TestListControllersError(t *testing.T) { } func TestListEmptyControllerList(t *testing.T) { - mockRegistry := MockControllerRegistry{} - storage := ControllerRegistryStorage{ + mockRegistry := registrytest.ControllerRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, } controllers, err := storage.List(labels.Everything()) @@ -92,8 +62,8 @@ func TestListEmptyControllerList(t *testing.T) { } func TestListControllerList(t *testing.T) { - mockRegistry := MockControllerRegistry{ - controllers: []api.ReplicationController{ + mockRegistry := registrytest.ControllerRegistry{ + Controllers: []api.ReplicationController{ { JSONBase: api.JSONBase{ ID: "foo", @@ -106,7 +76,7 @@ func TestListControllerList(t *testing.T) { }, }, } - storage := ControllerRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } controllersObj, err := storage.List(labels.Everything()) @@ -127,8 +97,8 @@ func TestListControllerList(t *testing.T) { } func TestControllerDecode(t *testing.T) { - mockRegistry := MockControllerRegistry{} - storage := ControllerRegistryStorage{ + mockRegistry := registrytest.ControllerRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, } controller := &api.ReplicationController{ @@ -238,16 +208,16 @@ var validPodTemplate = api.PodTemplate{ } func TestCreateController(t *testing.T) { - mockRegistry := MockControllerRegistry{} - mockPodRegistry := MockPodRegistry{ - pods: []api.Pod{ + mockRegistry := registrytest.ControllerRegistry{} + mockPodRegistry := registrytest.PodRegistry{ + Pods: []api.Pod{ { JSONBase: api.JSONBase{ID: "foo"}, Labels: map[string]string{"a": "b"}, }, }, } - storage := ControllerRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, podRegistry: &mockPodRegistry, pollPeriod: time.Millisecond * 1, @@ -276,7 +246,7 @@ func TestCreateController(t *testing.T) { } mockPodRegistry.Lock() - mockPodRegistry.pods = []api.Pod{ + mockPodRegistry.Pods = []api.Pod{ { JSONBase: api.JSONBase{ID: "foo"}, Labels: map[string]string{"a": "b"}, @@ -297,8 +267,8 @@ func TestCreateController(t *testing.T) { } func TestControllerStorageValidatesCreate(t *testing.T) { - mockRegistry := MockControllerRegistry{} - storage := ControllerRegistryStorage{ + mockRegistry := registrytest.ControllerRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, podRegistry: nil, pollPeriod: time.Millisecond * 1, @@ -328,13 +298,12 @@ func TestControllerStorageValidatesCreate(t *testing.T) { } func TestControllerStorageValidatesUpdate(t *testing.T) { - mockRegistry := MockControllerRegistry{} - storage := ControllerRegistryStorage{ + mockRegistry := registrytest.ControllerRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, podRegistry: nil, pollPeriod: time.Millisecond * 1, } - failureCases := map[string]api.ReplicationController{ "empty ID": { JSONBase: api.JSONBase{ID: ""}, diff --git a/pkg/registry/endpoints.go b/pkg/registry/endpoint/endpoints.go similarity index 87% rename from pkg/registry/endpoints.go rename to pkg/registry/endpoint/endpoints.go index f7d34f89899..1eaa5e32c37 100644 --- a/pkg/registry/endpoints.go +++ b/pkg/registry/endpoint/endpoints.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package endpoint import ( "fmt" @@ -24,42 +24,27 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" ) -func MakeEndpointController(serviceRegistry ServiceRegistry, client *client.Client) *EndpointController { +// A EndpointController manages service endpoints. +type EndpointController struct { + client *client.Client + serviceRegistry service.Registry +} + +// NewEndpointController returns a new *EndpointController. +func NewEndpointController(serviceRegistry service.Registry, client *client.Client) *EndpointController { return &EndpointController{ serviceRegistry: serviceRegistry, client: client, } } -type EndpointController struct { - serviceRegistry ServiceRegistry - client *client.Client -} - -func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) { - if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) || - (portName.Kind == util.IntstrInt && portName.IntVal == 0)) && - len(manifest.Containers[0].Ports) > 0 { - return manifest.Containers[0].Ports[0].ContainerPort, nil - } - if portName.Kind == util.IntstrInt { - return portName.IntVal, nil - } - name := portName.StrVal - for _, container := range manifest.Containers { - for _, port := range container.Ports { - if port.Name == name { - return port.ContainerPort, nil - } - } - } - return -1, fmt.Errorf("no suitable port for manifest: %s", manifest.ID) -} - +// SyncServiceEndpoints syncs service endpoints. func (e *EndpointController) SyncServiceEndpoints() error { services, err := e.serviceRegistry.ListServices() if err != nil { @@ -98,3 +83,24 @@ func (e *EndpointController) SyncServiceEndpoints() error { } return resultErr } + +// findPort locates the container port for the given manifest and portName. +func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) { + if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) || + (portName.Kind == util.IntstrInt && portName.IntVal == 0)) && + len(manifest.Containers[0].Ports) > 0 { + return manifest.Containers[0].Ports[0].ContainerPort, nil + } + if portName.Kind == util.IntstrInt { + return portName.IntVal, nil + } + name := portName.StrVal + for _, container := range manifest.Containers { + for _, port := range container.Ports { + if port.Name == name { + return port.ContainerPort, nil + } + } + } + return -1, fmt.Errorf("no suitable port for manifest: %s", manifest.ID) +} diff --git a/pkg/registry/endpoints_test.go b/pkg/registry/endpoint/endpoints_test.go similarity index 84% rename from pkg/registry/endpoints_test.go rename to pkg/registry/endpoint/endpoints_test.go index db5d3463f5d..87fe7d4e927 100644 --- a/pkg/registry/endpoints_test.go +++ b/pkg/registry/endpoint/endpoints_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package endpoint import ( "encoding/json" @@ -24,6 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -82,7 +83,6 @@ func TestFindPort(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if port != 8080 { t.Errorf("Expected 8080, Got %d", port) } @@ -90,7 +90,6 @@ func TestFindPort(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if port != 8000 { t.Errorf("Expected 8000, Got %d", port) } @@ -110,7 +109,6 @@ func TestFindPort(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if port != 8080 { t.Errorf("Expected 8080, Got %d", port) } @@ -118,7 +116,6 @@ func TestFindPort(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if port != 8080 { t.Errorf("Expected 8080, Got %d", port) } @@ -132,10 +129,8 @@ func TestSyncEndpointsEmpty(t *testing.T) { } testServer := httptest.NewTLSServer(&fakeHandler) client := client.New(testServer.URL, nil) - - serviceRegistry := MockServiceRegistry{} - - endpoints := MakeEndpointController(&serviceRegistry, client) + serviceRegistry := registrytest.ServiceRegistry{} + endpoints := NewEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() if err != nil { t.Errorf("unexpected error: %v", err) @@ -151,15 +146,13 @@ func TestSyncEndpointsError(t *testing.T) { } testServer := httptest.NewTLSServer(&fakeHandler) client := client.New(testServer.URL, nil) - - serviceRegistry := MockServiceRegistry{ - err: fmt.Errorf("test error"), + serviceRegistry := registrytest.ServiceRegistry{ + Err: fmt.Errorf("test error"), } - - endpoints := MakeEndpointController(&serviceRegistry, client) + endpoints := NewEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() - if err != serviceRegistry.err { - t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.err) + if err != serviceRegistry.Err { + t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.Err) } } @@ -171,9 +164,8 @@ func TestSyncEndpointsItems(t *testing.T) { } testServer := httptest.NewTLSServer(&fakeHandler) client := client.New(testServer.URL, nil) - - serviceRegistry := MockServiceRegistry{ - list: api.ServiceList{ + serviceRegistry := registrytest.ServiceRegistry{ + List: api.ServiceList{ Items: []api.Service{ { Selector: map[string]string{ @@ -183,15 +175,13 @@ func TestSyncEndpointsItems(t *testing.T) { }, }, } - - endpoints := MakeEndpointController(&serviceRegistry, client) + endpoints := NewEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() if err != nil { t.Errorf("unexpected error: %v", err) } - - if len(serviceRegistry.endpoints.Endpoints) != 1 { - t.Errorf("Unexpected endpoints update: %#v", serviceRegistry.endpoints) + if len(serviceRegistry.Endpoints.Endpoints) != 1 { + t.Errorf("Unexpected endpoints update: %#v", serviceRegistry.Endpoints) } } @@ -201,9 +191,8 @@ func TestSyncEndpointsPodError(t *testing.T) { } testServer := httptest.NewTLSServer(&fakeHandler) client := client.New(testServer.URL, nil) - - serviceRegistry := MockServiceRegistry{ - list: api.ServiceList{ + serviceRegistry := registrytest.ServiceRegistry{ + List: api.ServiceList{ Items: []api.Service{ { Selector: map[string]string{ @@ -213,8 +202,7 @@ func TestSyncEndpointsPodError(t *testing.T) { }, }, } - - endpoints := MakeEndpointController(&serviceRegistry, client) + endpoints := NewEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() if err == nil { t.Error("Unexpected non-error") diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcd/etcd.go similarity index 63% rename from pkg/registry/etcdregistry.go rename to pkg/registry/etcd/etcd.go index 2eb9e30deb4..31c4f669663 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcd/etcd.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package etcd import ( "fmt" @@ -22,27 +22,31 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" ) // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // kubelet (and vice versa) -// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd. -type EtcdRegistry struct { - helper tools.EtcdHelper +// Registry implements PodRegistry, ControllerRegistry and ServiceRegistry +// with backed by etcd. +type Registry struct { + tools.EtcdHelper manifestFactory ManifestFactory } -// MakeEtcdRegistry creates an etcd registry. -// 'client' is the connection to etcd -// 'machines' is the list of machines -// 'scheduler' is the scheduling algorithm to use. -func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry { - registry := &EtcdRegistry{ - helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner}, +// NewRegistry creates an etcd registry. +func NewRegistry(client tools.EtcdClient, machines minion.Registry) *Registry { + registry := &Registry{ + EtcdHelper: tools.EtcdHelper{ + client, + api.Codec, + api.ResourceVersioner, + }, } registry.manifestFactory = &BasicManifestFactory{ serviceRegistry: registry, @@ -55,11 +59,10 @@ func makePodKey(podID string) string { } // ListPods obtains a list of pods that match selector. -func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { +func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { allPods := []api.Pod{} filteredPods := []api.Pod{} - err := registry.helper.ExtractList("/registry/pods", &allPods) - if err != nil { + if err := r.ExtractList("/registry/pods", &allPods); err != nil { return nil, err } for _, pod := range allPods { @@ -75,10 +78,9 @@ func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, err } // GetPod gets a specific pod specified by its ID. -func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) { +func (r *Registry) GetPod(podID string) (*api.Pod, error) { var pod api.Pod - err := registry.helper.ExtractObj(makePodKey(podID), &pod, false) - if err != nil { + if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil { return nil, err } // TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets @@ -93,66 +95,52 @@ func makeContainerKey(machine string) string { } // CreatePod creates a pod based on a specification, schedule it onto a specific machine. -func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error { +func (r *Registry) CreatePod(machine string, pod api.Pod) error { // Set current status to "Waiting". pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Host = "" - // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. pod.DesiredState.Status = api.PodRunning pod.DesiredState.Host = "" - - err := registry.helper.CreateObj(makePodKey(pod.ID), &pod) - if err != nil { + if err := r.CreateObj(makePodKey(pod.ID), &pod); err != nil { return err } - // TODO: Until scheduler separation is completed, just assign here. - return registry.AssignPod(pod.ID, machine) + return r.AssignPod(pod.ID, machine) } // AssignPod assigns the given pod to the given machine. // TODO: hook this up via apiserver, not by calling it from CreatePod(). -func (registry *EtcdRegistry) AssignPod(podID string, machine string) error { +func (r *Registry) AssignPod(podID string, machine string) error { podKey := makePodKey(podID) var finalPod *api.Pod - err := registry.helper.AtomicUpdate( - podKey, - &api.Pod{}, - func(obj interface{}) (interface{}, error) { - pod, ok := obj.(*api.Pod) - if !ok { - return nil, fmt.Errorf("unexpected object: %#v", obj) - } - pod.DesiredState.Host = machine - finalPod = pod - return pod, nil - }, - ) + err := r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) { + pod, ok := obj.(*api.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + pod.DesiredState.Host = machine + finalPod = pod + return pod, nil + }) if err != nil { return err } - // TODO: move this to a watch/rectification loop. - manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod) + manifest, err := r.manifestFactory.MakeManifest(machine, *finalPod) if err != nil { return err } - contKey := makeContainerKey(machine) - err = registry.helper.AtomicUpdate( - contKey, - &api.ContainerManifestList{}, - func(in interface{}) (interface{}, error) { - manifests := *in.(*api.ContainerManifestList) - manifests.Items = append(manifests.Items, manifest) - return manifests, nil - }, - ) + err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + manifests := *in.(*api.ContainerManifestList) + manifests.Items = append(manifests.Items, manifest) + return manifests, nil + }) if err != nil { // Don't strand stuff. This is a terrible hack that won't be needed // when the above TODO is fixed. - err2 := registry.helper.Delete(podKey, false) + err2 := r.Delete(podKey, false) if err2 != nil { glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) } @@ -160,41 +148,38 @@ func (registry *EtcdRegistry) AssignPod(podID string, machine string) error { return err } -func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error { +func (r *Registry) UpdatePod(pod api.Pod) error { return fmt.Errorf("unimplemented!") } // DeletePod deletes an existing pod specified by its ID. -func (registry *EtcdRegistry) DeletePod(podID string) error { +func (r *Registry) DeletePod(podID string) error { var pod api.Pod podKey := makePodKey(podID) - err := registry.helper.ExtractObj(podKey, &pod, false) + err := r.ExtractObj(podKey, &pod, false) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("pod", podID) } if err != nil { return err } - // First delete the pod, so a scheduler doesn't notice it getting removed from the // machine and attempt to put it somewhere. - err = registry.helper.Delete(podKey, true) + err = r.Delete(podKey, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("pod", podID) } if err != nil { return err } - machine := pod.DesiredState.Host if machine == "" { // Pod was never scheduled anywhere, just return. return nil } - // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) - return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { manifests := in.(*api.ContainerManifestList) newManifests := make([]api.ContainerManifest, 0, len(manifests.Items)) found := false @@ -217,18 +202,18 @@ func (registry *EtcdRegistry) DeletePod(podID string) error { } // ListControllers obtains a list of ReplicationControllers. -func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) { +func (r *Registry) ListControllers() ([]api.ReplicationController, error) { var controllers []api.ReplicationController - err := registry.helper.ExtractList("/registry/controllers", &controllers) + err := r.ExtractList("/registry/controllers", &controllers) return controllers, err } // WatchControllers begins watching for new, changed, or deleted controllers. -func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *Registry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { if !field.Empty() { return nil, fmt.Errorf("no field selector implemented for controllers") } - return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { + return r.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels)) }) } @@ -238,10 +223,10 @@ func makeControllerKey(id string) string { } // GetController gets a specific ReplicationController specified by its ID. -func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { +func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) - err := registry.helper.ExtractObj(key, &controller, false) + err := r.ExtractObj(key, &controller, false) if tools.IsEtcdNotFound(err) { return nil, apiserver.NewNotFoundErr("replicationController", controllerID) } @@ -252,8 +237,8 @@ func (registry *EtcdRegistry) GetController(controllerID string) (*api.Replicati } // CreateController creates a new ReplicationController. -func (registry *EtcdRegistry) CreateController(controller api.ReplicationController) error { - err := registry.helper.CreateObj(makeControllerKey(controller.ID), controller) +func (r *Registry) CreateController(controller api.ReplicationController) error { + err := r.CreateObj(makeControllerKey(controller.ID), controller) if tools.IsEtcdNodeExist(err) { return apiserver.NewAlreadyExistsErr("replicationController", controller.ID) } @@ -261,14 +246,14 @@ func (registry *EtcdRegistry) CreateController(controller api.ReplicationControl } // UpdateController replaces an existing ReplicationController. -func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error { - return registry.helper.SetObj(makeControllerKey(controller.ID), controller) +func (r *Registry) UpdateController(controller api.ReplicationController) error { + return r.SetObj(makeControllerKey(controller.ID), controller) } // DeleteController deletes a ReplicationController specified by its ID. -func (registry *EtcdRegistry) DeleteController(controllerID string) error { +func (r *Registry) DeleteController(controllerID string) error { key := makeControllerKey(controllerID) - err := registry.helper.Delete(key, false) + err := r.Delete(key, false) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("replicationController", controllerID) } @@ -280,15 +265,15 @@ func makeServiceKey(name string) string { } // ListServices obtains a list of Services. -func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) { +func (r *Registry) ListServices() (api.ServiceList, error) { var list api.ServiceList - err := registry.helper.ExtractList("/registry/services/specs", &list.Items) + err := r.ExtractList("/registry/services/specs", &list.Items) return list, err } // CreateService creates a new Service. -func (registry *EtcdRegistry) CreateService(svc api.Service) error { - err := registry.helper.CreateObj(makeServiceKey(svc.ID), svc) +func (r *Registry) CreateService(svc api.Service) error { + err := r.CreateObj(makeServiceKey(svc.ID), svc) if tools.IsEtcdNodeExist(err) { return apiserver.NewAlreadyExistsErr("service", svc.ID) } @@ -296,10 +281,10 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error { } // GetService obtains a Service specified by its name. -func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { +func (r *Registry) GetService(name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service - err := registry.helper.ExtractObj(key, &svc, false) + err := r.ExtractObj(key, &svc, false) if tools.IsEtcdNotFound(err) { return nil, apiserver.NewNotFoundErr("service", name) } @@ -314,9 +299,9 @@ func makeServiceEndpointsKey(name string) string { } // DeleteService deletes a Service specified by its name. -func (registry *EtcdRegistry) DeleteService(name string) error { +func (r *Registry) DeleteService(name string) error { key := makeServiceKey(name) - err := registry.helper.Delete(key, true) + err := r.Delete(key, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("service", name) } @@ -324,7 +309,7 @@ func (registry *EtcdRegistry) DeleteService(name string) error { return err } key = makeServiceEndpointsKey(name) - err = registry.helper.Delete(key, true) + err = r.Delete(key, true) if !tools.IsEtcdNotFound(err) { return err } @@ -332,12 +317,14 @@ func (registry *EtcdRegistry) DeleteService(name string) error { } // UpdateService replaces an existing Service. -func (registry *EtcdRegistry) UpdateService(svc api.Service) error { - return registry.helper.SetObj(makeServiceKey(svc.ID), svc) +func (r *Registry) UpdateService(svc api.Service) error { + return r.SetObj(makeServiceKey(svc.ID), svc) } // UpdateEndpoints update Endpoints of a Service. -func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { - updateFunc := func(interface{}) (interface{}, error) { return e, nil } - return registry.helper.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, updateFunc) +func (r *Registry) UpdateEndpoints(e api.Endpoints) error { + return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, + func(interface{}) (interface{}, error) { + return e, nil + }) } diff --git a/pkg/registry/etcdregistry_test.go b/pkg/registry/etcd/etcd_test.go similarity index 98% rename from pkg/registry/etcdregistry_test.go rename to pkg/registry/etcd/etcd_test.go index 967e60645bb..6ef0889c939 100644 --- a/pkg/registry/etcdregistry_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package etcd import ( "reflect" @@ -23,14 +23,17 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/coreos/go-etcd/etcd" ) -func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *EtcdRegistry { - registry := MakeEtcdRegistry(client, MakeMinionRegistry(machines)) +func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *Registry { + registry := NewRegistry(client, minion.NewRegistry(machines)) registry.manifestFactory = &BasicManifestFactory{ - serviceRegistry: &MockServiceRegistry{}, + serviceRegistry: ®istrytest.ServiceRegistry{}, } return registry } diff --git a/pkg/registry/manifest_factory.go b/pkg/registry/etcd/manifest_factory.go similarity index 86% rename from pkg/registry/manifest_factory.go rename to pkg/registry/etcd/manifest_factory.go index 9f8cb19a598..fc724882b30 100644 --- a/pkg/registry/manifest_factory.go +++ b/pkg/registry/etcd/manifest_factory.go @@ -14,10 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package etcd import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" ) type ManifestFactory interface { @@ -26,11 +27,11 @@ type ManifestFactory interface { } type BasicManifestFactory struct { - serviceRegistry ServiceRegistry + serviceRegistry service.Registry } func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) { - envVars, err := GetServiceEnvironmentVariables(b.serviceRegistry, machine) + envVars, err := service.GetServiceEnvironmentVariables(b.serviceRegistry, machine) if err != nil { return api.ContainerManifest{}, err } diff --git a/pkg/registry/manifest_factory_test.go b/pkg/registry/etcd/manifest_factory_test.go similarity index 94% rename from pkg/registry/manifest_factory_test.go rename to pkg/registry/etcd/manifest_factory_test.go index c95f1669c6d..b51090be5cf 100644 --- a/pkg/registry/manifest_factory_test.go +++ b/pkg/registry/etcd/manifest_factory_test.go @@ -14,18 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package etcd import ( "reflect" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) func TestMakeManifestNoServices(t *testing.T) { - registry := MockServiceRegistry{} + registry := registrytest.ServiceRegistry{} factory := &BasicManifestFactory{ serviceRegistry: ®istry, } @@ -58,8 +59,8 @@ func TestMakeManifestNoServices(t *testing.T) { } func TestMakeManifestServices(t *testing.T) { - registry := MockServiceRegistry{ - list: api.ServiceList{ + registry := registrytest.ServiceRegistry{ + List: api.ServiceList{ Items: []api.Service{ { JSONBase: api.JSONBase{ID: "test"}, @@ -134,8 +135,8 @@ func TestMakeManifestServices(t *testing.T) { } func TestMakeManifestServicesExistingEnvVar(t *testing.T) { - registry := MockServiceRegistry{ - list: api.ServiceList{ + registry := registrytest.ServiceRegistry{ + List: api.ServiceList{ Items: []api.Service{ { JSONBase: api.JSONBase{ID: "test"}, diff --git a/pkg/registry/memory/memory.go b/pkg/registry/memory/memory.go new file mode 100644 index 00000000000..d74b037858f --- /dev/null +++ b/pkg/registry/memory/memory.go @@ -0,0 +1,191 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package memory + +import ( + "errors" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// An implementation of PodRegistry and ControllerRegistry that is backed +// by memory. Mainly used for testing. +type Registry struct { + podData map[string]api.Pod + controllerData map[string]api.ReplicationController + serviceData map[string]api.Service +} + +// NewRegistry returns a new Registry. +func NewRegistry() *Registry { + return &Registry{ + podData: map[string]api.Pod{}, + controllerData: map[string]api.ReplicationController{}, + serviceData: map[string]api.Service{}, + } +} + +// CreateController registers the given replication controller. +func (r *Registry) CreateController(controller api.ReplicationController) error { + r.controllerData[controller.ID] = controller + return nil +} + +// CreatePod registers the given pod. +func (r *Registry) CreatePod(machine string, pod api.Pod) error { + r.podData[pod.ID] = pod + return nil +} + +// CreateService registers the given service. +func (r *Registry) CreateService(svc api.Service) error { + r.serviceData[svc.ID] = svc + return nil +} + +// DeleteController deletes the named replication controller from the +// registry. +func (r *Registry) DeleteController(controllerID string) error { + if _, ok := r.controllerData[controllerID]; !ok { + return apiserver.NewNotFoundErr("replicationController", controllerID) + } + delete(r.controllerData, controllerID) + return nil +} + +// DeletePod deletes the named pod from the registry. +func (r *Registry) DeletePod(podID string) error { + if _, ok := r.podData[podID]; !ok { + return apiserver.NewNotFoundErr("pod", podID) + } + delete(r.podData, podID) + return nil +} + +// DeleteService deletes the named service from the registry. +// It returns an error if the service is not found in the registry. +func (r *Registry) DeleteService(name string) error { + if _, ok := r.serviceData[name]; !ok { + return apiserver.NewNotFoundErr("service", name) + } + delete(r.serviceData, name) + return nil +} + +// GetController returns an *api.ReplicationController for the name controller. +// It returns an error if the controller is not found in the registry. +func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) { + controller, found := r.controllerData[controllerID] + if found { + return &controller, nil + } else { + return nil, apiserver.NewNotFoundErr("replicationController", controllerID) + } +} + +// GetPod returns an *api.Pod for the named pod. +// It returns an error if the pod is not found in the registry. +func (r *Registry) GetPod(podID string) (*api.Pod, error) { + pod, found := r.podData[podID] + if found { + return &pod, nil + } else { + return nil, apiserver.NewNotFoundErr("pod", podID) + } +} + +// GetService returns an *api.Service for the named service. +// It returns an error if the service is not found in the registry. +func (r *Registry) GetService(name string) (*api.Service, error) { + svc, found := r.serviceData[name] + if !found { + return nil, apiserver.NewNotFoundErr("service", name) + } + return &svc, nil +} + +// ListControllers returns all registered replication controllers. +func (r *Registry) ListControllers() ([]api.ReplicationController, error) { + result := []api.ReplicationController{} + for _, value := range r.controllerData { + result = append(result, value) + } + return result, nil +} + +// ListPods returns all registered pods for the given selector. +func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { + result := []api.Pod{} + for _, value := range r.podData { + if selector.Matches(labels.Set(value.Labels)) { + result = append(result, value) + } + } + return result, nil +} + +// ListServices returns all registered services. +func (r *Registry) ListServices() (api.ServiceList, error) { + var list []api.Service + for _, value := range r.serviceData { + list = append(list, value) + } + return api.ServiceList{Items: list}, nil +} + +// UpdateController updates the given controller in the registry. +// It returns an error if the controller is not found in the registry. +func (r *Registry) UpdateController(controller api.ReplicationController) error { + if _, ok := r.controllerData[controller.ID]; !ok { + return apiserver.NewNotFoundErr("replicationController", controller.ID) + } + r.controllerData[controller.ID] = controller + return nil +} + +// UpdateEndpoints always returns nil. +func (r *Registry) UpdateEndpoints(e api.Endpoints) error { + return nil +} + +// UpdatePod updates the given pod in the registry. +// It returns an error if the pod is not found in the registry. +func (r *Registry) UpdatePod(pod api.Pod) error { + if _, ok := r.podData[pod.ID]; !ok { + return apiserver.NewNotFoundErr("pod", pod.ID) + } + r.podData[pod.ID] = pod + return nil +} + +// UpdateService updates the given service in the registry. +// It returns an error if the service is not found in the registry. +func (r *Registry) UpdateService(svc api.Service) error { + if _, ok := r.serviceData[svc.ID]; !ok { + return apiserver.NewNotFoundErr("service", svc.ID) + } + return r.CreateService(svc) +} + +// WatchControllers always returns an error. +// It is not implemented. +func (r *Registry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, errors.New("unimplemented") +} diff --git a/pkg/registry/memory_registry_test.go b/pkg/registry/memory/memory_test.go similarity index 92% rename from pkg/registry/memory_registry_test.go rename to pkg/registry/memory/memory_test.go index 4db09bdbc6d..e5f7c093562 100644 --- a/pkg/registry/memory_registry_test.go +++ b/pkg/registry/memory/memory_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package memory import ( "testing" @@ -25,32 +25,30 @@ import ( ) func TestListPodsEmpty(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() pods, err := registry.ListPods(labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } - if len(pods) != 0 { t.Errorf("Unexpected pod list: %#v", pods) } } func TestMemoryListPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() registry.CreatePod("machine", api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) pods, err := registry.ListPods(labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } - if len(pods) != 1 || pods[0].ID != "foo" { t.Errorf("Unexpected pod list: %#v", pods) } } func TestMemoryGetPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() pod, err := registry.GetPod("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -62,7 +60,7 @@ func TestMemoryGetPods(t *testing.T) { } func TestMemorySetGetPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} registry.CreatePod("machine", expectedPod) pod, err := registry.GetPod("foo") @@ -76,7 +74,7 @@ func TestMemorySetGetPods(t *testing.T) { } func TestMemoryUpdatePods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() pod := api.Pod{ JSONBase: api.JSONBase{ ID: "foo", @@ -96,7 +94,7 @@ func TestMemoryUpdatePods(t *testing.T) { } func TestMemorySetUpdateGetPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() oldPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} expectedPod := api.Pod{ JSONBase: api.JSONBase{ @@ -119,7 +117,7 @@ func TestMemorySetUpdateGetPods(t *testing.T) { } func TestMemoryDeletePods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() err := registry.DeletePod("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -131,7 +129,7 @@ func TestMemoryDeletePods(t *testing.T) { } func TestMemorySetDeleteGetPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} registry.CreatePod("machine", expectedPod) registry.DeletePod("foo") @@ -146,7 +144,7 @@ func TestMemorySetDeleteGetPods(t *testing.T) { } func TestListControllersEmpty(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() ctls, err := registry.ListControllers() if err != nil { t.Errorf("unexpected error: %v", err) @@ -158,7 +156,7 @@ func TestListControllersEmpty(t *testing.T) { } func TestMemoryListControllers(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() registry.CreateController(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}) ctls, err := registry.ListControllers() if err != nil { @@ -171,7 +169,7 @@ func TestMemoryListControllers(t *testing.T) { } func TestMemoryGetController(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() ctl, err := registry.GetController("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -183,7 +181,7 @@ func TestMemoryGetController(t *testing.T) { } func TestMemorySetGetControllers(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}} registry.CreateController(expectedController) ctl, err := registry.GetController("foo") @@ -197,7 +195,7 @@ func TestMemorySetGetControllers(t *testing.T) { } func TestMemoryUpdateController(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() ctl := api.ReplicationController{ JSONBase: api.JSONBase{ ID: "foo", @@ -217,7 +215,7 @@ func TestMemoryUpdateController(t *testing.T) { } func TestMemorySetUpdateGetControllers(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() oldController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}} expectedController := api.ReplicationController{ JSONBase: api.JSONBase{ @@ -240,7 +238,7 @@ func TestMemorySetUpdateGetControllers(t *testing.T) { } func TestMemoryDeleteController(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() err := registry.DeleteController("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -252,7 +250,7 @@ func TestMemoryDeleteController(t *testing.T) { } func TestMemorySetDeleteGetControllers(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}} registry.CreateController(expectedController) registry.DeleteController("foo") @@ -267,7 +265,7 @@ func TestMemorySetDeleteGetControllers(t *testing.T) { } func TestListServicesEmpty(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() svcs, err := registry.ListServices() if err != nil { t.Errorf("unexpected error: %v", err) @@ -279,7 +277,7 @@ func TestListServicesEmpty(t *testing.T) { } func TestMemoryListServices(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() registry.CreateService(api.Service{JSONBase: api.JSONBase{ID: "foo"}}) svcs, err := registry.ListServices() if err != nil { @@ -292,7 +290,7 @@ func TestMemoryListServices(t *testing.T) { } func TestMemoryGetService(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() svc, err := registry.GetService("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -304,7 +302,7 @@ func TestMemoryGetService(t *testing.T) { } func TestMemorySetGetServices(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedService := api.Service{JSONBase: api.JSONBase{ID: "foo"}} registry.CreateService(expectedService) svc, err := registry.GetService("foo") @@ -318,7 +316,7 @@ func TestMemorySetGetServices(t *testing.T) { } func TestMemoryUpdateService(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() svc := api.Service{ JSONBase: api.JSONBase{ ID: "foo", @@ -336,7 +334,7 @@ func TestMemoryUpdateService(t *testing.T) { } func TestMemorySetUpdateGetServices(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() oldService := api.Service{JSONBase: api.JSONBase{ID: "foo"}} expectedService := api.Service{ JSONBase: api.JSONBase{ @@ -357,7 +355,7 @@ func TestMemorySetUpdateGetServices(t *testing.T) { } func TestMemoryDeleteService(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() err := registry.DeleteService("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -369,7 +367,7 @@ func TestMemoryDeleteService(t *testing.T) { } func TestMemorySetDeleteGetServices(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedService := api.Service{JSONBase: api.JSONBase{ID: "foo"}} registry.CreateService(expectedService) registry.DeleteService("foo") diff --git a/pkg/registry/memory_registry.go b/pkg/registry/memory_registry.go deleted file mode 100644 index bce2fb60b10..00000000000 --- a/pkg/registry/memory_registry.go +++ /dev/null @@ -1,165 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package registry - -import ( - "errors" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" -) - -// An implementation of PodRegistry and ControllerRegistry that is backed by memory -// Mainly used for testing. -type MemoryRegistry struct { - podData map[string]api.Pod - controllerData map[string]api.ReplicationController - serviceData map[string]api.Service -} - -func MakeMemoryRegistry() *MemoryRegistry { - return &MemoryRegistry{ - podData: map[string]api.Pod{}, - controllerData: map[string]api.ReplicationController{}, - serviceData: map[string]api.Service{}, - } -} - -func (registry *MemoryRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { - result := []api.Pod{} - for _, value := range registry.podData { - if selector.Matches(labels.Set(value.Labels)) { - result = append(result, value) - } - } - return result, nil -} - -func (registry *MemoryRegistry) GetPod(podID string) (*api.Pod, error) { - pod, found := registry.podData[podID] - if found { - return &pod, nil - } else { - return nil, apiserver.NewNotFoundErr("pod", podID) - } -} - -func (registry *MemoryRegistry) CreatePod(machine string, pod api.Pod) error { - registry.podData[pod.ID] = pod - return nil -} - -func (registry *MemoryRegistry) DeletePod(podID string) error { - if _, ok := registry.podData[podID]; !ok { - return apiserver.NewNotFoundErr("pod", podID) - } - delete(registry.podData, podID) - return nil -} - -func (registry *MemoryRegistry) UpdatePod(pod api.Pod) error { - if _, ok := registry.podData[pod.ID]; !ok { - return apiserver.NewNotFoundErr("pod", pod.ID) - } - registry.podData[pod.ID] = pod - return nil -} - -func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController, error) { - result := []api.ReplicationController{} - for _, value := range registry.controllerData { - result = append(result, value) - } - return result, nil -} - -func (registry *MemoryRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return nil, errors.New("unimplemented") -} - -func (registry *MemoryRegistry) GetController(controllerID string) (*api.ReplicationController, error) { - controller, found := registry.controllerData[controllerID] - if found { - return &controller, nil - } else { - return nil, apiserver.NewNotFoundErr("replicationController", controllerID) - } -} - -func (registry *MemoryRegistry) CreateController(controller api.ReplicationController) error { - registry.controllerData[controller.ID] = controller - return nil -} - -func (registry *MemoryRegistry) DeleteController(controllerID string) error { - if _, ok := registry.controllerData[controllerID]; !ok { - return apiserver.NewNotFoundErr("replicationController", controllerID) - } - delete(registry.controllerData, controllerID) - return nil -} - -func (registry *MemoryRegistry) UpdateController(controller api.ReplicationController) error { - if _, ok := registry.controllerData[controller.ID]; !ok { - return apiserver.NewNotFoundErr("replicationController", controller.ID) - } - registry.controllerData[controller.ID] = controller - return nil -} - -func (registry *MemoryRegistry) ListServices() (api.ServiceList, error) { - var list []api.Service - for _, value := range registry.serviceData { - list = append(list, value) - } - return api.ServiceList{Items: list}, nil -} - -func (registry *MemoryRegistry) CreateService(svc api.Service) error { - registry.serviceData[svc.ID] = svc - return nil -} - -func (registry *MemoryRegistry) GetService(name string) (*api.Service, error) { - svc, found := registry.serviceData[name] - if found { - return &svc, nil - } else { - return nil, apiserver.NewNotFoundErr("service", name) - } -} - -func (registry *MemoryRegistry) DeleteService(name string) error { - if _, ok := registry.serviceData[name]; !ok { - return apiserver.NewNotFoundErr("service", name) - } - delete(registry.serviceData, name) - return nil -} - -func (registry *MemoryRegistry) UpdateService(svc api.Service) error { - if _, ok := registry.serviceData[svc.ID]; !ok { - return apiserver.NewNotFoundErr("service", svc.ID) - } - return registry.CreateService(svc) -} - -func (registry *MemoryRegistry) UpdateEndpoints(e api.Endpoints) error { - return nil -} diff --git a/pkg/registry/caching_minion_registry.go b/pkg/registry/minion/caching_registry.go similarity index 54% rename from pkg/registry/caching_minion_registry.go rename to pkg/registry/minion/caching_registry.go index 44b61bf0fc8..ec04939b21e 100644 --- a/pkg/registry/caching_minion_registry.go +++ b/pkg/registry/minion/caching_registry.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "sync" @@ -32,8 +32,8 @@ func (SystemClock) Now() time.Time { return time.Now() } -type CachingMinionRegistry struct { - delegate MinionRegistry +type CachingRegistry struct { + delegate Registry ttl time.Duration minions []string lastUpdate int64 @@ -41,12 +41,12 @@ type CachingMinionRegistry struct { clock Clock } -func NewCachingMinionRegistry(delegate MinionRegistry, ttl time.Duration) (MinionRegistry, error) { +func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error) { list, err := delegate.List() if err != nil { return nil, err } - return &CachingMinionRegistry{ + return &CachingRegistry{ delegate: delegate, ttl: ttl, minions: list, @@ -55,44 +55,16 @@ func NewCachingMinionRegistry(delegate MinionRegistry, ttl time.Duration) (Minio }, nil } -func (c *CachingMinionRegistry) List() ([]string, error) { - if c.expired() { - err := c.refresh(false) - if err != nil { - return c.minions, err - } - } - return c.minions, nil -} - -func (c *CachingMinionRegistry) Insert(minion string) error { - err := c.delegate.Insert(minion) - if err != nil { - return err - } - return c.refresh(true) -} - -func (c *CachingMinionRegistry) Delete(minion string) error { - err := c.delegate.Delete(minion) - if err != nil { - return err - } - return c.refresh(true) -} - -func (c *CachingMinionRegistry) Contains(minion string) (bool, error) { - if c.expired() { - err := c.refresh(false) - if err != nil { +func (r *CachingRegistry) Contains(minion string) (bool, error) { + if r.expired() { + if err := r.refresh(false); err != nil { return false, err } } - // block updates in the middle of a contains. - c.lock.RLock() - defer c.lock.RUnlock() - for _, name := range c.minions { + r.lock.RLock() + defer r.lock.RUnlock() + for _, name := range r.minions { if name == minion { return true, nil } @@ -100,23 +72,46 @@ func (c *CachingMinionRegistry) Contains(minion string) (bool, error) { return false, nil } +func (r *CachingRegistry) Delete(minion string) error { + if err := r.delegate.Delete(minion); err != nil { + return err + } + return r.refresh(true) +} + +func (r *CachingRegistry) Insert(minion string) error { + if err := r.delegate.Insert(minion); err != nil { + return err + } + return r.refresh(true) +} + +func (r *CachingRegistry) List() ([]string, error) { + if r.expired() { + if err := r.refresh(false); err != nil { + return r.minions, err + } + } + return r.minions, nil +} + +func (r *CachingRegistry) expired() bool { + var unix int64 + atomic.SwapInt64(&unix, r.lastUpdate) + return r.clock.Now().Sub(time.Unix(r.lastUpdate, 0)) > r.ttl +} + // refresh updates the current store. It double checks expired under lock with the assumption // of optimistic concurrency with the other functions. -func (c *CachingMinionRegistry) refresh(force bool) error { - c.lock.Lock() - defer c.lock.Unlock() - if force || c.expired() { +func (r *CachingRegistry) refresh(force bool) error { + r.lock.Lock() + defer r.lock.Unlock() + if force || r.expired() { var err error - c.minions, err = c.delegate.List() - time := c.clock.Now() - atomic.SwapInt64(&c.lastUpdate, time.Unix()) + r.minions, err = r.delegate.List() + time := r.clock.Now() + atomic.SwapInt64(&r.lastUpdate, time.Unix()) return err } return nil } - -func (c *CachingMinionRegistry) expired() bool { - var unix int64 - atomic.SwapInt64(&unix, c.lastUpdate) - return c.clock.Now().Sub(time.Unix(c.lastUpdate, 0)) > c.ttl -} diff --git a/pkg/registry/caching_minion_registry_test.go b/pkg/registry/minion/caching_registry_test.go similarity index 75% rename from pkg/registry/caching_minion_registry_test.go rename to pkg/registry/minion/caching_registry_test.go index 5f05522b09a..d09ccd77434 100644 --- a/pkg/registry/caching_minion_registry_test.go +++ b/pkg/registry/minion/caching_registry_test.go @@ -14,12 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "reflect" "testing" "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) type fakeClock struct { @@ -34,9 +36,9 @@ func TestCachingHit(t *testing.T) { fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) expected := []string{"m1", "m2", "m3"} - cache := CachingMinionRegistry{ + cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, @@ -56,9 +58,9 @@ func TestCachingMiss(t *testing.T) { fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) expected := []string{"m1", "m2", "m3"} - cache := CachingMinionRegistry{ + cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, @@ -70,9 +72,8 @@ func TestCachingMiss(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - - if !reflect.DeepEqual(list, fakeRegistry.minions) { - t.Errorf("expected: %v, got %v", fakeRegistry.minions, list) + if !reflect.DeepEqual(list, fakeRegistry.Minions) { + t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } @@ -80,9 +81,9 @@ func TestCachingInsert(t *testing.T) { fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) expected := []string{"m1", "m2", "m3"} - cache := CachingMinionRegistry{ + cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, @@ -93,14 +94,12 @@ func TestCachingInsert(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - list, err := cache.List() if err != nil { t.Errorf("unexpected error: %v", err) } - - if !reflect.DeepEqual(list, fakeRegistry.minions) { - t.Errorf("expected: %v, got %v", fakeRegistry.minions, list) + if !reflect.DeepEqual(list, fakeRegistry.Minions) { + t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } @@ -108,9 +107,9 @@ func TestCachingDelete(t *testing.T) { fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) expected := []string{"m1", "m2", "m3"} - cache := CachingMinionRegistry{ + cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, @@ -121,13 +120,11 @@ func TestCachingDelete(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - list, err := cache.List() if err != nil { t.Errorf("unexpected error: %v", err) } - - if !reflect.DeepEqual(list, fakeRegistry.minions) { - t.Errorf("expected: %v, got %v", fakeRegistry.minions, list) + if !reflect.DeepEqual(list, fakeRegistry.Minions) { + t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } diff --git a/pkg/registry/cloud_minion_registry.go b/pkg/registry/minion/cloud_registry.go similarity index 66% rename from pkg/registry/cloud_minion_registry.go rename to pkg/registry/minion/cloud_registry.go index 3a158fabc6d..9d7688ce188 100644 --- a/pkg/registry/cloud_minion_registry.go +++ b/pkg/registry/minion/cloud_registry.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "fmt" @@ -22,37 +22,20 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" ) -type CloudMinionRegistry struct { +type CloudRegistry struct { cloud cloudprovider.Interface matchRE string } -func MakeCloudMinionRegistry(cloud cloudprovider.Interface, matchRE string) (*CloudMinionRegistry, error) { - return &CloudMinionRegistry{ +func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string) (*CloudRegistry, error) { + return &CloudRegistry{ cloud: cloud, matchRE: matchRE, }, nil } -func (c *CloudMinionRegistry) List() ([]string, error) { - instances, ok := c.cloud.Instances() - if !ok { - return nil, fmt.Errorf("cloud doesn't support instances") - } - - return instances.List(c.matchRE) -} - -func (c *CloudMinionRegistry) Insert(minion string) error { - return fmt.Errorf("unsupported") -} - -func (c *CloudMinionRegistry) Delete(minion string) error { - return fmt.Errorf("unsupported") -} - -func (c *CloudMinionRegistry) Contains(minion string) (bool, error) { - instances, err := c.List() +func (r *CloudRegistry) Contains(minion string) (bool, error) { + instances, err := r.List() if err != nil { return false, err } @@ -63,3 +46,19 @@ func (c *CloudMinionRegistry) Contains(minion string) (bool, error) { } return false, nil } + +func (r CloudRegistry) Delete(minion string) error { + return fmt.Errorf("unsupported") +} + +func (r CloudRegistry) Insert(minion string) error { + return fmt.Errorf("unsupported") +} + +func (r *CloudRegistry) List() ([]string, error) { + instances, ok := r.cloud.Instances() + if !ok { + return nil, fmt.Errorf("cloud doesn't support instances") + } + return instances.List(r.matchRE) +} diff --git a/pkg/registry/cloud_minion_registry_test.go b/pkg/registry/minion/cloud_registry_test.go similarity index 91% rename from pkg/registry/cloud_minion_registry_test.go rename to pkg/registry/minion/cloud_registry_test.go index 366625bc041..1b1fb08e917 100644 --- a/pkg/registry/cloud_minion_registry_test.go +++ b/pkg/registry/minion/cloud_registry_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "reflect" @@ -28,7 +28,7 @@ func TestCloudList(t *testing.T) { fakeCloud := cloudprovider.FakeCloud{ Machines: instances, } - registry, err := MakeCloudMinionRegistry(&fakeCloud, ".*") + registry, err := NewCloudRegistry(&fakeCloud, ".*") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -48,7 +48,7 @@ func TestCloudContains(t *testing.T) { fakeCloud := cloudprovider.FakeCloud{ Machines: instances, } - registry, err := MakeCloudMinionRegistry(&fakeCloud, ".*") + registry, err := NewCloudRegistry(&fakeCloud, ".*") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -77,7 +77,7 @@ func TestCloudListRegexp(t *testing.T) { fakeCloud := cloudprovider.FakeCloud{ Machines: instances, } - registry, err := MakeCloudMinionRegistry(&fakeCloud, "m[0-9]+") + registry, err := NewCloudRegistry(&fakeCloud, "m[0-9]+") if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/registry/healthy_minion_registry.go b/pkg/registry/minion/healthy_registry.go similarity index 60% rename from pkg/registry/healthy_minion_registry.go rename to pkg/registry/minion/healthy_registry.go index 658052f13e8..33eabb49456 100644 --- a/pkg/registry/healthy_minion_registry.go +++ b/pkg/registry/minion/healthy_registry.go @@ -14,42 +14,65 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "fmt" "net/http" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" + "github.com/golang/glog" ) -type HealthyMinionRegistry struct { - delegate MinionRegistry +type HealthyRegistry struct { + delegate Registry client health.HTTPGetInterface port int } -func NewHealthyMinionRegistry(delegate MinionRegistry, client *http.Client) MinionRegistry { - return &HealthyMinionRegistry{ +func NewHealthyRegistry(delegate Registry, client *http.Client) Registry { + return &HealthyRegistry{ delegate: delegate, client: client, port: 10250, } } -func (h *HealthyMinionRegistry) makeMinionURL(minion string) string { - return fmt.Sprintf("http://%s:%d/healthz", minion, h.port) +func (r *HealthyRegistry) Contains(minion string) (bool, error) { + contains, err := r.delegate.Contains(minion) + if err != nil { + return false, err + } + if !contains { + return false, nil + } + status, err := health.DoHTTPCheck(r.makeMinionURL(minion), r.client) + if err != nil { + return false, err + } + if status == health.Unhealthy { + return false, nil + } + return true, nil } -func (h *HealthyMinionRegistry) List() (currentMinions []string, err error) { +func (r *HealthyRegistry) Delete(minion string) error { + return r.delegate.Delete(minion) +} + +func (r *HealthyRegistry) Insert(minion string) error { + return r.delegate.Insert(minion) +} + +func (r *HealthyRegistry) List() (currentMinions []string, err error) { var result []string - list, err := h.delegate.List() + list, err := r.delegate.List() if err != nil { return result, err } for _, minion := range list { - status, err := health.DoHTTPCheck(h.makeMinionURL(minion), h.client) + status, err := health.DoHTTPCheck(r.makeMinionURL(minion), r.client) if err != nil { glog.Errorf("%s failed health check with error: %s", minion, err) continue @@ -61,28 +84,6 @@ func (h *HealthyMinionRegistry) List() (currentMinions []string, err error) { return result, nil } -func (h *HealthyMinionRegistry) Insert(minion string) error { - return h.delegate.Insert(minion) -} - -func (h *HealthyMinionRegistry) Delete(minion string) error { - return h.delegate.Delete(minion) -} - -func (h *HealthyMinionRegistry) Contains(minion string) (bool, error) { - contains, err := h.delegate.Contains(minion) - if err != nil { - return false, err - } - if !contains { - return false, nil - } - status, err := health.DoHTTPCheck(h.makeMinionURL(minion), h.client) - if err != nil { - return false, err - } - if status == health.Unhealthy { - return false, nil - } - return true, nil +func (r *HealthyRegistry) makeMinionURL(minion string) string { + return fmt.Sprintf("http://%s:%d/healthz", minion, r.port) } diff --git a/pkg/registry/healthy_minion_registry_test.go b/pkg/registry/minion/healthy_registry_test.go similarity index 81% rename from pkg/registry/healthy_minion_registry_test.go rename to pkg/registry/minion/healthy_registry_test.go index 132339564fc..cebcf31f1d6 100644 --- a/pkg/registry/healthy_minion_registry_test.go +++ b/pkg/registry/minion/healthy_registry_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "bytes" @@ -22,6 +22,8 @@ import ( "net/http" "reflect" "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) type alwaysYes struct{} @@ -38,41 +40,33 @@ func (alwaysYes) Get(url string) (*http.Response, error) { } func TestBasicDelegation(t *testing.T) { - mockMinionRegistry := MockMinionRegistry{ - minions: []string{"m1", "m2", "m3"}, - } - healthy := HealthyMinionRegistry{ - delegate: &mockMinionRegistry, + mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}) + healthy := HealthyRegistry{ + delegate: mockMinionRegistry, client: alwaysYes{}, } - list, err := healthy.List() if err != nil { t.Errorf("unexpected error: %v", err) } - - if !reflect.DeepEqual(list, mockMinionRegistry.minions) { - t.Errorf("Expected %v, Got %v", mockMinionRegistry.minions, list) + if !reflect.DeepEqual(list, mockMinionRegistry.Minions) { + t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list) } err = healthy.Insert("foo") if err != nil { t.Errorf("unexpected error: %v", err) } - ok, err := healthy.Contains("m1") if err != nil { t.Errorf("unexpected error: %v", err) } - if !ok { t.Errorf("Unexpected absence of 'm1'") } - ok, err = healthy.Contains("m5") if err != nil { t.Errorf("unexpected error: %v", err) } - if ok { t.Errorf("Unexpected presence of 'm5'") } @@ -91,21 +85,17 @@ func (n *notMinion) Get(url string) (*http.Response, error) { } func TestFiltering(t *testing.T) { - mockMinionRegistry := MockMinionRegistry{ - minions: []string{"m1", "m2", "m3"}, - } - healthy := HealthyMinionRegistry{ - delegate: &mockMinionRegistry, + mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}) + healthy := HealthyRegistry{ + delegate: mockMinionRegistry, client: ¬Minion{minion: "m1"}, port: 10250, } - expected := []string{"m2", "m3"} list, err := healthy.List() if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, expected) { t.Errorf("Expected %v, Got %v", expected, list) } @@ -113,7 +103,6 @@ func TestFiltering(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if ok { t.Errorf("Unexpected presence of 'm1'") } diff --git a/pkg/registry/minionregistry.go b/pkg/registry/minion/minion.go similarity index 92% rename from pkg/registry/minionregistry.go rename to pkg/registry/minion/minion.go index 7d5ab774dc5..ddf90d73a98 100644 --- a/pkg/registry/minionregistry.go +++ b/pkg/registry/minion/minion.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "fmt" @@ -27,7 +27,7 @@ import ( var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.") // Keep track of a set of minions. Safe for concurrent reading/writing. -type MinionRegistry interface { +type Registry interface { List() (currentMinions []string, err error) Insert(minion string) error Delete(minion string) error @@ -35,7 +35,7 @@ type MinionRegistry interface { } // Initialize a minion registry with a list of minions. -func MakeMinionRegistry(minions []string) MinionRegistry { +func NewRegistry(minions []string) Registry { m := &minionList{ minions: util.StringSet{}, } @@ -50,22 +50,10 @@ type minionList struct { lock sync.Mutex } -func (m *minionList) List() (currentMinions []string, err error) { +func (m *minionList) Contains(minion string) (bool, error) { m.lock.Lock() defer m.lock.Unlock() - // Convert from map to []string - for minion := range m.minions { - currentMinions = append(currentMinions, minion) - } - sort.StringSlice(currentMinions).Sort() - return -} - -func (m *minionList) Insert(newMinion string) error { - m.lock.Lock() - defer m.lock.Unlock() - m.minions.Insert(newMinion) - return nil + return m.minions.Has(minion), nil } func (m *minionList) Delete(minion string) error { @@ -75,8 +63,19 @@ func (m *minionList) Delete(minion string) error { return nil } -func (m *minionList) Contains(minion string) (bool, error) { +func (m *minionList) Insert(newMinion string) error { m.lock.Lock() defer m.lock.Unlock() - return m.minions.Has(minion), nil + m.minions.Insert(newMinion) + return nil +} + +func (m *minionList) List() (currentMinions []string, err error) { + m.lock.Lock() + defer m.lock.Unlock() + for minion := range m.minions { + currentMinions = append(currentMinions, minion) + } + sort.StringSlice(currentMinions).Sort() + return } diff --git a/pkg/registry/minionregistry_test.go b/pkg/registry/minion/minion_test.go similarity index 92% rename from pkg/registry/minionregistry_test.go rename to pkg/registry/minion/minion_test.go index 1a709679a09..f4dd2bfa5b4 100644 --- a/pkg/registry/minionregistry_test.go +++ b/pkg/registry/minion/minion_test.go @@ -14,15 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "reflect" "testing" ) -func TestMinionRegistry(t *testing.T) { - m := MakeMinionRegistry([]string{"foo", "bar"}) +func TestRegistry(t *testing.T) { + m := NewRegistry([]string{"foo", "bar"}) if has, err := m.Contains("foo"); !has || err != nil { t.Errorf("missing expected object") } @@ -32,21 +32,18 @@ func TestMinionRegistry(t *testing.T) { if has, err := m.Contains("baz"); has || err != nil { t.Errorf("has unexpected object") } - if err := m.Insert("baz"); err != nil { t.Errorf("insert failed") } if has, err := m.Contains("baz"); !has || err != nil { t.Errorf("insert didn't actually insert") } - if err := m.Delete("bar"); err != nil { t.Errorf("delete failed") } if has, err := m.Contains("bar"); has || err != nil { t.Errorf("delete didn't actually delete") } - list, err := m.List() if err != nil { t.Errorf("got error calling List") diff --git a/pkg/registry/minionstorage.go b/pkg/registry/minion/storage.go similarity index 56% rename from pkg/registry/minionstorage.go rename to pkg/registry/minion/storage.go index d98b9e5d45d..188ca1de859 100644 --- a/pkg/registry/minionstorage.go +++ b/pkg/registry/minion/storage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "fmt" @@ -24,46 +24,19 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) -// MinionRegistryStorage implements the RESTStorage interface, backed by a MinionRegistry. -type MinionRegistryStorage struct { - registry MinionRegistry +// RegistryStorage implements the RESTStorage interface, backed by a MinionRegistry. +type RegistryStorage struct { + registry Registry } -func MakeMinionRegistryStorage(m MinionRegistry) apiserver.RESTStorage { - return &MinionRegistryStorage{ +// NewRegistryStorage returns a new RegistryStorage. +func NewRegistryStorage(m Registry) apiserver.RESTStorage { + return &RegistryStorage{ registry: m, } } -func (storage *MinionRegistryStorage) toApiMinion(name string) api.Minion { - return api.Minion{JSONBase: api.JSONBase{ID: name}} -} - -func (storage *MinionRegistryStorage) List(selector labels.Selector) (interface{}, error) { - nameList, err := storage.registry.List() - if err != nil { - return nil, err - } - var list api.MinionList - for _, name := range nameList { - list.Items = append(list.Items, storage.toApiMinion(name)) - } - return list, nil -} - -func (storage *MinionRegistryStorage) Get(id string) (interface{}, error) { - exists, err := storage.registry.Contains(id) - if !exists { - return nil, ErrDoesNotExist - } - return storage.toApiMinion(id), err -} - -func (storage *MinionRegistryStorage) New() interface{} { - return &api.Minion{} -} - -func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { minion, ok := obj.(*api.Minion) if !ok { return nil, fmt.Errorf("not a minion: %#v", obj) @@ -72,27 +45,23 @@ func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{ return nil, fmt.Errorf("ID should not be empty: %#v", minion) } return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.registry.Insert(minion.ID) + err := rs.registry.Insert(minion.ID) if err != nil { return nil, err } - contains, err := storage.registry.Contains(minion.ID) + contains, err := rs.registry.Contains(minion.ID) if err != nil { return nil, err } if contains { - return storage.toApiMinion(minion.ID), nil + return rs.toApiMinion(minion.ID), nil } return nil, fmt.Errorf("unable to add minion %#v", minion) }), nil } -func (storage *MinionRegistryStorage) Update(minion interface{}) (<-chan interface{}, error) { - return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.") -} - -func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, error) { - exists, err := storage.registry.Contains(id) +func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { + exists, err := rs.registry.Contains(id) if !exists { return nil, ErrDoesNotExist } @@ -100,6 +69,38 @@ func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, err return nil, err } return apiserver.MakeAsync(func() (interface{}, error) { - return &api.Status{Status: api.StatusSuccess}, storage.registry.Delete(id) + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(id) }), nil } + +func (rs *RegistryStorage) Get(id string) (interface{}, error) { + exists, err := rs.registry.Contains(id) + if !exists { + return nil, ErrDoesNotExist + } + return rs.toApiMinion(id), err +} + +func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { + nameList, err := rs.registry.List() + if err != nil { + return nil, err + } + var list api.MinionList + for _, name := range nameList { + list.Items = append(list.Items, rs.toApiMinion(name)) + } + return list, nil +} + +func (rs RegistryStorage) New() interface{} { + return &api.Minion{} +} + +func (rs *RegistryStorage) Update(minion interface{}) (<-chan interface{}, error) { + return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.") +} + +func (rs *RegistryStorage) toApiMinion(name string) api.Minion { + return api.Minion{JSONBase: api.JSONBase{ID: name}} +} diff --git a/pkg/registry/minionstorage_test.go b/pkg/registry/minion/storage_test.go similarity index 95% rename from pkg/registry/minionstorage_test.go rename to pkg/registry/minion/storage_test.go index a0c7ef7f306..96d81595cd5 100644 --- a/pkg/registry/minionstorage_test.go +++ b/pkg/registry/minion/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "reflect" @@ -25,8 +25,8 @@ import ( ) func TestMinionRegistryStorage(t *testing.T) { - m := MakeMinionRegistry([]string{"foo", "bar"}) - ms := MakeMinionRegistryStorage(m) + m := NewRegistry([]string{"foo", "bar"}) + ms := NewRegistryStorage(m) if obj, err := ms.Get("foo"); err != nil || obj.(api.Minion).ID != "foo" { t.Errorf("missing expected object") diff --git a/pkg/registry/mock_registry.go b/pkg/registry/mock_registry.go deleted file mode 100644 index 89b8d90b8f6..00000000000 --- a/pkg/registry/mock_registry.go +++ /dev/null @@ -1,127 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package registry - -import ( - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" -) - -type MockPodRegistry struct { - err error - pod *api.Pod - pods []api.Pod - sync.Mutex -} - -func MakeMockPodRegistry(pods []api.Pod) *MockPodRegistry { - return &MockPodRegistry{ - pods: pods, - } -} - -func (registry *MockPodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { - registry.Lock() - defer registry.Unlock() - if registry.err != nil { - return registry.pods, registry.err - } - var filtered []api.Pod - for _, pod := range registry.pods { - if selector.Matches(labels.Set(pod.Labels)) { - filtered = append(filtered, pod) - } - } - return filtered, nil -} - -func (registry *MockPodRegistry) GetPod(podId string) (*api.Pod, error) { - registry.Lock() - defer registry.Unlock() - return registry.pod, registry.err -} - -func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error { - registry.Lock() - defer registry.Unlock() - return registry.err -} - -func (registry *MockPodRegistry) UpdatePod(pod api.Pod) error { - registry.Lock() - defer registry.Unlock() - registry.pod = &pod - return registry.err -} - -func (registry *MockPodRegistry) DeletePod(podId string) error { - registry.Lock() - defer registry.Unlock() - return registry.err -} - -type MockMinionRegistry struct { - err error - minion string - minions []string - sync.Mutex -} - -func MakeMockMinionRegistry(minions []string) *MockMinionRegistry { - return &MockMinionRegistry{ - minions: minions, - } -} - -func (registry *MockMinionRegistry) List() ([]string, error) { - registry.Lock() - defer registry.Unlock() - return registry.minions, registry.err -} - -func (registry *MockMinionRegistry) Insert(minion string) error { - registry.Lock() - defer registry.Unlock() - registry.minion = minion - return registry.err -} - -func (registry *MockMinionRegistry) Contains(minion string) (bool, error) { - registry.Lock() - defer registry.Unlock() - for _, name := range registry.minions { - if name == minion { - return true, registry.err - } - } - return false, registry.err -} - -func (registry *MockMinionRegistry) Delete(minion string) error { - registry.Lock() - defer registry.Unlock() - var newList []string - for _, name := range registry.minions { - if name != minion { - newList = append(newList, name) - } - } - registry.minions = newList - return registry.err -} diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go new file mode 100644 index 00000000000..12440353ad5 --- /dev/null +++ b/pkg/registry/pod/registry.go @@ -0,0 +1,36 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +// Registry is an interface implemented by things that know how to store Pod objects. +type Registry interface { + // ListPods obtains a list of pods that match selector. + ListPods(selector labels.Selector) ([]api.Pod, error) + // Get a specific pod + GetPod(podID string) (*api.Pod, error) + // Create a pod based on a specification, schedule it onto a specific machine. + CreatePod(machine string, pod api.Pod) error + // Update an existing pod + UpdatePod(pod api.Pod) error + // Delete an existing pod + DeletePod(podID string) error +} diff --git a/pkg/registry/podstorage.go b/pkg/registry/pod/storage.go similarity index 60% rename from pkg/registry/podstorage.go rename to pkg/registry/pod/storage.go index 16124c1dd7e..b84035fe872 100644 --- a/pkg/registry/podstorage.go +++ b/pkg/registry/pod/storage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package pod import ( "fmt" @@ -22,77 +22,130 @@ import ( "sync" "time" - "code.google.com/p/go-uuid/uuid" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + + "code.google.com/p/go-uuid/uuid" "github.com/golang/glog" ) -// PodRegistryStorage implements the RESTStorage interface in terms of a PodRegistry -type PodRegistryStorage struct { - registry PodRegistry - podInfoGetter client.PodInfoGetter - podCache client.PodInfoGetter - scheduler scheduler.Scheduler +// RegistryStorage implements the RESTStorage interface in terms of a PodRegistry +type RegistryStorage struct { + cloudProvider cloudprovider.Interface + mu sync.Mutex minionLister scheduler.MinionLister - cloud cloudprovider.Interface + podCache client.PodInfoGetter + podInfoGetter client.PodInfoGetter podPollPeriod time.Duration - lock sync.Mutex + registry Registry + scheduler scheduler.Scheduler } -// MakePodRegistryStorage makes a RESTStorage object for a pod registry. -// Parameters: -// registry: The pod registry -// podInfoGetter: Source of fresh container info -// scheduler: The scheduler for assigning pods to machines -// minionLister: Object which can list available minions for the scheduler -// cloud: Interface to a cloud provider (may be null) -// podCache: Source of cached container info -func MakePodRegistryStorage(registry PodRegistry, - podInfoGetter client.PodInfoGetter, - scheduler scheduler.Scheduler, - minionLister scheduler.MinionLister, - cloud cloudprovider.Interface, - podCache client.PodInfoGetter) apiserver.RESTStorage { - return &PodRegistryStorage{ - registry: registry, - podInfoGetter: podInfoGetter, - scheduler: scheduler, - minionLister: minionLister, - cloud: cloud, - podCache: podCache, +type RegistryStorageConfig struct { + CloudProvider cloudprovider.Interface + MinionLister scheduler.MinionLister + PodCache client.PodInfoGetter + PodInfoGetter client.PodInfoGetter + Registry Registry + Scheduler scheduler.Scheduler +} + +// NewRegistryStorage returns a new RegistryStorage. +func NewRegistryStorage(config *RegistryStorageConfig) apiserver.RESTStorage { + return &RegistryStorage{ + cloudProvider: config.CloudProvider, + minionLister: config.MinionLister, + podCache: config.PodCache, + podInfoGetter: config.PodInfoGetter, podPollPeriod: time.Second * 10, + registry: config.Registry, + scheduler: config.Scheduler, } } -func (storage *PodRegistryStorage) List(selector labels.Selector) (interface{}, error) { +func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + pod := obj.(*api.Pod) + if len(pod.ID) == 0 { + pod.ID = uuid.NewUUID().String() + } + pod.DesiredState.Manifest.ID = pod.ID + if errs := api.ValidatePod(pod); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + if err := rs.scheduleAndCreatePod(*pod); err != nil { + return nil, err + } + return rs.waitForPodRunning(*pod) + }), nil +} + +func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id) + }), nil +} + +func (rs *RegistryStorage) Get(id string) (interface{}, error) { + pod, err := rs.registry.GetPod(id) + if err != nil { + return pod, err + } + if pod == nil { + return pod, nil + } + if rs.podCache != nil || rs.podInfoGetter != nil { + rs.fillPodInfo(pod) + pod.CurrentState.Status = makePodStatus(pod) + } + pod.CurrentState.HostIP = getInstanceIP(rs.cloudProvider, pod.CurrentState.Host) + return pod, err +} + +func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { var result api.PodList - pods, err := storage.registry.ListPods(selector) + pods, err := rs.registry.ListPods(selector) if err == nil { result.Items = pods for i := range result.Items { - storage.fillPodInfo(&result.Items[i]) + rs.fillPodInfo(&result.Items[i]) } } - return result, err } -func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) { +func (rs RegistryStorage) New() interface{} { + return &api.Pod{} +} + +func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + pod := obj.(*api.Pod) + if errs := api.ValidatePod(pod); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + if err := rs.registry.UpdatePod(*pod); err != nil { + return nil, err + } + return rs.waitForPodRunning(*pod) + }), nil +} + +func (rs *RegistryStorage) fillPodInfo(pod *api.Pod) { // Get cached info for the list currently. // TODO: Optionally use fresh info - if storage.podCache != nil { - info, err := storage.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID) + if rs.podCache != nil { + info, err := rs.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID) if err != nil { if err != client.ErrPodInfoNotAvailable { glog.Errorf("Error getting container info from cache: %#v", err) } - if storage.podInfoGetter != nil { - info, err = storage.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.ID) + if rs.podInfoGetter != nil { + info, err = rs.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.ID) } if err != nil { if err != client.ErrPodInfoNotAvailable { @@ -115,37 +168,6 @@ func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) { } } -func makePodStatus(pod *api.Pod) api.PodStatus { - if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" { - return api.PodWaiting - } - running := 0 - stopped := 0 - unknown := 0 - for _, container := range pod.DesiredState.Manifest.Containers { - if info, ok := pod.CurrentState.Info[container.Name]; ok { - if info.State.Running { - running++ - } else { - stopped++ - } - } else { - unknown++ - } - } - - switch { - case running > 0 && stopped == 0 && unknown == 0: - return api.PodRunning - case running == 0 && stopped > 0 && unknown == 0: - return api.PodTerminated - case running == 0 && stopped == 0 && unknown > 0: - return api.PodWaiting - default: - return api.PodWaiting - } -} - func getInstanceIP(cloud cloudprovider.Interface, host string) string { if cloud == nil { return "" @@ -166,82 +188,50 @@ func getInstanceIP(cloud cloudprovider.Interface, host string) string { return addr.String() } -func (storage *PodRegistryStorage) Get(id string) (interface{}, error) { - pod, err := storage.registry.GetPod(id) - if err != nil { - return pod, err +func makePodStatus(pod *api.Pod) api.PodStatus { + if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" { + return api.PodWaiting } - if pod == nil { - return pod, nil + running := 0 + stopped := 0 + unknown := 0 + for _, container := range pod.DesiredState.Manifest.Containers { + if info, ok := pod.CurrentState.Info[container.Name]; ok { + if info.State.Running { + running++ + } else { + stopped++ + } + } else { + unknown++ + } } - if storage.podCache != nil || storage.podInfoGetter != nil { - storage.fillPodInfo(pod) - pod.CurrentState.Status = makePodStatus(pod) + switch { + case running > 0 && stopped == 0 && unknown == 0: + return api.PodRunning + case running == 0 && stopped > 0 && unknown == 0: + return api.PodTerminated + case running == 0 && stopped == 0 && unknown > 0: + return api.PodWaiting + default: + return api.PodWaiting } - pod.CurrentState.HostIP = getInstanceIP(storage.cloud, pod.CurrentState.Host) - - return pod, err } -func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() (interface{}, error) { - return &api.Status{Status: api.StatusSuccess}, storage.registry.DeletePod(id) - }), nil -} - -func (storage *PodRegistryStorage) New() interface{} { - return &api.Pod{} -} - -func (storage *PodRegistryStorage) scheduleAndCreatePod(pod api.Pod) error { - storage.lock.Lock() - defer storage.lock.Unlock() +func (rs *RegistryStorage) scheduleAndCreatePod(pod api.Pod) error { + rs.mu.Lock() + defer rs.mu.Unlock() // TODO(lavalamp): Separate scheduler more cleanly. - machine, err := storage.scheduler.Schedule(pod, storage.minionLister) + machine, err := rs.scheduler.Schedule(pod, rs.minionLister) if err != nil { return err } - return storage.registry.CreatePod(machine, pod) + return rs.registry.CreatePod(machine, pod) } -func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { - pod := obj.(*api.Pod) - if len(pod.ID) == 0 { - pod.ID = uuid.NewUUID().String() - } - pod.DesiredState.Manifest.ID = pod.ID - - if errs := api.ValidatePod(pod); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - - return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.scheduleAndCreatePod(*pod) - if err != nil { - return nil, err - } - return storage.waitForPodRunning(*pod) - }), nil -} - -func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { - pod := obj.(*api.Pod) - if errs := api.ValidatePod(pod); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.registry.UpdatePod(*pod) - if err != nil { - return nil, err - } - return storage.waitForPodRunning(*pod) - }), nil -} - -func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { +func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { for { - podObj, err := storage.Get(pod.ID) - + podObj, err := rs.Get(pod.ID) if err != nil || podObj == nil { return nil, err } @@ -254,7 +244,7 @@ func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, case api.PodRunning, api.PodTerminated: return pod, nil default: - time.Sleep(storage.podPollPeriod) + time.Sleep(rs.podPollPeriod) } } return pod, nil diff --git a/pkg/registry/podstorage_test.go b/pkg/registry/pod/storage_test.go similarity index 78% rename from pkg/registry/podstorage_test.go rename to pkg/registry/pod/storage_test.go index 7c4bc5e2af1..d3e4d5cc6ce 100644 --- a/pkg/registry/podstorage_test.go +++ b/pkg/registry/pod/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package pod import ( "fmt" @@ -25,7 +25,10 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/fsouza/go-dockerclient" ) @@ -52,12 +55,12 @@ func expectPod(t *testing.T, ch <-chan interface{}) (*api.Pod, bool) { } func TestCreatePodRegistryError(t *testing.T) { - mockRegistry := &MockPodRegistry{ - err: fmt.Errorf("test error"), + podRegistry := ®istrytest.PodRegistry{ + Err: fmt.Errorf("test error"), } - storage := PodRegistryStorage{ - scheduler: &MockScheduler{}, - registry: mockRegistry, + storage := RegistryStorage{ + scheduler: ®istrytest.Scheduler{}, + registry: podRegistry, } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -69,25 +72,14 @@ func TestCreatePodRegistryError(t *testing.T) { if err != nil { t.Errorf("Expected %#v, Got %#v", nil, err) } - expectApiStatusError(t, ch, mockRegistry.err.Error()) -} - -type MockScheduler struct { - err error - pod api.Pod - machine string -} - -func (m *MockScheduler) Schedule(pod api.Pod, lister scheduler.MinionLister) (string, error) { - m.pod = pod - return m.machine, m.err + expectApiStatusError(t, ch, podRegistry.Err.Error()) } func TestCreatePodSchedulerError(t *testing.T) { - mockScheduler := MockScheduler{ - err: fmt.Errorf("test error"), + mockScheduler := registrytest.Scheduler{ + Err: fmt.Errorf("test error"), } - storage := PodRegistryStorage{ + storage := RegistryStorage{ scheduler: &mockScheduler, } desiredState := api.PodState{ @@ -100,26 +92,15 @@ func TestCreatePodSchedulerError(t *testing.T) { if err != nil { t.Errorf("Expected %#v, Got %#v", nil, err) } - expectApiStatusError(t, ch, mockScheduler.err.Error()) -} - -type MockPodStorageRegistry struct { - MockPodRegistry - machine string -} - -func (r *MockPodStorageRegistry) CreatePod(machine string, pod api.Pod) error { - r.MockPodRegistry.pod = &pod - r.machine = machine - return r.MockPodRegistry.err + expectApiStatusError(t, ch, mockScheduler.Err.Error()) } func TestCreatePodSetsIds(t *testing.T) { - mockRegistry := &MockPodStorageRegistry{ - MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")}, + mockRegistry := ®istrytest.PodRegistryStorage{ + PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, } - storage := PodRegistryStorage{ - scheduler: &MockScheduler{machine: "test"}, + storage := RegistryStorage{ + scheduler: ®istrytest.Scheduler{Machine: "test"}, registry: mockRegistry, } desiredState := api.PodState{ @@ -132,26 +113,26 @@ func TestCreatePodSetsIds(t *testing.T) { if err != nil { t.Errorf("Expected %#v, Got %#v", nil, err) } - expectApiStatusError(t, ch, mockRegistry.err.Error()) + expectApiStatusError(t, ch, mockRegistry.Err.Error()) - if len(mockRegistry.MockPodRegistry.pod.ID) == 0 { + if len(mockRegistry.PodRegistry.Pod.ID) == 0 { t.Errorf("Expected pod ID to be set, Got %#v", pod) } - if mockRegistry.MockPodRegistry.pod.DesiredState.Manifest.ID != mockRegistry.MockPodRegistry.pod.ID { + if mockRegistry.PodRegistry.Pod.DesiredState.Manifest.ID != mockRegistry.PodRegistry.Pod.ID { t.Errorf("Expected manifest ID to be equal to pod ID, Got %#v", pod) } } func TestListPodsError(t *testing.T) { - mockRegistry := MockPodRegistry{ - err: fmt.Errorf("test error"), + mockRegistry := registrytest.PodRegistry{ + Err: fmt.Errorf("test error"), } - storage := PodRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } pods, err := storage.List(labels.Everything()) - if err != mockRegistry.err { - t.Errorf("Expected %#v, Got %#v", mockRegistry.err, err) + if err != mockRegistry.Err { + t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err) } if len(pods.(api.PodList).Items) != 0 { t.Errorf("Unexpected non-zero pod list: %#v", pods) @@ -159,8 +140,8 @@ func TestListPodsError(t *testing.T) { } func TestListEmptyPodList(t *testing.T) { - mockRegistry := MockPodRegistry{} - storage := PodRegistryStorage{ + mockRegistry := registrytest.PodRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, } pods, err := storage.List(labels.Everything()) @@ -174,8 +155,8 @@ func TestListEmptyPodList(t *testing.T) { } func TestListPodList(t *testing.T) { - mockRegistry := MockPodRegistry{ - pods: []api.Pod{ + mockRegistry := registrytest.PodRegistry{ + Pods: []api.Pod{ { JSONBase: api.JSONBase{ ID: "foo", @@ -188,7 +169,7 @@ func TestListPodList(t *testing.T) { }, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } podsObj, err := storage.List(labels.Everything()) @@ -209,8 +190,8 @@ func TestListPodList(t *testing.T) { } func TestPodDecode(t *testing.T) { - mockRegistry := MockPodRegistry{} - storage := PodRegistryStorage{ + mockRegistry := registrytest.PodRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, } expected := &api.Pod{ @@ -234,12 +215,12 @@ func TestPodDecode(t *testing.T) { } func TestGetPod(t *testing.T) { - mockRegistry := MockPodRegistry{ - pod: &api.Pod{ + mockRegistry := registrytest.PodRegistry{ + Pod: &api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } obj, err := storage.Get("foo") @@ -248,21 +229,21 @@ func TestGetPod(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(*mockRegistry.pod, *pod) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.pod, *pod) + if !reflect.DeepEqual(*mockRegistry.Pod, *pod) { + t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod) } } func TestGetPodCloud(t *testing.T) { fakeCloud := &cloudprovider.FakeCloud{} - mockRegistry := MockPodRegistry{ - pod: &api.Pod{ + mockRegistry := registrytest.PodRegistry{ + Pod: &api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, }, } - storage := PodRegistryStorage{ - registry: &mockRegistry, - cloud: fakeCloud, + storage := RegistryStorage{ + registry: &mockRegistry, + cloudProvider: fakeCloud, } obj, err := storage.Get("foo") pod := obj.(*api.Pod) @@ -270,8 +251,8 @@ func TestGetPodCloud(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(*mockRegistry.pod, *pod) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.pod, *pod) + if !reflect.DeepEqual(*mockRegistry.Pod, *pod) { + t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod) } if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" { t.Errorf("Unexpected calls: %#v", fakeCloud.Calls) @@ -373,11 +354,11 @@ func TestMakePodStatus(t *testing.T) { } func TestPodStorageValidatesCreate(t *testing.T) { - mockRegistry := &MockPodStorageRegistry{ - MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")}, + mockRegistry := ®istrytest.PodRegistryStorage{ + PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, } - storage := PodRegistryStorage{ - scheduler: &MockScheduler{machine: "test"}, + storage := RegistryStorage{ + scheduler: ®istrytest.Scheduler{Machine: "test"}, registry: mockRegistry, } pod := &api.Pod{} @@ -391,11 +372,11 @@ func TestPodStorageValidatesCreate(t *testing.T) { } func TestPodStorageValidatesUpdate(t *testing.T) { - mockRegistry := &MockPodStorageRegistry{ - MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")}, + mockRegistry := ®istrytest.PodRegistryStorage{ + PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, } - storage := PodRegistryStorage{ - scheduler: &MockScheduler{machine: "test"}, + storage := RegistryStorage{ + scheduler: ®istrytest.Scheduler{Machine: "test"}, registry: mockRegistry, } pod := &api.Pod{} @@ -409,19 +390,19 @@ func TestPodStorageValidatesUpdate(t *testing.T) { } func TestCreatePod(t *testing.T) { - mockRegistry := MockPodRegistry{ - pod: &api.Pod{ + mockRegistry := registrytest.PodRegistry{ + Pod: &api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, CurrentState: api.PodState{ Host: "machine", }, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, podPollPeriod: time.Millisecond * 100, scheduler: scheduler.MakeRoundRobinScheduler(), - minionLister: MakeMinionRegistry([]string{"machine"}), + minionLister: minion.NewRegistry([]string{"machine"}), } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -479,18 +460,14 @@ func TestFillPodInfo(t *testing.T) { }, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ podCache: &fakeGetter, } - pod := api.Pod{} - storage.fillPodInfo(&pod) - if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) { t.Errorf("Expected: %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info) } - if pod.CurrentState.PodIP != expectedIP { t.Errorf("Expected %s, Got %s", expectedIP, pod.CurrentState.PodIP) } @@ -506,18 +483,14 @@ func TestFillPodInfoNoData(t *testing.T) { }, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ podCache: &fakeGetter, } - pod := api.Pod{} - storage.fillPodInfo(&pod) - if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) { t.Errorf("Expected %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info) } - if pod.CurrentState.PodIP != expectedIP { t.Errorf("Expected %s, Got %s", expectedIP, pod.CurrentState.PodIP) } diff --git a/pkg/registry/registrytest/controller.go b/pkg/registry/registrytest/controller.go new file mode 100644 index 00000000000..7855c611b54 --- /dev/null +++ b/pkg/registry/registrytest/controller.go @@ -0,0 +1,53 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// TODO: Why do we have this AND MemoryRegistry? +type ControllerRegistry struct { + Err error + Controllers []api.ReplicationController +} + +func (r *ControllerRegistry) ListControllers() ([]api.ReplicationController, error) { + return r.Controllers, r.Err +} + +func (r *ControllerRegistry) GetController(ID string) (*api.ReplicationController, error) { + return &api.ReplicationController{}, r.Err +} + +func (r *ControllerRegistry) CreateController(controller api.ReplicationController) error { + return r.Err +} + +func (r *ControllerRegistry) UpdateController(controller api.ReplicationController) error { + return r.Err +} + +func (r *ControllerRegistry) DeleteController(ID string) error { + return r.Err +} + +func (r *ControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, r.Err +} diff --git a/pkg/registry/registrytest/minion.go b/pkg/registry/registrytest/minion.go new file mode 100644 index 00000000000..736c6ae697e --- /dev/null +++ b/pkg/registry/registrytest/minion.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import "sync" + +type MinionRegistry struct { + Err error + Minion string + Minions []string + sync.Mutex +} + +func NewMinionRegistry(minions []string) *MinionRegistry { + return &MinionRegistry{ + Minions: minions, + } +} + +func (r *MinionRegistry) List() ([]string, error) { + r.Lock() + defer r.Unlock() + return r.Minions, r.Err +} + +func (r *MinionRegistry) Insert(minion string) error { + r.Lock() + defer r.Unlock() + r.Minion = minion + return r.Err +} + +func (r *MinionRegistry) Contains(minion string) (bool, error) { + r.Lock() + defer r.Unlock() + for _, name := range r.Minions { + if name == minion { + return true, r.Err + } + } + return false, r.Err +} + +func (r *MinionRegistry) Delete(minion string) error { + r.Lock() + defer r.Unlock() + var newList []string + for _, name := range r.Minions { + if name != minion { + newList = append(newList, name) + } + } + r.Minions = newList + return r.Err +} diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go new file mode 100644 index 00000000000..b2803328a46 --- /dev/null +++ b/pkg/registry/registrytest/pod.go @@ -0,0 +1,77 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +type PodRegistry struct { + Err error + Pod *api.Pod + Pods []api.Pod + sync.Mutex +} + +func NewPodRegistry(pods []api.Pod) *PodRegistry { + return &PodRegistry{ + Pods: pods, + } +} + +func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { + r.Lock() + defer r.Unlock() + if r.Err != nil { + return r.Pods, r.Err + } + var filtered []api.Pod + for _, pod := range r.Pods { + if selector.Matches(labels.Set(pod.Labels)) { + filtered = append(filtered, pod) + } + } + return filtered, nil +} + +func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { + r.Lock() + defer r.Unlock() + return r.Pod, r.Err +} + +func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error { + r.Lock() + defer r.Unlock() + return r.Err +} + +func (r *PodRegistry) UpdatePod(pod api.Pod) error { + r.Lock() + defer r.Unlock() + r.Pod = &pod + return r.Err +} + +func (r *PodRegistry) DeletePod(podId string) error { + r.Lock() + defer r.Unlock() + return r.Err +} diff --git a/pkg/registry/registrytest/pod_storage.go b/pkg/registry/registrytest/pod_storage.go new file mode 100644 index 00000000000..fb787d48242 --- /dev/null +++ b/pkg/registry/registrytest/pod_storage.go @@ -0,0 +1,30 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + +type PodRegistryStorage struct { + PodRegistry + machine string +} + +func (rs *PodRegistryStorage) CreatePod(machine string, pod api.Pod) error { + rs.PodRegistry.Pod = &pod + rs.machine = machine + return rs.PodRegistry.Err +} diff --git a/pkg/registry/registrytest/schedular.go b/pkg/registry/registrytest/schedular.go new file mode 100644 index 00000000000..2e44ccbaa32 --- /dev/null +++ b/pkg/registry/registrytest/schedular.go @@ -0,0 +1,33 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" +) + +type Scheduler struct { + Err error + Pod api.Pod + Machine string +} + +func (s *Scheduler) Schedule(pod api.Pod, lister scheduler.MinionLister) (string, error) { + s.Pod = pod + return s.Machine, s.Err +} diff --git a/pkg/registry/mock_service_registry.go b/pkg/registry/registrytest/service.go similarity index 50% rename from pkg/registry/mock_service_registry.go rename to pkg/registry/registrytest/service.go index 712881f541a..b637d9cb837 100644 --- a/pkg/registry/mock_service_registry.go +++ b/pkg/registry/registrytest/service.go @@ -14,39 +14,39 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package registrytest import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -type MockServiceRegistry struct { - list api.ServiceList - err error - endpoints api.Endpoints +type ServiceRegistry struct { + List api.ServiceList + Err error + Endpoints api.Endpoints } -func (m *MockServiceRegistry) ListServices() (api.ServiceList, error) { - return m.list, m.err +func (r *ServiceRegistry) ListServices() (api.ServiceList, error) { + return r.List, r.Err } -func (m *MockServiceRegistry) CreateService(svc api.Service) error { - return m.err +func (r *ServiceRegistry) CreateService(svc api.Service) error { + return r.Err } -func (m *MockServiceRegistry) GetService(name string) (*api.Service, error) { - return nil, m.err +func (r *ServiceRegistry) GetService(name string) (*api.Service, error) { + return nil, r.Err } -func (m *MockServiceRegistry) DeleteService(name string) error { - return m.err +func (r *ServiceRegistry) DeleteService(name string) error { + return r.Err } -func (m *MockServiceRegistry) UpdateService(svc api.Service) error { - return m.err +func (r *ServiceRegistry) UpdateService(svc api.Service) error { + return r.Err } -func (m *MockServiceRegistry) UpdateEndpoints(e api.Endpoints) error { - m.endpoints = e - return m.err +func (r *ServiceRegistry) UpdateEndpoints(e api.Endpoints) error { + r.Endpoints = e + return r.Err } diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go new file mode 100644 index 00000000000..676ad46ab78 --- /dev/null +++ b/pkg/registry/service/registry.go @@ -0,0 +1,31 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// Registry is an interface for things that know how to store services. +type Registry interface { + ListServices() (api.ServiceList, error) + CreateService(svc api.Service) error + GetService(name string) (*api.Service, error) + DeleteService(name string) error + UpdateService(svc api.Service) error + UpdateEndpoints(e api.Endpoints) error +} diff --git a/pkg/registry/servicestorage.go b/pkg/registry/service/storage.go similarity index 72% rename from pkg/registry/servicestorage.go rename to pkg/registry/service/storage.go index 31fae4ad6b4..4dd54a3a3ed 100644 --- a/pkg/registry/servicestorage.go +++ b/pkg/registry/service/storage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package service import ( "fmt" @@ -25,25 +25,168 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -// ServiceRegistryStorage adapts a service registry into apiserver's RESTStorage model. -type ServiceRegistryStorage struct { - registry ServiceRegistry +// RegistryStorage adapts a service registry into apiserver's RESTStorage model. +type RegistryStorage struct { + registry Registry cloud cloudprovider.Interface - machines MinionRegistry + machines minion.Registry } -// MakeServiceRegistryStorage makes a new ServiceRegistryStorage. -func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, machines MinionRegistry) apiserver.RESTStorage { - return &ServiceRegistryStorage{ +// NewRegistryStorage returns a new RegistryStorage. +func NewRegistryStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry) apiserver.RESTStorage { + return &RegistryStorage{ registry: registry, cloud: cloud, machines: machines, } } +func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + srv := obj.(*api.Service) + if errs := api.ValidateService(srv); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers + // correctly no matter what http operations happen. + if srv.CreateExternalLoadBalancer { + if rs.cloud == nil { + return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") + } + balancer, ok := rs.cloud.TCPLoadBalancer() + if !ok { + return nil, fmt.Errorf("The cloud provider does not support external TCP load balancers.") + } + zones, ok := rs.cloud.Zones() + if !ok { + return nil, fmt.Errorf("The cloud provider does not support zone enumeration.") + } + hosts, err := rs.machines.List() + if err != nil { + return nil, err + } + zone, err := zones.GetZone() + if err != nil { + return nil, err + } + err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hosts) + if err != nil { + return nil, err + } + } + // TODO actually wait for the object to be fully created here. + err := rs.registry.CreateService(*srv) + if err != nil { + return nil, err + } + return rs.registry.GetService(srv.ID) + }), nil +} + +func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { + service, err := rs.registry.GetService(id) + if err != nil { + return nil, err + } + return apiserver.MakeAsync(func() (interface{}, error) { + rs.deleteExternalLoadBalancer(service) + return api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id) + }), nil +} + +func (rs *RegistryStorage) Get(id string) (interface{}, error) { + s, err := rs.registry.GetService(id) + if err != nil { + return nil, err + } + return s, err +} + +func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { + list, err := rs.registry.ListServices() + if err != nil { + return nil, err + } + var filtered []api.Service + for _, service := range list.Items { + if selector.Matches(labels.Set(service.Labels)) { + filtered = append(filtered, service) + } + } + list.Items = filtered + return list, err +} + +func (rs RegistryStorage) New() interface{} { + return &api.Service{} +} + +// GetServiceEnvironmentVariables populates a list of environment variables that are use +// in the container environment to get access to services. +func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.EnvVar, error) { + var result []api.EnvVar + services, err := registry.ListServices() + if err != nil { + return result, err + } + for _, service := range services.Items { + name := strings.ToUpper(service.ID) + "_SERVICE_PORT" + value := strconv.Itoa(service.Port) + result = append(result, api.EnvVar{Name: name, Value: value}) + result = append(result, makeLinkVariables(service, machine)...) + } + result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine}) + return result, nil +} + +func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + srv := obj.(*api.Service) + if srv.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", srv) + } + if errs := api.ValidateService(srv); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO: check to see if external load balancer status changed + err := rs.registry.UpdateService(*srv) + if err != nil { + return nil, err + } + return rs.registry.GetService(srv.ID) + }), nil +} + +func (rs *RegistryStorage) deleteExternalLoadBalancer(service *api.Service) error { + if !service.CreateExternalLoadBalancer || rs.cloud == nil { + return nil + } + zones, ok := rs.cloud.Zones() + if !ok { + // We failed to get zone enumerator. + // As this should have failed when we tried in "create" too, + // assume external load balancer was never created. + return nil + } + balancer, ok := rs.cloud.TCPLoadBalancer() + if !ok { + // See comment above. + return nil + } + zone, err := zones.GetZone() + if err != nil { + return err + } + if err := balancer.DeleteTCPLoadBalancer(service.JSONBase.ID, zone.Region); err != nil { + return err + } + return nil +} + func makeLinkVariables(service api.Service, machine string) []api.EnvVar { prefix := strings.ToUpper(service.ID) var port string @@ -76,150 +219,3 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar { }, } } - -// GetServiceEnvironmentVariables populates a list of environment variables that are use -// in the container environment to get access to services. -func GetServiceEnvironmentVariables(registry ServiceRegistry, machine string) ([]api.EnvVar, error) { - var result []api.EnvVar - services, err := registry.ListServices() - if err != nil { - return result, err - } - for _, service := range services.Items { - name := strings.ToUpper(service.ID) + "_SERVICE_PORT" - value := strconv.Itoa(service.Port) - result = append(result, api.EnvVar{Name: name, Value: value}) - result = append(result, makeLinkVariables(service, machine)...) - } - result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine}) - return result, nil -} - -func (sr *ServiceRegistryStorage) List(selector labels.Selector) (interface{}, error) { - list, err := sr.registry.ListServices() - if err != nil { - return nil, err - } - var filtered []api.Service - for _, service := range list.Items { - if selector.Matches(labels.Set(service.Labels)) { - filtered = append(filtered, service) - } - } - list.Items = filtered - return list, err -} - -func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) { - service, err := sr.registry.GetService(id) - if err != nil { - return nil, err - } - return service, err -} - -func (sr *ServiceRegistryStorage) deleteExternalLoadBalancer(service *api.Service) error { - if !service.CreateExternalLoadBalancer || sr.cloud == nil { - return nil - } - - zones, ok := sr.cloud.Zones() - if !ok { - // We failed to get zone enumerator. - // As this should have failed when we tried in "create" too, - // assume external load balancer was never created. - return nil - } - - balancer, ok := sr.cloud.TCPLoadBalancer() - if !ok { - // See comment above. - return nil - } - - zone, err := zones.GetZone() - if err != nil { - return err - } - - if err := balancer.DeleteTCPLoadBalancer(service.JSONBase.ID, zone.Region); err != nil { - return err - } - - return nil -} - -func (sr *ServiceRegistryStorage) Delete(id string) (<-chan interface{}, error) { - service, err := sr.registry.GetService(id) - if err != nil { - return nil, err - } - return apiserver.MakeAsync(func() (interface{}, error) { - sr.deleteExternalLoadBalancer(service) - return &api.Status{Status: api.StatusSuccess}, sr.registry.DeleteService(id) - }), nil -} - -func (sr *ServiceRegistryStorage) New() interface{} { - return &api.Service{} -} - -func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { - srv := obj.(*api.Service) - if errs := api.ValidateService(srv); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - return apiserver.MakeAsync(func() (interface{}, error) { - // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers - // correctly no matter what http operations happen. - if srv.CreateExternalLoadBalancer { - if sr.cloud == nil { - return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") - } - balancer, ok := sr.cloud.TCPLoadBalancer() - if !ok { - return nil, fmt.Errorf("The cloud provider does not support external TCP load balancers.") - } - zones, ok := sr.cloud.Zones() - if !ok { - return nil, fmt.Errorf("The cloud provider does not support zone enumeration.") - } - hosts, err := sr.machines.List() - if err != nil { - return nil, err - } - zone, err := zones.GetZone() - if err != nil { - return nil, err - } - err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hosts) - if err != nil { - return nil, err - } - } - // TODO actually wait for the object to be fully created here. - err := sr.registry.CreateService(*srv) - if err != nil { - return nil, err - } - return sr.registry.GetService(srv.ID) - }), nil -} - -func (sr *ServiceRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { - srv := obj.(*api.Service) - if srv.ID == "" { - return nil, fmt.Errorf("ID should not be empty: %#v", srv) - } - if errs := api.ValidateService(srv); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - return apiserver.MakeAsync(func() (interface{}, error) { - // TODO: check to see if external load balancer status changed - err := sr.registry.UpdateService(*srv) - if err != nil { - return nil, err - } - return sr.registry.GetService(srv.ID) - }), nil -} diff --git a/pkg/registry/servicestorage_test.go b/pkg/registry/service/storage_test.go similarity index 73% rename from pkg/registry/servicestorage_test.go rename to pkg/registry/service/storage_test.go index af1b0c65684..bb61ab0dccd 100644 --- a/pkg/registry/servicestorage_test.go +++ b/pkg/registry/service/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package service import ( "fmt" @@ -23,40 +23,37 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/memory" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func TestServiceRegistry(t *testing.T) { - memory := MakeMemoryRegistry() +func TestRegistry(t *testing.T) { + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, } c, _ := storage.Create(svc) <-c - if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService(svc.ID) + srv, err := registry.GetService(svc.ID) if err != nil { t.Errorf("unexpected error: %v", err) } - if srv == nil { t.Errorf("Failed to find service: %s", svc.ID) } } func TestServiceStorageValidatesCreate(t *testing.T) { - memory := MakeMemoryRegistry() - storage := MakeServiceRegistryStorage(memory, nil, nil) - + registry := memory.NewRegistry() + storage := NewRegistryStorage(registry, nil, nil) failureCases := map[string]api.Service{ "empty ID": { JSONBase: api.JSONBase{ID: ""}, @@ -79,13 +76,12 @@ func TestServiceStorageValidatesCreate(t *testing.T) { } func TestServiceStorageValidatesUpdate(t *testing.T) { - memory := MakeMemoryRegistry() - memory.CreateService(api.Service{ + registry := memory.NewRegistry() + registry.CreateService(api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, }) - storage := MakeServiceRegistryStorage(memory, nil, nil) - + storage := NewRegistryStorage(registry, nil, nil) failureCases := map[string]api.Service{ "empty ID": { JSONBase: api.JSONBase{ID: ""}, @@ -108,12 +104,10 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { } func TestServiceRegistryExternalService(t *testing.T) { - memory := MakeMemoryRegistry() + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -121,29 +115,25 @@ func TestServiceRegistryExternalService(t *testing.T) { } c, _ := storage.Create(svc) <-c - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService(svc.ID) + srv, err := registry.GetService(svc.ID) if err != nil { t.Errorf("unexpected error: %v", err) } - if srv == nil { t.Errorf("Failed to find service: %s", svc.ID) } } func TestServiceRegistryExternalServiceError(t *testing.T) { - memory := MakeMemoryRegistry() + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{ Err: fmt.Errorf("test error"), } machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -151,75 +141,66 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { } c, _ := storage.Create(svc) <-c - if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService("foo") + srv, err := registry.GetService("foo") if !apiserver.IsNotFound(err) { if err != nil { - t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) + t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) } else { - t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) + t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) } } } func TestServiceRegistryDelete(t *testing.T) { - memory := MakeMemoryRegistry() + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, } - memory.CreateService(svc) - + registry.CreateService(svc) c, _ := storage.Delete(svc.ID) <-c - if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService(svc.ID) + srv, err := registry.GetService(svc.ID) if !apiserver.IsNotFound(err) { if err != nil { - t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) + t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) } else { - t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) + t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) } } } func TestServiceRegistryDeleteExternal(t *testing.T) { - memory := MakeMemoryRegistry() + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, CreateExternalLoadBalancer: true, } - memory.CreateService(svc) - + registry.CreateService(svc) c, _ := storage.Delete(svc.ID) <-c - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService(svc.ID) + srv, err := registry.GetService(svc.ID) if !apiserver.IsNotFound(err) { if err != nil { - t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) + t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) } else { - t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) + t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) } } } diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 0ec93083260..1762f8cb132 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -70,15 +70,15 @@ func TestExtractList(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: `{"id":"foo"}`, + Value: `{"id":"foo"}`, ModifiedIndex: 1, }, { - Value: `{"id":"bar"}`, + Value: `{"id":"bar"}`, ModifiedIndex: 2, }, { - Value: `{"id":"baz"}`, + Value: `{"id":"baz"}`, ModifiedIndex: 3, }, },