diff --git a/queue/builtin/queue.go b/queue/builtin/queue.go new file mode 100644 index 000000000..bd4445732 --- /dev/null +++ b/queue/builtin/queue.go @@ -0,0 +1,81 @@ +package builtin + +import ( + "sync" + + "github.com/drone/drone/queue" +) + +type Queue struct { + sync.Mutex + + acks map[*queue.Work]struct{} + items map[*queue.Work]struct{} + itemc chan *queue.Work +} + +func New() *Queue { + return &Queue{ + acks: make(map[*queue.Work]struct{}), + items: make(map[*queue.Work]struct{}), + itemc: make(chan *queue.Work, 999), + } +} + +// Publish inserts work at the tail of this queue, waiting for +// space to become available if the queue is full. +func (q *Queue) Publish(work *queue.Work) error { + q.Lock() + q.items[work] = struct{}{} + q.Unlock() + q.itemc <- work + return nil +} + +// Remove removes the specified work item from this queue, +// if it is present. +func (q *Queue) Remove(work *queue.Work) error { + return nil +} + +// Pull retrieves and removes the head of this queue, waiting +// if necessary until work becomes available. +func (q *Queue) Pull() *queue.Work { + work := <-q.itemc + q.Lock() + delete(q.items, work) + q.Unlock() + return work +} + +// PullAct retrieves and removes the head of this queue, waiting +// if necessary until work becomes available. Items pull from the +// queue that aren't acknowledged will be pushed back to the queue +// again when the default acknowledgement deadline is reached. +func (q *Queue) PullAck() *queue.Work { + work := q.Pull() + q.Lock() + q.acks[work] = struct{}{} + q.Unlock() + return work +} + +// Ack acknowledges an item in the queue was processed. +func (q *Queue) Ack(work *queue.Work) error { + q.Lock() + delete(q.acks, work) + q.Unlock() + return nil +} + +// Items returns a slice containing all of the work in this +// queue, in proper sequence. +func (q *Queue) Items() []*queue.Work { + q.Lock() + defer q.Unlock() + items := []*queue.Work{} + for work, _ := range q.items { + items = append(items, work) + } + return items +} diff --git a/queue/plugin/client.go b/queue/plugin/client.go new file mode 100644 index 000000000..d0f7706bc --- /dev/null +++ b/queue/plugin/client.go @@ -0,0 +1,109 @@ +package plugin + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/url" + + "github.com/drone/drone/queue" +) + +type Client struct { + url string + token string +} + +func New(url, token string) *Client { + return &Client{url, token} +} + +// Publish makes an http request to the remote queue +// to insert work at the tail. +func (c *Client) Publish(work *queue.Work) error { + return c.send("POST", "/queue", work, nil) +} + +// Remove makes an http request to the remote queue to +// remove the specified work item. +func (c *Client) Remove(work *queue.Work) error { + return c.send("DELETE", "/queue", work, nil) +} + +// Pull makes an http request to the remote queue to +// retrieve work. This initiates a long poll and will +// block until complete. +func (c *Client) Pull() *queue.Work { + out := &queue.Work{} + err := c.send("POST", "/queue/pull", nil, out) + if err != nil { + // TODO handle error + } + return out +} + +// Pull makes an http request to the remote queue to +// retrieve work, with an acknowldement required. +// This initiates a long poll and will block until +// complete. +func (c *Client) PullAck() *queue.Work { + out := &queue.Work{} + err := c.send("POST", "/queue/pull?ack=true", nil, out) + if err != nil { + // TODO handle error + } + return out +} + +// Ack makes an http request to the remote queue +// to acknowledge an item in the queue was processed. +func (c *Client) Ack(work *queue.Work) error { + return c.send("POST", "/queue/ack", nil, nil) +} + +// Items makes an http request to the remote queue +// to fetch a list of all work. +func (c *Client) Items() []*queue.Work { + out := []*queue.Work{} + err := c.send("GET", "/queue/items", nil, &out) + if err != nil { + // TODO handle error + } + return out +} + +// send is a helper function that makes an authenticated +// request to the remote http plugin. +func (c *Client) send(method, path string, in interface{}, out interface{}) error { + url_, err := url.Parse(c.url + path) + if err != nil { + return err + } + + var buf io.ReadWriter + if in != nil { + buf = new(bytes.Buffer) + err := json.NewEncoder(buf).Encode(in) + if err != nil { + return err + } + } + + req, err := http.NewRequest(method, url_.String(), buf) + if err != nil { + return err + } + req.Header.Add("Authorization", "Bearer "+c.token) + req.Header.Add("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if out == nil { + return nil + } + return json.NewDecoder(resp.Body).Decode(out) +} diff --git a/queue/plugin/server.go b/queue/plugin/server.go new file mode 100644 index 000000000..baf355072 --- /dev/null +++ b/queue/plugin/server.go @@ -0,0 +1,116 @@ +package plugin + +import ( + "net/http" + + "github.com/drone/drone/queue" + "github.com/gin-gonic/gin" +) + +// Handle returns an http.Handler that enables a remote +// client to interop with a Queue over http. +func Handle(queue queue.Queue, token string) http.Handler { + r := gin.New() + + // middleware to validate the authorization token + // and to inject the queue into the http context. + bearer := "Bearer " + token + r.Use(func(c *gin.Context) { + if c.Request.Header.Get("Authorization") != bearer { + c.AbortWithStatus(403) + return + } + c.Set("queue", queue) + c.Next() + }) + + r.POST("/queue", publish) + r.DELETE("/queue", remove) + r.POST("/queue/pull", pull) + r.POST("/queue/ack", ack) + r.POST("/queue/items", items) + + return r +} + +// publish handles an http request to the queue +// to insert work at the tail. +func publish(c *gin.Context) { + q := fromContext(c) + work := &queue.Work{} + if !c.Bind(work) { + c.AbortWithStatus(400) + return + } + err := q.Publish(work) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} + +// remove handles an http request to the queue +// to remove a work item. +func remove(c *gin.Context) { + q := fromContext(c) + work := &queue.Work{} + if !c.Bind(work) { + c.AbortWithStatus(400) + return + } + err := q.Remove(work) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} + +// pull handles an http request to the queue +// to retrieve work. +func pull(c *gin.Context) { + q := fromContext(c) + var work *queue.Work + if c.Request.FormValue("ack") != "" { + work = q.PullAck() + } else { + work = q.Pull() + } + if work == nil { + c.AbortWithStatus(500) + return + } + c.JSON(200, work) +} + +// ack handles an http request to the queue +// to confirm an item was successfully pulled. +func ack(c *gin.Context) { + q := fromContext(c) + work := &queue.Work{} + if !c.Bind(work) { + c.AbortWithStatus(400) + return + } + err := q.Ack(work) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} + +// items handles an http request to the queue to +// return a list of all work items. +func items(c *gin.Context) { + q := fromContext(c) + items := q.Items() + c.JSON(200, items) +} + +// helper function to retrieve the Queue from +// the context and cast appropriately. +func fromContext(c *gin.Context) queue.Queue { + return c.MustGet("queue").(queue.Queue) +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 000000000..e0832d60f --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,55 @@ +package queue + +type Queue interface { + // Publish inserts work at the tail of this queue, waiting for + // space to become available if the queue is full. + Publish(*Work) error + + // Remove removes the specified work item from this queue, + // if it is present. + Remove(*Work) error + + // Pull retrieves and removes the head of this queue, waiting + // if necessary until work becomes available. + Pull() *Work + + // PullAck retrieves and removes the head of this queue, waiting + // if necessary until work becomes available. Items pull from the + // queue that aren't acknowledged will be pushed back to the queue + // again when the default acknowledgement deadline is reached. + PullAck() *Work + + // Ack acknowledges an item in the queue was processed. + Ack(*Work) error + + // Items returns a slice containing all of the work in this + // queue, in proper sequence. + Items() []*Work +} + +// type Manager interface { +// // Register registers a worker that has signed +// // up to accept work. +// Register(*Worker) + +// // Unregister unregisters a worker that should no +// // longer be accepting work. +// Unregister(*Worker) + +// // Assign assigns work to a worker. +// Assign(*Work, *Worker) + +// // Unassign unassigns work from a worker. +// Unassign(*Work, *Worker) + +// // Work returns a list of all work that is +// // currently in progress. +// Work() []*Work + +// // Worker retrieves a worker by name. +// Worker(string) *Worker + +// // Workers returns a slice containing all workers +// // registered with the manager. +// Workers() []*Worker +// } diff --git a/queue/worker.go b/queue/worker.go new file mode 100644 index 000000000..912ef8b57 --- /dev/null +++ b/queue/worker.go @@ -0,0 +1,73 @@ +package queue + +import ( + "io" + + "github.com/drone/drone/common" +) + +// Work represents an item for work to be +// processed by a worker. +type Work struct { + User *common.User `json:"user"` + Repo *common.Repo `json:"repo"` + Build *common.Build `json:"build"` + Keys *common.Keypair `json:"keypair"` + Netrc *common.Netrc `json:"netrc"` + Yaml []byte `json:"yaml"` +} + +// represents a worker that has connected +// to the system in order to perform work +type Worker struct { + Name string + Addr string + IsHealthy bool +} + +// Ping pings to worker to verify it is +// available and in good health. +func (w *Worker) Ping() (bool, error) { + return false, nil +} + +// Logs fetches the logs for a work item. +func (w *Worker) Logs() (io.Reader, error) { + return nil, nil +} + +// Cancel cancels a work item. +func (w *Worker) Cancel() error { + return nil +} + +// type Monitor struct { +// manager *Manager +// } + +// func NewMonitor(manager *Manager) *Monitor { +// return &Monitor{manager} +// } + +// // start is a helper function that is used to monitor +// // all registered workers and ensure they are in a +// // healthy state. +// func (m *Monitor) Start() { +// ticker := time.NewTicker(1 * time.Hour) +// go func() { +// for { +// select { +// case <-ticker.C: +// workers := m.manager.Workers() +// for _, worker := range workers { +// // ping the worker to make sure it is +// // available and still accepting builds. +// if _, err := worker.Ping(); err != nil { +// m.manager.SetHealth(worker, false) +// } else { +// m.manager.SetHealth(worker, true) +// } +// } +// } +// } +// } diff --git a/server/queue.go b/server/queue.go new file mode 100644 index 000000000..b87c06411 --- /dev/null +++ b/server/queue.go @@ -0,0 +1,108 @@ +package server + +import ( + "io" + "io/ioutil" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + + "github.com/drone/drone/common" + "github.com/drone/drone/eventbus" +) + +// TODO (bradrydzewski) the callback URL should be signed. +// TODO (bradrydzewski) we shouldn't need to fetch the Repo if specified in the URL path +// TODO (bradrydzewski) use SetRepoLast to update the last repository + +// GET /queue/pull +func PollBuild(c *gin.Context) { + queue := ToQueue(c) + work := queue.PullAck() + c.JSON(200, work) +} + +// GET /queue/push/:owner/:repo +func PushBuild(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + bus := ToBus(c) + in := &common.Build{} + if !c.BindWith(in, binding.JSON) { + return + } + build, err := store.Build(repo.FullName, in.Number) + if err != nil { + c.Fail(404, err) + return + } + err = store.SetBuildState(repo.FullName, build) + if err != nil { + c.Fail(500, err) + return + } + + bus.Send(&eventbus.Event{ + Build: build, + Repo: repo, + }) + if repo.Last != nil && repo.Last.Number > build.Number { + c.Writer.WriteHeader(200) + return + } + repo.Last = build + store.SetRepo(repo) + c.Writer.WriteHeader(200) +} + +// POST /queue/push/:owner/:repo/:build +func PushTask(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + bus := ToBus(c) + num, _ := strconv.Atoi(c.Params.ByName("build")) + in := &common.Task{} + if !c.BindWith(in, binding.JSON) { + return + } + err := store.SetBuildTask(repo.FullName, num, in) + if err != nil { + c.Fail(404, err) + return + } + build, err := store.Build(repo.FullName, num) + if err != nil { + c.Fail(404, err) + return + } + bus.Send(&eventbus.Event{ + Build: build, + Repo: repo, + }) + c.Writer.WriteHeader(200) +} + +// POST /queue/push/:owner/:repo/:build/:task/logs +func PushLogs(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + bnum, _ := strconv.Atoi(c.Params.ByName("build")) + tnum, _ := strconv.Atoi(c.Params.ByName("task")) + + // TODO (bradrydzewski) change this interface to accept an io.Reader + // instead of a byte array so that we can buffer the write and so that + // we avoid unnecessary copies of the data in memory. + logs, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, 5000000)) //5MB + defer c.Request.Body.Close() + if err != nil { + c.Fail(500, err) + return + } + err = store.SetLogs(repo.FullName, bnum, tnum, logs) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} diff --git a/server/server.go b/server/server.go index 08f48d99d..55ef51d2f 100644 --- a/server/server.go +++ b/server/server.go @@ -9,11 +9,27 @@ import ( "github.com/drone/drone/common" "github.com/drone/drone/datastore" "github.com/drone/drone/eventbus" + "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/server/session" "github.com/drone/drone/settings" ) +func SetQueue(q queue.Queue) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("queue", q) + c.Next() + } +} + +func ToQueue(c *gin.Context) queue.Queue { + v, err := c.Get("queue") + if err != nil { + return nil + } + return v.(queue.Queue) +} + func SetBus(r eventbus.Bus) gin.HandlerFunc { return func(c *gin.Context) { c.Set("eventbus", r)