From ca186724ddd97b10fd9ffba9a4cef7307006ccef Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Wed, 23 Jul 2025 11:37:51 -0600 Subject: [PATCH] modify cancel-in-progress behavior --- models/actions/run.go | 55 +++--- models/actions/run_job.go | 67 ++++--- services/actions/job_emitter.go | 165 ++++++++---------- tests/integration/actions_concurrency_test.go | 100 +++++++++++ 4 files changed, 240 insertions(+), 147 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index b088952fef2..dc69d5c6d01 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -401,17 +401,34 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo return false, nil } - concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{ - RepoID: actionRun.RepoID, - ConcurrencyGroup: actionRun.ConcurrencyGroup, - Status: []Status{StatusRunning}, + runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, []Status{StatusRunning}) + if err != nil { + return false, fmt.Errorf("find concurrent runs and jobs: %w", err) + } + + return len(runs) > 0 || len(jobs) > 0, nil +} + +func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) { + runs, err := db.Find[ActionRun](ctx, &FindRunOptions{ + RepoID: repoID, + ConcurrencyGroup: concurrencyGroup, + Status: status, }) if err != nil { - return false, fmt.Errorf("find running and waiting runs: %w", err) + return nil, nil, fmt.Errorf("find runs: %w", err) } - previousRuns := slices.DeleteFunc(concurrentRuns, func(r *ActionRun) bool { return r.ID == actionRun.ID }) - return len(previousRuns) > 0, nil + jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ + RepoID: repoID, + ConcurrencyGroup: concurrencyGroup, + Statuses: status, + }) + if err != nil { + return nil, nil, fmt.Errorf("find jobs: %w", err) + } + + return runs, jobs, nil } func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) { @@ -419,21 +436,19 @@ func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRu return nil, nil } - var cancelledJobs []*ActionRunJob + var jobsToCancel []*ActionRunJob statusFindOption := []Status{StatusWaiting, StatusBlocked} if actionRun.ConcurrencyCancel { statusFindOption = append(statusFindOption, StatusRunning) } - // cancel previous runs in the same concurrency group - runs, err := db.Find[ActionRun](ctx, &FindRunOptions{ - RepoID: actionRun.RepoID, - ConcurrencyGroup: actionRun.ConcurrencyGroup, - Status: statusFindOption, - }) + runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption) if err != nil { - return cancelledJobs, fmt.Errorf("find runs: %w", err) + return nil, fmt.Errorf("find concurrent runs and jobs: %w", err) } + jobsToCancel = append(jobsToCancel, jobs...) + + // cancel runs in the same concurrency group for _, run := range runs { if run.ID == actionRun.ID { continue @@ -442,14 +457,10 @@ func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRu RunID: run.ID, }) if err != nil { - return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err) + return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) } - cjs, err := CancelJobs(ctx, jobs) - if err != nil { - return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err) - } - cancelledJobs = append(cancelledJobs, cjs...) + jobsToCancel = append(jobsToCancel, jobs...) } - return cancelledJobs, nil + return CancelJobs(ctx, jobsToCancel) } diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 353d3c1e8ed..5497d6293e4 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -219,23 +219,12 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, return false, nil } - concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{ - RepoID: job.RepoID, - ConcurrencyGroup: job.ConcurrencyGroup, - Statuses: []Status{StatusRunning}, - }) + runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []Status{StatusRunning}) if err != nil { - return false, fmt.Errorf("count running and waiting jobs: %w", err) - } - if concurrentJobsNum > 0 { - return true, nil + return false, fmt.Errorf("find concurrent runs and jobs: %w", err) } - if err := job.LoadRun(ctx); err != nil { - return false, fmt.Errorf("load run: %w", err) - } - - return ShouldBlockRunByConcurrency(ctx, job.Run) + return len(runs) > 0 || len(jobs) > 0, nil } func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) { @@ -243,37 +232,41 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) return nil, nil } - var cancelledJobs []*ActionRunJob + var jobsToCancel []*ActionRunJob if !job.IsConcurrencyEvaluated { - return cancelledJobs, ErrUnevaluatedConcurrency{ + return nil, ErrUnevaluatedConcurrency{ Group: job.RawConcurrencyGroup, CancelInProgress: job.RawConcurrencyCancel, } } - if job.ConcurrencyGroup != "" { - statusFindOption := []Status{StatusWaiting, StatusBlocked} - if job.ConcurrencyCancel { - statusFindOption = append(statusFindOption, StatusRunning) - } - // cancel previous jobs in the same concurrency group - previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ - RepoID: job.RepoID, - ConcurrencyGroup: job.ConcurrencyGroup, - Statuses: statusFindOption, - }) - if err != nil { - return cancelledJobs, fmt.Errorf("find previous jobs: %w", err) - } - previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID }) - cjs, err := CancelJobs(ctx, previousJobs) - if err != nil { - return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err) - } - cancelledJobs = append(cancelledJobs, cjs...) + if job.ConcurrencyGroup == "" { + return nil, nil } - return cancelledJobs, nil + statusFindOption := []Status{StatusWaiting, StatusBlocked} + if job.ConcurrencyCancel { + statusFindOption = append(statusFindOption, StatusRunning) + } + runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, statusFindOption) + if err != nil { + return nil, fmt.Errorf("find concurrent runs and jobs: %w", err) + } + jobs = slices.DeleteFunc(jobs, func(j *ActionRunJob) bool { return j.ID == job.ID }) + jobsToCancel = append(jobsToCancel, jobs...) + + // cancel runs in the same concurrency group + for _, run := range runs { + jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ + RunID: run.ID, + }) + if err != nil { + return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) + } + jobsToCancel = append(jobsToCancel, jobs...) + } + + return CancelJobs(ctx, jobsToCancel) } type ErrUnevaluatedConcurrency struct { diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index c294aef2dd2..c711dc1ea7b 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -62,95 +62,11 @@ func checkJobsByRunID(ctx context.Context, runID int64) error { jobs = append(jobs, js...) updatedJobs = append(updatedJobs, ujs...) } - // check run (workflow-level) concurrency - concurrentRunIDs := make(container.Set[int64]) - concurrentRunIDs.Add(run.ID) - if run.ConcurrencyGroup != "" { - concurrentRuns, err := db.Find[actions_model.ActionRun](ctx, actions_model.FindRunOptions{ - RepoID: run.RepoID, - ConcurrencyGroup: run.ConcurrencyGroup, - Status: []actions_model.Status{actions_model.StatusBlocked}, - }) - if err != nil { - return err - } - for _, concurrentRun := range concurrentRuns { - if concurrentRunIDs.Contains(concurrentRun.ID) { - continue - } - concurrentRunIDs.Add(concurrentRun.ID) - if concurrentRun.NeedApproval { - continue - } - if js, ujs, err := checkJobsOfRun(ctx, concurrentRun); err != nil { - return err - } else { - jobs = append(jobs, js...) - updatedJobs = append(updatedJobs, ujs...) - } - updatedRun, err := actions_model.GetRunByID(ctx, concurrentRun.ID) - if err != nil { - return err - } - if updatedRun.Status == actions_model.StatusWaiting { - // only run one blocked action run in the same concurrency group - break - } - } - } - - // check job concurrency - runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) - if err != nil { + if js, ujs, err := checkRunConcurrency(ctx, run); err != nil { return err - } - for _, job := range runJobs { - if job.Status.IsDone() && job.ConcurrencyGroup != "" { - waitingConcurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ - RepoID: job.RepoID, - ConcurrencyGroup: job.ConcurrencyGroup, - Statuses: []actions_model.Status{actions_model.StatusWaiting}, - }) - if err != nil { - return err - } - if len(waitingConcurrentJobs) == 0 { - blockedConcurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ - RepoID: job.RepoID, - ConcurrencyGroup: job.ConcurrencyGroup, - Statuses: []actions_model.Status{actions_model.StatusBlocked}, - }) - if err != nil { - return err - } - for _, concurrentJob := range blockedConcurrentJobs { - if concurrentRunIDs.Contains(concurrentJob.RunID) { - continue - } - concurrentRunIDs.Add(concurrentJob.RunID) - concurrentRun, err := actions_model.GetRunByID(ctx, concurrentJob.RunID) - if err != nil { - return err - } - if concurrentRun.NeedApproval { - continue - } - if js, ujs, err := checkJobsOfRun(ctx, concurrentRun); err != nil { - return err - } else { - jobs = append(jobs, js...) - updatedJobs = append(updatedJobs, ujs...) - } - updatedJob, err := actions_model.GetRunJobByID(ctx, concurrentJob.ID) - if err != nil { - return err - } - if updatedJob.Status == actions_model.StatusWaiting { - break - } - } - } - } + } else { + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) } return nil }); err != nil { @@ -176,6 +92,79 @@ func checkJobsByRunID(ctx context.Context, runID int64) error { return nil } +func findBlockedRunByConcurrency(ctx context.Context, repoID int64, concurrencyGroup string) (*actions_model.ActionRun, bool, error) { + if concurrencyGroup == "" { + return nil, false, nil + } + cRuns, cJobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, repoID, concurrencyGroup, []actions_model.Status{actions_model.StatusBlocked}) + if err != nil { + return nil, false, fmt.Errorf("find concurrent runs and jobs: %w", err) + } + + // There can be at most one blocked run or job + var concurrentRun *actions_model.ActionRun + if len(cRuns) > 0 { + concurrentRun = cRuns[0] + } else if len(cJobs) > 0 { + jobRun, err := actions_model.GetRunByID(ctx, cJobs[0].RunID) + if err != nil { + return nil, false, fmt.Errorf("get run by job %d: %w", cJobs[0].ID, err) + } + concurrentRun = jobRun + } + + return concurrentRun, concurrentRun != nil, nil +} + +func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) { + checkedConcurrencyGroup := make(container.Set[string]) + + // check run (workflow-level) concurrency + if run.ConcurrencyGroup != "" { + concurrentRun, found, err := findBlockedRunByConcurrency(ctx, run.RepoID, run.ConcurrencyGroup) + if err != nil { + return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err) + } + if found && !concurrentRun.NeedApproval { + if js, ujs, err := checkJobsOfRun(ctx, concurrentRun); err != nil { + return nil, nil, err + } else { + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) + } + } + checkedConcurrencyGroup.Add(run.ConcurrencyGroup) + } + + // check job concurrency + runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return nil, nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) + } + for _, job := range runJobs { + if !job.Status.IsDone() { + continue + } + if job.ConcurrencyGroup == "" && checkedConcurrencyGroup.Contains(job.ConcurrencyGroup) { + continue + } + concurrentRun, found, err := findBlockedRunByConcurrency(ctx, job.RepoID, job.ConcurrencyGroup) + if err != nil { + return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err) + } + if found && !concurrentRun.NeedApproval { + if js, ujs, err := checkJobsOfRun(ctx, concurrentRun); err != nil { + return nil, nil, err + } else { + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) + } + } + checkedConcurrencyGroup.Add(job.ConcurrencyGroup) + } + return +} + func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) { jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) if err != nil { diff --git a/tests/integration/actions_concurrency_test.go b/tests/integration/actions_concurrency_test.go index dd32db5cf62..c3a22cf8c15 100644 --- a/tests/integration/actions_concurrency_test.go +++ b/tests/integration/actions_concurrency_test.go @@ -1247,6 +1247,106 @@ jobs: }) } +func TestRunAndJobWithSameConcurrencyGroup(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +jobs: + wf1-job: + runs-on: ubuntu-latest + concurrency: + group: test-group + steps: + - run: echo 'wf1-job' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +concurrency: + group: test-group +jobs: + wf2-job: + runs-on: ubuntu-latest + steps: + - run: echo 'wf2-job' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +jobs: + wf3-job: + runs-on: ubuntu-latest + concurrency: + group: test-group + cancel-in-progress: true + steps: + - run: echo 'wf3-job' +` + // push workflow1 + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + // fetch run1 + task := runner.fetchTask(t) + _, job1, run1 := getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "test-group", job1.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, run1.Status) + + // push workflow2 + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + // cannot fetch run2 because run1 is still running + runner.fetchNoTask(t) + run2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-2.yml"}) + assert.Equal(t, "test-group", run2.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusBlocked, run2.Status) + + // exec run1 + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // fetch run2 + task2 := runner.fetchTask(t) + _, _, run2 = getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, actions_model.StatusRunning, run2.Status) + + // push workflow3 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + // fetch run3 + task3 := runner.fetchTask(t) + _, job3, run3 := getTaskAndJobAndRunByTaskID(t, task3.Id) + assert.Equal(t, "test-group", job3.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, run3.Status) + + // run2 should be cancelled by run3 + run2 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2.ID}) + assert.Equal(t, actions_model.StatusCancelled, run2.Status) + }) +} + func getTaskAndJobAndRunByTaskID(t *testing.T, taskID int64) (*actions_model.ActionTask, *actions_model.ActionRunJob, *actions_model.ActionRun) { actionTask := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: taskID}) actionRunJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: actionTask.JobID})