Add the resource version to api.*List items from etcd

Allows clients to watch more easily (can invoke Get, then
Watch).
This commit is contained in:
Clayton Coleman
2014-08-28 20:48:07 -04:00
parent c380dd0426
commit bafc422ac0
17 changed files with 120 additions and 89 deletions

View File

@@ -23,7 +23,7 @@ import (
// Registry is an interface for things that know how to store ReplicationControllers.
type Registry interface {
ListControllers() ([]api.ReplicationController, error)
ListControllers() (*api.ReplicationControllerList, error)
WatchControllers(resourceVersion uint64) (watch.Interface, error)
GetController(controllerID string) (*api.ReplicationController, error)
CreateController(controller api.ReplicationController) error

View File

@@ -92,16 +92,18 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) {
// List obtains a list of ReplicationControllers that match selector.
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
result := api.ReplicationControllerList{}
controllers, err := rs.registry.ListControllers()
if err == nil {
for _, controller := range controllers {
if selector.Matches(labels.Set(controller.Labels)) {
result.Items = append(result.Items, controller)
}
if err != nil {
return nil, err
}
filtered := []api.ReplicationController{}
for _, controller := range controllers.Items {
if selector.Matches(labels.Set(controller.Labels)) {
filtered = append(filtered, controller)
}
}
return result, err
controllers.Items = filtered
return controllers, err
}
// New creates a new ReplicationController for use with Create and Update.
@@ -150,7 +152,7 @@ func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (in
if err != nil {
return ctrl, err
}
if len(pods) == ctrl.DesiredState.Replicas {
if len(pods.Items) == ctrl.DesiredState.Replicas {
break
}
time.Sleep(rs.pollPeriod)

View File

@@ -37,18 +37,17 @@ func TestListControllersError(t *testing.T) {
storage := RegistryStorage{
registry: &mockRegistry,
}
controllersObj, err := storage.List(nil)
controllers := controllersObj.(api.ReplicationControllerList)
controllers, err := storage.List(nil)
if err != mockRegistry.Err {
t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err)
}
if len(controllers.Items) != 0 {
t.Errorf("Unexpected non-zero ctrl list: %#v", controllers)
if controllers != nil {
t.Errorf("Unexpected non-nil ctrl list: %#v", controllers)
}
}
func TestListEmptyControllerList(t *testing.T) {
mockRegistry := registrytest.ControllerRegistry{}
mockRegistry := registrytest.ControllerRegistry{nil, &api.ReplicationControllerList{JSONBase: api.JSONBase{ResourceVersion: 1}}}
storage := RegistryStorage{
registry: &mockRegistry,
}
@@ -57,22 +56,27 @@ func TestListEmptyControllerList(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if len(controllers.(api.ReplicationControllerList).Items) != 0 {
if len(controllers.(*api.ReplicationControllerList).Items) != 0 {
t.Errorf("Unexpected non-zero ctrl list: %#v", controllers)
}
if controllers.(*api.ReplicationControllerList).ResourceVersion != 1 {
t.Errorf("Unexpected resource version: %#v", controllers)
}
}
func TestListControllerList(t *testing.T) {
mockRegistry := registrytest.ControllerRegistry{
Controllers: []api.ReplicationController{
{
JSONBase: api.JSONBase{
ID: "foo",
Controllers: &api.ReplicationControllerList{
Items: []api.ReplicationController{
{
JSONBase: api.JSONBase{
ID: "foo",
},
},
},
{
JSONBase: api.JSONBase{
ID: "bar",
{
JSONBase: api.JSONBase{
ID: "bar",
},
},
},
},
@@ -81,7 +85,7 @@ func TestListControllerList(t *testing.T) {
registry: &mockRegistry,
}
controllersObj, err := storage.List(labels.Everything())
controllers := controllersObj.(api.ReplicationControllerList)
controllers := controllersObj.(*api.ReplicationControllerList)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -212,10 +216,12 @@ var validPodTemplate = api.PodTemplate{
func TestCreateController(t *testing.T) {
mockRegistry := registrytest.ControllerRegistry{}
mockPodRegistry := registrytest.PodRegistry{
Pods: []api.Pod{
{
JSONBase: api.JSONBase{ID: "foo"},
Labels: map[string]string{"a": "b"},
Pods: &api.PodList{
Items: []api.Pod{
{
JSONBase: api.JSONBase{ID: "foo"},
Labels: map[string]string{"a": "b"},
},
},
},
}

View File

@@ -59,22 +59,24 @@ func makePodKey(podID string) string {
}
// ListPods obtains a list of pods that match selector.
func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) {
allPods := []api.Pod{}
filteredPods := []api.Pod{}
if err := r.ExtractList("/registry/pods", &allPods); err != nil {
func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) {
allPods := api.PodList{}
err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion)
if err != nil {
return nil, err
}
for _, pod := range allPods {
filtered := []api.Pod{}
for _, pod := range allPods.Items {
if selector.Matches(labels.Set(pod.Labels)) {
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
// matches our desires.
pod.CurrentState.Host = pod.DesiredState.Host
filteredPods = append(filteredPods, pod)
filtered = append(filtered, pod)
}
}
return filteredPods, nil
allPods.Items = filtered
return &allPods, nil
}
// WatchPods begins watching for new, changed, or deleted pods.
@@ -225,9 +227,9 @@ func (r *Registry) DeletePod(podID string) error {
}
// ListControllers obtains a list of ReplicationControllers.
func (r *Registry) ListControllers() ([]api.ReplicationController, error) {
var controllers []api.ReplicationController
err := r.ExtractList("/registry/controllers", &controllers)
func (r *Registry) ListControllers() (*api.ReplicationControllerList, error) {
controllers := &api.ReplicationControllerList{}
err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion)
return controllers, err
}
@@ -283,9 +285,9 @@ func makeServiceKey(name string) string {
}
// ListServices obtains a list of Services.
func (r *Registry) ListServices() (api.ServiceList, error) {
var list api.ServiceList
err := r.ExtractList("/registry/services/specs", &list.Items)
func (r *Registry) ListServices() (*api.ServiceList, error) {
list := &api.ServiceList{}
err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion)
return list, err
}

View File

@@ -412,7 +412,7 @@ func TestEtcdEmptyListPods(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if len(pods) != 0 {
if len(pods.Items) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
@@ -430,7 +430,7 @@ func TestEtcdListPodsNotFound(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if len(pods) != 0 {
if len(pods.Items) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
@@ -465,11 +465,11 @@ func TestEtcdListPods(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if len(pods) != 2 || pods[0].ID != "foo" || pods[1].ID != "bar" {
if len(pods.Items) != 2 || pods.Items[0].ID != "foo" || pods.Items[1].ID != "bar" {
t.Errorf("Unexpected pod list: %#v", pods)
}
if pods[0].CurrentState.Host != "machine" ||
pods[1].CurrentState.Host != "machine" {
if pods.Items[0].CurrentState.Host != "machine" ||
pods.Items[1].CurrentState.Host != "machine" {
t.Errorf("Failed to populate host name.")
}
}
@@ -487,7 +487,7 @@ func TestEtcdListControllersNotFound(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if len(controllers) != 0 {
if len(controllers.Items) != 0 {
t.Errorf("Unexpected controller list: %#v", controllers)
}
}
@@ -534,7 +534,7 @@ func TestEtcdListControllers(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if len(controllers) != 2 || controllers[0].ID != "foo" || controllers[1].ID != "bar" {
if len(controllers.Items) != 2 || controllers.Items[0].ID != "foo" || controllers.Items[1].ID != "bar" {
t.Errorf("Unexpected controller list: %#v", controllers)
}
}

View File

@@ -25,7 +25,7 @@ import (
// Registry is an interface implemented by things that know how to store Pod objects.
type Registry interface {
// ListPods obtains a list of pods that match selector.
ListPods(selector labels.Selector) ([]api.Pod, error)
ListPods(selector labels.Selector) (*api.PodList, error)
// Watch for new/changed/deleted pods
WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
// Get a specific pod

View File

@@ -105,15 +105,13 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) {
}
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
var result api.PodList
pods, err := rs.registry.ListPods(selector)
if err == nil {
result.Items = pods
for i := range result.Items {
rs.fillPodInfo(&result.Items[i])
for i := range pods.Items {
rs.fillPodInfo(&pods.Items[i])
}
}
return result, err
return pods, err
}
// Watch begins watching for new, changed, or deleted pods.

View File

@@ -108,13 +108,13 @@ func TestListPodsError(t *testing.T) {
if err != podRegistry.Err {
t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err)
}
if len(pods.(api.PodList).Items) != 0 {
t.Errorf("Unexpected non-zero pod list: %#v", pods)
if pods.(*api.PodList) != nil {
t.Errorf("Unexpected non-nil pod list: %#v", pods)
}
}
func TestListEmptyPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry := registrytest.NewPodRegistry(&api.PodList{JSONBase: api.JSONBase{ResourceVersion: 1}})
storage := RegistryStorage{
registry: podRegistry,
}
@@ -123,22 +123,27 @@ func TestListEmptyPodList(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if len(pods.(api.PodList).Items) != 0 {
if len(pods.(*api.PodList).Items) != 0 {
t.Errorf("Unexpected non-zero pod list: %#v", pods)
}
if pods.(*api.PodList).ResourceVersion != 1 {
t.Errorf("Unexpected resource version: %#v", pods)
}
}
func TestListPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = []api.Pod{
{
JSONBase: api.JSONBase{
ID: "foo",
podRegistry.Pods = &api.PodList{
Items: []api.Pod{
{
JSONBase: api.JSONBase{
ID: "foo",
},
},
},
{
JSONBase: api.JSONBase{
ID: "bar",
{
JSONBase: api.JSONBase{
ID: "bar",
},
},
},
}
@@ -146,7 +151,7 @@ func TestListPodList(t *testing.T) {
registry: podRegistry,
}
podsObj, err := storage.List(labels.Everything())
pods := podsObj.(api.PodList)
pods := podsObj.(*api.PodList)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@@ -24,10 +24,10 @@ import (
// TODO: Why do we have this AND MemoryRegistry?
type ControllerRegistry struct {
Err error
Controllers []api.ReplicationController
Controllers *api.ReplicationControllerList
}
func (r *ControllerRegistry) ListControllers() ([]api.ReplicationController, error) {
func (r *ControllerRegistry) ListControllers() (*api.ReplicationControllerList, error) {
return r.Controllers, r.Err
}

View File

@@ -27,32 +27,34 @@ import (
type PodRegistry struct {
Err error
Pod *api.Pod
Pods []api.Pod
Pods *api.PodList
sync.Mutex
mux *watch.Mux
}
func NewPodRegistry(pods []api.Pod) *PodRegistry {
func NewPodRegistry(pods *api.PodList) *PodRegistry {
return &PodRegistry{
Pods: pods,
mux: watch.NewMux(0),
}
}
func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) {
r.Lock()
defer r.Unlock()
if r.Err != nil {
return r.Pods, r.Err
return nil, r.Err
}
var filtered []api.Pod
for _, pod := range r.Pods {
for _, pod := range r.Pods.Items {
if selector.Matches(labels.Set(pod.Labels)) {
filtered = append(filtered, pod)
}
}
return filtered, nil
pods := *r.Pods
pods.Items = filtered
return &pods, nil
}
func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {

View File

@@ -37,8 +37,8 @@ type ServiceRegistry struct {
UpdatedID string
}
func (r *ServiceRegistry) ListServices() (api.ServiceList, error) {
return r.List, r.Err
func (r *ServiceRegistry) ListServices() (*api.ServiceList, error) {
return &r.List, r.Err
}
func (r *ServiceRegistry) CreateService(svc api.Service) error {

View File

@@ -25,7 +25,7 @@ import (
// Registry is an interface for things that know how to store services.
type Registry interface {
ListServices() (api.ServiceList, error)
ListServices() (*api.ServiceList, error)
CreateService(svc api.Service) error
GetService(name string) (*api.Service, error)
DeleteService(name string) error

View File

@@ -317,8 +317,9 @@ func TestServiceRegistryList(t *testing.T) {
JSONBase: api.JSONBase{ID: "foo2"},
Selector: map[string]string{"bar2": "baz2"},
})
registry.List.ResourceVersion = 1
s, _ := storage.List(labels.Everything())
sl := s.(api.ServiceList)
sl := s.(*api.ServiceList)
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
@@ -331,4 +332,7 @@ func TestServiceRegistryList(t *testing.T) {
if e, a := "foo2", sl.Items[1].ID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if sl.ResourceVersion != 1 {
t.Errorf("Unexpected resource version: %#v", sl)
}
}