From 796d2c171f5f534d024deb1bf27dbd6ebed61fdf Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 25 Jul 2025 21:57:47 +0200 Subject: [PATCH] Use a single RawConcurrency in db --- models/actions/run.go | 49 ++++++++++++------------- models/actions/run_job.go | 20 ++++------ models/migrations/v1_25/v322.go | 12 +++--- routers/web/repo/actions/view.go | 57 ++++++++++++++++------------- services/actions/concurrency.go | 9 +++-- services/actions/job_emitter.go | 2 +- services/actions/notifier_helper.go | 7 ++++ services/actions/run.go | 10 +++-- services/actions/schedule_tasks.go | 6 +++ services/actions/workflow.go | 6 +++ 10 files changed, 100 insertions(+), 78 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index 30c892484f4..9cd488335cd 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -27,31 +27,30 @@ import ( // ActionRun represents a run of a workflow file type ActionRun struct { - ID int64 - Title string - RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"` - Repo *repo_model.Repository `xorm:"-"` - OwnerID int64 `xorm:"index"` - WorkflowID string `xorm:"index"` // the name of workflow file - Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository - TriggerUserID int64 `xorm:"index"` - TriggerUser *user_model.User `xorm:"-"` - ScheduleID int64 - Ref string `xorm:"index"` // the commit/tag/… that caused the run - IsRefDeleted bool `xorm:"-"` - CommitSHA string - IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow. - NeedApproval bool // may need approval if it's a fork pull request - ApprovedBy int64 `xorm:"index"` // who approved - Event webhook_module.HookEventType // the webhook event that causes the workflow to run - EventPayload string `xorm:"LONGTEXT"` - TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow - Status Status `xorm:"index"` - Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed - RawConcurrencyGroup string - RawConcurrencyCancel string - ConcurrencyGroup string `xorm:"index(repo_concurrency)"` - ConcurrencyCancel bool + ID int64 + Title string + RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"` + Repo *repo_model.Repository `xorm:"-"` + OwnerID int64 `xorm:"index"` + WorkflowID string `xorm:"index"` // the name of workflow file + Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository + TriggerUserID int64 `xorm:"index"` + TriggerUser *user_model.User `xorm:"-"` + ScheduleID int64 + Ref string `xorm:"index"` // the commit/tag/… that caused the run + IsRefDeleted bool `xorm:"-"` + CommitSHA string + IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow. + NeedApproval bool // may need approval if it's a fork pull request + ApprovedBy int64 `xorm:"index"` // who approved + Event webhook_module.HookEventType // the webhook event that causes the workflow to run + EventPayload string `xorm:"LONGTEXT"` + TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow + Status Status `xorm:"index"` + Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed + RawConcurrency string // raw concurrency + ConcurrencyGroup string `xorm:"index(repo_concurrency)"` + ConcurrencyCancel bool // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 Started timeutil.TimeStamp Stopped timeutil.TimeStamp diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 5497d6293e4..0e8152fb0af 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -36,9 +36,8 @@ type ActionRunJob struct { TaskID int64 // the latest task of the job Status Status `xorm:"index"` - RawConcurrencyGroup string // raw concurrency.group - RawConcurrencyCancel string // raw concurrency.cancel-in-progress - IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty + RawConcurrency string // raw concurrency + IsConcurrencyEvaluated bool // whether RawConcurrency has been evaluated, only valid when RawConcurrency is not empty ConcurrencyGroup string `xorm:"index(repo_concurrency)"` // evaluated concurrency.group ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress @@ -206,13 +205,12 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status { } func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) { - if job.RawConcurrencyGroup == "" { + if job.RawConcurrency == "" { return false, nil } if !job.IsConcurrencyEvaluated { return false, ErrUnevaluatedConcurrency{ - Group: job.RawConcurrencyGroup, - CancelInProgress: job.RawConcurrencyCancel, + RawConcurrency: job.RawConcurrency, } } if job.ConcurrencyGroup == "" || job.ConcurrencyCancel { @@ -228,7 +226,7 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, } func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) { - if job.RawConcurrencyGroup == "" { + if job.RawConcurrency == "" { return nil, nil } @@ -236,8 +234,7 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) if !job.IsConcurrencyEvaluated { return nil, ErrUnevaluatedConcurrency{ - Group: job.RawConcurrencyGroup, - CancelInProgress: job.RawConcurrencyCancel, + RawConcurrency: job.RawConcurrency, } } if job.ConcurrencyGroup == "" { @@ -270,8 +267,7 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) } type ErrUnevaluatedConcurrency struct { - Group string - CancelInProgress string + RawConcurrency string } func IsErrUnevaluatedConcurrency(err error) bool { @@ -280,5 +276,5 @@ func IsErrUnevaluatedConcurrency(err error) bool { } func (err ErrUnevaluatedConcurrency) Error() string { - return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress) + return fmt.Sprintf("the raw concurrency [%s] is not evaluated", err.RawConcurrency) } diff --git a/models/migrations/v1_25/v322.go b/models/migrations/v1_25/v322.go index a3241de91d8..b6525d8d7cf 100644 --- a/models/migrations/v1_25/v322.go +++ b/models/migrations/v1_25/v322.go @@ -9,11 +9,10 @@ import ( func AddActionsConcurrency(x *xorm.Engine) error { type ActionRun struct { - RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"` - RawConcurrencyGroup string - RawConcurrencyCancel string - ConcurrencyGroup string `xorm:"index(repo_concurrency)"` - ConcurrencyCancel bool + RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"` + RawConcurrency string + ConcurrencyGroup string `xorm:"index(repo_concurrency)"` + ConcurrencyCancel bool } if err := x.Sync(new(ActionRun)); err != nil { @@ -22,8 +21,7 @@ func AddActionsConcurrency(x *xorm.Engine) error { type ActionRunJob struct { RepoID int64 `xorm:"index index(repo_concurrency)"` - RawConcurrencyGroup string - RawConcurrencyCancel string + RawConcurrency string IsConcurrencyEvaluated bool ConcurrencyGroup string `xorm:"index(repo_concurrency)"` ConcurrencyCancel bool diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 670b41ca888..800635773ae 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -33,6 +33,7 @@ import ( actions_service "code.gitea.io/gitea/services/actions" context_module "code.gitea.io/gitea/services/context" notify_service "code.gitea.io/gitea/services/notify" + "gopkg.in/yaml.v3" "github.com/nektos/act/pkg/model" "xorm.io/builder" @@ -440,32 +441,36 @@ func Rerun(ctx *context_module.Context) { return } - wfConcurrencyGroup, wfConcurrencyCancel, err := actions_service.EvaluateWorkflowConcurrency(ctx, run, &model.RawConcurrency{ - Group: run.RawConcurrencyGroup, - CancelInProgress: run.RawConcurrencyCancel, - }, vars) - if err != nil { - ctx.ServerError("EvaluateWorkflowConcurrency", fmt.Errorf("evaluate workflow concurrency: %w", err)) - return - } - if wfConcurrencyGroup != "" { - run.ConcurrencyGroup = wfConcurrencyGroup - run.ConcurrencyCancel = wfConcurrencyCancel - } + if run.RawConcurrency != "" { + var rawConcurrency model.RawConcurrency + if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil { + ctx.ServerError("UnmarshalRawConcurrency", fmt.Errorf("unmarshal raw concurrency: %w", err)) + return + } + wfConcurrencyGroup, wfConcurrencyCancel, err := actions_service.EvaluateWorkflowConcurrency(ctx, run, &rawConcurrency, vars) + if err != nil { + ctx.ServerError("EvaluateWorkflowConcurrency", fmt.Errorf("evaluate workflow concurrency: %w", err)) + return + } + if wfConcurrencyGroup != "" { + run.ConcurrencyGroup = wfConcurrencyGroup + run.ConcurrencyCancel = wfConcurrencyCancel + } - blockRunByConcurrency, err = actions_model.ShouldBlockRunByConcurrency(ctx, run) - if err != nil { - ctx.ServerError("ShouldBlockRunByConcurrency", err) - return - } - if blockRunByConcurrency { - run.Status = actions_model.StatusBlocked - } else { - run.Status = actions_model.StatusRunning - } - if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil { - ctx.ServerError("cancel jobs", err) - return + blockRunByConcurrency, err = actions_model.ShouldBlockRunByConcurrency(ctx, run) + if err != nil { + ctx.ServerError("ShouldBlockRunByConcurrency", err) + return + } + if blockRunByConcurrency { + run.Status = actions_model.StatusBlocked + } else { + run.Status = actions_model.StatusRunning + } + if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil { + ctx.ServerError("cancel jobs", err) + return + } } if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil { ctx.ServerError("UpdateRun", err) @@ -524,7 +529,7 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou if err != nil { return fmt.Errorf("get run %d variables: %w", job.Run.ID, err) } - if job.RawConcurrencyGroup != "" && job.Status != actions_model.StatusBlocked { + if job.RawConcurrency != "" && job.Status != actions_model.StatusBlocked { var err error job.ConcurrencyGroup, job.ConcurrencyCancel, err = actions_service.EvaluateJobConcurrency(ctx, job.Run, job, vars, nil) if err != nil { diff --git a/services/actions/concurrency.go b/services/actions/concurrency.go index 5bd07f0cee7..56cad3103ff 100644 --- a/services/actions/concurrency.go +++ b/services/actions/concurrency.go @@ -11,6 +11,7 @@ import ( actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/modules/json" api "code.gitea.io/gitea/modules/structs" + "gopkg.in/yaml.v3" "github.com/nektos/act/pkg/jobparser" act_model "github.com/nektos/act/pkg/model" @@ -41,9 +42,9 @@ func EvaluateJobConcurrency(ctx context.Context, run *actions_model.ActionRun, a return "", false, fmt.Errorf("job LoadAttributes: %w", err) } - rawConcurrency := &act_model.RawConcurrency{ - Group: actionRunJob.RawConcurrencyGroup, - CancelInProgress: actionRunJob.RawConcurrencyCancel, + var rawConcurrency act_model.RawConcurrency + if err := yaml.Unmarshal([]byte(actionRunJob.RawConcurrency), &rawConcurrency); err != nil { + return "", false, fmt.Errorf("unmarshal raw concurrency: %w", err) } gitCtx := GenerateGiteaContext(run, actionRunJob) @@ -66,7 +67,7 @@ func EvaluateJobConcurrency(ctx context.Context, run *actions_model.ActionRun, a } _, singleWorkflowJob := singleWorkflows[0].Job() - concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(rawConcurrency, actionRunJob.JobID, singleWorkflowJob, gitCtx, jobResults, vars, inputs) + concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(&rawConcurrency, actionRunJob.JobID, singleWorkflowJob, gitCtx, jobResults, vars, inputs) if err != nil { return "", false, fmt.Errorf("evaluate concurrency: %w", err) } diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index f44f5721634..42c6daa7fe1 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -326,7 +326,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model } func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) { - if actionRunJob.RawConcurrencyGroup == "" { + if actionRunJob.RawConcurrency == "" { return false, nil } if err := actionRunJob.LoadAttributes(ctx); err != nil { diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 5bb6413ceea..5eb0898f874 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -28,6 +28,7 @@ import ( webhook_module "code.gitea.io/gitea/modules/webhook" "code.gitea.io/gitea/services/convert" notify_service "code.gitea.io/gitea/services/notify" + "gopkg.in/yaml.v3" "github.com/nektos/act/pkg/jobparser" "github.com/nektos/act/pkg/model" @@ -363,6 +364,12 @@ func handleWorkflows( continue } if wfRawConcurrency != nil { + rawConcurrency, err := yaml.Marshal(wfRawConcurrency) + if err != nil { + log.Error("Marshal raw concurrency: %v", err) + continue + } + run.RawConcurrency = string(rawConcurrency) wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) if err != nil { log.Error("EvaluateWorkflowConcurrency: %v", err) diff --git a/services/actions/run.go b/services/actions/run.go index e35c5c2f586..f602a8f5d35 100644 --- a/services/actions/run.go +++ b/services/actions/run.go @@ -11,6 +11,7 @@ import ( "code.gitea.io/gitea/models/db" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/util" + "gopkg.in/yaml.v3" "github.com/nektos/act/pkg/jobparser" ) @@ -90,9 +91,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar Status: status, } // check job concurrency - if job.RawConcurrency != nil && job.RawConcurrency.Group != "" { - runJob.RawConcurrencyGroup = job.RawConcurrency.Group - runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress + if job.RawConcurrency != nil { + rawConcurrency, err := yaml.Marshal(job.RawConcurrency) + if err != nil { + return fmt.Errorf("marshal raw concurrency: %w", err) + } + runJob.RawConcurrency = string(rawConcurrency) // do not evaluate job concurrency when it requires `needs` if len(needs) == 0 { var err error diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go index 5c0bfc90129..444f22d351e 100644 --- a/services/actions/schedule_tasks.go +++ b/services/actions/schedule_tasks.go @@ -16,6 +16,7 @@ import ( "code.gitea.io/gitea/modules/timeutil" webhook_module "code.gitea.io/gitea/modules/webhook" notify_service "code.gitea.io/gitea/services/notify" + "gopkg.in/yaml.v3" "github.com/nektos/act/pkg/jobparser" ) @@ -135,6 +136,11 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) return err } if wfRawConcurrency != nil { + rawConcurrency, err := yaml.Marshal(wfRawConcurrency) + if err != nil { + return fmt.Errorf("marshal raw concurrency: %w", err) + } + run.RawConcurrency = string(rawConcurrency) wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) if err != nil { return err diff --git a/services/actions/workflow.go b/services/actions/workflow.go index 679b73ddf6f..f8f4d76d0c7 100644 --- a/services/actions/workflow.go +++ b/services/actions/workflow.go @@ -23,6 +23,7 @@ import ( "code.gitea.io/gitea/services/context" "code.gitea.io/gitea/services/convert" notify_service "code.gitea.io/gitea/services/notify" + "gopkg.in/yaml.v3" "github.com/nektos/act/pkg/jobparser" "github.com/nektos/act/pkg/model" @@ -194,6 +195,11 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re if err != nil { return err } + rawConcurrency, err := yaml.Marshal(wfRawConcurrency) + if err != nil { + return fmt.Errorf("marshal raw concurrency: %w", err) + } + run.RawConcurrency = string(rawConcurrency) wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) if err != nil { return err