From e45ca56dcb27a04a20f970cd12436373aa5a89e1 Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Thu, 24 Jul 2025 22:30:48 -0600 Subject: [PATCH] improve handling cancellation --- models/actions/run.go | 6 ++++- routers/api/actions/runner/runner.go | 2 +- routers/web/repo/actions/view.go | 37 ++++------------------------ services/actions/clear_tasks.go | 7 ++++++ services/actions/job_emitter.go | 15 ++++++++++- 5 files changed, 32 insertions(+), 35 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index dc69d5c6d01..f4e130dce45 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -285,7 +285,11 @@ func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, err if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { return cancelledJobs, err } - cancelledJobs = append(cancelledJobs, job) + updatedJob, err := GetRunJobByID(ctx, job.ID) + if err != nil { + return cancelledJobs, fmt.Errorf("get job: %w", err) + } + cancelledJobs = append(cancelledJobs, updatedJob) } // Return nil to indicate successful cancellation of all running and waiting jobs. diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index ce8137592d6..9a24343ea21 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -227,7 +227,7 @@ func (s *Service) UpdateTask( } if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED { - if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil { + if err := actions_service.EmitJobsIfReadyByRun(task.Job.RunID); err != nil { log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err) } } diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 0ac1e0ae693..79d01b47480 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -27,7 +27,6 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/templates" - "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/web" "code.gitea.io/gitea/routers/common" @@ -572,30 +571,11 @@ func Cancel(ctx *context_module.Context) { var updatedjobs []*actions_model.ActionRunJob if err := db.WithTx(ctx, func(ctx context.Context) error { - for _, job := range jobs { - status := job.Status - if status.IsDone() { - continue - } - if job.TaskID == 0 { - job.Status = actions_model.StatusCancelled - job.Stopped = timeutil.TimeStampNow() - n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") - if err != nil { - return err - } - if n == 0 { - return errors.New("job has changed, try again") - } - if n > 0 { - updatedjobs = append(updatedjobs, job) - } - continue - } - if err := actions_model.StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil { - return err - } + cancelledJobs, err := actions_model.CancelJobs(ctx, jobs) + if err != nil { + return fmt.Errorf("cancel jobs: %w", err) } + updatedjobs = append(updatedjobs, cancelledJobs...) return nil }); err != nil { ctx.ServerError("StopTask", err) @@ -603,14 +583,7 @@ func Cancel(ctx *context_module.Context) { } actions_service.CreateCommitStatus(ctx, jobs...) - - run, err := actions_model.GetRunByIndex(ctx, ctx.Repo.Repository.ID, runIndex) - if err != nil { - ctx.ServerError("GetRunByIndex", err) - } - if err := actions_service.EmitJobsIfReady(run.ID); err != nil { - log.Error("Emit ready jobs of run %d: %v", run.ID, err) - } + actions_service.EmitJobsIfReadyByJobs(updatedjobs) for _, job := range updatedjobs { _ = job.LoadAttributes(ctx) diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go index 227b692d882..1b871109008 100644 --- a/services/actions/clear_tasks.go +++ b/services/actions/clear_tasks.go @@ -52,12 +52,14 @@ func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.Ac func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event) notifyWorkflowJobStatusUpdate(ctx, jobs) + EmitJobsIfReadyByJobs(jobs) return err } func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error { jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo) notifyWorkflowJobStatusUpdate(ctx, jobs) + EmitJobsIfReadyByJobs(jobs) return err } @@ -109,6 +111,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { } notifyWorkflowJobStatusUpdate(ctx, jobs) + EmitJobsIfReadyByJobs(jobs) return nil } @@ -125,6 +128,7 @@ func CancelAbandonedJobs(ctx context.Context) error { } now := timeutil.TimeStampNow() + var updatedJobs []*actions_model.ActionRunJob for _, job := range jobs { job.Status = actions_model.StatusCancelled job.Stopped = now @@ -139,10 +143,13 @@ func CancelAbandonedJobs(ctx context.Context) error { } CreateCommitStatus(ctx, job) if updated { + updatedJobs = append(updatedJobs, job) NotifyWorkflowRunStatusUpdateWithReload(ctx, job) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } } + EmitJobsIfReadyByJobs(updatedJobs) + return nil } diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 76cdedd88ac..f44f5721634 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -26,7 +26,7 @@ type jobUpdate struct { RunID int64 } -func EmitJobsIfReady(runID int64) error { +func EmitJobsIfReadyByRun(runID int64) error { err := jobEmitterQueue.Push(&jobUpdate{ RunID: runID, }) @@ -36,6 +36,19 @@ func EmitJobsIfReady(runID int64) error { return err } +func EmitJobsIfReadyByJobs(jobs []*actions_model.ActionRunJob) { + checkedRuns := make(container.Set[int64]) + for _, job := range jobs { + if !job.Status.IsDone() || checkedRuns.Contains(job.RunID) { + continue + } + if err := EmitJobsIfReadyByRun(job.RunID); err != nil { + log.Error("Check jobs of run %d: %v", job.RunID, err) + } + checkedRuns.Add(job.RunID) + } +} + func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { ctx := graceful.GetManager().ShutdownContext() var ret []*jobUpdate