From aa30e381839e61398834cc362d3f8b0406f06abc Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Mon, 26 Oct 2015 14:56:51 +0100 Subject: [PATCH] Pass resource version to storage List operation. --- pkg/api/rest/rest.go | 6 ++++-- pkg/api/rest/resttest/resttest.go | 8 ++++---- pkg/apiserver/apiserver_test.go | 2 +- pkg/apiserver/resthandler.go | 2 +- pkg/master/master.go | 4 ++-- pkg/master/master_test.go | 6 +++--- pkg/master/thirdparty_controller.go | 2 +- pkg/registry/componentstatus/rest.go | 2 +- pkg/registry/componentstatus/rest_test.go | 6 +++--- pkg/registry/controller/registry.go | 6 +++--- pkg/registry/deployment/registry.go | 6 +++--- pkg/registry/endpoint/registry.go | 6 +++--- pkg/registry/generic/etcd/etcd.go | 15 +++++++++++---- pkg/registry/generic/etcd/etcd_test.go | 2 +- pkg/registry/job/registry.go | 6 +++--- pkg/registry/namespace/registry.go | 6 +++--- pkg/registry/node/registry.go | 6 +++--- pkg/registry/registrytest/endpoint.go | 2 +- pkg/registry/registrytest/node.go | 2 +- pkg/registry/registrytest/service.go | 2 +- pkg/registry/secret/registry.go | 6 +++--- .../service/ipallocator/controller/repair.go | 3 ++- .../service/portallocator/controller/repair.go | 3 ++- pkg/registry/service/registry.go | 6 +++--- pkg/registry/service/rest.go | 4 ++-- pkg/registry/service/rest_test.go | 2 +- pkg/registry/serviceaccount/registry.go | 6 +++--- pkg/registry/thirdpartyresourcedata/registry.go | 6 +++--- pkg/storage/cacher.go | 6 +++--- pkg/storage/etcd/etcd_helper.go | 2 +- pkg/storage/etcd/etcd_helper_test.go | 6 +++--- pkg/storage/interfaces.go | 4 +++- 32 files changed, 82 insertions(+), 69 deletions(-) diff --git a/pkg/api/rest/rest.go b/pkg/api/rest/rest.go index 57be322a7ed..55bfa60f70c 100644 --- a/pkg/api/rest/rest.go +++ b/pkg/api/rest/rest.go @@ -61,8 +61,9 @@ type Lister interface { // NewList returns an empty object that can be used with the List call. // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) NewList() runtime.Object - // List selects resources in the storage which match to the selector. - List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) + // List selects resources in the storage which match to the selector. 'options' can be nil. + // TODO: Move 'label' and 'field' to 'options'. + List(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (runtime.Object, error) } // Getter is an object that can retrieve a named RESTful resource. @@ -183,6 +184,7 @@ type Watcher interface { // are supported; an error should be returned if 'field' tries to select on a field that // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a // particular version. + // TODO: Replace 'label', 'field' and 'resourceVersion' with ListOptions. Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) } diff --git a/pkg/api/rest/resttest/resttest.go b/pkg/api/rest/resttest/resttest.go index 7d8bed11b4f..db223d90688 100644 --- a/pkg/api/rest/resttest/resttest.go +++ b/pkg/api/rest/resttest/resttest.go @@ -853,7 +853,7 @@ func (t *Tester) testListError() { storageError := fmt.Errorf("test error") t.withStorageError(storageError, func() { - _, err := t.storage.(rest.Lister).List(ctx, labels.Everything(), fields.Everything()) + _, err := t.storage.(rest.Lister).List(ctx, labels.Everything(), fields.Everything(), nil) if err != storageError { t.Errorf("unexpected error: %v", err) } @@ -870,7 +870,7 @@ func (t *Tester) testListFound(obj runtime.Object, assignFn AssignFunc) { existing := assignFn([]runtime.Object{foo1, foo2}) - listObj, err := t.storage.(rest.Lister).List(ctx, labels.Everything(), fields.Everything()) + listObj, err := t.storage.(rest.Lister).List(ctx, labels.Everything(), fields.Everything(), nil) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -902,7 +902,7 @@ func (t *Tester) testListMatchLabels(obj runtime.Object, assignFn AssignFunc) { filtered := []runtime.Object{existing[1]} selector := labels.SelectorFromSet(labels.Set(testLabels)) - listObj, err := t.storage.(rest.Lister).List(ctx, selector, fields.Everything()) + listObj, err := t.storage.(rest.Lister).List(ctx, selector, fields.Everything(), nil) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -924,7 +924,7 @@ func (t *Tester) testListNotFound(assignFn AssignFunc, setRVFn SetRVFunc) { setRVFn(uint64(123)) _ = assignFn([]runtime.Object{}) - listObj, err := t.storage.(rest.Lister).List(ctx, labels.Everything(), fields.Everything()) + listObj, err := t.storage.(rest.Lister).List(ctx, labels.Everything(), fields.Everything(), nil) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 6d0f5f22d15..dd5433e635a 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -307,7 +307,7 @@ type SimpleRESTStorage struct { injectedFunction func(obj runtime.Object) (returnObj runtime.Object, err error) } -func (storage *SimpleRESTStorage) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) { +func (storage *SimpleRESTStorage) List(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (runtime.Object, error) { storage.checkContext(ctx) result := &apiservertesting.SimpleList{ Items: storage.list, diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 037adecbf7c..1af59a4a922 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -284,7 +284,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch return } - result, err := r.List(ctx, opts.LabelSelector, opts.FieldSelector) + result, err := r.List(ctx, opts.LabelSelector, opts.FieldSelector, &opts) if err != nil { errorJSON(err, scope.Codec, w) return diff --git a/pkg/master/master.go b/pkg/master/master.go index 624f4d7bcd8..5e6c7b2a144 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -935,7 +935,7 @@ func (m *Master) RemoveThirdPartyResource(path string) error { func (m *Master) removeAllThirdPartyResources(registry *thirdpartyresourcedataetcd.REST) error { ctx := api.NewDefaultContext() - existingData, err := registry.List(ctx, labels.Everything(), fields.Everything()) + existingData, err := registry.List(ctx, labels.Everything(), fields.Everything(), nil) if err != nil { return err } @@ -1137,7 +1137,7 @@ func findExternalAddress(node *api.Node) (string, error) { } func (m *Master) getNodeAddresses() ([]string, error) { - nodes, err := m.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything()) + nodes, err := m.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything(), nil) if err != nil { return nil, err } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index d1d8b380b4a..89a3f374ce4 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -338,14 +338,14 @@ func TestGetNodeAddresses(t *testing.T) { master, _, assert := setUp(t) // Fail case (no addresses associated with nodes) - nodes, _ := master.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything()) + nodes, _ := master.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything(), nil) addrs, err := master.getNodeAddresses() assert.Error(err, "getNodeAddresses should have caused an error as there are no addresses.") assert.Equal([]string(nil), addrs) // Pass case with External type IP - nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything()) + nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything(), nil) for index := range nodes.Items { nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeExternalIP, Address: "127.0.0.1"}} } @@ -354,7 +354,7 @@ func TestGetNodeAddresses(t *testing.T) { assert.Equal([]string{"127.0.0.1", "127.0.0.1"}, addrs) // Pass case with LegacyHost type IP - nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything()) + nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything(), nil) for index := range nodes.Items { nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.2"}} } diff --git a/pkg/master/thirdparty_controller.go b/pkg/master/thirdparty_controller.go index 806de9f45f4..e29766d5c4d 100644 --- a/pkg/master/thirdparty_controller.go +++ b/pkg/master/thirdparty_controller.go @@ -76,7 +76,7 @@ func (t *ThirdPartyController) SyncOneResource(rsrc *expapi.ThirdPartyResource) // Synchronize all resources with RESTful resources on the master func (t *ThirdPartyController) SyncResources() error { - list, err := t.thirdPartyResourceRegistry.List(api.NewDefaultContext(), labels.Everything(), fields.Everything()) + list, err := t.thirdPartyResourceRegistry.List(api.NewDefaultContext(), labels.Everything(), fields.Everything(), nil) if err != nil { return err } diff --git a/pkg/registry/componentstatus/rest.go b/pkg/registry/componentstatus/rest.go index 517f73b1dfc..90f99ed0c26 100644 --- a/pkg/registry/componentstatus/rest.go +++ b/pkg/registry/componentstatus/rest.go @@ -51,7 +51,7 @@ func (rs *REST) NewList() runtime.Object { // Returns the list of component status. Note that the label and field are both ignored. // Note that this call doesn't support labels or selectors. -func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) { +func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (runtime.Object, error) { servers := rs.GetServersToValidate() // TODO: This should be parallelized. diff --git a/pkg/registry/componentstatus/rest_test.go b/pkg/registry/componentstatus/rest_test.go index 9accaeff282..32fd6a38131 100644 --- a/pkg/registry/componentstatus/rest_test.go +++ b/pkg/registry/componentstatus/rest_test.go @@ -78,7 +78,7 @@ func createTestStatus(name string, status api.ConditionStatus, msg string, err s func TestList_NoError(t *testing.T) { r := NewTestREST(testResponse{code: 200, data: "ok"}) - got, err := r.List(api.NewContext(), labels.Everything(), fields.Everything()) + got, err := r.List(api.NewContext(), labels.Everything(), fields.Everything(), nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -92,7 +92,7 @@ func TestList_NoError(t *testing.T) { func TestList_FailedCheck(t *testing.T) { r := NewTestREST(testResponse{code: 500, data: ""}) - got, err := r.List(api.NewContext(), labels.Everything(), fields.Everything()) + got, err := r.List(api.NewContext(), labels.Everything(), fields.Everything(), nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -107,7 +107,7 @@ func TestList_FailedCheck(t *testing.T) { func TestList_UnknownError(t *testing.T) { r := NewTestREST(testResponse{code: 500, data: "", err: fmt.Errorf("fizzbuzz error")}) - got, err := r.List(api.NewContext(), labels.Everything(), fields.Everything()) + got, err := r.List(api.NewContext(), labels.Everything(), fields.Everything(), nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/registry/controller/registry.go b/pkg/registry/controller/registry.go index 78e1b61e455..de0be2d7a1b 100644 --- a/pkg/registry/controller/registry.go +++ b/pkg/registry/controller/registry.go @@ -28,7 +28,7 @@ import ( // Registry is an interface for things that know how to store ReplicationControllers. type Registry interface { - ListControllers(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ReplicationControllerList, error) + ListControllers(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.ReplicationControllerList, error) WatchControllers(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) CreateController(ctx api.Context, controller *api.ReplicationController) (*api.ReplicationController, error) @@ -48,11 +48,11 @@ func NewRegistry(s rest.StandardStorage) Registry { } // List obtains a list of ReplicationControllers that match selector. -func (s *storage) ListControllers(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ReplicationControllerList, error) { +func (s *storage) ListControllers(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.ReplicationControllerList, error) { if !field.Empty() { return nil, fmt.Errorf("field selector not supported yet") } - obj, err := s.List(ctx, label, field) + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/deployment/registry.go b/pkg/registry/deployment/registry.go index 12e76a25a92..018ee3bc52b 100644 --- a/pkg/registry/deployment/registry.go +++ b/pkg/registry/deployment/registry.go @@ -28,7 +28,7 @@ import ( // Registry is an interface for things that know how to store Deployments. type Registry interface { - ListDeployments(ctx api.Context, label labels.Selector, field fields.Selector) (*extensions.DeploymentList, error) + ListDeployments(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*extensions.DeploymentList, error) GetDeployment(ctx api.Context, deploymentID string) (*extensions.Deployment, error) CreateDeployment(ctx api.Context, deployment *extensions.Deployment) (*extensions.Deployment, error) UpdateDeployment(ctx api.Context, deployment *extensions.Deployment) (*extensions.Deployment, error) @@ -46,11 +46,11 @@ func NewRegistry(s rest.StandardStorage) Registry { } // List obtains a list of Deployments that match selector. -func (s *storage) ListDeployments(ctx api.Context, label labels.Selector, field fields.Selector) (*extensions.DeploymentList, error) { +func (s *storage) ListDeployments(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*extensions.DeploymentList, error) { if !field.Empty() { return nil, fmt.Errorf("field selector not supported yet") } - obj, err := s.List(ctx, label, field) + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/endpoint/registry.go b/pkg/registry/endpoint/registry.go index 775ef78bb56..c9607c5ab0a 100644 --- a/pkg/registry/endpoint/registry.go +++ b/pkg/registry/endpoint/registry.go @@ -26,7 +26,7 @@ import ( // Registry is an interface for things that know how to store endpoints. type Registry interface { - ListEndpoints(ctx api.Context) (*api.EndpointsList, error) + ListEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.EndpointsList, error) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) WatchEndpoints(ctx api.Context, labels labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error @@ -44,8 +44,8 @@ func NewRegistry(s rest.StandardStorage) Registry { return &storage{s} } -func (s *storage) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { - obj, err := s.List(ctx, labels.Everything(), fields.Everything()) +func (s *storage) ListEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.EndpointsList, error) { + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 42e52ec64c2..2fde3f1059c 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -159,12 +159,12 @@ func (e *Etcd) NewList() runtime.Object { } // List returns a list of items matching labels and field -func (e *Etcd) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) { - return e.ListPredicate(ctx, e.PredicateFunc(label, field)) +func (e *Etcd) List(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (runtime.Object, error) { + return e.ListPredicate(ctx, e.PredicateFunc(label, field), options) } // ListPredicate returns a list of all the items matching m. -func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object, error) { +func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *api.ListOptions) (runtime.Object, error) { list := e.NewListFunc() trace := util.NewTrace("List " + reflect.TypeOf(list).String()) filterFunc := e.filterAndDecorateFunction(m) @@ -183,7 +183,14 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object } trace.Step("About to list directory") - err := e.Storage.List(ctx, e.KeyRootFunc(ctx), filterFunc, list) + if options == nil { + options = &api.ListOptions{ResourceVersion: "0"} + } + version, err := storage.ParseWatchResourceVersion(options.ResourceVersion, e.EndpointName) + if err != nil { + return nil, err + } + err = e.Storage.List(ctx, e.KeyRootFunc(ctx), version, filterFunc, list) trace.Step("List extracted") if err != nil { return nil, err diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index ff98bc0d871..98a6ec38b9f 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -242,7 +242,7 @@ func TestEtcdList(t *testing.T) { key = etcdtest.AddPrefix(key) fakeClient.Data[key] = item.in } - list, err := registry.ListPredicate(ctx, item.m) + list, err := registry.ListPredicate(ctx, item.m, nil) if e, a := item.succeed, err == nil; e != a { t.Errorf("%v: expected %v, got %v: %v", name, e, a, err) continue diff --git a/pkg/registry/job/registry.go b/pkg/registry/job/registry.go index fb36dfa5512..fc0da64e334 100644 --- a/pkg/registry/job/registry.go +++ b/pkg/registry/job/registry.go @@ -30,7 +30,7 @@ import ( // Registry is an interface for things that know how to store Jobs. type Registry interface { // ListJobs obtains a list of Jobs having labels and fields which match selector. - ListJobs(ctx api.Context, label labels.Selector, field fields.Selector) (*extensions.JobList, error) + ListJobs(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*extensions.JobList, error) // WatchJobs watch for new/changed/deleted Jobs. WatchJobs(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) // GetJobs gets a specific Job. @@ -54,11 +54,11 @@ func NewRegistry(s rest.StandardStorage) Registry { return &storage{s} } -func (s *storage) ListJobs(ctx api.Context, label labels.Selector, field fields.Selector) (*extensions.JobList, error) { +func (s *storage) ListJobs(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*extensions.JobList, error) { if !field.Empty() { return nil, fmt.Errorf("field selector not supported yet") } - obj, err := s.List(ctx, label, field) + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/namespace/registry.go b/pkg/registry/namespace/registry.go index a24dc14a56b..a7801a58fc1 100644 --- a/pkg/registry/namespace/registry.go +++ b/pkg/registry/namespace/registry.go @@ -27,7 +27,7 @@ import ( // Registry is an interface implemented by things that know how to store Namespace objects. type Registry interface { // ListNamespaces obtains a list of namespaces having labels which match selector. - ListNamespaces(ctx api.Context, selector labels.Selector) (*api.NamespaceList, error) + ListNamespaces(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.NamespaceList, error) // Watch for new/changed/deleted namespaces WatchNamespaces(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) // Get a specific namespace @@ -51,8 +51,8 @@ func NewRegistry(s rest.StandardStorage) Registry { return &storage{s} } -func (s *storage) ListNamespaces(ctx api.Context, label labels.Selector) (*api.NamespaceList, error) { - obj, err := s.List(ctx, label, fields.Everything()) +func (s *storage) ListNamespaces(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.NamespaceList, error) { + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/node/registry.go b/pkg/registry/node/registry.go index e587b6c0f14..5f9159a4a32 100644 --- a/pkg/registry/node/registry.go +++ b/pkg/registry/node/registry.go @@ -26,7 +26,7 @@ import ( // Registry is an interface for things that know how to store node. type Registry interface { - ListNodes(ctx api.Context, label labels.Selector, field fields.Selector) (*api.NodeList, error) + ListNodes(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.NodeList, error) CreateNode(ctx api.Context, node *api.Node) error UpdateNode(ctx api.Context, node *api.Node) error GetNode(ctx api.Context, nodeID string) (*api.Node, error) @@ -45,8 +45,8 @@ func NewRegistry(s rest.StandardStorage) Registry { return &storage{s} } -func (s *storage) ListNodes(ctx api.Context, label labels.Selector, field fields.Selector) (*api.NodeList, error) { - obj, err := s.List(ctx, label, field) +func (s *storage) ListNodes(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.NodeList, error) { + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/registrytest/endpoint.go b/pkg/registry/registrytest/endpoint.go index a389caf27fa..98adc9007e5 100644 --- a/pkg/registry/registrytest/endpoint.go +++ b/pkg/registry/registrytest/endpoint.go @@ -36,7 +36,7 @@ type EndpointRegistry struct { lock sync.Mutex } -func (e *EndpointRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { +func (e *EndpointRegistry) ListEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.EndpointsList, error) { // TODO: support namespaces in this mock e.lock.Lock() defer e.lock.Unlock() diff --git a/pkg/registry/registrytest/node.go b/pkg/registry/registrytest/node.go index c2d2476f568..00c29a0ff39 100644 --- a/pkg/registry/registrytest/node.go +++ b/pkg/registry/registrytest/node.go @@ -59,7 +59,7 @@ func (r *NodeRegistry) SetError(err error) { r.Err = err } -func (r *NodeRegistry) ListNodes(ctx api.Context, label labels.Selector, field fields.Selector) (*api.NodeList, error) { +func (r *NodeRegistry) ListNodes(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.NodeList, error) { r.Lock() defer r.Unlock() return &r.Nodes, r.Err diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 56439454fab..7b63702c8fb 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -46,7 +46,7 @@ func (r *ServiceRegistry) SetError(err error) { r.Err = err } -func (r *ServiceRegistry) ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error) { +func (r *ServiceRegistry) ListServices(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.ServiceList, error) { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/registry/secret/registry.go b/pkg/registry/secret/registry.go index 1ec4b8c244a..98d19e2a650 100644 --- a/pkg/registry/secret/registry.go +++ b/pkg/registry/secret/registry.go @@ -27,7 +27,7 @@ import ( // Registry is an interface implemented by things that know how to store Secret objects. type Registry interface { // ListSecrets obtains a list of Secrets having labels which match selector. - ListSecrets(ctx api.Context, selector labels.Selector) (*api.SecretList, error) + ListSecrets(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.SecretList, error) // Watch for new/changed/deleted secrets WatchSecrets(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) // Get a specific Secret @@ -51,8 +51,8 @@ func NewRegistry(s rest.StandardStorage) Registry { return &storage{s} } -func (s *storage) ListSecrets(ctx api.Context, label labels.Selector) (*api.SecretList, error) { - obj, err := s.List(ctx, label, fields.Everything()) +func (s *storage) ListSecrets(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.SecretList, error) { + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/service/ipallocator/controller/repair.go b/pkg/registry/service/ipallocator/controller/repair.go index fcf4b062de7..d5bdf757aac 100644 --- a/pkg/registry/service/ipallocator/controller/repair.go +++ b/pkg/registry/service/ipallocator/controller/repair.go @@ -96,7 +96,8 @@ func (c *Repair) RunOnce() error { } ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll) - list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything()) + options := &api.ListOptions{ResourceVersion: latest.ObjectMeta.ResourceVersion} + list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything(), options) if err != nil { return fmt.Errorf("unable to refresh the service IP block: %v", err) } diff --git a/pkg/registry/service/portallocator/controller/repair.go b/pkg/registry/service/portallocator/controller/repair.go index 936221e2d20..6e3840372db 100644 --- a/pkg/registry/service/portallocator/controller/repair.go +++ b/pkg/registry/service/portallocator/controller/repair.go @@ -81,7 +81,8 @@ func (c *Repair) RunOnce() error { } ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll) - list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything()) + options := &api.ListOptions{ResourceVersion: latest.ObjectMeta.ResourceVersion} + list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything(), options) if err != nil { return fmt.Errorf("unable to refresh the port block: %v", err) } diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index 27fad645acd..53f08e018ea 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -26,7 +26,7 @@ import ( // Registry is an interface for things that know how to store services. type Registry interface { - ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error) + ListServices(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.ServiceList, error) CreateService(ctx api.Context, svc *api.Service) (*api.Service, error) GetService(ctx api.Context, name string) (*api.Service, error) DeleteService(ctx api.Context, name string) error @@ -45,8 +45,8 @@ func NewRegistry(s rest.StandardStorage) Registry { return &storage{s} } -func (s *storage) ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error) { - obj, err := s.List(ctx, label, field) +func (s *storage) ListServices(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.ServiceList, error) { + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 70d573790e8..218d4dd4aab 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -173,8 +173,8 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { return rs.registry.GetService(ctx, id) } -func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) { - return rs.registry.ListServices(ctx, label, field) +func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (runtime.Object, error) { + return rs.registry.ListServices(ctx, label, field, options) } // Watch returns Services events via a watch.Interface. diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 6e9ea4a5720..4e336541163 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -543,7 +543,7 @@ func TestServiceRegistryList(t *testing.T) { }, }) registry.List.ResourceVersion = "1" - s, _ := storage.List(ctx, labels.Everything(), fields.Everything()) + s, _ := storage.List(ctx, labels.Everything(), fields.Everything(), nil) sl := s.(*api.ServiceList) if len(sl.Items) != 2 { t.Fatalf("Expected 2 services, but got %v", len(sl.Items)) diff --git a/pkg/registry/serviceaccount/registry.go b/pkg/registry/serviceaccount/registry.go index 85e5586dfae..e62928b00ec 100644 --- a/pkg/registry/serviceaccount/registry.go +++ b/pkg/registry/serviceaccount/registry.go @@ -27,7 +27,7 @@ import ( // Registry is an interface implemented by things that know how to store ServiceAccount objects. type Registry interface { // ListServiceAccounts obtains a list of ServiceAccounts having labels which match selector. - ListServiceAccounts(ctx api.Context, selector labels.Selector) (*api.ServiceAccountList, error) + ListServiceAccounts(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.ServiceAccountList, error) // Watch for new/changed/deleted service accounts WatchServiceAccounts(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) // Get a specific ServiceAccount @@ -51,8 +51,8 @@ func NewRegistry(s rest.StandardStorage) Registry { return &storage{s} } -func (s *storage) ListServiceAccounts(ctx api.Context, label labels.Selector) (*api.ServiceAccountList, error) { - obj, err := s.List(ctx, label, fields.Everything()) +func (s *storage) ListServiceAccounts(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*api.ServiceAccountList, error) { + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/registry/thirdpartyresourcedata/registry.go b/pkg/registry/thirdpartyresourcedata/registry.go index d80aa0718e7..32346c7a2a0 100644 --- a/pkg/registry/thirdpartyresourcedata/registry.go +++ b/pkg/registry/thirdpartyresourcedata/registry.go @@ -28,7 +28,7 @@ import ( // Registry is an interface implemented by things that know how to store ThirdPartyResourceData objects. type Registry interface { // ListThirdPartyResourceData obtains a list of ThirdPartyResourceData having labels which match selector. - ListThirdPartyResourceData(ctx api.Context, selector labels.Selector) (*extensions.ThirdPartyResourceDataList, error) + ListThirdPartyResourceData(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*extensions.ThirdPartyResourceDataList, error) // Watch for new/changed/deleted ThirdPartyResourceData WatchThirdPartyResourceData(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) // Get a specific ThirdPartyResourceData @@ -52,8 +52,8 @@ func NewRegistry(s rest.StandardStorage) Registry { return &storage{s} } -func (s *storage) ListThirdPartyResourceData(ctx api.Context, label labels.Selector) (*extensions.ThirdPartyResourceDataList, error) { - obj, err := s.List(ctx, label, fields.Everything()) +func (s *storage) ListThirdPartyResourceData(ctx api.Context, label labels.Selector, field fields.Selector, options *api.ListOptions) (*extensions.ThirdPartyResourceDataList, error) { + obj, err := s.List(ctx, label, field, options) if err != nil { return nil, err } diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 7fd1c68c071..eca176baaf9 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -219,8 +219,8 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l } // Implements storage.Interface. -func (c *Cacher) List(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error { - return c.storage.List(ctx, key, filter, listObj) +func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error { + return c.storage.List(ctx, key, resourceVersion, filter, listObj) } // ListFromMemory implements list operation (the same signature as List method) @@ -344,7 +344,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) List() (runtime.Object, error) { list := lw.newListFunc() - if err := lw.storage.List(context.TODO(), lw.resourcePrefix, Everything, list); err != nil { + if err := lw.storage.List(context.TODO(), lw.resourcePrefix, 0, Everything, list); err != nil { return nil, err } return list, nil diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 2767e47e84b..327f489bdf6 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -350,7 +350,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun } // Implements storage.Interface. -func (h *etcdHelper) List(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error { +func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc, listObj runtime.Object) error { if ctx == nil { glog.Errorf("Context is nil") } diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 9f6fd3dce4a..f0b9a216f28 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -135,7 +135,7 @@ func TestList(t *testing.T) { var got api.PodList // TODO: a sorted filter function could be applied such implied // ordering on the returned list doesn't matter. - err := helper.List(context.TODO(), key, storage.Everything, &got) + err := helper.List(context.TODO(), key, 0, storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -174,7 +174,7 @@ func TestListFiltered(t *testing.T) { } var got api.PodList - err := helper.List(context.TODO(), key, filter, &got) + err := helper.List(context.TODO(), key, 0, filter, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -224,7 +224,7 @@ func TestListAcrossDirectories(t *testing.T) { list.Items[2] = *returnedObj var got api.PodList - err := roothelper.List(context.TODO(), rootkey, storage.Everything, &got) + err := roothelper.List(context.TODO(), rootkey, 0, storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index c55f2687fbe..79b5fa67cca 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -116,7 +116,9 @@ type Interface interface { // List unmarshalls jsons found at directory defined by key and opaque them // into *List api object (an object that satisfies runtime.IsList definition). - List(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error + // The returned contents may be delayed, but it is guaranteed that they will + // be have at least 'resourceVersion'. + List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // retrying the update until success if there is index conflict.