diff --git a/.gitignore b/.gitignore index 0166e230d..f48fd3263 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ drone/drone *.sqlite *_gen.go +!store/datastore/sql/sqlite/sql_gen.go +!store/datastore/sql/mysql/sql_gen.go +!store/datastore/sql/postgres/sql_gen.go #*.css *.txt *.zip diff --git a/drone/agent/agent.go b/drone/agent/agent.go index dba7ca407..df3c1233e 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -2,10 +2,13 @@ package agent import ( "context" + "encoding/json" "io" + "io/ioutil" "log" "math" "net/url" + "strconv" "sync" "time" @@ -189,9 +192,9 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { state := rpc.State{} state.Started = time.Now().Unix() - err = client.Update(context.Background(), work.ID, state) + err = client.Init(context.Background(), work.ID, state) if err != nil { - log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err) + log.Printf("pipeline: error signaling pipeline init: %s: %s", work.ID, err) } var uploads sync.WaitGroup @@ -201,9 +204,31 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { return rerr } uploads.Add(1) - writer := rpc.NewLineWriter(client, work.ID, proc.Alias) - rlimit := io.LimitReader(part, maxLogsUpload) - io.Copy(writer, rlimit) + + var secrets []string + for _, secret := range work.Config.Secrets { + if secret.Mask { + secrets = append(secrets, secret.Value) + } + } + + limitedPart := io.LimitReader(part, maxLogsUpload) + logstream := rpc.NewLineWriter(client, work.ID, proc.Alias, secrets...) 
+ io.Copy(logstream, limitedPart) + + file := &rpc.File{} + file.Mime = "application/json+logs" + file.Proc = proc.Alias + file.Name = "logs.json" + file.Data, _ = json.Marshal(logstream.Lines()) + file.Size = len(file.Data) + file.Time = time.Now().Unix() + + if serr := client.Upload(context.Background(), work.ID, file); serr != nil { + log.Printf("pipeline: cannot upload logs: %s: %s: %s", work.ID, file.Mime, serr) + } else { + log.Printf("pipeline: finish uploading logs: %s: step %s: %s", file.Mime, work.ID, proc.Alias) + } defer func() { log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias) @@ -214,10 +239,54 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { if rerr != nil { return nil } - rlimit = io.LimitReader(part, maxFileUpload) - mime := part.Header().Get("Content-Type") - if serr := client.Upload(context.Background(), work.ID, mime, rlimit); serr != nil { - log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr) + // TODO should be configurable + limitedPart = io.LimitReader(part, maxFileUpload) + file = &rpc.File{} + file.Mime = part.Header().Get("Content-Type") + file.Proc = proc.Alias + file.Name = part.FileName() + file.Data, _ = ioutil.ReadAll(limitedPart) + file.Size = len(file.Data) + file.Time = time.Now().Unix() + + if serr := client.Upload(context.Background(), work.ID, file); serr != nil { + log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, file.Mime, serr) + } else { + log.Printf("pipeline: finish uploading artifact: %s: step %s: %s", file.Mime, work.ID, proc.Alias) + } + return nil + }) + + defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { + procState := rpc.State{ + Proc: state.Pipeline.Step.Alias, + Exited: state.Process.Exited, + ExitCode: state.Process.ExitCode, + Started: time.Now().Unix(), // TODO do not do this + Finished: time.Now().Unix(), + } + defer func() { + if uerr := client.Update(context.Background(), work.ID, 
procState); uerr != nil { + log.Printf("Pipeline: error updating pipeline step status: %s: %s: %s", work.ID, procState.Proc, uerr) + } + }() + if state.Process.Exited { + return nil + } + if state.Pipeline.Step.Environment == nil { + state.Pipeline.Step.Environment = map[string]string{} + } + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + if state.Pipeline.Error != nil { + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" } return nil }) @@ -225,7 +294,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { err = pipeline.New(work.Config, pipeline.WithContext(ctx), pipeline.WithLogger(defaultLogger), - pipeline.WithTracer(pipeline.DefaultTracer), + pipeline.WithTracer(defaultTracer), pipeline.WithEngine(engine), ).Run() @@ -247,9 +316,10 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { log.Printf("pipeline: execution complete: %s", work.ID) uploads.Wait() - err = client.Update(context.Background(), work.ID, state) + + err = client.Done(context.Background(), work.ID, state) if err != nil { - log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err) + log.Printf("Pipeline: error signaling pipeline done: %s: %s", work.ID, err) } return nil diff --git a/model/build.go b/model/build.go index 56c2f69af..f718598e1 100644 --- a/model/build.go +++ b/model/build.go @@ -31,7 +31,8 @@ type Build struct { Verified bool `json:"verified" meddler:"build_verified"` // 
deprecate Reviewer string `json:"reviewed_by" meddler:"build_reviewer"` Reviewed int64 `json:"reviewed_at" meddler:"build_reviewed"` - Jobs []*Job `json:"jobs,omitempty" meddler:"-"` + // Jobs []*Job `json:"jobs,omitempty" meddler:"-"` + Procs []*Proc `json:"procs,omitempty" meddler:"-"` } type BuildGroup struct { diff --git a/model/event.go b/model/event.go index efee037cc..e04803afc 100644 --- a/model/event.go +++ b/model/event.go @@ -15,24 +15,5 @@ type Event struct { Type EventType `json:"type"` Repo Repo `json:"repo"` Build Build `json:"build"` - Job Job `json:"job"` -} - -// NewEvent creates a new Event for the build, using copies of -// the build data to avoid possible mutation or race conditions. -func NewEvent(t EventType, r *Repo, b *Build, j *Job) *Event { - return &Event{ - Type: t, - Repo: *r, - Build: *b, - Job: *j, - } -} - -func NewBuildEvent(t EventType, r *Repo, b *Build) *Event { - return &Event{ - Type: t, - Repo: *r, - Build: *b, - } + Proc Proc `json:"proc"` } diff --git a/model/file.go b/model/file.go new file mode 100644 index 000000000..e13e5e756 --- /dev/null +++ b/model/file.go @@ -0,0 +1,23 @@ +package model + +import "io" + +// FileStore persists pipeline artifacts to storage. +type FileStore interface { + FileList(*Build) ([]*File, error) + FileFind(*Proc, string) (*File, error) + FileRead(*Proc, string) (io.ReadCloser, error) + FileCreate(*File, io.Reader) error +} + +// File represents a pipeline artifact. 
+type File struct { + ID int64 `json:"id" meddler:"file_id,pk"` + BuildID int64 `json:"build_id" meddler:"file_build_id"` + ProcID int64 `json:"proc_id" meddler:"file_proc_id"` + Name string `json:"name" meddler:"file_name"` + Size int `json:"size" meddler:"file_size"` + Mime string `json:"mime" meddler:"file_mime"` + Time int64 `json:"time" meddler:"file_time"` + // Data []byte `json:"data" meddler:"file_data"` +} diff --git a/model/job.go b/model/job.go index b8d2bbd12..36c2d2f73 100644 --- a/model/job.go +++ b/model/job.go @@ -1,17 +1,17 @@ package model -// swagger:model job -type Job struct { - ID int64 `json:"id" meddler:"job_id,pk"` - BuildID int64 `json:"-" meddler:"job_build_id"` - NodeID int64 `json:"-" meddler:"job_node_id"` - Number int `json:"number" meddler:"job_number"` - Error string `json:"error" meddler:"job_error"` - Status string `json:"status" meddler:"job_status"` - ExitCode int `json:"exit_code" meddler:"job_exit_code"` - Enqueued int64 `json:"enqueued_at" meddler:"job_enqueued"` - Started int64 `json:"started_at" meddler:"job_started"` - Finished int64 `json:"finished_at" meddler:"job_finished"` - - Environment map[string]string `json:"environment" meddler:"job_environment,json"` -} +// // swagger:model job +// type Job struct { +// ID int64 `json:"id" meddler:"job_id,pk"` +// BuildID int64 `json:"-" meddler:"job_build_id"` +// NodeID int64 `json:"-" meddler:"job_node_id"` +// Number int `json:"number" meddler:"job_number"` +// Error string `json:"error" meddler:"job_error"` +// Status string `json:"status" meddler:"job_status"` +// ExitCode int `json:"exit_code" meddler:"job_exit_code"` +// Enqueued int64 `json:"enqueued_at" meddler:"job_enqueued"` +// Started int64 `json:"started_at" meddler:"job_started"` +// Finished int64 `json:"finished_at" meddler:"job_finished"` +// +// Environment map[string]string `json:"environment" meddler:"job_environment,json"` +// } diff --git a/model/key.go b/model/key.go deleted file mode 100644 index 
aef0ff497..000000000 --- a/model/key.go +++ /dev/null @@ -1,8 +0,0 @@ -package model - -type Key struct { - ID int64 `json:"-" meddler:"key_id,pk"` - RepoID int64 `json:"-" meddler:"key_repo_id"` - Public string `json:"public" meddler:"key_public"` - Private string `json:"private" meddler:"key_private"` -} diff --git a/model/log.go b/model/log.go deleted file mode 100644 index 41a8fb8c1..000000000 --- a/model/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package model - -type Log struct { - ID int64 `meddler:"log_id,pk"` - JobID int64 `meddler:"log_job_id"` - Data []byte `meddler:"log_data"` -} diff --git a/model/proc.go b/model/proc.go new file mode 100644 index 000000000..d73001bdb --- /dev/null +++ b/model/proc.go @@ -0,0 +1,60 @@ +package model + +// ProcStore persists process information to storage. +type ProcStore interface { + ProcLoad(int64) (*Proc, error) + ProcFind(*Build, int) (*Proc, error) + ProcChild(*Build, int, string) (*Proc, error) + ProcList(*Build) ([]*Proc, error) + ProcCreate([]*Proc) error + ProcUpdate(*Proc) error + ProcClear(*Build) error +} + +// Proc represents a process in the build pipeline. 
+// swagger:model proc +type Proc struct { + ID int64 `json:"id" meddler:"proc_id,pk"` + BuildID int64 `json:"build_id" meddler:"proc_build_id"` + PID int `json:"pid" meddler:"proc_pid"` + PPID int `json:"ppid" meddler:"proc_ppid"` + PGID int `json:"pgid" meddler:"proc_pgid"` + Name string `json:"name" meddler:"proc_name"` + State string `json:"state" meddler:"proc_state"` + Error string `json:"error,omitempty" meddler:"proc_error"` + ExitCode int `json:"exit_code" meddler:"proc_exit_code"` + Started int64 `json:"start_time,omitempty" meddler:"proc_started"` + Stopped int64 `json:"end_time,omitempty" meddler:"proc_stopped"` + Machine string `json:"machine,omitempty" meddler:"proc_machine"` + Platform string `json:"platform,omitempty" meddler:"proc_platform"` + Environ map[string]string `json:"environ,omitempty" meddler:"proc_environ,json"` + Children []*Proc `json:"children,omitempty" meddler:"-"` +} + +// Running returns true if the process state is pending or running. +func (p *Proc) Running() bool { + return p.State == StatusPending || p.State == StatusRunning +} + +// Failing returns true if the process state is failed, killed or error. +func (p *Proc) Failing() bool { + return p.State == StatusError || p.State == StatusKilled || p.State == StatusFailure +} + +// Tree creates a process tree from a flat process list. +func Tree(procs []*Proc) []*Proc { + var ( + nodes []*Proc + parent *Proc + ) + for _, proc := range procs { + if proc.PPID == 0 { + nodes = append(nodes, proc) + parent = proc + continue + } else { + parent.Children = append(parent.Children, proc) + } + } + return nodes +} diff --git a/model/work.go b/model/work.go deleted file mode 100644 index 06c69d91c..000000000 --- a/model/work.go +++ /dev/null @@ -1,19 +0,0 @@ -package model - -// Work represents an item for work to be -// processed by a worker. 
-type Work struct { - Signed bool `json:"signed"` - Verified bool `json:"verified"` - Yaml string `json:"config"` - YamlEnc string `json:"secret"` - Repo *Repo `json:"repo"` - Build *Build `json:"build"` - BuildLast *Build `json:"build_last"` - Job *Job `json:"job"` - Netrc *Netrc `json:"netrc"` - Keys *Key `json:"keys"` - System *System `json:"system"` - Secrets []*Secret `json:"secrets"` - User *User `json:"user"` -} diff --git a/router/router.go b/router/router.go index c6fcc12c3..366c2e3db 100644 --- a/router/router.go +++ b/router/router.go @@ -104,7 +104,7 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { repo.GET("", server.GetRepo) repo.GET("/builds", server.GetBuilds) repo.GET("/builds/:number", server.GetBuild) - repo.GET("/logs/:number/:job", server.GetBuildLogs) + repo.GET("/logs/:number/:ppid/:proc", server.GetBuildLogs) repo.POST("/sign", session.MustPush, server.Sign) repo.GET("/secrets", session.MustPush, server.GetSecrets) diff --git a/server/build.go b/server/build.go index 9249d1b4f..2f8021159 100644 --- a/server/build.go +++ b/server/build.go @@ -1,7 +1,6 @@ package server import ( - "bufio" "context" "encoding/json" "fmt" @@ -51,14 +50,10 @@ func GetBuild(c *gin.Context) { c.AbortWithError(http.StatusInternalServerError, err) return } - jobs, _ := store.GetJobList(c, build) + procs, _ := store.FromContext(c).ProcList(build) + build.Procs = model.Tree(procs) - out := struct { - *model.Build - Jobs []*model.Job `json:"jobs"` - }{build, jobs} - - c.JSON(http.StatusOK, &out) + c.JSON(http.StatusOK, build) } func GetBuildLast(c *gin.Context) { @@ -70,27 +65,20 @@ func GetBuildLast(c *gin.Context) { c.String(http.StatusInternalServerError, err.Error()) return } - jobs, _ := store.GetJobList(c, build) - out := struct { - *model.Build - Jobs []*model.Job `json:"jobs"` - }{build, jobs} - - c.JSON(http.StatusOK, &out) + procs, _ := store.FromContext(c).ProcList(build) + build.Procs = model.Tree(procs) + c.JSON(http.StatusOK, build) } func 
GetBuildLogs(c *gin.Context) { repo := session.Repo(c) - // the user may specify to stream the full logs, - // or partial logs, capped at 2MB. - full, _ := strconv.ParseBool(c.DefaultQuery("full", "false")) - // parse the build number and job sequence number from // the repquest parameter. num, _ := strconv.Atoi(c.Params.ByName("number")) - seq, _ := strconv.Atoi(c.Params.ByName("job")) + ppid, _ := strconv.Atoi(c.Params.ByName("ppid")) + name := c.Params.ByName("proc") build, err := store.GetBuildNumber(c, repo, num) if err != nil { @@ -98,25 +86,22 @@ func GetBuildLogs(c *gin.Context) { return } - job, err := store.GetJobNumber(c, build, seq) + proc, err := store.FromContext(c).ProcChild(build, ppid, name) if err != nil { c.AbortWithError(404, err) return } - r, err := store.ReadLog(c, job) + rc, err := store.FromContext(c).LogFind(proc) if err != nil { c.AbortWithError(404, err) return } - defer r.Close() - if full { - // TODO implement limited streaming to avoid crashing the browser - } + defer rc.Close() c.Header("Content-Type", "application/json") - copyLogs(c.Writer, r) + io.Copy(c.Writer, rc) } func DeleteBuild(c *gin.Context) { @@ -133,26 +118,27 @@ func DeleteBuild(c *gin.Context) { return } - job, err := store.GetJobNumber(c, build, seq) + proc, err := store.FromContext(c).ProcFind(build, seq) if err != nil { c.AbortWithError(404, err) return } - if job.Status != model.StatusRunning { + if proc.State != model.StatusRunning { c.String(400, "Cannot cancel a non-running build") return } - job.Status = model.StatusKilled - job.Finished = time.Now().Unix() - if job.Started == 0 { - job.Started = job.Finished + proc.State = model.StatusKilled + proc.Stopped = time.Now().Unix() + if proc.Started == 0 { + proc.Started = proc.Stopped } - job.ExitCode = 137 - store.UpdateBuildJob(c, build, job) + proc.ExitCode = 137 + // TODO cancel child procs + store.FromContext(c).ProcUpdate(proc) - config.queue.Error(context.Background(), fmt.Sprint(job.ID), queue.ErrCancel) + 
config.queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel) c.String(204, "") } @@ -243,15 +229,37 @@ func PostApproval(c *gin.Context) { return } + var pcounter = len(items) for _, item := range items { - build.Jobs = append(build.Jobs, item.Job) - store.CreateJob(c, item.Job) - // TODO err + build.Procs = append(build.Procs, item.Proc) + item.Proc.BuildID = build.ID + + for _, stage := range item.Config.Stages { + var gid int + for _, step := range stage.Steps { + pcounter++ + if gid == 0 { + gid = pcounter + } + proc := &model.Proc{ + BuildID: build.ID, + Name: step.Alias, + PID: pcounter, + PPID: item.Proc.PID, + PGID: gid, + State: model.StatusPending, + } + build.Procs = append(build.Procs, proc) + } + } } + store.FromContext(c).ProcCreate(build.Procs) // // publish topic // + buildCopy := *build + buildCopy.Procs = model.Tree(buildCopy.Procs) message := pubsub.Message{ Labels: map[string]string{ "repo": repo.FullName, @@ -261,17 +269,18 @@ func PostApproval(c *gin.Context) { message.Data, _ = json.Marshal(model.Event{ Type: model.Enqueued, Repo: *repo, - Build: *build, + Build: buildCopy, }) // TODO remove global reference config.pubsub.Publish(c, "topic/events", message) + // // end publish topic // for _, item := range items { task := new(queue.Task) - task.ID = fmt.Sprint(item.Job.ID) + task.ID = fmt.Sprint(item.Proc.ID) task.Labels = map[string]string{} task.Labels["platform"] = item.Platform for k, v := range item.Labels { @@ -279,7 +288,7 @@ func PostApproval(c *gin.Context) { } task.Data, _ = json.Marshal(rpc.Pipeline{ - ID: fmt.Sprint(item.Job.ID), + ID: fmt.Sprint(item.Proc.ID), Config: item.Config, Timeout: b.Repo.Timeout, }) @@ -336,23 +345,6 @@ func GetBuildQueue(c *gin.Context) { c.JSON(200, out) } -// copyLogs copies the stream from the source to the destination in valid JSON -// format. This converts the logs, which are per-line JSON objects, to a -// proper JSON array. 
-func copyLogs(dest io.Writer, src io.Reader) error { - io.WriteString(dest, "[") - - scanner := bufio.NewScanner(src) - for scanner.Scan() { - io.WriteString(dest, scanner.Text()) - io.WriteString(dest, ",\n") - } - - io.WriteString(dest, "{}]") - - return nil -} - // // // @@ -411,13 +403,6 @@ func PostBuild(c *gin.Context) { return } - jobs, err := store.GetJobList(c, build) - if err != nil { - logrus.Errorf("failure to get build %d jobs. %s", build.Number, err) - c.AbortWithError(404, err) - return - } - // must not restart a running build if build.Status == model.StatusPending || build.Status == model.StatusRunning { c.String(409, "Cannot re-start a started build") @@ -430,11 +415,12 @@ func PostBuild(c *gin.Context) { build.ID = 0 build.Number = 0 build.Parent = num - for _, job := range jobs { - job.ID = 0 - job.NodeID = 0 - } - err := store.CreateBuild(c, build, jobs...) + build.Status = model.StatusPending + build.Started = 0 + build.Finished = 0 + build.Enqueued = time.Now().UTC().Unix() + build.Error = "" + err = store.CreateBuild(c, build) if err != nil { c.String(500, err.Error()) return @@ -448,6 +434,26 @@ func PostBuild(c *gin.Context) { build.Event = event } build.Deploy = c.DefaultQuery("deploy_to", build.Deploy) + } else { + // todo move this to database tier + // and wrap inside a transaction + build.Status = model.StatusPending + build.Started = 0 + build.Finished = 0 + build.Enqueued = time.Now().UTC().Unix() + build.Error = "" + + err = store.FromContext(c).ProcClear(build) + if err != nil { + c.AbortWithStatus(500) + return + } + + err = store.UpdateBuild(c, build) + if err != nil { + c.AbortWithStatus(500) + return + } } // Read query string parameters into buildParams, exclude reserved params @@ -462,35 +468,6 @@ func PostBuild(c *gin.Context) { } } - // todo move this to database tier - // and wrap inside a transaction - build.Status = model.StatusPending - build.Started = 0 - build.Finished = 0 - build.Enqueued = time.Now().UTC().Unix() 
- build.Error = "" - for _, job := range jobs { - for k, v := range buildParams { - job.Environment[k] = v - } - job.Error = "" - job.Status = model.StatusPending - job.Started = 0 - job.Finished = 0 - job.ExitCode = 0 - job.NodeID = 0 - job.Enqueued = build.Enqueued - store.UpdateJob(c, job) - } - - err = store.UpdateBuild(c, build) - if err != nil { - c.AbortWithStatus(500) - return - } - - c.JSON(202, build) - // get the previous build so that we can send // on status change notifications last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) @@ -508,25 +485,60 @@ func PostBuild(c *gin.Context) { Link: httputil.GetURL(c.Request), Yaml: string(raw), } + // TODO inject environment varibles !!!!!! buildParams items, err := b.Build() if err != nil { build.Status = model.StatusError build.Started = time.Now().Unix() build.Finished = build.Started build.Error = err.Error() + c.JSON(500, build) return } - for i, item := range items { - // TODO prevent possible index out of bounds - item.Job.ID = jobs[i].ID - build.Jobs = append(build.Jobs, item.Job) - store.UpdateJob(c, item.Job) + var pcounter = len(items) + for _, item := range items { + build.Procs = append(build.Procs, item.Proc) + item.Proc.BuildID = build.ID + + for _, stage := range item.Config.Stages { + var gid int + for _, step := range stage.Steps { + pcounter++ + if gid == 0 { + gid = pcounter + } + proc := &model.Proc{ + BuildID: build.ID, + Name: step.Alias, + PID: pcounter, + PPID: item.Proc.PID, + PGID: gid, + State: model.StatusPending, + } + build.Procs = append(build.Procs, proc) + } + } } + err = store.FromContext(c).ProcCreate(build.Procs) + if err != nil { + logrus.Errorf("cannot restart %s#%d: %s", repo.FullName, build.Number, err) + build.Status = model.StatusError + build.Started = time.Now().Unix() + build.Finished = build.Started + build.Error = err.Error() + c.JSON(500, build) + return + } + + c.JSON(202, build) + // // publish topic // + buildCopy := *build + buildCopy.Procs = 
model.Tree(buildCopy.Procs) message := pubsub.Message{ Labels: map[string]string{ "repo": repo.FullName, @@ -536,7 +548,7 @@ func PostBuild(c *gin.Context) { message.Data, _ = json.Marshal(model.Event{ Type: model.Enqueued, Repo: *repo, - Build: *build, + Build: buildCopy, }) // TODO remove global reference config.pubsub.Publish(c, "topic/events", message) @@ -546,7 +558,7 @@ func PostBuild(c *gin.Context) { for _, item := range items { task := new(queue.Task) - task.ID = fmt.Sprint(item.Job.ID) + task.ID = fmt.Sprint(item.Proc.ID) task.Labels = map[string]string{} task.Labels["platform"] = item.Platform for k, v := range item.Labels { @@ -554,7 +566,7 @@ func PostBuild(c *gin.Context) { } task.Data, _ = json.Marshal(rpc.Pipeline{ - ID: fmt.Sprint(item.Job.ID), + ID: fmt.Sprint(item.Proc.ID), Config: item.Config, Timeout: b.Repo.Timeout, }) diff --git a/server/hook.go b/server/hook.go index 15a407a8a..06762dab7 100644 --- a/server/hook.go +++ b/server/hook.go @@ -123,7 +123,7 @@ func PostHook(c *gin.Context) { // if the remote has a refresh token, the current access token // may be stale. Therefore, we should refresh prior to dispatching - // the job. + // the build. if refresher, ok := remote_.(remote.Refresher); ok { ok, _ := refresher.Refresh(user) if ok { @@ -221,7 +221,7 @@ func PostHook(c *gin.Context) { build.RepoID = repo.ID build.Verified = true - if err := store.CreateBuild(c, build, build.Jobs...); err != nil { + if err := store.CreateBuild(c, build, build.Procs...); err != nil { logrus.Errorf("failure to save commit for %s. 
%s", repo.FullName, err) c.AbortWithError(500, err) return @@ -268,10 +268,34 @@ func PostHook(c *gin.Context) { return } + var pcounter = len(items) + for _, item := range items { - build.Jobs = append(build.Jobs, item.Job) - store.CreateJob(c, item.Job) - // TODO err + build.Procs = append(build.Procs, item.Proc) + item.Proc.BuildID = build.ID + + for _, stage := range item.Config.Stages { + var gid int + for _, step := range stage.Steps { + pcounter++ + if gid == 0 { + gid = pcounter + } + proc := &model.Proc{ + BuildID: build.ID, + Name: step.Alias, + PID: pcounter, + PPID: item.Proc.PID, + PGID: gid, + State: model.StatusPending, + } + build.Procs = append(build.Procs, proc) + } + } + } + err = store.FromContext(c).ProcCreate(build.Procs) + if err != nil { + logrus.Errorf("error persisting procs %s/%d: %s", repo.FullName, build.Number, err) } // @@ -283,10 +307,12 @@ func PostHook(c *gin.Context) { "private": strconv.FormatBool(repo.IsPrivate), }, } + buildCopy := *build + buildCopy.Procs = model.Tree(buildCopy.Procs) message.Data, _ = json.Marshal(model.Event{ Type: model.Enqueued, Repo: *repo, - Build: *build, + Build: buildCopy, }) // TODO remove global reference config.pubsub.Publish(c, "topic/events", message) @@ -296,7 +322,7 @@ func PostHook(c *gin.Context) { for _, item := range items { task := new(queue.Task) - task.ID = fmt.Sprint(item.Job.ID) + task.ID = fmt.Sprint(item.Proc.ID) task.Labels = map[string]string{} task.Labels["platform"] = item.Platform for k, v := range item.Labels { @@ -304,7 +330,7 @@ func PostHook(c *gin.Context) { } task.Data, _ = json.Marshal(rpc.Pipeline{ - ID: fmt.Sprint(item.Job.ID), + ID: fmt.Sprint(item.Proc.ID), Config: item.Config, Timeout: b.Repo.Timeout, }) @@ -315,7 +341,7 @@ func PostHook(c *gin.Context) { } // return the metadata from the cli context. 
-func metadataFromStruct(repo *model.Repo, build, last *model.Build, job *model.Job, link string) frontend.Metadata { +func metadataFromStruct(repo *model.Repo, build, last *model.Build, proc *model.Proc, link string) frontend.Metadata { return frontend.Metadata{ Repo: frontend.Repo{ Name: repo.Name, @@ -368,8 +394,8 @@ func metadataFromStruct(repo *model.Repo, build, last *model.Build, job *model.J }, }, Job: frontend.Job{ - Number: job.Number, - Matrix: job.Environment, + Number: proc.PID, + Matrix: proc.Environ, }, Sys: frontend.System{ Name: "drone", @@ -390,7 +416,7 @@ type builder struct { } type buildItem struct { - Job *model.Job + Proc *model.Proc Platform string Labels map[string]string Config *backend.Config @@ -408,15 +434,15 @@ func (b *builder) Build() ([]*buildItem, error) { var items []*buildItem for i, axis := range axes { - job := &model.Job{ - BuildID: b.Curr.ID, - Number: i + 1, - Status: model.StatusPending, - Environment: axis, - Enqueued: b.Curr.Created, + proc := &model.Proc{ + BuildID: b.Curr.ID, + PID: i + 1, + PGID: i + 1, + State: model.StatusPending, + Environ: axis, } - metadata := metadataFromStruct(b.Repo, b.Curr, b.Last, job, b.Link) + metadata := metadataFromStruct(b.Repo, b.Curr, b.Last, proc, b.Link) environ := metadata.Environ() for k, v := range metadata.EnvironDrone() { environ[k] = v @@ -481,15 +507,16 @@ func (b *builder) Build() ([]*buildItem, error) { compiler.WithPrefix( fmt.Sprintf( "%d_%d", - job.ID, + proc.ID, time.Now().Unix(), ), ), - compiler.WithEnviron(job.Environment), + compiler.WithEnviron(proc.Environ), compiler.WithProxy(), // TODO ability to set global volumes for things like certs compiler.WithVolumes(), compiler.WithWorkspaceFromURL("/drone", b.Curr.Link), + compiler.WithMetadata(metadata), ).Compile(parsed) for _, sec := range b.Secs { @@ -506,7 +533,7 @@ func (b *builder) Build() ([]*buildItem, error) { } item := &buildItem{ - Job: job, + Proc: proc, Config: ir, Labels: parsed.Labels, Platform: 
metadata.Sys.Arch, diff --git a/server/rpc.go b/server/rpc.go index 520f67d25..6be318816 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -4,13 +4,10 @@ import ( "bytes" "context" "encoding/json" - "fmt" - "io" "log" "os" "strconv" - "github.com/Sirupsen/logrus" "github.com/cncd/logging" "github.com/cncd/pipeline/pipeline/rpc" "github.com/cncd/pubsub" @@ -109,20 +106,26 @@ func (s *RPC) Extend(c context.Context, id string) error { // Update implements the rpc.Update function func (s *RPC) Update(c context.Context, id string, state rpc.State) error { - jobID, err := strconv.ParseInt(id, 10, 64) + procID, err := strconv.ParseInt(id, 10, 64) if err != nil { return err } - job, err := s.store.GetJob(jobID) + pproc, err := s.store.ProcLoad(procID) if err != nil { - log.Printf("error: cannot find job with id %d: %s", jobID, err) + log.Printf("error: rpc.update: cannot find pproc with id %d: %s", procID, err) return err } - build, err := s.store.GetBuild(job.BuildID) + build, err := s.store.GetBuild(pproc.BuildID) if err != nil { - log.Printf("error: cannot find build with id %d: %s", job.BuildID, err) + log.Printf("error: cannot find build with id %d: %s", pproc.BuildID, err) + return err + } + + proc, err := s.store.ProcChild(build, pproc.PID, state.Proc) + if err != nil { + log.Printf("error: cannot find proc with name %s: %s", state.Proc, err) return err } @@ -132,92 +135,231 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { return err } - if build.Status != model.StatusRunning { - - } - - job.Started = state.Started - job.Finished = state.Finished - job.ExitCode = state.ExitCode - job.Status = model.StatusRunning - job.Error = state.Error - - if build.Status == model.StatusPending { - build.Started = job.Started - build.Status = model.StatusRunning - s.store.UpdateBuild(build) - } - - log.Printf("pipeline: update %s: exited=%v, exit_code=%d", id, state.Exited, state.ExitCode) - if state.Exited { - - job.Status = model.StatusSuccess - if 
job.ExitCode != 0 || job.Error != "" { - job.Status = model.StatusFailure + proc.Stopped = state.Finished + proc.ExitCode = state.ExitCode + proc.Error = state.Error + proc.State = model.StatusSuccess + if state.ExitCode != 0 || state.Error != "" { + proc.State = model.StatusFailure } - - // save the logs - var buf bytes.Buffer - if serr := s.logger.Snapshot(context.Background(), id, &buf); serr != nil { - log.Printf("error: snapshotting logs: %s", serr) - } - if werr := s.store.WriteLog(job, &buf); werr != nil { - log.Printf("error: persisting logs: %s", werr) - } - - // close the logger - s.logger.Close(c, id) - s.queue.Done(c, id) + } else { + proc.Started = state.Started + proc.State = model.StatusRunning } - // hackity hack - cc := context.WithValue(c, "store", s.store) - ok, uerr := store.UpdateBuildJob(cc, build, job) - if uerr != nil { - log.Printf("error: updating job: %s", uerr) - } - if ok { - // get the user because we transfer the user form the server to agent - // and back we lose the token which does not get serialized to json. - user, uerr := s.store.GetUser(repo.UserID) - if uerr != nil { - logrus.Errorf("Unable to find user. %s", err) - } else { - s.remote.Status(user, repo, build, - fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number)) - } + if err := s.store.ProcUpdate(proc); err != nil { + log.Printf("error: rpc.update: cannot update proc: %s", err) } - message := pubsub.Message{} + build.Procs, _ = s.store.ProcList(build) + build.Procs = model.Tree(build.Procs) + message := pubsub.Message{ + Labels: map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsPrivate), + }, + } message.Data, _ = json.Marshal(model.Event{ - Type: func() model.EventType { - // HACK we don't even really care about the event type. - // so we should just simplify how events are triggered. - // WTF was this being used for????????????????????????? 
- if job.Status == model.StatusRunning { - return model.Started - } - return model.Finished - }(), Repo: *repo, Build: *build, - Job: *job, }) - message.Labels = map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsPrivate), - } s.pubsub.Publish(c, "topic/events", message) - log.Println("finish rpc.update") + return nil } // Upload implements the rpc.Upload function -func (s *RPC) Upload(c context.Context, id, mime string, file io.Reader) error { return nil } +func (s *RPC) Upload(c context.Context, id string, file *rpc.File) error { + procID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return err + } + + pproc, err := s.store.ProcLoad(procID) + if err != nil { + log.Printf("error: cannot find parent proc with id %d: %s", procID, err) + return err + } + + build, err := s.store.GetBuild(pproc.BuildID) + if err != nil { + log.Printf("error: cannot find build with id %d: %s", pproc.BuildID, err) + return err + } + + proc, err := s.store.ProcChild(build, pproc.PID, file.Proc) + if err != nil { + log.Printf("error: cannot find child proc with name %s: %s", file.Proc, err) + return err + } + + if file.Mime == "application/json+logs" { + return s.store.LogSave( + proc, + bytes.NewBuffer(file.Data), + ) + } + + return s.store.FileCreate(&model.File{ + BuildID: proc.BuildID, + ProcID: proc.ID, + Mime: file.Mime, + Name: file.Name, + Size: file.Size, + Time: file.Time, + }, + bytes.NewBuffer(file.Data), + ) +} + +// Init implements the rpc.Init function +func (s *RPC) Init(c context.Context, id string, state rpc.State) error { + procID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return err + } + + proc, err := s.store.ProcLoad(procID) + if err != nil { + log.Printf("error: cannot find proc with id %d: %s", procID, err) + return err + } + + build, err := s.store.GetBuild(proc.BuildID) + if err != nil { + log.Printf("error: cannot find build with id %d: %s", proc.BuildID, err) + return err + } + + repo, err := 
s.store.GetRepo(build.RepoID) + if err != nil { + log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err) + return err + } + + if build.Status == model.StatusPending { + build.Status = model.StatusRunning + build.Started = state.Started + if err := s.store.UpdateBuild(build); err != nil { + log.Printf("error: init: cannot update build_id %d state: %s", build.ID, err) + } + } + + defer func() { + build.Procs, _ = s.store.ProcList(build) + message := pubsub.Message{ + Labels: map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsPrivate), + }, + } + message.Data, _ = json.Marshal(model.Event{ + Repo: *repo, + Build: *build, + }) + s.pubsub.Publish(c, "topic/events", message) + }() + + proc.Started = state.Started + proc.State = model.StatusRunning + return s.store.ProcUpdate(proc) +} // Done implements the rpc.Done function -func (s *RPC) Done(c context.Context, id string) error { return nil } +func (s *RPC) Done(c context.Context, id string, state rpc.State) error { + procID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return err + } + + proc, err := s.store.ProcLoad(procID) + if err != nil { + log.Printf("error: cannot find proc with id %d: %s", procID, err) + return err + } + + build, err := s.store.GetBuild(proc.BuildID) + if err != nil { + log.Printf("error: cannot find build with id %d: %s", proc.BuildID, err) + return err + } + + repo, err := s.store.GetRepo(build.RepoID) + if err != nil { + log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err) + return err + } + + proc.Stopped = state.Finished + proc.Error = state.Error + proc.ExitCode = state.ExitCode + proc.State = model.StatusSuccess + if proc.ExitCode != 0 || proc.Error != "" { + proc.State = model.StatusFailure + } + if err := s.store.ProcUpdate(proc); err != nil { + log.Printf("error: done: cannot update proc_id %d state: %s", procID, err) + } + + if err := s.queue.Done(c, id); err != nil { + log.Printf("error: done: cannot ack 
proc_id %d: %s", procID, err) + } + + // TODO handle this error + procs, _ := s.store.ProcList(build) + for _, p := range procs { + if p.Running() && p.PPID == proc.PID { + p.State = model.StatusSkipped + if p.Started != 0 { + p.State = model.StatusSuccess // for daemons that are killed + p.Stopped = proc.Stopped + } + if err := s.store.ProcUpdate(p); err != nil { + log.Printf("error: done: cannot update proc_id %d child state: %s", p.ID, err) + } + } + } + + running := false + status := model.StatusSuccess + for _, p := range procs { + if p.PPID == 0 { + if p.Running() { + running = true + } + if p.Failing() { + status = p.State + } + } + } + if !running { + build.Status = status + build.Finished = proc.Stopped + if err := s.store.UpdateBuild(build); err != nil { + log.Printf("error: done: cannot update build_id %d final state: %s", build.ID, err) + } + } + + if err := s.logger.Close(c, id); err != nil { + log.Printf("error: done: cannot close build_id %d logger: %s", proc.ID, err) + } + + build.Procs = model.Tree(procs) + message := pubsub.Message{ + Labels: map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsPrivate), + }, + } + message.Data, _ = json.Marshal(model.Event{ + Repo: *repo, + Build: *build, + }) + s.pubsub.Publish(c, "topic/events", message) + + return nil +} // Log implements the rpc.Log function func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error { diff --git a/server/stream.go b/server/stream.go index 264aa23b8..a3c7ed73c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -66,13 +66,13 @@ func LogStream(c *gin.Context) { c.AbortWithError(404, err) return } - job, err := store.GetJobNumber(c, build, jobn) + proc, err := store.FromContext(c).ProcFind(build, jobn) if err != nil { - logrus.Debugln("stream cannot get job number.", err) + logrus.Debugln("stream cannot get proc number.", err) c.AbortWithError(404, err) return } - if job.Status != model.StatusRunning { + if proc.State !=
model.StatusRunning { logrus.Debugln("stream not found.") c.AbortWithStatus(404) return @@ -102,7 +102,7 @@ func LogStream(c *gin.Context) { go func() { // TODO remove global variable - config.logger.Tail(ctx, fmt.Sprint(job.ID), func(entries ...*logging.Entry) { + config.logger.Tail(ctx, fmt.Sprint(proc.ID), func(entries ...*logging.Entry) { for _, entry := range entries { select { case <-ctx.Done(): diff --git a/store/datastore/builds.go b/store/datastore/builds.go index 0f03608cb..e43fb639e 100644 --- a/store/datastore/builds.go +++ b/store/datastore/builds.go @@ -55,7 +55,7 @@ func (db *datastore) GetBuildQueue() ([]*model.Feed, error) { return feed, err } -func (db *datastore) CreateBuild(build *model.Build, jobs ...*model.Job) error { +func (db *datastore) CreateBuild(build *model.Build, procs ...*model.Proc) error { var number int db.QueryRow(rebind(buildNumberLast), build.RepoID).Scan(&number) build.Number = number + 1 @@ -65,11 +65,9 @@ func (db *datastore) CreateBuild(build *model.Build, jobs ...*model.Job) error { if err != nil { return err } - for i, job := range jobs { - job.BuildID = build.ID - job.Number = i + 1 - job.Enqueued = build.Created - err = meddler.Insert(db, jobTable, job) + for _, proc := range procs { + proc.BuildID = build.ID + err = meddler.Insert(db, "procs", proc) if err != nil { return err } diff --git a/store/datastore/builds_test.go b/store/datastore/builds_test.go index 382e3713f..783578216 100644 --- a/store/datastore/builds_test.go +++ b/store/datastore/builds_test.go @@ -28,7 +28,7 @@ func TestBuilds(t *testing.T) { Status: model.StatusSuccess, Commit: "85f8c029b902ed9400bc600bac301a0aadb144ac", } - err := s.CreateBuild(&build, []*model.Job{}...) + err := s.CreateBuild(&build, []*model.Proc{}...) 
g.Assert(err == nil).IsTrue() g.Assert(build.ID != 0).IsTrue() g.Assert(build.Number).Equal(1) @@ -42,7 +42,7 @@ func TestBuilds(t *testing.T) { Status: model.StatusSuccess, Commit: "85f8c029b902ed9400bc600bac301a0aadb144ac", } - s.CreateBuild(&build, []*model.Job{}...) + s.CreateBuild(&build, []*model.Proc{}...) build.Status = model.StatusRunning err1 := s.UpdateBuild(&build) getbuild, err2 := s.GetBuild(build.ID) @@ -59,7 +59,7 @@ func TestBuilds(t *testing.T) { RepoID: 1, Status: model.StatusSuccess, } - s.CreateBuild(&build, []*model.Job{}...) + s.CreateBuild(&build, []*model.Proc{}...) getbuild, err := s.GetBuild(build.ID) g.Assert(err == nil).IsTrue() g.Assert(build.ID).Equal(getbuild.ID) @@ -76,8 +76,8 @@ func TestBuilds(t *testing.T) { RepoID: 1, Status: model.StatusPending, } - err1 := s.CreateBuild(build1, []*model.Job{}...) - err2 := s.CreateBuild(build2, []*model.Job{}...) + err1 := s.CreateBuild(build1, []*model.Proc{}...) + err2 := s.CreateBuild(build2, []*model.Proc{}...) getbuild, err3 := s.GetBuildNumber(&model.Repo{ID: 1}, build2.Number) g.Assert(err1 == nil).IsTrue() g.Assert(err2 == nil).IsTrue() @@ -98,8 +98,8 @@ func TestBuilds(t *testing.T) { Status: model.StatusPending, Ref: "refs/pull/6", } - err1 := s.CreateBuild(build1, []*model.Job{}...) - err2 := s.CreateBuild(build2, []*model.Job{}...) + err1 := s.CreateBuild(build1, []*model.Proc{}...) + err2 := s.CreateBuild(build2, []*model.Proc{}...) getbuild, err3 := s.GetBuildRef(&model.Repo{ID: 1}, "refs/pull/6") g.Assert(err1 == nil).IsTrue() g.Assert(err2 == nil).IsTrue() @@ -121,8 +121,8 @@ func TestBuilds(t *testing.T) { Status: model.StatusPending, Ref: "refs/pull/6", } - err1 := s.CreateBuild(build1, []*model.Job{}...) - err2 := s.CreateBuild(build2, []*model.Job{}...) + err1 := s.CreateBuild(build1, []*model.Proc{}...) + err2 := s.CreateBuild(build2, []*model.Proc{}...) 
getbuild, err3 := s.GetBuildRef(&model.Repo{ID: 1}, "refs/pull/6") g.Assert(err1 == nil).IsTrue() g.Assert(err2 == nil).IsTrue() @@ -146,8 +146,8 @@ func TestBuilds(t *testing.T) { Branch: "dev", Commit: "85f8c029b902ed9400bc600bac301a0aadb144aa", } - err1 := s.CreateBuild(build1, []*model.Job{}...) - err2 := s.CreateBuild(build2, []*model.Job{}...) + err1 := s.CreateBuild(build1, []*model.Proc{}...) + err2 := s.CreateBuild(build2, []*model.Proc{}...) getbuild, err3 := s.GetBuildCommit(&model.Repo{ID: 1}, build2.Commit, build2.Branch) g.Assert(err1 == nil).IsTrue() g.Assert(err2 == nil).IsTrue() @@ -174,8 +174,8 @@ func TestBuilds(t *testing.T) { Commit: "85f8c029b902ed9400bc600bac301a0aadb144aa", Event: model.EventPush, } - err1 := s.CreateBuild(build1, []*model.Job{}...) - err2 := s.CreateBuild(build2, []*model.Job{}...) + err1 := s.CreateBuild(build1, []*model.Proc{}...) + err2 := s.CreateBuild(build2, []*model.Proc{}...) getbuild, err3 := s.GetBuildLast(&model.Repo{ID: 1}, build2.Branch) g.Assert(err1 == nil).IsTrue() g.Assert(err2 == nil).IsTrue() @@ -207,9 +207,9 @@ func TestBuilds(t *testing.T) { Branch: "master", Commit: "85f8c029b902ed9400bc600bac301a0aadb144aa", } - err1 := s.CreateBuild(build1, []*model.Job{}...) - err2 := s.CreateBuild(build2, []*model.Job{}...) - err3 := s.CreateBuild(build3, []*model.Job{}...) + err1 := s.CreateBuild(build1, []*model.Proc{}...) + err2 := s.CreateBuild(build2, []*model.Proc{}...) + err3 := s.CreateBuild(build3, []*model.Proc{}...) getbuild, err4 := s.GetBuildLastBefore(&model.Repo{ID: 1}, build3.Branch, build3.ID) g.Assert(err1 == nil).IsTrue() g.Assert(err2 == nil).IsTrue() @@ -232,8 +232,8 @@ func TestBuilds(t *testing.T) { RepoID: 1, Status: model.StatusSuccess, } - s.CreateBuild(build1, []*model.Job{}...) - s.CreateBuild(build2, []*model.Job{}...) + s.CreateBuild(build1, []*model.Proc{}...) + s.CreateBuild(build2, []*model.Proc{}...) 
builds, err := s.GetBuildList(&model.Repo{ID: 1}) g.Assert(err == nil).IsTrue() g.Assert(len(builds)).Equal(2) diff --git a/store/datastore/ddl/mysql/13.sql b/store/datastore/ddl/mysql/13.sql new file mode 100644 index 000000000..d03759bdf --- /dev/null +++ b/store/datastore/ddl/mysql/13.sql @@ -0,0 +1,45 @@ +-- +migrate Up + +CREATE TABLE procs ( + proc_id INTEGER PRIMARY KEY AUTO_INCREMENT +,proc_build_id INTEGER +,proc_pid INTEGER +,proc_ppid INTEGER +,proc_pgid INTEGER +,proc_name VARCHAR(250) +,proc_state VARCHAR(250) +,proc_error VARCHAR(500) +,proc_exit_code INTEGER +,proc_started INTEGER +,proc_stopped INTEGER +,proc_machine VARCHAR(250) +,proc_platform VARCHAR(250) +,proc_environ VARCHAR(2000) +,UNIQUE(proc_build_id, proc_pid) +); + +CREATE INDEX proc_build_ix ON procs (proc_build_id); + +CREATE TABLE files ( + file_id INTEGER PRIMARY KEY AUTO_INCREMENT +,file_build_id INTEGER +,file_proc_id INTEGER +,file_name VARCHAR(250) +,file_mime VARCHAR(250) +,file_size INTEGER +,file_time INTEGER +,file_data MEDIUMBLOB +,UNIQUE(file_proc_id,file_name) +); + +CREATE INDEX file_build_ix ON files (file_build_id); +CREATE INDEX file_proc_ix ON files (file_proc_id); + +-- +migrate Down + +DROP INDEX file_build_ix; +DROP INDEX file_proc_ix; +DROP TABLE files; + +DROP INDEX proc_build_ix; +DROP TABLE procs; diff --git a/store/datastore/ddl/postgres/13.sql b/store/datastore/ddl/postgres/13.sql new file mode 100644 index 000000000..f01954205 --- /dev/null +++ b/store/datastore/ddl/postgres/13.sql @@ -0,0 +1,47 @@ +-- +migrate Up + +CREATE TABLE procs ( + proc_id SERIAL PRIMARY KEY +,proc_build_id INTEGER +,proc_pid INTEGER +,proc_ppid INTEGER +,proc_pgid INTEGER +,proc_name VARCHAR(250) +,proc_state VARCHAR(250) +,proc_error VARCHAR(500) +,proc_exit_code INTEGER +,proc_started INTEGER +,proc_stopped INTEGER +,proc_machine VARCHAR(250) +,proc_platform VARCHAR(250) +,proc_environ VARCHAR(2000) + +,UNIQUE(proc_build_id, proc_pid) +); + +CREATE INDEX proc_build_ix ON procs 
(proc_build_id); + +CREATE TABLE files ( + file_id SERIAL PRIMARY KEY +,file_build_id INTEGER +,file_proc_id INTEGER +,file_name VARCHAR(250) +,file_mime VARCHAR(250) +,file_size INTEGER +,file_time INTEGER +,file_data BYTEA + +,UNIQUE(file_proc_id,file_name) +); + +CREATE INDEX file_build_ix ON files (file_build_id); +CREATE INDEX file_proc_ix ON files (file_proc_id); + +-- +migrate Down + +DROP INDEX file_build_ix; +DROP INDEX file_proc_ix; +DROP TABLE files; + +DROP INDEX proc_build_ix; +DROP TABLE procs; diff --git a/store/datastore/ddl/sqlite3/13.sql b/store/datastore/ddl/sqlite3/13.sql new file mode 100644 index 000000000..e57ca6f9b --- /dev/null +++ b/store/datastore/ddl/sqlite3/13.sql @@ -0,0 +1,46 @@ +-- +migrate Up + +CREATE TABLE procs ( + proc_id INTEGER PRIMARY KEY AUTOINCREMENT +,proc_build_id INTEGER +,proc_pid INTEGER +,proc_ppid INTEGER +,proc_pgid INTEGER +,proc_name TEXT +,proc_state TEXT +,proc_error TEXT +,proc_exit_code INTEGER +,proc_started INTEGER +,proc_stopped INTEGER +,proc_machine TEXT +,proc_platform TEXT +,proc_environ TEXT +,UNIQUE(proc_build_id, proc_pid) +); + +CREATE INDEX proc_build_ix ON procs (proc_build_id); + +CREATE TABLE files ( + file_id INTEGER PRIMARY KEY AUTOINCREMENT +,file_build_id INTEGER +,file_proc_id INTEGER +,file_name TEXT +,file_mime TEXT +,file_size INTEGER +,file_time INTEGER +,file_data BLOB +,UNIQUE(file_proc_id,file_name) +,FOREIGN KEY(file_proc_id) REFERENCES procs (proc_id) ON DELETE CASCADE +); + +CREATE INDEX file_build_ix ON files (file_build_id); +CREATE INDEX file_proc_ix ON files (file_proc_id); + +-- +migrate Down + +DROP INDEX file_build_ix; +DROP INDEX file_proc_ix; +DROP TABLE files; + +DROP INDEX proc_build_ix; +DROP TABLE procs; diff --git a/store/datastore/files.go b/store/datastore/files.go new file mode 100644 index 000000000..108ebcfa3 --- /dev/null +++ b/store/datastore/files.go @@ -0,0 +1,63 @@ +package datastore + +import ( + "bytes" + "io" + "io/ioutil" + + 
"github.com/drone/drone/model" + "github.com/drone/drone/store/datastore/sql" + + "github.com/russross/meddler" +) + +func (db *datastore) FileList(build *model.Build) ([]*model.File, error) { + stmt := sql.Lookup(db.driver, "files-find-build") + list := []*model.File{} + err := meddler.QueryAll(db, &list, stmt, build.ID) + return list, err +} + +func (db *datastore) FileFind(proc *model.Proc, name string) (*model.File, error) { + stmt := sql.Lookup(db.driver, "files-find-proc-name") + file := new(model.File) + err := meddler.QueryRow(db, file, stmt, proc.ID, name) + return file, err +} + +func (db *datastore) FileRead(proc *model.Proc, name string) (io.ReadCloser, error) { + stmt := sql.Lookup(db.driver, "files-find-proc-name-data") + file := new(fileData) + err := meddler.QueryRow(db, file, stmt, proc.ID, name) + buf := bytes.NewBuffer(file.Data) + return ioutil.NopCloser(buf), err +} + +func (db *datastore) FileCreate(file *model.File, r io.Reader) error { + d, err := ioutil.ReadAll(r) + if err != nil { + return err + } + f := fileData{ + ID: file.ID, + BuildID: file.BuildID, + ProcID: file.ProcID, + Name: file.Name, + Size: file.Size, + Mime: file.Mime, + Time: file.Time, + Data: d, + } + return meddler.Insert(db, "files", &f) +} + +type fileData struct { + ID int64 `meddler:"file_id,pk"` + BuildID int64 `meddler:"file_build_id"` + ProcID int64 `meddler:"file_proc_id"` + Name string `meddler:"file_name"` + Size int `meddler:"file_size"` + Mime string `meddler:"file_mime"` + Time int64 `meddler:"file_time"` + Data []byte `meddler:"file_data"` +} diff --git a/store/datastore/files_test.go b/store/datastore/files_test.go new file mode 100644 index 000000000..051e78a1d --- /dev/null +++ b/store/datastore/files_test.go @@ -0,0 +1,184 @@ +package datastore + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/drone/drone/model" +) + +func TestFileFind(t *testing.T) { + s := newTest() + defer func() { + s.Exec("delete from files") + s.Close() + }() + + if 
err := s.FileCreate( + &model.File{ + BuildID: 2, + ProcID: 1, + Name: "hello.txt", + Mime: "text/plain", + Size: 11, + }, + bytes.NewBufferString("hello world"), + ); err != nil { + t.Errorf("Unexpected error: insert file: %s", err) + return + } + + file, err := s.FileFind(&model.Proc{ID: 1}, "hello.txt") + if err != nil { + t.Error(err) + return + } + if got, want := file.ID, int64(1); got != want { + t.Errorf("Want file id %d, got %d", want, got) + } + if got, want := file.BuildID, int64(2); got != want { + t.Errorf("Want file build id %d, got %d", want, got) + } + if got, want := file.ProcID, int64(1); got != want { + t.Errorf("Want file proc id %d, got %d", want, got) + } + if got, want := file.Name, "hello.txt"; got != want { + t.Errorf("Want file name %s, got %s", want, got) + } + if got, want := file.Mime, "text/plain"; got != want { + t.Errorf("Want file mime %s, got %s", want, got) + } + if got, want := file.Size, 11; got != want { + t.Errorf("Want file size %d, got %d", want, got) + } + + rc, err := s.FileRead(&model.Proc{ID: 1}, "hello.txt") + if err != nil { + t.Error(err) + return + } + out, _ := ioutil.ReadAll(rc) + if got, want := string(out), "hello world"; got != want { + t.Errorf("Want file data %s, got %s", want, got) + } +} + +func TestFileList(t *testing.T) { + s := newTest() + defer func() { + s.Exec("delete from files") + s.Close() + }() + + s.FileCreate( + &model.File{ + BuildID: 1, + ProcID: 1, + Name: "hello.txt", + Mime: "text/plain", + Size: 11, + }, + bytes.NewBufferString("hello world"), + ) + s.FileCreate( + &model.File{ + BuildID: 1, + ProcID: 1, + Name: "hola.txt", + Mime: "text/plain", + Size: 11, + }, + bytes.NewBufferString("hola mundo"), + ) + + files, err := s.FileList(&model.Build{ID: 1}) + if err != nil { + t.Errorf("Unexpected error: select files: %s", err) + return + } + + if got, want := len(files), 2; got != want { + t.Errorf("Wanted %d files, got %d", want, got) + } +} + +func TestFileIndexes(t *testing.T) { + s := 
newTest() + defer func() { + s.Exec("delete from files") + s.Close() + }() + + if err := s.FileCreate( + &model.File{ + BuildID: 1, + ProcID: 1, + Name: "hello.txt", + Size: 11, + Mime: "text/plain", + }, + bytes.NewBufferString("hello world"), + ); err != nil { + t.Errorf("Unexpected error: insert file: %s", err) + return + } + + // fail due to duplicate file name + if err := s.FileCreate( + &model.File{ + BuildID: 1, + ProcID: 1, + Name: "hello.txt", + Mime: "text/plain", + Size: 11, + }, + bytes.NewBufferString("hello world"), + ); err == nil { + t.Errorf("Unexpected error: duplicate file name") + } +} + +// func TestFileCascade(t *testing.T) { +// s := newTest() +// defer s.Close() +// +// +// err1 := s.ProcCreate([]*model.Proc{ +// { +// BuildID: 1, +// PID: 1, +// PGID: 1, +// Name: "build", +// State: "success", +// }, +// }) +// err2 := s.FileCreate( +// &model.File{ +// BuildID: 1, +// ProcID: 1, +// Name: "hello.txt", +// Mime: "text/plain", +// Size: 11, +// }, +// bytes.NewBufferString("hello world"), +// ) +// +// if err1 != nil { +// t.Errorf("Unexpected error: cannot insert proc: %s", err1) +// } else if err2 != nil { +// t.Errorf("Unexpected error: cannot insert file: %s", err2) +// } +// +// if _, err3 := s.ProcFind(&model.Build{ID: 1}, 1); err3 != nil { +// t.Errorf("Unexpected error: cannot get inserted proc: %s", err3) +// } +// +// db.Exec("delete from procs where proc_id = 1") +// +// file, err4 := s.FileFind(&model.Proc{ID: 1}, "hello.txt") +// if err4 == nil { +// t.Errorf("Expected no rows in result set error") +// t.Log(file) +// } +// } diff --git a/store/datastore/jobs.go b/store/datastore/jobs.go index cbbbca09b..d47f9f049 100644 --- a/store/datastore/jobs.go +++ b/store/datastore/jobs.go @@ -1,49 +1,50 @@ package datastore -import ( - "github.com/drone/drone/model" - "github.com/russross/meddler" -) - -func (db *datastore) GetJob(id int64) (*model.Job, error) { - var job = new(model.Job) - var err = meddler.Load(db, jobTable, job, id) -
return job, err -} - -func (db *datastore) GetJobNumber(build *model.Build, num int) (*model.Job, error) { - var job = new(model.Job) - var err = meddler.QueryRow(db, job, rebind(jobNumberQuery), build.ID, num) - return job, err -} - -func (db *datastore) GetJobList(build *model.Build) ([]*model.Job, error) { - var jobs = []*model.Job{} - var err = meddler.QueryAll(db, &jobs, rebind(jobListQuery), build.ID) - return jobs, err -} - -func (db *datastore) CreateJob(job *model.Job) error { - return meddler.Insert(db, jobTable, job) -} - -func (db *datastore) UpdateJob(job *model.Job) error { - return meddler.Update(db, jobTable, job) -} - -const jobTable = "jobs" - -const jobListQuery = ` -SELECT * -FROM jobs -WHERE job_build_id = ? -ORDER BY job_number ASC -` - -const jobNumberQuery = ` -SELECT * -FROM jobs -WHERE job_build_id = ? -AND job_number = ? -LIMIT 1 -` +// +// import ( +// "github.com/drone/drone/model" +// "github.com/russross/meddler" +// ) +// +// func (db *datastore) GetJob(id int64) (*model.Job, error) { +// var job = new(model.Job) +// var err = meddler.Load(db, jobTable, job, id) +// return job, err +// } +// +// func (db *datastore) GetJobNumber(build *model.Build, num int) (*model.Job, error) { +// var job = new(model.Job) +// var err = meddler.QueryRow(db, job, rebind(jobNumberQuery), build.ID, num) +// return job, err +// } +// +// func (db *datastore) GetJobList(build *model.Build) ([]*model.Job, error) { +// var jobs = []*model.Job{} +// var err = meddler.QueryAll(db, &jobs, rebind(jobListQuery), build.ID) +// return jobs, err +// } +// +// func (db *datastore) CreateJob(job *model.Job) error { +// return meddler.Insert(db, jobTable, job) +// } +// +// func (db *datastore) UpdateJob(job *model.Job) error { +// return meddler.Update(db, jobTable, job) +// } +// +// const jobTable = "jobs" +// +// const jobListQuery = ` +// SELECT * +// FROM jobs +// WHERE job_build_id = ? 
+// ORDER BY job_number ASC +// ` +// +// const jobNumberQuery = ` +// SELECT * +// FROM jobs +// WHERE job_build_id = ? +// AND job_number = ? +// LIMIT 1 +// ` diff --git a/store/datastore/jobs_test.go b/store/datastore/jobs_test.go index bf96e6d86..46693984f 100644 --- a/store/datastore/jobs_test.go +++ b/store/datastore/jobs_test.go @@ -1,118 +1,119 @@ package datastore -import ( - "testing" - - "github.com/drone/drone/model" - "github.com/franela/goblin" -) - -func TestJobs(t *testing.T) { - db := openTest() - defer db.Close() - - s := From(db) - g := goblin.Goblin(t) - g.Describe("Job", func() { - - // before each test we purge the package table data from the database. - g.BeforeEach(func() { - db.Exec("DELETE FROM jobs") - db.Exec("DELETE FROM builds") - }) - - g.It("Should Set a job", func() { - job := &model.Job{ - BuildID: 1, - Status: "pending", - ExitCode: 0, - Number: 1, - } - err1 := s.CreateJob(job) - g.Assert(err1 == nil).IsTrue() - g.Assert(job.ID != 0).IsTrue() - - job.Status = "started" - err2 := s.UpdateJob(job) - g.Assert(err2 == nil).IsTrue() - - getjob, err3 := s.GetJob(job.ID) - g.Assert(err3 == nil).IsTrue() - g.Assert(getjob.Status).Equal(job.Status) - }) - - g.It("Should Get a Job by ID", func() { - job := &model.Job{ - BuildID: 1, - Status: "pending", - ExitCode: 1, - Number: 1, - Environment: map[string]string{"foo": "bar"}, - } - err1 := s.CreateJob(job) - g.Assert(err1 == nil).IsTrue() - g.Assert(job.ID != 0).IsTrue() - - getjob, err2 := s.GetJob(job.ID) - g.Assert(err2 == nil).IsTrue() - g.Assert(getjob.ID).Equal(job.ID) - g.Assert(getjob.Status).Equal(job.Status) - g.Assert(getjob.ExitCode).Equal(job.ExitCode) - g.Assert(getjob.Environment).Equal(job.Environment) - g.Assert(getjob.Environment["foo"]).Equal("bar") - }) - - g.It("Should Get a Job by Number", func() { - job := &model.Job{ - BuildID: 1, - Status: "pending", - ExitCode: 1, - Number: 1, - } - err1 := s.CreateJob(job) - g.Assert(err1 == nil).IsTrue() - g.Assert(job.ID != 
0).IsTrue() - - getjob, err2 := s.GetJobNumber(&model.Build{ID: 1}, 1) - g.Assert(err2 == nil).IsTrue() - g.Assert(getjob.ID).Equal(job.ID) - g.Assert(getjob.Status).Equal(job.Status) - }) - - g.It("Should Get a List of Jobs by Commit", func() { - - build := model.Build{ - RepoID: 1, - Status: model.StatusSuccess, - } - jobs := []*model.Job{ - { - BuildID: 1, - Status: "success", - ExitCode: 0, - Number: 1, - }, - { - BuildID: 3, - Status: "error", - ExitCode: 1, - Number: 2, - }, - { - BuildID: 5, - Status: "pending", - ExitCode: 0, - Number: 3, - }, - } - - err1 := s.CreateBuild(&build, jobs...) - g.Assert(err1 == nil).IsTrue() - getjobs, err2 := s.GetJobList(&build) - g.Assert(err2 == nil).IsTrue() - g.Assert(len(getjobs)).Equal(3) - g.Assert(getjobs[0].Number).Equal(1) - g.Assert(getjobs[0].Status).Equal(model.StatusSuccess) - }) - }) -} +// +// import ( +// "testing" +// +// "github.com/drone/drone/model" +// "github.com/franela/goblin" +// ) +// +// func TestJobs(t *testing.T) { +// db := openTest() +// defer db.Close() +// +// s := From(db) +// g := goblin.Goblin(t) +// g.Describe("Job", func() { +// +// // before each test we purge the package table data from the database. 
+// g.BeforeEach(func() { +// db.Exec("DELETE FROM jobs") +// db.Exec("DELETE FROM builds") +// }) +// +// g.It("Should Set a job", func() { +// job := &model.Job{ +// BuildID: 1, +// Status: "pending", +// ExitCode: 0, +// Number: 1, +// } +// err1 := s.CreateJob(job) +// g.Assert(err1 == nil).IsTrue() +// g.Assert(job.ID != 0).IsTrue() +// +// job.Status = "started" +// err2 := s.UpdateJob(job) +// g.Assert(err2 == nil).IsTrue() +// +// getjob, err3 := s.GetJob(job.ID) +// g.Assert(err3 == nil).IsTrue() +// g.Assert(getjob.Status).Equal(job.Status) +// }) +// +// g.It("Should Get a Job by ID", func() { +// job := &model.Job{ +// BuildID: 1, +// Status: "pending", +// ExitCode: 1, +// Number: 1, +// Environment: map[string]string{"foo": "bar"}, +// } +// err1 := s.CreateJob(job) +// g.Assert(err1 == nil).IsTrue() +// g.Assert(job.ID != 0).IsTrue() +// +// getjob, err2 := s.GetJob(job.ID) +// g.Assert(err2 == nil).IsTrue() +// g.Assert(getjob.ID).Equal(job.ID) +// g.Assert(getjob.Status).Equal(job.Status) +// g.Assert(getjob.ExitCode).Equal(job.ExitCode) +// g.Assert(getjob.Environment).Equal(job.Environment) +// g.Assert(getjob.Environment["foo"]).Equal("bar") +// }) +// +// g.It("Should Get a Job by Number", func() { +// job := &model.Job{ +// BuildID: 1, +// Status: "pending", +// ExitCode: 1, +// Number: 1, +// } +// err1 := s.CreateJob(job) +// g.Assert(err1 == nil).IsTrue() +// g.Assert(job.ID != 0).IsTrue() +// +// getjob, err2 := s.GetJobNumber(&model.Build{ID: 1}, 1) +// g.Assert(err2 == nil).IsTrue() +// g.Assert(getjob.ID).Equal(job.ID) +// g.Assert(getjob.Status).Equal(job.Status) +// }) +// +// g.It("Should Get a List of Jobs by Commit", func() { +// +// build := model.Build{ +// RepoID: 1, +// Status: model.StatusSuccess, +// } +// jobs := []*model.Job{ +// { +// BuildID: 1, +// Status: "success", +// ExitCode: 0, +// Number: 1, +// }, +// { +// BuildID: 3, +// Status: "error", +// ExitCode: 1, +// Number: 2, +// }, +// { +// BuildID: 5, +// Status: 
"pending", +// ExitCode: 0, +// Number: 3, +// }, +// } +// +// err1 := s.CreateBuild(&build, jobs...) +// g.Assert(err1 == nil).IsTrue() +// getjobs, err2 := s.GetJobList(&build) +// g.Assert(err2 == nil).IsTrue() +// g.Assert(len(getjobs)).Equal(3) +// g.Assert(getjobs[0].Number).Equal(1) +// g.Assert(getjobs[0].Status).Equal(model.StatusSuccess) +// }) +// }) +// } diff --git a/store/datastore/logs.go b/store/datastore/logs.go index 32676dfe6..b39ad3b6a 100644 --- a/store/datastore/logs.go +++ b/store/datastore/logs.go @@ -9,23 +9,29 @@ import ( "github.com/russross/meddler" ) -func (db *datastore) ReadLog(job *model.Job) (io.ReadCloser, error) { - var log = new(model.Log) - var err = meddler.QueryRow(db, log, rebind(logQuery), job.ID) +func (db *datastore) LogFind(proc *model.Proc) (io.ReadCloser, error) { + var log = new(logData) + var err = meddler.QueryRow(db, log, rebind(logQuery), proc.ID) var buf = bytes.NewBuffer(log.Data) return ioutil.NopCloser(buf), err } -func (db *datastore) WriteLog(job *model.Job, r io.Reader) error { - var log = new(model.Log) - var err = meddler.QueryRow(db, log, rebind(logQuery), job.ID) +func (db *datastore) LogSave(proc *model.Proc, r io.Reader) error { + var log = new(logData) + var err = meddler.QueryRow(db, log, rebind(logQuery), proc.ID) if err != nil { - log = &model.Log{JobID: job.ID} + log = &logData{ProcID: proc.ID} } log.Data, _ = ioutil.ReadAll(r) return meddler.Save(db, logTable, log) } +type logData struct { + ID int64 `meddler:"log_id,pk"` + ProcID int64 `meddler:"log_job_id"` + Data []byte `meddler:"log_data"` +} + const logTable = "logs" const logQuery = ` diff --git a/store/datastore/logs_test.go b/store/datastore/logs_test.go index 38c44a4b4..cd57b762d 100644 --- a/store/datastore/logs_test.go +++ b/store/datastore/logs_test.go @@ -24,14 +24,14 @@ func TestLogs(t *testing.T) { }) g.It("Should create a log", func() { - job := model.Job{ + proc := model.Proc{ ID: 1, } buf := bytes.NewBufferString("echo hi") - 
err := s.WriteLog(&job, buf) + err := s.LogSave(&proc, buf) g.Assert(err == nil).IsTrue() - rc, err := s.ReadLog(&job) + rc, err := s.LogFind(&proc) g.Assert(err == nil).IsTrue() defer rc.Close() out, _ := ioutil.ReadAll(rc) @@ -39,17 +39,17 @@ func TestLogs(t *testing.T) { }) g.It("Should update a log", func() { - job := model.Job{ + proc := model.Proc{ ID: 1, } buf1 := bytes.NewBufferString("echo hi") buf2 := bytes.NewBufferString("echo allo?") - err1 := s.WriteLog(&job, buf1) - err2 := s.WriteLog(&job, buf2) + err1 := s.LogSave(&proc, buf1) + err2 := s.LogSave(&proc, buf2) g.Assert(err1 == nil).IsTrue() g.Assert(err2 == nil).IsTrue() - rc, err := s.ReadLog(&job) + rc, err := s.LogFind(&proc) g.Assert(err == nil).IsTrue() defer rc.Close() out, _ := ioutil.ReadAll(rc) diff --git a/store/datastore/procs.go b/store/datastore/procs.go new file mode 100644 index 000000000..7dcce95ec --- /dev/null +++ b/store/datastore/procs.go @@ -0,0 +1,59 @@ +package datastore + +import ( + "github.com/drone/drone/model" + "github.com/drone/drone/store/datastore/sql" + "github.com/russross/meddler" +) + +func (db *datastore) ProcLoad(id int64) (*model.Proc, error) { + stmt := sql.Lookup(db.driver, "procs-find-id") + proc := new(model.Proc) + err := meddler.QueryRow(db, proc, stmt, id) + return proc, err +} + +func (db *datastore) ProcFind(build *model.Build, pid int) (*model.Proc, error) { + stmt := sql.Lookup(db.driver, "procs-find-build-pid") + proc := new(model.Proc) + err := meddler.QueryRow(db, proc, stmt, build.ID, pid) + return proc, err +} + +func (db *datastore) ProcChild(build *model.Build, pid int, child string) (*model.Proc, error) { + stmt := sql.Lookup(db.driver, "procs-find-build-ppid") + proc := new(model.Proc) + err := meddler.QueryRow(db, proc, stmt, build.ID, pid, child) + return proc, err +} + +func (db *datastore) ProcList(build *model.Build) ([]*model.Proc, error) { + stmt := sql.Lookup(db.driver, "procs-find-build") + list := []*model.Proc{} + err := 
meddler.QueryAll(db, &list, stmt, build.ID) + return list, err +} + +func (db *datastore) ProcCreate(procs []*model.Proc) error { + for _, proc := range procs { + if err := meddler.Insert(db, "procs", proc); err != nil { + return err + } + } + return nil +} + +func (db *datastore) ProcUpdate(proc *model.Proc) error { + return meddler.Update(db, "procs", proc) +} + +func (db *datastore) ProcClear(build *model.Build) (err error) { + stmt1 := sql.Lookup(db.driver, "files-delete-build") + stmt2 := sql.Lookup(db.driver, "procs-delete-build") + _, err = db.Exec(stmt1, build.ID) + if err != nil { + return + } + _, err = db.Exec(stmt2, build.ID) + return +} diff --git a/store/datastore/procs_test.go b/store/datastore/procs_test.go new file mode 100644 index 000000000..2f9849d36 --- /dev/null +++ b/store/datastore/procs_test.go @@ -0,0 +1,239 @@ +package datastore + +import ( + "testing" + + "github.com/drone/drone/model" +) + +func TestProcFind(t *testing.T) { + s := newTest() + defer func() { + s.Exec("delete from procs") + s.Close() + }() + + err := s.ProcCreate([]*model.Proc{ + { + BuildID: 1000, + PID: 1, + PPID: 2, + PGID: 3, + Name: "build", + State: model.StatusSuccess, + Error: "pc load letter", + ExitCode: 255, + Machine: "localhost", + Platform: "linux/amd64", + Environ: map[string]string{"GOLANG": "tip"}, + }, + }) + if err != nil { + t.Errorf("Unexpected error: insert procs: %s", err) + return + } + + proc, err := s.ProcFind(&model.Build{ID: 1000}, 1) + if err != nil { + t.Error(err) + return + } + if got, want := proc.BuildID, int64(1000); got != want { + t.Errorf("Want proc fk %d, got %d", want, got) + } + if got, want := proc.ID, int64(1); got != want { + t.Errorf("Want proc pk %d, got %d", want, got) + } + if got, want := proc.PID, 1; got != want { + t.Errorf("Want proc ppid %d, got %d", want, got) + } + if got, want := proc.PPID, 2; got != want { + t.Errorf("Want proc ppid %d, got %d", want, got) + } + if got, want := proc.PGID, 3; got != want { + 
t.Errorf("Want proc pgid %d, got %d", want, got) + } + if got, want := proc.Name, "build"; got != want { + t.Errorf("Want proc name %s, got %s", want, got) + } +} + +func TestProcChild(t *testing.T) { + s := newTest() + defer func() { + s.Exec("delete from procs") + s.Close() + }() + + err := s.ProcCreate([]*model.Proc{ + { + BuildID: 1, + PID: 1, + PPID: 1, + PGID: 1, + State: "success", + }, + { + BuildID: 1, + PID: 2, + PGID: 2, + PPID: 1, + Name: "build", + State: "success", + }, + }) + if err != nil { + t.Errorf("Unexpected error: insert procs: %s", err) + return + } + proc, err := s.ProcChild(&model.Build{ID: 1}, 1, "build") + if err != nil { + t.Error(err) + return + } + + if got, want := proc.PID, 2; got != want { + t.Errorf("Want proc pid %d, got %d", want, got) + } + if got, want := proc.Name, "build"; got != want { + t.Errorf("Want proc name %s, got %s", want, got) + } +} + +func TestProcList(t *testing.T) { + s := newTest() + defer func() { + s.Exec("delete from procs") + s.Close() + }() + + err := s.ProcCreate([]*model.Proc{ + { + BuildID: 2, + PID: 1, + PPID: 1, + PGID: 1, + State: "success", + }, + { + BuildID: 1, + PID: 1, + PPID: 1, + PGID: 1, + State: "success", + }, + { + BuildID: 1, + PID: 2, + PGID: 2, + PPID: 1, + Name: "build", + State: "success", + }, + }) + if err != nil { + t.Errorf("Unexpected error: insert procs: %s", err) + return + } + procs, err := s.ProcList(&model.Build{ID: 1}) + if err != nil { + t.Error(err) + return + } + if got, want := len(procs), 2; got != want { + t.Errorf("Want %d procs, got %d", want, got) + } +} + +func TestProcUpdate(t *testing.T) { + s := newTest() + defer func() { + s.Exec("delete from procs") + s.Close() + }() + + proc := &model.Proc{ + BuildID: 1, + PID: 1, + PPID: 2, + PGID: 3, + Name: "build", + State: "pending", + Error: "pc load letter", + ExitCode: 255, + Machine: "localhost", + Platform: "linux/amd64", + Environ: map[string]string{"GOLANG": "tip"}, + } + if err := 
s.ProcCreate([]*model.Proc{proc}); err != nil { + t.Errorf("Unexpected error: insert proc: %s", err) + return + } + proc.State = "running" + if err := s.ProcUpdate(proc); err != nil { + t.Errorf("Unexpected error: update proc: %s", err) + return + } + updated, err := s.ProcFind(&model.Build{ID: 1}, 1) + if err != nil { + t.Error(err) + return + } + if got, want := updated.State, "running"; got != want { + t.Errorf("Want proc name %s, got %s", want, got) + } +} + +func TestProcIndexes(t *testing.T) { + s := newTest() + defer func() { + s.Exec("delete from procs") + s.Close() + }() + + if err := s.ProcCreate([]*model.Proc{ + { + BuildID: 1, + PID: 1, + PPID: 1, + PGID: 1, + State: "running", + Name: "build", + }, + }); err != nil { + t.Errorf("Unexpected error: insert procs: %s", err) + return + } + + // fail due to duplicate pid + if err := s.ProcCreate([]*model.Proc{ + { + BuildID: 1, + PID: 1, + PPID: 1, + PGID: 1, + State: "success", + Name: "clone", + }, + }); err == nil { + t.Errorf("Unexpected error: dupliate pid") + } + + // // fail due to duplicate process name + // if err := s.ProcCreate([]*model.Proc{ + // { + // BuildID: 1, + // PID: 2, + // PPID: 1, + // PGID: 1, + // State: "success", + // Name: "build", + // }, + // }); err == nil { + // t.Errorf("Unexpected error: dupliate name") + // } +} + +// func TestProcCascade(t *testing.T) { +// +// } diff --git a/store/datastore/sql/lookup.go b/store/datastore/sql/lookup.go new file mode 100644 index 000000000..6d9550af9 --- /dev/null +++ b/store/datastore/sql/lookup.go @@ -0,0 +1,24 @@ +package sql + +import ( + "github.com/drone/drone/store/datastore/sql/postgres" + "github.com/drone/drone/store/datastore/sql/sqlite" +) + +// Supported database drivers +const ( + DriverSqlite = "sqlite3" + DriverMysql = "mysql" + DriverPostgres = "postgres" +) + +// Lookup returns the named sql statement compatible with +// the specified database driver. 
+func Lookup(driver string, name string) string { + switch driver { + case DriverPostgres: + return postgres.Lookup(name) + default: + return sqlite.Lookup(name) + } +} diff --git a/store/datastore/sql/postgres/files/files.sql b/store/datastore/sql/postgres/files/files.sql new file mode 100644 index 000000000..31817f766 --- /dev/null +++ b/store/datastore/sql/postgres/files/files.sql @@ -0,0 +1,45 @@ +-- name: files-find-build + +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +FROM files +WHERE file_build_id = $1 + +-- name: files-find-proc-name + +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +FROM files +WHERE file_proc_id = $1 + AND file_name = $2 + +-- name: files-find-proc-name-data + +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +,file_data +FROM files +WHERE file_proc_id = $1 + AND file_name = $2 + +-- name: files-delete-build + +DELETE FROM files WHERE file_build_id = $1 diff --git a/store/datastore/sql/postgres/files/procs.sql b/store/datastore/sql/postgres/files/procs.sql new file mode 100644 index 000000000..7f336885b --- /dev/null +++ b/store/datastore/sql/postgres/files/procs.sql @@ -0,0 +1,86 @@ +-- name: procs-find-id + +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_id = $1 + +-- name: procs-find-build + +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = $1 + +-- name: procs-find-build-pid + +SELECT +proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped 
+,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = $1 + AND proc_pid = $2 + +-- name: procs-find-build-ppid + +SELECT +proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = $1 + AND proc_ppid = $2 + AND proc_name = $3 + +-- name: procs-delete-build + +DELETE FROM procs WHERE proc_build_id = $1 diff --git a/store/datastore/sql/postgres/sql.go b/store/datastore/sql/postgres/sql.go new file mode 100644 index 000000000..2b0b64887 --- /dev/null +++ b/store/datastore/sql/postgres/sql.go @@ -0,0 +1,3 @@ +package postgres + +//go:generate sqlbin sql --package=postgres diff --git a/store/datastore/sql/postgres/sql_gen.go b/store/datastore/sql/postgres/sql_gen.go new file mode 100644 index 000000000..19d26fa3f --- /dev/null +++ b/store/datastore/sql/postgres/sql_gen.go @@ -0,0 +1,151 @@ +package postgres + +// Lookup returns the named statement. 
+func Lookup(name string) string { + return index[name] +} + +var index = map[string]string{ + "files-find-build": filesFindBuild, + "files-find-proc-name": filesFindProcName, + "files-find-proc-name-data": filesFindProcNameData, + "files-delete-build": filesDeleteBuild, + "procs-find-id": procsFindId, + "procs-find-build": procsFindBuild, + "procs-find-build-pid": procsFindBuildPid, + "procs-find-build-ppid": procsFindBuildPpid, + "procs-delete-build": procsDeleteBuild, +} + +var filesFindBuild = ` +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +FROM files +WHERE file_build_id = $1 +` + +var filesFindProcName = ` +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +FROM files +WHERE file_proc_id = $1 + AND file_name = $2 +` + +var filesFindProcNameData = ` +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +,file_data +FROM files +WHERE file_proc_id = $1 + AND file_name = $2 +` + +var filesDeleteBuild = ` +DELETE FROM files WHERE file_build_id = $1 +` + +var procsFindId = ` +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_id = $1 +` + +var procsFindBuild = ` +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = $1 +` + +var procsFindBuildPid = ` +SELECT +proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = $1 + AND proc_pid = $2 +` + +var procsFindBuildPpid = ` +SELECT +proc_id +,proc_build_id +,proc_pid 
+,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = $1 + AND proc_ppid = $2 + AND proc_name = $3 +` + +var procsDeleteBuild = ` +DELETE FROM procs WHERE proc_build_id = $1 +` diff --git a/store/datastore/sql/sqlite/files/files.sql b/store/datastore/sql/sqlite/files/files.sql new file mode 100644 index 000000000..0088a218f --- /dev/null +++ b/store/datastore/sql/sqlite/files/files.sql @@ -0,0 +1,45 @@ +-- name: files-find-build + +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +FROM files +WHERE file_build_id = ? + +-- name: files-find-proc-name + +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +FROM files +WHERE file_proc_id = ? + AND file_name = ? + +-- name: files-find-proc-name-data + +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +,file_data +FROM files +WHERE file_proc_id = ? + AND file_name = ? + +-- name: files-delete-build + +DELETE FROM files WHERE file_build_id = ? diff --git a/store/datastore/sql/sqlite/files/procs.sql b/store/datastore/sql/sqlite/files/procs.sql new file mode 100644 index 000000000..5a9730aab --- /dev/null +++ b/store/datastore/sql/sqlite/files/procs.sql @@ -0,0 +1,86 @@ +-- name: procs-find-id + +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_id = ? + +-- name: procs-find-build + +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = ? 
+ +-- name: procs-find-build-pid + +SELECT +proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = ? + AND proc_pid = ? + +-- name: procs-find-build-ppid + +SELECT +proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = ? + AND proc_ppid = ? + AND proc_name = ? + +-- name: procs-delete-build + +DELETE FROM procs WHERE proc_build_id = ? diff --git a/store/datastore/sql/sqlite/sql.go b/store/datastore/sql/sqlite/sql.go new file mode 100644 index 000000000..60cd5f241 --- /dev/null +++ b/store/datastore/sql/sqlite/sql.go @@ -0,0 +1,3 @@ +package sqlite + +//go:generate sqlbin sql --package=sqlite diff --git a/store/datastore/sql/sqlite/sql_gen.go b/store/datastore/sql/sqlite/sql_gen.go new file mode 100644 index 000000000..8e2574797 --- /dev/null +++ b/store/datastore/sql/sqlite/sql_gen.go @@ -0,0 +1,151 @@ +package sqlite + +// Lookup returns the named statement. +func Lookup(name string) string { + return index[name] +} + +var index = map[string]string{ + "files-find-build": filesFindBuild, + "files-find-proc-name": filesFindProcName, + "files-find-proc-name-data": filesFindProcNameData, + "files-delete-build": filesDeleteBuild, + "procs-find-id": procsFindId, + "procs-find-build": procsFindBuild, + "procs-find-build-pid": procsFindBuildPid, + "procs-find-build-ppid": procsFindBuildPpid, + "procs-delete-build": procsDeleteBuild, +} + +var filesFindBuild = ` +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +FROM files +WHERE file_build_id = ? 
+` + +var filesFindProcName = ` +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +FROM files +WHERE file_proc_id = ? + AND file_name = ? +` + +var filesFindProcNameData = ` +SELECT + file_id +,file_build_id +,file_proc_id +,file_name +,file_mime +,file_size +,file_time +,file_data +FROM files +WHERE file_proc_id = ? + AND file_name = ? +` + +var filesDeleteBuild = ` +DELETE FROM files WHERE file_build_id = ? +` + +var procsFindId = ` +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_id = ? +` + +var procsFindBuild = ` +SELECT + proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = ? +` + +var procsFindBuildPid = ` +SELECT +proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = ? + AND proc_pid = ? +` + +var procsFindBuildPpid = ` +SELECT +proc_id +,proc_build_id +,proc_pid +,proc_ppid +,proc_pgid +,proc_name +,proc_state +,proc_error +,proc_exit_code +,proc_started +,proc_stopped +,proc_machine +,proc_platform +,proc_environ +FROM procs +WHERE proc_build_id = ? + AND proc_ppid = ? + AND proc_name = ? +` + +var procsDeleteBuild = ` +DELETE FROM procs WHERE proc_build_id = ? +` diff --git a/store/datastore/store.go b/store/datastore/store.go index 2d5936132..28798176e 100644 --- a/store/datastore/store.go +++ b/store/datastore/store.go @@ -17,19 +17,24 @@ import ( // of the sql/database driver with a relational database backend. 
type datastore struct { *sql.DB + + driver string + config string } // New creates a database connection for the given driver and datasource // and returns a new Store. func New(driver, config string) store.Store { - return From( - open(driver, config), - ) + return &datastore{ + DB: open(driver, config), + driver: driver, + config: config, + } } // From returns a Store using an existing database connection. func From(db *sql.DB) store.Store { - return &datastore{db} + return &datastore{DB: db} } // open opens a new database connection with the specified @@ -60,7 +65,7 @@ func open(driver, config string) *sql.DB { return db } -// OpenTest opens a new database connection for testing purposes. +// openTest opens a new database connection for testing purposes. // The database driver and connection string are provided by // environment variables, with fallback to in-memory sqlite. func openTest() *sql.DB { @@ -75,6 +80,25 @@ func openTest() *sql.DB { return open(driver, config) } +// newTest creates a new database connection for testing purposes. +// The database driver and connection string are provided by +// environment variables, with fallback to in-memory sqlite. +func newTest() *datastore { + var ( + driver = "sqlite3" + config = ":memory:" + ) + if os.Getenv("DATABASE_DRIVER") != "" { + driver = os.Getenv("DATABASE_DRIVER") + config = os.Getenv("DATABASE_CONFIG") + } + return &datastore{ + DB: open(driver, config), + driver: driver, + config: config, + } +} + // helper function to ping the database with backoff to ensure // a connection can be established before we proceed with the // database setup and migration. diff --git a/store/store.go b/store/store.go index 21f114b79..79abc787e 100644 --- a/store/store.go +++ b/store/store.go @@ -107,31 +107,31 @@ type Store interface { GetBuildQueue() ([]*model.Feed, error) // CreateBuild creates a new build and jobs. 
- CreateBuild(*model.Build, ...*model.Job) error + CreateBuild(*model.Build, ...*model.Proc) error // UpdateBuild updates a build. UpdateBuild(*model.Build) error - // GetJob gets a job by unique ID. - GetJob(int64) (*model.Job, error) - - // GetJobNumber gets a job by number. - GetJobNumber(*model.Build, int) (*model.Job, error) - - // GetJobList gets a list of all users in the system. - GetJobList(*model.Build) ([]*model.Job, error) - - // CreateJob creates a job. - CreateJob(*model.Job) error - - // UpdateJob updates a job. - UpdateJob(*model.Job) error - - // ReadLog reads the Job logs from the datastore. - ReadLog(*model.Job) (io.ReadCloser, error) - - // WriteLog writes the job logs to the datastore. - WriteLog(*model.Job, io.Reader) error + // // GetJob gets a job by unique ID. + // GetJob(int64) (*model.Job, error) + // + // // GetJobNumber gets a job by number. + // GetJobNumber(*model.Build, int) (*model.Job, error) + // + // // GetJobList gets a list of all users in the system. + // GetJobList(*model.Build) ([]*model.Job, error) + // + // // CreateJob creates a job. + // CreateJob(*model.Job) error + // + // // UpdateJob updates a job. + // UpdateJob(*model.Job) error + // + // // ReadLog reads the Job logs from the datastore. + // ReadLog(*model.Job) (io.ReadCloser, error) + // + // // WriteLog writes the job logs to the datastore. 
+ // WriteLog(*model.Job, io.Reader) error GetAgent(int64) (*model.Agent, error) @@ -144,6 +144,22 @@ type Store interface { UpdateAgent(*model.Agent) error DeleteAgent(*model.Agent) error + + ProcLoad(int64) (*model.Proc, error) + ProcFind(*model.Build, int) (*model.Proc, error) + ProcChild(*model.Build, int, string) (*model.Proc, error) + ProcList(*model.Build) ([]*model.Proc, error) + ProcCreate([]*model.Proc) error + ProcUpdate(*model.Proc) error + ProcClear(*model.Build) error + + LogFind(*model.Proc) (io.ReadCloser, error) + LogSave(*model.Proc, io.Reader) error + + FileList(*model.Build) ([]*model.File, error) + FileFind(*model.Proc, string) (*model.File, error) + FileRead(*model.Proc, string) (io.ReadCloser, error) + FileCreate(*model.File, io.Reader) error } const globalTeamName = "__global__" @@ -336,82 +352,41 @@ func GetBuildQueue(c context.Context) ([]*model.Feed, error) { return FromContext(c).GetBuildQueue() } -func CreateBuild(c context.Context, build *model.Build, jobs ...*model.Job) error { - return FromContext(c).CreateBuild(build, jobs...) +func CreateBuild(c context.Context, build *model.Build, procs ...*model.Proc) error { + return FromContext(c).CreateBuild(build, procs...) } func UpdateBuild(c context.Context, build *model.Build) error { return FromContext(c).UpdateBuild(build) } -func UpdateBuildJob(c context.Context, build *model.Build, job *model.Job) (bool, error) { - if err := UpdateJob(c, job); err != nil { - return false, err - } - - // if the job is running or started we don't need to update the build - // status since. - if job.Status == model.StatusRunning || job.Status == model.StatusPending { - return false, nil - } - - jobs, err := GetJobList(c, build) - if err != nil { - return false, err - } - // check to see if all jobs are finished for this build. If yes, we need to - // calcualte the overall build status and finish time. 
- status := model.StatusSuccess - finish := job.Finished - for _, job := range jobs { - if job.Finished > finish { - finish = job.Finished - } - switch job.Status { - case model.StatusSuccess: - // no-op - case model.StatusRunning, model.StatusPending: - return false, nil - default: - status = job.Status - } - } - - build.Status = status - build.Finished = finish - if err := FromContext(c).UpdateBuild(build); err != nil { - return false, err - } - return true, nil -} - -func GetJob(c context.Context, id int64) (*model.Job, error) { - return FromContext(c).GetJob(id) -} - -func GetJobNumber(c context.Context, build *model.Build, num int) (*model.Job, error) { - return FromContext(c).GetJobNumber(build, num) -} - -func GetJobList(c context.Context, build *model.Build) ([]*model.Job, error) { - return FromContext(c).GetJobList(build) -} - -func CreateJob(c context.Context, job *model.Job) error { - return FromContext(c).CreateJob(job) -} - -func UpdateJob(c context.Context, job *model.Job) error { - return FromContext(c).UpdateJob(job) -} - -func ReadLog(c context.Context, job *model.Job) (io.ReadCloser, error) { - return FromContext(c).ReadLog(job) -} - -func WriteLog(c context.Context, job *model.Job, r io.Reader) error { - return FromContext(c).WriteLog(job, r) -} +// func GetJob(c context.Context, id int64) (*model.Job, error) { +// return FromContext(c).GetJob(id) +// } +// +// func GetJobNumber(c context.Context, build *model.Build, num int) (*model.Job, error) { +// return FromContext(c).GetJobNumber(build, num) +// } +// +// func GetJobList(c context.Context, build *model.Build) ([]*model.Job, error) { +// return FromContext(c).GetJobList(build) +// } +// +// func CreateJob(c context.Context, job *model.Job) error { +// return FromContext(c).CreateJob(job) +// } +// +// func UpdateJob(c context.Context, job *model.Job) error { +// return FromContext(c).UpdateJob(job) +// } +// +// func ReadLog(c context.Context, job *model.Job) (io.ReadCloser, error) { +// 
return FromContext(c).ReadLog(job) +// } +// +// func WriteLog(c context.Context, job *model.Job, r io.Reader) error { +// return FromContext(c).WriteLog(job, r) +// } func GetAgent(c context.Context, id int64) (*model.Agent, error) { return FromContext(c).GetAgent(id) diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go index b21957559..07016825d 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go @@ -3,7 +3,6 @@ package rpc import ( "context" "io" - "io/ioutil" "log" "math" "net/http" @@ -18,6 +17,7 @@ import ( const ( methodNext = "next" methodWait = "wait" + methodInit = "init" methodDone = "done" methodExtend = "extend" methodUpdate = "update" @@ -28,8 +28,7 @@ const ( type ( uploadReq struct { ID string `json:"id"` - Mime string `json:"mime"` - Data []byte `json:"data"` + File *File `json:"file"` } updateReq struct { @@ -90,9 +89,16 @@ func (t *Client) Wait(c context.Context, id string) error { return t.call(c, methodWait, id, nil) } +// Init signals the pipeline is initialized. +func (t *Client) Init(c context.Context, id string, state State) error { + params := updateReq{id, state} + return t.call(c, methodInit, ¶ms, nil) +} + // Done signals the pipeline is complete. -func (t *Client) Done(c context.Context, id string) error { - return t.call(c, methodDone, id, nil) +func (t *Client) Done(c context.Context, id string, state State) error { + params := updateReq{id, state} + return t.call(c, methodDone, ¶ms, nil) } // Extend extends the pipeline deadline. @@ -113,12 +119,8 @@ func (t *Client) Log(c context.Context, id string, line *Line) error { } // Upload uploads the pipeline artifact. 
-func (t *Client) Upload(c context.Context, id, mime string, file io.Reader) error { - data, err := ioutil.ReadAll(file) - if err != nil { - return err - } - params := uploadReq{id, mime, data} +func (t *Client) Upload(c context.Context, id string, file *File) error { + params := uploadReq{id, file} return t.call(c, methodUpload, params, nil) } diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go index d85ae7bbb..1cbe4cd1e 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go @@ -36,12 +36,13 @@ func (l *Line) String() string { // LineWriter sends logs to the client. type LineWriter struct { - peer Peer - id string - name string - num int - now time.Time - rep *strings.Replacer + peer Peer + id string + name string + num int + now time.Time + rep *strings.Replacer + lines []*Line } // NewLineWriter returns a new line reader. @@ -91,5 +92,16 @@ func (w *LineWriter) Write(p []byte) (n int, err error) { // w.peer.Log(context.Background(), w.id, line) // w.num++ // } + w.lines = append(w.lines, line) return len(p), nil } + +// Lines returns the line history +func (w *LineWriter) Lines() []*Line { + return w.lines +} + +// Clear clears the line history +func (w *LineWriter) Clear() { + w.lines = w.lines[:0] +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go index 22c96db0b..8c7e21e7e 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go @@ -2,7 +2,6 @@ package rpc import ( "context" - "io" "github.com/cncd/pipeline/pipeline/backend" ) @@ -33,6 +32,16 @@ type ( Config *backend.Config `json:"config"` Timeout int64 `json:"timeout"` } + + // File defines a pipeline artifact. 
+ File struct { + Name string `json:"name"` + Proc string `json:"proc"` + Mime string `json:"mime"` + Time int64 `json:"time"` + Size int `json:"size"` + Data []byte `json:"data"` + } ) // NoFilter is an empty filter. @@ -43,11 +52,14 @@ type Peer interface { // Next returns the next pipeline in the queue. Next(c context.Context, f Filter) (*Pipeline, error) - // Wait blocks untilthe pipeline is complete. + // Wait blocks until the pipeline is complete. Wait(c context.Context, id string) error + // Init signals the pipeline is initialized. + Init(c context.Context, id string, state State) error + // Done signals the pipeline is complete. - Done(c context.Context, id string) error + Done(c context.Context, id string, state State) error // Extend extends the pipeline deadline Extend(c context.Context, id string) error @@ -56,7 +68,7 @@ type Peer interface { Update(c context.Context, id string, state State) error // Upload uploads the pipeline artifact. - Upload(c context.Context, id, mime string, file io.Reader) error + Upload(c context.Context, id string, file *File) error // Log writes the pipeline log entry. Log(c context.Context, id string, line *Line) error diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go index decb9ed2b..65eb0f438 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go @@ -1,7 +1,6 @@ package rpc import ( - "bytes" "context" "encoding/json" "errors" @@ -54,6 +53,8 @@ func (s *Server) router(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2. 
return s.next(ctx, req) case methodWait: return s.wait(ctx, req) + case methodInit: + return s.init(ctx, req) case methodDone: return s.done(ctx, req) case methodExtend: @@ -90,15 +91,24 @@ func (s *Server) wait(ctx context.Context, req *jsonrpc2.Request) (interface{}, return nil, s.peer.Wait(ctx, id) } +// init unmarshals the rpc request parameters and invokes the peer.Init +// procedure. The results are retuned and written to the rpc response. +func (s *Server) init(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + in := new(updateReq) + if err := json.Unmarshal([]byte(*req.Params), in); err != nil { + return nil, err + } + return nil, s.peer.Init(ctx, in.ID, in.State) +} + // done unmarshals the rpc request parameters and invokes the peer.Done // procedure. The results are retuned and written to the rpc response. func (s *Server) done(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { - var id string - err := json.Unmarshal([]byte(*req.Params), &id) - if err != nil { + in := new(updateReq) + if err := json.Unmarshal([]byte(*req.Params), in); err != nil { return nil, err } - return nil, s.peer.Done(ctx, id) + return nil, s.peer.Done(ctx, in.ID, in.State) } // extend unmarshals the rpc request parameters and invokes the peer.Extend @@ -137,5 +147,5 @@ func (s *Server) upload(req *jsonrpc2.Request) (interface{}, error) { if err := json.Unmarshal([]byte(*req.Params), in); err != nil { return nil, err } - return nil, s.peer.Upload(noContext, in.ID, in.Mime, bytes.NewBuffer(in.Data)) + return nil, s.peer.Upload(noContext, in.ID, in.File) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 7c33e2d14..9d792a276 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -28,68 +28,68 @@ { "checksumSHA1": "W3AuK8ocqHwlUajGmQLFvnRhTZE=", "path": "github.com/cncd/pipeline/pipeline", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": 
"4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "Qu2FreqaMr8Yx2bW9O0cxAGgjr0=", "path": "github.com/cncd/pipeline/pipeline/backend", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "0CGXRaYwZhJxGIrGhn8WGpkFqPo=", "path": "github.com/cncd/pipeline/pipeline/backend/docker", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "/8wE+cVb7T4PQZgpLNu0DHzKGuE=", "path": "github.com/cncd/pipeline/pipeline/frontend", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "O0sulBQAHJeNLg3lO38Cq5uf/eg=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "ftyr9EJQl9D5OvzOcqGBS6stt0g=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "Q0GkNUFamVYIA1Fd8r0A5M6Gx54=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/linter", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": 
"kx2sPUIMozPC/g6E4w48h3FfH3k=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "2/3f3oNmxXy5kcrRLCFa24Oc9O4=", "path": "github.com/cncd/pipeline/pipeline/interrupt", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "uOjTfke7Qxosrivgz/nVTHeIP5g=", "path": "github.com/cncd/pipeline/pipeline/multipart", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { - "checksumSHA1": "MratmNKJ78/IhWvDsZphN01CtmE=", + "checksumSHA1": "TP5lK1T8cOKv5QjZ2nqdlYczSTo=", "path": "github.com/cncd/pipeline/pipeline/rpc", - "revision": "addc99dad68008570994f8de318101adfe4161a6", - "revisionTime": "2017-03-19T09:04:25Z" + "revision": "4b348532eddd31220de9a179c197d31a78b200f5", + "revisionTime": "2017-03-29T08:36:18Z" }, { "checksumSHA1": "7Qj1DK0ceAXkYztW0l3+L6sn+V8=",