From 96d6938c808e769aa110702bc5492c7af8c200b3 Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Tue, 22 Jul 2025 13:40:07 -0600 Subject: [PATCH] fix bugs and tests --- models/actions/run.go | 2 +- models/actions/run_job.go | 2 +- services/actions/job_emitter.go | 24 ++-- tests/integration/actions_concurrency_test.go | 105 +++++++++--------- 4 files changed, 69 insertions(+), 64 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index a78e02359e7..b088952fef2 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -404,7 +404,7 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{ RepoID: actionRun.RepoID, ConcurrencyGroup: actionRun.ConcurrencyGroup, - Status: []Status{StatusWaiting, StatusRunning}, + Status: []Status{StatusRunning}, }) if err != nil { return false, fmt.Errorf("find running and waiting runs: %w", err) diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 4dd736aa652..7cab156eb2d 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -222,7 +222,7 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{ RepoID: job.RepoID, ConcurrencyGroup: job.ConcurrencyGroup, - Statuses: []Status{StatusRunning, StatusWaiting}, + Statuses: []Status{StatusRunning}, }) if err != nil { return false, fmt.Errorf("count running and waiting jobs: %w", err) diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 2f5050d0874..c294aef2dd2 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -285,12 +285,17 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model } } if allDone { - // evaluate concurrency - if err := evaluateConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars); err != nil { + // check concurrency + blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars) + if err != nil { log.Error("Check job %d concurrency: %v. This job will stay blocked.", id, err) continue } + if blockedByJobConcurrency { + continue + } + if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil { log.Error("Cancel previous jobs for job %d: %v", id, err) } @@ -304,7 +309,6 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model _, wfJob := wfJobs[0].Job() hasIf = len(wfJob.If.Value) > 0 } - if hasIf { // act_runner will check the "if" condition ret[id] = actions_model.StatusWaiting @@ -319,18 +323,18 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model return ret } -func evaluateConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) error { +func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) { if actionRunJob.RawConcurrencyGroup == "" { - return nil + return false, nil } if err := actionRunJob.LoadAttributes(ctx); err != nil { - return err + return false, err } if !actionRunJob.IsConcurrencyEvaluated { taskNeeds, err := FindTaskNeeds(ctx, actionRunJob) if err != nil { - return fmt.Errorf("find task needs: %w", err) + return false, fmt.Errorf("find task needs: %w", err) } jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds)) for jobID, taskNeed := range taskNeeds { @@ -343,14 +347,14 @@ func evaluateConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actio actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, actionRunJob.Run, actionRunJob, vars, jobResults) if err != nil { - return fmt.Errorf("evaluate job concurrency: %w", err) + return false, fmt.Errorf("evaluate job concurrency: %w", err) } actionRunJob.IsConcurrencyEvaluated = true if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil { - return fmt.Errorf("update run job: %w", err) + return false, fmt.Errorf("update run job: %w", err) } } - return nil + return actions_model.ShouldBlockJobByConcurrency(ctx, actionRunJob) } diff --git a/tests/integration/actions_concurrency_test.go b/tests/integration/actions_concurrency_test.go index 8b2c3ce9f56..dd32db5cf62 100644 --- a/tests/integration/actions_concurrency_test.go +++ b/tests/integration/actions_concurrency_test.go @@ -8,7 +8,6 @@ import ( "fmt" "net/http" "net/url" - "slices" "testing" "time" @@ -91,14 +90,10 @@ jobs: steps: - run: echo 'job from workflow3' ` + // push workflow1 opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent) createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) - opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) - createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) - opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent) - createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) - - // fetch and exec workflow1, workflow2 and workflow3 are blocked + // fetch and exec workflow1 task := runner.fetchTask(t) _, _, run := getTaskAndJobAndRunByTaskID(t, task.Id) assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) @@ -108,23 +103,30 @@ jobs: result: runnerv1.Result_RESULT_SUCCESS, }) - // fetch workflow2 or workflow3 - workflowNames := []string{"concurrent-workflow-2.yml", "concurrent-workflow-3.yml"} + // push workflow2 + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + // fetch workflow2 task = runner.fetchTask(t) _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) - assert.Contains(t, workflowNames, run.WorkflowID) - workflowNames = slices.DeleteFunc(workflowNames, func(wfn string) bool { return wfn == run.WorkflowID }) assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-2.yml", run.WorkflowID) + + // push workflow3 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) runner.fetchNoTask(t) + + // exec workflow2 runner.execTask(t, task, &mockTaskOutcome{ result: runnerv1.Result_RESULT_SUCCESS, }) - // fetch the last workflow (workflow2 or workflow3) + // fetch and exec workflow3 task = runner.fetchTask(t) _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) - assert.Equal(t, workflowNames[0], run.WorkflowID) + assert.Equal(t, "concurrent-workflow-3.yml", run.WorkflowID) runner.fetchNoTask(t) runner.execTask(t, task, &mockTaskOutcome{ result: runnerv1.Result_RESULT_SUCCESS, @@ -425,7 +427,7 @@ on: paths: - '.gitea/workflows/concurrent-workflow-1.yml' jobs: - job1: + wf1-job: runs-on: ${{ matrix.os }}-runner strategy: matrix: @@ -433,8 +435,17 @@ jobs: concurrency: group: job-os-${{ matrix.os }} steps: - - run: echo 'job1' - job2: + - run: echo 'wf1' +` + + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +jobs: + wf2-job: runs-on: ${{ matrix.os }}-runner strategy: matrix: @@ -442,26 +453,30 @@ jobs: concurrency: group: job-os-${{ matrix.os }} steps: - - run: echo 'job2' + - run: echo 'wf2' ` - opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent) createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) job1WinTask := windowsRunner.fetchTask(t) job1LinuxTask := linuxRunner.fetchTask(t) windowsRunner.fetchNoTask(t) linuxRunner.fetchNoTask(t) - job2DarwinTask := darwinRunner.fetchTask(t) _, job1WinJob, _ := getTaskAndJobAndRunByTaskID(t, job1WinTask.Id) - assert.Equal(t, "job1 (windows)", job1WinJob.Name) + assert.Equal(t, "wf1-job (windows)", job1WinJob.Name) assert.Equal(t, "job-os-windows", job1WinJob.ConcurrencyGroup) _, job1LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job1LinuxTask.Id) - assert.Equal(t, "job1 (linux)", job1LinuxJob.Name) + assert.Equal(t, "wf1-job (linux)", job1LinuxJob.Name) assert.Equal(t, "job-os-linux", job1LinuxJob.ConcurrencyGroup) + + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + job2DarwinTask := darwinRunner.fetchTask(t) _, job2DarwinJob, _ := getTaskAndJobAndRunByTaskID(t, job2DarwinTask.Id) - assert.Equal(t, "job2 (darwin)", job2DarwinJob.Name) + assert.Equal(t, "wf2-job (darwin)", job2DarwinJob.Name) assert.Equal(t, "job-os-darwin", job2DarwinJob.ConcurrencyGroup) + windowsRunner.execTask(t, job1WinTask, &mockTaskOutcome{ result: runnerv1.Result_RESULT_SUCCESS, }) @@ -472,10 +487,10 @@ jobs: job2WinTask := windowsRunner.fetchTask(t) job2LinuxTask := linuxRunner.fetchTask(t) _, job2WinJob, _ := getTaskAndJobAndRunByTaskID(t, job2WinTask.Id) - assert.Equal(t, "job2 (windows)", job2WinJob.Name) + assert.Equal(t, "wf2-job (windows)", job2WinJob.Name) assert.Equal(t, "job-os-windows", job2WinJob.ConcurrencyGroup) _, job2LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job2LinuxTask.Id) - assert.Equal(t, "job2 (linux)", job2LinuxJob.Name) + assert.Equal(t, "wf2-job (linux)", job2LinuxJob.Name) assert.Equal(t, "job-os-linux", job2LinuxJob.ConcurrencyGroup) }) } @@ -701,27 +716,20 @@ jobs: }) _ = session.MakeRequest(t, req, http.StatusOK) + run2_2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2.ID}) + assert.Equal(t, actions_model.StatusWaiting, run2_2.Status) + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1), map[string]string{ "_csrf": GetUserCSRFToken(t, session), }) _ = session.MakeRequest(t, req, http.StatusOK) task6 := runner.fetchTask(t) - _, _, run2_2 := getTaskAndJobAndRunByTaskID(t, task6.Id) - assert.Equal(t, "workflow-dispatch-v1.22", run2_2.ConcurrencyGroup) - - runner.fetchNoTask(t) // cannot fetch task because task2 is not completed - - runner.execTask(t, task6, &mockTaskOutcome{ - result: runnerv1.Result_RESULT_SUCCESS, - }) - - task7 := runner.fetchTask(t) - _, _, run3 := getTaskAndJobAndRunByTaskID(t, task7.Id) + _, _, run3 := getTaskAndJobAndRunByTaskID(t, task6.Id) assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup) - runner.execTask(t, task7, &mockTaskOutcome{ - result: runnerv1.Result_RESULT_SUCCESS, - }) + + run2_2 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2_2.ID}) + assert.Equal(t, actions_model.StatusCancelled, run2_2.Status) // cancelled by run3 }) } @@ -852,27 +860,20 @@ jobs: }) _ = session.MakeRequest(t, req, http.StatusOK) + run2_2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2.ID}) + assert.Equal(t, actions_model.StatusWaiting, run2_2.Status) + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/jobs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1, 1), map[string]string{ "_csrf": GetUserCSRFToken(t, session), }) _ = session.MakeRequest(t, req, http.StatusOK) task6 := runner.fetchTask(t) - _, _, run2_2 := getTaskAndJobAndRunByTaskID(t, task6.Id) - assert.Equal(t, "workflow-dispatch-v1.22", run2_2.ConcurrencyGroup) - - runner.fetchNoTask(t) // cannot fetch task because task2 is not completed - - runner.execTask(t, task6, &mockTaskOutcome{ - result: runnerv1.Result_RESULT_SUCCESS, - }) - - task7 := runner.fetchTask(t) - _, _, run3 := getTaskAndJobAndRunByTaskID(t, task7.Id) + _, _, run3 := getTaskAndJobAndRunByTaskID(t, task6.Id) assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup) - runner.execTask(t, task7, &mockTaskOutcome{ - result: runnerv1.Result_RESULT_SUCCESS, - }) + + run2_2 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2_2.ID}) + assert.Equal(t, actions_model.StatusCancelled, run2_2.Status) // cancelled by run3 }) }