From af471c5372bcbcf60e6892317e7fdf715e9b2a55 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 30 Mar 2026 19:33:54 +0200 Subject: [PATCH] Fix pipeline cancel (#6320) --- agent/runner.go | 4 +++- pipeline/backend/docker/docker.go | 2 ++ pipeline/runtime/workflow.go | 2 +- server/queue/fifo.go | 6 +++++- server/rpc/rpc.go | 3 +++ 5 files changed, 14 insertions(+), 3 deletions(-) diff --git a/agent/runner.go b/agent/runner.go index e754fe9756..4220427378 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -51,6 +51,8 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen } } +// TODO: refactor this big function into subfunctions in it's own subpackage + // Run executes a workflow using a backend, tracks its state and reports the state back to the server. func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { log.Debug().Msg("request next execution") @@ -105,7 +107,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { // Listen for remote cancel events (UI / API). // When canceled, we MUST cancel the workflow context - // so that workflow execution stop immediately. + // so that workflow execution stops immediately. go func() { logger.Debug().Msg("start listening for server side cancel signal") diff --git a/pipeline/backend/docker/docker.go b/pipeline/backend/docker/docker.go index 63b99fc3e8..b1bbd6733a 100644 --- a/pipeline/backend/docker/docker.go +++ b/pipeline/backend/docker/docker.go @@ -267,6 +267,8 @@ func (e *docker) WaitStep(ctx context.Context, step *backend_types.Step, taskUUI log.Trace().Msgf("ContainerWait returned with resp: %v", resp) case err := <-errC: log.Trace().Msgf("ContainerWait returned with err: %v", err) + case <-ctx.Done(): + return nil, ctx.Err() } info, err := e.client.ContainerInspect(ctx, containerName) diff --git a/pipeline/runtime/workflow.go b/pipeline/runtime/workflow.go index 1c841d8040..bc8d1cee65 100644 --- a/pipeline/runtime/workflow.go +++ b/pipeline/runtime/workflow.go @@ -53,7 +53,7 @@ func (r *Runtime) Run(runnerCtx context.Context) error { r.started = time.Now().Unix() - if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil { + if err := r.engine.SetupWorkflow(r.ctx, r.spec, r.taskUUID); err != nil { //nolint:contextcheck r.traceWorkflowSetupError(err) return err } diff --git a/server/queue/fifo.go b/server/queue/fifo.go index e215c817a1..57d6b7af72 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -169,7 +169,11 @@ func (q *fifo) Wait(ctx context.Context, taskID string) error { select { case <-ctx.Done(): case <-state.done: - // only return queue errors and no workflow errors + // check if we have a wrapped cancel error and unwrap it + if errors.Is(state.error, ErrCancel) { + return ErrCancel + } + // or return queue errors and no workflow errors if !errors.Is(state.error, new(ErrExternal)) { return state.error } diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 0b97fd8b81..2ed19cf871 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -117,13 +117,16 @@ func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err err if err := s.queue.Wait(c, workflowID); err != nil { if errors.Is(err, queue.ErrCancel) { // we explicit send a cancel signal + log.Debug().Str("workflowID", workflowID).Msg("while waiting the queue reported the workflow as canceled") return true, nil } // unknown error happened + log.Error().Err(err).Str("workflowID", workflowID).Msg("while waiting the queue returned an unexpected error") return false, err } // workflow finished and on issues appeared + log.Debug().Str("workflowID", workflowID).Msg("queue reported the workflow as finished") return false, nil }