diff --git a/models/actions/run.go b/models/actions/run.go index ac73ec39ac3..a78e02359e7 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -415,34 +415,40 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo } func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) { + if actionRun.ConcurrencyGroup == "" { + return nil, nil + } + var cancelledJobs []*ActionRunJob - if actionRun.ConcurrencyGroup != "" && actionRun.ConcurrencyCancel { - // cancel previous runs in the same concurrency group - runs, err := db.Find[ActionRun](ctx, &FindRunOptions{ - RepoID: actionRun.RepoID, - ConcurrencyGroup: actionRun.ConcurrencyGroup, - Status: []Status{StatusRunning, StatusWaiting, StatusBlocked}, + statusFindOption := []Status{StatusWaiting, StatusBlocked} + if actionRun.ConcurrencyCancel { + statusFindOption = append(statusFindOption, StatusRunning) + } + // cancel previous runs in the same concurrency group + runs, err := db.Find[ActionRun](ctx, &FindRunOptions{ + RepoID: actionRun.RepoID, + ConcurrencyGroup: actionRun.ConcurrencyGroup, + Status: statusFindOption, + }) + if err != nil { + return cancelledJobs, fmt.Errorf("find runs: %w", err) + } + for _, run := range runs { + if run.ID == actionRun.ID { + continue + } + jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ + RunID: run.ID, }) if err != nil { - return cancelledJobs, fmt.Errorf("find runs: %w", err) + return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err) } - for _, run := range runs { - if run.ID == actionRun.ID { - continue - } - jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ - RunID: run.ID, - }) - if err != nil { - return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err) - } - cjs, err := CancelJobs(ctx, jobs) - if err != nil { - return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err) - } - cancelledJobs = append(cancelledJobs, cjs...) + cjs, err := CancelJobs(ctx, jobs) + if err != nil { + return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err) } + cancelledJobs = append(cancelledJobs, cjs...) } return cancelledJobs, nil diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 5e91b7585e8..4dd736aa652 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -239,32 +239,38 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, } func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) { + if job.RawConcurrencyGroup == "" { + return nil, nil + } + var cancelledJobs []*ActionRunJob - if job.RawConcurrencyGroup != "" { - if !job.IsConcurrencyEvaluated { - return cancelledJobs, ErrUnevaluatedConcurrency{ - Group: job.RawConcurrencyGroup, - CancelInProgress: job.RawConcurrencyCancel, - } + if !job.IsConcurrencyEvaluated { + return cancelledJobs, ErrUnevaluatedConcurrency{ + Group: job.RawConcurrencyGroup, + CancelInProgress: job.RawConcurrencyCancel, } - if job.ConcurrencyGroup != "" && job.ConcurrencyCancel { - // cancel previous jobs in the same concurrency group - previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ - RepoID: job.RepoID, - ConcurrencyGroup: job.ConcurrencyGroup, - Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked}, - }) - if err != nil { - return cancelledJobs, fmt.Errorf("find previous jobs: %w", err) - } - previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID }) - cjs, err := CancelJobs(ctx, previousJobs) - if err != nil { - return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err) - } - cancelledJobs = append(cancelledJobs, cjs...) + } + if job.ConcurrencyGroup != "" { + statusFindOption := []Status{StatusWaiting, StatusBlocked} + if job.ConcurrencyCancel { + statusFindOption = append(statusFindOption, StatusRunning) } + // cancel previous jobs in the same concurrency group + previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.ConcurrencyGroup, + Statuses: statusFindOption, + }) + if err != nil { + return cancelledJobs, fmt.Errorf("find previous jobs: %w", err) + } + previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID }) + cjs, err := CancelJobs(ctx, previousJobs) + if err != nil { + return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err) + } + cancelledJobs = append(cancelledJobs, cjs...) } return cancelledJobs, nil diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 2cb9fdb3aa6..0ac1e0ae693 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -443,12 +443,13 @@ func Rerun(ctx *context_module.Context) { } if blockRunByConcurrency { run.Status = actions_model.StatusBlocked - } else if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil { - ctx.ServerError("cancel jobs", err) - return } else { run.Status = actions_model.StatusRunning } + if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil { + ctx.ServerError("cancel jobs", err) + return + } if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status"); err != nil { ctx.ServerError("UpdateRun", err) return @@ -520,7 +521,8 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou } if blockByConcurrency { job.Status = actions_model.StatusBlocked - } else if err := actions_service.CancelJobsByJobConcurrency(ctx, job); err != nil { + } + if err := actions_service.CancelJobsByJobConcurrency(ctx, job); err != nil { return fmt.Errorf("cancel jobs: %w", err) } } diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 46211d57ff5..2f5050d0874 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -285,16 +285,13 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model } } if allDone { - // check concurrency - blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars) - if err != nil { + // evaluate concurrency + if err := evaluateConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars); err != nil { log.Error("Check job %d concurrency: %v. This job will stay blocked.", id, err) continue } - if blockedByJobConcurrency { - continue - } else if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil { + if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil { log.Error("Cancel previous jobs for job %d: %v", id, err) } @@ -322,18 +319,18 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model return ret } -func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) { +func evaluateConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) error { if actionRunJob.RawConcurrencyGroup == "" { - return false, nil + return nil } if err := actionRunJob.LoadAttributes(ctx); err != nil { - return false, err + return err } if !actionRunJob.IsConcurrencyEvaluated { taskNeeds, err := FindTaskNeeds(ctx, actionRunJob) if err != nil { - return false, fmt.Errorf("find task needs: %w", err) + return fmt.Errorf("find task needs: %w", err) } jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds)) for jobID, taskNeed := range taskNeeds { @@ -346,14 +343,14 @@ func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_ actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, actionRunJob.Run, actionRunJob, vars, jobResults) if err != nil { - return false, fmt.Errorf("evaluate job concurrency: %w", err) + return fmt.Errorf("evaluate job concurrency: %w", err) } actionRunJob.IsConcurrencyEvaluated = true if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil { - return false, fmt.Errorf("update run job: %w", err) + return fmt.Errorf("update run job: %w", err) } } - return actions_model.ShouldBlockJobByConcurrency(ctx, actionRunJob) + return nil } diff --git a/services/actions/run.go b/services/actions/run.go index 8006cf715c4..db09a0ff00b 100644 --- a/services/actions/run.go +++ b/services/actions/run.go @@ -38,7 +38,8 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar } if blockRunByConcurrency { run.Status = actions_model.StatusBlocked - } else if err := CancelJobsByRunConcurrency(ctx, run); err != nil { + } + if err := CancelJobsByRunConcurrency(ctx, run); err != nil { return fmt.Errorf("cancel jobs: %w", err) } @@ -98,14 +99,17 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar if job.RawConcurrency != nil && job.RawConcurrency.Group != "" { runJob.RawConcurrencyGroup = job.RawConcurrency.Group runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress - // we do not need to evaluate job concurrency if the job is blocked because it will be checked by job emitter - if runJob.Status != actions_model.StatusBlocked { + // do not evaluate job concurrency when it requires `needs` + if len(needs) == 0 { var err error runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, run, runJob, vars, nil) if err != nil { return fmt.Errorf("evaluate job concurrency: %w", err) } runJob.IsConcurrencyEvaluated = true + } + // do not need to check job concurrency if the job is blocked because it will be checked by job emitter + if runJob.Status != actions_model.StatusBlocked { // check if the job should be blocked by job concurrency blockByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, runJob) if err != nil { @@ -113,7 +117,8 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar } if blockByConcurrency { runJob.Status = actions_model.StatusBlocked - } else if err := CancelJobsByJobConcurrency(ctx, runJob); err != nil { + } + if err := CancelJobsByJobConcurrency(ctx, runJob); err != nil { return fmt.Errorf("cancel jobs: %w", err) } } diff --git a/tests/integration/actions_concurrency_test.go b/tests/integration/actions_concurrency_test.go index 475ed612c7e..8b2c3ce9f56 100644 --- a/tests/integration/actions_concurrency_test.go +++ b/tests/integration/actions_concurrency_test.go @@ -1030,7 +1030,7 @@ jobs: concurrency: group: job-group-1 steps: - - run: echo 'wf2-job2' + - run: echo 'wf2-job1' wf2-job2: runs-on: runner2 concurrency: @@ -1073,70 +1073,89 @@ jobs: - run: echo 'wf4-job1' ` - // push workflow 1, 2 and 3 + // push workflow 1 opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) - opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent) - createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) - opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent) - createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + // fetch wf1-job1 and wf1-job2 w1j1Task := runner1.fetchTask(t) w1j2Task := runner2.fetchTask(t) - // cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1" - // cannot fetch wf3-job1 because it is blocked by wf1-job1's concurrency group "job-group-1" - runner1.fetchNoTask(t) - runner2.fetchNoTask(t) _, w1j1Job, w1Run := getTaskAndJobAndRunByTaskID(t, w1j1Task.Id) assert.Equal(t, "job-group-1", w1j1Job.ConcurrencyGroup) assert.Equal(t, "workflow-group-1", w1Run.ConcurrencyGroup) assert.Equal(t, "concurrent-workflow-1.yml", w1Run.WorkflowID) + assert.Equal(t, actions_model.StatusRunning, w1j1Job.Status) _, w1j2Job, _ := getTaskAndJobAndRunByTaskID(t, w1j2Task.Id) assert.Equal(t, "job-group-2", w1j2Job.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, w1j2Job.Status) + + // push workflow 2 + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + // cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1" + runner1.fetchNoTask(t) + runner2.fetchNoTask(t) + // query wf2-job1 from db and check its status + w2Run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-2.yml"}) + w2j1Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: w2Run.ID, JobID: "wf2-job1"}) + assert.Equal(t, actions_model.StatusBlocked, w2j1Job.Status) + + // push workflow 3 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + // cannot fetch wf3-job1 because it is blocked by wf1-job1's concurrency group "job-group-1" + runner1.fetchNoTask(t) + // query wf3-job1 from db and check its status + w3Run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-3.yml"}) + w3j1Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: w3Run.ID, JobID: "wf3-job1"}) + assert.Equal(t, actions_model.StatusBlocked, w3j1Job.Status) + // wf2-job1 is cancelled by wf3-job1 + w2j1Job = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: w2j1Job.ID}) + assert.Equal(t, actions_model.StatusCancelled, w2j1Job.Status) + // exec wf1-job1 runner1.execTask(t, w1j1Task, &mockTaskOutcome{ result: runnerv1.Result_RESULT_SUCCESS, }) + // fetch wf3-job1 + assert.Equal(t, actions_model.StatusBlocked, w3j1Job.Status) w3j1Task := runner1.fetchTask(t) - // cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1" - runner1.fetchNoTask(t) - runner2.fetchNoTask(t) - _, w3j1Job, w3Run := getTaskAndJobAndRunByTaskID(t, w3j1Task.Id) + _, w3j1Job, w3Run = getTaskAndJobAndRunByTaskID(t, w3j1Task.Id) assert.Equal(t, "job-group-1", w3j1Job.ConcurrencyGroup) assert.Equal(t, "workflow-group-2", w3Run.ConcurrencyGroup) assert.Equal(t, "concurrent-workflow-3.yml", w3Run.WorkflowID) - // push workflow-4 - opts4 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf4TreePath, wf4FileContent) - createWorkflowFile(t, token, user2.Name, repo.Name, wf4TreePath, opts4) + // exec wf1-job2 runner2.execTask(t, w1j2Task, &mockTaskOutcome{ result: runnerv1.Result_RESULT_SUCCESS, }) - // wf2-job2 + + // fetch wf2-job2 w2j2Task := runner2.fetchTask(t) - // cannot fetch wf2-job1 because it is blocked by wf3-job1's concurrency group "job-group-1" - // cannot fetch wf4-job1 because it is blocked by workflow-3's concurrency group "workflow-group-2" - runner1.fetchNoTask(t) - runner2.fetchNoTask(t) _, w2j2Job, w2Run := getTaskAndJobAndRunByTaskID(t, w2j2Task.Id) assert.Equal(t, "job-group-2", w2j2Job.ConcurrencyGroup) assert.Equal(t, "workflow-group-1", w2Run.ConcurrencyGroup) assert.Equal(t, "concurrent-workflow-2.yml", w2Run.WorkflowID) + assert.Equal(t, actions_model.StatusRunning, w2j2Job.Status) + + // push workflow-4 + opts4 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf4TreePath, wf4FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf4TreePath, opts4) + // cannot fetch wf4-job1 because it is blocked by workflow-3's concurrency group "workflow-group-2" + runner2.fetchNoTask(t) + // exec wf3-job1 runner1.execTask(t, w3j1Task, &mockTaskOutcome{ result: runnerv1.Result_RESULT_SUCCESS, }) - // fetch wf2-job1 - w2j1Task := runner1.fetchTask(t) + // fetch wf4-job1 w4j1Task := runner2.fetchTask(t) // all tasks have been fetched runner1.fetchNoTask(t) runner2.fetchNoTask(t) - _, w2j1Job, _ := getTaskAndJobAndRunByTaskID(t, w2j1Task.Id) - assert.Equal(t, "job-group-1", w2j1Job.ConcurrencyGroup) - assert.Equal(t, actions_model.StatusRunning, w2j2Job.Status) + _, w2j2Job, w2Run = getTaskAndJobAndRunByTaskID(t, w2j2Task.Id) // wf2-job2 is cancelled because wf4-job1's cancel-in-progress is true assert.Equal(t, actions_model.StatusCancelled, w2j2Job.Status)