Cleanup state reporting (#3850)

This commit is contained in:
Anbraten
2024-07-01 11:20:55 +02:00
committed by GitHub
parent daeab8d3c7
commit 2fa9432ef8
18 changed files with 590 additions and 514 deletions

View File

@@ -36,7 +36,7 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w
return func(step *backend.Step, rc io.Reader) error {
logger := _logger.With().
Str("image", step.Image).
Str("workflowID", workflow.ID).
Str("workflow_id", workflow.ID).
Logger()
uploads.Add(1)

View File

@@ -161,17 +161,14 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
}
// Init signals the workflow is initialized.
func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) {
func (c *client) Init(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) {
retry := c.newBackOff()
req := new(proto.InitRequest)
req.Id = id
req.State = new(proto.State)
req.State.Error = state.Error
req.State.ExitCode = int32(state.ExitCode)
req.State.Exited = state.Exited
req.State.Finished = state.Finished
req.Id = workflowID
req.State = new(proto.WorkflowState)
req.State.Started = state.Started
req.State.StepUuid = state.StepUUID
req.State.Finished = state.Finished
req.State.Error = state.Error
for {
_, err = c.client.Init(ctx, req)
if err == nil {
@@ -201,18 +198,15 @@ func (c *client) Init(ctx context.Context, id string, state rpc.State) (err erro
return nil
}
// Done signals the work is complete.
func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) {
// Done signals the workflow is complete.
func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) {
retry := c.newBackOff()
req := new(proto.DoneRequest)
req.Id = id
req.State = new(proto.State)
req.State.Error = state.Error
req.State.ExitCode = int32(state.ExitCode)
req.State.Exited = state.Exited
req.State.Finished = state.Finished
req.Id = workflowID
req.State = new(proto.WorkflowState)
req.State.Started = state.Started
req.State.StepUuid = state.StepUUID
req.State.Finished = state.Finished
req.State.Error = state.Error
for {
_, err = c.client.Done(ctx, req)
if err == nil {
@@ -277,17 +271,17 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
}
// Update updates the workflow state.
func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) {
func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
req.Id = id
req.State = new(proto.State)
req.State.Error = state.Error
req.State.ExitCode = int32(state.ExitCode)
req.State.Exited = state.Exited
req.State.Finished = state.Finished
req.State.Started = state.Started
req.State = new(proto.StepState)
req.State.StepUuid = state.StepUUID
req.State.Started = state.Started
req.State.Finished = state.Finished
req.State.Exited = state.Exited
req.State.ExitCode = int32(state.ExitCode)
req.State.Error = state.Error
for {
_, err = c.client.Update(ctx, req)
if err == nil {

View File

@@ -23,7 +23,6 @@ import (
"time"
"github.com/rs/zerolog/log"
"github.com/tevino/abool/v2"
"google.golang.org/grpc/metadata"
"go.woodpecker-ci.org/woodpecker/v2/pipeline"
@@ -57,34 +56,34 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
ctxMeta := metadata.NewOutgoingContext(context.Background(), meta)
// get the next workflow from the queue
work, err := r.client.Next(runnerCtx, r.filter)
workflow, err := r.client.Next(runnerCtx, r.filter)
if err != nil {
return err
}
if work == nil {
if workflow == nil {
return nil
}
timeout := time.Hour
if minutes := work.Timeout; minutes != 0 {
if minutes := workflow.Timeout; minutes != 0 {
timeout = time.Duration(minutes) * time.Minute
}
repoName := extractRepositoryName(work.Config) // hack
pipelineNumber := extractPipelineNumber(work.Config) // hack
repoName := extractRepositoryName(workflow.Config) // hack
pipelineNumber := extractPipelineNumber(workflow.Config) // hack
r.counter.Add(
work.ID,
workflow.ID,
timeout,
repoName,
pipelineNumber,
)
defer r.counter.Done(work.ID)
defer r.counter.Done(workflow.ID)
logger := log.With().
Str("repo", repoName).
Str("pipeline", pipelineNumber).
Str("id", work.ID).
Str("workflow_id", workflow.ID).
Logger()
logger.Debug().Msg("received execution")
@@ -99,17 +98,17 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
logger.Error().Msg("Received sigterm termination signal")
})
canceled := abool.New()
canceled := false
go func() {
logger.Debug().Msg("listen for cancel signal")
if err := r.client.Wait(workflowCtx, work.ID); err != nil {
canceled.SetTo(true)
if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
canceled = true
logger.Warn().Err(err).Msg("cancel signal received")
cancel()
} else {
logger.Debug().Msg("stop listening for cancel signal")
logger.Debug().Msg("done listening for cancel signal")
}
}()
@@ -123,76 +122,66 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
case <-time.After(time.Minute):
logger.Debug().Msg("pipeline lease renewed")
if err := r.client.Extend(workflowCtx, work.ID); err != nil {
if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
log.Error().Err(err).Msg("extending pipeline deadline failed")
}
}
}
}()
state := rpc.State{}
state := rpc.WorkflowState{}
state.Started = time.Now().Unix()
err = r.client.Init(runnerCtx, work.ID, state)
err = r.client.Init(runnerCtx, workflow.ID, state)
if err != nil {
logger.Error().Err(err).Msg("pipeline initialization failed")
logger.Error().Err(err).Msg("workflow initialization failed")
// TODO: should we return here?
}
var uploads sync.WaitGroup
//nolint:contextcheck
err = pipeline.New(work.Config,
err = pipeline.New(workflow.Config,
pipeline.WithContext(workflowCtx),
pipeline.WithTaskUUID(fmt.Sprint(work.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, work)),
pipeline.WithTracer(r.createTracer(ctxMeta, logger, work)),
pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, logger, workflow)),
pipeline.WithBackend(*r.backend),
pipeline.WithDescription(map[string]string{
"ID": work.ID,
"Repo": repoName,
"Pipeline": pipelineNumber,
"workflow_id": workflow.ID,
"repo": repoName,
"pipeline_number": pipelineNumber,
}),
).Run(runnerCtx)
state.Finished = time.Now().Unix()
state.Exited = true
if canceled.IsSet() {
state.Error = ""
state.ExitCode = pipeline.ExitCodeKilled
} else if err != nil {
pExitError := &pipeline.ExitError{}
switch {
case errors.As(err, &pExitError):
state.ExitCode = pExitError.Code
case errors.Is(err, pipeline.ErrCancel):
state.Error = ""
state.ExitCode = pipeline.ExitCodeKilled
canceled.SetTo(true)
default:
state.ExitCode = 1
state.Error = err.Error()
}
if errors.Is(err, pipeline.ErrCancel) {
canceled = true
} else if canceled {
err = errors.Join(err, pipeline.ErrCancel)
}
if err != nil {
state.Error = err.Error()
}
logger.Debug().
Str("error", state.Error).
Int("exit_code", state.ExitCode).
Bool("canceled", canceled.IsSet()).
Msg("pipeline complete")
Bool("canceled", canceled).
Msg("workflow finished")
logger.Debug().Msg("uploading logs")
logger.Debug().Msg("uploading logs ...")
uploads.Wait()
logger.Debug().Msg("uploading logs complete")
logger.Debug().Msg("uploaded logs")
logger.Debug().
Str("error", state.Error).
Int("exit_code", state.ExitCode).
Msg("updating pipeline status")
Msg("updating workflow status")
if err := r.client.Done(runnerCtx, work.ID, state); err != nil {
logger.Error().Err(err).Msg("updating pipeline status failed")
if err := r.client.Done(runnerCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("updating workflow status failed")
} else {
logger.Debug().Msg("updating pipeline status complete")
logger.Debug().Msg("updating workflow status complete")
}
return nil

View File

@@ -36,7 +36,7 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo
Bool("exited", state.Process.Exited).
Logger()
stepState := rpc.State{
stepState := rpc.StepState{
StepUUID: state.Pipeline.Step.UUID,
Exited: state.Process.Exited,
ExitCode: state.Process.ExitCode,
@@ -69,11 +69,11 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo
state.Pipeline.Step.Environment["CI_MACHINE"] = r.hostname
state.Pipeline.Step.Environment["CI_PIPELINE_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Pipeline.Started, 10)
state.Pipeline.Step.Environment["CI_PIPELINE_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)
state.Pipeline.Step.Environment["CI_STEP_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Pipeline.Started, 10)
state.Pipeline.Step.Environment["CI_STEP_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)
state.Pipeline.Step.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH