Use a single RawConcurrency in db

This commit is contained in:
Christopher Homberger 2025-07-25 21:57:47 +02:00
parent d5f6c44a49
commit 796d2c171f
10 changed files with 100 additions and 78 deletions

View File

@ -27,31 +27,30 @@ import (
// ActionRun represents a run of a workflow file // ActionRun represents a run of a workflow file
type ActionRun struct { type ActionRun struct {
ID int64 ID int64
Title string Title string
RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"` RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"`
Repo *repo_model.Repository `xorm:"-"` Repo *repo_model.Repository `xorm:"-"`
OwnerID int64 `xorm:"index"` OwnerID int64 `xorm:"index"`
WorkflowID string `xorm:"index"` // the name of workflow file WorkflowID string `xorm:"index"` // the name of workflow file
Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository
TriggerUserID int64 `xorm:"index"` TriggerUserID int64 `xorm:"index"`
TriggerUser *user_model.User `xorm:"-"` TriggerUser *user_model.User `xorm:"-"`
ScheduleID int64 ScheduleID int64
Ref string `xorm:"index"` // the commit/tag/… that caused the run Ref string `xorm:"index"` // the commit/tag/… that caused the run
IsRefDeleted bool `xorm:"-"` IsRefDeleted bool `xorm:"-"`
CommitSHA string CommitSHA string
IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow. IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow.
NeedApproval bool // may need approval if it's a fork pull request NeedApproval bool // may need approval if it's a fork pull request
ApprovedBy int64 `xorm:"index"` // who approved ApprovedBy int64 `xorm:"index"` // who approved
Event webhook_module.HookEventType // the webhook event that causes the workflow to run Event webhook_module.HookEventType // the webhook event that causes the workflow to run
EventPayload string `xorm:"LONGTEXT"` EventPayload string `xorm:"LONGTEXT"`
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"` Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
RawConcurrencyGroup string RawConcurrency string // raw concurrency
RawConcurrencyCancel string ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
ConcurrencyGroup string `xorm:"index(repo_concurrency)"` ConcurrencyCancel bool
ConcurrencyCancel bool
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp Started timeutil.TimeStamp
Stopped timeutil.TimeStamp Stopped timeutil.TimeStamp

View File

@ -36,9 +36,8 @@ type ActionRunJob struct {
TaskID int64 // the latest task of the job TaskID int64 // the latest task of the job
Status Status `xorm:"index"` Status Status `xorm:"index"`
RawConcurrencyGroup string // raw concurrency.group RawConcurrency string // raw concurrency
RawConcurrencyCancel string // raw concurrency.cancel-in-progress IsConcurrencyEvaluated bool // whether RawConcurrency has been evaluated, only valid when RawConcurrency is not empty
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
ConcurrencyGroup string `xorm:"index(repo_concurrency)"` // evaluated concurrency.group ConcurrencyGroup string `xorm:"index(repo_concurrency)"` // evaluated concurrency.group
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
@ -206,13 +205,12 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
} }
func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) { func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
if job.RawConcurrencyGroup == "" { if job.RawConcurrency == "" {
return false, nil return false, nil
} }
if !job.IsConcurrencyEvaluated { if !job.IsConcurrencyEvaluated {
return false, ErrUnevaluatedConcurrency{ return false, ErrUnevaluatedConcurrency{
Group: job.RawConcurrencyGroup, RawConcurrency: job.RawConcurrency,
CancelInProgress: job.RawConcurrencyCancel,
} }
} }
if job.ConcurrencyGroup == "" || job.ConcurrencyCancel { if job.ConcurrencyGroup == "" || job.ConcurrencyCancel {
@ -228,7 +226,7 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool,
} }
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) { func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
if job.RawConcurrencyGroup == "" { if job.RawConcurrency == "" {
return nil, nil return nil, nil
} }
@ -236,8 +234,7 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob)
if !job.IsConcurrencyEvaluated { if !job.IsConcurrencyEvaluated {
return nil, ErrUnevaluatedConcurrency{ return nil, ErrUnevaluatedConcurrency{
Group: job.RawConcurrencyGroup, RawConcurrency: job.RawConcurrency,
CancelInProgress: job.RawConcurrencyCancel,
} }
} }
if job.ConcurrencyGroup == "" { if job.ConcurrencyGroup == "" {
@ -270,8 +267,7 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob)
} }
type ErrUnevaluatedConcurrency struct { type ErrUnevaluatedConcurrency struct {
Group string RawConcurrency string
CancelInProgress string
} }
func IsErrUnevaluatedConcurrency(err error) bool { func IsErrUnevaluatedConcurrency(err error) bool {
@ -280,5 +276,5 @@ func IsErrUnevaluatedConcurrency(err error) bool {
} }
func (err ErrUnevaluatedConcurrency) Error() string { func (err ErrUnevaluatedConcurrency) Error() string {
return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress) return fmt.Sprintf("the raw concurrency [%s] is not evaluated", err.RawConcurrency)
} }

View File

@ -9,11 +9,10 @@ import (
func AddActionsConcurrency(x *xorm.Engine) error { func AddActionsConcurrency(x *xorm.Engine) error {
type ActionRun struct { type ActionRun struct {
RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"` RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"`
RawConcurrencyGroup string RawConcurrency string
RawConcurrencyCancel string ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
ConcurrencyGroup string `xorm:"index(repo_concurrency)"` ConcurrencyCancel bool
ConcurrencyCancel bool
} }
if err := x.Sync(new(ActionRun)); err != nil { if err := x.Sync(new(ActionRun)); err != nil {
@ -22,8 +21,7 @@ func AddActionsConcurrency(x *xorm.Engine) error {
type ActionRunJob struct { type ActionRunJob struct {
RepoID int64 `xorm:"index index(repo_concurrency)"` RepoID int64 `xorm:"index index(repo_concurrency)"`
RawConcurrencyGroup string RawConcurrency string
RawConcurrencyCancel string
IsConcurrencyEvaluated bool IsConcurrencyEvaluated bool
ConcurrencyGroup string `xorm:"index(repo_concurrency)"` ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
ConcurrencyCancel bool ConcurrencyCancel bool

View File

@ -33,6 +33,7 @@ import (
actions_service "code.gitea.io/gitea/services/actions" actions_service "code.gitea.io/gitea/services/actions"
context_module "code.gitea.io/gitea/services/context" context_module "code.gitea.io/gitea/services/context"
notify_service "code.gitea.io/gitea/services/notify" notify_service "code.gitea.io/gitea/services/notify"
"gopkg.in/yaml.v3"
"github.com/nektos/act/pkg/model" "github.com/nektos/act/pkg/model"
"xorm.io/builder" "xorm.io/builder"
@ -440,32 +441,36 @@ func Rerun(ctx *context_module.Context) {
return return
} }
wfConcurrencyGroup, wfConcurrencyCancel, err := actions_service.EvaluateWorkflowConcurrency(ctx, run, &model.RawConcurrency{ if run.RawConcurrency != "" {
Group: run.RawConcurrencyGroup, var rawConcurrency model.RawConcurrency
CancelInProgress: run.RawConcurrencyCancel, if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil {
}, vars) ctx.ServerError("UnmarshalRawConcurrency", fmt.Errorf("unmarshal raw concurrency: %w", err))
if err != nil { return
ctx.ServerError("EvaluateWorkflowConcurrency", fmt.Errorf("evaluate workflow concurrency: %w", err)) }
return wfConcurrencyGroup, wfConcurrencyCancel, err := actions_service.EvaluateWorkflowConcurrency(ctx, run, &rawConcurrency, vars)
} if err != nil {
if wfConcurrencyGroup != "" { ctx.ServerError("EvaluateWorkflowConcurrency", fmt.Errorf("evaluate workflow concurrency: %w", err))
run.ConcurrencyGroup = wfConcurrencyGroup return
run.ConcurrencyCancel = wfConcurrencyCancel }
} if wfConcurrencyGroup != "" {
run.ConcurrencyGroup = wfConcurrencyGroup
run.ConcurrencyCancel = wfConcurrencyCancel
}
blockRunByConcurrency, err = actions_model.ShouldBlockRunByConcurrency(ctx, run) blockRunByConcurrency, err = actions_model.ShouldBlockRunByConcurrency(ctx, run)
if err != nil { if err != nil {
ctx.ServerError("ShouldBlockRunByConcurrency", err) ctx.ServerError("ShouldBlockRunByConcurrency", err)
return return
} }
if blockRunByConcurrency { if blockRunByConcurrency {
run.Status = actions_model.StatusBlocked run.Status = actions_model.StatusBlocked
} else { } else {
run.Status = actions_model.StatusRunning run.Status = actions_model.StatusRunning
} }
if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil { if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil {
ctx.ServerError("cancel jobs", err) ctx.ServerError("cancel jobs", err)
return return
}
} }
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil { if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil {
ctx.ServerError("UpdateRun", err) ctx.ServerError("UpdateRun", err)
@ -524,7 +529,7 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou
if err != nil { if err != nil {
return fmt.Errorf("get run %d variables: %w", job.Run.ID, err) return fmt.Errorf("get run %d variables: %w", job.Run.ID, err)
} }
if job.RawConcurrencyGroup != "" && job.Status != actions_model.StatusBlocked { if job.RawConcurrency != "" && job.Status != actions_model.StatusBlocked {
var err error var err error
job.ConcurrencyGroup, job.ConcurrencyCancel, err = actions_service.EvaluateJobConcurrency(ctx, job.Run, job, vars, nil) job.ConcurrencyGroup, job.ConcurrencyCancel, err = actions_service.EvaluateJobConcurrency(ctx, job.Run, job, vars, nil)
if err != nil { if err != nil {

View File

@ -11,6 +11,7 @@ import (
actions_model "code.gitea.io/gitea/models/actions" actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/json"
api "code.gitea.io/gitea/modules/structs" api "code.gitea.io/gitea/modules/structs"
"gopkg.in/yaml.v3"
"github.com/nektos/act/pkg/jobparser" "github.com/nektos/act/pkg/jobparser"
act_model "github.com/nektos/act/pkg/model" act_model "github.com/nektos/act/pkg/model"
@ -41,9 +42,9 @@ func EvaluateJobConcurrency(ctx context.Context, run *actions_model.ActionRun, a
return "", false, fmt.Errorf("job LoadAttributes: %w", err) return "", false, fmt.Errorf("job LoadAttributes: %w", err)
} }
rawConcurrency := &act_model.RawConcurrency{ var rawConcurrency act_model.RawConcurrency
Group: actionRunJob.RawConcurrencyGroup, if err := yaml.Unmarshal([]byte(actionRunJob.RawConcurrency), &rawConcurrency); err != nil {
CancelInProgress: actionRunJob.RawConcurrencyCancel, return "", false, fmt.Errorf("unmarshal raw concurrency: %w", err)
} }
gitCtx := GenerateGiteaContext(run, actionRunJob) gitCtx := GenerateGiteaContext(run, actionRunJob)
@ -66,7 +67,7 @@ func EvaluateJobConcurrency(ctx context.Context, run *actions_model.ActionRun, a
} }
_, singleWorkflowJob := singleWorkflows[0].Job() _, singleWorkflowJob := singleWorkflows[0].Job()
concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(rawConcurrency, actionRunJob.JobID, singleWorkflowJob, gitCtx, jobResults, vars, inputs) concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(&rawConcurrency, actionRunJob.JobID, singleWorkflowJob, gitCtx, jobResults, vars, inputs)
if err != nil { if err != nil {
return "", false, fmt.Errorf("evaluate concurrency: %w", err) return "", false, fmt.Errorf("evaluate concurrency: %w", err)
} }

View File

@ -326,7 +326,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
} }
func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) { func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
if actionRunJob.RawConcurrencyGroup == "" { if actionRunJob.RawConcurrency == "" {
return false, nil return false, nil
} }
if err := actionRunJob.LoadAttributes(ctx); err != nil { if err := actionRunJob.LoadAttributes(ctx); err != nil {

View File

@ -28,6 +28,7 @@ import (
webhook_module "code.gitea.io/gitea/modules/webhook" webhook_module "code.gitea.io/gitea/modules/webhook"
"code.gitea.io/gitea/services/convert" "code.gitea.io/gitea/services/convert"
notify_service "code.gitea.io/gitea/services/notify" notify_service "code.gitea.io/gitea/services/notify"
"gopkg.in/yaml.v3"
"github.com/nektos/act/pkg/jobparser" "github.com/nektos/act/pkg/jobparser"
"github.com/nektos/act/pkg/model" "github.com/nektos/act/pkg/model"
@ -363,6 +364,12 @@ func handleWorkflows(
continue continue
} }
if wfRawConcurrency != nil { if wfRawConcurrency != nil {
rawConcurrency, err := yaml.Marshal(wfRawConcurrency)
if err != nil {
log.Error("Marshal raw concurrency: %v", err)
continue
}
run.RawConcurrency = string(rawConcurrency)
wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars)
if err != nil { if err != nil {
log.Error("EvaluateWorkflowConcurrency: %v", err) log.Error("EvaluateWorkflowConcurrency: %v", err)

View File

@ -11,6 +11,7 @@ import (
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo" repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"gopkg.in/yaml.v3"
"github.com/nektos/act/pkg/jobparser" "github.com/nektos/act/pkg/jobparser"
) )
@ -90,9 +91,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
Status: status, Status: status,
} }
// check job concurrency // check job concurrency
if job.RawConcurrency != nil && job.RawConcurrency.Group != "" { if job.RawConcurrency != nil {
runJob.RawConcurrencyGroup = job.RawConcurrency.Group rawConcurrency, err := yaml.Marshal(job.RawConcurrency)
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress if err != nil {
return fmt.Errorf("marshal raw concurrency: %w", err)
}
runJob.RawConcurrency = string(rawConcurrency)
// do not evaluate job concurrency when it requires `needs` // do not evaluate job concurrency when it requires `needs`
if len(needs) == 0 { if len(needs) == 0 {
var err error var err error

View File

@ -16,6 +16,7 @@ import (
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
webhook_module "code.gitea.io/gitea/modules/webhook" webhook_module "code.gitea.io/gitea/modules/webhook"
notify_service "code.gitea.io/gitea/services/notify" notify_service "code.gitea.io/gitea/services/notify"
"gopkg.in/yaml.v3"
"github.com/nektos/act/pkg/jobparser" "github.com/nektos/act/pkg/jobparser"
) )
@ -135,6 +136,11 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
return err return err
} }
if wfRawConcurrency != nil { if wfRawConcurrency != nil {
rawConcurrency, err := yaml.Marshal(wfRawConcurrency)
if err != nil {
return fmt.Errorf("marshal raw concurrency: %w", err)
}
run.RawConcurrency = string(rawConcurrency)
wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars)
if err != nil { if err != nil {
return err return err

View File

@ -23,6 +23,7 @@ import (
"code.gitea.io/gitea/services/context" "code.gitea.io/gitea/services/context"
"code.gitea.io/gitea/services/convert" "code.gitea.io/gitea/services/convert"
notify_service "code.gitea.io/gitea/services/notify" notify_service "code.gitea.io/gitea/services/notify"
"gopkg.in/yaml.v3"
"github.com/nektos/act/pkg/jobparser" "github.com/nektos/act/pkg/jobparser"
"github.com/nektos/act/pkg/model" "github.com/nektos/act/pkg/model"
@ -194,6 +195,11 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re
if err != nil { if err != nil {
return err return err
} }
rawConcurrency, err := yaml.Marshal(wfRawConcurrency)
if err != nil {
return fmt.Errorf("marshal raw concurrency: %w", err)
}
run.RawConcurrency = string(rawConcurrency)
wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars) wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars)
if err != nil { if err != nil {
return err return err