From 4dd3be7f96a976d19facfc86837bfbc18a974f6f Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Sat, 25 Apr 2026 16:36:26 +0200 Subject: [PATCH] Move wait for log uploads logic out of logger and tracer into pipeline runtime (#6471) --- agent/logger.go | 6 +- agent/runner.go | 20 ++-- agent/tracer.go | 38 +----- cli/exec/exec.go | 2 +- pipeline/runtime/helpers_test.go | 10 ++ pipeline/runtime/runtime.go | 2 + pipeline/runtime/runtime_test.go | 196 +++++++++++++++++++++++++++---- pipeline/runtime/step.go | 30 ++++- pipeline/runtime/step_test.go | 29 ----- pipeline/runtime/workflow.go | 5 + pipeline/tracing/tracer.go | 18 +-- 11 files changed, 230 insertions(+), 126 deletions(-) diff --git a/agent/logger.go b/agent/logger.go index cecf15dbe0..23a420344a 100644 --- a/agent/logger.go +++ b/agent/logger.go @@ -16,7 +16,6 @@ package agent import ( "io" - "sync" "github.com/rs/zerolog" @@ -28,7 +27,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/rpc" ) -func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) logging.Logger { +func (r *Runner) createLogger(_logger zerolog.Logger, workflow *rpc.Workflow) logging.Logger { return func(step *backend_types.Step, rc io.ReadCloser) error { defer rc.Close() @@ -36,9 +35,6 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w Str("image", step.Image). Logger() - uploads.Add(1) - defer uploads.Done() - var secrets []string for _, secret := range workflow.Config.Secrets { secrets = append(secrets, secret.Value) diff --git a/agent/runner.go b/agent/runner.go index 7e5d19464c..e9a653cab8 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -19,7 +19,7 @@ import ( "context" "errors" "fmt" - "sync" + "runtime" "time" "github.com/rs/zerolog/log" @@ -159,7 +159,14 @@ func (r *Runner) Run(runnerCtx context.Context) error { return err } - var uploads sync.WaitGroup + // Enrich workflow env with agent info + // TODO: find better way to track this state + for _, stage := range workflow.Config.Stages { + for _, step := range stage.Steps { + step.Environment["CI_MACHINE"] = r.hostname + step.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH + } + } // Run pipeline err = pipeline_runtime.New( @@ -167,8 +174,8 @@ func (r *Runner) Run(runnerCtx context.Context) error { r.backend, pipeline_runtime.WithContext(workflowCtx), pipeline_runtime.WithTaskUUID(fmt.Sprint(workflow.ID)), - pipeline_runtime.WithLogger(r.createLogger(logger, &uploads, workflow)), - pipeline_runtime.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)), + pipeline_runtime.WithLogger(r.createLogger(logger, workflow)), + pipeline_runtime.WithTracer(r.createTracer(ctxMeta, logger, workflow)), pipeline_runtime.WithDescription(map[string]string{ "workflow_id": workflow.ID, "repo": repoName, @@ -192,11 +199,6 @@ func (r *Runner) Run(runnerCtx context.Context) error { Bool("canceled", state.Canceled). Msg("workflow finished") - // Ensure all logs/traces are uploaded before finishing - logger.Debug().Msg("waiting for logs and traces upload") - uploads.Wait() - logger.Debug().Msg("logs and traces uploaded") - // Update workflow state doneCtx := runnerCtx //nolint:contextcheck if doneCtx.Err() != nil { diff --git a/agent/tracer.go b/agent/tracer.go index 836badfbfc..5cde77d8da 100644 --- a/agent/tracer.go +++ b/agent/tracer.go @@ -17,9 +17,6 @@ package agent import ( "context" "errors" - "runtime" - "strconv" - "sync" "time" "github.com/rs/zerolog" @@ -30,11 +27,8 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/rpc" ) -func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) tracing.TraceFunc { +func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) tracing.TraceFunc { return func(state *state.State) error { - uploads.Add(1) - defer uploads.Done() - stepLogger := logger.With(). Str("image", state.CurrStep.Image). Str("workflow_id", workflow.ID). @@ -58,33 +52,9 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, stepState.Finished = time.Now().Unix() } - defer func() { - stepLogger.Debug().Msg("update step status") + stepLogger.Debug().Msg("update step status") + defer stepLogger.Debug().Msg("update step status complete") - if err := r.client.Update(ctxMeta, workflow.ID, stepState); err != nil { - stepLogger.Debug(). - Err(err). - Msg("update step status error") - } - - stepLogger.Debug().Msg("update step status complete") - }() - if state.CurrStepState.Exited { - return nil - } - if state.CurrStep.Environment == nil { - state.CurrStep.Environment = map[string]string{} - } - - // TODO: find better way to update this state and move it to pipeline to have the same env in cli-exec - state.CurrStep.Environment["CI_MACHINE"] = r.hostname - - state.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10) - - state.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10) - - state.CurrStep.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH - - return nil + return r.client.Update(ctxMeta, workflow.ID, stepState) } } diff --git a/cli/exec/exec.go b/cli/exec/exec.go index fc5dfe7f03..9da07e62ba 100644 --- a/cli/exec/exec.go +++ b/cli/exec/exec.go @@ -324,7 +324,7 @@ func execWithAxis(ctx context.Context, c *cli.Command, file, repoPath string, ax return pipeline_runtime.New(compiled, backendEngine, pipeline_runtime.WithContext(pipelineCtx), //nolint:contextcheck - pipeline_runtime.WithTracer(tracing.DefaultTracer), + pipeline_runtime.WithTracer(tracing.NoOpTracer), pipeline_runtime.WithLogger(defaultLogger), pipeline_runtime.WithDescription(map[string]string{ "CLI": "exec", diff --git a/pipeline/runtime/helpers_test.go b/pipeline/runtime/helpers_test.go index ea29c8f6ca..14f5631dba 100644 --- a/pipeline/runtime/helpers_test.go +++ b/pipeline/runtime/helpers_test.go @@ -57,3 +57,13 @@ func getTracerStates(tracer *tracer_mocks.MockTracer) []state.State { } return states } + +// indexOfTrace returns the first index where predicate matches, or -1. +func indexOfTrace(traces []state.State, match func(s state.State) bool) int { + for i := range traces { + if match(traces[i]) { + return i + } + } + return -1 +} diff --git a/pipeline/runtime/runtime.go b/pipeline/runtime/runtime.go index 1cb262ebe3..8003b26a72 100644 --- a/pipeline/runtime/runtime.go +++ b/pipeline/runtime/runtime.go @@ -47,6 +47,8 @@ type Runtime struct { tracerLock sync.Mutex logger logging.Logger + uploadWait sync.WaitGroup + taskUUID string description map[string]string } diff --git a/pipeline/runtime/runtime_test.go b/pipeline/runtime/runtime_test.go index e497564a2a..ffe4ba2e5d 100644 --- a/pipeline/runtime/runtime_test.go +++ b/pipeline/runtime/runtime_test.go @@ -193,35 +193,77 @@ func TestWorkflowWithServiceStep(t *testing.T) { WithLogger(newTestLogger(t)), ) - assert.NoError(t, r.Run(t.Context())) + require.NoError(t, r.Run(t.Context())) traces := getTracerStates(tracer) - if assert.Len(t, traces, 5) { - assert.EqualValues(t, backend_types.State{}, traces[0].CurrStepState) - assert.Greater(t, traces[2].CurrStepState.Started, int64(0)) - assert.EqualValues(t, backend_types.State{Started: traces[2].CurrStepState.Started, Exited: true}, traces[2].CurrStepState) - assert.EqualValues(t, backend_types.State{}, traces[3].CurrStepState) - assert.Greater(t, traces[4].CurrStepState.Started, int64(0)) - assert.EqualValues(t, backend_types.State{Started: traces[4].CurrStepState.Started, Exited: true}, traces[4].CurrStepState) - assert.Greater(t, traces[4].Workflow.Started, int64(0)) - assert.EqualValues(t, state.State{ - Workflow: state.Workflow{ - Started: traces[4].Workflow.Started, - }, - CurrStep: &backend_types.Step{ - Name: "test", - UUID: "test-uuid", - Type: "commands", - OnSuccess: true, - Environment: map[string]string{}, - Commands: []string{"echo test"}, - }, - CurrStepState: backend_types.State{ - Started: traces[4].CurrStepState.Started, - Exited: true, - }, - }, traces[4]) + // Each step should emit exactly one "started" and one "exited" trace: + // db (service/detached), build, test — 3 * 2 = 6 traces total. + require.Len(t, traces, 6) + + // Per-step invariants: started trace is the zero state, exited trace is + // Exited=true with a monotonic Started timestamp. + for _, name := range []string{"db", "build", "test"} { + started := findFirstTraceByName(traces, name) + require.NotNil(t, started, "%s should have a started trace", name) + assert.EqualValues(t, backend_types.State{}, started.CurrStepState, + "%s started trace should be zero-valued", name) + + last := findLastTraceByName(traces, name) + require.NotNil(t, last, "%s should have an exited trace", name) + assert.True(t, last.CurrStepState.Exited, "%s should be exited", name) + assert.Equal(t, 0, last.CurrStepState.ExitCode, "%s should exit 0", name) + assert.Greater(t, last.CurrStepState.Started, int64(0), + "%s should have a non-zero Started timestamp", name) } + + // Per-step ordering: started trace precedes exited trace for the same step. + for _, name := range []string{"db", "build", "test"} { + startedIdx := indexOfTrace(traces, func(s state.State) bool { + return s.CurrStep != nil && s.CurrStep.Name == name && !s.CurrStepState.Exited + }) + exitedIdx := indexOfTrace(traces, func(s state.State) bool { + return s.CurrStep != nil && s.CurrStep.Name == name && s.CurrStepState.Exited + }) + assert.Less(t, startedIdx, exitedIdx, "%s started must precede %s exited", name, name) + } + + // The contract of a service/detached step in stage 1: its exit trace arrives + // AFTER stage 2's steps have already been traced. That's the whole point of + // detaching — it must not block the next stage. + dbExitIdx := indexOfTrace(traces, func(s state.State) bool { + return s.CurrStep != nil && s.CurrStep.Name == "db" && s.CurrStepState.Exited + }) + testExitIdx := indexOfTrace(traces, func(s state.State) bool { + return s.CurrStep != nil && s.CurrStep.Name == "test" && s.CurrStepState.Exited + }) + assert.Greater(t, dbExitIdx, testExitIdx, + "db (service) must complete after test (next stage) — otherwise it wasn't really detached") + + // Runtime-injected env vars should be present on the test step's exit trace. + testExit := findLastTraceByName(traces, "test") + require.NotNil(t, testExit) + assert.NotEmpty(t, testExit.CurrStep.Environment["CI_PIPELINE_STARTED"]) + assert.NotEmpty(t, testExit.CurrStep.Environment["CI_STEP_STARTED"]) + assert.Greater(t, testExit.Workflow.Started, int64(0)) + + // Strip runtime-injected env for a structural comparison of the step itself. + delete(testExit.CurrStep.Environment, "CI_PIPELINE_STARTED") + delete(testExit.CurrStep.Environment, "CI_STEP_STARTED") + assert.EqualValues(t, state.State{ + Workflow: state.Workflow{Started: testExit.Workflow.Started}, + CurrStep: &backend_types.Step{ + Name: "test", + UUID: "test-uuid", + Type: "commands", + OnSuccess: true, + Environment: map[string]string{}, + Commands: []string{"echo test"}, + }, + CurrStepState: backend_types.State{ + Started: testExit.CurrStepState.Started, + Exited: true, + }, + }, *testExit) } func TestWorkflowDetachedStepDoesNotBlockWorkflow(t *testing.T) { @@ -409,6 +451,9 @@ func TestWorkflowPluginStep(t *testing.T) { lastPluginTrace := findLastTraceByName(getTracerStates(tracer), "publish") if assert.NotNil(t, lastPluginTrace) { + delete(lastPluginTrace.CurrStep.Environment, "CI_PIPELINE_STARTED") + delete(lastPluginTrace.CurrStep.Environment, "CI_STEP_STARTED") + assert.EqualValues(t, map[string]string{ "DRONE_BUILD_STATUS": "success", "DRONE_REPO_SCM": "git", @@ -1223,3 +1268,102 @@ func TestWorkflowCancelDuringStepSleep(t *testing.T) { assert.Nil(t, findFirstTraceByName(getTracerStates(tracer), "never-reached"), "never-reached must not have been traced") } + +// TestWorkflowFailingServiceDoesNotFailWorkflow pins down the intentional design: +// a service/detached step that fails in the background has its failure logged +// and traced, but it must NOT propagate to the workflow error. Subsequent +// stages must still run, and Run() must return nil. +// +// This is the explicit contract in runDetachedStep: +// "Any error that occurs after setup is logged but not propagated — it cannot +// +// influence the pipeline outcome at that point." +func TestWorkflowFailingServiceDoesNotFailWorkflow(t *testing.T) { + t.Parallel() + tracer := newTestTracer(t) + r := New( + &backend_types.Config{ + Stages: []*backend_types.Stage{ + {Steps: []*backend_types.Step{ + // Service runs ~100ms (from withService), then exits non-zero. + cmdStep("db", withService(), withExitCode(1)), + cmdStep("build"), + }}, + {Steps: []*backend_types.Step{cmdStep("deploy")}}, + }, + }, + dummy.New(), + WithTracer(tracer), + WithLogger(newTestLogger(t)), + ) + + // Contract 1: workflow succeeds even though the service failed. + assert.NoError(t, r.Run(t.Context()), + "service failure must not fail the workflow (detached errors are not propagated)") + + traces := getTracerStates(tracer) + + // Contract 2: the service's failure IS visible in traces. This is the + // observability guarantee — the failure is logged and recorded even though + // it doesn't kill the workflow. + dbExit := findLastTraceByName(traces, "db") + require.NotNil(t, dbExit, "db must have an exit trace") + assert.True(t, dbExit.CurrStepState.Exited, "db should be marked exited") + assert.Equal(t, 1, dbExit.CurrStepState.ExitCode, "db exit code must be preserved in trace") + + // Contract 3: deploy must run normally — NOT skipped — because the service + // failure didn't set r.err. + deployExit := findLastTraceByName(traces, "deploy") + require.NotNil(t, deployExit, "deploy must be traced") + assert.False(t, deployExit.CurrStepState.Skipped, "deploy must run when only a service failed") + assert.True(t, deployExit.CurrStepState.Exited, "deploy should complete normally") + assert.Equal(t, 0, deployExit.CurrStepState.ExitCode) + + // Contract 4: uploadWait at the end of Run() guarantees the detached trace + // has been emitted BEFORE Run() returns. This is non-timing-dependent: + // if Run() returned, the exit trace for every detached step must exist. + // This is what the uploadWait plumbing in this PR is actually for. + assert.NotNil(t, findLastTraceByName(traces, "db"), + "detached step exit trace must be emitted before Run() returns (uploadWait contract)") +} + +// TestWorkflowFailingDetachedStepDoesNotFailWorkflow is the non-service +// counterpart: Detached=true, Type=commands (a background worker). Same +// contract — failures don't propagate. +func TestWorkflowFailingDetachedStepDoesNotFailWorkflow(t *testing.T) { + t.Parallel() + tracer := newTestTracer(t) + r := New( + &backend_types.Config{ + Stages: []*backend_types.Stage{ + {Steps: []*backend_types.Step{ + // Detached (non-service) worker, ~100ms (from withDetached), exits code 2. + cmdStep("background-worker", withDetached(), withExitCode(2)), + cmdStep("main-build"), + }}, + {Steps: []*backend_types.Step{cmdStep("deploy")}}, + }, + }, + dummy.New(), + WithTracer(tracer), + WithLogger(newTestLogger(t)), + ) + + assert.NoError(t, r.Run(t.Context()), + "detached worker failure must not fail the workflow") + + traces := getTracerStates(tracer) + + workerExit := findLastTraceByName(traces, "background-worker") + require.NotNil(t, workerExit, "background-worker must have an exit trace") + assert.True(t, workerExit.CurrStepState.Exited) + assert.Equal(t, 2, workerExit.CurrStepState.ExitCode, + "exit code from detached step must be preserved in trace") + + deployExit := findLastTraceByName(traces, "deploy") + require.NotNil(t, deployExit, "deploy must be traced") + assert.False(t, deployExit.CurrStepState.Skipped, + "deploy must run when only a detached worker failed") + assert.True(t, deployExit.CurrStepState.Exited) + assert.Equal(t, 0, deployExit.CurrStepState.ExitCode) +} diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go index ee971dc6ef..4824403141 100644 --- a/pipeline/runtime/step.go +++ b/pipeline/runtime/step.go @@ -17,6 +17,7 @@ package runtime import ( "context" "errors" + "strconv" "sync" "time" @@ -206,7 +207,10 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types } // Container is up and logging is streaming — hand off to background. + r.uploadWait.Add(1) go func() { + defer r.uploadWait.Done() + logger := r.makeLogger() processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime) @@ -253,12 +257,26 @@ func (r *Runtime) traceStep(processState *backend_types.State, err error, step * // processState == nil && err == nil: step just started, leave s.CurrStepState zero-valued. } - // The tracer should just trace changes, but it currently also updates step env vars used in various ways: - // https://github.com/woodpecker-ci/woodpecker/blob/main/agent/tracer.go#L79-L86 . - r.tracerLock.Lock() - defer r.tracerLock.Unlock() - if traceErr := r.tracer.Trace(s); traceErr != nil { - return traceErr + // The traceStep should just trace changes, but it currently also updates step env vars. + { + r.tracerLock.Lock() + defer r.tracerLock.Unlock() + + if s.CurrStepState.Exited { + if s.CurrStep.Environment == nil { + s.CurrStep.Environment = map[string]string{} + } + + // TODO: find better way to insert runtime step environment variables. + s.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10) + s.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10) + } } + + if traceErr := r.tracer.Trace(s); traceErr != nil { + logger := r.makeLogger() + logger.Error().Err(traceErr).Msg("could not trace step state change") + } + return err } diff --git a/pipeline/runtime/step_test.go b/pipeline/runtime/step_test.go index 7f3dd1b0b0..47c2d1c9d3 100644 --- a/pipeline/runtime/step_test.go +++ b/pipeline/runtime/step_test.go @@ -177,18 +177,6 @@ func TestTraceStep(t *testing.T) { assert.True(t, calls[0].CurrStepState.Exited) }) - t.Run("TracerError", func(t *testing.T) { - t.Parallel() - traceErr := errors.New("tracer unavailable") - tracer := tracer_mocks.NewMockTracer(t) - tracer.On("Trace", mock.Anything).Return(traceErr).Maybe() - r := newDummyRuntime(t, tracer) - - err := r.traceStep(nil, nil, dummyStep("s1")) - - assert.ErrorIs(t, err, traceErr) - }) - t.Run("PipelineErrorPropagated", func(t *testing.T) { t.Parallel() tracer := newTestTracer(t) @@ -518,23 +506,6 @@ func TestExecuteStep(t *testing.T) { return atomic.LoadInt32(&traced) >= 2 }, time.Second, 10*time.Millisecond) }) - - t.Run("TracerErrorOnStarted", func(t *testing.T) { - t.Parallel() - traceErr := errors.New("tracer down") - tracer := tracer_mocks.NewMockTracer(t) - // First call (skip-check passes, this is the "started" trace) → error. - // The step has OnSuccess=true and no prior error, so shouldSkipStep returns false, - // meaning executeStep calls traceStep(nil, nil, step) first. - tracer.On("Trace", mock.Anything).Return(traceErr).Once() - - r := newDummyRuntime(t, tracer) - step := dummyStep("s1") // OnSuccess=true, so not skipped - - err := r.executeStep(t.Context(), step) - - assert.ErrorIs(t, err, traceErr) - }) } func TestRunBlockingStep(t *testing.T) { diff --git a/pipeline/runtime/workflow.go b/pipeline/runtime/workflow.go index 793110d4dc..0f3be10bdf 100644 --- a/pipeline/runtime/workflow.go +++ b/pipeline/runtime/workflow.go @@ -71,6 +71,11 @@ func (r *Runtime) Run(runnerCtx context.Context) error { } } + // Ensure all logs/traces are uploaded before finishing + logger.Debug().Msg("waiting for logs and traces upload") + r.uploadWait.Wait() + logger.Debug().Msg("logs and traces uploaded") + return r.err.Get() } diff --git a/pipeline/tracing/tracer.go b/pipeline/tracing/tracer.go index 800cb9e3c6..1f1233af7e 100644 --- a/pipeline/tracing/tracer.go +++ b/pipeline/tracing/tracer.go @@ -15,8 +15,6 @@ package tracing import ( - "strconv" - "go.woodpecker-ci.org/woodpecker/v3/pipeline/state" ) @@ -34,19 +32,7 @@ func (f TraceFunc) Trace(state *state.State) error { return f(state) } -// DefaultTracer provides a tracer that updates the CI_ environment -// variables to include the correct timestamp and status. -// TODO: find either a new home or better name for this. -var DefaultTracer = TraceFunc(func(state *state.State) error { - if state.CurrStepState.Exited { - return nil - } - if state.CurrStep.Environment == nil { - return nil - } - state.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10) - - state.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10) - +// NoOpTracer provides a tracer that does nothing. +var NoOpTracer = TraceFunc(func(state *state.State) error { return nil })