From 204fba901824bf659cefe060b77effae064fd7cf Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Tue, 5 May 2015 01:04:20 -0700 Subject: [PATCH] some initial work on a builtin build runner --- drone.go | 31 +++-- queue/builtin/queue.go | 14 +-- queue/builtin/queue_test.go | 36 ++++-- queue/plugin/client.go | 25 +++- queue/plugin/server.go | 7 +- queue/queue.go | 6 - remote/github/helper.go | 6 +- runner/builtin/runner.go | 223 ++++++++++++++++++++++++++++++++++ runner/builtin/updater.go | 61 ++++++++++ runner/builtin/worker.go | 235 ++++++++++++++++++++++++++++++++++++ runner/runner.go | 22 ++++ server/queue.go | 3 + 12 files changed, 624 insertions(+), 45 deletions(-) create mode 100644 runner/builtin/runner.go create mode 100644 runner/builtin/updater.go create mode 100644 runner/builtin/worker.go create mode 100644 runner/runner.go diff --git a/drone.go b/drone.go index c119321c9..0158bc85a 100644 --- a/drone.go +++ b/drone.go @@ -16,6 +16,7 @@ import ( store "github.com/drone/drone/datastore/builtin" eventbus "github.com/drone/drone/eventbus/builtin" queue "github.com/drone/drone/queue/builtin" + runner "github.com/drone/drone/runner/builtin" ) var conf = flag.String("config", "drone.toml", "") @@ -27,20 +28,26 @@ func main() { if err != nil { panic(err) } + + store := store.Must(settings.Database.Path) + defer store.Close() + remote := github.New(settings.Service) session := session.New(settings.Session) - - ds := store.Must(settings.Database.Path) - defer ds.Close() + eventbus_ := eventbus.New() + queue_ := queue.New() + updater := runner.NewUpdater(eventbus_, store) + runner_ := runner.Runner{updater} + go run(&runner_, queue_) r := gin.Default() api := r.Group("/api") api.Use(server.SetHeaders()) - api.Use(server.SetBus(eventbus.New())) - api.Use(server.SetDatastore(ds)) + api.Use(server.SetBus(eventbus_)) + api.Use(server.SetDatastore(store)) api.Use(server.SetRemote(remote)) - api.Use(server.SetQueue(queue.New())) + api.Use(server.SetQueue(queue_)) 
api.Use(server.SetSettings(settings)) api.Use(server.SetSession(session)) api.Use(server.SetUser(session)) @@ -143,7 +150,7 @@ func main() { auth := r.Group("/authorize") { auth.Use(server.SetHeaders()) - auth.Use(server.SetDatastore(ds)) + auth.Use(server.SetDatastore(store)) auth.Use(server.SetRemote(remote)) auth.Use(server.SetSettings(settings)) auth.Use(server.SetSession(session)) @@ -178,3 +185,13 @@ func index() *template.Template { filestr := string(file) return template.Must(template.New("index.html").Parse(filestr)) } + +// run is a helper function for initializing the +// built-in build runner, if not running in remote +// mode. +func run(r *runner.Runner, q *queue.Queue) { + defer func() { + recover() + }() + r.Poll(q) +} diff --git a/queue/builtin/queue.go b/queue/builtin/queue.go index 4e6d2c89d..3686d36a3 100644 --- a/queue/builtin/queue.go +++ b/queue/builtin/queue.go @@ -78,6 +78,7 @@ func (q *Queue) Pull() *queue.Work { work := <-q.itemc q.Lock() delete(q.items, work) + q.acks[work] = struct{}{} q.Unlock() return work } @@ -94,24 +95,13 @@ func (q *Queue) PullClose(cn queue.CloseNotifier) *queue.Work { case work := <-q.itemc: q.Lock() delete(q.items, work) + q.acks[work] = struct{}{} q.Unlock() return work } } } -// PullAck retrieves and removes the head of this queue, waiting -// if necessary until work becomes available. Items pull from the -// queue that aren't acknowledged will be pushed back to the queue -// again when the default acknowledgement deadline is reached. -func (q *Queue) PullAck() *queue.Work { - work := q.Pull() - q.Lock() - q.acks[work] = struct{}{} - q.Unlock() - return work -} - // Ack acknowledges an item in the queue was processed. 
func (q *Queue) Ack(work *queue.Work) error { q.Lock() diff --git a/queue/builtin/queue_test.go b/queue/builtin/queue_test.go index 6dce3f6a1..31151cad5 100644 --- a/queue/builtin/queue_test.go +++ b/queue/builtin/queue_test.go @@ -1,6 +1,7 @@ package builtin import ( + "sync" "testing" "github.com/drone/drone/queue" @@ -41,25 +42,38 @@ func TestBuild(t *testing.T) { w1 := &queue.Work{} w2 := &queue.Work{} q := New() + c := new(closeNotifier) q.Publish(w1) q.Publish(w2) g.Assert(q.Pull()).Equal(w1) - g.Assert(q.Pull()).Equal(w2) + g.Assert(q.PullClose(c)).Equal(w2) + g.Assert(q.acks[w1]).Equal(struct{}{}) + g.Assert(q.acks[w2]).Equal(struct{}{}) + g.Assert(len(q.acks)).Equal(2) }) - g.It("Should pull item with ack", func() { - w := &queue.Work{} + g.It("Should cancel pulling item", func() { q := New() - q.Publish(w) - g.Assert(q.PullAck()).Equal(w) - g.Assert(q.acks[w]).Equal(struct{}{}) + c := new(closeNotifier) + c.closec = make(chan bool, 1) + var wg sync.WaitGroup + wg.Add(1) + go func() { + g.Assert(q.PullClose(c) == nil).IsTrue() + wg.Done() + }() + go func() { + c.closec <- true + }() + wg.Wait() }) g.It("Should ack item", func() { w := &queue.Work{} + c := new(closeNotifier) q := New() q.Publish(w) - g.Assert(q.PullAck()).Equal(w) + g.Assert(q.PullClose(c)).Equal(w) g.Assert(len(q.acks)).Equal(1) g.Assert(q.Ack(w)).Equal(nil) g.Assert(len(q.acks)).Equal(0) @@ -74,3 +88,11 @@ func TestBuild(t *testing.T) { }) }) } + +type closeNotifier struct { + closec chan bool +} + +func (c *closeNotifier) CloseNotify() <-chan bool { + return c.closec +} diff --git a/queue/plugin/client.go b/queue/plugin/client.go index d0f7706bc..ccdca760a 100644 --- a/queue/plugin/client.go +++ b/queue/plugin/client.go @@ -44,12 +44,11 @@ func (c *Client) Pull() *queue.Work { } // Pull makes an http request to the remote queue to -// retrieve work, with an acknowldement required. -// This initiates a long poll and will block until -// complete.
-func (c *Client) PullAck() *queue.Work { +// retrieve work. This initiates a long poll and will +// block until complete. +func (c *Client) PullClose(cn queue.CloseNotifier) *queue.Work { out := &queue.Work{} - err := c.send("POST", "/queue/pull?ack=true", nil, out) + err := c.send("POST", "/queue/pull", nil, out) if err != nil { // TODO handle error } @@ -96,7 +95,6 @@ func (c *Client) send(method, path string, in interface{}, out interface{}) erro } req.Header.Add("Authorization", "Bearer "+c.token) req.Header.Add("Content-Type", "application/json") - resp, err := http.DefaultClient.Do(req) if err != nil { return err @@ -107,3 +105,18 @@ func (c *Client) send(method, path string, in interface{}, out interface{}) erro } return json.NewDecoder(resp.Body).Decode(out) } + +// In order to implement PullClose() we'll need to use a custom transport: +// +// tr := &http.Transport{} +// client := &http.Client{Transport: tr} +// c := make(chan error, 1) +// go func() { c <- f(client.Do(req)) }() +// select { +// case <-ctx.Done(): +// tr.CancelRequest(req) +// <-c // Wait for f to return. +// return ctx.Err() +// case err := <-c: +// return err +// } diff --git a/queue/plugin/server.go b/queue/plugin/server.go index bab3b931c..ce3153044 100644 --- a/queue/plugin/server.go +++ b/queue/plugin/server.go @@ -71,12 +71,7 @@ func remove(c *gin.Context) { // to retrieve work. func pull(c *gin.Context) { q := fromContext(c) - var work *queue.Work - if c.Request.FormValue("ack") == "true" { - work = q.PullAck() - } else { - work = q.Pull() - } + work := q.PullClose(c.Writer) if work == nil { c.AbortWithStatus(500) return diff --git a/queue/queue.go b/queue/queue.go index 54ba17c95..400f28e61 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -19,12 +19,6 @@ type Queue interface { // if the subscribing client terminates its connection. PullClose(CloseNotifier) *Work - // PullAck retrieves and removes the head of this queue, waiting - // if necessary until work becomes available. 
Items pull from the - // queue that aren't acknowledged will be pushed back to the queue - // again when the default acknowledgement deadline is reached. - PullAck() *Work - // Ack acknowledges an item in the queue was processed. Ack(*Work) error diff --git a/remote/github/helper.go b/remote/github/helper.go index 8714e4b2a..458dc33a1 100644 --- a/remote/github/helper.go +++ b/remote/github/helper.go @@ -172,7 +172,11 @@ func GetHook(client *github.Client, owner, name, url string) (*github.Hook, erro return nil, err } for _, hook := range hooks { - if strings.HasPrefix(hook.Config["url"].(string), url) { + hookurl, ok := hook.Config["url"].(string) + if !ok { + continue + } + if strings.HasPrefix(hookurl, url) { return &hook, nil } } diff --git a/runner/builtin/runner.go b/runner/builtin/runner.go new file mode 100644 index 000000000..d2ceb5e8f --- /dev/null +++ b/runner/builtin/runner.go @@ -0,0 +1,223 @@ +package builtin + +import ( + "crypto/sha1" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "os" + "time" + + "github.com/drone/drone/common" + "github.com/drone/drone/queue" + "github.com/samalba/dockerclient" + + log "github.com/Sirupsen/logrus" +) + +var ( + // Default docker host address + DefaultHost = "unix:///var/run/docker.sock" + + // Docker host address from environment variable + DockerHost = os.Getenv("DOCKER_HOST") +) + +func init() { + // if the environment doesn't specify a DOCKER_HOST + // we should use the default Docker socket. + if len(DockerHost) == 0 { + DockerHost = DefaultHost + } +} + +type Runner struct { + Updater +} + +func (r *Runner) Run(w *queue.Work) error { + var workers []*worker + var client dockerclient.Client + + defer func() { + recover() + + // ensures that all containers have been removed + // from the host machine. + for _, worker := range workers { + worker.Remove() + } + + // if any part of the build fails and leaves + // behind orphan sub-builds we need to cleanup + // after ourselves.
+ if w.Build.State == common.StateRunning { + // if any tasks are running or pending + // we should mark them as complete. + for _, t := range w.Build.Tasks { + if t.State == common.StateRunning { + t.State = common.StateError + t.Finished = time.Now().UTC().Unix() + t.Duration = t.Finished - t.Started + } + if t.State == common.StatePending { + t.State = common.StateError + t.Started = time.Now().UTC().Unix() + t.Finished = time.Now().UTC().Unix() + t.Duration = 0 + } + r.SetTask(w.Repo, w.Build, t) + } + // mark the build as complete (with error) + w.Build.State = common.StateError + w.Build.Finished = time.Now().UTC().Unix() + w.Build.Duration = w.Build.Finished - w.Build.Started + r.SetBuild(w.Repo, w.Build) + } + }() + + // marks the build as running + w.Build.Started = time.Now().UTC().Unix() + w.Build.State = common.StateRunning + err := r.SetBuild(w.Repo, w.Build) + if err != nil { + return err + } + + // create the Docker client. In this version of Drone (alpha) + // we do not spread builds across clients, but this can and + // (probably) will change in the future. + client, err = dockerclient.NewDockerClient(DockerHost, nil) + if err != nil { + return err + } + + // loop through and execute the build and + // clone steps for each build task.
+ for _, task := range w.Build.Tasks { + + // marks the task as running + task.State = common.StateRunning + task.Started = time.Now().UTC().Unix() + err = r.SetTask(w.Repo, w.Build, task) + if err != nil { + return err + } + + work := &work{ + Repo: w.Repo, + Build: w.Build, + Keys: w.Keys, + Yaml: w.Yaml, + Task: task, + } + in, err := json.Marshal(work) + if err != nil { + return err + } + worker := newWorkerTimeout(client, w.Repo.Timeout+10) // 10 minute buffer + workers = append(workers, worker) + cname := cname(w.Repo.FullName, w.Build.Number, task.Number) + state, err := worker.Build(cname, in) + + switch { + case err == ErrTimeout: + task.State = common.StateKilled + case err != nil: + task.State = common.StateError + case state != 0: + task.ExitCode = state + task.State = common.StateFailure + default: + task.State = common.StateSuccess + } + + // send the logs to the datastore + rc, err := worker.Logs() + if err != nil { + return err + } + err = r.SetLogs(w.Repo, w.Build, task, rc) + if err != nil { + return err + } + + // update the task in the datastore + task.Finished = time.Now().UTC().Unix() + task.Duration = task.Finished - task.Started + err = r.SetTask(w.Repo, w.Build, task) + if err != nil { + return err + } + } + + // update the build state if any of the sub-tasks + // had a non-success status + w.Build.State = common.StateSuccess + for _, task := range w.Build.Tasks { + if task.State != common.StateSuccess { + w.Build.State = task.State + break + } + } + err = r.SetBuild(w.Repo, w.Build) + if err != nil { + return err + } + + // loop through and execute the notifications and + // then destroy all containers afterward.
+ for i, task := range w.Build.Tasks { + work := &work{ + Repo: w.Repo, + Build: w.Build, + Keys: w.Keys, + Yaml: w.Yaml, + Task: task, + } + in, err := json.Marshal(work) + if err != nil { + return err + } + workers[i].Notify(in) + break + } + + return nil +} + +func (r *Runner) Cancel(repo string, build, task int) error { + client, err := dockerclient.NewDockerClient(DockerHost, nil) + if err != nil { + return err + } + return client.StopContainer(cname(repo, build, task), 30) +} + +func (r *Runner) Logs(repo string, build, task int) (io.ReadCloser, error) { + client, err := dockerclient.NewDockerClient(DockerHost, nil) + if err != nil { + return nil, err + } + return client.ContainerLogs(cname(repo, build, task), logOptsTail) +} + +func cname(repo string, number, task int) string { + s := fmt.Sprintf("%s/%d/%d", repo, number, task) + h := sha1.New() + h.Write([]byte(s)) + hash := hex.EncodeToString(h.Sum(nil))[:10] + return fmt.Sprintf("drone-%s", hash) +} + +func (r *Runner) Poll(q queue.Queue) { + for { + w := q.Pull() + q.Ack(w) + err := r.Run(w) + if err != nil { + log.Error(err) + } + } +} diff --git a/runner/builtin/updater.go b/runner/builtin/updater.go new file mode 100644 index 000000000..776ed9d24 --- /dev/null +++ b/runner/builtin/updater.go @@ -0,0 +1,61 @@ +package builtin + +import ( + "io" + "io/ioutil" + + "github.com/drone/drone/common" + "github.com/drone/drone/datastore" + "github.com/drone/drone/eventbus" +) + +type Updater interface { + SetBuild(*common.Repo, *common.Build) error + SetTask(*common.Repo, *common.Build, *common.Task) error + SetLogs(*common.Repo, *common.Build, *common.Task, io.ReadCloser) error +} + +// NewUpdater returns an implementation of the Updater interface +// that directly modifies the database and sends messages to the bus. 
+func NewUpdater(bus eventbus.Bus, store datastore.Datastore) Updater { + return &updater{bus, store} +} + +type updater struct { + bus eventbus.Bus + store datastore.Datastore +} + +func (u *updater) SetBuild(r *common.Repo, b *common.Build) error { + err := u.store.SetBuildState(r.FullName, b) + if err != nil { + return err + } + u.bus.Send(&eventbus.Event{ + Repo: r, + Build: b, + }) + return nil +} + +func (u *updater) SetTask(r *common.Repo, b *common.Build, t *common.Task) error { + err := u.store.SetBuildTask(r.FullName, b.Number, t) + if err != nil { + return err + } + u.bus.Send(&eventbus.Event{ + Repo: r, + Build: b, + Task: t, + }) + return nil +} + +func (u *updater) SetLogs(r *common.Repo, b *common.Build, t *common.Task, rc io.ReadCloser) error { + defer rc.Close() + out, err := ioutil.ReadAll(rc) + if err != nil { + return err + } + return u.store.SetLogs(r.FullName, b.Number, t.Number, out) +} diff --git a/runner/builtin/worker.go b/runner/builtin/worker.go new file mode 100644 index 000000000..365b4c057 --- /dev/null +++ b/runner/builtin/worker.go @@ -0,0 +1,235 @@ +package builtin + +import ( + "errors" + "io" + "io/ioutil" + "time" + + "github.com/drone/drone/common" + "github.com/samalba/dockerclient" +) + +var ( + ErrTimeout = errors.New("Timeout") + ErrLogging = errors.New("Logs not available") +) + +var ( + // options to fetch the stdout and stderr logs + logOpts = &dockerclient.LogOptions{ + Stdout: true, + Stderr: true, + } + + // options to fetch the stdout and stderr logs + // by tailing the output. + logOptsTail = &dockerclient.LogOptions{ + Follow: true, + Stdout: true, + Stderr: true, + } +) + +var ( + // name of the build agent container. 
+ DefaultAgent = "busybox" + + // default name of the build agent executable + DefaultEntrypoint = []string{"/bin/echo"} //[]string{"/bin/drone-agent"} + + // default argument to invoke build steps + DefaultBuildArgs = []string{"--build", "--clone", "--publish", "--deploy"} + + // default arguments to invoke notify steps + DefaultNotifyArgs = []string{"--notify"} + + // default timeout for the notify steps + DefaultNotifyTimeout = time.Minute * 5 +) + +type work struct { + Repo *common.Repo `json:"repo"` + Build *common.Build `json:"build"` + Task *common.Task `json:"task"` + Keys *common.Keypair `json:"keys"` + Netrc *common.Netrc `json:"netrc"` + Yaml []byte `json:"yaml"` +} + +type worker struct { + timeout time.Duration + client dockerclient.Client + build *dockerclient.ContainerInfo + notify *dockerclient.ContainerInfo +} + +func newWorker(client dockerclient.Client) *worker { + return newWorkerTimeout(client, 60) // default 60 minute timeout +} + +func newWorkerTimeout(client dockerclient.Client, timeout int64) *worker { + return &worker{ + timeout: time.Duration(timeout) * time.Minute, + client: client, + } +} + +// Build executes the clone, build and deploy steps. +func (w *worker) Build(name string, stdin []byte) (_ int, err error) { + // the command line arguments passed into the + // build agent container. + args := DefaultBuildArgs + args = append(args, "--") + args = append(args, string(stdin)) + + conf := &dockerclient.ContainerConfig{ + Image: DefaultAgent, + Entrypoint: DefaultEntrypoint, + Cmd: args, + HostConfig: dockerclient.HostConfig{ + Binds: []string{"/var/run/docker.sock:/var/run/docker.sock"}, + }, + Volumes: map[string]struct{}{ + "/drone": struct{}{}, + "/var/run/docker.sock": struct{}{}, + }, + } + + w.build, err = run(w.client, conf, name, w.timeout) + if err != nil { + return + } + return w.build.State.ExitCode, err +} + +// Notify executes the notification steps.
+func (w *worker) Notify(stdin []byte) error { + // use the affinity parameter in case we are + // using Docker swarm as a backend. + environment := []string{"affinity:container==" + w.build.Id} + + // the build container is acting as an ambassador container + // with a shared filesystem . + volume := []string{w.build.Id} + + // the command line arguments passed into the + // build agent container. + args := DefaultNotifyArgs + args = append(args, "--") + args = append(args, string(stdin)) + + conf := &dockerclient.ContainerConfig{ + Image: DefaultAgent, + Entrypoint: DefaultEntrypoint, + Cmd: args, + Env: environment, + HostConfig: dockerclient.HostConfig{ + VolumesFrom: volume, + }, + } + + var err error + w.notify, err = run(w.client, conf, "", DefaultNotifyTimeout) + return err +} + +// Logs returns a multi-reader that fetches the logs +// from the build and deploy agents. +func (w *worker) Logs() (io.ReadCloser, error) { + if w.build == nil { + return nil, ErrLogging + } + return w.client.ContainerLogs(w.build.Id, logOpts) +} + +// Remove stops and removes the build, deploy and +// notification agents created for the build task. +func (w *worker) Remove() { + if w.notify != nil { + w.client.KillContainer(w.notify.Id, "9") + w.client.RemoveContainer(w.notify.Id, true, true) + } + if w.build != nil { + w.client.KillContainer(w.build.Id, "9") + w.client.RemoveContainer(w.build.Id, true, true) + } +} + +// run is a helper function that creates and starts a container, +// blocking until either complete or the timeout is reached. If +// the timeout is reached an ErrTimeout is returned, else the +// container info is returned. 
+func run(client dockerclient.Client, conf *dockerclient.ContainerConfig, name string, timeout time.Duration) (*dockerclient.ContainerInfo, error) { + + // attempts to create the container + id, err := client.CreateContainer(conf, name) + if err != nil { + // and pull the image and re-create if that fails + client.PullImage(conf.Image, nil) + id, err = client.CreateContainer(conf, name) + // make sure the container is removed in + // the event of a creation error. + if err != nil && len(id) != 0 { + client.RemoveContainer(id, true, true) + } + if err != nil { + return nil, err + } + } + + // ensures the container is always stopped + // and ready to be removed. + defer func() { + client.StopContainer(id, 5) + client.KillContainer(id, "9") + }() + + // fetches the container information. + info, err := client.InspectContainer(id) + if err != nil { + return nil, err + } + + // channel listening for errors while the + // container is running async. + errc := make(chan error, 1) + infoc := make(chan *dockerclient.ContainerInfo, 1) + go func() { + + // starts the container + err := client.StartContainer(id, &conf.HostConfig) + if err != nil { + errc <- err + return + } + + // blocks and waits for the container to finish + // by streaming the logs (to /dev/null).
Ideally + // we could use the `wait` function instead + rc, err := client.ContainerLogs(id, logOptsTail) + if err != nil { + errc <- err + return + } + io.Copy(ioutil.Discard, rc) + rc.Close() + + // fetches the container information + info, err := client.InspectContainer(id) + if err != nil { + errc <- err + return + } + infoc <- info + }() + + select { + case info := <-infoc: + return info, nil + case err := <-errc: + return info, err + case <-time.After(timeout): + return info, ErrTimeout + } +} diff --git a/runner/runner.go b/runner/runner.go new file mode 100644 index 000000000..0411eaf21 --- /dev/null +++ b/runner/runner.go @@ -0,0 +1,22 @@ +package runner + +import ( + "io" + + "github.com/drone/drone/common" + "github.com/drone/drone/queue" +) + +type Runner interface { + Run(work *queue.Work) error + Cancel(repo string, build, task int) error + Logs(repo string, build, task int) (io.ReadCloser, error) +} + +// Updater defines a set of functions that are required for +// the runner to send Drone updates during a build. +type Updater interface { + SetBuild(*common.Repo, *common.Build) error + SetTask(*common.Repo, *common.Build, *common.Task) error + SetLogs(*common.Repo, *common.Build, *common.Task, io.ReadCloser) error +} diff --git a/server/queue.go b/server/queue.go index 486ef652b..0251298e2 100644 --- a/server/queue.go +++ b/server/queue.go @@ -50,6 +50,9 @@ func PollBuild(c *gin.Context) { } c.JSON(200, work) + + // acknowledge work received by the client + queue.Ack(work) } // GET /queue/push/:owner/:repo