From 1d1dff0dcf4fd5db96ace152bb285f86ca8bd39c Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Thu, 23 Apr 2015 16:22:23 -0700 Subject: [PATCH 1/3] added unit tests for queue --- queue/builtin/queue.go | 35 ++++++++++++++++- queue/builtin/queue_test.go | 75 +++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 2 deletions(-) create mode 100644 queue/builtin/queue_test.go diff --git a/queue/builtin/queue.go b/queue/builtin/queue.go index bd4445732..6256cad2a 100644 --- a/queue/builtin/queue.go +++ b/queue/builtin/queue.go @@ -35,6 +35,37 @@ func (q *Queue) Publish(work *queue.Work) error { // Remove removes the specified work item from this queue, // if it is present. func (q *Queue) Remove(work *queue.Work) error { + q.Lock() + defer q.Unlock() + + _, ok := q.items[work] + if !ok { + return nil + } + var items []*queue.Work + + // loop through and drain all items + // from the queue. +drain: + for { + select { + case item := <-q.itemc: + items = append(items, item) + default: + break drain + } + } + + // re-add all items to the queue except + // the item we're trying to remove + for _, item := range items { + if item == work { + delete(q.items, work) + delete(q.acks, work) + continue + } + q.itemc <- item + } return nil } @@ -48,7 +79,7 @@ func (q *Queue) Pull() *queue.Work { return work } -// PullAct retrieves and removes the head of this queue, waiting +// PullAck retrieves and removes the head of this queue, waiting // if necessary until work becomes available. Items pull from the // queue that aren't acknowledged will be pushed back to the queue // again when the default acknowledgement deadline is reached. @@ -74,7 +105,7 @@ func (q *Queue) Items() []*queue.Work { q.Lock() defer q.Unlock() items := []*queue.Work{} - for work, _ := range q.items { + for work := range q.items { items = append(items, work) } return items diff --git a/queue/builtin/queue_test.go b/queue/builtin/queue_test.go new file mode 100644 index 000000000..24cb80336 --- /dev/null +++ b/queue/builtin/queue_test.go @@ -0,0 +1,75 @@ +package builtin + +import ( + "testing" + + "github.com/drone/drone/queue" + . "github.com/franela/goblin" +) + +func TestBuild(t *testing.T) { + g := Goblin(t) + g.Describe("Queue", func() { + + g.It("Should publish item", func() { + w1 := &queue.Work{} + w2 := &queue.Work{} + q := New() + q.Publish(w1) + q.Publish(w2) + g.Assert(len(q.items)).Equal(2) + g.Assert(len(q.itemc)).Equal(2) + }) + + g.It("Should remove item", func() { + w1 := &queue.Work{} + w2 := &queue.Work{} + w3 := &queue.Work{} + q := New() + q.Publish(w1) + q.Publish(w2) + q.Publish(w3) + q.Remove(w2) + g.Assert(len(q.items)).Equal(2) + g.Assert(len(q.itemc)).Equal(2) + g.Assert(q.Pull()).Equal(w1) + g.Assert(q.Pull()).Equal(w3) + }) + + g.It("Should pull item", func() { + w1 := &queue.Work{} + w2 := &queue.Work{} + q := New() + q.Publish(w1) + q.Publish(w2) + g.Assert(q.Pull()).Equal(w1) + g.Assert(q.Pull()).Equal(w2) + }) + + g.It("Should pull item with ack", func() { + w := &queue.Work{} + q := New() + q.Publish(w) + g.Assert(q.PullAck()).Equal(w) + g.Assert(q.acks[w]).Equal(struct{}{}) + }) + + g.It("Should ack item", func() { + w := &queue.Work{} + q := New() + q.Publish(w) + g.Assert(q.PullAck()).Equal(w) + g.Assert(len(q.acks)).Equal(1) + g.Assert(q.Ack(w)).Equal(nil) + g.Assert(len(q.acks)).Equal(0) + }) + + g.It("Should get all items", func() { + q := New() + q.Publish(&queue.Work{}) + q.Publish(&queue.Work{}) + q.Publish(&queue.Work{}) + g.Assert(len(q.Items())).Equal(3) + }) + }) +} From 39e7ce9840ebc179f41e697525d0ec62fd30be8a Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Thu, 23 Apr 2015 16:32:46 -0700 Subject: [PATCH 2/3] add error when removing item not exists --- queue/builtin/queue.go | 5 ++++- queue/builtin/queue_test.go | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/queue/builtin/queue.go b/queue/builtin/queue.go index 6256cad2a..d680d4ac8 100644 --- a/queue/builtin/queue.go +++ b/queue/builtin/queue.go @@ -1,11 +1,14 @@ package builtin import ( + "errors" "sync" "github.com/drone/drone/queue" ) +var ErrNotFound = errors.New("work item not found") + type Queue struct { sync.Mutex @@ -40,7 +43,7 @@ func (q *Queue) Remove(work *queue.Work) error { _, ok := q.items[work] if !ok { - return nil + return ErrNotFound } var items []*queue.Work diff --git a/queue/builtin/queue_test.go b/queue/builtin/queue_test.go index 24cb80336..6dce3f6a1 100644 --- a/queue/builtin/queue_test.go +++ b/queue/builtin/queue_test.go @@ -34,6 +34,7 @@ func TestBuild(t *testing.T) { g.Assert(len(q.itemc)).Equal(2) g.Assert(q.Pull()).Equal(w1) g.Assert(q.Pull()).Equal(w3) + g.Assert(q.Remove(w2)).Equal(ErrNotFound) }) g.It("Should pull item", func() { From da350989d3fe153adca132d4ef8e400f397ad509 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Fri, 24 Apr 2015 14:25:03 -0700 Subject: [PATCH 3/3] fixed minor queue glitches --- datastore/bolt/build.go | 2 +- drone.go | 15 +++++++++++++++ queue/queue.go | 27 --------------------------- server/builds.go | 23 ++++++++++++++++++++++- server/hooks.go | 30 ++++++++++++++++++++---------- server/queue.go | 12 +++++++++++- 6 files changed, 69 insertions(+), 40 deletions(-) diff --git a/datastore/bolt/build.go b/datastore/bolt/build.go index 3d20a1be0..1a466eb86 100644 --- a/datastore/bolt/build.go +++ b/datastore/bolt/build.go @@ -204,7 +204,7 @@ func (db *DB) SetBuildTask(repo string, build int, task *common.Task) error { return err } build_.Updated = time.Now().UTC().Unix() - build_.Tasks[task.Number] = task // TODO check index to prevent nil pointer / panic + build_.Tasks[task.Number-1] = task // TODO check index to prevent nil pointer / panic return update(t, bucketBuild, key, build_) }) } diff --git a/drone.go b/drone.go index e8b331a90..782adb6cc 100644 --- a/drone.go +++ b/drone.go @@ -6,10 +6,13 @@ import ( "github.com/gin-gonic/gin" "github.com/drone/drone/datastore/bolt" + "github.com/drone/drone/eventbus" "github.com/drone/drone/remote/github" "github.com/drone/drone/server" "github.com/drone/drone/server/session" "github.com/drone/drone/settings" + + queue "github.com/drone/drone/queue/builtin" ) var path = flag.String("config", "drone.toml", "") @@ -31,8 +34,10 @@ func main() { api := r.Group("/api") api.Use(server.SetHeaders()) + api.Use(server.SetBus(eventbus.New())) api.Use(server.SetDatastore(ds)) api.Use(server.SetRemote(remote)) + api.Use(server.SetQueue(queue.New())) api.Use(server.SetSettings(settings)) api.Use(server.SetUser(session)) @@ -99,6 +104,16 @@ func main() { hooks.POST("", server.PostHook) } + queue := api.Group("/queue") + { + queue.Use(server.SetRepo()) + queue.GET("", server.GetQueue) + queue.POST("/pull", server.PollBuild) + queue.POST("/push/:owner/:name", server.PushBuild) + queue.POST("/push/:owner/:name/:build", server.PushTask) + queue.POST("/push/:owner/:name/:build/:task/logs", server.PushLogs) + } + auth := r.Group("/authorize") { auth.Use(server.SetHeaders()) diff --git a/queue/queue.go b/queue/queue.go index e0832d60f..73df7a70c 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -26,30 +26,3 @@ type Queue interface { // queue, in proper sequence. Items() []*Work } - -// type Manager interface { -// // Register registers a worker that has signed -// // up to accept work. -// Register(*Worker) - -// // Unregister unregisters a worker that should no -// // longer be accepting work. -// Unregister(*Worker) - -// // Assign assigns work to a worker. -// Assign(*Work, *Worker) - -// // Unassign unassigns work from a worker. -// Unassign(*Work, *Worker) - -// // Work returns a list of all work that is -// // currently in progress. -// Work() []*Work - -// // Worker retrieves a worker by name. -// Worker(string) *Worker - -// // Workers returns a slice containing all workers -// // registered with the manager. -// Workers() []*Worker -// } diff --git a/server/builds.go b/server/builds.go index 8535737a1..ac84bdb80 100644 --- a/server/builds.go +++ b/server/builds.go @@ -5,6 +5,7 @@ import ( "strconv" "github.com/drone/drone/common" + "github.com/drone/drone/queue" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" ) @@ -102,6 +103,7 @@ func PostBuildStatus(c *gin.Context) { // func RunBuild(c *gin.Context) { store := ToDatastore(c) + queue_ := ToQueue(c) repo := ToRepo(c) num, err := strconv.Atoi(c.Params.ByName("number")) if err != nil { @@ -114,6 +116,18 @@ func RunBuild(c *gin.Context) { return } + keys, err := store.RepoKeypair(repo.FullName) + if err != nil { + c.Fail(404, err) + return + } + + user, err := store.User(repo.User.Login) + if err != nil { + c.Fail(404, err) + return + } + // must not restart a running build if build.State == common.StatePending || build.State == common.StateRunning { c.Fail(409, err) @@ -143,7 +157,14 @@ func RunBuild(c *gin.Context) { // raw = []byte(inject.InjectSafe(string(raw), params)) // } - // TODO push build to queue + queue_.Publish(&queue.Work{ + User: user, + Repo: repo, + Build: build, + Keys: keys, + Netrc: &common.Netrc{}, //TODO create netrc + Yaml: nil, // TODO fetch yaml + }) c.JSON(202, build) } diff --git a/server/hooks.go b/server/hooks.go index df7a9ae29..250ab0934 100644 --- a/server/hooks.go +++ b/server/hooks.go @@ -7,7 +7,7 @@ import ( "github.com/drone/drone/common" "github.com/drone/drone/parser/inject" "github.com/drone/drone/parser/matrix" - // "github.com/bradrydzewski/drone/worker" + "github.com/drone/drone/queue" "github.com/gin-gonic/gin" ) @@ -19,6 +19,7 @@ import ( func PostHook(c *gin.Context) { remote := ToRemote(c) store := ToDatastore(c) + queue_ := ToQueue(c) hook, err := remote.Hook(c.Request) if err != nil { @@ -107,19 +108,13 @@ func PostHook(c *gin.Context) { Environment: axis, }) } - - err = store.SetBuild(repo.FullName, build) + keys, err := store.RepoKeypair(repo.FullName) if err != nil { - c.Fail(500, err) + log.Errorf("failure to fetch keypair for %s. %s", repo.FullName, err) + c.Fail(404, err) return } - // w := worker.Work{ - // User: user, - // Repo: repo, - // Build: build, - // } - // verify the branches can be built vs skipped // s, _ := script.ParseBuild(string(yml)) // if len(hook.PullRequest) == 0 && !s.MatchBranch(hook.Branch) { @@ -127,5 +122,20 @@ func PostHook(c *gin.Context) { // return // } + err = store.SetBuild(repo.FullName, build) + if err != nil { + c.Fail(500, err) + return + } + + queue_.Publish(&queue.Work{ + User: user, + Repo: repo, + Build: build, + Keys: keys, + Netrc: &common.Netrc{}, // TODO + Yaml: raw, + }) + c.JSON(200, build) } diff --git a/server/queue.go b/server/queue.go index b87c06411..c8315e397 100644 --- a/server/queue.go +++ b/server/queue.go @@ -19,7 +19,7 @@ import ( // GET /queue/pull func PollBuild(c *gin.Context) { queue := ToQueue(c) - work := queue.PullAck() + work := queue.Pull() c.JSON(200, work) } @@ -37,6 +37,10 @@ func PushBuild(c *gin.Context) { c.Fail(404, err) return } + build.Duration = in.Duration + build.Started = in.Started + build.Finished = in.Finished + build.State = in.State err = store.SetBuildState(repo.FullName, build) if err != nil { c.Fail(500, err) @@ -106,3 +110,9 @@ func PushLogs(c *gin.Context) { } c.Writer.WriteHeader(200) } + +func GetQueue(c *gin.Context) { + queue := ToQueue(c) + items := queue.Items() + c.JSON(200, items) +}