modify cancel-in-progress behavior

This commit is contained in:
Zettat123 2025-07-21 22:31:47 -06:00
parent c5444e742f
commit 63eaf15bd2
6 changed files with 127 additions and 92 deletions

View File

@ -415,34 +415,40 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo
}
func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
if actionRun.ConcurrencyGroup == "" {
return nil, nil
}
var cancelledJobs []*ActionRunJob
if actionRun.ConcurrencyGroup != "" && actionRun.ConcurrencyCancel {
// cancel previous runs in the same concurrency group
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
RepoID: actionRun.RepoID,
ConcurrencyGroup: actionRun.ConcurrencyGroup,
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
statusFindOption := []Status{StatusWaiting, StatusBlocked}
if actionRun.ConcurrencyCancel {
statusFindOption = append(statusFindOption, StatusRunning)
}
// cancel previous runs in the same concurrency group
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
RepoID: actionRun.RepoID,
ConcurrencyGroup: actionRun.ConcurrencyGroup,
Status: statusFindOption,
})
if err != nil {
return cancelledJobs, fmt.Errorf("find runs: %w", err)
}
for _, run := range runs {
if run.ID == actionRun.ID {
continue
}
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return cancelledJobs, fmt.Errorf("find runs: %w", err)
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
}
for _, run := range runs {
if run.ID == actionRun.ID {
continue
}
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
}
cjs, err := CancelJobs(ctx, jobs)
if err != nil {
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
}
cancelledJobs = append(cancelledJobs, cjs...)
cjs, err := CancelJobs(ctx, jobs)
if err != nil {
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
}
cancelledJobs = append(cancelledJobs, cjs...)
}
return cancelledJobs, nil

View File

@ -239,32 +239,38 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool,
}
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
if job.RawConcurrencyGroup == "" {
return nil, nil
}
var cancelledJobs []*ActionRunJob
if job.RawConcurrencyGroup != "" {
if !job.IsConcurrencyEvaluated {
return cancelledJobs, ErrUnevaluatedConcurrency{
Group: job.RawConcurrencyGroup,
CancelInProgress: job.RawConcurrencyCancel,
}
if !job.IsConcurrencyEvaluated {
return cancelledJobs, ErrUnevaluatedConcurrency{
Group: job.RawConcurrencyGroup,
CancelInProgress: job.RawConcurrencyCancel,
}
if job.ConcurrencyGroup != "" && job.ConcurrencyCancel {
// cancel previous jobs in the same concurrency group
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
RepoID: job.RepoID,
ConcurrencyGroup: job.ConcurrencyGroup,
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
if err != nil {
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
}
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
cjs, err := CancelJobs(ctx, previousJobs)
if err != nil {
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
}
cancelledJobs = append(cancelledJobs, cjs...)
}
if job.ConcurrencyGroup != "" {
statusFindOption := []Status{StatusWaiting, StatusBlocked}
if job.ConcurrencyCancel {
statusFindOption = append(statusFindOption, StatusRunning)
}
// cancel previous jobs in the same concurrency group
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
RepoID: job.RepoID,
ConcurrencyGroup: job.ConcurrencyGroup,
Statuses: statusFindOption,
})
if err != nil {
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
}
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
cjs, err := CancelJobs(ctx, previousJobs)
if err != nil {
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
}
cancelledJobs = append(cancelledJobs, cjs...)
}
return cancelledJobs, nil

View File

@ -443,12 +443,13 @@ func Rerun(ctx *context_module.Context) {
}
if blockRunByConcurrency {
run.Status = actions_model.StatusBlocked
} else if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil {
ctx.ServerError("cancel jobs", err)
return
} else {
run.Status = actions_model.StatusRunning
}
if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil {
ctx.ServerError("cancel jobs", err)
return
}
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status"); err != nil {
ctx.ServerError("UpdateRun", err)
return
@ -520,7 +521,8 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou
}
if blockByConcurrency {
job.Status = actions_model.StatusBlocked
} else if err := actions_service.CancelJobsByJobConcurrency(ctx, job); err != nil {
}
if err := actions_service.CancelJobsByJobConcurrency(ctx, job); err != nil {
return fmt.Errorf("cancel jobs: %w", err)
}
}

View File

@ -285,16 +285,13 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
}
}
if allDone {
// check concurrency
blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars)
if err != nil {
// evaluate concurrency
if err := evaluateConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars); err != nil {
log.Error("Check job %d concurrency: %v. This job will stay blocked.", id, err)
continue
}
if blockedByJobConcurrency {
continue
} else if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil {
if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil {
log.Error("Cancel previous jobs for job %d: %v", id, err)
}
@ -322,18 +319,18 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
return ret
}
func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
func evaluateConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) error {
if actionRunJob.RawConcurrencyGroup == "" {
return false, nil
return nil
}
if err := actionRunJob.LoadAttributes(ctx); err != nil {
return false, err
return err
}
if !actionRunJob.IsConcurrencyEvaluated {
taskNeeds, err := FindTaskNeeds(ctx, actionRunJob)
if err != nil {
return false, fmt.Errorf("find task needs: %w", err)
return fmt.Errorf("find task needs: %w", err)
}
jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
for jobID, taskNeed := range taskNeeds {
@ -346,14 +343,14 @@ func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_
actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, actionRunJob.Run, actionRunJob, vars, jobResults)
if err != nil {
return false, fmt.Errorf("evaluate job concurrency: %w", err)
return fmt.Errorf("evaluate job concurrency: %w", err)
}
actionRunJob.IsConcurrencyEvaluated = true
if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil {
return false, fmt.Errorf("update run job: %w", err)
return fmt.Errorf("update run job: %w", err)
}
}
return actions_model.ShouldBlockJobByConcurrency(ctx, actionRunJob)
return nil
}

View File

@ -38,7 +38,8 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
}
if blockRunByConcurrency {
run.Status = actions_model.StatusBlocked
} else if err := CancelJobsByRunConcurrency(ctx, run); err != nil {
}
if err := CancelJobsByRunConcurrency(ctx, run); err != nil {
return fmt.Errorf("cancel jobs: %w", err)
}
@ -98,14 +99,17 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
if job.RawConcurrency != nil && job.RawConcurrency.Group != "" {
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
// we do not need to evaluate job concurrency if the job is blocked because it will be checked by job emitter
if runJob.Status != actions_model.StatusBlocked {
// do not evaluate job concurrency when it requires `needs`
if len(needs) == 0 {
var err error
runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, run, runJob, vars, nil)
if err != nil {
return fmt.Errorf("evaluate job concurrency: %w", err)
}
runJob.IsConcurrencyEvaluated = true
}
// do not need to check job concurrency if the job is blocked because it will be checked by job emitter
if runJob.Status != actions_model.StatusBlocked {
// check if the job should be blocked by job concurrency
blockByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, runJob)
if err != nil {
@ -113,7 +117,8 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
}
if blockByConcurrency {
runJob.Status = actions_model.StatusBlocked
} else if err := CancelJobsByJobConcurrency(ctx, runJob); err != nil {
}
if err := CancelJobsByJobConcurrency(ctx, runJob); err != nil {
return fmt.Errorf("cancel jobs: %w", err)
}
}

View File

@ -1030,7 +1030,7 @@ jobs:
concurrency:
group: job-group-1
steps:
- run: echo 'wf2-job2'
- run: echo 'wf2-job1'
wf2-job2:
runs-on: runner2
concurrency:
@ -1073,70 +1073,89 @@ jobs:
- run: echo 'wf4-job1'
`
// push workflow 1, 2 and 3
// push workflow 1
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
// fetch wf1-job1 and wf1-job2
w1j1Task := runner1.fetchTask(t)
w1j2Task := runner2.fetchTask(t)
// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
// cannot fetch wf3-job1 because it is blocked by wf1-job1's concurrency group "job-group-1"
runner1.fetchNoTask(t)
runner2.fetchNoTask(t)
_, w1j1Job, w1Run := getTaskAndJobAndRunByTaskID(t, w1j1Task.Id)
assert.Equal(t, "job-group-1", w1j1Job.ConcurrencyGroup)
assert.Equal(t, "workflow-group-1", w1Run.ConcurrencyGroup)
assert.Equal(t, "concurrent-workflow-1.yml", w1Run.WorkflowID)
assert.Equal(t, actions_model.StatusRunning, w1j1Job.Status)
_, w1j2Job, _ := getTaskAndJobAndRunByTaskID(t, w1j2Task.Id)
assert.Equal(t, "job-group-2", w1j2Job.ConcurrencyGroup)
assert.Equal(t, actions_model.StatusRunning, w1j2Job.Status)
// push workflow 2
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
runner1.fetchNoTask(t)
runner2.fetchNoTask(t)
// query wf2-job1 from db and check its status
w2Run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-2.yml"})
w2j1Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: w2Run.ID, JobID: "wf2-job1"})
assert.Equal(t, actions_model.StatusBlocked, w2j1Job.Status)
// push workflow 3
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
// cannot fetch wf3-job1 because it is blocked by wf1-job1's concurrency group "job-group-1"
runner1.fetchNoTask(t)
// query wf3-job1 from db and check its status
w3Run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-3.yml"})
w3j1Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: w3Run.ID, JobID: "wf3-job1"})
assert.Equal(t, actions_model.StatusBlocked, w3j1Job.Status)
// wf2-job1 is cancelled by wf3-job1
w2j1Job = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: w2j1Job.ID})
assert.Equal(t, actions_model.StatusCancelled, w2j1Job.Status)
// exec wf1-job1
runner1.execTask(t, w1j1Task, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS,
})
// fetch wf3-job1
assert.Equal(t, actions_model.StatusBlocked, w3j1Job.Status)
w3j1Task := runner1.fetchTask(t)
// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
runner1.fetchNoTask(t)
runner2.fetchNoTask(t)
_, w3j1Job, w3Run := getTaskAndJobAndRunByTaskID(t, w3j1Task.Id)
_, w3j1Job, w3Run = getTaskAndJobAndRunByTaskID(t, w3j1Task.Id)
assert.Equal(t, "job-group-1", w3j1Job.ConcurrencyGroup)
assert.Equal(t, "workflow-group-2", w3Run.ConcurrencyGroup)
assert.Equal(t, "concurrent-workflow-3.yml", w3Run.WorkflowID)
// push workflow-4
opts4 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf4TreePath, wf4FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf4TreePath, opts4)
// exec wf1-job2
runner2.execTask(t, w1j2Task, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS,
})
// wf2-job2
// fetch wf2-job2
w2j2Task := runner2.fetchTask(t)
// cannot fetch wf2-job1 because it is blocked by wf3-job1's concurrency group "job-group-1"
// cannot fetch wf4-job1 because it is blocked by workflow-3's concurrency group "workflow-group-2"
runner1.fetchNoTask(t)
runner2.fetchNoTask(t)
_, w2j2Job, w2Run := getTaskAndJobAndRunByTaskID(t, w2j2Task.Id)
assert.Equal(t, "job-group-2", w2j2Job.ConcurrencyGroup)
assert.Equal(t, "workflow-group-1", w2Run.ConcurrencyGroup)
assert.Equal(t, "concurrent-workflow-2.yml", w2Run.WorkflowID)
assert.Equal(t, actions_model.StatusRunning, w2j2Job.Status)
// push workflow-4
opts4 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf4TreePath, wf4FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf4TreePath, opts4)
// cannot fetch wf4-job1 because it is blocked by workflow-3's concurrency group "workflow-group-2"
runner2.fetchNoTask(t)
// exec wf3-job1
runner1.execTask(t, w3j1Task, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS,
})
// fetch wf2-job1
w2j1Task := runner1.fetchTask(t)
// fetch wf4-job1
w4j1Task := runner2.fetchTask(t)
// all tasks have been fetched
runner1.fetchNoTask(t)
runner2.fetchNoTask(t)
_, w2j1Job, _ := getTaskAndJobAndRunByTaskID(t, w2j1Task.Id)
assert.Equal(t, "job-group-1", w2j1Job.ConcurrencyGroup)
assert.Equal(t, actions_model.StatusRunning, w2j2Job.Status)
_, w2j2Job, w2Run = getTaskAndJobAndRunByTaskID(t, w2j2Task.Id)
// wf2-job2 is cancelled because wf4-job1's cancel-in-progress is true
assert.Equal(t, actions_model.StatusCancelled, w2j2Job.Status)