From c21a0ca39f5a3d8af67e2f12dacf019f22e49e8b Mon Sep 17 00:00:00 2001 From: Kelsey Hightower Date: Mon, 11 Aug 2014 00:34:59 -0700 Subject: [PATCH] Breakup the registry package into separate packages. Currently all registry implementations live in a single package, which makes it bit harder to maintain. The different registry implementations do not follow the same coding style and naming conventions, which makes the code harder to read. Breakup the registry package into smaller packages based on the registry implementation. Refactor the registry packages to follow a similar coding style and naming convention. This patch does not introduce any changes in behavior. --- pkg/master/master.go | 65 ++-- pkg/master/pod_cache.go | 7 +- pkg/master/pod_cache_test.go | 4 +- .../{bindingstorage.go => binding/storage.go} | 7 +- .../storage_test.go} | 2 +- .../{interfaces.go => controller/registry.go} | 30 +- .../storage.go} | 139 ++++---- .../storage_test.go} | 77 ++--- pkg/registry/{ => endpoint}/endpoints.go | 60 ++-- pkg/registry/{ => endpoint}/endpoints_test.go | 46 +-- .../{etcdregistry.go => etcd/etcd.go} | 157 +++++---- .../etcd_test.go} | 11 +- pkg/registry/{ => etcd}/manifest_factory.go | 7 +- .../{ => etcd}/manifest_factory_test.go | 13 +- pkg/registry/memory/memory.go | 191 +++++++++++ .../memory_test.go} | 52 ++- pkg/registry/memory_registry.go | 165 ---------- .../caching_registry.go} | 99 +++--- .../caching_registry_test.go} | 37 +-- .../cloud_registry.go} | 45 ++- .../cloud_registry_test.go} | 8 +- .../healthy_registry.go} | 69 ++-- .../healthy_registry_test.go} | 33 +- .../{minionregistry.go => minion/minion.go} | 37 ++- .../minion_test.go} | 9 +- .../{minionstorage.go => minion/storage.go} | 91 +++--- .../storage_test.go} | 6 +- pkg/registry/mock_registry.go | 127 -------- pkg/registry/pod/registry.go | 36 +++ .../{podstorage.go => pod/storage.go} | 262 ++++++++------- .../storage_test.go} | 145 ++++----- pkg/registry/registrytest/controller.go | 53 +++ pkg/registry/registrytest/minion.go | 69 ++++ pkg/registry/registrytest/pod.go | 77 +++++ pkg/registry/registrytest/pod_storage.go | 30 ++ pkg/registry/registrytest/schedular.go | 33 ++ .../service.go} | 36 +-- pkg/registry/service/registry.go | 31 ++ .../{servicestorage.go => service/storage.go} | 306 +++++++++--------- .../storage_test.go} | 83 ++--- pkg/tools/etcd_tools_test.go | 6 +- 41 files changed, 1427 insertions(+), 1334 deletions(-) rename pkg/registry/{bindingstorage.go => binding/storage.go} (93%) rename pkg/registry/{bindingstorage_test.go => binding/storage_test.go} (98%) rename pkg/registry/{interfaces.go => controller/registry.go} (52%) rename pkg/registry/{controllerstorage.go => controller/storage.go} (57%) rename pkg/registry/{controllerstorage_test.go => controller/storage_test.go} (78%) rename pkg/registry/{ => endpoint}/endpoints.go (87%) rename pkg/registry/{ => endpoint}/endpoints_test.go (84%) rename pkg/registry/{etcdregistry.go => etcd/etcd.go} (63%) rename pkg/registry/{etcdregistry_test.go => etcd/etcd_test.go} (98%) rename pkg/registry/{ => etcd}/manifest_factory.go (86%) rename pkg/registry/{ => etcd}/manifest_factory_test.go (94%) create mode 100644 pkg/registry/memory/memory.go rename pkg/registry/{memory_registry_test.go => memory/memory_test.go} (92%) delete mode 100644 pkg/registry/memory_registry.go rename pkg/registry/{caching_minion_registry.go => minion/caching_registry.go} (54%) rename pkg/registry/{caching_minion_registry_test.go => minion/caching_registry_test.go} (75%) rename pkg/registry/{cloud_minion_registry.go => minion/cloud_registry.go} (66%) rename pkg/registry/{cloud_minion_registry_test.go => minion/cloud_registry_test.go} (91%) rename pkg/registry/{healthy_minion_registry.go => minion/healthy_registry.go} (60%) rename pkg/registry/{healthy_minion_registry_test.go => minion/healthy_registry_test.go} (81%) rename pkg/registry/{minionregistry.go => minion/minion.go} (92%) rename pkg/registry/{minionregistry_test.go => minion/minion_test.go} (92%) rename pkg/registry/{minionstorage.go => minion/storage.go} (56%) rename pkg/registry/{minionstorage_test.go => minion/storage_test.go} (95%) delete mode 100644 pkg/registry/mock_registry.go create mode 100644 pkg/registry/pod/registry.go rename pkg/registry/{podstorage.go => pod/storage.go} (60%) rename pkg/registry/{podstorage_test.go => pod/storage_test.go} (78%) create mode 100644 pkg/registry/registrytest/controller.go create mode 100644 pkg/registry/registrytest/minion.go create mode 100644 pkg/registry/registrytest/pod.go create mode 100644 pkg/registry/registrytest/pod_storage.go create mode 100644 pkg/registry/registrytest/schedular.go rename pkg/registry/{mock_service_registry.go => registrytest/service.go} (50%) create mode 100644 pkg/registry/service/registry.go rename pkg/registry/{servicestorage.go => service/storage.go} (72%) rename pkg/registry/{servicestorage_test.go => service/storage_test.go} (73%) diff --git a/pkg/master/master.go b/pkg/master/master.go index a01070ad418..e3cafe59636 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -25,10 +25,17 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/memory" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/coreos/go-etcd/etcd" + + goetcd "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) @@ -46,10 +53,10 @@ type Config struct { // Master contains state for a Kubernetes cluster master/api server. type Master struct { - podRegistry registry.PodRegistry - controllerRegistry registry.ControllerRegistry - serviceRegistry registry.ServiceRegistry - minionRegistry registry.MinionRegistry + podRegistry pod.Registry + controllerRegistry controller.Registry + serviceRegistry service.Registry + minionRegistry minion.Registry storage map[string]apiserver.RESTStorage client *client.Client } @@ -57,10 +64,10 @@ type Master struct { // NewMemoryServer returns a new instance of Master backed with memory (not etcd). func NewMemoryServer(c *Config) *Master { m := &Master{ - podRegistry: registry.MakeMemoryRegistry(), - controllerRegistry: registry.MakeMemoryRegistry(), - serviceRegistry: registry.MakeMemoryRegistry(), - minionRegistry: registry.MakeMinionRegistry(c.Minions), + podRegistry: memory.NewRegistry(), + controllerRegistry: memory.NewRegistry(), + serviceRegistry: memory.NewRegistry(), + minionRegistry: minion.NewRegistry(c.Minions), client: c.Client, } m.init(c.Cloud, c.PodInfoGetter) @@ -69,12 +76,12 @@ func NewMemoryServer(c *Config) *Master { // New returns a new instance of Master connected to the given etcdServer. func New(c *Config) *Master { - etcdClient := etcd.NewClient(c.EtcdServers) + etcdClient := goetcd.NewClient(c.EtcdServers) minionRegistry := minionRegistryMaker(c) m := &Master{ - podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), - controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), - serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), + podRegistry: etcd.NewRegistry(etcdClient, minionRegistry), + controllerRegistry: etcd.NewRegistry(etcdClient, minionRegistry), + serviceRegistry: etcd.NewRegistry(etcdClient, minionRegistry), minionRegistry: minionRegistry, client: c.Client, } @@ -82,23 +89,23 @@ func New(c *Config) *Master { return m } -func minionRegistryMaker(c *Config) registry.MinionRegistry { - var minionRegistry registry.MinionRegistry +func minionRegistryMaker(c *Config) minion.Registry { + var minionRegistry minion.Registry if c.Cloud != nil && len(c.MinionRegexp) > 0 { var err error - minionRegistry, err = registry.MakeCloudMinionRegistry(c.Cloud, c.MinionRegexp) + minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp) if err != nil { glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err) } } if minionRegistry == nil { - minionRegistry = registry.MakeMinionRegistry(c.Minions) + minionRegistry = minion.NewRegistry(c.Minions) } if c.HealthCheckMinions { - minionRegistry = registry.NewHealthyMinionRegistry(minionRegistry, &http.Client{}) + minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{}) } if c.MinionCacheTTL > 0 { - cachingMinionRegistry, err := registry.NewCachingMinionRegistry(minionRegistry, c.MinionCacheTTL) + cachingMinionRegistry, err := minion.NewCachingRegistry(minionRegistry, c.MinionCacheTTL) if err != nil { glog.Errorf("Failed to initialize caching layer, ignoring cache.") } else { @@ -112,17 +119,23 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30) go podCache.Loop() - endpoints := registry.MakeEndpointController(m.serviceRegistry, m.client) + endpoints := endpoint.NewEndpointController(m.serviceRegistry, m.client) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) random := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) s := scheduler.NewRandomFitScheduler(m.podRegistry, random) m.storage = map[string]apiserver.RESTStorage{ - "pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache), - "replicationControllers": registry.NewControllerRegistryStorage(m.controllerRegistry, m.podRegistry), - "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), - "minions": registry.MakeMinionRegistryStorage(m.minionRegistry), - "bindings": registry.MakeBindingStorage(m.podRegistry), + "pods": pod.NewRegistryStorage(&pod.RegistryStorageConfig{ + CloudProvider: cloud, + MinionLister: m.minionRegistry, + PodCache: podCache, + PodInfoGetter: podInfoGetter, + Registry: m.podRegistry, + Scheduler: s, + }), + "replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry), + "services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), + "minions": minion.NewRegistryStorage(m.minionRegistry), } } diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index a4d0ff24b7c..1d7a36cef68 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -23,8 +23,9 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" ) @@ -32,7 +33,7 @@ import ( // that cache up to date. type PodCache struct { containerInfo client.PodInfoGetter - pods registry.PodRegistry + pods pod.Registry // This is a map of pod id to a map of container name to the podInfo map[string]api.PodInfo period time.Duration @@ -40,7 +41,7 @@ type PodCache struct { } // NewPodCache returns a new PodCache which watches container information registered in the given PodRegistry. -func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache { +func NewPodCache(info client.PodInfoGetter, pods pod.Registry, period time.Duration) *PodCache { return &PodCache{ containerInfo: info, pods: pods, diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go index f2eb86c1d8e..d176c31c485 100644 --- a/pkg/master/pod_cache_test.go +++ b/pkg/master/pod_cache_test.go @@ -22,7 +22,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/fsouza/go-dockerclient" ) @@ -97,7 +97,7 @@ func TestPodUpdateAllContainers(t *testing.T) { } pods := []api.Pod{pod} - mockRegistry := registry.MakeMockPodRegistry(pods) + mockRegistry := registrytest.NewPodRegistry(pods) expected := api.PodInfo{"foo": docker.Container{ID: "foo"}} fake := FakePodInfoGetter{ diff --git a/pkg/registry/bindingstorage.go b/pkg/registry/binding/storage.go similarity index 93% rename from pkg/registry/bindingstorage.go rename to pkg/registry/binding/storage.go index 74900fdfbca..14926f6c57a 100644 --- a/pkg/registry/bindingstorage.go +++ b/pkg/registry/binding/storage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package binding import ( "fmt" @@ -22,17 +22,18 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" ) // BindingStorage implements the RESTStorage interface. When bindings are written, it // changes the location of the affected pods. This information is eventually reflected // in the pod's CurrentState.Host field. type BindingStorage struct { - podRegistry PodRegistry + podRegistry pod.Registry } // MakeBindingStorage makes a new BindingStorage backed by the given PodRegistry. -func MakeBindingStorage(podRegistry PodRegistry) *BindingStorage { +func MakeBindingStorage(podRegistry pod.Registry) *BindingStorage { return &BindingStorage{ podRegistry: podRegistry, } diff --git a/pkg/registry/bindingstorage_test.go b/pkg/registry/binding/storage_test.go similarity index 98% rename from pkg/registry/bindingstorage_test.go rename to pkg/registry/binding/storage_test.go index a4371e8bfed..e60afcf6dc8 100644 --- a/pkg/registry/bindingstorage_test.go +++ b/pkg/registry/binding/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package binding import ( "reflect" diff --git a/pkg/registry/interfaces.go b/pkg/registry/controller/registry.go similarity index 52% rename from pkg/registry/interfaces.go rename to pkg/registry/controller/registry.go index dfc3cd479e4..0765b2188bf 100644 --- a/pkg/registry/interfaces.go +++ b/pkg/registry/controller/registry.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package controller import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -22,22 +22,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -// PodRegistry is an interface implemented by things that know how to store Pod objects. -type PodRegistry interface { - // ListPods obtains a list of pods that match selector. - ListPods(selector labels.Selector) ([]api.Pod, error) - // Get a specific pod - GetPod(podID string) (*api.Pod, error) - // Create a pod based on a specification, schedule it onto a specific machine. - CreatePod(machine string, pod api.Pod) error - // Update an existing pod - UpdatePod(pod api.Pod) error - // Delete an existing pod - DeletePod(podID string) error -} - -// ControllerRegistry is an interface for things that know how to store ReplicationControllers. -type ControllerRegistry interface { +// Registry is an interface for things that know how to store ReplicationControllers. +type Registry interface { ListControllers() ([]api.ReplicationController, error) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) @@ -45,13 +31,3 @@ type ControllerRegistry interface { UpdateController(controller api.ReplicationController) error DeleteController(controllerID string) error } - -// ServiceRegistry is an interface for things that know how to store services. -type ServiceRegistry interface { - ListServices() (api.ServiceList, error) - CreateService(svc api.Service) error - GetService(name string) (*api.Service, error) - DeleteService(name string) error - UpdateService(svc api.Service) error - UpdateEndpoints(e api.Endpoints) error -} diff --git a/pkg/registry/controllerstorage.go b/pkg/registry/controller/storage.go similarity index 57% rename from pkg/registry/controllerstorage.go rename to pkg/registry/controller/storage.go index 458977d2a12..cd3c05c06a3 100644 --- a/pkg/registry/controllerstorage.go +++ b/pkg/registry/controller/storage.go @@ -14,39 +14,82 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package controller import ( "fmt" "time" - "code.google.com/p/go-uuid/uuid" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "code.google.com/p/go-uuid/uuid" ) -// ControllerRegistryStorage is an implementation of RESTStorage for the api server. -type ControllerRegistryStorage struct { - registry ControllerRegistry - podRegistry PodRegistry - // Period in between polls when waiting for a controller to complete - pollPeriod time.Duration +// RegistryStorage stores data for the replication controller service. +// It implements apiserver.RESTStorage. +type RegistryStorage struct { + registry Registry + podRegistry pod.Registry + pollPeriod time.Duration } -func NewControllerRegistryStorage(registry ControllerRegistry, podRegistry PodRegistry) apiserver.RESTStorage { - return &ControllerRegistryStorage{ +// NewRegistryStorage returns a new apiserver.RESTStorage for the given +// registry and podRegistry. +func NewRegistryStorage(registry Registry, podRegistry pod.Registry) apiserver.RESTStorage { + return &RegistryStorage{ registry: registry, podRegistry: podRegistry, pollPeriod: time.Second * 10, } } +// Create registers then given ReplicationController. +func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + controller, ok := obj.(*api.ReplicationController) + if !ok { + return nil, fmt.Errorf("not a replication controller: %#v", obj) + } + if len(controller.ID) == 0 { + controller.ID = uuid.NewUUID().String() + } + // Pod Manifest ID should be assigned by the pod API + controller.DesiredState.PodTemplate.DesiredState.Manifest.ID = "" + if errs := api.ValidateReplicationController(controller); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + err := rs.registry.CreateController(*controller) + if err != nil { + return nil, err + } + return rs.waitForController(*controller) + }), nil +} + +// Delete asynchronously deletes the ReplicationController specified by its id. +func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id) + }), nil +} + +// Get obtains the ReplicationController specified by its id. +func (rs *RegistryStorage) Get(id string) (interface{}, error) { + controller, err := rs.registry.GetController(id) + if err != nil { + return nil, err + } + return controller, err +} + // List obtains a list of ReplicationControllers that match selector. -func (storage *ControllerRegistryStorage) List(selector labels.Selector) (interface{}, error) { +func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { result := api.ReplicationControllerList{} - controllers, err := storage.registry.ListControllers() + controllers, err := rs.registry.ListControllers() if err == nil { for _, controller := range controllers { if selector.Matches(labels.Set(controller.Labels)) { @@ -57,54 +100,14 @@ func (storage *ControllerRegistryStorage) List(selector labels.Selector) (interf return result, err } -// Get obtains the ReplicationController specified by its id. -func (storage *ControllerRegistryStorage) Get(id string) (interface{}, error) { - controller, err := storage.registry.GetController(id) - if err != nil { - return nil, err - } - return controller, err -} - -// Delete asynchronously deletes the ReplicationController specified by its id. -func (storage *ControllerRegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() (interface{}, error) { - return &api.Status{Status: api.StatusSuccess}, storage.registry.DeleteController(id) - }), nil -} - -// New creates a new ReplicationController for use with Create and Update -func (storage *ControllerRegistryStorage) New() interface{} { +// New creates a new ReplicationController for use with Create and Update. +func (rs RegistryStorage) New() interface{} { return &api.ReplicationController{} } -// Create registers a given new ReplicationController instance to storage.registry. -func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { - controller, ok := obj.(*api.ReplicationController) - if !ok { - return nil, fmt.Errorf("not a replication controller: %#v", obj) - } - if len(controller.ID) == 0 { - controller.ID = uuid.NewUUID().String() - } - // Pod Manifest ID should be assigned by the pod API - controller.DesiredState.PodTemplate.DesiredState.Manifest.ID = "" - - if errs := api.ValidateReplicationController(controller); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - - return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.registry.CreateController(*controller) - if err != nil { - return nil, err - } - return storage.waitForController(*controller) - }), nil -} - -// Update replaces a given ReplicationController instance with an existing instance in storage.registry. -func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { +// Update replaces a given ReplicationController instance with an existing +// instance in storage.registry. +func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { controller, ok := obj.(*api.ReplicationController) if !ok { return nil, fmt.Errorf("not a replication controller: %#v", obj) @@ -113,30 +116,30 @@ func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interf return nil, fmt.Errorf("Validation errors: %v", errs) } return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.registry.UpdateController(*controller) + err := rs.registry.UpdateController(*controller) if err != nil { return nil, err } - return storage.waitForController(*controller) + return rs.waitForController(*controller) }), nil } -func (storage *ControllerRegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { +// Watch returns ReplicationController events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.WatchControllers(label, field, resourceVersion) +} + +func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { for { - pods, err := storage.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) + pods, err := rs.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) if err != nil { return ctrl, err } if len(pods) == ctrl.DesiredState.Replicas { break } - time.Sleep(storage.pollPeriod) + time.Sleep(rs.pollPeriod) } return ctrl, nil } - -// WatchAll returns ReplicationController events via a watch.Interface, implementing -// apiserver.ResourceWatcher. -func (storage *ControllerRegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return storage.registry.WatchControllers(label, field, resourceVersion) -} diff --git a/pkg/registry/controllerstorage_test.go b/pkg/registry/controller/storage_test.go similarity index 78% rename from pkg/registry/controllerstorage_test.go rename to pkg/registry/controller/storage_test.go index 9dd699ca5f7..9447126b462 100644 --- a/pkg/registry/controllerstorage_test.go +++ b/pkg/registry/controller/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package controller import ( "encoding/json" @@ -26,50 +26,20 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) -// TODO: Why do we have this AND MemoryRegistry? -type MockControllerRegistry struct { - err error - controllers []api.ReplicationController -} - -func (registry *MockControllerRegistry) ListControllers() ([]api.ReplicationController, error) { - return registry.controllers, registry.err -} - -func (registry *MockControllerRegistry) GetController(ID string) (*api.ReplicationController, error) { - return &api.ReplicationController{}, registry.err -} - -func (registry *MockControllerRegistry) CreateController(controller api.ReplicationController) error { - return registry.err -} - -func (registry *MockControllerRegistry) UpdateController(controller api.ReplicationController) error { - return registry.err -} - -func (registry *MockControllerRegistry) DeleteController(ID string) error { - return registry.err -} - -func (registry *MockControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return nil, registry.err -} - func TestListControllersError(t *testing.T) { - mockRegistry := MockControllerRegistry{ - err: fmt.Errorf("test error"), + mockRegistry := registrytest.ControllerRegistry{ + Err: fmt.Errorf("test error"), } - storage := ControllerRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } controllersObj, err := storage.List(nil) controllers := controllersObj.(api.ReplicationControllerList) - if err != mockRegistry.err { - t.Errorf("Expected %#v, Got %#v", mockRegistry.err, err) + if err != mockRegistry.Err { + t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err) } if len(controllers.Items) != 0 { t.Errorf("Unexpected non-zero ctrl list: %#v", controllers) @@ -77,8 +47,8 @@ func TestListControllersError(t *testing.T) { } func TestListEmptyControllerList(t *testing.T) { - mockRegistry := MockControllerRegistry{} - storage := ControllerRegistryStorage{ + mockRegistry := registrytest.ControllerRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, } controllers, err := storage.List(labels.Everything()) @@ -92,8 +62,8 @@ func TestListEmptyControllerList(t *testing.T) { } func TestListControllerList(t *testing.T) { - mockRegistry := MockControllerRegistry{ - controllers: []api.ReplicationController{ + mockRegistry := registrytest.ControllerRegistry{ + Controllers: []api.ReplicationController{ { JSONBase: api.JSONBase{ ID: "foo", @@ -106,7 +76,7 @@ func TestListControllerList(t *testing.T) { }, }, } - storage := ControllerRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } controllersObj, err := storage.List(labels.Everything()) @@ -127,8 +97,8 @@ func TestListControllerList(t *testing.T) { } func TestControllerDecode(t *testing.T) { - mockRegistry := MockControllerRegistry{} - storage := ControllerRegistryStorage{ + mockRegistry := registrytest.ControllerRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, } controller := &api.ReplicationController{ @@ -238,16 +208,16 @@ var validPodTemplate = api.PodTemplate{ } func TestCreateController(t *testing.T) { - mockRegistry := MockControllerRegistry{} - mockPodRegistry := MockPodRegistry{ - pods: []api.Pod{ + mockRegistry := registrytest.ControllerRegistry{} + mockPodRegistry := registrytest.PodRegistry{ + Pods: []api.Pod{ { JSONBase: api.JSONBase{ID: "foo"}, Labels: map[string]string{"a": "b"}, }, }, } - storage := ControllerRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, podRegistry: &mockPodRegistry, pollPeriod: time.Millisecond * 1, @@ -276,7 +246,7 @@ func TestCreateController(t *testing.T) { } mockPodRegistry.Lock() - mockPodRegistry.pods = []api.Pod{ + mockPodRegistry.Pods = []api.Pod{ { JSONBase: api.JSONBase{ID: "foo"}, Labels: map[string]string{"a": "b"}, @@ -297,8 +267,8 @@ func TestCreateController(t *testing.T) { } func TestControllerStorageValidatesCreate(t *testing.T) { - mockRegistry := MockControllerRegistry{} - storage := ControllerRegistryStorage{ + mockRegistry := registrytest.ControllerRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, podRegistry: nil, pollPeriod: time.Millisecond * 1, @@ -328,13 +298,12 @@ func TestControllerStorageValidatesCreate(t *testing.T) { } func TestControllerStorageValidatesUpdate(t *testing.T) { - mockRegistry := MockControllerRegistry{} - storage := ControllerRegistryStorage{ + mockRegistry := registrytest.ControllerRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, podRegistry: nil, pollPeriod: time.Millisecond * 1, } - failureCases := map[string]api.ReplicationController{ "empty ID": { JSONBase: api.JSONBase{ID: ""}, diff --git a/pkg/registry/endpoints.go b/pkg/registry/endpoint/endpoints.go similarity index 87% rename from pkg/registry/endpoints.go rename to pkg/registry/endpoint/endpoints.go index f7d34f89899..1eaa5e32c37 100644 --- a/pkg/registry/endpoints.go +++ b/pkg/registry/endpoint/endpoints.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package endpoint import ( "fmt" @@ -24,42 +24,27 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" ) -func MakeEndpointController(serviceRegistry ServiceRegistry, client *client.Client) *EndpointController { +// A EndpointController manages service endpoints. +type EndpointController struct { + client *client.Client + serviceRegistry service.Registry +} + +// NewEndpointController returns a new *EndpointController. +func NewEndpointController(serviceRegistry service.Registry, client *client.Client) *EndpointController { return &EndpointController{ serviceRegistry: serviceRegistry, client: client, } } -type EndpointController struct { - serviceRegistry ServiceRegistry - client *client.Client -} - -func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) { - if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) || - (portName.Kind == util.IntstrInt && portName.IntVal == 0)) && - len(manifest.Containers[0].Ports) > 0 { - return manifest.Containers[0].Ports[0].ContainerPort, nil - } - if portName.Kind == util.IntstrInt { - return portName.IntVal, nil - } - name := portName.StrVal - for _, container := range manifest.Containers { - for _, port := range container.Ports { - if port.Name == name { - return port.ContainerPort, nil - } - } - } - return -1, fmt.Errorf("no suitable port for manifest: %s", manifest.ID) -} - +// SyncServiceEndpoints syncs service endpoints. func (e *EndpointController) SyncServiceEndpoints() error { services, err := e.serviceRegistry.ListServices() if err != nil { @@ -98,3 +83,24 @@ func (e *EndpointController) SyncServiceEndpoints() error { } return resultErr } + +// findPort locates the container port for the given manifest and portName. +func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) { + if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) || + (portName.Kind == util.IntstrInt && portName.IntVal == 0)) && + len(manifest.Containers[0].Ports) > 0 { + return manifest.Containers[0].Ports[0].ContainerPort, nil + } + if portName.Kind == util.IntstrInt { + return portName.IntVal, nil + } + name := portName.StrVal + for _, container := range manifest.Containers { + for _, port := range container.Ports { + if port.Name == name { + return port.ContainerPort, nil + } + } + } + return -1, fmt.Errorf("no suitable port for manifest: %s", manifest.ID) +} diff --git a/pkg/registry/endpoints_test.go b/pkg/registry/endpoint/endpoints_test.go similarity index 84% rename from pkg/registry/endpoints_test.go rename to pkg/registry/endpoint/endpoints_test.go index db5d3463f5d..87fe7d4e927 100644 --- a/pkg/registry/endpoints_test.go +++ b/pkg/registry/endpoint/endpoints_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package endpoint import ( "encoding/json" @@ -24,6 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -82,7 +83,6 @@ func TestFindPort(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if port != 8080 { t.Errorf("Expected 8080, Got %d", port) } @@ -90,7 +90,6 @@ func TestFindPort(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if port != 8000 { t.Errorf("Expected 8000, Got %d", port) } @@ -110,7 +109,6 @@ func TestFindPort(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if port != 8080 { t.Errorf("Expected 8080, Got %d", port) } @@ -118,7 +116,6 @@ func TestFindPort(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if port != 8080 { t.Errorf("Expected 8080, Got %d", port) } @@ -132,10 +129,8 @@ func TestSyncEndpointsEmpty(t *testing.T) { } testServer := httptest.NewTLSServer(&fakeHandler) client := client.New(testServer.URL, nil) - - serviceRegistry := MockServiceRegistry{} - - endpoints := MakeEndpointController(&serviceRegistry, client) + serviceRegistry := registrytest.ServiceRegistry{} + endpoints := NewEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() if err != nil { t.Errorf("unexpected error: %v", err) @@ -151,15 +146,13 @@ func TestSyncEndpointsError(t *testing.T) { } testServer := httptest.NewTLSServer(&fakeHandler) client := client.New(testServer.URL, nil) - - serviceRegistry := MockServiceRegistry{ - err: fmt.Errorf("test error"), + serviceRegistry := registrytest.ServiceRegistry{ + Err: fmt.Errorf("test error"), } - - endpoints := MakeEndpointController(&serviceRegistry, client) + endpoints := NewEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() - if err != serviceRegistry.err { - t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.err) + if err != serviceRegistry.Err { + t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.Err) } } @@ -171,9 +164,8 @@ func TestSyncEndpointsItems(t *testing.T) { } testServer := httptest.NewTLSServer(&fakeHandler) client := client.New(testServer.URL, nil) - - serviceRegistry := MockServiceRegistry{ - list: api.ServiceList{ + serviceRegistry := registrytest.ServiceRegistry{ + List: api.ServiceList{ Items: []api.Service{ { Selector: map[string]string{ @@ -183,15 +175,13 @@ func TestSyncEndpointsItems(t *testing.T) { }, }, } - - endpoints := MakeEndpointController(&serviceRegistry, client) + endpoints := NewEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() if err != nil { t.Errorf("unexpected error: %v", err) } - - if len(serviceRegistry.endpoints.Endpoints) != 1 { - t.Errorf("Unexpected endpoints update: %#v", serviceRegistry.endpoints) + if len(serviceRegistry.Endpoints.Endpoints) != 1 { + t.Errorf("Unexpected endpoints update: %#v", serviceRegistry.Endpoints) } } @@ -201,9 +191,8 @@ func TestSyncEndpointsPodError(t *testing.T) { } testServer := httptest.NewTLSServer(&fakeHandler) client := client.New(testServer.URL, nil) - - serviceRegistry := MockServiceRegistry{ - list: api.ServiceList{ + serviceRegistry := registrytest.ServiceRegistry{ + List: api.ServiceList{ Items: []api.Service{ { Selector: map[string]string{ @@ -213,8 +202,7 @@ func TestSyncEndpointsPodError(t *testing.T) { }, }, } - - endpoints := MakeEndpointController(&serviceRegistry, client) + endpoints := NewEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() if err == nil { t.Error("Unexpected non-error") diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcd/etcd.go similarity index 63% rename from pkg/registry/etcdregistry.go rename to pkg/registry/etcd/etcd.go index 2eb9e30deb4..31c4f669663 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcd/etcd.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package etcd import ( "fmt" @@ -22,27 +22,31 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" ) // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // kubelet (and vice versa) -// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd. -type EtcdRegistry struct { - helper tools.EtcdHelper +// Registry implements PodRegistry, ControllerRegistry and ServiceRegistry +// with backed by etcd. +type Registry struct { + tools.EtcdHelper manifestFactory ManifestFactory } -// MakeEtcdRegistry creates an etcd registry. -// 'client' is the connection to etcd -// 'machines' is the list of machines -// 'scheduler' is the scheduling algorithm to use. -func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry { - registry := &EtcdRegistry{ - helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner}, +// NewRegistry creates an etcd registry. +func NewRegistry(client tools.EtcdClient, machines minion.Registry) *Registry { + registry := &Registry{ + EtcdHelper: tools.EtcdHelper{ + client, + api.Codec, + api.ResourceVersioner, + }, } registry.manifestFactory = &BasicManifestFactory{ serviceRegistry: registry, @@ -55,11 +59,10 @@ func makePodKey(podID string) string { } // ListPods obtains a list of pods that match selector. -func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { +func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { allPods := []api.Pod{} filteredPods := []api.Pod{} - err := registry.helper.ExtractList("/registry/pods", &allPods) - if err != nil { + if err := r.ExtractList("/registry/pods", &allPods); err != nil { return nil, err } for _, pod := range allPods { @@ -75,10 +78,9 @@ func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, err } // GetPod gets a specific pod specified by its ID. -func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) { +func (r *Registry) GetPod(podID string) (*api.Pod, error) { var pod api.Pod - err := registry.helper.ExtractObj(makePodKey(podID), &pod, false) - if err != nil { + if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil { return nil, err } // TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets @@ -93,66 +95,52 @@ func makeContainerKey(machine string) string { } // CreatePod creates a pod based on a specification, schedule it onto a specific machine. -func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error { +func (r *Registry) CreatePod(machine string, pod api.Pod) error { // Set current status to "Waiting". pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Host = "" - // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. pod.DesiredState.Status = api.PodRunning pod.DesiredState.Host = "" - - err := registry.helper.CreateObj(makePodKey(pod.ID), &pod) - if err != nil { + if err := r.CreateObj(makePodKey(pod.ID), &pod); err != nil { return err } - // TODO: Until scheduler separation is completed, just assign here. - return registry.AssignPod(pod.ID, machine) + return r.AssignPod(pod.ID, machine) } // AssignPod assigns the given pod to the given machine. // TODO: hook this up via apiserver, not by calling it from CreatePod(). -func (registry *EtcdRegistry) AssignPod(podID string, machine string) error { +func (r *Registry) AssignPod(podID string, machine string) error { podKey := makePodKey(podID) var finalPod *api.Pod - err := registry.helper.AtomicUpdate( - podKey, - &api.Pod{}, - func(obj interface{}) (interface{}, error) { - pod, ok := obj.(*api.Pod) - if !ok { - return nil, fmt.Errorf("unexpected object: %#v", obj) - } - pod.DesiredState.Host = machine - finalPod = pod - return pod, nil - }, - ) + err := r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) { + pod, ok := obj.(*api.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + pod.DesiredState.Host = machine + finalPod = pod + return pod, nil + }) if err != nil { return err } - // TODO: move this to a watch/rectification loop. - manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod) + manifest, err := r.manifestFactory.MakeManifest(machine, *finalPod) if err != nil { return err } - contKey := makeContainerKey(machine) - err = registry.helper.AtomicUpdate( - contKey, - &api.ContainerManifestList{}, - func(in interface{}) (interface{}, error) { - manifests := *in.(*api.ContainerManifestList) - manifests.Items = append(manifests.Items, manifest) - return manifests, nil - }, - ) + err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + manifests := *in.(*api.ContainerManifestList) + manifests.Items = append(manifests.Items, manifest) + return manifests, nil + }) if err != nil { // Don't strand stuff. This is a terrible hack that won't be needed // when the above TODO is fixed. - err2 := registry.helper.Delete(podKey, false) + err2 := r.Delete(podKey, false) if err2 != nil { glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) } @@ -160,41 +148,38 @@ func (registry *EtcdRegistry) AssignPod(podID string, machine string) error { return err } -func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error { +func (r *Registry) UpdatePod(pod api.Pod) error { return fmt.Errorf("unimplemented!") } // DeletePod deletes an existing pod specified by its ID. -func (registry *EtcdRegistry) DeletePod(podID string) error { +func (r *Registry) DeletePod(podID string) error { var pod api.Pod podKey := makePodKey(podID) - err := registry.helper.ExtractObj(podKey, &pod, false) + err := r.ExtractObj(podKey, &pod, false) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("pod", podID) } if err != nil { return err } - // First delete the pod, so a scheduler doesn't notice it getting removed from the // machine and attempt to put it somewhere. - err = registry.helper.Delete(podKey, true) + err = r.Delete(podKey, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("pod", podID) } if err != nil { return err } - machine := pod.DesiredState.Host if machine == "" { // Pod was never scheduled anywhere, just return. return nil } - // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) - return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { manifests := in.(*api.ContainerManifestList) newManifests := make([]api.ContainerManifest, 0, len(manifests.Items)) found := false @@ -217,18 +202,18 @@ func (registry *EtcdRegistry) DeletePod(podID string) error { } // ListControllers obtains a list of ReplicationControllers. -func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) { +func (r *Registry) ListControllers() ([]api.ReplicationController, error) { var controllers []api.ReplicationController - err := registry.helper.ExtractList("/registry/controllers", &controllers) + err := r.ExtractList("/registry/controllers", &controllers) return controllers, err } // WatchControllers begins watching for new, changed, or deleted controllers. -func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (r *Registry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { if !field.Empty() { return nil, fmt.Errorf("no field selector implemented for controllers") } - return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { + return r.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool { return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels)) }) } @@ -238,10 +223,10 @@ func makeControllerKey(id string) string { } // GetController gets a specific ReplicationController specified by its ID. -func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { +func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) - err := registry.helper.ExtractObj(key, &controller, false) + err := r.ExtractObj(key, &controller, false) if tools.IsEtcdNotFound(err) { return nil, apiserver.NewNotFoundErr("replicationController", controllerID) } @@ -252,8 +237,8 @@ func (registry *EtcdRegistry) GetController(controllerID string) (*api.Replicati } // CreateController creates a new ReplicationController. -func (registry *EtcdRegistry) CreateController(controller api.ReplicationController) error { - err := registry.helper.CreateObj(makeControllerKey(controller.ID), controller) +func (r *Registry) CreateController(controller api.ReplicationController) error { + err := r.CreateObj(makeControllerKey(controller.ID), controller) if tools.IsEtcdNodeExist(err) { return apiserver.NewAlreadyExistsErr("replicationController", controller.ID) } @@ -261,14 +246,14 @@ func (registry *EtcdRegistry) CreateController(controller api.ReplicationControl } // UpdateController replaces an existing ReplicationController. -func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error { - return registry.helper.SetObj(makeControllerKey(controller.ID), controller) +func (r *Registry) UpdateController(controller api.ReplicationController) error { + return r.SetObj(makeControllerKey(controller.ID), controller) } // DeleteController deletes a ReplicationController specified by its ID. -func (registry *EtcdRegistry) DeleteController(controllerID string) error { +func (r *Registry) DeleteController(controllerID string) error { key := makeControllerKey(controllerID) - err := registry.helper.Delete(key, false) + err := r.Delete(key, false) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("replicationController", controllerID) } @@ -280,15 +265,15 @@ func makeServiceKey(name string) string { } // ListServices obtains a list of Services. -func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) { +func (r *Registry) ListServices() (api.ServiceList, error) { var list api.ServiceList - err := registry.helper.ExtractList("/registry/services/specs", &list.Items) + err := r.ExtractList("/registry/services/specs", &list.Items) return list, err } // CreateService creates a new Service. -func (registry *EtcdRegistry) CreateService(svc api.Service) error { - err := registry.helper.CreateObj(makeServiceKey(svc.ID), svc) +func (r *Registry) CreateService(svc api.Service) error { + err := r.CreateObj(makeServiceKey(svc.ID), svc) if tools.IsEtcdNodeExist(err) { return apiserver.NewAlreadyExistsErr("service", svc.ID) } @@ -296,10 +281,10 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error { } // GetService obtains a Service specified by its name. -func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { +func (r *Registry) GetService(name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service - err := registry.helper.ExtractObj(key, &svc, false) + err := r.ExtractObj(key, &svc, false) if tools.IsEtcdNotFound(err) { return nil, apiserver.NewNotFoundErr("service", name) } @@ -314,9 +299,9 @@ func makeServiceEndpointsKey(name string) string { } // DeleteService deletes a Service specified by its name. -func (registry *EtcdRegistry) DeleteService(name string) error { +func (r *Registry) DeleteService(name string) error { key := makeServiceKey(name) - err := registry.helper.Delete(key, true) + err := r.Delete(key, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("service", name) } @@ -324,7 +309,7 @@ func (registry *EtcdRegistry) DeleteService(name string) error { return err } key = makeServiceEndpointsKey(name) - err = registry.helper.Delete(key, true) + err = r.Delete(key, true) if !tools.IsEtcdNotFound(err) { return err } @@ -332,12 +317,14 @@ func (registry *EtcdRegistry) DeleteService(name string) error { } // UpdateService replaces an existing Service. -func (registry *EtcdRegistry) UpdateService(svc api.Service) error { - return registry.helper.SetObj(makeServiceKey(svc.ID), svc) +func (r *Registry) UpdateService(svc api.Service) error { + return r.SetObj(makeServiceKey(svc.ID), svc) } // UpdateEndpoints update Endpoints of a Service. -func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { - updateFunc := func(interface{}) (interface{}, error) { return e, nil } - return registry.helper.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, updateFunc) +func (r *Registry) UpdateEndpoints(e api.Endpoints) error { + return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, + func(interface{}) (interface{}, error) { + return e, nil + }) } diff --git a/pkg/registry/etcdregistry_test.go b/pkg/registry/etcd/etcd_test.go similarity index 98% rename from pkg/registry/etcdregistry_test.go rename to pkg/registry/etcd/etcd_test.go index 967e60645bb..6ef0889c939 100644 --- a/pkg/registry/etcdregistry_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package etcd import ( "reflect" @@ -23,14 +23,17 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/coreos/go-etcd/etcd" ) -func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *EtcdRegistry { - registry := MakeEtcdRegistry(client, MakeMinionRegistry(machines)) +func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *Registry { + registry := NewRegistry(client, minion.NewRegistry(machines)) registry.manifestFactory = &BasicManifestFactory{ - serviceRegistry: &MockServiceRegistry{}, + serviceRegistry: ®istrytest.ServiceRegistry{}, } return registry } diff --git a/pkg/registry/manifest_factory.go b/pkg/registry/etcd/manifest_factory.go similarity index 86% rename from pkg/registry/manifest_factory.go rename to pkg/registry/etcd/manifest_factory.go index 9f8cb19a598..fc724882b30 100644 --- a/pkg/registry/manifest_factory.go +++ b/pkg/registry/etcd/manifest_factory.go @@ -14,10 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package etcd import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" ) type ManifestFactory interface { @@ -26,11 +27,11 @@ type ManifestFactory interface { } type BasicManifestFactory struct { - serviceRegistry ServiceRegistry + serviceRegistry service.Registry } func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) { - envVars, err := GetServiceEnvironmentVariables(b.serviceRegistry, machine) + envVars, err := service.GetServiceEnvironmentVariables(b.serviceRegistry, machine) if err != nil { return api.ContainerManifest{}, err } diff --git a/pkg/registry/manifest_factory_test.go b/pkg/registry/etcd/manifest_factory_test.go similarity index 94% rename from pkg/registry/manifest_factory_test.go rename to pkg/registry/etcd/manifest_factory_test.go index c95f1669c6d..b51090be5cf 100644 --- a/pkg/registry/manifest_factory_test.go +++ b/pkg/registry/etcd/manifest_factory_test.go @@ -14,18 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package etcd import ( "reflect" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) func TestMakeManifestNoServices(t *testing.T) { - registry := MockServiceRegistry{} + registry := registrytest.ServiceRegistry{} factory := &BasicManifestFactory{ serviceRegistry: ®istry, } @@ -58,8 +59,8 @@ func TestMakeManifestNoServices(t *testing.T) { } func TestMakeManifestServices(t *testing.T) { - registry := MockServiceRegistry{ - list: api.ServiceList{ + registry := registrytest.ServiceRegistry{ + List: api.ServiceList{ Items: []api.Service{ { JSONBase: api.JSONBase{ID: "test"}, @@ -134,8 +135,8 @@ func TestMakeManifestServices(t *testing.T) { } func TestMakeManifestServicesExistingEnvVar(t *testing.T) { - registry := MockServiceRegistry{ - list: api.ServiceList{ + registry := registrytest.ServiceRegistry{ + List: api.ServiceList{ Items: []api.Service{ { JSONBase: api.JSONBase{ID: "test"}, diff --git a/pkg/registry/memory/memory.go b/pkg/registry/memory/memory.go new file mode 100644 index 00000000000..d74b037858f --- /dev/null +++ b/pkg/registry/memory/memory.go @@ -0,0 +1,191 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package memory + +import ( + "errors" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// An implementation of PodRegistry and ControllerRegistry that is backed +// by memory. Mainly used for testing. +type Registry struct { + podData map[string]api.Pod + controllerData map[string]api.ReplicationController + serviceData map[string]api.Service +} + +// NewRegistry returns a new Registry. +func NewRegistry() *Registry { + return &Registry{ + podData: map[string]api.Pod{}, + controllerData: map[string]api.ReplicationController{}, + serviceData: map[string]api.Service{}, + } +} + +// CreateController registers the given replication controller. +func (r *Registry) CreateController(controller api.ReplicationController) error { + r.controllerData[controller.ID] = controller + return nil +} + +// CreatePod registers the given pod. +func (r *Registry) CreatePod(machine string, pod api.Pod) error { + r.podData[pod.ID] = pod + return nil +} + +// CreateService registers the given service. +func (r *Registry) CreateService(svc api.Service) error { + r.serviceData[svc.ID] = svc + return nil +} + +// DeleteController deletes the named replication controller from the +// registry. +func (r *Registry) DeleteController(controllerID string) error { + if _, ok := r.controllerData[controllerID]; !ok { + return apiserver.NewNotFoundErr("replicationController", controllerID) + } + delete(r.controllerData, controllerID) + return nil +} + +// DeletePod deletes the named pod from the registry. +func (r *Registry) DeletePod(podID string) error { + if _, ok := r.podData[podID]; !ok { + return apiserver.NewNotFoundErr("pod", podID) + } + delete(r.podData, podID) + return nil +} + +// DeleteService deletes the named service from the registry. +// It returns an error if the service is not found in the registry. +func (r *Registry) DeleteService(name string) error { + if _, ok := r.serviceData[name]; !ok { + return apiserver.NewNotFoundErr("service", name) + } + delete(r.serviceData, name) + return nil +} + +// GetController returns an *api.ReplicationController for the name controller. +// It returns an error if the controller is not found in the registry. +func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) { + controller, found := r.controllerData[controllerID] + if found { + return &controller, nil + } else { + return nil, apiserver.NewNotFoundErr("replicationController", controllerID) + } +} + +// GetPod returns an *api.Pod for the named pod. +// It returns an error if the pod is not found in the registry. +func (r *Registry) GetPod(podID string) (*api.Pod, error) { + pod, found := r.podData[podID] + if found { + return &pod, nil + } else { + return nil, apiserver.NewNotFoundErr("pod", podID) + } +} + +// GetService returns an *api.Service for the named service. +// It returns an error if the service is not found in the registry. +func (r *Registry) GetService(name string) (*api.Service, error) { + svc, found := r.serviceData[name] + if !found { + return nil, apiserver.NewNotFoundErr("service", name) + } + return &svc, nil +} + +// ListControllers returns all registered replication controllers. +func (r *Registry) ListControllers() ([]api.ReplicationController, error) { + result := []api.ReplicationController{} + for _, value := range r.controllerData { + result = append(result, value) + } + return result, nil +} + +// ListPods returns all registered pods for the given selector. +func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { + result := []api.Pod{} + for _, value := range r.podData { + if selector.Matches(labels.Set(value.Labels)) { + result = append(result, value) + } + } + return result, nil +} + +// ListServices returns all registered services. +func (r *Registry) ListServices() (api.ServiceList, error) { + var list []api.Service + for _, value := range r.serviceData { + list = append(list, value) + } + return api.ServiceList{Items: list}, nil +} + +// UpdateController updates the given controller in the registry. +// It returns an error if the controller is not found in the registry. +func (r *Registry) UpdateController(controller api.ReplicationController) error { + if _, ok := r.controllerData[controller.ID]; !ok { + return apiserver.NewNotFoundErr("replicationController", controller.ID) + } + r.controllerData[controller.ID] = controller + return nil +} + +// UpdateEndpoints always returns nil. +func (r *Registry) UpdateEndpoints(e api.Endpoints) error { + return nil +} + +// UpdatePod updates the given pod in the registry. +// It returns an error if the pod is not found in the registry. +func (r *Registry) UpdatePod(pod api.Pod) error { + if _, ok := r.podData[pod.ID]; !ok { + return apiserver.NewNotFoundErr("pod", pod.ID) + } + r.podData[pod.ID] = pod + return nil +} + +// UpdateService updates the given service in the registry. +// It returns an error if the service is not found in the registry. +func (r *Registry) UpdateService(svc api.Service) error { + if _, ok := r.serviceData[svc.ID]; !ok { + return apiserver.NewNotFoundErr("service", svc.ID) + } + return r.CreateService(svc) +} + +// WatchControllers always returns an error. +// It is not implemented. +func (r *Registry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, errors.New("unimplemented") +} diff --git a/pkg/registry/memory_registry_test.go b/pkg/registry/memory/memory_test.go similarity index 92% rename from pkg/registry/memory_registry_test.go rename to pkg/registry/memory/memory_test.go index 4db09bdbc6d..e5f7c093562 100644 --- a/pkg/registry/memory_registry_test.go +++ b/pkg/registry/memory/memory_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package memory import ( "testing" @@ -25,32 +25,30 @@ import ( ) func TestListPodsEmpty(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() pods, err := registry.ListPods(labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } - if len(pods) != 0 { t.Errorf("Unexpected pod list: %#v", pods) } } func TestMemoryListPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() registry.CreatePod("machine", api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) pods, err := registry.ListPods(labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } - if len(pods) != 1 || pods[0].ID != "foo" { t.Errorf("Unexpected pod list: %#v", pods) } } func TestMemoryGetPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() pod, err := registry.GetPod("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -62,7 +60,7 @@ func TestMemoryGetPods(t *testing.T) { } func TestMemorySetGetPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} registry.CreatePod("machine", expectedPod) pod, err := registry.GetPod("foo") @@ -76,7 +74,7 @@ func TestMemorySetGetPods(t *testing.T) { } func TestMemoryUpdatePods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() pod := api.Pod{ JSONBase: api.JSONBase{ ID: "foo", @@ -96,7 +94,7 @@ func TestMemoryUpdatePods(t *testing.T) { } func TestMemorySetUpdateGetPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() oldPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} expectedPod := api.Pod{ JSONBase: api.JSONBase{ @@ -119,7 +117,7 @@ func TestMemorySetUpdateGetPods(t *testing.T) { } func TestMemoryDeletePods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() err := registry.DeletePod("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -131,7 +129,7 @@ func TestMemoryDeletePods(t *testing.T) { } func TestMemorySetDeleteGetPods(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} registry.CreatePod("machine", expectedPod) registry.DeletePod("foo") @@ -146,7 +144,7 @@ func TestMemorySetDeleteGetPods(t *testing.T) { } func TestListControllersEmpty(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() ctls, err := registry.ListControllers() if err != nil { t.Errorf("unexpected error: %v", err) @@ -158,7 +156,7 @@ func TestListControllersEmpty(t *testing.T) { } func TestMemoryListControllers(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() registry.CreateController(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}) ctls, err := registry.ListControllers() if err != nil { @@ -171,7 +169,7 @@ func TestMemoryListControllers(t *testing.T) { } func TestMemoryGetController(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() ctl, err := registry.GetController("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -183,7 +181,7 @@ func TestMemoryGetController(t *testing.T) { } func TestMemorySetGetControllers(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}} registry.CreateController(expectedController) ctl, err := registry.GetController("foo") @@ -197,7 +195,7 @@ func TestMemorySetGetControllers(t *testing.T) { } func TestMemoryUpdateController(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() ctl := api.ReplicationController{ JSONBase: api.JSONBase{ ID: "foo", @@ -217,7 +215,7 @@ func TestMemoryUpdateController(t *testing.T) { } func TestMemorySetUpdateGetControllers(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() oldController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}} expectedController := api.ReplicationController{ JSONBase: api.JSONBase{ @@ -240,7 +238,7 @@ func TestMemorySetUpdateGetControllers(t *testing.T) { } func TestMemoryDeleteController(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() err := registry.DeleteController("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -252,7 +250,7 @@ func TestMemoryDeleteController(t *testing.T) { } func TestMemorySetDeleteGetControllers(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}} registry.CreateController(expectedController) registry.DeleteController("foo") @@ -267,7 +265,7 @@ func TestMemorySetDeleteGetControllers(t *testing.T) { } func TestListServicesEmpty(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() svcs, err := registry.ListServices() if err != nil { t.Errorf("unexpected error: %v", err) @@ -279,7 +277,7 @@ func TestListServicesEmpty(t *testing.T) { } func TestMemoryListServices(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() registry.CreateService(api.Service{JSONBase: api.JSONBase{ID: "foo"}}) svcs, err := registry.ListServices() if err != nil { @@ -292,7 +290,7 @@ func TestMemoryListServices(t *testing.T) { } func TestMemoryGetService(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() svc, err := registry.GetService("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -304,7 +302,7 @@ func TestMemoryGetService(t *testing.T) { } func TestMemorySetGetServices(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedService := api.Service{JSONBase: api.JSONBase{ID: "foo"}} registry.CreateService(expectedService) svc, err := registry.GetService("foo") @@ -318,7 +316,7 @@ func TestMemorySetGetServices(t *testing.T) { } func TestMemoryUpdateService(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() svc := api.Service{ JSONBase: api.JSONBase{ ID: "foo", @@ -336,7 +334,7 @@ func TestMemoryUpdateService(t *testing.T) { } func TestMemorySetUpdateGetServices(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() oldService := api.Service{JSONBase: api.JSONBase{ID: "foo"}} expectedService := api.Service{ JSONBase: api.JSONBase{ @@ -357,7 +355,7 @@ func TestMemorySetUpdateGetServices(t *testing.T) { } func TestMemoryDeleteService(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() err := registry.DeleteService("foo") if !apiserver.IsNotFound(err) { if err != nil { @@ -369,7 +367,7 @@ func TestMemoryDeleteService(t *testing.T) { } func TestMemorySetDeleteGetServices(t *testing.T) { - registry := MakeMemoryRegistry() + registry := NewRegistry() expectedService := api.Service{JSONBase: api.JSONBase{ID: "foo"}} registry.CreateService(expectedService) registry.DeleteService("foo") diff --git a/pkg/registry/memory_registry.go b/pkg/registry/memory_registry.go deleted file mode 100644 index bce2fb60b10..00000000000 --- a/pkg/registry/memory_registry.go +++ /dev/null @@ -1,165 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package registry - -import ( - "errors" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" -) - -// An implementation of PodRegistry and ControllerRegistry that is backed by memory -// Mainly used for testing. -type MemoryRegistry struct { - podData map[string]api.Pod - controllerData map[string]api.ReplicationController - serviceData map[string]api.Service -} - -func MakeMemoryRegistry() *MemoryRegistry { - return &MemoryRegistry{ - podData: map[string]api.Pod{}, - controllerData: map[string]api.ReplicationController{}, - serviceData: map[string]api.Service{}, - } -} - -func (registry *MemoryRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { - result := []api.Pod{} - for _, value := range registry.podData { - if selector.Matches(labels.Set(value.Labels)) { - result = append(result, value) - } - } - return result, nil -} - -func (registry *MemoryRegistry) GetPod(podID string) (*api.Pod, error) { - pod, found := registry.podData[podID] - if found { - return &pod, nil - } else { - return nil, apiserver.NewNotFoundErr("pod", podID) - } -} - -func (registry *MemoryRegistry) CreatePod(machine string, pod api.Pod) error { - registry.podData[pod.ID] = pod - return nil -} - -func (registry *MemoryRegistry) DeletePod(podID string) error { - if _, ok := registry.podData[podID]; !ok { - return apiserver.NewNotFoundErr("pod", podID) - } - delete(registry.podData, podID) - return nil -} - -func (registry *MemoryRegistry) UpdatePod(pod api.Pod) error { - if _, ok := registry.podData[pod.ID]; !ok { - return apiserver.NewNotFoundErr("pod", pod.ID) - } - registry.podData[pod.ID] = pod - return nil -} - -func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController, error) { - result := []api.ReplicationController{} - for _, value := range registry.controllerData { - result = append(result, value) - } - return result, nil -} - -func (registry *MemoryRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return nil, errors.New("unimplemented") -} - -func (registry *MemoryRegistry) GetController(controllerID string) (*api.ReplicationController, error) { - controller, found := registry.controllerData[controllerID] - if found { - return &controller, nil - } else { - return nil, apiserver.NewNotFoundErr("replicationController", controllerID) - } -} - -func (registry *MemoryRegistry) CreateController(controller api.ReplicationController) error { - registry.controllerData[controller.ID] = controller - return nil -} - -func (registry *MemoryRegistry) DeleteController(controllerID string) error { - if _, ok := registry.controllerData[controllerID]; !ok { - return apiserver.NewNotFoundErr("replicationController", controllerID) - } - delete(registry.controllerData, controllerID) - return nil -} - -func (registry *MemoryRegistry) UpdateController(controller api.ReplicationController) error { - if _, ok := registry.controllerData[controller.ID]; !ok { - return apiserver.NewNotFoundErr("replicationController", controller.ID) - } - registry.controllerData[controller.ID] = controller - return nil -} - -func (registry *MemoryRegistry) ListServices() (api.ServiceList, error) { - var list []api.Service - for _, value := range registry.serviceData { - list = append(list, value) - } - return api.ServiceList{Items: list}, nil -} - -func (registry *MemoryRegistry) CreateService(svc api.Service) error { - registry.serviceData[svc.ID] = svc - return nil -} - -func (registry *MemoryRegistry) GetService(name string) (*api.Service, error) { - svc, found := registry.serviceData[name] - if found { - return &svc, nil - } else { - return nil, apiserver.NewNotFoundErr("service", name) - } -} - -func (registry *MemoryRegistry) DeleteService(name string) error { - if _, ok := registry.serviceData[name]; !ok { - return apiserver.NewNotFoundErr("service", name) - } - delete(registry.serviceData, name) - return nil -} - -func (registry *MemoryRegistry) UpdateService(svc api.Service) error { - if _, ok := registry.serviceData[svc.ID]; !ok { - return apiserver.NewNotFoundErr("service", svc.ID) - } - return registry.CreateService(svc) -} - -func (registry *MemoryRegistry) UpdateEndpoints(e api.Endpoints) error { - return nil -} diff --git a/pkg/registry/caching_minion_registry.go b/pkg/registry/minion/caching_registry.go similarity index 54% rename from pkg/registry/caching_minion_registry.go rename to pkg/registry/minion/caching_registry.go index 44b61bf0fc8..ec04939b21e 100644 --- a/pkg/registry/caching_minion_registry.go +++ b/pkg/registry/minion/caching_registry.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "sync" @@ -32,8 +32,8 @@ func (SystemClock) Now() time.Time { return time.Now() } -type CachingMinionRegistry struct { - delegate MinionRegistry +type CachingRegistry struct { + delegate Registry ttl time.Duration minions []string lastUpdate int64 @@ -41,12 +41,12 @@ type CachingMinionRegistry struct { clock Clock } -func NewCachingMinionRegistry(delegate MinionRegistry, ttl time.Duration) (MinionRegistry, error) { +func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error) { list, err := delegate.List() if err != nil { return nil, err } - return &CachingMinionRegistry{ + return &CachingRegistry{ delegate: delegate, ttl: ttl, minions: list, @@ -55,44 +55,16 @@ func NewCachingMinionRegistry(delegate MinionRegistry, ttl time.Duration) (Minio }, nil } -func (c *CachingMinionRegistry) List() ([]string, error) { - if c.expired() { - err := c.refresh(false) - if err != nil { - return c.minions, err - } - } - return c.minions, nil -} - -func (c *CachingMinionRegistry) Insert(minion string) error { - err := c.delegate.Insert(minion) - if err != nil { - return err - } - return c.refresh(true) -} - -func (c *CachingMinionRegistry) Delete(minion string) error { - err := c.delegate.Delete(minion) - if err != nil { - return err - } - return c.refresh(true) -} - -func (c *CachingMinionRegistry) Contains(minion string) (bool, error) { - if c.expired() { - err := c.refresh(false) - if err != nil { +func (r *CachingRegistry) Contains(minion string) (bool, error) { + if r.expired() { + if err := r.refresh(false); err != nil { return false, err } } - // block updates in the middle of a contains. - c.lock.RLock() - defer c.lock.RUnlock() - for _, name := range c.minions { + r.lock.RLock() + defer r.lock.RUnlock() + for _, name := range r.minions { if name == minion { return true, nil } @@ -100,23 +72,46 @@ func (c *CachingMinionRegistry) Contains(minion string) (bool, error) { return false, nil } +func (r *CachingRegistry) Delete(minion string) error { + if err := r.delegate.Delete(minion); err != nil { + return err + } + return r.refresh(true) +} + +func (r *CachingRegistry) Insert(minion string) error { + if err := r.delegate.Insert(minion); err != nil { + return err + } + return r.refresh(true) +} + +func (r *CachingRegistry) List() ([]string, error) { + if r.expired() { + if err := r.refresh(false); err != nil { + return r.minions, err + } + } + return r.minions, nil +} + +func (r *CachingRegistry) expired() bool { + var unix int64 + atomic.SwapInt64(&unix, r.lastUpdate) + return r.clock.Now().Sub(time.Unix(r.lastUpdate, 0)) > r.ttl +} + // refresh updates the current store. It double checks expired under lock with the assumption // of optimistic concurrency with the other functions. -func (c *CachingMinionRegistry) refresh(force bool) error { - c.lock.Lock() - defer c.lock.Unlock() - if force || c.expired() { +func (r *CachingRegistry) refresh(force bool) error { + r.lock.Lock() + defer r.lock.Unlock() + if force || r.expired() { var err error - c.minions, err = c.delegate.List() - time := c.clock.Now() - atomic.SwapInt64(&c.lastUpdate, time.Unix()) + r.minions, err = r.delegate.List() + time := r.clock.Now() + atomic.SwapInt64(&r.lastUpdate, time.Unix()) return err } return nil } - -func (c *CachingMinionRegistry) expired() bool { - var unix int64 - atomic.SwapInt64(&unix, c.lastUpdate) - return c.clock.Now().Sub(time.Unix(c.lastUpdate, 0)) > c.ttl -} diff --git a/pkg/registry/caching_minion_registry_test.go b/pkg/registry/minion/caching_registry_test.go similarity index 75% rename from pkg/registry/caching_minion_registry_test.go rename to pkg/registry/minion/caching_registry_test.go index 5f05522b09a..d09ccd77434 100644 --- a/pkg/registry/caching_minion_registry_test.go +++ b/pkg/registry/minion/caching_registry_test.go @@ -14,12 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "reflect" "testing" "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) type fakeClock struct { @@ -34,9 +36,9 @@ func TestCachingHit(t *testing.T) { fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) expected := []string{"m1", "m2", "m3"} - cache := CachingMinionRegistry{ + cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, @@ -56,9 +58,9 @@ func TestCachingMiss(t *testing.T) { fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) expected := []string{"m1", "m2", "m3"} - cache := CachingMinionRegistry{ + cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, @@ -70,9 +72,8 @@ func TestCachingMiss(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - - if !reflect.DeepEqual(list, fakeRegistry.minions) { - t.Errorf("expected: %v, got %v", fakeRegistry.minions, list) + if !reflect.DeepEqual(list, fakeRegistry.Minions) { + t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } @@ -80,9 +81,9 @@ func TestCachingInsert(t *testing.T) { fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) expected := []string{"m1", "m2", "m3"} - cache := CachingMinionRegistry{ + cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, @@ -93,14 +94,12 @@ func TestCachingInsert(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - list, err := cache.List() if err != nil { t.Errorf("unexpected error: %v", err) } - - if !reflect.DeepEqual(list, fakeRegistry.minions) { - t.Errorf("expected: %v, got %v", fakeRegistry.minions, list) + if !reflect.DeepEqual(list, fakeRegistry.Minions) { + t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } @@ -108,9 +107,9 @@ func TestCachingDelete(t *testing.T) { fakeClock := fakeClock{ now: time.Unix(0, 0), } - fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"}) + fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) expected := []string{"m1", "m2", "m3"} - cache := CachingMinionRegistry{ + cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, @@ -121,13 +120,11 @@ func TestCachingDelete(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - list, err := cache.List() if err != nil { t.Errorf("unexpected error: %v", err) } - - if !reflect.DeepEqual(list, fakeRegistry.minions) { - t.Errorf("expected: %v, got %v", fakeRegistry.minions, list) + if !reflect.DeepEqual(list, fakeRegistry.Minions) { + t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } diff --git a/pkg/registry/cloud_minion_registry.go b/pkg/registry/minion/cloud_registry.go similarity index 66% rename from pkg/registry/cloud_minion_registry.go rename to pkg/registry/minion/cloud_registry.go index 3a158fabc6d..9d7688ce188 100644 --- a/pkg/registry/cloud_minion_registry.go +++ b/pkg/registry/minion/cloud_registry.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "fmt" @@ -22,37 +22,20 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" ) -type CloudMinionRegistry struct { +type CloudRegistry struct { cloud cloudprovider.Interface matchRE string } -func MakeCloudMinionRegistry(cloud cloudprovider.Interface, matchRE string) (*CloudMinionRegistry, error) { - return &CloudMinionRegistry{ +func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string) (*CloudRegistry, error) { + return &CloudRegistry{ cloud: cloud, matchRE: matchRE, }, nil } -func (c *CloudMinionRegistry) List() ([]string, error) { - instances, ok := c.cloud.Instances() - if !ok { - return nil, fmt.Errorf("cloud doesn't support instances") - } - - return instances.List(c.matchRE) -} - -func (c *CloudMinionRegistry) Insert(minion string) error { - return fmt.Errorf("unsupported") -} - -func (c *CloudMinionRegistry) Delete(minion string) error { - return fmt.Errorf("unsupported") -} - -func (c *CloudMinionRegistry) Contains(minion string) (bool, error) { - instances, err := c.List() +func (r *CloudRegistry) Contains(minion string) (bool, error) { + instances, err := r.List() if err != nil { return false, err } @@ -63,3 +46,19 @@ func (c *CloudMinionRegistry) Contains(minion string) (bool, error) { } return false, nil } + +func (r CloudRegistry) Delete(minion string) error { + return fmt.Errorf("unsupported") +} + +func (r CloudRegistry) Insert(minion string) error { + return fmt.Errorf("unsupported") +} + +func (r *CloudRegistry) List() ([]string, error) { + instances, ok := r.cloud.Instances() + if !ok { + return nil, fmt.Errorf("cloud doesn't support instances") + } + return instances.List(r.matchRE) +} diff --git a/pkg/registry/cloud_minion_registry_test.go b/pkg/registry/minion/cloud_registry_test.go similarity index 91% rename from pkg/registry/cloud_minion_registry_test.go rename to pkg/registry/minion/cloud_registry_test.go index 366625bc041..1b1fb08e917 100644 --- a/pkg/registry/cloud_minion_registry_test.go +++ b/pkg/registry/minion/cloud_registry_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "reflect" @@ -28,7 +28,7 @@ func TestCloudList(t *testing.T) { fakeCloud := cloudprovider.FakeCloud{ Machines: instances, } - registry, err := MakeCloudMinionRegistry(&fakeCloud, ".*") + registry, err := NewCloudRegistry(&fakeCloud, ".*") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -48,7 +48,7 @@ func TestCloudContains(t *testing.T) { fakeCloud := cloudprovider.FakeCloud{ Machines: instances, } - registry, err := MakeCloudMinionRegistry(&fakeCloud, ".*") + registry, err := NewCloudRegistry(&fakeCloud, ".*") if err != nil { t.Errorf("unexpected error: %v", err) } @@ -77,7 +77,7 @@ func TestCloudListRegexp(t *testing.T) { fakeCloud := cloudprovider.FakeCloud{ Machines: instances, } - registry, err := MakeCloudMinionRegistry(&fakeCloud, "m[0-9]+") + registry, err := NewCloudRegistry(&fakeCloud, "m[0-9]+") if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/registry/healthy_minion_registry.go b/pkg/registry/minion/healthy_registry.go similarity index 60% rename from pkg/registry/healthy_minion_registry.go rename to pkg/registry/minion/healthy_registry.go index 658052f13e8..33eabb49456 100644 --- a/pkg/registry/healthy_minion_registry.go +++ b/pkg/registry/minion/healthy_registry.go @@ -14,42 +14,65 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "fmt" "net/http" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" + "github.com/golang/glog" ) -type HealthyMinionRegistry struct { - delegate MinionRegistry +type HealthyRegistry struct { + delegate Registry client health.HTTPGetInterface port int } -func NewHealthyMinionRegistry(delegate MinionRegistry, client *http.Client) MinionRegistry { - return &HealthyMinionRegistry{ +func NewHealthyRegistry(delegate Registry, client *http.Client) Registry { + return &HealthyRegistry{ delegate: delegate, client: client, port: 10250, } } -func (h *HealthyMinionRegistry) makeMinionURL(minion string) string { - return fmt.Sprintf("http://%s:%d/healthz", minion, h.port) +func (r *HealthyRegistry) Contains(minion string) (bool, error) { + contains, err := r.delegate.Contains(minion) + if err != nil { + return false, err + } + if !contains { + return false, nil + } + status, err := health.DoHTTPCheck(r.makeMinionURL(minion), r.client) + if err != nil { + return false, err + } + if status == health.Unhealthy { + return false, nil + } + return true, nil } -func (h *HealthyMinionRegistry) List() (currentMinions []string, err error) { +func (r *HealthyRegistry) Delete(minion string) error { + return r.delegate.Delete(minion) +} + +func (r *HealthyRegistry) Insert(minion string) error { + return r.delegate.Insert(minion) +} + +func (r *HealthyRegistry) List() (currentMinions []string, err error) { var result []string - list, err := h.delegate.List() + list, err := r.delegate.List() if err != nil { return result, err } for _, minion := range list { - status, err := health.DoHTTPCheck(h.makeMinionURL(minion), h.client) + status, err := health.DoHTTPCheck(r.makeMinionURL(minion), r.client) if err != nil { glog.Errorf("%s failed health check with error: %s", minion, err) continue @@ -61,28 +84,6 @@ func (h *HealthyMinionRegistry) List() (currentMinions []string, err error) { return result, nil } -func (h *HealthyMinionRegistry) Insert(minion string) error { - return h.delegate.Insert(minion) -} - -func (h *HealthyMinionRegistry) Delete(minion string) error { - return h.delegate.Delete(minion) -} - -func (h *HealthyMinionRegistry) Contains(minion string) (bool, error) { - contains, err := h.delegate.Contains(minion) - if err != nil { - return false, err - } - if !contains { - return false, nil - } - status, err := health.DoHTTPCheck(h.makeMinionURL(minion), h.client) - if err != nil { - return false, err - } - if status == health.Unhealthy { - return false, nil - } - return true, nil +func (r *HealthyRegistry) makeMinionURL(minion string) string { + return fmt.Sprintf("http://%s:%d/healthz", minion, r.port) } diff --git a/pkg/registry/healthy_minion_registry_test.go b/pkg/registry/minion/healthy_registry_test.go similarity index 81% rename from pkg/registry/healthy_minion_registry_test.go rename to pkg/registry/minion/healthy_registry_test.go index 132339564fc..cebcf31f1d6 100644 --- a/pkg/registry/healthy_minion_registry_test.go +++ b/pkg/registry/minion/healthy_registry_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "bytes" @@ -22,6 +22,8 @@ import ( "net/http" "reflect" "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) type alwaysYes struct{} @@ -38,41 +40,33 @@ func (alwaysYes) Get(url string) (*http.Response, error) { } func TestBasicDelegation(t *testing.T) { - mockMinionRegistry := MockMinionRegistry{ - minions: []string{"m1", "m2", "m3"}, - } - healthy := HealthyMinionRegistry{ - delegate: &mockMinionRegistry, + mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}) + healthy := HealthyRegistry{ + delegate: mockMinionRegistry, client: alwaysYes{}, } - list, err := healthy.List() if err != nil { t.Errorf("unexpected error: %v", err) } - - if !reflect.DeepEqual(list, mockMinionRegistry.minions) { - t.Errorf("Expected %v, Got %v", mockMinionRegistry.minions, list) + if !reflect.DeepEqual(list, mockMinionRegistry.Minions) { + t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list) } err = healthy.Insert("foo") if err != nil { t.Errorf("unexpected error: %v", err) } - ok, err := healthy.Contains("m1") if err != nil { t.Errorf("unexpected error: %v", err) } - if !ok { t.Errorf("Unexpected absence of 'm1'") } - ok, err = healthy.Contains("m5") if err != nil { t.Errorf("unexpected error: %v", err) } - if ok { t.Errorf("Unexpected presence of 'm5'") } @@ -91,21 +85,17 @@ func (n *notMinion) Get(url string) (*http.Response, error) { } func TestFiltering(t *testing.T) { - mockMinionRegistry := MockMinionRegistry{ - minions: []string{"m1", "m2", "m3"}, - } - healthy := HealthyMinionRegistry{ - delegate: &mockMinionRegistry, + mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}) + healthy := HealthyRegistry{ + delegate: mockMinionRegistry, client: ¬Minion{minion: "m1"}, port: 10250, } - expected := []string{"m2", "m3"} list, err := healthy.List() if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, expected) { t.Errorf("Expected %v, Got %v", expected, list) } @@ -113,7 +103,6 @@ func TestFiltering(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if ok { t.Errorf("Unexpected presence of 'm1'") } diff --git a/pkg/registry/minionregistry.go b/pkg/registry/minion/minion.go similarity index 92% rename from pkg/registry/minionregistry.go rename to pkg/registry/minion/minion.go index 7d5ab774dc5..ddf90d73a98 100644 --- a/pkg/registry/minionregistry.go +++ b/pkg/registry/minion/minion.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "fmt" @@ -27,7 +27,7 @@ import ( var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.") // Keep track of a set of minions. Safe for concurrent reading/writing. -type MinionRegistry interface { +type Registry interface { List() (currentMinions []string, err error) Insert(minion string) error Delete(minion string) error @@ -35,7 +35,7 @@ type MinionRegistry interface { } // Initialize a minion registry with a list of minions. -func MakeMinionRegistry(minions []string) MinionRegistry { +func NewRegistry(minions []string) Registry { m := &minionList{ minions: util.StringSet{}, } @@ -50,22 +50,10 @@ type minionList struct { lock sync.Mutex } -func (m *minionList) List() (currentMinions []string, err error) { +func (m *minionList) Contains(minion string) (bool, error) { m.lock.Lock() defer m.lock.Unlock() - // Convert from map to []string - for minion := range m.minions { - currentMinions = append(currentMinions, minion) - } - sort.StringSlice(currentMinions).Sort() - return -} - -func (m *minionList) Insert(newMinion string) error { - m.lock.Lock() - defer m.lock.Unlock() - m.minions.Insert(newMinion) - return nil + return m.minions.Has(minion), nil } func (m *minionList) Delete(minion string) error { @@ -75,8 +63,19 @@ func (m *minionList) Delete(minion string) error { return nil } -func (m *minionList) Contains(minion string) (bool, error) { +func (m *minionList) Insert(newMinion string) error { m.lock.Lock() defer m.lock.Unlock() - return m.minions.Has(minion), nil + m.minions.Insert(newMinion) + return nil +} + +func (m *minionList) List() (currentMinions []string, err error) { + m.lock.Lock() + defer m.lock.Unlock() + for minion := range m.minions { + currentMinions = append(currentMinions, minion) + } + sort.StringSlice(currentMinions).Sort() + return } diff --git a/pkg/registry/minionregistry_test.go b/pkg/registry/minion/minion_test.go similarity index 92% rename from pkg/registry/minionregistry_test.go rename to pkg/registry/minion/minion_test.go index 1a709679a09..f4dd2bfa5b4 100644 --- a/pkg/registry/minionregistry_test.go +++ b/pkg/registry/minion/minion_test.go @@ -14,15 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "reflect" "testing" ) -func TestMinionRegistry(t *testing.T) { - m := MakeMinionRegistry([]string{"foo", "bar"}) +func TestRegistry(t *testing.T) { + m := NewRegistry([]string{"foo", "bar"}) if has, err := m.Contains("foo"); !has || err != nil { t.Errorf("missing expected object") } @@ -32,21 +32,18 @@ func TestMinionRegistry(t *testing.T) { if has, err := m.Contains("baz"); has || err != nil { t.Errorf("has unexpected object") } - if err := m.Insert("baz"); err != nil { t.Errorf("insert failed") } if has, err := m.Contains("baz"); !has || err != nil { t.Errorf("insert didn't actually insert") } - if err := m.Delete("bar"); err != nil { t.Errorf("delete failed") } if has, err := m.Contains("bar"); has || err != nil { t.Errorf("delete didn't actually delete") } - list, err := m.List() if err != nil { t.Errorf("got error calling List") diff --git a/pkg/registry/minionstorage.go b/pkg/registry/minion/storage.go similarity index 56% rename from pkg/registry/minionstorage.go rename to pkg/registry/minion/storage.go index d98b9e5d45d..188ca1de859 100644 --- a/pkg/registry/minionstorage.go +++ b/pkg/registry/minion/storage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "fmt" @@ -24,46 +24,19 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) -// MinionRegistryStorage implements the RESTStorage interface, backed by a MinionRegistry. -type MinionRegistryStorage struct { - registry MinionRegistry +// RegistryStorage implements the RESTStorage interface, backed by a MinionRegistry. +type RegistryStorage struct { + registry Registry } -func MakeMinionRegistryStorage(m MinionRegistry) apiserver.RESTStorage { - return &MinionRegistryStorage{ +// NewRegistryStorage returns a new RegistryStorage. +func NewRegistryStorage(m Registry) apiserver.RESTStorage { + return &RegistryStorage{ registry: m, } } -func (storage *MinionRegistryStorage) toApiMinion(name string) api.Minion { - return api.Minion{JSONBase: api.JSONBase{ID: name}} -} - -func (storage *MinionRegistryStorage) List(selector labels.Selector) (interface{}, error) { - nameList, err := storage.registry.List() - if err != nil { - return nil, err - } - var list api.MinionList - for _, name := range nameList { - list.Items = append(list.Items, storage.toApiMinion(name)) - } - return list, nil -} - -func (storage *MinionRegistryStorage) Get(id string) (interface{}, error) { - exists, err := storage.registry.Contains(id) - if !exists { - return nil, ErrDoesNotExist - } - return storage.toApiMinion(id), err -} - -func (storage *MinionRegistryStorage) New() interface{} { - return &api.Minion{} -} - -func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { minion, ok := obj.(*api.Minion) if !ok { return nil, fmt.Errorf("not a minion: %#v", obj) @@ -72,27 +45,23 @@ func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{ return nil, fmt.Errorf("ID should not be empty: %#v", minion) } return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.registry.Insert(minion.ID) + err := rs.registry.Insert(minion.ID) if err != nil { return nil, err } - contains, err := storage.registry.Contains(minion.ID) + contains, err := rs.registry.Contains(minion.ID) if err != nil { return nil, err } if contains { - return storage.toApiMinion(minion.ID), nil + return rs.toApiMinion(minion.ID), nil } return nil, fmt.Errorf("unable to add minion %#v", minion) }), nil } -func (storage *MinionRegistryStorage) Update(minion interface{}) (<-chan interface{}, error) { - return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.") -} - -func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, error) { - exists, err := storage.registry.Contains(id) +func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { + exists, err := rs.registry.Contains(id) if !exists { return nil, ErrDoesNotExist } @@ -100,6 +69,38 @@ func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, err return nil, err } return apiserver.MakeAsync(func() (interface{}, error) { - return &api.Status{Status: api.StatusSuccess}, storage.registry.Delete(id) + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(id) }), nil } + +func (rs *RegistryStorage) Get(id string) (interface{}, error) { + exists, err := rs.registry.Contains(id) + if !exists { + return nil, ErrDoesNotExist + } + return rs.toApiMinion(id), err +} + +func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { + nameList, err := rs.registry.List() + if err != nil { + return nil, err + } + var list api.MinionList + for _, name := range nameList { + list.Items = append(list.Items, rs.toApiMinion(name)) + } + return list, nil +} + +func (rs RegistryStorage) New() interface{} { + return &api.Minion{} +} + +func (rs *RegistryStorage) Update(minion interface{}) (<-chan interface{}, error) { + return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.") +} + +func (rs *RegistryStorage) toApiMinion(name string) api.Minion { + return api.Minion{JSONBase: api.JSONBase{ID: name}} +} diff --git a/pkg/registry/minionstorage_test.go b/pkg/registry/minion/storage_test.go similarity index 95% rename from pkg/registry/minionstorage_test.go rename to pkg/registry/minion/storage_test.go index a0c7ef7f306..96d81595cd5 100644 --- a/pkg/registry/minionstorage_test.go +++ b/pkg/registry/minion/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package minion import ( "reflect" @@ -25,8 +25,8 @@ import ( ) func TestMinionRegistryStorage(t *testing.T) { - m := MakeMinionRegistry([]string{"foo", "bar"}) - ms := MakeMinionRegistryStorage(m) + m := NewRegistry([]string{"foo", "bar"}) + ms := NewRegistryStorage(m) if obj, err := ms.Get("foo"); err != nil || obj.(api.Minion).ID != "foo" { t.Errorf("missing expected object") diff --git a/pkg/registry/mock_registry.go b/pkg/registry/mock_registry.go deleted file mode 100644 index 89b8d90b8f6..00000000000 --- a/pkg/registry/mock_registry.go +++ /dev/null @@ -1,127 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package registry - -import ( - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" -) - -type MockPodRegistry struct { - err error - pod *api.Pod - pods []api.Pod - sync.Mutex -} - -func MakeMockPodRegistry(pods []api.Pod) *MockPodRegistry { - return &MockPodRegistry{ - pods: pods, - } -} - -func (registry *MockPodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { - registry.Lock() - defer registry.Unlock() - if registry.err != nil { - return registry.pods, registry.err - } - var filtered []api.Pod - for _, pod := range registry.pods { - if selector.Matches(labels.Set(pod.Labels)) { - filtered = append(filtered, pod) - } - } - return filtered, nil -} - -func (registry *MockPodRegistry) GetPod(podId string) (*api.Pod, error) { - registry.Lock() - defer registry.Unlock() - return registry.pod, registry.err -} - -func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error { - registry.Lock() - defer registry.Unlock() - return registry.err -} - -func (registry *MockPodRegistry) UpdatePod(pod api.Pod) error { - registry.Lock() - defer registry.Unlock() - registry.pod = &pod - return registry.err -} - -func (registry *MockPodRegistry) DeletePod(podId string) error { - registry.Lock() - defer registry.Unlock() - return registry.err -} - -type MockMinionRegistry struct { - err error - minion string - minions []string - sync.Mutex -} - -func MakeMockMinionRegistry(minions []string) *MockMinionRegistry { - return &MockMinionRegistry{ - minions: minions, - } -} - -func (registry *MockMinionRegistry) List() ([]string, error) { - registry.Lock() - defer registry.Unlock() - return registry.minions, registry.err -} - -func (registry *MockMinionRegistry) Insert(minion string) error { - registry.Lock() - defer registry.Unlock() - registry.minion = minion - return registry.err -} - -func (registry *MockMinionRegistry) Contains(minion string) (bool, error) { - registry.Lock() - defer registry.Unlock() - for _, name := range registry.minions { - if name == minion { - return true, registry.err - } - } - return false, registry.err -} - -func (registry *MockMinionRegistry) Delete(minion string) error { - registry.Lock() - defer registry.Unlock() - var newList []string - for _, name := range registry.minions { - if name != minion { - newList = append(newList, name) - } - } - registry.minions = newList - return registry.err -} diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go new file mode 100644 index 00000000000..12440353ad5 --- /dev/null +++ b/pkg/registry/pod/registry.go @@ -0,0 +1,36 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +// Registry is an interface implemented by things that know how to store Pod objects. +type Registry interface { + // ListPods obtains a list of pods that match selector. + ListPods(selector labels.Selector) ([]api.Pod, error) + // Get a specific pod + GetPod(podID string) (*api.Pod, error) + // Create a pod based on a specification, schedule it onto a specific machine. + CreatePod(machine string, pod api.Pod) error + // Update an existing pod + UpdatePod(pod api.Pod) error + // Delete an existing pod + DeletePod(podID string) error +} diff --git a/pkg/registry/podstorage.go b/pkg/registry/pod/storage.go similarity index 60% rename from pkg/registry/podstorage.go rename to pkg/registry/pod/storage.go index 16124c1dd7e..b84035fe872 100644 --- a/pkg/registry/podstorage.go +++ b/pkg/registry/pod/storage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package pod import ( "fmt" @@ -22,77 +22,130 @@ import ( "sync" "time" - "code.google.com/p/go-uuid/uuid" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + + "code.google.com/p/go-uuid/uuid" "github.com/golang/glog" ) -// PodRegistryStorage implements the RESTStorage interface in terms of a PodRegistry -type PodRegistryStorage struct { - registry PodRegistry - podInfoGetter client.PodInfoGetter - podCache client.PodInfoGetter - scheduler scheduler.Scheduler +// RegistryStorage implements the RESTStorage interface in terms of a PodRegistry +type RegistryStorage struct { + cloudProvider cloudprovider.Interface + mu sync.Mutex minionLister scheduler.MinionLister - cloud cloudprovider.Interface + podCache client.PodInfoGetter + podInfoGetter client.PodInfoGetter podPollPeriod time.Duration - lock sync.Mutex + registry Registry + scheduler scheduler.Scheduler } -// MakePodRegistryStorage makes a RESTStorage object for a pod registry. -// Parameters: -// registry: The pod registry -// podInfoGetter: Source of fresh container info -// scheduler: The scheduler for assigning pods to machines -// minionLister: Object which can list available minions for the scheduler -// cloud: Interface to a cloud provider (may be null) -// podCache: Source of cached container info -func MakePodRegistryStorage(registry PodRegistry, - podInfoGetter client.PodInfoGetter, - scheduler scheduler.Scheduler, - minionLister scheduler.MinionLister, - cloud cloudprovider.Interface, - podCache client.PodInfoGetter) apiserver.RESTStorage { - return &PodRegistryStorage{ - registry: registry, - podInfoGetter: podInfoGetter, - scheduler: scheduler, - minionLister: minionLister, - cloud: cloud, - podCache: podCache, +type RegistryStorageConfig struct { + CloudProvider cloudprovider.Interface + MinionLister scheduler.MinionLister + PodCache client.PodInfoGetter + PodInfoGetter client.PodInfoGetter + Registry Registry + Scheduler scheduler.Scheduler +} + +// NewRegistryStorage returns a new RegistryStorage. +func NewRegistryStorage(config *RegistryStorageConfig) apiserver.RESTStorage { + return &RegistryStorage{ + cloudProvider: config.CloudProvider, + minionLister: config.MinionLister, + podCache: config.PodCache, + podInfoGetter: config.PodInfoGetter, podPollPeriod: time.Second * 10, + registry: config.Registry, + scheduler: config.Scheduler, } } -func (storage *PodRegistryStorage) List(selector labels.Selector) (interface{}, error) { +func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + pod := obj.(*api.Pod) + if len(pod.ID) == 0 { + pod.ID = uuid.NewUUID().String() + } + pod.DesiredState.Manifest.ID = pod.ID + if errs := api.ValidatePod(pod); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + if err := rs.scheduleAndCreatePod(*pod); err != nil { + return nil, err + } + return rs.waitForPodRunning(*pod) + }), nil +} + +func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id) + }), nil +} + +func (rs *RegistryStorage) Get(id string) (interface{}, error) { + pod, err := rs.registry.GetPod(id) + if err != nil { + return pod, err + } + if pod == nil { + return pod, nil + } + if rs.podCache != nil || rs.podInfoGetter != nil { + rs.fillPodInfo(pod) + pod.CurrentState.Status = makePodStatus(pod) + } + pod.CurrentState.HostIP = getInstanceIP(rs.cloudProvider, pod.CurrentState.Host) + return pod, err +} + +func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { var result api.PodList - pods, err := storage.registry.ListPods(selector) + pods, err := rs.registry.ListPods(selector) if err == nil { result.Items = pods for i := range result.Items { - storage.fillPodInfo(&result.Items[i]) + rs.fillPodInfo(&result.Items[i]) } } - return result, err } -func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) { +func (rs RegistryStorage) New() interface{} { + return &api.Pod{} +} + +func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + pod := obj.(*api.Pod) + if errs := api.ValidatePod(pod); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + if err := rs.registry.UpdatePod(*pod); err != nil { + return nil, err + } + return rs.waitForPodRunning(*pod) + }), nil +} + +func (rs *RegistryStorage) fillPodInfo(pod *api.Pod) { // Get cached info for the list currently. // TODO: Optionally use fresh info - if storage.podCache != nil { - info, err := storage.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID) + if rs.podCache != nil { + info, err := rs.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID) if err != nil { if err != client.ErrPodInfoNotAvailable { glog.Errorf("Error getting container info from cache: %#v", err) } - if storage.podInfoGetter != nil { - info, err = storage.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.ID) + if rs.podInfoGetter != nil { + info, err = rs.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.ID) } if err != nil { if err != client.ErrPodInfoNotAvailable { @@ -115,37 +168,6 @@ func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) { } } -func makePodStatus(pod *api.Pod) api.PodStatus { - if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" { - return api.PodWaiting - } - running := 0 - stopped := 0 - unknown := 0 - for _, container := range pod.DesiredState.Manifest.Containers { - if info, ok := pod.CurrentState.Info[container.Name]; ok { - if info.State.Running { - running++ - } else { - stopped++ - } - } else { - unknown++ - } - } - - switch { - case running > 0 && stopped == 0 && unknown == 0: - return api.PodRunning - case running == 0 && stopped > 0 && unknown == 0: - return api.PodTerminated - case running == 0 && stopped == 0 && unknown > 0: - return api.PodWaiting - default: - return api.PodWaiting - } -} - func getInstanceIP(cloud cloudprovider.Interface, host string) string { if cloud == nil { return "" @@ -166,82 +188,50 @@ func getInstanceIP(cloud cloudprovider.Interface, host string) string { return addr.String() } -func (storage *PodRegistryStorage) Get(id string) (interface{}, error) { - pod, err := storage.registry.GetPod(id) - if err != nil { - return pod, err +func makePodStatus(pod *api.Pod) api.PodStatus { + if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" { + return api.PodWaiting } - if pod == nil { - return pod, nil + running := 0 + stopped := 0 + unknown := 0 + for _, container := range pod.DesiredState.Manifest.Containers { + if info, ok := pod.CurrentState.Info[container.Name]; ok { + if info.State.Running { + running++ + } else { + stopped++ + } + } else { + unknown++ + } } - if storage.podCache != nil || storage.podInfoGetter != nil { - storage.fillPodInfo(pod) - pod.CurrentState.Status = makePodStatus(pod) + switch { + case running > 0 && stopped == 0 && unknown == 0: + return api.PodRunning + case running == 0 && stopped > 0 && unknown == 0: + return api.PodTerminated + case running == 0 && stopped == 0 && unknown > 0: + return api.PodWaiting + default: + return api.PodWaiting } - pod.CurrentState.HostIP = getInstanceIP(storage.cloud, pod.CurrentState.Host) - - return pod, err } -func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() (interface{}, error) { - return &api.Status{Status: api.StatusSuccess}, storage.registry.DeletePod(id) - }), nil -} - -func (storage *PodRegistryStorage) New() interface{} { - return &api.Pod{} -} - -func (storage *PodRegistryStorage) scheduleAndCreatePod(pod api.Pod) error { - storage.lock.Lock() - defer storage.lock.Unlock() +func (rs *RegistryStorage) scheduleAndCreatePod(pod api.Pod) error { + rs.mu.Lock() + defer rs.mu.Unlock() // TODO(lavalamp): Separate scheduler more cleanly. - machine, err := storage.scheduler.Schedule(pod, storage.minionLister) + machine, err := rs.scheduler.Schedule(pod, rs.minionLister) if err != nil { return err } - return storage.registry.CreatePod(machine, pod) + return rs.registry.CreatePod(machine, pod) } -func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { - pod := obj.(*api.Pod) - if len(pod.ID) == 0 { - pod.ID = uuid.NewUUID().String() - } - pod.DesiredState.Manifest.ID = pod.ID - - if errs := api.ValidatePod(pod); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - - return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.scheduleAndCreatePod(*pod) - if err != nil { - return nil, err - } - return storage.waitForPodRunning(*pod) - }), nil -} - -func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { - pod := obj.(*api.Pod) - if errs := api.ValidatePod(pod); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - return apiserver.MakeAsync(func() (interface{}, error) { - err := storage.registry.UpdatePod(*pod) - if err != nil { - return nil, err - } - return storage.waitForPodRunning(*pod) - }), nil -} - -func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { +func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { for { - podObj, err := storage.Get(pod.ID) - + podObj, err := rs.Get(pod.ID) if err != nil || podObj == nil { return nil, err } @@ -254,7 +244,7 @@ func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, case api.PodRunning, api.PodTerminated: return pod, nil default: - time.Sleep(storage.podPollPeriod) + time.Sleep(rs.podPollPeriod) } } return pod, nil diff --git a/pkg/registry/podstorage_test.go b/pkg/registry/pod/storage_test.go similarity index 78% rename from pkg/registry/podstorage_test.go rename to pkg/registry/pod/storage_test.go index 7c4bc5e2af1..d3e4d5cc6ce 100644 --- a/pkg/registry/podstorage_test.go +++ b/pkg/registry/pod/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package pod import ( "fmt" @@ -25,7 +25,10 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/fsouza/go-dockerclient" ) @@ -52,12 +55,12 @@ func expectPod(t *testing.T, ch <-chan interface{}) (*api.Pod, bool) { } func TestCreatePodRegistryError(t *testing.T) { - mockRegistry := &MockPodRegistry{ - err: fmt.Errorf("test error"), + podRegistry := ®istrytest.PodRegistry{ + Err: fmt.Errorf("test error"), } - storage := PodRegistryStorage{ - scheduler: &MockScheduler{}, - registry: mockRegistry, + storage := RegistryStorage{ + scheduler: ®istrytest.Scheduler{}, + registry: podRegistry, } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -69,25 +72,14 @@ func TestCreatePodRegistryError(t *testing.T) { if err != nil { t.Errorf("Expected %#v, Got %#v", nil, err) } - expectApiStatusError(t, ch, mockRegistry.err.Error()) -} - -type MockScheduler struct { - err error - pod api.Pod - machine string -} - -func (m *MockScheduler) Schedule(pod api.Pod, lister scheduler.MinionLister) (string, error) { - m.pod = pod - return m.machine, m.err + expectApiStatusError(t, ch, podRegistry.Err.Error()) } func TestCreatePodSchedulerError(t *testing.T) { - mockScheduler := MockScheduler{ - err: fmt.Errorf("test error"), + mockScheduler := registrytest.Scheduler{ + Err: fmt.Errorf("test error"), } - storage := PodRegistryStorage{ + storage := RegistryStorage{ scheduler: &mockScheduler, } desiredState := api.PodState{ @@ -100,26 +92,15 @@ func TestCreatePodSchedulerError(t *testing.T) { if err != nil { t.Errorf("Expected %#v, Got %#v", nil, err) } - expectApiStatusError(t, ch, mockScheduler.err.Error()) -} - -type MockPodStorageRegistry struct { - MockPodRegistry - machine string -} - -func (r *MockPodStorageRegistry) CreatePod(machine string, pod api.Pod) error { - r.MockPodRegistry.pod = &pod - r.machine = machine - return r.MockPodRegistry.err + expectApiStatusError(t, ch, mockScheduler.Err.Error()) } func TestCreatePodSetsIds(t *testing.T) { - mockRegistry := &MockPodStorageRegistry{ - MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")}, + mockRegistry := ®istrytest.PodRegistryStorage{ + PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, } - storage := PodRegistryStorage{ - scheduler: &MockScheduler{machine: "test"}, + storage := RegistryStorage{ + scheduler: ®istrytest.Scheduler{Machine: "test"}, registry: mockRegistry, } desiredState := api.PodState{ @@ -132,26 +113,26 @@ func TestCreatePodSetsIds(t *testing.T) { if err != nil { t.Errorf("Expected %#v, Got %#v", nil, err) } - expectApiStatusError(t, ch, mockRegistry.err.Error()) + expectApiStatusError(t, ch, mockRegistry.Err.Error()) - if len(mockRegistry.MockPodRegistry.pod.ID) == 0 { + if len(mockRegistry.PodRegistry.Pod.ID) == 0 { t.Errorf("Expected pod ID to be set, Got %#v", pod) } - if mockRegistry.MockPodRegistry.pod.DesiredState.Manifest.ID != mockRegistry.MockPodRegistry.pod.ID { + if mockRegistry.PodRegistry.Pod.DesiredState.Manifest.ID != mockRegistry.PodRegistry.Pod.ID { t.Errorf("Expected manifest ID to be equal to pod ID, Got %#v", pod) } } func TestListPodsError(t *testing.T) { - mockRegistry := MockPodRegistry{ - err: fmt.Errorf("test error"), + mockRegistry := registrytest.PodRegistry{ + Err: fmt.Errorf("test error"), } - storage := PodRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } pods, err := storage.List(labels.Everything()) - if err != mockRegistry.err { - t.Errorf("Expected %#v, Got %#v", mockRegistry.err, err) + if err != mockRegistry.Err { + t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err) } if len(pods.(api.PodList).Items) != 0 { t.Errorf("Unexpected non-zero pod list: %#v", pods) @@ -159,8 +140,8 @@ func TestListPodsError(t *testing.T) { } func TestListEmptyPodList(t *testing.T) { - mockRegistry := MockPodRegistry{} - storage := PodRegistryStorage{ + mockRegistry := registrytest.PodRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, } pods, err := storage.List(labels.Everything()) @@ -174,8 +155,8 @@ func TestListEmptyPodList(t *testing.T) { } func TestListPodList(t *testing.T) { - mockRegistry := MockPodRegistry{ - pods: []api.Pod{ + mockRegistry := registrytest.PodRegistry{ + Pods: []api.Pod{ { JSONBase: api.JSONBase{ ID: "foo", @@ -188,7 +169,7 @@ func TestListPodList(t *testing.T) { }, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } podsObj, err := storage.List(labels.Everything()) @@ -209,8 +190,8 @@ func TestListPodList(t *testing.T) { } func TestPodDecode(t *testing.T) { - mockRegistry := MockPodRegistry{} - storage := PodRegistryStorage{ + mockRegistry := registrytest.PodRegistry{} + storage := RegistryStorage{ registry: &mockRegistry, } expected := &api.Pod{ @@ -234,12 +215,12 @@ func TestPodDecode(t *testing.T) { } func TestGetPod(t *testing.T) { - mockRegistry := MockPodRegistry{ - pod: &api.Pod{ + mockRegistry := registrytest.PodRegistry{ + Pod: &api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, } obj, err := storage.Get("foo") @@ -248,21 +229,21 @@ func TestGetPod(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(*mockRegistry.pod, *pod) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.pod, *pod) + if !reflect.DeepEqual(*mockRegistry.Pod, *pod) { + t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod) } } func TestGetPodCloud(t *testing.T) { fakeCloud := &cloudprovider.FakeCloud{} - mockRegistry := MockPodRegistry{ - pod: &api.Pod{ + mockRegistry := registrytest.PodRegistry{ + Pod: &api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, }, } - storage := PodRegistryStorage{ - registry: &mockRegistry, - cloud: fakeCloud, + storage := RegistryStorage{ + registry: &mockRegistry, + cloudProvider: fakeCloud, } obj, err := storage.Get("foo") pod := obj.(*api.Pod) @@ -270,8 +251,8 @@ func TestGetPodCloud(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(*mockRegistry.pod, *pod) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.pod, *pod) + if !reflect.DeepEqual(*mockRegistry.Pod, *pod) { + t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod) } if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" { t.Errorf("Unexpected calls: %#v", fakeCloud.Calls) @@ -373,11 +354,11 @@ func TestMakePodStatus(t *testing.T) { } func TestPodStorageValidatesCreate(t *testing.T) { - mockRegistry := &MockPodStorageRegistry{ - MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")}, + mockRegistry := ®istrytest.PodRegistryStorage{ + PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, } - storage := PodRegistryStorage{ - scheduler: &MockScheduler{machine: "test"}, + storage := RegistryStorage{ + scheduler: ®istrytest.Scheduler{Machine: "test"}, registry: mockRegistry, } pod := &api.Pod{} @@ -391,11 +372,11 @@ func TestPodStorageValidatesCreate(t *testing.T) { } func TestPodStorageValidatesUpdate(t *testing.T) { - mockRegistry := &MockPodStorageRegistry{ - MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")}, + mockRegistry := ®istrytest.PodRegistryStorage{ + PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")}, } - storage := PodRegistryStorage{ - scheduler: &MockScheduler{machine: "test"}, + storage := RegistryStorage{ + scheduler: ®istrytest.Scheduler{Machine: "test"}, registry: mockRegistry, } pod := &api.Pod{} @@ -409,19 +390,19 @@ func TestPodStorageValidatesUpdate(t *testing.T) { } func TestCreatePod(t *testing.T) { - mockRegistry := MockPodRegistry{ - pod: &api.Pod{ + mockRegistry := registrytest.PodRegistry{ + Pod: &api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, CurrentState: api.PodState{ Host: "machine", }, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ registry: &mockRegistry, podPollPeriod: time.Millisecond * 100, scheduler: scheduler.MakeRoundRobinScheduler(), - minionLister: MakeMinionRegistry([]string{"machine"}), + minionLister: minion.NewRegistry([]string{"machine"}), } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -479,18 +460,14 @@ func TestFillPodInfo(t *testing.T) { }, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ podCache: &fakeGetter, } - pod := api.Pod{} - storage.fillPodInfo(&pod) - if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) { t.Errorf("Expected: %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info) } - if pod.CurrentState.PodIP != expectedIP { t.Errorf("Expected %s, Got %s", expectedIP, pod.CurrentState.PodIP) } @@ -506,18 +483,14 @@ func TestFillPodInfoNoData(t *testing.T) { }, }, } - storage := PodRegistryStorage{ + storage := RegistryStorage{ podCache: &fakeGetter, } - pod := api.Pod{} - storage.fillPodInfo(&pod) - if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) { t.Errorf("Expected %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info) } - if pod.CurrentState.PodIP != expectedIP { t.Errorf("Expected %s, Got %s", expectedIP, pod.CurrentState.PodIP) } diff --git a/pkg/registry/registrytest/controller.go b/pkg/registry/registrytest/controller.go new file mode 100644 index 00000000000..7855c611b54 --- /dev/null +++ b/pkg/registry/registrytest/controller.go @@ -0,0 +1,53 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// TODO: Why do we have this AND MemoryRegistry? +type ControllerRegistry struct { + Err error + Controllers []api.ReplicationController +} + +func (r *ControllerRegistry) ListControllers() ([]api.ReplicationController, error) { + return r.Controllers, r.Err +} + +func (r *ControllerRegistry) GetController(ID string) (*api.ReplicationController, error) { + return &api.ReplicationController{}, r.Err +} + +func (r *ControllerRegistry) CreateController(controller api.ReplicationController) error { + return r.Err +} + +func (r *ControllerRegistry) UpdateController(controller api.ReplicationController) error { + return r.Err +} + +func (r *ControllerRegistry) DeleteController(ID string) error { + return r.Err +} + +func (r *ControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, r.Err +} diff --git a/pkg/registry/registrytest/minion.go b/pkg/registry/registrytest/minion.go new file mode 100644 index 00000000000..736c6ae697e --- /dev/null +++ b/pkg/registry/registrytest/minion.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import "sync" + +type MinionRegistry struct { + Err error + Minion string + Minions []string + sync.Mutex +} + +func NewMinionRegistry(minions []string) *MinionRegistry { + return &MinionRegistry{ + Minions: minions, + } +} + +func (r *MinionRegistry) List() ([]string, error) { + r.Lock() + defer r.Unlock() + return r.Minions, r.Err +} + +func (r *MinionRegistry) Insert(minion string) error { + r.Lock() + defer r.Unlock() + r.Minion = minion + return r.Err +} + +func (r *MinionRegistry) Contains(minion string) (bool, error) { + r.Lock() + defer r.Unlock() + for _, name := range r.Minions { + if name == minion { + return true, r.Err + } + } + return false, r.Err +} + +func (r *MinionRegistry) Delete(minion string) error { + r.Lock() + defer r.Unlock() + var newList []string + for _, name := range r.Minions { + if name != minion { + newList = append(newList, name) + } + } + r.Minions = newList + return r.Err +} diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go new file mode 100644 index 00000000000..b2803328a46 --- /dev/null +++ b/pkg/registry/registrytest/pod.go @@ -0,0 +1,77 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +type PodRegistry struct { + Err error + Pod *api.Pod + Pods []api.Pod + sync.Mutex +} + +func NewPodRegistry(pods []api.Pod) *PodRegistry { + return &PodRegistry{ + Pods: pods, + } +} + +func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { + r.Lock() + defer r.Unlock() + if r.Err != nil { + return r.Pods, r.Err + } + var filtered []api.Pod + for _, pod := range r.Pods { + if selector.Matches(labels.Set(pod.Labels)) { + filtered = append(filtered, pod) + } + } + return filtered, nil +} + +func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { + r.Lock() + defer r.Unlock() + return r.Pod, r.Err +} + +func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error { + r.Lock() + defer r.Unlock() + return r.Err +} + +func (r *PodRegistry) UpdatePod(pod api.Pod) error { + r.Lock() + defer r.Unlock() + r.Pod = &pod + return r.Err +} + +func (r *PodRegistry) DeletePod(podId string) error { + r.Lock() + defer r.Unlock() + return r.Err +} diff --git a/pkg/registry/registrytest/pod_storage.go b/pkg/registry/registrytest/pod_storage.go new file mode 100644 index 00000000000..fb787d48242 --- /dev/null +++ b/pkg/registry/registrytest/pod_storage.go @@ -0,0 +1,30 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + +type PodRegistryStorage struct { + PodRegistry + machine string +} + +func (rs *PodRegistryStorage) CreatePod(machine string, pod api.Pod) error { + rs.PodRegistry.Pod = &pod + rs.machine = machine + return rs.PodRegistry.Err +} diff --git a/pkg/registry/registrytest/schedular.go b/pkg/registry/registrytest/schedular.go new file mode 100644 index 00000000000..2e44ccbaa32 --- /dev/null +++ b/pkg/registry/registrytest/schedular.go @@ -0,0 +1,33 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" +) + +type Scheduler struct { + Err error + Pod api.Pod + Machine string +} + +func (s *Scheduler) Schedule(pod api.Pod, lister scheduler.MinionLister) (string, error) { + s.Pod = pod + return s.Machine, s.Err +} diff --git a/pkg/registry/mock_service_registry.go b/pkg/registry/registrytest/service.go similarity index 50% rename from pkg/registry/mock_service_registry.go rename to pkg/registry/registrytest/service.go index 712881f541a..b637d9cb837 100644 --- a/pkg/registry/mock_service_registry.go +++ b/pkg/registry/registrytest/service.go @@ -14,39 +14,39 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package registrytest import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -type MockServiceRegistry struct { - list api.ServiceList - err error - endpoints api.Endpoints +type ServiceRegistry struct { + List api.ServiceList + Err error + Endpoints api.Endpoints } -func (m *MockServiceRegistry) ListServices() (api.ServiceList, error) { - return m.list, m.err +func (r *ServiceRegistry) ListServices() (api.ServiceList, error) { + return r.List, r.Err } -func (m *MockServiceRegistry) CreateService(svc api.Service) error { - return m.err +func (r *ServiceRegistry) CreateService(svc api.Service) error { + return r.Err } -func (m *MockServiceRegistry) GetService(name string) (*api.Service, error) { - return nil, m.err +func (r *ServiceRegistry) GetService(name string) (*api.Service, error) { + return nil, r.Err } -func (m *MockServiceRegistry) DeleteService(name string) error { - return m.err +func (r *ServiceRegistry) DeleteService(name string) error { + return r.Err } -func (m *MockServiceRegistry) UpdateService(svc api.Service) error { - return m.err +func (r *ServiceRegistry) UpdateService(svc api.Service) error { + return r.Err } -func (m *MockServiceRegistry) UpdateEndpoints(e api.Endpoints) error { - m.endpoints = e - return m.err +func (r *ServiceRegistry) UpdateEndpoints(e api.Endpoints) error { + r.Endpoints = e + return r.Err } diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go new file mode 100644 index 00000000000..676ad46ab78 --- /dev/null +++ b/pkg/registry/service/registry.go @@ -0,0 +1,31 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// Registry is an interface for things that know how to store services. +type Registry interface { + ListServices() (api.ServiceList, error) + CreateService(svc api.Service) error + GetService(name string) (*api.Service, error) + DeleteService(name string) error + UpdateService(svc api.Service) error + UpdateEndpoints(e api.Endpoints) error +} diff --git a/pkg/registry/servicestorage.go b/pkg/registry/service/storage.go similarity index 72% rename from pkg/registry/servicestorage.go rename to pkg/registry/service/storage.go index 31fae4ad6b4..4dd54a3a3ed 100644 --- a/pkg/registry/servicestorage.go +++ b/pkg/registry/service/storage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package service import ( "fmt" @@ -25,25 +25,168 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -// ServiceRegistryStorage adapts a service registry into apiserver's RESTStorage model. -type ServiceRegistryStorage struct { - registry ServiceRegistry +// RegistryStorage adapts a service registry into apiserver's RESTStorage model. +type RegistryStorage struct { + registry Registry cloud cloudprovider.Interface - machines MinionRegistry + machines minion.Registry } -// MakeServiceRegistryStorage makes a new ServiceRegistryStorage. -func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, machines MinionRegistry) apiserver.RESTStorage { - return &ServiceRegistryStorage{ +// NewRegistryStorage returns a new RegistryStorage. +func NewRegistryStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry) apiserver.RESTStorage { + return &RegistryStorage{ registry: registry, cloud: cloud, machines: machines, } } +func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + srv := obj.(*api.Service) + if errs := api.ValidateService(srv); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers + // correctly no matter what http operations happen. + if srv.CreateExternalLoadBalancer { + if rs.cloud == nil { + return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") + } + balancer, ok := rs.cloud.TCPLoadBalancer() + if !ok { + return nil, fmt.Errorf("The cloud provider does not support external TCP load balancers.") + } + zones, ok := rs.cloud.Zones() + if !ok { + return nil, fmt.Errorf("The cloud provider does not support zone enumeration.") + } + hosts, err := rs.machines.List() + if err != nil { + return nil, err + } + zone, err := zones.GetZone() + if err != nil { + return nil, err + } + err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hosts) + if err != nil { + return nil, err + } + } + // TODO actually wait for the object to be fully created here. + err := rs.registry.CreateService(*srv) + if err != nil { + return nil, err + } + return rs.registry.GetService(srv.ID) + }), nil +} + +func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { + service, err := rs.registry.GetService(id) + if err != nil { + return nil, err + } + return apiserver.MakeAsync(func() (interface{}, error) { + rs.deleteExternalLoadBalancer(service) + return api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id) + }), nil +} + +func (rs *RegistryStorage) Get(id string) (interface{}, error) { + s, err := rs.registry.GetService(id) + if err != nil { + return nil, err + } + return s, err +} + +func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { + list, err := rs.registry.ListServices() + if err != nil { + return nil, err + } + var filtered []api.Service + for _, service := range list.Items { + if selector.Matches(labels.Set(service.Labels)) { + filtered = append(filtered, service) + } + } + list.Items = filtered + return list, err +} + +func (rs RegistryStorage) New() interface{} { + return &api.Service{} +} + +// GetServiceEnvironmentVariables populates a list of environment variables that are use +// in the container environment to get access to services. +func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.EnvVar, error) { + var result []api.EnvVar + services, err := registry.ListServices() + if err != nil { + return result, err + } + for _, service := range services.Items { + name := strings.ToUpper(service.ID) + "_SERVICE_PORT" + value := strconv.Itoa(service.Port) + result = append(result, api.EnvVar{Name: name, Value: value}) + result = append(result, makeLinkVariables(service, machine)...) + } + result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine}) + return result, nil +} + +func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + srv := obj.(*api.Service) + if srv.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", srv) + } + if errs := api.ValidateService(srv); len(errs) > 0 { + return nil, fmt.Errorf("Validation errors: %v", errs) + } + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO: check to see if external load balancer status changed + err := rs.registry.UpdateService(*srv) + if err != nil { + return nil, err + } + return rs.registry.GetService(srv.ID) + }), nil +} + +func (rs *RegistryStorage) deleteExternalLoadBalancer(service *api.Service) error { + if !service.CreateExternalLoadBalancer || rs.cloud == nil { + return nil + } + zones, ok := rs.cloud.Zones() + if !ok { + // We failed to get zone enumerator. + // As this should have failed when we tried in "create" too, + // assume external load balancer was never created. + return nil + } + balancer, ok := rs.cloud.TCPLoadBalancer() + if !ok { + // See comment above. + return nil + } + zone, err := zones.GetZone() + if err != nil { + return err + } + if err := balancer.DeleteTCPLoadBalancer(service.JSONBase.ID, zone.Region); err != nil { + return err + } + return nil +} + func makeLinkVariables(service api.Service, machine string) []api.EnvVar { prefix := strings.ToUpper(service.ID) var port string @@ -76,150 +219,3 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar { }, } } - -// GetServiceEnvironmentVariables populates a list of environment variables that are use -// in the container environment to get access to services. -func GetServiceEnvironmentVariables(registry ServiceRegistry, machine string) ([]api.EnvVar, error) { - var result []api.EnvVar - services, err := registry.ListServices() - if err != nil { - return result, err - } - for _, service := range services.Items { - name := strings.ToUpper(service.ID) + "_SERVICE_PORT" - value := strconv.Itoa(service.Port) - result = append(result, api.EnvVar{Name: name, Value: value}) - result = append(result, makeLinkVariables(service, machine)...) - } - result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine}) - return result, nil -} - -func (sr *ServiceRegistryStorage) List(selector labels.Selector) (interface{}, error) { - list, err := sr.registry.ListServices() - if err != nil { - return nil, err - } - var filtered []api.Service - for _, service := range list.Items { - if selector.Matches(labels.Set(service.Labels)) { - filtered = append(filtered, service) - } - } - list.Items = filtered - return list, err -} - -func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) { - service, err := sr.registry.GetService(id) - if err != nil { - return nil, err - } - return service, err -} - -func (sr *ServiceRegistryStorage) deleteExternalLoadBalancer(service *api.Service) error { - if !service.CreateExternalLoadBalancer || sr.cloud == nil { - return nil - } - - zones, ok := sr.cloud.Zones() - if !ok { - // We failed to get zone enumerator. - // As this should have failed when we tried in "create" too, - // assume external load balancer was never created. - return nil - } - - balancer, ok := sr.cloud.TCPLoadBalancer() - if !ok { - // See comment above. - return nil - } - - zone, err := zones.GetZone() - if err != nil { - return err - } - - if err := balancer.DeleteTCPLoadBalancer(service.JSONBase.ID, zone.Region); err != nil { - return err - } - - return nil -} - -func (sr *ServiceRegistryStorage) Delete(id string) (<-chan interface{}, error) { - service, err := sr.registry.GetService(id) - if err != nil { - return nil, err - } - return apiserver.MakeAsync(func() (interface{}, error) { - sr.deleteExternalLoadBalancer(service) - return &api.Status{Status: api.StatusSuccess}, sr.registry.DeleteService(id) - }), nil -} - -func (sr *ServiceRegistryStorage) New() interface{} { - return &api.Service{} -} - -func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { - srv := obj.(*api.Service) - if errs := api.ValidateService(srv); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - return apiserver.MakeAsync(func() (interface{}, error) { - // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers - // correctly no matter what http operations happen. - if srv.CreateExternalLoadBalancer { - if sr.cloud == nil { - return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") - } - balancer, ok := sr.cloud.TCPLoadBalancer() - if !ok { - return nil, fmt.Errorf("The cloud provider does not support external TCP load balancers.") - } - zones, ok := sr.cloud.Zones() - if !ok { - return nil, fmt.Errorf("The cloud provider does not support zone enumeration.") - } - hosts, err := sr.machines.List() - if err != nil { - return nil, err - } - zone, err := zones.GetZone() - if err != nil { - return nil, err - } - err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hosts) - if err != nil { - return nil, err - } - } - // TODO actually wait for the object to be fully created here. - err := sr.registry.CreateService(*srv) - if err != nil { - return nil, err - } - return sr.registry.GetService(srv.ID) - }), nil -} - -func (sr *ServiceRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { - srv := obj.(*api.Service) - if srv.ID == "" { - return nil, fmt.Errorf("ID should not be empty: %#v", srv) - } - if errs := api.ValidateService(srv); len(errs) > 0 { - return nil, fmt.Errorf("Validation errors: %v", errs) - } - return apiserver.MakeAsync(func() (interface{}, error) { - // TODO: check to see if external load balancer status changed - err := sr.registry.UpdateService(*srv) - if err != nil { - return nil, err - } - return sr.registry.GetService(srv.ID) - }), nil -} diff --git a/pkg/registry/servicestorage_test.go b/pkg/registry/service/storage_test.go similarity index 73% rename from pkg/registry/servicestorage_test.go rename to pkg/registry/service/storage_test.go index af1b0c65684..bb61ab0dccd 100644 --- a/pkg/registry/servicestorage_test.go +++ b/pkg/registry/service/storage_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package registry +package service import ( "fmt" @@ -23,40 +23,37 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/memory" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func TestServiceRegistry(t *testing.T) { - memory := MakeMemoryRegistry() +func TestRegistry(t *testing.T) { + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, } c, _ := storage.Create(svc) <-c - if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService(svc.ID) + srv, err := registry.GetService(svc.ID) if err != nil { t.Errorf("unexpected error: %v", err) } - if srv == nil { t.Errorf("Failed to find service: %s", svc.ID) } } func TestServiceStorageValidatesCreate(t *testing.T) { - memory := MakeMemoryRegistry() - storage := MakeServiceRegistryStorage(memory, nil, nil) - + registry := memory.NewRegistry() + storage := NewRegistryStorage(registry, nil, nil) failureCases := map[string]api.Service{ "empty ID": { JSONBase: api.JSONBase{ID: ""}, @@ -79,13 +76,12 @@ func TestServiceStorageValidatesCreate(t *testing.T) { } func TestServiceStorageValidatesUpdate(t *testing.T) { - memory := MakeMemoryRegistry() - memory.CreateService(api.Service{ + registry := memory.NewRegistry() + registry.CreateService(api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, }) - storage := MakeServiceRegistryStorage(memory, nil, nil) - + storage := NewRegistryStorage(registry, nil, nil) failureCases := map[string]api.Service{ "empty ID": { JSONBase: api.JSONBase{ID: ""}, @@ -108,12 +104,10 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { } func TestServiceRegistryExternalService(t *testing.T) { - memory := MakeMemoryRegistry() + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -121,29 +115,25 @@ func TestServiceRegistryExternalService(t *testing.T) { } c, _ := storage.Create(svc) <-c - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService(svc.ID) + srv, err := registry.GetService(svc.ID) if err != nil { t.Errorf("unexpected error: %v", err) } - if srv == nil { t.Errorf("Failed to find service: %s", svc.ID) } } func TestServiceRegistryExternalServiceError(t *testing.T) { - memory := MakeMemoryRegistry() + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{ Err: fmt.Errorf("test error"), } machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -151,75 +141,66 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { } c, _ := storage.Create(svc) <-c - if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService("foo") + srv, err := registry.GetService("foo") if !apiserver.IsNotFound(err) { if err != nil { - t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) + t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) } else { - t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) + t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) } } } func TestServiceRegistryDelete(t *testing.T) { - memory := MakeMemoryRegistry() + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, } - memory.CreateService(svc) - + registry.CreateService(svc) c, _ := storage.Delete(svc.ID) <-c - if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService(svc.ID) + srv, err := registry.GetService(svc.ID) if !apiserver.IsNotFound(err) { if err != nil { - t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) + t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) } else { - t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) + t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) } } } func TestServiceRegistryDeleteExternal(t *testing.T) { - memory := MakeMemoryRegistry() + registry := memory.NewRegistry() fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - - storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) - + storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, CreateExternalLoadBalancer: true, } - memory.CreateService(svc) - + registry.CreateService(svc) c, _ := storage.Delete(svc.ID) <-c - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } - srv, err := memory.GetService(svc.ID) + srv, err := registry.GetService(svc.ID) if !apiserver.IsNotFound(err) { if err != nil { - t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) + t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err) } else { - t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) + t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv) } } } diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 0ec93083260..1762f8cb132 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -70,15 +70,15 @@ func TestExtractList(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: `{"id":"foo"}`, + Value: `{"id":"foo"}`, ModifiedIndex: 1, }, { - Value: `{"id":"bar"}`, + Value: `{"id":"bar"}`, ModifiedIndex: 2, }, { - Value: `{"id":"baz"}`, + Value: `{"id":"baz"}`, ModifiedIndex: 3, }, },