From bafc422ac0427feaf202569c3eb68e4cf1dd79bb Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 28 Aug 2014 20:48:07 -0400 Subject: [PATCH] Add the resource version to api.*List items from etcd Allows clients to watch more easily (can invoke Get, then Watch). --- pkg/master/pod_cache.go | 2 +- pkg/master/pod_cache_test.go | 2 +- pkg/registry/controller/registry.go | 2 +- pkg/registry/controller/storage.go | 18 +++++----- pkg/registry/controller/storage_test.go | 44 ++++++++++++++----------- pkg/registry/etcd/etcd.go | 28 ++++++++-------- pkg/registry/etcd/etcd_test.go | 14 ++++---- pkg/registry/pod/registry.go | 2 +- pkg/registry/pod/storage.go | 8 ++--- pkg/registry/pod/storage_test.go | 31 +++++++++-------- pkg/registry/registrytest/controller.go | 4 +-- pkg/registry/registrytest/pod.go | 14 ++++---- pkg/registry/registrytest/service.go | 4 +-- pkg/registry/service/registry.go | 2 +- pkg/registry/service/storage_test.go | 6 +++- pkg/tools/etcd_tools.go | 21 ++++++++---- pkg/tools/etcd_tools_test.go | 7 +++- 17 files changed, 120 insertions(+), 89 deletions(-) diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index 674c6c8b9c6..75efa6aabfd 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -77,7 +77,7 @@ func (p *PodCache) UpdateAllContainers() { glog.Errorf("Error synchronizing container list: %v", err) return } - for _, pod := range pods { + for _, pod := range pods.Items { err := p.updatePodInfo(pod.CurrentState.Host, pod.ID) if err != nil && err != client.ErrPodInfoNotAvailable { glog.Errorf("Error synchronizing container: %v", err) diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go index 9f369662bab..ed45e58a6ab 100644 --- a/pkg/master/pod_cache_test.go +++ b/pkg/master/pod_cache_test.go @@ -96,7 +96,7 @@ func TestPodUpdateAllContainers(t *testing.T) { } pods := []api.Pod{pod} - mockRegistry := registrytest.NewPodRegistry(pods) + mockRegistry := registrytest.NewPodRegistry(&api.PodList{Items: pods}) expected := api.PodInfo{"foo": docker.Container{ID: "foo"}} fake := FakePodInfoGetter{ diff --git a/pkg/registry/controller/registry.go b/pkg/registry/controller/registry.go index f88f503b4ff..ab6d27be14b 100644 --- a/pkg/registry/controller/registry.go +++ b/pkg/registry/controller/registry.go @@ -23,7 +23,7 @@ import ( // Registry is an interface for things that know how to store ReplicationControllers. type Registry interface { - ListControllers() ([]api.ReplicationController, error) + ListControllers() (*api.ReplicationControllerList, error) WatchControllers(resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) CreateController(controller api.ReplicationController) error diff --git a/pkg/registry/controller/storage.go b/pkg/registry/controller/storage.go index 79835aba932..0a21a0fbbf5 100644 --- a/pkg/registry/controller/storage.go +++ b/pkg/registry/controller/storage.go @@ -92,16 +92,18 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) { // List obtains a list of ReplicationControllers that match selector. func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { - result := api.ReplicationControllerList{} controllers, err := rs.registry.ListControllers() - if err == nil { - for _, controller := range controllers { - if selector.Matches(labels.Set(controller.Labels)) { - result.Items = append(result.Items, controller) - } + if err != nil { + return nil, err + } + filtered := []api.ReplicationController{} + for _, controller := range controllers.Items { + if selector.Matches(labels.Set(controller.Labels)) { + filtered = append(filtered, controller) } } - return result, err + controllers.Items = filtered + return controllers, err } // New creates a new ReplicationController for use with Create and Update. @@ -150,7 +152,7 @@ func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (in if err != nil { return ctrl, err } - if len(pods) == ctrl.DesiredState.Replicas { + if len(pods.Items) == ctrl.DesiredState.Replicas { break } time.Sleep(rs.pollPeriod) diff --git a/pkg/registry/controller/storage_test.go b/pkg/registry/controller/storage_test.go index d24eccb2eef..be4b3c54547 100644 --- a/pkg/registry/controller/storage_test.go +++ b/pkg/registry/controller/storage_test.go @@ -37,18 +37,17 @@ func TestListControllersError(t *testing.T) { storage := RegistryStorage{ registry: &mockRegistry, } - controllersObj, err := storage.List(nil) - controllers := controllersObj.(api.ReplicationControllerList) + controllers, err := storage.List(nil) if err != mockRegistry.Err { t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err) } - if len(controllers.Items) != 0 { - t.Errorf("Unexpected non-zero ctrl list: %#v", controllers) + if controllers != nil { + t.Errorf("Unexpected non-nil ctrl list: %#v", controllers) } } func TestListEmptyControllerList(t *testing.T) { - mockRegistry := registrytest.ControllerRegistry{} + mockRegistry := registrytest.ControllerRegistry{nil, &api.ReplicationControllerList{JSONBase: api.JSONBase{ResourceVersion: 1}}} storage := RegistryStorage{ registry: &mockRegistry, } @@ -57,22 +56,27 @@ func TestListEmptyControllerList(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(controllers.(api.ReplicationControllerList).Items) != 0 { + if len(controllers.(*api.ReplicationControllerList).Items) != 0 { t.Errorf("Unexpected non-zero ctrl list: %#v", controllers) } + if controllers.(*api.ReplicationControllerList).ResourceVersion != 1 { + t.Errorf("Unexpected resource version: %#v", controllers) + } } func TestListControllerList(t *testing.T) { mockRegistry := registrytest.ControllerRegistry{ - Controllers: []api.ReplicationController{ - { - JSONBase: api.JSONBase{ - ID: "foo", + Controllers: &api.ReplicationControllerList{ + Items: []api.ReplicationController{ + { + JSONBase: api.JSONBase{ + ID: "foo", + }, }, - }, - { - JSONBase: api.JSONBase{ - ID: "bar", + { + JSONBase: api.JSONBase{ + ID: "bar", + }, }, }, }, @@ -81,7 +85,7 @@ func TestListControllerList(t *testing.T) { registry: &mockRegistry, } controllersObj, err := storage.List(labels.Everything()) - controllers := controllersObj.(api.ReplicationControllerList) + controllers := controllersObj.(*api.ReplicationControllerList) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -212,10 +216,12 @@ var validPodTemplate = api.PodTemplate{ func TestCreateController(t *testing.T) { mockRegistry := registrytest.ControllerRegistry{} mockPodRegistry := registrytest.PodRegistry{ - Pods: []api.Pod{ - { - JSONBase: api.JSONBase{ID: "foo"}, - Labels: map[string]string{"a": "b"}, + Pods: &api.PodList{ + Items: []api.Pod{ + { + JSONBase: api.JSONBase{ID: "foo"}, + Labels: map[string]string{"a": "b"}, + }, }, }, } diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index e0bf67740a6..64b1e90160f 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -59,22 +59,24 @@ func makePodKey(podID string) string { } // ListPods obtains a list of pods that match selector. -func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { - allPods := []api.Pod{} - filteredPods := []api.Pod{} - if err := r.ExtractList("/registry/pods", &allPods); err != nil { +func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) { + allPods := api.PodList{} + err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion) + if err != nil { return nil, err } - for _, pod := range allPods { + filtered := []api.Pod{} + for _, pod := range allPods.Items { if selector.Matches(labels.Set(pod.Labels)) { // TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets // the CurrentState.Host and Status fields. Here we pretend that reality perfectly // matches our desires. pod.CurrentState.Host = pod.DesiredState.Host - filteredPods = append(filteredPods, pod) + filtered = append(filtered, pod) } } - return filteredPods, nil + allPods.Items = filtered + return &allPods, nil } // WatchPods begins watching for new, changed, or deleted pods. @@ -225,9 +227,9 @@ func (r *Registry) DeletePod(podID string) error { } // ListControllers obtains a list of ReplicationControllers. -func (r *Registry) ListControllers() ([]api.ReplicationController, error) { - var controllers []api.ReplicationController - err := r.ExtractList("/registry/controllers", &controllers) +func (r *Registry) ListControllers() (*api.ReplicationControllerList, error) { + controllers := &api.ReplicationControllerList{} + err := r.ExtractList("/registry/controllers", &controllers.Items, &controllers.ResourceVersion) return controllers, err } @@ -283,9 +285,9 @@ func makeServiceKey(name string) string { } // ListServices obtains a list of Services. -func (r *Registry) ListServices() (api.ServiceList, error) { - var list api.ServiceList - err := r.ExtractList("/registry/services/specs", &list.Items) +func (r *Registry) ListServices() (*api.ServiceList, error) { + list := &api.ServiceList{} + err := r.ExtractList("/registry/services/specs", &list.Items, &list.ResourceVersion) return list, err } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index c2780a5a67f..f976728c6da 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -412,7 +412,7 @@ func TestEtcdEmptyListPods(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(pods) != 0 { + if len(pods.Items) != 0 { t.Errorf("Unexpected pod list: %#v", pods) } } @@ -430,7 +430,7 @@ func TestEtcdListPodsNotFound(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(pods) != 0 { + if len(pods.Items) != 0 { t.Errorf("Unexpected pod list: %#v", pods) } } @@ -465,11 +465,11 @@ func TestEtcdListPods(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(pods) != 2 || pods[0].ID != "foo" || pods[1].ID != "bar" { + if len(pods.Items) != 2 || pods.Items[0].ID != "foo" || pods.Items[1].ID != "bar" { t.Errorf("Unexpected pod list: %#v", pods) } - if pods[0].CurrentState.Host != "machine" || - pods[1].CurrentState.Host != "machine" { + if pods.Items[0].CurrentState.Host != "machine" || + pods.Items[1].CurrentState.Host != "machine" { t.Errorf("Failed to populate host name.") } } @@ -487,7 +487,7 @@ func TestEtcdListControllersNotFound(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(controllers) != 0 { + if len(controllers.Items) != 0 { t.Errorf("Unexpected controller list: %#v", controllers) } } @@ -534,7 +534,7 @@ func TestEtcdListControllers(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(controllers) != 2 || controllers[0].ID != "foo" || controllers[1].ID != "bar" { + if len(controllers.Items) != 2 || controllers.Items[0].ID != "foo" || controllers.Items[1].ID != "bar" { t.Errorf("Unexpected controller list: %#v", controllers) } } diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 2aaf488deab..af8fd91db47 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -25,7 +25,7 @@ import ( // Registry is an interface implemented by things that know how to store Pod objects. type Registry interface { // ListPods obtains a list of pods that match selector. - ListPods(selector labels.Selector) ([]api.Pod, error) + ListPods(selector labels.Selector) (*api.PodList, error) // Watch for new/changed/deleted pods WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) // Get a specific pod diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 56ad464b699..b51b1601c0d 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -105,15 +105,13 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) { } func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { - var result api.PodList pods, err := rs.registry.ListPods(selector) if err == nil { - result.Items = pods - for i := range result.Items { - rs.fillPodInfo(&result.Items[i]) + for i := range pods.Items { + rs.fillPodInfo(&pods.Items[i]) } } - return result, err + return pods, err } // Watch begins watching for new, changed, or deleted pods. diff --git a/pkg/registry/pod/storage_test.go b/pkg/registry/pod/storage_test.go index 1ffb1280452..46da24c3c61 100644 --- a/pkg/registry/pod/storage_test.go +++ b/pkg/registry/pod/storage_test.go @@ -108,13 +108,13 @@ func TestListPodsError(t *testing.T) { if err != podRegistry.Err { t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err) } - if len(pods.(api.PodList).Items) != 0 { - t.Errorf("Unexpected non-zero pod list: %#v", pods) + if pods.(*api.PodList) != nil { + t.Errorf("Unexpected non-nil pod list: %#v", pods) } } func TestListEmptyPodList(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) + podRegistry := registrytest.NewPodRegistry(&api.PodList{JSONBase: api.JSONBase{ResourceVersion: 1}}) storage := RegistryStorage{ registry: podRegistry, } @@ -123,22 +123,27 @@ func TestListEmptyPodList(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(pods.(api.PodList).Items) != 0 { + if len(pods.(*api.PodList).Items) != 0 { t.Errorf("Unexpected non-zero pod list: %#v", pods) } + if pods.(*api.PodList).ResourceVersion != 1 { + t.Errorf("Unexpected resource version: %#v", pods) + } } func TestListPodList(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pods = []api.Pod{ - { - JSONBase: api.JSONBase{ - ID: "foo", + podRegistry.Pods = &api.PodList{ + Items: []api.Pod{ + { + JSONBase: api.JSONBase{ + ID: "foo", + }, }, - }, - { - JSONBase: api.JSONBase{ - ID: "bar", + { + JSONBase: api.JSONBase{ + ID: "bar", + }, }, }, } @@ -146,7 +151,7 @@ func TestListPodList(t *testing.T) { registry: podRegistry, } podsObj, err := storage.List(labels.Everything()) - pods := podsObj.(api.PodList) + pods := podsObj.(*api.PodList) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/registry/registrytest/controller.go b/pkg/registry/registrytest/controller.go index be857dbaf4d..76c2f26a970 100644 --- a/pkg/registry/registrytest/controller.go +++ b/pkg/registry/registrytest/controller.go @@ -24,10 +24,10 @@ import ( // TODO: Why do we have this AND MemoryRegistry? type ControllerRegistry struct { Err error - Controllers []api.ReplicationController + Controllers *api.ReplicationControllerList } -func (r *ControllerRegistry) ListControllers() ([]api.ReplicationController, error) { +func (r *ControllerRegistry) ListControllers() (*api.ReplicationControllerList, error) { return r.Controllers, r.Err } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index d097e7dafad..60aa66976e7 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -27,32 +27,34 @@ import ( type PodRegistry struct { Err error Pod *api.Pod - Pods []api.Pod + Pods *api.PodList sync.Mutex mux *watch.Mux } -func NewPodRegistry(pods []api.Pod) *PodRegistry { +func NewPodRegistry(pods *api.PodList) *PodRegistry { return &PodRegistry{ Pods: pods, mux: watch.NewMux(0), } } -func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { +func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) { r.Lock() defer r.Unlock() if r.Err != nil { - return r.Pods, r.Err + return nil, r.Err } var filtered []api.Pod - for _, pod := range r.Pods { + for _, pod := range r.Pods.Items { if selector.Matches(labels.Set(pod.Labels)) { filtered = append(filtered, pod) } } - return filtered, nil + pods := *r.Pods + pods.Items = filtered + return &pods, nil } func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 15324c50dc8..e82d1033f16 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -37,8 +37,8 @@ type ServiceRegistry struct { UpdatedID string } -func (r *ServiceRegistry) ListServices() (api.ServiceList, error) { - return r.List, r.Err +func (r *ServiceRegistry) ListServices() (*api.ServiceList, error) { + return &r.List, r.Err } func (r *ServiceRegistry) CreateService(svc api.Service) error { diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index bcbfa461657..26af6077ff5 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -25,7 +25,7 @@ import ( // Registry is an interface for things that know how to store services. type Registry interface { - ListServices() (api.ServiceList, error) + ListServices() (*api.ServiceList, error) CreateService(svc api.Service) error GetService(name string) (*api.Service, error) DeleteService(name string) error diff --git a/pkg/registry/service/storage_test.go b/pkg/registry/service/storage_test.go index f561eed9531..0186a4635f0 100644 --- a/pkg/registry/service/storage_test.go +++ b/pkg/registry/service/storage_test.go @@ -317,8 +317,9 @@ func TestServiceRegistryList(t *testing.T) { JSONBase: api.JSONBase{ID: "foo2"}, Selector: map[string]string{"bar2": "baz2"}, }) + registry.List.ResourceVersion = 1 s, _ := storage.List(labels.Everything()) - sl := s.(api.ServiceList) + sl := s.(*api.ServiceList) if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -331,4 +332,7 @@ func TestServiceRegistryList(t *testing.T) { if e, a := "foo2", sl.Items[1].ID; e != a { t.Errorf("Expected %v, but got %v", e, a) } + if sl.ResourceVersion != 1 { + t.Errorf("Unexpected resource version: %#v", sl) + } } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 90bad267643..056f6e62687 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -117,22 +117,29 @@ func etcdErrorIndex(err error) (uint64, bool) { return 0, false } -func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) { +func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) { result, err := h.Client.Get(key, false, true) if err != nil { + index, ok := etcdErrorIndex(err) + if !ok { + index = 0 + } nodes := make([]*etcd.Node, 0) if IsEtcdNotFound(err) { - return nodes, nil + return nodes, index, nil } else { - return nodes, err + return nodes, index, err } } - return result.Node.Nodes, nil + return result.Node.Nodes, result.EtcdIndex, nil } -// Extract a go object per etcd node into a slice. -func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { - nodes, err := h.listEtcdNode(key) +// Extract a go object per etcd node into a slice with the resource version. +func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersion *uint64) error { + nodes, index, err := h.listEtcdNode(key) + if resourceVersion != nil { + *resourceVersion = index + } if err != nil { return err } diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index b641a061cc9..216e1bfd898 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -66,6 +66,7 @@ func TestExtractList(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.Data["/some/key"] = EtcdResponseWithError{ R: &etcd.Response{ + EtcdIndex: 10, Node: &etcd.Node{ Nodes: []*etcd.Node{ { @@ -92,10 +93,14 @@ func TestExtractList(t *testing.T) { var got []api.Pod helper := EtcdHelper{fakeClient, codec, versioner} - err := helper.ExtractList("/some/key", &got) + resourceVersion := uint64(0) + err := helper.ExtractList("/some/key", &got, &resourceVersion) if err != nil { t.Errorf("Unexpected error %#v", err) } + if resourceVersion != 10 { + t.Errorf("Unexpected resource version %d", resourceVersion) + } for i := 0; i < len(expect); i++ { if !reflect.DeepEqual(got[i], expect[i]) {