diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index ea58302a0bc..a85bf114f29 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -25,18 +25,29 @@ import ( "net/url" "runtime/debug" "strings" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // RESTStorage is a generic interface for RESTful storage services type RESTStorage interface { List(labels.Query) (interface{}, error) Get(id string) (interface{}, error) - Delete(id string) error + Delete(id string) (<-chan interface{}, error) Extract(body string) (interface{}, error) - Create(interface{}) error - Update(interface{}) error + Create(interface{}) (<-chan interface{}, error) + Update(interface{}) (<-chan interface{}, error) +} + +func MakeAsync(fn func() interface{}) <-chan interface{} { + channel := make(chan interface{}, 1) + go func() { + defer util.HandleCrash() + channel <- fn() + }() + return channel } // Status is a return value for calls that don't return other objects @@ -135,6 +146,17 @@ func (server *ApiServer) readBody(req *http.Request) (string, error) { return string(body), err } +func (server *ApiServer) waitForObject(out <-chan interface{}, timeout time.Duration) (interface{}, error) { + tick := time.After(timeout) + var obj interface{} + select { + case obj = <-out: + return obj, nil + case <-tick: + return nil, fmt.Errorf("Timed out waiting for synchronization.") + } +} + // handleREST is the main dispatcher for the server. It switches on the HTTP method, and then // on path length, according to the following table: // Method Path Action @@ -144,7 +166,17 @@ func (server *ApiServer) readBody(req *http.Request) (string, error) { // PUT /foo/bar update 'bar' // DELETE /foo/bar delete 'bar' // Returns 404 if the method/pattern doesn't match one of these entries +// The server accepts several query parameters: +// sync=[false|true] Synchronous request (only applies to create, update, delete operations) +// timeout= Timeout for synchronous requests, only applies if sync=true +// labels= Used for filtering list operations func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *http.Request, w http.ResponseWriter, storage RESTStorage) { + sync := requestUrl.Query().Get("sync") == "true" + timeout, err := time.ParseDuration(requestUrl.Query().Get("timeout")) + if err != nil && len(requestUrl.Query().Get("timeout")) > 0 { + log.Printf("Failed to parse: %#v '%s'", err, requestUrl.Query().Get("timeout")) + timeout = time.Second * 30 + } switch req.Method { case "GET": switch len(parts) { @@ -159,7 +191,7 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht server.error(err, w) return } - server.write(200, controllers, w) + server.write(http.StatusOK, controllers, w) case 2: item, err := storage.Get(parts[1]) if err != nil { @@ -190,24 +222,44 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht server.error(err, w) return } - err = storage.Create(obj) + out, err := storage.Create(obj) + if err == nil && sync { + obj, err = server.waitForObject(out, timeout) + } if err != nil { server.error(err, w) - } else { - server.write(200, obj, w) + return } + var statusCode int + if sync { + statusCode = http.StatusOK + } else { + statusCode = http.StatusAccepted + } + server.write(statusCode, obj, w) return case "DELETE": if len(parts) != 2 { server.notFound(req, w) return } - err := storage.Delete(parts[1]) + out, err := storage.Delete(parts[1]) + var obj interface{} + obj = Status{Success: true} + if err == nil && sync { + obj, err = server.waitForObject(out, timeout) + } if err != nil { server.error(err, w) return } - server.write(200, Status{Success: true}, w) + var statusCode int + if sync { + statusCode = http.StatusOK + } else { + statusCode = http.StatusAccepted + } + server.write(statusCode, obj, w) return case "PUT": if len(parts) != 2 { @@ -223,12 +275,21 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht server.error(err, w) return } - err = storage.Update(obj) + out, err := storage.Update(obj) + if err == nil && sync { + obj, err = server.waitForObject(out, timeout) + } if err != nil { server.error(err, w) return } - server.write(200, obj, w) + var statusCode int + if sync { + statusCode = http.StatusOK + } else { + statusCode = http.StatusAccepted + } + server.write(statusCode, obj, w) return default: server.notFound(req, w) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 41197d5b7e3..a89358a6deb 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/http/httptest" "reflect" + "sync" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -49,6 +50,7 @@ type SimpleRESTStorage struct { item Simple deleted string updated Simple + channel <-chan interface{} } func (storage *SimpleRESTStorage) List(labels.Query) (interface{}, error) { @@ -62,9 +64,9 @@ func (storage *SimpleRESTStorage) Get(id string) (interface{}, error) { return storage.item, storage.err } -func (storage *SimpleRESTStorage) Delete(id string) error { +func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error) { storage.deleted = id - return storage.err + return storage.channel, storage.err } func (storage *SimpleRESTStorage) Extract(body string) (interface{}, error) { @@ -73,13 +75,13 @@ func (storage *SimpleRESTStorage) Extract(body string) (interface{}, error) { return item, storage.err } -func (storage *SimpleRESTStorage) Create(interface{}) error { - return storage.err +func (storage *SimpleRESTStorage) Create(interface{}) (<-chan interface{}, error) { + return storage.channel, storage.err } -func (storage *SimpleRESTStorage) Update(object interface{}) error { +func (storage *SimpleRESTStorage) Update(object interface{}) (<-chan interface{}, error) { storage.updated = object.(Simple) - return storage.err + return storage.channel, storage.err } func extractBody(response *http.Response, object interface{}) (string, error) { @@ -270,7 +272,7 @@ func TestCreate(t *testing.T) { expectNoError(t, err) response, err := client.Do(request) expectNoError(t, err) - if response.StatusCode != 200 { + if response.StatusCode != http.StatusAccepted { t.Errorf("Unexpected response %#v", response) } @@ -281,3 +283,58 @@ func TestCreate(t *testing.T) { t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body)) } } + +func TestSyncCreate(t *testing.T) { + channel := make(chan interface{}, 1) + storage := SimpleRESTStorage{ + channel: channel, + } + handler := New(map[string]RESTStorage{ + "foo": &storage, + }, "/prefix/version") + server := httptest.NewServer(handler) + client := http.Client{} + + simple := Simple{Name: "foo"} + data, _ := json.Marshal(simple) + request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true", bytes.NewBuffer(data)) + expectNoError(t, err) + wg := sync.WaitGroup{} + wg.Add(1) + var response *http.Response + go func() { + response, err = client.Do(request) + expectNoError(t, err) + if response.StatusCode != 200 { + t.Errorf("Unexpected response %#v", response) + } + wg.Done() + }() + output := Simple{Name: "bar"} + channel <- output + wg.Wait() + var itemOut Simple + body, err := extractBody(response, &itemOut) + expectNoError(t, err) + if !reflect.DeepEqual(itemOut, output) { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body)) + } +} + +func TestSyncCreateTimeout(t *testing.T) { + handler := New(map[string]RESTStorage{ + "foo": &SimpleRESTStorage{}, + }, "/prefix/version") + server := httptest.NewServer(handler) + client := http.Client{} + + simple := Simple{Name: "foo"} + data, _ := json.Marshal(simple) + request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=1us", bytes.NewBuffer(data)) + expectNoError(t, err) + response, err := client.Do(request) + expectNoError(t, err) + if response.StatusCode != 500 { + t.Errorf("Unexpected response %#v", response) + } +} diff --git a/pkg/client/client.go b/pkg/client/client.go index 90300589d11..e7df7f8d817 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -95,7 +95,7 @@ func (client Client) rawRequest(method, path string, requestBody io.Reader, targ if err != nil { return body, err } - if response.StatusCode != 200 { + if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent { return nil, fmt.Errorf("request [%s %s] failed (%d) %s: %s", method, client.makeURL(path), response.StatusCode, response.Status, string(body)) } if target != nil { diff --git a/pkg/registry/controller_registry.go b/pkg/registry/controller_registry.go index 1824ba965a2..76034b95675 100644 --- a/pkg/registry/controller_registry.go +++ b/pkg/registry/controller_registry.go @@ -57,8 +57,8 @@ func (storage *ControllerRegistryStorage) Get(id string) (interface{}, error) { return controller, err } -func (storage *ControllerRegistryStorage) Delete(id string) error { - return storage.registry.DeleteController(id) +func (storage *ControllerRegistryStorage) Delete(id string) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() interface{} { return apiserver.Status{Success: true} }), storage.registry.DeleteController(id) } func (storage *ControllerRegistryStorage) Extract(body string) (interface{}, error) { @@ -68,10 +68,10 @@ func (storage *ControllerRegistryStorage) Extract(body string) (interface{}, err return result, err } -func (storage *ControllerRegistryStorage) Create(controller interface{}) error { - return storage.registry.CreateController(controller.(api.ReplicationController)) +func (storage *ControllerRegistryStorage) Create(controller interface{}) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.CreateController(controller.(api.ReplicationController)) } -func (storage *ControllerRegistryStorage) Update(controller interface{}) error { - return storage.registry.UpdateController(controller.(api.ReplicationController)) +func (storage *ControllerRegistryStorage) Update(controller interface{}) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.UpdateController(controller.(api.ReplicationController)) } diff --git a/pkg/registry/pod_registry.go b/pkg/registry/pod_registry.go index 272238c6894..f3ea52a8277 100644 --- a/pkg/registry/pod_registry.go +++ b/pkg/registry/pod_registry.go @@ -110,8 +110,8 @@ func (storage *PodRegistryStorage) Get(id string) (interface{}, error) { return pod, err } -func (storage *PodRegistryStorage) Delete(id string) error { - return storage.registry.DeletePod(id) +func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() interface{} { return apiserver.Status{Success: true} }), storage.registry.DeletePod(id) } func (storage *PodRegistryStorage) Extract(body string) (interface{}, error) { @@ -121,18 +121,19 @@ func (storage *PodRegistryStorage) Extract(body string) (interface{}, error) { return pod, err } -func (storage *PodRegistryStorage) Create(pod interface{}) error { +func (storage *PodRegistryStorage) Create(pod interface{}) (<-chan interface{}, error) { podObj := pod.(api.Pod) if len(podObj.ID) == 0 { - return fmt.Errorf("id is unspecified: %#v", pod) + return nil, fmt.Errorf("id is unspecified: %#v", pod) } machine, err := storage.scheduler.Schedule(podObj) if err != nil { - return err + return nil, err } - return storage.registry.CreatePod(machine, podObj) + + return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.CreatePod(machine, podObj) } -func (storage *PodRegistryStorage) Update(pod interface{}) error { - return storage.registry.UpdatePod(pod.(api.Pod)) +func (storage *PodRegistryStorage) Update(pod interface{}) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.UpdatePod(pod.(api.Pod)) } diff --git a/pkg/registry/service_registry.go b/pkg/registry/service_registry.go index 128e3b571b5..9b936841a7f 100644 --- a/pkg/registry/service_registry.go +++ b/pkg/registry/service_registry.go @@ -84,10 +84,10 @@ func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) { return service, err } -func (sr *ServiceRegistryStorage) Delete(id string) error { +func (sr *ServiceRegistryStorage) Delete(id string) (<-chan interface{}, error) { svc, err := sr.Get(id) if err != nil { - return err + return nil, err } if svc.(*api.Service).CreateExternalLoadBalancer { var balancer cloudprovider.TCPLoadBalancer @@ -98,11 +98,11 @@ func (sr *ServiceRegistryStorage) Delete(id string) error { if ok && balancer != nil { err = balancer.DeleteTCPLoadBalancer(id, "us-central1") if err != nil { - return err + return nil, err } } } - return sr.registry.DeleteService(id) + return apiserver.MakeAsync(func() interface{} { return apiserver.Status{Success: true} }), sr.registry.DeleteService(id) } func (sr *ServiceRegistryStorage) Extract(body string) (interface{}, error) { @@ -112,7 +112,7 @@ func (sr *ServiceRegistryStorage) Extract(body string) (interface{}, error) { return svc, err } -func (sr *ServiceRegistryStorage) Create(obj interface{}) error { +func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { srv := obj.(api.Service) if srv.CreateExternalLoadBalancer { var balancer cloudprovider.TCPLoadBalancer @@ -123,15 +123,16 @@ func (sr *ServiceRegistryStorage) Create(obj interface{}) error { if ok && balancer != nil { err := balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, sr.hosts) if err != nil { - return err + return nil, err } } else { - return fmt.Errorf("requested an external service, but no cloud provider supplied.") + return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") } } - return sr.registry.CreateService(srv) + // TODO actually wait for the object to be fully created here. + return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.CreateService(srv) } -func (sr *ServiceRegistryStorage) Update(obj interface{}) error { - return sr.registry.UpdateService(obj.(api.Service)) +func (sr *ServiceRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.UpdateService(obj.(api.Service)) }