Merge pull request #249 from lavalamp/api_long_op

Give api server operation tracking ability
This commit is contained in:
brendandburns 2014-06-26 12:10:40 -07:00
commit 46a615864c
14 changed files with 656 additions and 155 deletions

View File

@ -37,6 +37,8 @@ func init() {
MinionList{},
Minion{},
Status{},
ServerOpList{},
ServerOp{},
)
}

View File

@ -207,3 +207,14 @@ const (
StatusFailure = "failure"
StatusWorking = "working"
)
// Operation information, as delivered to API clients.
type ServerOp struct {
JSONBase `yaml:",inline" json:",inline"`
}
// Operation list, as delivered to API clients.
type ServerOpList struct {
JSONBase `yaml:",inline" json:",inline"`
Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"`
}

View File

@ -41,11 +41,30 @@ type RESTStorage interface {
Update(interface{}) (<-chan interface{}, error)
}
func MakeAsync(fn func() interface{}) <-chan interface{} {
channel := make(chan interface{}, 1)
// WorkFunc is used to perform any time consuming work for an api call, after
// the input has been validated. Pass one of these to MakeAsync to create an
// appropriate return value for the Update, Delete, and Create methods.
type WorkFunc func() (result interface{}, err error)
// MakeAsync takes a function and executes it, delivering the result in the way required
// by RESTStorage's Update, Delete, and Create methods.
func MakeAsync(fn WorkFunc) <-chan interface{} {
channel := make(chan interface{})
go func() {
defer util.HandleCrash()
channel <- fn()
obj, err := fn()
if err != nil {
channel <- &api.Status{
Status: api.StatusFailure,
Details: err.Error(),
}
} else {
channel <- obj
}
// 'close' is used to signal that no further values will
// be written to the channel. Not strictly necessary, but
// also won't hurt.
close(channel)
}()
return channel
}
@ -59,6 +78,7 @@ func MakeAsync(fn func() interface{}) <-chan interface{} {
type ApiServer struct {
prefix string
storage map[string]RESTStorage
ops *Operations
}
// New creates a new ApiServer object.
@ -68,6 +88,7 @@ func New(storage map[string]RESTStorage, prefix string) *ApiServer {
return &ApiServer{
storage: storage,
prefix: prefix,
ops: NewOperations(),
}
}
@ -108,6 +129,10 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
server.notFound(req, w)
return
}
if requestParts[0] == "operations" {
server.handleOperationRequest(requestParts[1:], w, req)
return
}
storage := server.storage[requestParts[0]]
if storage == nil {
logger.Addf("'%v' has no storage object", requestParts[0])
@ -144,15 +169,30 @@ func (server *ApiServer) readBody(req *http.Request) ([]byte, error) {
return body, err
}
func (server *ApiServer) waitForObject(out <-chan interface{}, timeout time.Duration) (interface{}, error) {
tick := time.After(timeout)
var obj interface{}
select {
case obj = <-out:
return obj, nil
case <-tick:
return nil, fmt.Errorf("Timed out waiting for synchronization.")
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
// Operation to recieve the result and returning its ID down the writer.
func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout time.Duration, w http.ResponseWriter) {
op := server.ops.NewOperation(out)
if sync {
op.WaitFor(timeout)
}
obj, complete := op.StatusOrResult()
if complete {
server.write(http.StatusOK, obj, w)
} else {
server.write(http.StatusAccepted, obj, w)
}
}
func parseTimeout(str string) time.Duration {
if str != "" {
timeout, err := time.ParseDuration(str)
if err == nil {
return timeout
}
glog.Errorf("Failed to parse: %#v '%s'", err, str)
}
return 30 * time.Second
}
// handleREST is the main dispatcher for the server. It switches on the HTTP method, and then
@ -170,11 +210,7 @@ func (server *ApiServer) waitForObject(out <-chan interface{}, timeout time.Dura
// labels=<label-selector> Used for filtering list operations
func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
sync := requestUrl.Query().Get("sync") == "true"
timeout, err := time.ParseDuration(requestUrl.Query().Get("timeout"))
if err != nil && len(requestUrl.Query().Get("timeout")) > 0 {
glog.Errorf("Failed to parse: %#v '%s'", err, requestUrl.Query().Get("timeout"))
timeout = time.Second * 30
}
timeout := parseTimeout(requestUrl.Query().Get("timeout"))
switch req.Method {
case "GET":
switch len(parts) {
@ -184,12 +220,12 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht
server.error(err, w)
return
}
controllers, err := storage.List(selector)
list, err := storage.List(selector)
if err != nil {
server.error(err, w)
return
}
server.write(http.StatusOK, controllers, w)
server.write(http.StatusOK, list, w)
case 2:
item, err := storage.Get(parts[1])
if err != nil {
@ -204,7 +240,6 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht
default:
server.notFound(req, w)
}
return
case "POST":
if len(parts) != 1 {
server.notFound(req, w)
@ -221,44 +256,22 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht
return
}
out, err := storage.Create(obj)
if err == nil && sync {
obj, err = server.waitForObject(out, timeout)
}
if err != nil {
server.error(err, w)
return
}
var statusCode int
if sync {
statusCode = http.StatusOK
} else {
statusCode = http.StatusAccepted
}
server.write(statusCode, obj, w)
return
server.finishReq(out, sync, timeout, w)
case "DELETE":
if len(parts) != 2 {
server.notFound(req, w)
return
}
out, err := storage.Delete(parts[1])
var obj interface{}
obj = api.Status{Status: api.StatusSuccess}
if err == nil && sync {
obj, err = server.waitForObject(out, timeout)
}
if err != nil {
server.error(err, w)
return
}
var statusCode int
if sync {
statusCode = http.StatusOK
} else {
statusCode = http.StatusAccepted
}
server.write(statusCode, obj, w)
return
server.finishReq(out, sync, timeout, w)
case "PUT":
if len(parts) != 2 {
server.notFound(req, w)
@ -274,22 +287,36 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht
return
}
out, err := storage.Update(obj)
if err == nil && sync {
obj, err = server.waitForObject(out, timeout)
}
if err != nil {
server.error(err, w)
return
}
var statusCode int
if sync {
statusCode = http.StatusOK
} else {
statusCode = http.StatusAccepted
}
server.write(statusCode, obj, w)
return
server.finishReq(out, sync, timeout, w)
default:
server.notFound(req, w)
}
}
func (server *ApiServer) handleOperationRequest(parts []string, w http.ResponseWriter, req *http.Request) {
if req.Method != "GET" {
server.notFound(req, w)
}
if len(parts) == 0 {
// List outstanding operations.
list := server.ops.List()
server.write(http.StatusOK, list, w)
return
}
op := server.ops.Get(parts[0])
if op == nil {
server.notFound(req, w)
}
obj, complete := op.StatusOrResult()
if complete {
server.write(http.StatusOK, obj, w)
} else {
server.write(http.StatusAccepted, obj, w)
}
}

View File

@ -18,7 +18,6 @@ package apiserver
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
@ -26,6 +25,7 @@ import (
"reflect"
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -58,7 +58,11 @@ type SimpleRESTStorage struct {
item Simple
deleted string
updated Simple
channel <-chan interface{}
created Simple
// If non-nil, called inside the WorkFunc when answering update, delete, create.
// obj recieves the original input to the update, delete, or create call.
injectedFunction func(obj interface{}) (returnObj interface{}, err error)
}
func (storage *SimpleRESTStorage) List(labels.Selector) (interface{}, error) {
@ -74,7 +78,15 @@ func (storage *SimpleRESTStorage) Get(id string) (interface{}, error) {
func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error) {
storage.deleted = id
return storage.channel, storage.err
if storage.err != nil {
return nil, storage.err
}
return MakeAsync(func() (interface{}, error) {
if storage.injectedFunction != nil {
return storage.injectedFunction(id)
}
return api.Status{Status: api.StatusSuccess}, nil
}), nil
}
func (storage *SimpleRESTStorage) Extract(body []byte) (interface{}, error) {
@ -83,13 +95,30 @@ func (storage *SimpleRESTStorage) Extract(body []byte) (interface{}, error) {
return item, storage.err
}
func (storage *SimpleRESTStorage) Create(interface{}) (<-chan interface{}, error) {
return storage.channel, storage.err
func (storage *SimpleRESTStorage) Create(obj interface{}) (<-chan interface{}, error) {
storage.created = obj.(Simple)
if storage.err != nil {
return nil, storage.err
}
return MakeAsync(func() (interface{}, error) {
if storage.injectedFunction != nil {
return storage.injectedFunction(obj)
}
return obj, nil
}), nil
}
func (storage *SimpleRESTStorage) Update(object interface{}) (<-chan interface{}, error) {
storage.updated = object.(Simple)
return storage.channel, storage.err
func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, error) {
storage.updated = obj.(Simple)
if storage.err != nil {
return nil, storage.err
}
return MakeAsync(func() (interface{}, error) {
if storage.injectedFunction != nil {
return storage.injectedFunction(obj)
}
return obj, nil
}), nil
}
func extractBody(response *http.Response, object interface{}) (string, error) {
@ -214,7 +243,7 @@ func TestUpdate(t *testing.T) {
item := Simple{
Name: "bar",
}
body, err := json.Marshal(item)
body, err := api.Encode(item)
expectNoError(t, err)
client := http.Client{}
request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body))
@ -270,14 +299,15 @@ func TestMissingStorage(t *testing.T) {
}
func TestCreate(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": &SimpleRESTStorage{},
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
client := http.Client{}
simple := Simple{Name: "foo"}
data, _ := json.Marshal(simple)
data, _ := api.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
expectNoError(t, err)
response, err := client.Do(request)
@ -286,18 +316,32 @@ func TestCreate(t *testing.T) {
t.Errorf("Unexpected response %#v", response)
}
var itemOut Simple
var itemOut api.Status
body, err := extractBody(response, &itemOut)
expectNoError(t, err)
if !reflect.DeepEqual(itemOut, simple) {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
if itemOut.Status != api.StatusWorking || itemOut.Details == "" {
t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body))
}
}
func TestParseTimeout(t *testing.T) {
if d := parseTimeout(""); d != 30*time.Second {
t.Errorf("blank timeout produces %v", d)
}
if d := parseTimeout("not a timeout"); d != 30*time.Second {
t.Errorf("bad timeout produces %v", d)
}
if d := parseTimeout("10s"); d != 10*time.Second {
t.Errorf("10s timeout produced: %v", d)
}
}
func TestSyncCreate(t *testing.T) {
channel := make(chan interface{}, 1)
storage := SimpleRESTStorage{
channel: channel,
injectedFunction: func(obj interface{}) (interface{}, error) {
time.Sleep(200 * time.Millisecond)
return obj, nil
},
}
handler := New(map[string]RESTStorage{
"foo": &storage,
@ -306,7 +350,7 @@ func TestSyncCreate(t *testing.T) {
client := http.Client{}
simple := Simple{Name: "foo"}
data, _ := json.Marshal(simple)
data, _ := api.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true", bytes.NewBuffer(data))
expectNoError(t, err)
wg := sync.WaitGroup{}
@ -314,37 +358,54 @@ func TestSyncCreate(t *testing.T) {
var response *http.Response
go func() {
response, err = client.Do(request)
expectNoError(t, err)
if response.StatusCode != 200 {
t.Errorf("Unexpected response %#v", response)
}
wg.Done()
}()
output := Simple{Name: "bar"}
channel <- output
wg.Wait()
expectNoError(t, err)
var itemOut Simple
body, err := extractBody(response, &itemOut)
expectNoError(t, err)
if !reflect.DeepEqual(itemOut, output) {
if !reflect.DeepEqual(itemOut, simple) {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
}
if response.StatusCode != http.StatusOK {
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response)
}
}
func TestSyncCreateTimeout(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj interface{}) (interface{}, error) {
time.Sleep(400 * time.Millisecond)
return obj, nil
},
}
handler := New(map[string]RESTStorage{
"foo": &SimpleRESTStorage{},
"foo": &storage,
}, "/prefix/version")
server := httptest.NewServer(handler)
client := http.Client{}
simple := Simple{Name: "foo"}
data, _ := json.Marshal(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=1us", bytes.NewBuffer(data))
data, _ := api.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo?sync=true&timeout=200ms", bytes.NewBuffer(data))
expectNoError(t, err)
response, err := client.Do(request)
wg := sync.WaitGroup{}
wg.Add(1)
var response *http.Response
go func() {
response, err = client.Do(request)
wg.Done()
}()
wg.Wait()
expectNoError(t, err)
if response.StatusCode != 500 {
t.Errorf("Unexpected response %#v", response)
var itemOut api.Status
_, err = extractBody(response, &itemOut)
expectNoError(t, err)
if itemOut.Status != api.StatusWorking || itemOut.Details == "" {
t.Errorf("Unexpected status %#v", itemOut)
}
if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, 202, response)
}
}

169
pkg/apiserver/operation.go Normal file
View File

@ -0,0 +1,169 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// Operation represents an ongoing action which the server is performing.
type Operation struct {
ID string
result interface{}
awaiting <-chan interface{}
finished *time.Time
lock sync.Mutex
notify chan bool
}
// Operations tracks all the ongoing operations.
type Operations struct {
// Access only using functions from atomic.
lastID int64
// 'lock' guards the ops map.
lock sync.Mutex
ops map[string]*Operation
}
// Returns a new Operations repository.
func NewOperations() *Operations {
ops := &Operations{
ops: map[string]*Operation{},
}
go util.Forever(func() { ops.expire(10 * time.Minute) }, 5*time.Minute)
return ops
}
// Add a new operation. Lock-free.
func (ops *Operations) NewOperation(from <-chan interface{}) *Operation {
id := atomic.AddInt64(&ops.lastID, 1)
op := &Operation{
ID: strconv.FormatInt(id, 10),
awaiting: from,
notify: make(chan bool, 1),
}
go op.wait()
go ops.insert(op)
return op
}
// Inserts op into the ops map.
func (ops *Operations) insert(op *Operation) {
ops.lock.Lock()
defer ops.lock.Unlock()
ops.ops[op.ID] = op
}
// List operations for an API client.
func (ops *Operations) List() api.ServerOpList {
ops.lock.Lock()
defer ops.lock.Unlock()
ids := []string{}
for id := range ops.ops {
ids = append(ids, id)
}
sort.StringSlice(ids).Sort()
ol := api.ServerOpList{}
for _, id := range ids {
ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}})
}
return ol
}
// Returns the operation with the given ID, or nil
func (ops *Operations) Get(id string) *Operation {
ops.lock.Lock()
defer ops.lock.Unlock()
return ops.ops[id]
}
// Garbage collect operations that have finished longer than maxAge ago.
func (ops *Operations) expire(maxAge time.Duration) {
ops.lock.Lock()
defer ops.lock.Unlock()
keep := map[string]*Operation{}
limitTime := time.Now().Add(-maxAge)
for id, op := range ops.ops {
if !op.expired(limitTime) {
keep[id] = op
}
}
ops.ops = keep
}
// Waits forever for the operation to complete; call via go when
// the operation is created. Sets op.finished when the operation
// does complete, and sends on the notify channel, in case there
// are any WaitFor() calls in progress.
// Does not keep op locked while waiting.
func (op *Operation) wait() {
defer util.HandleCrash()
result := <-op.awaiting
op.lock.Lock()
defer op.lock.Unlock()
op.result = result
finished := time.Now()
op.finished = &finished
op.notify <- true
}
// Wait for the specified duration, or until the operation finishes,
// whichever happens first.
func (op *Operation) WaitFor(timeout time.Duration) {
select {
case <-time.After(timeout):
case <-op.notify:
// Re-send on this channel in case there are others
// waiting for notification.
op.notify <- true
}
}
// Returns true if this operation finished before limitTime.
func (op *Operation) expired(limitTime time.Time) bool {
op.lock.Lock()
defer op.lock.Unlock()
if op.finished == nil {
return false
}
return op.finished.Before(limitTime)
}
// Return status information or the result of the operation if it is complete,
// with a bool indicating true in the latter case.
func (op *Operation) StatusOrResult() (description interface{}, finished bool) {
op.lock.Lock()
defer op.lock.Unlock()
if op.finished == nil {
return api.Status{
Status: api.StatusWorking,
Details: op.ID,
}, false
}
return op.result, true
}

View File

@ -0,0 +1,86 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"sync/atomic"
"testing"
"time"
)
func TestOperation(t *testing.T) {
ops := NewOperations()
c := make(chan interface{})
op := ops.NewOperation(c)
// Allow context switch, so that op's ID can get added to the map and Get will work.
// This is just so we can test Get. Ordinary users have no need to call Get immediately
// after calling NewOperation, because it returns the operation directly.
time.Sleep(time.Millisecond)
go func() {
time.Sleep(500 * time.Millisecond)
c <- "All done"
}()
if op.expired(time.Now().Add(-time.Minute)) {
t.Errorf("Expired before finished: %#v", op)
}
ops.expire(time.Minute)
if tmp := ops.Get(op.ID); tmp == nil {
t.Errorf("expire incorrectly removed the operation %#v", ops)
}
op.WaitFor(10 * time.Millisecond)
if _, completed := op.StatusOrResult(); completed {
t.Errorf("Unexpectedly fast completion")
}
const waiters = 10
var waited int32
for i := 0; i < waiters; i++ {
go func() {
op.WaitFor(time.Hour)
atomic.AddInt32(&waited, 1)
}()
}
op.WaitFor(time.Minute)
if _, completed := op.StatusOrResult(); !completed {
t.Errorf("Unexpectedly slow completion")
}
time.Sleep(100 * time.Millisecond)
if waited != waiters {
t.Errorf("Multiple waiters doesn't work, only %v finished", waited)
}
if op.expired(time.Now().Add(-time.Second)) {
t.Errorf("Should not be expired: %#v", op)
}
if !op.expired(time.Now().Add(-80 * time.Millisecond)) {
t.Errorf("Should be expired: %#v", op)
}
ops.expire(80 * time.Millisecond)
if tmp := ops.Get(op.ID); tmp != nil {
t.Errorf("expire failed to remove the operation %#v", ops)
}
if op.result.(string) != "All done" {
t.Errorf("Got unexpected result: %#v", op.result)
}
}

View File

@ -43,9 +43,11 @@ import (
// Begin a request with a verb (GET, POST, PUT, DELETE)
func (c *Client) Verb(verb string) *Request {
return &Request{
verb: verb,
c: c,
path: "/api/v1beta1",
verb: verb,
c: c,
path: "/api/v1beta1",
sync: true,
timeout: 10 * time.Second,
}
}
@ -80,6 +82,7 @@ type Request struct {
body io.Reader
selector labels.Selector
timeout time.Duration
sync bool
}
// Append an item to the request path. You must call Path at least once.
@ -91,6 +94,15 @@ func (r *Request) Path(item string) *Request {
return r
}
// Set sync/async call status.
func (r *Request) Sync(sync bool) *Request {
if r.err != nil {
return r
}
r.sync = sync
return r
}
// Overwrite an existing path with the path parameter.
func (r *Request) AbsPath(path string) *Request {
if r.err != nil {
@ -168,8 +180,11 @@ func (r *Request) Do() Result {
if r.selector != nil {
query.Add("labels", r.selector.String())
}
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
}
finalUrl += "?" + query.Encode()
req, err := http.NewRequest(r.verb, finalUrl, r.body)

View File

@ -58,7 +58,7 @@ func TestDoRequestNewWay(t *testing.T) {
t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
}
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &reqBody)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" {
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
@ -83,6 +83,7 @@ func TestDoRequestNewWayReader(t *testing.T) {
Path("foo/bar").
Path("baz").
Selector(labels.Set{"name": "foo"}.AsSelector()).
Sync(false).
Timeout(time.Second).
Body(bytes.NewBuffer(reqBodyExpected)).
Do().Get()
@ -97,7 +98,7 @@ func TestDoRequestNewWayReader(t *testing.T) {
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" {
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
@ -136,7 +137,7 @@ func TestDoRequestNewWayObj(t *testing.T) {
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" {
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
@ -181,7 +182,7 @@ func TestDoRequestNewWayFile(t *testing.T) {
}
tmpStr := string(reqBodyExpected)
fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr)
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&timeout=1s" {
if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo&sync=true&timeout=1s" {
t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery)
}
if fakeHandler.RequestReceived.Header["Authorization"] == nil {
@ -213,3 +214,19 @@ func TestAbsPath(t *testing.T) {
t.Errorf("unexpected path: %s, expected %s", r.path, expectedPath)
}
}
func TestSync(t *testing.T) {
c := New("", nil)
r := c.Get()
if !r.sync {
t.Errorf("sync has wrong default")
}
r.Sync(false)
if r.sync {
t.Errorf("'Sync' doesn't work")
}
r.Sync(true)
if !r.sync {
t.Errorf("'Sync' doesn't work")
}
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package registry
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -55,7 +57,9 @@ func (storage *ControllerRegistryStorage) Get(id string) (interface{}, error) {
}
func (storage *ControllerRegistryStorage) Delete(id string) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.DeleteController(id)
return apiserver.MakeAsync(func() (interface{}, error) {
return api.Status{Status: api.StatusSuccess}, storage.registry.DeleteController(id)
}), nil
}
func (storage *ControllerRegistryStorage) Extract(body []byte) (interface{}, error) {
@ -64,10 +68,36 @@ func (storage *ControllerRegistryStorage) Extract(body []byte) (interface{}, err
return result, err
}
func (storage *ControllerRegistryStorage) Create(controller interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.CreateController(controller.(api.ReplicationController))
func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(api.ReplicationController)
if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj)
}
if controller.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", controller)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.CreateController(controller)
if err != nil {
return nil, err
}
return storage.registry.GetController(controller.ID)
}), nil
}
func (storage *ControllerRegistryStorage) Update(controller interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return controller }), storage.registry.UpdateController(controller.(api.ReplicationController))
func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(api.ReplicationController)
if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj)
}
if controller.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", controller)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.UpdateController(controller)
if err != nil {
return nil, err
}
return storage.registry.GetController(controller.ID)
}), nil
}

View File

@ -140,8 +140,28 @@ func (storage *MinionRegistryStorage) Extract(body []byte) (interface{}, error)
return minion, err
}
func (storage *MinionRegistryStorage) Create(minion interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return minion }), storage.registry.Insert(minion.(api.Minion).ID)
func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
minion, ok := obj.(api.Minion)
if !ok {
return nil, fmt.Errorf("not a minion: %#v", obj)
}
if minion.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", minion)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.Insert(minion.ID)
if err != nil {
return nil, err
}
contains, err := storage.registry.Contains(minion.ID)
if err != nil {
return nil, err
}
if contains {
return storage.toApiMinion(minion.ID), nil
}
return nil, fmt.Errorf("unable to add minion %#v", minion)
}), nil
}
func (storage *MinionRegistryStorage) Update(minion interface{}) (<-chan interface{}, error) {
@ -156,5 +176,7 @@ func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, err
if err != nil {
return nil, err
}
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.Delete(id)
return apiserver.MakeAsync(func() (interface{}, error) {
return api.Status{Status: api.StatusSuccess}, storage.registry.Delete(id)
}), nil
}

View File

@ -73,20 +73,32 @@ func TestMinionRegistryStorage(t *testing.T) {
t.Errorf("has unexpected object")
}
if _, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}}); err != nil {
c, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}})
if err != nil {
t.Errorf("insert failed")
}
obj := <-c
if m, ok := obj.(api.Minion); !ok || m.ID != "baz" {
t.Errorf("insert return value was weird: %#v", obj)
}
if obj, err := ms.Get("baz"); err != nil || obj.(api.Minion).ID != "baz" {
t.Errorf("insert didn't actually insert")
}
if _, err := ms.Delete("bar"); err != nil {
c, err = ms.Delete("bar")
if err != nil {
t.Errorf("delete failed")
}
obj = <-c
if s, ok := obj.(api.Status); !ok || s.Status != api.StatusSuccess {
t.Errorf("delete return value was weird: %#v", obj)
}
if _, err := ms.Get("bar"); err != ErrDoesNotExist {
t.Errorf("delete didn't actually delete")
}
if _, err := ms.Delete("bar"); err != ErrDoesNotExist {
_, err = ms.Delete("bar")
if err != ErrDoesNotExist {
t.Errorf("delete returned wrong error")
}

View File

@ -131,7 +131,9 @@ func (storage *PodRegistryStorage) Get(id string) (interface{}, error) {
}
func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.DeletePod(id)
return apiserver.MakeAsync(func() (interface{}, error) {
return api.Status{Status: api.StatusSuccess}, storage.registry.DeletePod(id)
}), nil
}
func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) {
@ -140,19 +142,37 @@ func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) {
return pod, err
}
func (storage *PodRegistryStorage) Create(pod interface{}) (<-chan interface{}, error) {
podObj := pod.(api.Pod)
if len(podObj.ID) == 0 {
func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
pod := obj.(api.Pod)
if len(pod.ID) == 0 {
return nil, fmt.Errorf("id is unspecified: %#v", pod)
}
machine, err := storage.scheduler.Schedule(podObj)
if err != nil {
return nil, err
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO(lavalamp): Separate scheduler more cleanly.
machine, err := storage.scheduler.Schedule(pod)
if err != nil {
return nil, err
}
err = storage.registry.CreatePod(machine, pod)
if err != nil {
return nil, err
}
return storage.registry.GetPod(pod.ID)
}), nil
}
func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
pod := obj.(api.Pod)
if len(pod.ID) == 0 {
return nil, fmt.Errorf("id is unspecified: %#v", pod)
}
return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.CreatePod(machine, podObj)
}
func (storage *PodRegistryStorage) Update(pod interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return pod }), storage.registry.UpdatePod(pod.(api.Pod))
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.UpdatePod(pod)
if err != nil {
return nil, err
}
return storage.registry.GetPod(pod.ID)
}), nil
}

View File

@ -82,24 +82,26 @@ func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) {
}
func (sr *ServiceRegistryStorage) Delete(id string) (<-chan interface{}, error) {
svc, err := sr.Get(id)
service, err := sr.registry.GetService(id)
if err != nil {
return nil, err
}
if svc.(*api.Service).CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil {
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
err = balancer.DeleteTCPLoadBalancer(id, "us-central1")
if err != nil {
return nil, err
return apiserver.MakeAsync(func() (interface{}, error) {
if service.CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil {
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
err = balancer.DeleteTCPLoadBalancer(id, "us-central1")
if err != nil {
return nil, err
}
}
}
}
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), sr.registry.DeleteService(id)
return api.Status{Status: api.StatusSuccess}, sr.registry.DeleteService(id)
}), nil
}
func (sr *ServiceRegistryStorage) Extract(body []byte) (interface{}, error) {
@ -110,29 +112,51 @@ func (sr *ServiceRegistryStorage) Extract(body []byte) (interface{}, error) {
func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
srv := obj.(api.Service)
if srv.CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil {
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
hosts, err := sr.machines.List()
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.")
}
if srv.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", srv)
}
// TODO actually wait for the object to be fully created here.
return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.CreateService(srv)
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers
// correctly no matter what http operations happen.
if srv.CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil {
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
hosts, err := sr.machines.List()
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.")
}
}
// TODO actually wait for the object to be fully created here.
err := sr.registry.CreateService(srv)
if err != nil {
return nil, err
}
return sr.registry.GetService(srv.ID)
}), nil
}
func (sr *ServiceRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return obj }), sr.registry.UpdateService(obj.(api.Service))
srv := obj.(api.Service)
if srv.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", srv)
}
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO: check to see if external load balancer status changed
err := sr.registry.UpdateService(srv)
if err != nil {
return nil, err
}
return sr.registry.GetService(srv.ID)
}), nil
}

View File

@ -34,7 +34,8 @@ func TestServiceRegistry(t *testing.T) {
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
}
storage.Create(svc)
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -57,7 +58,8 @@ func TestServiceRegistryExternalService(t *testing.T) {
JSONBase: api.JSONBase{ID: "foo"},
CreateExternalLoadBalancer: true,
}
storage.Create(svc)
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -82,7 +84,8 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
JSONBase: api.JSONBase{ID: "foo"},
CreateExternalLoadBalancer: true,
}
storage.Create(svc)
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -106,7 +109,8 @@ func TestServiceRegistryDelete(t *testing.T) {
}
memory.CreateService(svc)
storage.Delete(svc.ID)
c, _ := storage.Delete(svc.ID)
<-c
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
@ -131,7 +135,8 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
}
memory.CreateService(svc)
storage.Delete(svc.ID)
c, _ := storage.Delete(svc.ID)
<-c
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "delete" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)