From c9246dc1300910319d96acd5d7197e67721925ad Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 25 Jun 2014 16:23:15 -0700 Subject: [PATCH] Implement required sync changes everywhere. Make requests sync by default, so that usage of them doesn't have to change. --- pkg/apiserver/apiserver_test.go | 6 +- pkg/apiserver/operation_test.go | 12 ++-- pkg/client/request.go | 25 ++++++-- pkg/client/request_test.go | 25 ++++++-- pkg/registry/controller_registry.go | 40 ++++++++++-- pkg/registry/minion_registry.go | 28 +++++++- pkg/registry/minion_registry_test.go | 18 +++++- pkg/registry/pod_registry.go | 44 +++++++++---- pkg/registry/service_registry.go | 92 +++++++++++++++++---------- pkg/registry/service_registry_test.go | 15 +++-- 10 files changed, 225 insertions(+), 80 deletions(-) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 45a597b3346..7ed959b67ab 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -338,7 +338,7 @@ func TestParseTimeout(t *testing.T) { func TestSyncCreate(t *testing.T) { storage := SimpleRESTStorage{ injectedFunction: func(obj interface{}) (interface{}, error) { - time.Sleep(2 * time.Second) + time.Sleep(200 * time.Millisecond) return obj, nil }, } @@ -375,7 +375,7 @@ func TestSyncCreate(t *testing.T) { func TestSyncCreateTimeout(t *testing.T) { storage := SimpleRESTStorage{ injectedFunction: func(obj interface{}) (interface{}, error) { - time.Sleep(10 * time.Second) + time.Sleep(400 * time.Millisecond) return obj, nil }, } @@ -387,7 +387,7 @@ func TestSyncCreateTimeout(t *testing.T) { simple := Simple{Name: "foo"} data, _ := api.Encode(simple) - request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=2s", bytes.NewBuffer(data)) + request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=200ms", bytes.NewBuffer(data)) expectNoError(t, err) wg := sync.WaitGroup{} wg.Add(1) diff --git a/pkg/apiserver/operation_test.go b/pkg/apiserver/operation_test.go index df3e64b4aca..a5dba31ca34 100644 --- a/pkg/apiserver/operation_test.go +++ b/pkg/apiserver/operation_test.go @@ -27,7 +27,7 @@ func TestOperation(t *testing.T) { c := make(chan interface{}) op := ops.NewOperation(c) go func() { - time.Sleep(5 * time.Second) + time.Sleep(500 * time.Millisecond) c <- "All done" }() @@ -39,26 +39,26 @@ func TestOperation(t *testing.T) { t.Errorf("expire incorrectly removed the operation %#v", ops) } - op.WaitFor(time.Second) + op.WaitFor(10 * time.Millisecond) if _, completed := op.Describe(); completed { t.Errorf("Unexpectedly fast completion") } - op.WaitFor(5 * time.Second) + op.WaitFor(time.Second) if _, completed := op.Describe(); !completed { t.Errorf("Unexpectedly slow completion") } - time.Sleep(900 * time.Millisecond) + time.Sleep(100 * time.Millisecond) if op.expired(time.Now().Add(-time.Second)) { t.Errorf("Should not be expired: %#v", op) } - if !op.expired(time.Now().Add(-800 * time.Millisecond)) { + if !op.expired(time.Now().Add(-80 * time.Millisecond)) { t.Errorf("Should be expired: %#v", op) } - ops.expire(800 * time.Millisecond) + ops.expire(80 * time.Millisecond) if tmp := ops.Get(op.ID); tmp != nil { t.Errorf("expire failed to remove the operation %#v", ops) } diff --git a/pkg/client/request.go b/pkg/client/request.go index fde4d914e08..ea643421433 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -43,9 +43,11 @@ import ( // Begin a request with a verb (GET, POST, PUT, DELETE) func (c *Client) Verb(verb string) *Request { return &Request{ - verb: verb, - c: c, - path: "/api/v1beta1", + verb: verb, + c: c, + path: "/api/v1beta1", + sync: true, + timeout: 10 * time.Second, } } @@ -80,6 +82,7 @@ type Request struct { body io.Reader selector labels.Selector timeout time.Duration + sync bool } // Append an item to the request path. You must call Path at least once. @@ -91,6 +94,15 @@ func (r *Request) Path(item string) *Request { return r } +// Set sync/async call status. +func (r *Request) Sync(sync bool) *Request { + if r.err != nil { + return r + } + r.sync = sync + return r +} + // Overwrite an existing path with the path parameter. func (r *Request) AbsPath(path string) *Request { if r.err != nil { @@ -168,8 +180,11 @@ func (r *Request) Do() Result { if r.selector != nil { query.Add("labels", r.selector.String()) } - if r.timeout != 0 { - query.Add("timeout", r.timeout.String()) + if r.sync { + query.Add("sync", "true") + if r.timeout != 0 { + query.Add("timeout", r.timeout.String()) + } } finalUrl += "?" + query.Encode() req, err := http.NewRequest(r.verb, finalUrl, r.body) diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 41c8ae62e32..c6ec542c240 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -58,7 +58,7 @@ func TestDoRequestNewWay(t *testing.T) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &reqBody) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { + if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } if fakeHandler.RequestReceived.Header["Authorization"] == nil { @@ -83,6 +83,7 @@ func TestDoRequestNewWayReader(t *testing.T) { Path("foo/bar"). Path("baz"). Selector(labels.Set{"name": "foo"}.AsSelector()). + Sync(false). Timeout(time.Second). Body(bytes.NewBuffer(reqBodyExpected)). Do().Get() @@ -97,7 +98,7 @@ func TestDoRequestNewWayReader(t *testing.T) { } tmpStr := string(reqBodyExpected) fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { + if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } if fakeHandler.RequestReceived.Header["Authorization"] == nil { @@ -136,7 +137,7 @@ func TestDoRequestNewWayObj(t *testing.T) { } tmpStr := string(reqBodyExpected) fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { + if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } if fakeHandler.RequestReceived.Header["Authorization"] == nil { @@ -181,7 +182,7 @@ func TestDoRequestNewWayFile(t *testing.T) { } tmpStr := string(reqBodyExpected) fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { + if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } if fakeHandler.RequestReceived.Header["Authorization"] == nil { @@ -213,3 +214,19 @@ func TestAbsPath(t *testing.T) { t.Errorf("unexpected path: %s, expected %s", r.path, expectedPath) } } + +func TestSync(t *testing.T) { + c := New("", nil) + r := c.Get() + if !r.sync { + t.Errorf("sync has wrong default") + } + r.Sync(false) + if r.sync { + t.Errorf("'Sync' doesn't work") + } + r.Sync(true) + if !r.sync { + t.Errorf("'Sync' doesn't work") + } +} diff --git a/pkg/registry/controller_registry.go b/pkg/registry/controller_registry.go index bf7722e83b3..64fac8258d1 100644 --- a/pkg/registry/controller_registry.go +++ b/pkg/registry/controller_registry.go @@ -17,6 +17,8 @@ limitations under the License. package registry import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -55,7 +57,9 @@ func (storage *ControllerRegistryStorage) Get(id string) (interface{}, error) { } func (storage *ControllerRegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.DeleteController(id) + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, storage.registry.DeleteController(id) + }), nil } func (storage *ControllerRegistryStorage) Extract(body []byte) (interface{}, error) { @@ -64,10 +68,36 @@ func (storage *ControllerRegistryStorage) Extract(body []byte) (interface{}, err return result, err } -func (storage *ControllerRegistryStorage) Create(controller interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.CreateController(controller.(api.ReplicationController)) +func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + controller, ok := obj.(api.ReplicationController) + if !ok { + return nil, fmt.Errorf("not a replication controller: %#v", obj) + } + if controller.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", controller) + } + return apiserver.MakeAsync(func() (interface{}, error) { + err := storage.registry.CreateController(controller) + if err != nil { + return nil, err + } + return storage.registry.GetController(controller.ID) + }), nil } -func (storage *ControllerRegistryStorage) Update(controller interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.UpdateController(controller.(api.ReplicationController)) +func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + controller, ok := obj.(api.ReplicationController) + if !ok { + return nil, fmt.Errorf("not a replication controller: %#v", obj) + } + if controller.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", controller) + } + return apiserver.MakeAsync(func() (interface{}, error) { + err := storage.registry.UpdateController(controller) + if err != nil { + return nil, err + } + return storage.registry.GetController(controller.ID) + }), nil } diff --git a/pkg/registry/minion_registry.go b/pkg/registry/minion_registry.go index e5b6c0c131b..dbe7fc5c994 100644 --- a/pkg/registry/minion_registry.go +++ b/pkg/registry/minion_registry.go @@ -140,8 +140,28 @@ func (storage *MinionRegistryStorage) Extract(body []byte) (interface{}, error) return minion, err } -func (storage *MinionRegistryStorage) Create(minion interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return minion }), storage.registry.Insert(minion.(api.Minion).ID) +func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + minion, ok := obj.(api.Minion) + if !ok { + return nil, fmt.Errorf("not a minion: %#v", obj) + } + if minion.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", minion) + } + return apiserver.MakeAsync(func() (interface{}, error) { + err := storage.registry.Insert(minion.ID) + if err != nil { + return nil, err + } + contains, err := storage.registry.Contains(minion.ID) + if err != nil { + return nil, err + } + if contains { + return storage.toApiMinion(minion.ID), nil + } + return nil, fmt.Errorf("unable to add minion %#v", minion) + }), nil } func (storage *MinionRegistryStorage) Update(minion interface{}) (<-chan interface{}, error) { @@ -156,5 +176,7 @@ func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, err if err != nil { return nil, err } - return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.Delete(id) + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, storage.registry.Delete(id) + }), nil } diff --git a/pkg/registry/minion_registry_test.go b/pkg/registry/minion_registry_test.go index ca12a0be66b..6c6db1b30f7 100644 --- a/pkg/registry/minion_registry_test.go +++ b/pkg/registry/minion_registry_test.go @@ -73,20 +73,32 @@ func TestMinionRegistryStorage(t *testing.T) { t.Errorf("has unexpected object") } - if _, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}}); err != nil { + c, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}}) + if err != nil { t.Errorf("insert failed") } + obj := <-c + if m, ok := obj.(api.Minion); !ok || m.ID != "baz" { + t.Errorf("insert return value was weird: %#v", obj) + } if obj, err := ms.Get("baz"); err != nil || obj.(api.Minion).ID != "baz" { t.Errorf("insert didn't actually insert") } - if _, err := ms.Delete("bar"); err != nil { + c, err = ms.Delete("bar") + if err != nil { t.Errorf("delete failed") } + obj = <-c + if s, ok := obj.(api.Status); !ok || s.Status != api.StatusSuccess { + t.Errorf("delete return value was weird: %#v", obj) + } if _, err := ms.Get("bar"); err != ErrDoesNotExist { t.Errorf("delete didn't actually delete") } - if _, err := ms.Delete("bar"); err != ErrDoesNotExist { + + _, err = ms.Delete("bar") + if err != ErrDoesNotExist { t.Errorf("delete returned wrong error") } diff --git a/pkg/registry/pod_registry.go b/pkg/registry/pod_registry.go index 06b0c4ae79e..708dfb24ac7 100644 --- a/pkg/registry/pod_registry.go +++ b/pkg/registry/pod_registry.go @@ -131,7 +131,9 @@ func (storage *PodRegistryStorage) Get(id string) (interface{}, error) { } func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.DeletePod(id) + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, storage.registry.DeletePod(id) + }), nil } func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) { @@ -140,19 +142,37 @@ func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) { return pod, err } -func (storage *PodRegistryStorage) Create(pod interface{}) (<-chan interface{}, error) { - podObj := pod.(api.Pod) - if len(podObj.ID) == 0 { +func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + pod := obj.(api.Pod) + if len(pod.ID) == 0 { return nil, fmt.Errorf("id is unspecified: %#v", pod) } - machine, err := storage.scheduler.Schedule(podObj) - if err != nil { - return nil, err + + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO(lavalamp): Separate scheduler more cleanly. + machine, err := storage.scheduler.Schedule(pod) + if err != nil { + return nil, err + } + err = storage.registry.CreatePod(machine, pod) + if err != nil { + return nil, err + } + return storage.registry.GetPod(pod.ID) + }), nil +} + +func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + pod := obj.(api.Pod) + if len(pod.ID) == 0 { + return nil, fmt.Errorf("id is unspecified: %#v", pod) } - return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.CreatePod(machine, podObj) -} - -func (storage *PodRegistryStorage) Update(pod interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.UpdatePod(pod.(api.Pod)) + return apiserver.MakeAsync(func() (interface{}, error) { + err := storage.registry.UpdatePod(pod) + if err != nil { + return nil, err + } + return storage.registry.GetPod(pod.ID) + }), nil } diff --git a/pkg/registry/service_registry.go b/pkg/registry/service_registry.go index 419719869d7..89455a85c50 100644 --- a/pkg/registry/service_registry.go +++ b/pkg/registry/service_registry.go @@ -82,24 +82,26 @@ func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) { } func (sr *ServiceRegistryStorage) Delete(id string) (<-chan interface{}, error) { - svc, err := sr.Get(id) + service, err := sr.registry.GetService(id) if err != nil { return nil, err } - if svc.(*api.Service).CreateExternalLoadBalancer { - var balancer cloudprovider.TCPLoadBalancer - var ok bool - if sr.cloud != nil { - balancer, ok = sr.cloud.TCPLoadBalancer() - } - if ok && balancer != nil { - err = balancer.DeleteTCPLoadBalancer(id, "us-central1") - if err != nil { - return nil, err + return apiserver.MakeAsync(func() (interface{}, error) { + if service.CreateExternalLoadBalancer { + var balancer cloudprovider.TCPLoadBalancer + var ok bool + if sr.cloud != nil { + balancer, ok = sr.cloud.TCPLoadBalancer() + } + if ok && balancer != nil { + err = balancer.DeleteTCPLoadBalancer(id, "us-central1") + if err != nil { + return nil, err + } } } - } - return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), sr.registry.DeleteService(id) + return api.Status{Status: api.StatusSuccess}, sr.registry.DeleteService(id) + }), nil } func (sr *ServiceRegistryStorage) Extract(body []byte) (interface{}, error) { @@ -110,29 +112,51 @@ func (sr *ServiceRegistryStorage) Extract(body []byte) (interface{}, error) { func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { srv := obj.(api.Service) - if srv.CreateExternalLoadBalancer { - var balancer cloudprovider.TCPLoadBalancer - var ok bool - if sr.cloud != nil { - balancer, ok = sr.cloud.TCPLoadBalancer() - } - if ok && balancer != nil { - hosts, err := sr.machines.List() - if err != nil { - return nil, err - } - err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts) - if err != nil { - return nil, err - } - } else { - return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") - } + if srv.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", srv) } - // TODO actually wait for the object to be fully created here. - return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.CreateService(srv) + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers + // correctly no matter what http operations happen. + if srv.CreateExternalLoadBalancer { + var balancer cloudprovider.TCPLoadBalancer + var ok bool + if sr.cloud != nil { + balancer, ok = sr.cloud.TCPLoadBalancer() + } + if ok && balancer != nil { + hosts, err := sr.machines.List() + if err != nil { + return nil, err + } + err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") + } + } + // TODO actually wait for the object to be fully created here. + err := sr.registry.CreateService(srv) + if err != nil { + return nil, err + } + return sr.registry.GetService(srv.ID) + }), nil } func (sr *ServiceRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.UpdateService(obj.(api.Service)) + srv := obj.(api.Service) + if srv.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", srv) + } + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO: check to see if external load balancer status changed + err := sr.registry.UpdateService(srv) + if err != nil { + return nil, err + } + return sr.registry.GetService(srv.ID) + }), nil } diff --git a/pkg/registry/service_registry_test.go b/pkg/registry/service_registry_test.go index 109a9b7f7c6..023101aae2f 100644 --- a/pkg/registry/service_registry_test.go +++ b/pkg/registry/service_registry_test.go @@ -34,7 +34,8 @@ func TestServiceRegistry(t *testing.T) { svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, } - storage.Create(svc) + c, _ := storage.Create(svc) + <-c if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -57,7 +58,8 @@ func TestServiceRegistryExternalService(t *testing.T) { JSONBase: api.JSONBase{ID: "foo"}, CreateExternalLoadBalancer: true, } - storage.Create(svc) + c, _ := storage.Create(svc) + <-c if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -82,7 +84,8 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { JSONBase: api.JSONBase{ID: "foo"}, CreateExternalLoadBalancer: true, } - storage.Create(svc) + c, _ := storage.Create(svc) + <-c if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -106,7 +109,8 @@ func TestServiceRegistryDelete(t *testing.T) { } memory.CreateService(svc) - storage.Delete(svc.ID) + c, _ := storage.Delete(svc.ID) + <-c if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -131,7 +135,8 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { } memory.CreateService(svc) - storage.Delete(svc.ID) + c, _ := storage.Delete(svc.ID) + <-c if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "delete" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)