Fix pipeline cancel (#6320)

This commit is contained in:
6543
2026-03-30 19:33:54 +02:00
committed by GitHub
parent 662575630a
commit af471c5372
5 changed files with 14 additions and 3 deletions

View File

@@ -51,6 +51,8 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen
}
}
// TODO: refactor this big function into subfunctions in it's own subpackage
// Run executes a workflow using a backend, tracks its state and reports the state back to the server.
func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
log.Debug().Msg("request next execution")
@@ -105,7 +107,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
// Listen for remote cancel events (UI / API).
// When canceled, we MUST cancel the workflow context
// so that workflow execution stop immediately.
// so that workflow execution stops immediately.
go func() {
logger.Debug().Msg("start listening for server side cancel signal")

View File

@@ -267,6 +267,8 @@ func (e *docker) WaitStep(ctx context.Context, step *backend_types.Step, taskUUI
log.Trace().Msgf("ContainerWait returned with resp: %v", resp)
case err := <-errC:
log.Trace().Msgf("ContainerWait returned with err: %v", err)
case <-ctx.Done():
return nil, ctx.Err()
}
info, err := e.client.ContainerInspect(ctx, containerName)

View File

@@ -53,7 +53,7 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
r.started = time.Now().Unix()
if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil {
if err := r.engine.SetupWorkflow(r.ctx, r.spec, r.taskUUID); err != nil { //nolint:contextcheck
r.traceWorkflowSetupError(err)
return err
}

View File

@@ -169,7 +169,11 @@ func (q *fifo) Wait(ctx context.Context, taskID string) error {
select {
case <-ctx.Done():
case <-state.done:
// only return queue errors and no workflow errors
// check if we have a wrapped cancel error and unwrap it
if errors.Is(state.error, ErrCancel) {
return ErrCancel
}
// or return queue errors and no workflow errors
if !errors.Is(state.error, new(ErrExternal)) {
return state.error
}

View File

@@ -117,13 +117,16 @@ func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err err
if err := s.queue.Wait(c, workflowID); err != nil {
if errors.Is(err, queue.ErrCancel) {
// we explicit send a cancel signal
log.Debug().Str("workflowID", workflowID).Msg("while waiting the queue reported the workflow as canceled")
return true, nil
}
// unknown error happened
log.Error().Err(err).Str("workflowID", workflowID).Msg("while waiting the queue returned an unexpected error")
return false, err
}
// workflow finished and on issues appeared
log.Debug().Str("workflowID", workflowID).Msg("queue reported the workflow as finished")
return false, nil
}