improve handling cancellation

This commit is contained in:
Zettat123 2025-07-24 22:30:48 -06:00
parent dc003e40fc
commit e45ca56dcb
5 changed files with 32 additions and 35 deletions

View File

@ -285,7 +285,11 @@ func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, err
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return cancelledJobs, err return cancelledJobs, err
} }
cancelledJobs = append(cancelledJobs, job) updatedJob, err := GetRunJobByID(ctx, job.ID)
if err != nil {
return cancelledJobs, fmt.Errorf("get job: %w", err)
}
cancelledJobs = append(cancelledJobs, updatedJob)
} }
// Return nil to indicate successful cancellation of all running and waiting jobs. // Return nil to indicate successful cancellation of all running and waiting jobs.

View File

@ -227,7 +227,7 @@ func (s *Service) UpdateTask(
} }
if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED { if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil { if err := actions_service.EmitJobsIfReadyByRun(task.Job.RunID); err != nil {
log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err) log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
} }
} }

View File

@ -27,7 +27,6 @@ import (
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/templates" "code.gitea.io/gitea/modules/templates"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/modules/web" "code.gitea.io/gitea/modules/web"
"code.gitea.io/gitea/routers/common" "code.gitea.io/gitea/routers/common"
@ -572,30 +571,11 @@ func Cancel(ctx *context_module.Context) {
var updatedjobs []*actions_model.ActionRunJob var updatedjobs []*actions_model.ActionRunJob
if err := db.WithTx(ctx, func(ctx context.Context) error { if err := db.WithTx(ctx, func(ctx context.Context) error {
for _, job := range jobs { cancelledJobs, err := actions_model.CancelJobs(ctx, jobs)
status := job.Status
if status.IsDone() {
continue
}
if job.TaskID == 0 {
job.Status = actions_model.StatusCancelled
job.Stopped = timeutil.TimeStampNow()
n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil { if err != nil {
return err return fmt.Errorf("cancel jobs: %w", err)
}
if n == 0 {
return errors.New("job has changed, try again")
}
if n > 0 {
updatedjobs = append(updatedjobs, job)
}
continue
}
if err := actions_model.StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
return err
}
} }
updatedjobs = append(updatedjobs, cancelledJobs...)
return nil return nil
}); err != nil { }); err != nil {
ctx.ServerError("StopTask", err) ctx.ServerError("StopTask", err)
@ -603,14 +583,7 @@ func Cancel(ctx *context_module.Context) {
} }
actions_service.CreateCommitStatus(ctx, jobs...) actions_service.CreateCommitStatus(ctx, jobs...)
actions_service.EmitJobsIfReadyByJobs(updatedjobs)
run, err := actions_model.GetRunByIndex(ctx, ctx.Repo.Repository.ID, runIndex)
if err != nil {
ctx.ServerError("GetRunByIndex", err)
}
if err := actions_service.EmitJobsIfReady(run.ID); err != nil {
log.Error("Emit ready jobs of run %d: %v", run.ID, err)
}
for _, job := range updatedjobs { for _, job := range updatedjobs {
_ = job.LoadAttributes(ctx) _ = job.LoadAttributes(ctx)

View File

@ -52,12 +52,14 @@ func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.Ac
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event) jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event)
notifyWorkflowJobStatusUpdate(ctx, jobs) notifyWorkflowJobStatusUpdate(ctx, jobs)
EmitJobsIfReadyByJobs(jobs)
return err return err
} }
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error { func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error {
jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo) jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo)
notifyWorkflowJobStatusUpdate(ctx, jobs) notifyWorkflowJobStatusUpdate(ctx, jobs)
EmitJobsIfReadyByJobs(jobs)
return err return err
} }
@ -109,6 +111,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
} }
notifyWorkflowJobStatusUpdate(ctx, jobs) notifyWorkflowJobStatusUpdate(ctx, jobs)
EmitJobsIfReadyByJobs(jobs)
return nil return nil
} }
@ -125,6 +128,7 @@ func CancelAbandonedJobs(ctx context.Context) error {
} }
now := timeutil.TimeStampNow() now := timeutil.TimeStampNow()
var updatedJobs []*actions_model.ActionRunJob
for _, job := range jobs { for _, job := range jobs {
job.Status = actions_model.StatusCancelled job.Status = actions_model.StatusCancelled
job.Stopped = now job.Stopped = now
@ -139,10 +143,13 @@ func CancelAbandonedJobs(ctx context.Context) error {
} }
CreateCommitStatus(ctx, job) CreateCommitStatus(ctx, job)
if updated { if updated {
updatedJobs = append(updatedJobs, job)
NotifyWorkflowRunStatusUpdateWithReload(ctx, job) NotifyWorkflowRunStatusUpdateWithReload(ctx, job)
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
} }
} }
EmitJobsIfReadyByJobs(updatedJobs)
return nil return nil
} }

View File

@ -26,7 +26,7 @@ type jobUpdate struct {
RunID int64 RunID int64
} }
func EmitJobsIfReady(runID int64) error { func EmitJobsIfReadyByRun(runID int64) error {
err := jobEmitterQueue.Push(&jobUpdate{ err := jobEmitterQueue.Push(&jobUpdate{
RunID: runID, RunID: runID,
}) })
@ -36,6 +36,19 @@ func EmitJobsIfReady(runID int64) error {
return err return err
} }
func EmitJobsIfReadyByJobs(jobs []*actions_model.ActionRunJob) {
checkedRuns := make(container.Set[int64])
for _, job := range jobs {
if !job.Status.IsDone() || checkedRuns.Contains(job.RunID) {
continue
}
if err := EmitJobsIfReadyByRun(job.RunID); err != nil {
log.Error("Check jobs of run %d: %v", job.RunID, err)
}
checkedRuns.Add(job.RunID)
}
}
func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
ctx := graceful.GetManager().ShutdownContext() ctx := graceful.GetManager().ShutdownContext()
var ret []*jobUpdate var ret []*jobUpdate