diff --git a/pkg/api/helper.go b/pkg/api/helper.go index d4bd0425db0..cd882110a5e 100644 --- a/pkg/api/helper.go +++ b/pkg/api/helper.go @@ -37,6 +37,8 @@ func init() { MinionList{}, Minion{}, Status{}, + ServerOpList{}, + ServerOp{}, ) } diff --git a/pkg/api/types.go b/pkg/api/types.go index e9f480385ff..ed18f287678 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -207,3 +207,14 @@ const ( StatusFailure = "failure" StatusWorking = "working" ) + +// Operation information, as delivered to API clients. +type ServerOp struct { + JSONBase `yaml:",inline" json:",inline"` +} + +// Operation list, as delivered to API clients. +type ServerOpList struct { + JSONBase `yaml:",inline" json:",inline"` + Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"` +} diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 5507d6f0fe7..cabe6541b07 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -41,11 +41,30 @@ type RESTStorage interface { Update(interface{}) (<-chan interface{}, error) } -func MakeAsync(fn func() interface{}) <-chan interface{} { - channel := make(chan interface{}, 1) +// WorkFunc is used to perform any time consuming work for an api call, after +// the input has been validated. Pass one of these to MakeAsync to create an +// appropriate return value for the Update, Delete, and Create methods. +type WorkFunc func() (result interface{}, err error) + +// MakeAsync takes a function and executes it, delivering the result in the way required +// by RESTStorage's Update, Delete, and Create methods. +func MakeAsync(fn WorkFunc) <-chan interface{} { + channel := make(chan interface{}) go func() { defer util.HandleCrash() - channel <- fn() + obj, err := fn() + if err != nil { + channel <- &api.Status{ + Status: api.StatusFailure, + Details: err.Error(), + } + } else { + channel <- obj + } + // 'close' is used to signal that no further values will + // be written to the channel. Not strictly necessary, but + // also won't hurt. + close(channel) }() return channel } @@ -59,6 +78,7 @@ func MakeAsync(fn func() interface{}) <-chan interface{} { type ApiServer struct { prefix string storage map[string]RESTStorage + ops *Operations } // New creates a new ApiServer object. @@ -68,6 +88,7 @@ func New(storage map[string]RESTStorage, prefix string) *ApiServer { return &ApiServer{ storage: storage, prefix: prefix, + ops: NewOperations(), } } @@ -108,6 +129,10 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { server.notFound(req, w) return } + if requestParts[0] == "operations" { + server.handleOperationRequest(requestParts[1:], w, req) + return + } storage := server.storage[requestParts[0]] if storage == nil { logger.Addf("'%v' has no storage object", requestParts[0]) @@ -144,15 +169,30 @@ func (server *ApiServer) readBody(req *http.Request) ([]byte, error) { return body, err } -func (server *ApiServer) waitForObject(out <-chan interface{}, timeout time.Duration) (interface{}, error) { - tick := time.After(timeout) - var obj interface{} - select { - case obj = <-out: - return obj, nil - case <-tick: - return nil, fmt.Errorf("Timed out waiting for synchronization.") +// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an +// Operation to recieve the result and returning its ID down the writer. +func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout time.Duration, w http.ResponseWriter) { + op := server.ops.NewOperation(out) + if sync { + op.WaitFor(timeout) } + obj, complete := op.StatusOrResult() + if complete { + server.write(http.StatusOK, obj, w) + } else { + server.write(http.StatusAccepted, obj, w) + } +} + +func parseTimeout(str string) time.Duration { + if str != "" { + timeout, err := time.ParseDuration(str) + if err == nil { + return timeout + } + glog.Errorf("Failed to parse: %#v '%s'", err, str) + } + return 30 * time.Second } // handleREST is the main dispatcher for the server. It switches on the HTTP method, and then @@ -170,11 +210,7 @@ func (server *ApiServer) waitForObject(out <-chan interface{}, timeout time.Dura // labels= Used for filtering list operations func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *http.Request, w http.ResponseWriter, storage RESTStorage) { sync := requestUrl.Query().Get("sync") == "true" - timeout, err := time.ParseDuration(requestUrl.Query().Get("timeout")) - if err != nil && len(requestUrl.Query().Get("timeout")) > 0 { - glog.Errorf("Failed to parse: %#v '%s'", err, requestUrl.Query().Get("timeout")) - timeout = time.Second * 30 - } + timeout := parseTimeout(requestUrl.Query().Get("timeout")) switch req.Method { case "GET": switch len(parts) { @@ -184,12 +220,12 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht server.error(err, w) return } - controllers, err := storage.List(selector) + list, err := storage.List(selector) if err != nil { server.error(err, w) return } - server.write(http.StatusOK, controllers, w) + server.write(http.StatusOK, list, w) case 2: item, err := storage.Get(parts[1]) if err != nil { @@ -204,7 +240,6 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht default: server.notFound(req, w) } - return case "POST": if len(parts) != 1 { server.notFound(req, w) @@ -221,44 +256,22 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht return } out, err := storage.Create(obj) - if err == nil && sync { - obj, err = server.waitForObject(out, timeout) - } if err != nil { server.error(err, w) return } - var statusCode int - if sync { - statusCode = http.StatusOK - } else { - statusCode = http.StatusAccepted - } - server.write(statusCode, obj, w) - return + server.finishReq(out, sync, timeout, w) case "DELETE": if len(parts) != 2 { server.notFound(req, w) return } out, err := storage.Delete(parts[1]) - var obj interface{} - obj = api.Status{Status: api.StatusSuccess} - if err == nil && sync { - obj, err = server.waitForObject(out, timeout) - } if err != nil { server.error(err, w) return } - var statusCode int - if sync { - statusCode = http.StatusOK - } else { - statusCode = http.StatusAccepted - } - server.write(statusCode, obj, w) - return + server.finishReq(out, sync, timeout, w) case "PUT": if len(parts) != 2 { server.notFound(req, w) @@ -274,22 +287,36 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht return } out, err := storage.Update(obj) - if err == nil && sync { - obj, err = server.waitForObject(out, timeout) - } if err != nil { server.error(err, w) return } - var statusCode int - if sync { - statusCode = http.StatusOK - } else { - statusCode = http.StatusAccepted - } - server.write(statusCode, obj, w) - return + server.finishReq(out, sync, timeout, w) default: server.notFound(req, w) } } + +func (server *ApiServer) handleOperationRequest(parts []string, w http.ResponseWriter, req *http.Request) { + if req.Method != "GET" { + server.notFound(req, w) + } + if len(parts) == 0 { + // List outstanding operations. + list := server.ops.List() + server.write(http.StatusOK, list, w) + return + } + + op := server.ops.Get(parts[0]) + if op == nil { + server.notFound(req, w) + } + + obj, complete := op.StatusOrResult() + if complete { + server.write(http.StatusOK, obj, w) + } else { + server.write(http.StatusAccepted, obj, w) + } +} diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 8b0e535feeb..f3371300c3a 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -18,7 +18,6 @@ package apiserver import ( "bytes" - "encoding/json" "fmt" "io/ioutil" "net/http" @@ -26,6 +25,7 @@ import ( "reflect" "sync" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -58,7 +58,11 @@ type SimpleRESTStorage struct { item Simple deleted string updated Simple - channel <-chan interface{} + created Simple + + // If non-nil, called inside the WorkFunc when answering update, delete, create. + // obj recieves the original input to the update, delete, or create call. + injectedFunction func(obj interface{}) (returnObj interface{}, err error) } func (storage *SimpleRESTStorage) List(labels.Selector) (interface{}, error) { @@ -74,7 +78,15 @@ func (storage *SimpleRESTStorage) Get(id string) (interface{}, error) { func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error) { storage.deleted = id - return storage.channel, storage.err + if storage.err != nil { + return nil, storage.err + } + return MakeAsync(func() (interface{}, error) { + if storage.injectedFunction != nil { + return storage.injectedFunction(id) + } + return api.Status{Status: api.StatusSuccess}, nil + }), nil } func (storage *SimpleRESTStorage) Extract(body []byte) (interface{}, error) { @@ -83,13 +95,30 @@ func (storage *SimpleRESTStorage) Extract(body []byte) (interface{}, error) { return item, storage.err } -func (storage *SimpleRESTStorage) Create(interface{}) (<-chan interface{}, error) { - return storage.channel, storage.err +func (storage *SimpleRESTStorage) Create(obj interface{}) (<-chan interface{}, error) { + storage.created = obj.(Simple) + if storage.err != nil { + return nil, storage.err + } + return MakeAsync(func() (interface{}, error) { + if storage.injectedFunction != nil { + return storage.injectedFunction(obj) + } + return obj, nil + }), nil } -func (storage *SimpleRESTStorage) Update(object interface{}) (<-chan interface{}, error) { - storage.updated = object.(Simple) - return storage.channel, storage.err +func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, error) { + storage.updated = obj.(Simple) + if storage.err != nil { + return nil, storage.err + } + return MakeAsync(func() (interface{}, error) { + if storage.injectedFunction != nil { + return storage.injectedFunction(obj) + } + return obj, nil + }), nil } func extractBody(response *http.Response, object interface{}) (string, error) { @@ -214,7 +243,7 @@ func TestUpdate(t *testing.T) { item := Simple{ Name: "bar", } - body, err := json.Marshal(item) + body, err := api.Encode(item) expectNoError(t, err) client := http.Client{} request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) @@ -270,14 +299,15 @@ func TestMissingStorage(t *testing.T) { } func TestCreate(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} handler := New(map[string]RESTStorage{ - "foo": &SimpleRESTStorage{}, + "foo": simpleStorage, }, "/prefix/version") server := httptest.NewServer(handler) client := http.Client{} simple := Simple{Name: "foo"} - data, _ := json.Marshal(simple) + data, _ := api.Encode(simple) request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data)) expectNoError(t, err) response, err := client.Do(request) @@ -286,18 +316,32 @@ func TestCreate(t *testing.T) { t.Errorf("Unexpected response %#v", response) } - var itemOut Simple + var itemOut api.Status body, err := extractBody(response, &itemOut) expectNoError(t, err) - if !reflect.DeepEqual(itemOut, simple) { - t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body)) + if itemOut.Status != api.StatusWorking || itemOut.Details == "" { + t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body)) + } +} + +func TestParseTimeout(t *testing.T) { + if d := parseTimeout(""); d != 30*time.Second { + t.Errorf("blank timeout produces %v", d) + } + if d := parseTimeout("not a timeout"); d != 30*time.Second { + t.Errorf("bad timeout produces %v", d) + } + if d := parseTimeout("10s"); d != 10*time.Second { + t.Errorf("10s timeout produced: %v", d) } } func TestSyncCreate(t *testing.T) { - channel := make(chan interface{}, 1) storage := SimpleRESTStorage{ - channel: channel, + injectedFunction: func(obj interface{}) (interface{}, error) { + time.Sleep(200 * time.Millisecond) + return obj, nil + }, } handler := New(map[string]RESTStorage{ "foo": &storage, @@ -306,7 +350,7 @@ func TestSyncCreate(t *testing.T) { client := http.Client{} simple := Simple{Name: "foo"} - data, _ := json.Marshal(simple) + data, _ := api.Encode(simple) request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true", bytes.NewBuffer(data)) expectNoError(t, err) wg := sync.WaitGroup{} @@ -314,37 +358,54 @@ func TestSyncCreate(t *testing.T) { var response *http.Response go func() { response, err = client.Do(request) - expectNoError(t, err) - if response.StatusCode != 200 { - t.Errorf("Unexpected response %#v", response) - } wg.Done() }() - output := Simple{Name: "bar"} - channel <- output wg.Wait() + expectNoError(t, err) var itemOut Simple body, err := extractBody(response, &itemOut) expectNoError(t, err) - if !reflect.DeepEqual(itemOut, output) { + if !reflect.DeepEqual(itemOut, simple) { t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body)) } + if response.StatusCode != http.StatusOK { + t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response) + } } func TestSyncCreateTimeout(t *testing.T) { + storage := SimpleRESTStorage{ + injectedFunction: func(obj interface{}) (interface{}, error) { + time.Sleep(400 * time.Millisecond) + return obj, nil + }, + } handler := New(map[string]RESTStorage{ - "foo": &SimpleRESTStorage{}, + "foo": &storage, }, "/prefix/version") server := httptest.NewServer(handler) client := http.Client{} simple := Simple{Name: "foo"} - data, _ := json.Marshal(simple) - request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=1us", bytes.NewBuffer(data)) + data, _ := api.Encode(simple) + request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=200ms", bytes.NewBuffer(data)) expectNoError(t, err) - response, err := client.Do(request) + wg := sync.WaitGroup{} + wg.Add(1) + var response *http.Response + go func() { + response, err = client.Do(request) + wg.Done() + }() + wg.Wait() expectNoError(t, err) - if response.StatusCode != 500 { - t.Errorf("Unexpected response %#v", response) + var itemOut api.Status + _, err = extractBody(response, &itemOut) + expectNoError(t, err) + if itemOut.Status != api.StatusWorking || itemOut.Details == "" { + t.Errorf("Unexpected status %#v", itemOut) + } + if response.StatusCode != http.StatusAccepted { + t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, 202, response) } } diff --git a/pkg/apiserver/operation.go b/pkg/apiserver/operation.go new file mode 100644 index 00000000000..09b25e2c34f --- /dev/null +++ b/pkg/apiserver/operation.go @@ -0,0 +1,169 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "sort" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Operation represents an ongoing action which the server is performing. +type Operation struct { + ID string + result interface{} + awaiting <-chan interface{} + finished *time.Time + lock sync.Mutex + notify chan bool +} + +// Operations tracks all the ongoing operations. +type Operations struct { + // Access only using functions from atomic. + lastID int64 + + // 'lock' guards the ops map. + lock sync.Mutex + ops map[string]*Operation +} + +// Returns a new Operations repository. +func NewOperations() *Operations { + ops := &Operations{ + ops: map[string]*Operation{}, + } + go util.Forever(func() { ops.expire(10 * time.Minute) }, 5*time.Minute) + return ops +} + +// Add a new operation. Lock-free. +func (ops *Operations) NewOperation(from <-chan interface{}) *Operation { + id := atomic.AddInt64(&ops.lastID, 1) + op := &Operation{ + ID: strconv.FormatInt(id, 10), + awaiting: from, + notify: make(chan bool, 1), + } + go op.wait() + go ops.insert(op) + return op +} + +// Inserts op into the ops map. +func (ops *Operations) insert(op *Operation) { + ops.lock.Lock() + defer ops.lock.Unlock() + ops.ops[op.ID] = op +} + +// List operations for an API client. +func (ops *Operations) List() api.ServerOpList { + ops.lock.Lock() + defer ops.lock.Unlock() + + ids := []string{} + for id := range ops.ops { + ids = append(ids, id) + } + sort.StringSlice(ids).Sort() + ol := api.ServerOpList{} + for _, id := range ids { + ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}}) + } + return ol +} + +// Returns the operation with the given ID, or nil +func (ops *Operations) Get(id string) *Operation { + ops.lock.Lock() + defer ops.lock.Unlock() + return ops.ops[id] +} + +// Garbage collect operations that have finished longer than maxAge ago. +func (ops *Operations) expire(maxAge time.Duration) { + ops.lock.Lock() + defer ops.lock.Unlock() + keep := map[string]*Operation{} + limitTime := time.Now().Add(-maxAge) + for id, op := range ops.ops { + if !op.expired(limitTime) { + keep[id] = op + } + } + ops.ops = keep +} + +// Waits forever for the operation to complete; call via go when +// the operation is created. Sets op.finished when the operation +// does complete, and sends on the notify channel, in case there +// are any WaitFor() calls in progress. +// Does not keep op locked while waiting. +func (op *Operation) wait() { + defer util.HandleCrash() + result := <-op.awaiting + + op.lock.Lock() + defer op.lock.Unlock() + op.result = result + finished := time.Now() + op.finished = &finished + op.notify <- true +} + +// Wait for the specified duration, or until the operation finishes, +// whichever happens first. +func (op *Operation) WaitFor(timeout time.Duration) { + select { + case <-time.After(timeout): + case <-op.notify: + // Re-send on this channel in case there are others + // waiting for notification. + op.notify <- true + } +} + +// Returns true if this operation finished before limitTime. +func (op *Operation) expired(limitTime time.Time) bool { + op.lock.Lock() + defer op.lock.Unlock() + if op.finished == nil { + return false + } + return op.finished.Before(limitTime) +} + +// Return status information or the result of the operation if it is complete, +// with a bool indicating true in the latter case. +func (op *Operation) StatusOrResult() (description interface{}, finished bool) { + op.lock.Lock() + defer op.lock.Unlock() + + if op.finished == nil { + return api.Status{ + Status: api.StatusWorking, + Details: op.ID, + }, false + } + return op.result, true +} diff --git a/pkg/apiserver/operation_test.go b/pkg/apiserver/operation_test.go new file mode 100644 index 00000000000..35b06ab1547 --- /dev/null +++ b/pkg/apiserver/operation_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "sync/atomic" + "testing" + "time" +) + +func TestOperation(t *testing.T) { + ops := NewOperations() + + c := make(chan interface{}) + op := ops.NewOperation(c) + // Allow context switch, so that op's ID can get added to the map and Get will work. + // This is just so we can test Get. Ordinary users have no need to call Get immediately + // after calling NewOperation, because it returns the operation directly. + time.Sleep(time.Millisecond) + go func() { + time.Sleep(500 * time.Millisecond) + c <- "All done" + }() + + if op.expired(time.Now().Add(-time.Minute)) { + t.Errorf("Expired before finished: %#v", op) + } + ops.expire(time.Minute) + if tmp := ops.Get(op.ID); tmp == nil { + t.Errorf("expire incorrectly removed the operation %#v", ops) + } + + op.WaitFor(10 * time.Millisecond) + if _, completed := op.StatusOrResult(); completed { + t.Errorf("Unexpectedly fast completion") + } + + const waiters = 10 + var waited int32 + for i := 0; i < waiters; i++ { + go func() { + op.WaitFor(time.Hour) + atomic.AddInt32(&waited, 1) + }() + } + + op.WaitFor(time.Minute) + if _, completed := op.StatusOrResult(); !completed { + t.Errorf("Unexpectedly slow completion") + } + + time.Sleep(100 * time.Millisecond) + if waited != waiters { + t.Errorf("Multiple waiters doesn't work, only %v finished", waited) + } + + if op.expired(time.Now().Add(-time.Second)) { + t.Errorf("Should not be expired: %#v", op) + } + if !op.expired(time.Now().Add(-80 * time.Millisecond)) { + t.Errorf("Should be expired: %#v", op) + } + + ops.expire(80 * time.Millisecond) + if tmp := ops.Get(op.ID); tmp != nil { + t.Errorf("expire failed to remove the operation %#v", ops) + } + + if op.result.(string) != "All done" { + t.Errorf("Got unexpected result: %#v", op.result) + } +} diff --git a/pkg/client/request.go b/pkg/client/request.go index fde4d914e08..ea643421433 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -43,9 +43,11 @@ import ( // Begin a request with a verb (GET, POST, PUT, DELETE) func (c *Client) Verb(verb string) *Request { return &Request{ - verb: verb, - c: c, - path: "/api/v1beta1", + verb: verb, + c: c, + path: "/api/v1beta1", + sync: true, + timeout: 10 * time.Second, } } @@ -80,6 +82,7 @@ type Request struct { body io.Reader selector labels.Selector timeout time.Duration + sync bool } // Append an item to the request path. You must call Path at least once. @@ -91,6 +94,15 @@ func (r *Request) Path(item string) *Request { return r } +// Set sync/async call status. +func (r *Request) Sync(sync bool) *Request { + if r.err != nil { + return r + } + r.sync = sync + return r +} + // Overwrite an existing path with the path parameter. func (r *Request) AbsPath(path string) *Request { if r.err != nil { @@ -168,8 +180,11 @@ func (r *Request) Do() Result { if r.selector != nil { query.Add("labels", r.selector.String()) } - if r.timeout != 0 { - query.Add("timeout", r.timeout.String()) + if r.sync { + query.Add("sync", "true") + if r.timeout != 0 { + query.Add("timeout", r.timeout.String()) + } } finalUrl += "?" + query.Encode() req, err := http.NewRequest(r.verb, finalUrl, r.body) diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 41c8ae62e32..c6ec542c240 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -58,7 +58,7 @@ func TestDoRequestNewWay(t *testing.T) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &reqBody) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { + if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } if fakeHandler.RequestReceived.Header["Authorization"] == nil { @@ -83,6 +83,7 @@ func TestDoRequestNewWayReader(t *testing.T) { Path("foo/bar"). Path("baz"). Selector(labels.Set{"name": "foo"}.AsSelector()). + Sync(false). Timeout(time.Second). Body(bytes.NewBuffer(reqBodyExpected)). Do().Get() @@ -97,7 +98,7 @@ func TestDoRequestNewWayReader(t *testing.T) { } tmpStr := string(reqBodyExpected) fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { + if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } if fakeHandler.RequestReceived.Header["Authorization"] == nil { @@ -136,7 +137,7 @@ func TestDoRequestNewWayObj(t *testing.T) { } tmpStr := string(reqBodyExpected) fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { + if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } if fakeHandler.RequestReceived.Header["Authorization"] == nil { @@ -181,7 +182,7 @@ func TestDoRequestNewWayFile(t *testing.T) { } tmpStr := string(reqBodyExpected) fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" { + if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" { t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) } if fakeHandler.RequestReceived.Header["Authorization"] == nil { @@ -213,3 +214,19 @@ func TestAbsPath(t *testing.T) { t.Errorf("unexpected path: %s, expected %s", r.path, expectedPath) } } + +func TestSync(t *testing.T) { + c := New("", nil) + r := c.Get() + if !r.sync { + t.Errorf("sync has wrong default") + } + r.Sync(false) + if r.sync { + t.Errorf("'Sync' doesn't work") + } + r.Sync(true) + if !r.sync { + t.Errorf("'Sync' doesn't work") + } +} diff --git a/pkg/registry/controller_registry.go b/pkg/registry/controller_registry.go index bf7722e83b3..64fac8258d1 100644 --- a/pkg/registry/controller_registry.go +++ b/pkg/registry/controller_registry.go @@ -17,6 +17,8 @@ limitations under the License. package registry import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -55,7 +57,9 @@ func (storage *ControllerRegistryStorage) Get(id string) (interface{}, error) { } func (storage *ControllerRegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.DeleteController(id) + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, storage.registry.DeleteController(id) + }), nil } func (storage *ControllerRegistryStorage) Extract(body []byte) (interface{}, error) { @@ -64,10 +68,36 @@ func (storage *ControllerRegistryStorage) Extract(body []byte) (interface{}, err return result, err } -func (storage *ControllerRegistryStorage) Create(controller interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.CreateController(controller.(api.ReplicationController)) +func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + controller, ok := obj.(api.ReplicationController) + if !ok { + return nil, fmt.Errorf("not a replication controller: %#v", obj) + } + if controller.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", controller) + } + return apiserver.MakeAsync(func() (interface{}, error) { + err := storage.registry.CreateController(controller) + if err != nil { + return nil, err + } + return storage.registry.GetController(controller.ID) + }), nil } -func (storage *ControllerRegistryStorage) Update(controller interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.UpdateController(controller.(api.ReplicationController)) +func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + controller, ok := obj.(api.ReplicationController) + if !ok { + return nil, fmt.Errorf("not a replication controller: %#v", obj) + } + if controller.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", controller) + } + return apiserver.MakeAsync(func() (interface{}, error) { + err := storage.registry.UpdateController(controller) + if err != nil { + return nil, err + } + return storage.registry.GetController(controller.ID) + }), nil } diff --git a/pkg/registry/minion_registry.go b/pkg/registry/minion_registry.go index e5b6c0c131b..dbe7fc5c994 100644 --- a/pkg/registry/minion_registry.go +++ b/pkg/registry/minion_registry.go @@ -140,8 +140,28 @@ func (storage *MinionRegistryStorage) Extract(body []byte) (interface{}, error) return minion, err } -func (storage *MinionRegistryStorage) Create(minion interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return minion }), storage.registry.Insert(minion.(api.Minion).ID) +func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + minion, ok := obj.(api.Minion) + if !ok { + return nil, fmt.Errorf("not a minion: %#v", obj) + } + if minion.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", minion) + } + return apiserver.MakeAsync(func() (interface{}, error) { + err := storage.registry.Insert(minion.ID) + if err != nil { + return nil, err + } + contains, err := storage.registry.Contains(minion.ID) + if err != nil { + return nil, err + } + if contains { + return storage.toApiMinion(minion.ID), nil + } + return nil, fmt.Errorf("unable to add minion %#v", minion) + }), nil } func (storage *MinionRegistryStorage) Update(minion interface{}) (<-chan interface{}, error) { @@ -156,5 +176,7 @@ func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, err if err != nil { return nil, err } - return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.Delete(id) + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, storage.registry.Delete(id) + }), nil } diff --git a/pkg/registry/minion_registry_test.go b/pkg/registry/minion_registry_test.go index ca12a0be66b..6c6db1b30f7 100644 --- a/pkg/registry/minion_registry_test.go +++ b/pkg/registry/minion_registry_test.go @@ -73,20 +73,32 @@ func TestMinionRegistryStorage(t *testing.T) { t.Errorf("has unexpected object") } - if _, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}}); err != nil { + c, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}}) + if err != nil { t.Errorf("insert failed") } + obj := <-c + if m, ok := obj.(api.Minion); !ok || m.ID != "baz" { + t.Errorf("insert return value was weird: %#v", obj) + } if obj, err := ms.Get("baz"); err != nil || obj.(api.Minion).ID != "baz" { t.Errorf("insert didn't actually insert") } - if _, err := ms.Delete("bar"); err != nil { + c, err = ms.Delete("bar") + if err != nil { t.Errorf("delete failed") } + obj = <-c + if s, ok := obj.(api.Status); !ok || s.Status != api.StatusSuccess { + t.Errorf("delete return value was weird: %#v", obj) + } if _, err := ms.Get("bar"); err != ErrDoesNotExist { t.Errorf("delete didn't actually delete") } - if _, err := ms.Delete("bar"); err != ErrDoesNotExist { + + _, err = ms.Delete("bar") + if err != ErrDoesNotExist { t.Errorf("delete returned wrong error") } diff --git a/pkg/registry/pod_registry.go b/pkg/registry/pod_registry.go index 06b0c4ae79e..708dfb24ac7 100644 --- a/pkg/registry/pod_registry.go +++ b/pkg/registry/pod_registry.go @@ -131,7 +131,9 @@ func (storage *PodRegistryStorage) Get(id string) (interface{}, error) { } func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.DeletePod(id) + return apiserver.MakeAsync(func() (interface{}, error) { + return api.Status{Status: api.StatusSuccess}, storage.registry.DeletePod(id) + }), nil } func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) { @@ -140,19 +142,37 @@ func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) { return pod, err } -func (storage *PodRegistryStorage) Create(pod interface{}) (<-chan interface{}, error) { - podObj := pod.(api.Pod) - if len(podObj.ID) == 0 { +func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { + pod := obj.(api.Pod) + if len(pod.ID) == 0 { return nil, fmt.Errorf("id is unspecified: %#v", pod) } - machine, err := storage.scheduler.Schedule(podObj) - if err != nil { - return nil, err + + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO(lavalamp): Separate scheduler more cleanly. + machine, err := storage.scheduler.Schedule(pod) + if err != nil { + return nil, err + } + err = storage.registry.CreatePod(machine, pod) + if err != nil { + return nil, err + } + return storage.registry.GetPod(pod.ID) + }), nil +} + +func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { + pod := obj.(api.Pod) + if len(pod.ID) == 0 { + return nil, fmt.Errorf("id is unspecified: %#v", pod) } - return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.CreatePod(machine, podObj) -} - -func (storage *PodRegistryStorage) Update(pod interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.UpdatePod(pod.(api.Pod)) + return apiserver.MakeAsync(func() (interface{}, error) { + err := storage.registry.UpdatePod(pod) + if err != nil { + return nil, err + } + return storage.registry.GetPod(pod.ID) + }), nil } diff --git a/pkg/registry/service_registry.go b/pkg/registry/service_registry.go index 419719869d7..89455a85c50 100644 --- a/pkg/registry/service_registry.go +++ b/pkg/registry/service_registry.go @@ -82,24 +82,26 @@ func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) { } func (sr *ServiceRegistryStorage) Delete(id string) (<-chan interface{}, error) { - svc, err := sr.Get(id) + service, err := sr.registry.GetService(id) if err != nil { return nil, err } - if svc.(*api.Service).CreateExternalLoadBalancer { - var balancer cloudprovider.TCPLoadBalancer - var ok bool - if sr.cloud != nil { - balancer, ok = sr.cloud.TCPLoadBalancer() - } - if ok && balancer != nil { - err = balancer.DeleteTCPLoadBalancer(id, "us-central1") - if err != nil { - return nil, err + return apiserver.MakeAsync(func() (interface{}, error) { + if service.CreateExternalLoadBalancer { + var balancer cloudprovider.TCPLoadBalancer + var ok bool + if sr.cloud != nil { + balancer, ok = sr.cloud.TCPLoadBalancer() + } + if ok && balancer != nil { + err = balancer.DeleteTCPLoadBalancer(id, "us-central1") + if err != nil { + return nil, err + } } } - } - return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), sr.registry.DeleteService(id) + return api.Status{Status: api.StatusSuccess}, sr.registry.DeleteService(id) + }), nil } func (sr *ServiceRegistryStorage) Extract(body []byte) (interface{}, error) { @@ -110,29 +112,51 @@ func (sr *ServiceRegistryStorage) Extract(body []byte) (interface{}, error) { func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { srv := obj.(api.Service) - if srv.CreateExternalLoadBalancer { - var balancer cloudprovider.TCPLoadBalancer - var ok bool - if sr.cloud != nil { - balancer, ok = sr.cloud.TCPLoadBalancer() - } - if ok && balancer != nil { - hosts, err := sr.machines.List() - if err != nil { - return nil, err - } - err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts) - if err != nil { - return nil, err - } - } else { - return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") - } + if srv.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", srv) } - // TODO actually wait for the object to be fully created here. - return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.CreateService(srv) + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers + // correctly no matter what http operations happen. + if srv.CreateExternalLoadBalancer { + var balancer cloudprovider.TCPLoadBalancer + var ok bool + if sr.cloud != nil { + balancer, ok = sr.cloud.TCPLoadBalancer() + } + if ok && balancer != nil { + hosts, err := sr.machines.List() + if err != nil { + return nil, err + } + err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") + } + } + // TODO actually wait for the object to be fully created here. + err := sr.registry.CreateService(srv) + if err != nil { + return nil, err + } + return sr.registry.GetService(srv.ID) + }), nil } func (sr *ServiceRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.UpdateService(obj.(api.Service)) + srv := obj.(api.Service) + if srv.ID == "" { + return nil, fmt.Errorf("ID should not be empty: %#v", srv) + } + return apiserver.MakeAsync(func() (interface{}, error) { + // TODO: check to see if external load balancer status changed + err := sr.registry.UpdateService(srv) + if err != nil { + return nil, err + } + return sr.registry.GetService(srv.ID) + }), nil } diff --git a/pkg/registry/service_registry_test.go b/pkg/registry/service_registry_test.go index 109a9b7f7c6..023101aae2f 100644 --- a/pkg/registry/service_registry_test.go +++ b/pkg/registry/service_registry_test.go @@ -34,7 +34,8 @@ func TestServiceRegistry(t *testing.T) { svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, } - storage.Create(svc) + c, _ := storage.Create(svc) + <-c if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -57,7 +58,8 @@ func TestServiceRegistryExternalService(t *testing.T) { JSONBase: api.JSONBase{ID: "foo"}, CreateExternalLoadBalancer: true, } - storage.Create(svc) + c, _ := storage.Create(svc) + <-c if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -82,7 +84,8 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { JSONBase: api.JSONBase{ID: "foo"}, CreateExternalLoadBalancer: true, } - storage.Create(svc) + c, _ := storage.Create(svc) + <-c if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -106,7 +109,8 @@ func TestServiceRegistryDelete(t *testing.T) { } memory.CreateService(svc) - storage.Delete(svc.ID) + c, _ := storage.Delete(svc.ID) + <-c if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) @@ -131,7 +135,8 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { } memory.CreateService(svc) - storage.Delete(svc.ID) + c, _ := storage.Delete(svc.ID) + <-c if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "delete" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)