diff --git a/cmd/drone-agent/agent.go b/cmd/drone-agent/agent.go index 5a21040cf..a1bec24c5 100644 --- a/cmd/drone-agent/agent.go +++ b/cmd/drone-agent/agent.go @@ -16,24 +16,16 @@ package main import ( "context" - "encoding/json" - "io" - "io/ioutil" "net/http" "os" - "strconv" "sync" - "time" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" - "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline" - "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend" - "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend/docker" - "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/multipart" "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc" + "github.com/laszlocph/drone-oss-08/runner" "github.com/drone/signal" "github.com/rs/zerolog" @@ -43,6 +35,10 @@ import ( oldcontext "golang.org/x/net/context" ) +var counter = &runner.State{ + Metadata: map[string]runner.Info{}, +} + func loop(c *cli.Context) error { filter := rpc.Filter{ Labels: map[string]string{ @@ -124,12 +120,8 @@ func loop(c *cli.Context) error { if sigterm.IsSet() { return } - r := runner{ - client: client, - filter: filter, - hostname: hostname, - } - if err := r.run(ctx); err != nil { + r := runner.NewRunner(client, filter, hostname, counter) + if err := r.Run(ctx); err != nil { log.Error().Err(err).Msg("pipeline done with error") return } @@ -141,322 +133,6 @@ func loop(c *cli.Context) error { return nil } -// NOTE we need to limit the size of the logs and files that we upload. -// The maximum grpc payload size is 4194304. So until we implement streaming -// for uploads, we need to set these limits below the maximum. -const ( - maxLogsUpload = 2000000 // this is per step - maxFileUpload = 1000000 -) - -type runner struct { - client rpc.Peer - filter rpc.Filter - hostname string -} - -func (r *runner) run(ctx context.Context) error { - log.Debug(). - Msg("request next execution") - - meta, _ := metadata.FromOutgoingContext(ctx) - ctxmeta := metadata.NewOutgoingContext(context.Background(), meta) - - // get the next job from the queue - work, err := r.client.Next(ctx, r.filter) - if err != nil { - return err - } - if work == nil { - return nil - } - - timeout := time.Hour - if minutes := work.Timeout; minutes != 0 { - timeout = time.Duration(minutes) * time.Minute - } - - counter.Add( - work.ID, - timeout, - extractRepositoryName(work.Config), // hack - extractBuildNumber(work.Config), // hack - ) - defer counter.Done(work.ID) - - logger := log.With(). - Str("repo", extractRepositoryName(work.Config)). // hack - Str("build", extractBuildNumber(work.Config)). // hack - Str("id", work.ID). - Logger() - - logger.Debug(). - Msg("received execution") - - // new docker engine - engine, err := docker.NewEnv() - if err != nil { - logger.Error(). - Err(err). - Msg("cannot create docker client") - - return err - } - - ctx, cancel := context.WithTimeout(ctxmeta, timeout) - defer cancel() - - cancelled := abool.New() - go func() { - logger.Debug(). - Msg("listen for cancel signal") - - if werr := r.client.Wait(ctx, work.ID); werr != nil { - cancelled.SetTo(true) - logger.Warn(). - Err(werr). - Msg("cancel signal received") - - cancel() - } else { - logger.Debug(). - Msg("stop listening for cancel signal") - } - }() - - go func() { - for { - select { - case <-ctx.Done(): - logger.Debug(). - Msg("pipeline done") - - return - case <-time.After(time.Minute): - logger.Debug(). - Msg("pipeline lease renewed") - - r.client.Extend(ctx, work.ID) - } - } - }() - - state := rpc.State{} - state.Started = time.Now().Unix() - - err = r.client.Init(ctxmeta, work.ID, state) - if err != nil { - logger.Error(). - Err(err). - Msg("pipeline initialization failed") - } - - var uploads sync.WaitGroup - defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error { - - loglogger := logger.With(). - Str("image", proc.Image). - Str("stage", proc.Alias). - Logger() - - part, rerr := rc.NextPart() - if rerr != nil { - return rerr - } - uploads.Add(1) - - var secrets []string - for _, secret := range work.Config.Secrets { - if secret.Mask { - secrets = append(secrets, secret.Value) - } - } - - loglogger.Debug().Msg("log stream opened") - - limitedPart := io.LimitReader(part, maxLogsUpload) - logstream := rpc.NewLineWriter(r.client, work.ID, proc.Alias, secrets...) - io.Copy(logstream, limitedPart) - - loglogger.Debug().Msg("log stream copied") - - file := &rpc.File{} - file.Mime = "application/json+logs" - file.Proc = proc.Alias - file.Name = "logs.json" - file.Data, _ = json.Marshal(logstream.Lines()) - file.Size = len(file.Data) - file.Time = time.Now().Unix() - - loglogger.Debug(). - Msg("log stream uploading") - - if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { - loglogger.Error(). - Err(serr). - Msg("log stream upload error") - } - - loglogger.Debug(). - Msg("log stream upload complete") - - defer func() { - loglogger.Debug(). - Msg("log stream closed") - - uploads.Done() - }() - - part, rerr = rc.NextPart() - if rerr != nil { - return nil - } - // TODO should be configurable - limitedPart = io.LimitReader(part, maxFileUpload) - file = &rpc.File{} - file.Mime = part.Header().Get("Content-Type") - file.Proc = proc.Alias - file.Name = part.FileName() - file.Data, _ = ioutil.ReadAll(limitedPart) - file.Size = len(file.Data) - file.Time = time.Now().Unix() - file.Meta = map[string]string{} - - for key, value := range part.Header() { - file.Meta[key] = value[0] - } - - loglogger.Debug(). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream uploading") - - if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { - loglogger.Error(). - Err(serr). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream upload error") - } - - loglogger.Debug(). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream upload complete") - return nil - }) - - defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { - proclogger := logger.With(). - Str("image", state.Pipeline.Step.Image). - Str("stage", state.Pipeline.Step.Alias). - Int("exit_code", state.Process.ExitCode). - Bool("exited", state.Process.Exited). - Logger() - - procState := rpc.State{ - Proc: state.Pipeline.Step.Alias, - Exited: state.Process.Exited, - ExitCode: state.Process.ExitCode, - Started: time.Now().Unix(), // TODO do not do this - Finished: time.Now().Unix(), - } - defer func() { - proclogger.Debug(). - Msg("update step status") - - if uerr := r.client.Update(ctxmeta, work.ID, procState); uerr != nil { - proclogger.Debug(). - Err(uerr). - Msg("update step status error") - } - - proclogger.Debug(). - Msg("update step status complete") - }() - if state.Process.Exited { - return nil - } - if state.Pipeline.Step.Environment == nil { - state.Pipeline.Step.Environment = map[string]string{} - } - - state.Pipeline.Step.Environment["DRONE_MACHINE"] = r.hostname - state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - state.Pipeline.Step.Environment["DRONE_BUILD_STATUS"] = "success" - state.Pipeline.Step.Environment["DRONE_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["DRONE_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - state.Pipeline.Step.Environment["DRONE_JOB_STATUS"] = "success" - state.Pipeline.Step.Environment["DRONE_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["DRONE_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - if state.Pipeline.Error != nil { - state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" - state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" - state.Pipeline.Step.Environment["DRONE_BUILD_STATUS"] = "failure" - state.Pipeline.Step.Environment["DRONE_JOB_STATUS"] = "failure" - } - return nil - }) - - err = pipeline.New(work.Config, - pipeline.WithContext(ctx), - pipeline.WithLogger(defaultLogger), - pipeline.WithTracer(defaultTracer), - pipeline.WithEngine(engine), - ).Run() - - state.Finished = time.Now().Unix() - state.Exited = true - if err != nil { - switch xerr := err.(type) { - case *pipeline.ExitError: - state.ExitCode = xerr.Code - default: - state.ExitCode = 1 - state.Error = err.Error() - } - if cancelled.IsSet() { - state.ExitCode = 137 - } - } - - logger.Debug(). - Str("error", state.Error). - Int("exit_code", state.ExitCode). - Msg("pipeline complete") - - logger.Debug(). - Msg("uploading logs") - - uploads.Wait() - - logger.Debug(). - Msg("uploading logs complete") - - logger.Debug(). - Str("error", state.Error). - Int("exit_code", state.ExitCode). - Msg("updating pipeline status") - - err = r.client.Done(ctxmeta, work.ID, state) - if err != nil { - logger.Error().Err(err). - Msg("updating pipeline status failed") - } else { - logger.Debug(). - Msg("updating pipeline status complete") - } - - return nil -} - type credentials struct { username string password string @@ -472,13 +148,3 @@ func (c *credentials) GetRequestMetadata(oldcontext.Context, ...string) (map[str func (c *credentials) RequireTransportSecurity() bool { return false } - -// extract repository name from the configuration -func extractRepositoryName(config *backend.Config) string { - return config.Stages[0].Steps[0].Environment["DRONE_REPO"] -} - -// extract build number from the configuration -func extractBuildNumber(config *backend.Config) string { - return config.Stages[0].Steps[0].Environment["DRONE_BUILD_NUMBER"] -} diff --git a/cmd/drone-agent/health.go b/cmd/drone-agent/health.go index e11649009..643ccdc27 100644 --- a/cmd/drone-agent/health.go +++ b/cmd/drone-agent/health.go @@ -17,10 +17,7 @@ package main import ( "encoding/json" "fmt" - "io" "net/http" - "sync" - "time" "github.com/laszlocph/drone-oss-08/version" "github.com/urfave/cli" @@ -60,7 +57,7 @@ func handleStats(w http.ResponseWriter, r *http.Request) { w.WriteHeader(500) } w.Header().Add("Content-Type", "text/json") - counter.writeTo(w) + counter.WriteTo(w) } type versionResp struct { @@ -68,68 +65,6 @@ type versionResp struct { Source string `json:"source"` } -// default statistics counter -var counter = &state{ - Metadata: map[string]info{}, -} - -type state struct { - sync.Mutex `json:"-"` - Polling int `json:"polling_count"` - Running int `json:"running_count"` - Metadata map[string]info `json:"running"` -} - -type info struct { - ID string `json:"id"` - Repo string `json:"repository"` - Build string `json:"build_number"` - Started time.Time `json:"build_started"` - Timeout time.Duration `json:"build_timeout"` -} - -func (s *state) Add(id string, timeout time.Duration, repo, build string) { - s.Lock() - s.Polling-- - s.Running++ - s.Metadata[id] = info{ - ID: id, - Repo: repo, - Build: build, - Timeout: timeout, - Started: time.Now().UTC(), - } - s.Unlock() -} - -func (s *state) Done(id string) { - s.Lock() - s.Polling++ - s.Running-- - delete(s.Metadata, id) - s.Unlock() -} - -func (s *state) Healthy() bool { - s.Lock() - defer s.Unlock() - now := time.Now() - buf := time.Hour // 1 hour buffer - for _, item := range s.Metadata { - if now.After(item.Started.Add(item.Timeout).Add(buf)) { - return false - } - } - return true -} - -func (s *state) writeTo(w io.Writer) (int, error) { - s.Lock() - out, _ := json.Marshal(s) - s.Unlock() - return w.Write(out) -} - // handles pinging the endpoint and returns an error if the // agent is in an unhealthy state. func pinger(c *cli.Context) error {