diff --git a/server/build.go b/server/build.go index 7da655c63..1f6d32578 100644 --- a/server/build.go +++ b/server/build.go @@ -17,7 +17,6 @@ package server import ( "bytes" "context" - "encoding/json" "fmt" "io" "net/http" @@ -26,8 +25,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" - "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc" - "github.com/laszlocph/drone-oss-08/cncd/pubsub" "github.com/laszlocph/drone-oss-08/cncd/queue" "github.com/laszlocph/drone-oss-08/remote" "github.com/laszlocph/drone-oss-08/shared/httputil" @@ -337,7 +334,7 @@ func PostApproval(c *gin.Context) { Yaml: conf.Data, Envs: envs, } - items, err := b.Build() + buildItems, err := b.Build() if err != nil { build.Status = model.StatusError build.Started = time.Now().Unix() @@ -347,73 +344,14 @@ func PostApproval(c *gin.Context) { return } - var pcounter = len(items) - for _, item := range items { - build.Procs = append(build.Procs, item.Proc) - item.Proc.BuildID = build.ID - - for _, stage := range item.Config.Stages { - var gid int - for _, step := range stage.Steps { - pcounter++ - if gid == 0 { - gid = pcounter - } - proc := &model.Proc{ - BuildID: build.ID, - Name: step.Alias, - PID: pcounter, - PPID: item.Proc.PID, - PGID: gid, - State: model.StatusPending, - } - build.Procs = append(build.Procs, proc) - } - } + setBuildProcs(build, buildItems) + err = store.FromContext(c).ProcCreate(build.Procs) + if err != nil { + logrus.Errorf("error persisting procs %s/%d: %s", repo.FullName, build.Number, err) } - store.FromContext(c).ProcCreate(build.Procs) - // - // publish topic - // - buildCopy := *build - buildCopy.Procs = model.Tree(buildCopy.Procs) - message := pubsub.Message{ - Labels: map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsPrivate), - }, - } - message.Data, _ = json.Marshal(model.Event{ - Type: model.Enqueued, - Repo: *repo, - Build: buildCopy, - }) - // TODO remove global reference - Config.Services.Pubsub.Publish(c, "topic/events", message) - - // - // end publish topic - // - - for _, item := range items { - task := new(queue.Task) - task.ID = fmt.Sprint(item.Proc.ID) - task.Labels = map[string]string{} - task.Labels["platform"] = item.Platform - for k, v := range item.Labels { - task.Labels[k] = v - } - - task.Data, _ = json.Marshal(rpc.Pipeline{ - ID: fmt.Sprint(item.Proc.ID), - Config: item.Config, - Timeout: b.Repo.Timeout, - }) - - Config.Services.Logs.Open(context.Background(), task.ID) - Config.Services.Queue.Push(context.Background(), task) - } + publishToTopic(c, build, repo) + queueBuild(build, repo, buildItems) } func PostDecline(c *gin.Context) { @@ -463,15 +401,7 @@ func GetBuildQueue(c *gin.Context) { c.JSON(200, out) } -// -// -// -// -// -// - func PostBuild(c *gin.Context) { - remote_ := remote.FromContext(c) repo := session.Repo(c) @@ -592,7 +522,7 @@ func PostBuild(c *gin.Context) { Yaml: conf.Data, Envs: buildParams, } - items, err := b.Build() + buildItems, err := b.Build() if err != nil { build.Status = model.StatusError build.Started = time.Now().Unix() @@ -602,30 +532,7 @@ func PostBuild(c *gin.Context) { return } - var pcounter = len(items) - for _, item := range items { - build.Procs = append(build.Procs, item.Proc) - item.Proc.BuildID = build.ID - - for _, stage := range item.Config.Stages { - var gid int - for _, step := range stage.Steps { - pcounter++ - if gid == 0 { - gid = pcounter - } - proc := &model.Proc{ - BuildID: build.ID, - Name: step.Alias, - PID: pcounter, - PPID: item.Proc.PID, - PGID: gid, - State: model.StatusPending, - } - build.Procs = append(build.Procs, proc) - } - } - } + setBuildProcs(build, buildItems) err = store.FromContext(c).ProcCreate(build.Procs) if err != nil { @@ -637,55 +544,12 @@ func PostBuild(c *gin.Context) { c.JSON(500, build) return } - c.JSON(202, build) - // - // publish topic - // - buildCopy := *build - buildCopy.Procs = model.Tree(buildCopy.Procs) - message := pubsub.Message{ - Labels: map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsPrivate), - }, - } - message.Data, _ = json.Marshal(model.Event{ - Type: model.Enqueued, - Repo: *repo, - Build: buildCopy, - }) - // TODO remove global reference - Config.Services.Pubsub.Publish(c, "topic/events", message) - // - // end publish topic - // - - for _, item := range items { - task := new(queue.Task) - task.ID = fmt.Sprint(item.Proc.ID) - task.Labels = map[string]string{} - task.Labels["platform"] = item.Platform - for k, v := range item.Labels { - task.Labels[k] = v - } - - task.Data, _ = json.Marshal(rpc.Pipeline{ - ID: fmt.Sprint(item.Proc.ID), - Config: item.Config, - Timeout: b.Repo.Timeout, - }) - - Config.Services.Logs.Open(context.Background(), task.ID) - Config.Services.Queue.Push(context.Background(), task) - } + publishToTopic(c, build, repo) + queueBuild(build, repo, buildItems) } -// -/// -// - func DeleteBuildLogs(c *gin.Context) { repo := session.Repo(c) user := session.User(c)