diff --git a/pkg/api/helper.go b/pkg/api/helper.go index d4bd0425db0..cd882110a5e 100644 --- a/pkg/api/helper.go +++ b/pkg/api/helper.go @@ -37,6 +37,8 @@ func init() { MinionList{}, Minion{}, Status{}, + ServerOpList{}, + ServerOp{}, ) } diff --git a/pkg/api/types.go b/pkg/api/types.go index e9f480385ff..ed18f287678 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -207,3 +207,14 @@ const ( StatusFailure = "failure" StatusWorking = "working" ) + +// Operation information, as delivered to API clients. +type ServerOp struct { + JSONBase `yaml:",inline" json:",inline"` +} + +// Operation list, as delivered to API clients. +type ServerOpList struct { + JSONBase `yaml:",inline" json:",inline"` + Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"` +} diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 4b7bc537c7e..cabe6541b07 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -41,9 +41,14 @@ type RESTStorage interface { Update(interface{}) (<-chan interface{}, error) } +// WorkFunc is used to perform any time consuming work for an api call, after +// the input has been validated. Pass one of these to MakeAsync to create an +// appropriate return value for the Update, Delete, and Create methods. +type WorkFunc func() (result interface{}, err error) + // MakeAsync takes a function and executes it, delivering the result in the way required // by RESTStorage's Update, Delete, and Create methods. -func MakeAsync(fn func() (interface{}, error)) <-chan interface{} { +func MakeAsync(fn WorkFunc) <-chan interface{} { channel := make(chan interface{}) go func() { defer util.HandleCrash() @@ -171,7 +176,7 @@ func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout ti if sync { op.WaitFor(timeout) } - obj, complete := op.Describe() + obj, complete := op.StatusOrResult() if complete { server.write(http.StatusOK, obj, w) } else { @@ -308,7 +313,7 @@ func (server *ApiServer) handleOperationRequest(parts []string, w http.ResponseW server.notFound(req, w) } - obj, complete := op.Describe() + obj, complete := op.StatusOrResult() if complete { server.write(http.StatusOK, obj, w) } else { diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 7ed959b67ab..f3371300c3a 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -60,7 +60,8 @@ type SimpleRESTStorage struct { updated Simple created Simple - // called when answering update, delete, create + // If non-nil, called inside the WorkFunc when answering update, delete, create. + // obj recieves the original input to the update, delete, or create call. injectedFunction func(obj interface{}) (returnObj interface{}, err error) } diff --git a/pkg/apiserver/operation.go b/pkg/apiserver/operation.go index 41c20c07e54..09b25e2c34f 100644 --- a/pkg/apiserver/operation.go +++ b/pkg/apiserver/operation.go @@ -17,30 +17,16 @@ limitations under the License. package apiserver import ( - "fmt" "sort" + "strconv" "sync" + "sync/atomic" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func init() { - api.AddKnownTypes(ServerOp{}, ServerOpList{}) -} - -// Operation information, as delivered to API clients. -type ServerOp struct { - api.JSONBase `yaml:",inline" json:",inline"` -} - -// Operation list, as delivered to API clients. -type ServerOpList struct { - api.JSONBase `yaml:",inline" json:",inline"` - Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"` -} - // Operation represents an ongoing action which the server is performing. type Operation struct { ID string @@ -53,9 +39,12 @@ type Operation struct { // Operations tracks all the ongoing operations. type Operations struct { - lock sync.Mutex - ops map[string]*Operation - nextID int + // Access only using functions from atomic. + lastID int64 + + // 'lock' guards the ops map. + lock sync.Mutex + ops map[string]*Operation } // Returns a new Operations repository. @@ -67,25 +56,28 @@ func NewOperations() *Operations { return ops } -// Add a new operation. +// Add a new operation. Lock-free. func (ops *Operations) NewOperation(from <-chan interface{}) *Operation { - ops.lock.Lock() - defer ops.lock.Unlock() - id := fmt.Sprintf("%v", ops.nextID) - ops.nextID++ - + id := atomic.AddInt64(&ops.lastID, 1) op := &Operation{ - ID: id, + ID: strconv.FormatInt(id, 10), awaiting: from, notify: make(chan bool, 1), } go op.wait() - ops.ops[id] = op + go ops.insert(op) return op } +// Inserts op into the ops map. +func (ops *Operations) insert(op *Operation) { + ops.lock.Lock() + defer ops.lock.Unlock() + ops.ops[op.ID] = op +} + // List operations for an API client. -func (ops *Operations) List() ServerOpList { +func (ops *Operations) List() api.ServerOpList { ops.lock.Lock() defer ops.lock.Unlock() @@ -94,9 +86,9 @@ func (ops *Operations) List() ServerOpList { ids = append(ids, id) } sort.StringSlice(ids).Sort() - ol := ServerOpList{} + ol := api.ServerOpList{} for _, id := range ids { - ol.Items = append(ol.Items, ServerOp{JSONBase: api.JSONBase{ID: id}}) + ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}}) } return ol } @@ -124,7 +116,9 @@ func (ops *Operations) expire(maxAge time.Duration) { // Waits forever for the operation to complete; call via go when // the operation is created. Sets op.finished when the operation -// does complete. Does not keep op locked while waiting. +// does complete, and sends on the notify channel, in case there +// are any WaitFor() calls in progress. +// Does not keep op locked while waiting. func (op *Operation) wait() { defer util.HandleCrash() result := <-op.awaiting @@ -161,7 +155,7 @@ func (op *Operation) expired(limitTime time.Time) bool { // Return status information or the result of the operation if it is complete, // with a bool indicating true in the latter case. -func (op *Operation) Describe() (description interface{}, finished bool) { +func (op *Operation) StatusOrResult() (description interface{}, finished bool) { op.lock.Lock() defer op.lock.Unlock() diff --git a/pkg/apiserver/operation_test.go b/pkg/apiserver/operation_test.go index a5dba31ca34..35b06ab1547 100644 --- a/pkg/apiserver/operation_test.go +++ b/pkg/apiserver/operation_test.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "sync/atomic" "testing" "time" ) @@ -26,6 +27,10 @@ func TestOperation(t *testing.T) { c := make(chan interface{}) op := ops.NewOperation(c) + // Allow context switch, so that op's ID can get added to the map and Get will work. + // This is just so we can test Get. Ordinary users have no need to call Get immediately + // after calling NewOperation, because it returns the operation directly. + time.Sleep(time.Millisecond) go func() { time.Sleep(500 * time.Millisecond) c <- "All done" @@ -40,16 +45,28 @@ func TestOperation(t *testing.T) { } op.WaitFor(10 * time.Millisecond) - if _, completed := op.Describe(); completed { + if _, completed := op.StatusOrResult(); completed { t.Errorf("Unexpectedly fast completion") } - op.WaitFor(time.Second) - if _, completed := op.Describe(); !completed { + const waiters = 10 + var waited int32 + for i := 0; i < waiters; i++ { + go func() { + op.WaitFor(time.Hour) + atomic.AddInt32(&waited, 1) + }() + } + + op.WaitFor(time.Minute) + if _, completed := op.StatusOrResult(); !completed { t.Errorf("Unexpectedly slow completion") } time.Sleep(100 * time.Millisecond) + if waited != waiters { + t.Errorf("Multiple waiters doesn't work, only %v finished", waited) + } if op.expired(time.Now().Add(-time.Second)) { t.Errorf("Should not be expired: %#v", op)