mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-05-04 12:32:36 +00:00
Move wait for log uploads logic out of logger and tracer into pipeline runtime (#6471)
This commit is contained in:
@@ -16,7 +16,6 @@ package agent
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
@@ -28,7 +27,7 @@ import (
|
||||
"go.woodpecker-ci.org/woodpecker/v3/rpc"
|
||||
)
|
||||
|
||||
func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) logging.Logger {
|
||||
func (r *Runner) createLogger(_logger zerolog.Logger, workflow *rpc.Workflow) logging.Logger {
|
||||
return func(step *backend_types.Step, rc io.ReadCloser) error {
|
||||
defer rc.Close()
|
||||
|
||||
@@ -36,9 +35,6 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w
|
||||
Str("image", step.Image).
|
||||
Logger()
|
||||
|
||||
uploads.Add(1)
|
||||
defer uploads.Done()
|
||||
|
||||
var secrets []string
|
||||
for _, secret := range workflow.Config.Secrets {
|
||||
secrets = append(secrets, secret.Value)
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
@@ -159,7 +159,14 @@ func (r *Runner) Run(runnerCtx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
var uploads sync.WaitGroup
|
||||
// Enrich workflow env with agent info
|
||||
// TODO: find better way to track this state
|
||||
for _, stage := range workflow.Config.Stages {
|
||||
for _, step := range stage.Steps {
|
||||
step.Environment["CI_MACHINE"] = r.hostname
|
||||
step.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH
|
||||
}
|
||||
}
|
||||
|
||||
// Run pipeline
|
||||
err = pipeline_runtime.New(
|
||||
@@ -167,8 +174,8 @@ func (r *Runner) Run(runnerCtx context.Context) error {
|
||||
r.backend,
|
||||
pipeline_runtime.WithContext(workflowCtx),
|
||||
pipeline_runtime.WithTaskUUID(fmt.Sprint(workflow.ID)),
|
||||
pipeline_runtime.WithLogger(r.createLogger(logger, &uploads, workflow)),
|
||||
pipeline_runtime.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)),
|
||||
pipeline_runtime.WithLogger(r.createLogger(logger, workflow)),
|
||||
pipeline_runtime.WithTracer(r.createTracer(ctxMeta, logger, workflow)),
|
||||
pipeline_runtime.WithDescription(map[string]string{
|
||||
"workflow_id": workflow.ID,
|
||||
"repo": repoName,
|
||||
@@ -192,11 +199,6 @@ func (r *Runner) Run(runnerCtx context.Context) error {
|
||||
Bool("canceled", state.Canceled).
|
||||
Msg("workflow finished")
|
||||
|
||||
// Ensure all logs/traces are uploaded before finishing
|
||||
logger.Debug().Msg("waiting for logs and traces upload")
|
||||
uploads.Wait()
|
||||
logger.Debug().Msg("logs and traces uploaded")
|
||||
|
||||
// Update workflow state
|
||||
doneCtx := runnerCtx //nolint:contextcheck
|
||||
if doneCtx.Err() != nil {
|
||||
|
||||
@@ -17,9 +17,6 @@ package agent
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
@@ -30,11 +27,8 @@ import (
|
||||
"go.woodpecker-ci.org/woodpecker/v3/rpc"
|
||||
)
|
||||
|
||||
func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) tracing.TraceFunc {
|
||||
func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) tracing.TraceFunc {
|
||||
return func(state *state.State) error {
|
||||
uploads.Add(1)
|
||||
defer uploads.Done()
|
||||
|
||||
stepLogger := logger.With().
|
||||
Str("image", state.CurrStep.Image).
|
||||
Str("workflow_id", workflow.ID).
|
||||
@@ -58,33 +52,9 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup,
|
||||
stepState.Finished = time.Now().Unix()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
stepLogger.Debug().Msg("update step status")
|
||||
stepLogger.Debug().Msg("update step status")
|
||||
defer stepLogger.Debug().Msg("update step status complete")
|
||||
|
||||
if err := r.client.Update(ctxMeta, workflow.ID, stepState); err != nil {
|
||||
stepLogger.Debug().
|
||||
Err(err).
|
||||
Msg("update step status error")
|
||||
}
|
||||
|
||||
stepLogger.Debug().Msg("update step status complete")
|
||||
}()
|
||||
if state.CurrStepState.Exited {
|
||||
return nil
|
||||
}
|
||||
if state.CurrStep.Environment == nil {
|
||||
state.CurrStep.Environment = map[string]string{}
|
||||
}
|
||||
|
||||
// TODO: find better way to update this state and move it to pipeline to have the same env in cli-exec
|
||||
state.CurrStep.Environment["CI_MACHINE"] = r.hostname
|
||||
|
||||
state.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10)
|
||||
|
||||
state.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10)
|
||||
|
||||
state.CurrStep.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH
|
||||
|
||||
return nil
|
||||
return r.client.Update(ctxMeta, workflow.ID, stepState)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -324,7 +324,7 @@ func execWithAxis(ctx context.Context, c *cli.Command, file, repoPath string, ax
|
||||
|
||||
return pipeline_runtime.New(compiled, backendEngine,
|
||||
pipeline_runtime.WithContext(pipelineCtx), //nolint:contextcheck
|
||||
pipeline_runtime.WithTracer(tracing.DefaultTracer),
|
||||
pipeline_runtime.WithTracer(tracing.NoOpTracer),
|
||||
pipeline_runtime.WithLogger(defaultLogger),
|
||||
pipeline_runtime.WithDescription(map[string]string{
|
||||
"CLI": "exec",
|
||||
|
||||
@@ -57,3 +57,13 @@ func getTracerStates(tracer *tracer_mocks.MockTracer) []state.State {
|
||||
}
|
||||
return states
|
||||
}
|
||||
|
||||
// indexOfTrace returns the first index where predicate matches, or -1.
|
||||
func indexOfTrace(traces []state.State, match func(s state.State) bool) int {
|
||||
for i := range traces {
|
||||
if match(traces[i]) {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
@@ -47,6 +47,8 @@ type Runtime struct {
|
||||
tracerLock sync.Mutex
|
||||
logger logging.Logger
|
||||
|
||||
uploadWait sync.WaitGroup
|
||||
|
||||
taskUUID string
|
||||
description map[string]string
|
||||
}
|
||||
|
||||
@@ -193,35 +193,77 @@ func TestWorkflowWithServiceStep(t *testing.T) {
|
||||
WithLogger(newTestLogger(t)),
|
||||
)
|
||||
|
||||
assert.NoError(t, r.Run(t.Context()))
|
||||
require.NoError(t, r.Run(t.Context()))
|
||||
traces := getTracerStates(tracer)
|
||||
if assert.Len(t, traces, 5) {
|
||||
assert.EqualValues(t, backend_types.State{}, traces[0].CurrStepState)
|
||||
assert.Greater(t, traces[2].CurrStepState.Started, int64(0))
|
||||
assert.EqualValues(t, backend_types.State{Started: traces[2].CurrStepState.Started, Exited: true}, traces[2].CurrStepState)
|
||||
assert.EqualValues(t, backend_types.State{}, traces[3].CurrStepState)
|
||||
assert.Greater(t, traces[4].CurrStepState.Started, int64(0))
|
||||
assert.EqualValues(t, backend_types.State{Started: traces[4].CurrStepState.Started, Exited: true}, traces[4].CurrStepState)
|
||||
|
||||
assert.Greater(t, traces[4].Workflow.Started, int64(0))
|
||||
assert.EqualValues(t, state.State{
|
||||
Workflow: state.Workflow{
|
||||
Started: traces[4].Workflow.Started,
|
||||
},
|
||||
CurrStep: &backend_types.Step{
|
||||
Name: "test",
|
||||
UUID: "test-uuid",
|
||||
Type: "commands",
|
||||
OnSuccess: true,
|
||||
Environment: map[string]string{},
|
||||
Commands: []string{"echo test"},
|
||||
},
|
||||
CurrStepState: backend_types.State{
|
||||
Started: traces[4].CurrStepState.Started,
|
||||
Exited: true,
|
||||
},
|
||||
}, traces[4])
|
||||
// Each step should emit exactly one "started" and one "exited" trace:
|
||||
// db (service/detached), build, test — 3 * 2 = 6 traces total.
|
||||
require.Len(t, traces, 6)
|
||||
|
||||
// Per-step invariants: started trace is the zero state, exited trace is
|
||||
// Exited=true with a monotonic Started timestamp.
|
||||
for _, name := range []string{"db", "build", "test"} {
|
||||
started := findFirstTraceByName(traces, name)
|
||||
require.NotNil(t, started, "%s should have a started trace", name)
|
||||
assert.EqualValues(t, backend_types.State{}, started.CurrStepState,
|
||||
"%s started trace should be zero-valued", name)
|
||||
|
||||
last := findLastTraceByName(traces, name)
|
||||
require.NotNil(t, last, "%s should have an exited trace", name)
|
||||
assert.True(t, last.CurrStepState.Exited, "%s should be exited", name)
|
||||
assert.Equal(t, 0, last.CurrStepState.ExitCode, "%s should exit 0", name)
|
||||
assert.Greater(t, last.CurrStepState.Started, int64(0),
|
||||
"%s should have a non-zero Started timestamp", name)
|
||||
}
|
||||
|
||||
// Per-step ordering: started trace precedes exited trace for the same step.
|
||||
for _, name := range []string{"db", "build", "test"} {
|
||||
startedIdx := indexOfTrace(traces, func(s state.State) bool {
|
||||
return s.CurrStep != nil && s.CurrStep.Name == name && !s.CurrStepState.Exited
|
||||
})
|
||||
exitedIdx := indexOfTrace(traces, func(s state.State) bool {
|
||||
return s.CurrStep != nil && s.CurrStep.Name == name && s.CurrStepState.Exited
|
||||
})
|
||||
assert.Less(t, startedIdx, exitedIdx, "%s started must precede %s exited", name, name)
|
||||
}
|
||||
|
||||
// The contract of a service/detached step in stage 1: its exit trace arrives
|
||||
// AFTER stage 2's steps have already been traced. That's the whole point of
|
||||
// detaching — it must not block the next stage.
|
||||
dbExitIdx := indexOfTrace(traces, func(s state.State) bool {
|
||||
return s.CurrStep != nil && s.CurrStep.Name == "db" && s.CurrStepState.Exited
|
||||
})
|
||||
testExitIdx := indexOfTrace(traces, func(s state.State) bool {
|
||||
return s.CurrStep != nil && s.CurrStep.Name == "test" && s.CurrStepState.Exited
|
||||
})
|
||||
assert.Greater(t, dbExitIdx, testExitIdx,
|
||||
"db (service) must complete after test (next stage) — otherwise it wasn't really detached")
|
||||
|
||||
// Runtime-injected env vars should be present on the test step's exit trace.
|
||||
testExit := findLastTraceByName(traces, "test")
|
||||
require.NotNil(t, testExit)
|
||||
assert.NotEmpty(t, testExit.CurrStep.Environment["CI_PIPELINE_STARTED"])
|
||||
assert.NotEmpty(t, testExit.CurrStep.Environment["CI_STEP_STARTED"])
|
||||
assert.Greater(t, testExit.Workflow.Started, int64(0))
|
||||
|
||||
// Strip runtime-injected env for a structural comparison of the step itself.
|
||||
delete(testExit.CurrStep.Environment, "CI_PIPELINE_STARTED")
|
||||
delete(testExit.CurrStep.Environment, "CI_STEP_STARTED")
|
||||
assert.EqualValues(t, state.State{
|
||||
Workflow: state.Workflow{Started: testExit.Workflow.Started},
|
||||
CurrStep: &backend_types.Step{
|
||||
Name: "test",
|
||||
UUID: "test-uuid",
|
||||
Type: "commands",
|
||||
OnSuccess: true,
|
||||
Environment: map[string]string{},
|
||||
Commands: []string{"echo test"},
|
||||
},
|
||||
CurrStepState: backend_types.State{
|
||||
Started: testExit.CurrStepState.Started,
|
||||
Exited: true,
|
||||
},
|
||||
}, *testExit)
|
||||
}
|
||||
|
||||
func TestWorkflowDetachedStepDoesNotBlockWorkflow(t *testing.T) {
|
||||
@@ -409,6 +451,9 @@ func TestWorkflowPluginStep(t *testing.T) {
|
||||
|
||||
lastPluginTrace := findLastTraceByName(getTracerStates(tracer), "publish")
|
||||
if assert.NotNil(t, lastPluginTrace) {
|
||||
delete(lastPluginTrace.CurrStep.Environment, "CI_PIPELINE_STARTED")
|
||||
delete(lastPluginTrace.CurrStep.Environment, "CI_STEP_STARTED")
|
||||
|
||||
assert.EqualValues(t, map[string]string{
|
||||
"DRONE_BUILD_STATUS": "success",
|
||||
"DRONE_REPO_SCM": "git",
|
||||
@@ -1223,3 +1268,102 @@ func TestWorkflowCancelDuringStepSleep(t *testing.T) {
|
||||
assert.Nil(t, findFirstTraceByName(getTracerStates(tracer), "never-reached"),
|
||||
"never-reached must not have been traced")
|
||||
}
|
||||
|
||||
// TestWorkflowFailingServiceDoesNotFailWorkflow pins down the intentional design:
|
||||
// a service/detached step that fails in the background has its failure logged
|
||||
// and traced, but it must NOT propagate to the workflow error. Subsequent
|
||||
// stages must still run, and Run() must return nil.
|
||||
//
|
||||
// This is the explicit contract in runDetachedStep:
|
||||
// "Any error that occurs after setup is logged but not propagated — it cannot
|
||||
//
|
||||
// influence the pipeline outcome at that point."
|
||||
func TestWorkflowFailingServiceDoesNotFailWorkflow(t *testing.T) {
|
||||
t.Parallel()
|
||||
tracer := newTestTracer(t)
|
||||
r := New(
|
||||
&backend_types.Config{
|
||||
Stages: []*backend_types.Stage{
|
||||
{Steps: []*backend_types.Step{
|
||||
// Service runs ~100ms (from withService), then exits non-zero.
|
||||
cmdStep("db", withService(), withExitCode(1)),
|
||||
cmdStep("build"),
|
||||
}},
|
||||
{Steps: []*backend_types.Step{cmdStep("deploy")}},
|
||||
},
|
||||
},
|
||||
dummy.New(),
|
||||
WithTracer(tracer),
|
||||
WithLogger(newTestLogger(t)),
|
||||
)
|
||||
|
||||
// Contract 1: workflow succeeds even though the service failed.
|
||||
assert.NoError(t, r.Run(t.Context()),
|
||||
"service failure must not fail the workflow (detached errors are not propagated)")
|
||||
|
||||
traces := getTracerStates(tracer)
|
||||
|
||||
// Contract 2: the service's failure IS visible in traces. This is the
|
||||
// observability guarantee — the failure is logged and recorded even though
|
||||
// it doesn't kill the workflow.
|
||||
dbExit := findLastTraceByName(traces, "db")
|
||||
require.NotNil(t, dbExit, "db must have an exit trace")
|
||||
assert.True(t, dbExit.CurrStepState.Exited, "db should be marked exited")
|
||||
assert.Equal(t, 1, dbExit.CurrStepState.ExitCode, "db exit code must be preserved in trace")
|
||||
|
||||
// Contract 3: deploy must run normally — NOT skipped — because the service
|
||||
// failure didn't set r.err.
|
||||
deployExit := findLastTraceByName(traces, "deploy")
|
||||
require.NotNil(t, deployExit, "deploy must be traced")
|
||||
assert.False(t, deployExit.CurrStepState.Skipped, "deploy must run when only a service failed")
|
||||
assert.True(t, deployExit.CurrStepState.Exited, "deploy should complete normally")
|
||||
assert.Equal(t, 0, deployExit.CurrStepState.ExitCode)
|
||||
|
||||
// Contract 4: uploadWait at the end of Run() guarantees the detached trace
|
||||
// has been emitted BEFORE Run() returns. This is non-timing-dependent:
|
||||
// if Run() returned, the exit trace for every detached step must exist.
|
||||
// This is what the uploadWait plumbing in this PR is actually for.
|
||||
assert.NotNil(t, findLastTraceByName(traces, "db"),
|
||||
"detached step exit trace must be emitted before Run() returns (uploadWait contract)")
|
||||
}
|
||||
|
||||
// TestWorkflowFailingDetachedStepDoesNotFailWorkflow is the non-service
|
||||
// counterpart: Detached=true, Type=commands (a background worker). Same
|
||||
// contract — failures don't propagate.
|
||||
func TestWorkflowFailingDetachedStepDoesNotFailWorkflow(t *testing.T) {
|
||||
t.Parallel()
|
||||
tracer := newTestTracer(t)
|
||||
r := New(
|
||||
&backend_types.Config{
|
||||
Stages: []*backend_types.Stage{
|
||||
{Steps: []*backend_types.Step{
|
||||
// Detached (non-service) worker, ~100ms (from withDetached), exits code 2.
|
||||
cmdStep("background-worker", withDetached(), withExitCode(2)),
|
||||
cmdStep("main-build"),
|
||||
}},
|
||||
{Steps: []*backend_types.Step{cmdStep("deploy")}},
|
||||
},
|
||||
},
|
||||
dummy.New(),
|
||||
WithTracer(tracer),
|
||||
WithLogger(newTestLogger(t)),
|
||||
)
|
||||
|
||||
assert.NoError(t, r.Run(t.Context()),
|
||||
"detached worker failure must not fail the workflow")
|
||||
|
||||
traces := getTracerStates(tracer)
|
||||
|
||||
workerExit := findLastTraceByName(traces, "background-worker")
|
||||
require.NotNil(t, workerExit, "background-worker must have an exit trace")
|
||||
assert.True(t, workerExit.CurrStepState.Exited)
|
||||
assert.Equal(t, 2, workerExit.CurrStepState.ExitCode,
|
||||
"exit code from detached step must be preserved in trace")
|
||||
|
||||
deployExit := findLastTraceByName(traces, "deploy")
|
||||
require.NotNil(t, deployExit, "deploy must be traced")
|
||||
assert.False(t, deployExit.CurrStepState.Skipped,
|
||||
"deploy must run when only a detached worker failed")
|
||||
assert.True(t, deployExit.CurrStepState.Exited)
|
||||
assert.Equal(t, 0, deployExit.CurrStepState.ExitCode)
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ package runtime
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -206,7 +207,10 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types
|
||||
}
|
||||
|
||||
// Container is up and logging is streaming — hand off to background.
|
||||
r.uploadWait.Add(1)
|
||||
go func() {
|
||||
defer r.uploadWait.Done()
|
||||
|
||||
logger := r.makeLogger()
|
||||
|
||||
processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime)
|
||||
@@ -253,12 +257,26 @@ func (r *Runtime) traceStep(processState *backend_types.State, err error, step *
|
||||
// processState == nil && err == nil: step just started, leave s.CurrStepState zero-valued.
|
||||
}
|
||||
|
||||
// The tracer should just trace changes, but it currently also updates step env vars used in various ways:
|
||||
// https://github.com/woodpecker-ci/woodpecker/blob/main/agent/tracer.go#L79-L86 .
|
||||
r.tracerLock.Lock()
|
||||
defer r.tracerLock.Unlock()
|
||||
if traceErr := r.tracer.Trace(s); traceErr != nil {
|
||||
return traceErr
|
||||
// The traceStep should just trace changes, but it currently also updates step env vars.
|
||||
{
|
||||
r.tracerLock.Lock()
|
||||
defer r.tracerLock.Unlock()
|
||||
|
||||
if s.CurrStepState.Exited {
|
||||
if s.CurrStep.Environment == nil {
|
||||
s.CurrStep.Environment = map[string]string{}
|
||||
}
|
||||
|
||||
// TODO: find better way to insert runtime step environment variables.
|
||||
s.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10)
|
||||
s.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10)
|
||||
}
|
||||
}
|
||||
|
||||
if traceErr := r.tracer.Trace(s); traceErr != nil {
|
||||
logger := r.makeLogger()
|
||||
logger.Error().Err(traceErr).Msg("could not trace step state change")
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -177,18 +177,6 @@ func TestTraceStep(t *testing.T) {
|
||||
assert.True(t, calls[0].CurrStepState.Exited)
|
||||
})
|
||||
|
||||
t.Run("TracerError", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
traceErr := errors.New("tracer unavailable")
|
||||
tracer := tracer_mocks.NewMockTracer(t)
|
||||
tracer.On("Trace", mock.Anything).Return(traceErr).Maybe()
|
||||
r := newDummyRuntime(t, tracer)
|
||||
|
||||
err := r.traceStep(nil, nil, dummyStep("s1"))
|
||||
|
||||
assert.ErrorIs(t, err, traceErr)
|
||||
})
|
||||
|
||||
t.Run("PipelineErrorPropagated", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
tracer := newTestTracer(t)
|
||||
@@ -518,23 +506,6 @@ func TestExecuteStep(t *testing.T) {
|
||||
return atomic.LoadInt32(&traced) >= 2
|
||||
}, time.Second, 10*time.Millisecond)
|
||||
})
|
||||
|
||||
t.Run("TracerErrorOnStarted", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
traceErr := errors.New("tracer down")
|
||||
tracer := tracer_mocks.NewMockTracer(t)
|
||||
// First call (skip-check passes, this is the "started" trace) → error.
|
||||
// The step has OnSuccess=true and no prior error, so shouldSkipStep returns false,
|
||||
// meaning executeStep calls traceStep(nil, nil, step) first.
|
||||
tracer.On("Trace", mock.Anything).Return(traceErr).Once()
|
||||
|
||||
r := newDummyRuntime(t, tracer)
|
||||
step := dummyStep("s1") // OnSuccess=true, so not skipped
|
||||
|
||||
err := r.executeStep(t.Context(), step)
|
||||
|
||||
assert.ErrorIs(t, err, traceErr)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRunBlockingStep(t *testing.T) {
|
||||
|
||||
@@ -71,6 +71,11 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure all logs/traces are uploaded before finishing
|
||||
logger.Debug().Msg("waiting for logs and traces upload")
|
||||
r.uploadWait.Wait()
|
||||
logger.Debug().Msg("logs and traces uploaded")
|
||||
|
||||
return r.err.Get()
|
||||
}
|
||||
|
||||
|
||||
@@ -15,8 +15,6 @@
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"go.woodpecker-ci.org/woodpecker/v3/pipeline/state"
|
||||
)
|
||||
|
||||
@@ -34,19 +32,7 @@ func (f TraceFunc) Trace(state *state.State) error {
|
||||
return f(state)
|
||||
}
|
||||
|
||||
// DefaultTracer provides a tracer that updates the CI_ environment
|
||||
// variables to include the correct timestamp and status.
|
||||
// TODO: find either a new home or better name for this.
|
||||
var DefaultTracer = TraceFunc(func(state *state.State) error {
|
||||
if state.CurrStepState.Exited {
|
||||
return nil
|
||||
}
|
||||
if state.CurrStep.Environment == nil {
|
||||
return nil
|
||||
}
|
||||
state.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10)
|
||||
|
||||
state.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10)
|
||||
|
||||
// NoOpTracer provides a tracer that does nothing.
|
||||
var NoOpTracer = TraceFunc(func(state *state.State) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user