fix bugs and tests

This commit is contained in:
Zettat123 2025-07-22 13:40:07 -06:00
parent 63eaf15bd2
commit 96d6938c80
4 changed files with 69 additions and 64 deletions

View File

@ -404,7 +404,7 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo
concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{ concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{
RepoID: actionRun.RepoID, RepoID: actionRun.RepoID,
ConcurrencyGroup: actionRun.ConcurrencyGroup, ConcurrencyGroup: actionRun.ConcurrencyGroup,
Status: []Status{StatusWaiting, StatusRunning}, Status: []Status{StatusRunning},
}) })
if err != nil { if err != nil {
return false, fmt.Errorf("find running and waiting runs: %w", err) return false, fmt.Errorf("find running and waiting runs: %w", err)

View File

@ -222,7 +222,7 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool,
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{ concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
RepoID: job.RepoID, RepoID: job.RepoID,
ConcurrencyGroup: job.ConcurrencyGroup, ConcurrencyGroup: job.ConcurrencyGroup,
Statuses: []Status{StatusRunning, StatusWaiting}, Statuses: []Status{StatusRunning},
}) })
if err != nil { if err != nil {
return false, fmt.Errorf("count running and waiting jobs: %w", err) return false, fmt.Errorf("count running and waiting jobs: %w", err)

View File

@ -285,12 +285,17 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
} }
} }
if allDone { if allDone {
// evaluate concurrency // check concurrency
if err := evaluateConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars); err != nil { blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars)
if err != nil {
log.Error("Check job %d concurrency: %v. This job will stay blocked.", id, err) log.Error("Check job %d concurrency: %v. This job will stay blocked.", id, err)
continue continue
} }
if blockedByJobConcurrency {
continue
}
if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil { if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil {
log.Error("Cancel previous jobs for job %d: %v", id, err) log.Error("Cancel previous jobs for job %d: %v", id, err)
} }
@ -304,7 +309,6 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
_, wfJob := wfJobs[0].Job() _, wfJob := wfJobs[0].Job()
hasIf = len(wfJob.If.Value) > 0 hasIf = len(wfJob.If.Value) > 0
} }
if hasIf { if hasIf {
// act_runner will check the "if" condition // act_runner will check the "if" condition
ret[id] = actions_model.StatusWaiting ret[id] = actions_model.StatusWaiting
@ -319,18 +323,18 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
return ret return ret
} }
func evaluateConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) error { func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
if actionRunJob.RawConcurrencyGroup == "" { if actionRunJob.RawConcurrencyGroup == "" {
return nil return false, nil
} }
if err := actionRunJob.LoadAttributes(ctx); err != nil { if err := actionRunJob.LoadAttributes(ctx); err != nil {
return err return false, err
} }
if !actionRunJob.IsConcurrencyEvaluated { if !actionRunJob.IsConcurrencyEvaluated {
taskNeeds, err := FindTaskNeeds(ctx, actionRunJob) taskNeeds, err := FindTaskNeeds(ctx, actionRunJob)
if err != nil { if err != nil {
return fmt.Errorf("find task needs: %w", err) return false, fmt.Errorf("find task needs: %w", err)
} }
jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds)) jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
for jobID, taskNeed := range taskNeeds { for jobID, taskNeed := range taskNeeds {
@ -343,14 +347,14 @@ func evaluateConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actio
actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, actionRunJob.Run, actionRunJob, vars, jobResults) actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, actionRunJob.Run, actionRunJob, vars, jobResults)
if err != nil { if err != nil {
return fmt.Errorf("evaluate job concurrency: %w", err) return false, fmt.Errorf("evaluate job concurrency: %w", err)
} }
actionRunJob.IsConcurrencyEvaluated = true actionRunJob.IsConcurrencyEvaluated = true
if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil { if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil {
return fmt.Errorf("update run job: %w", err) return false, fmt.Errorf("update run job: %w", err)
} }
} }
return nil return actions_model.ShouldBlockJobByConcurrency(ctx, actionRunJob)
} }

View File

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"slices"
"testing" "testing"
"time" "time"
@ -91,14 +90,10 @@ jobs:
steps: steps:
- run: echo 'job from workflow3' - run: echo 'job from workflow3'
` `
// push workflow1
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent) opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) // fetch and exec workflow1
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
// fetch and exec workflow1, workflow2 and workflow3 are blocked
task := runner.fetchTask(t) task := runner.fetchTask(t)
_, _, run := getTaskAndJobAndRunByTaskID(t, task.Id) _, _, run := getTaskAndJobAndRunByTaskID(t, task.Id)
assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup)
@ -108,23 +103,30 @@ jobs:
result: runnerv1.Result_RESULT_SUCCESS, result: runnerv1.Result_RESULT_SUCCESS,
}) })
// fetch workflow2 or workflow3 // push workflow2
workflowNames := []string{"concurrent-workflow-2.yml", "concurrent-workflow-3.yml"} opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
// fetch workflow2
task = runner.fetchTask(t) task = runner.fetchTask(t)
_, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id)
assert.Contains(t, workflowNames, run.WorkflowID)
workflowNames = slices.DeleteFunc(workflowNames, func(wfn string) bool { return wfn == run.WorkflowID })
assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup)
assert.Equal(t, "concurrent-workflow-2.yml", run.WorkflowID)
// push workflow3
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
runner.fetchNoTask(t) runner.fetchNoTask(t)
// exec workflow2
runner.execTask(t, task, &mockTaskOutcome{ runner.execTask(t, task, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS, result: runnerv1.Result_RESULT_SUCCESS,
}) })
// fetch the last workflow (workflow2 or workflow3) // fetch and exec workflow3
task = runner.fetchTask(t) task = runner.fetchTask(t)
_, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id)
assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup)
assert.Equal(t, workflowNames[0], run.WorkflowID) assert.Equal(t, "concurrent-workflow-3.yml", run.WorkflowID)
runner.fetchNoTask(t) runner.fetchNoTask(t)
runner.execTask(t, task, &mockTaskOutcome{ runner.execTask(t, task, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS, result: runnerv1.Result_RESULT_SUCCESS,
@ -425,7 +427,7 @@ on:
paths: paths:
- '.gitea/workflows/concurrent-workflow-1.yml' - '.gitea/workflows/concurrent-workflow-1.yml'
jobs: jobs:
job1: wf1-job:
runs-on: ${{ matrix.os }}-runner runs-on: ${{ matrix.os }}-runner
strategy: strategy:
matrix: matrix:
@ -433,8 +435,17 @@ jobs:
concurrency: concurrency:
group: job-os-${{ matrix.os }} group: job-os-${{ matrix.os }}
steps: steps:
- run: echo 'job1' - run: echo 'wf1'
job2: `
wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml"
wf2FileContent := `name: concurrent-workflow-2
on:
push:
paths:
- '.gitea/workflows/concurrent-workflow-2.yml'
jobs:
wf2-job:
runs-on: ${{ matrix.os }}-runner runs-on: ${{ matrix.os }}-runner
strategy: strategy:
matrix: matrix:
@ -442,26 +453,30 @@ jobs:
concurrency: concurrency:
group: job-os-${{ matrix.os }} group: job-os-${{ matrix.os }}
steps: steps:
- run: echo 'job2' - run: echo 'wf2'
` `
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
job1WinTask := windowsRunner.fetchTask(t) job1WinTask := windowsRunner.fetchTask(t)
job1LinuxTask := linuxRunner.fetchTask(t) job1LinuxTask := linuxRunner.fetchTask(t)
windowsRunner.fetchNoTask(t) windowsRunner.fetchNoTask(t)
linuxRunner.fetchNoTask(t) linuxRunner.fetchNoTask(t)
job2DarwinTask := darwinRunner.fetchTask(t)
_, job1WinJob, _ := getTaskAndJobAndRunByTaskID(t, job1WinTask.Id) _, job1WinJob, _ := getTaskAndJobAndRunByTaskID(t, job1WinTask.Id)
assert.Equal(t, "job1 (windows)", job1WinJob.Name) assert.Equal(t, "wf1-job (windows)", job1WinJob.Name)
assert.Equal(t, "job-os-windows", job1WinJob.ConcurrencyGroup) assert.Equal(t, "job-os-windows", job1WinJob.ConcurrencyGroup)
_, job1LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job1LinuxTask.Id) _, job1LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job1LinuxTask.Id)
assert.Equal(t, "job1 (linux)", job1LinuxJob.Name) assert.Equal(t, "wf1-job (linux)", job1LinuxJob.Name)
assert.Equal(t, "job-os-linux", job1LinuxJob.ConcurrencyGroup) assert.Equal(t, "job-os-linux", job1LinuxJob.ConcurrencyGroup)
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent)
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
job2DarwinTask := darwinRunner.fetchTask(t)
_, job2DarwinJob, _ := getTaskAndJobAndRunByTaskID(t, job2DarwinTask.Id) _, job2DarwinJob, _ := getTaskAndJobAndRunByTaskID(t, job2DarwinTask.Id)
assert.Equal(t, "job2 (darwin)", job2DarwinJob.Name) assert.Equal(t, "wf2-job (darwin)", job2DarwinJob.Name)
assert.Equal(t, "job-os-darwin", job2DarwinJob.ConcurrencyGroup) assert.Equal(t, "job-os-darwin", job2DarwinJob.ConcurrencyGroup)
windowsRunner.execTask(t, job1WinTask, &mockTaskOutcome{ windowsRunner.execTask(t, job1WinTask, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS, result: runnerv1.Result_RESULT_SUCCESS,
}) })
@ -472,10 +487,10 @@ jobs:
job2WinTask := windowsRunner.fetchTask(t) job2WinTask := windowsRunner.fetchTask(t)
job2LinuxTask := linuxRunner.fetchTask(t) job2LinuxTask := linuxRunner.fetchTask(t)
_, job2WinJob, _ := getTaskAndJobAndRunByTaskID(t, job2WinTask.Id) _, job2WinJob, _ := getTaskAndJobAndRunByTaskID(t, job2WinTask.Id)
assert.Equal(t, "job2 (windows)", job2WinJob.Name) assert.Equal(t, "wf2-job (windows)", job2WinJob.Name)
assert.Equal(t, "job-os-windows", job2WinJob.ConcurrencyGroup) assert.Equal(t, "job-os-windows", job2WinJob.ConcurrencyGroup)
_, job2LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job2LinuxTask.Id) _, job2LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job2LinuxTask.Id)
assert.Equal(t, "job2 (linux)", job2LinuxJob.Name) assert.Equal(t, "wf2-job (linux)", job2LinuxJob.Name)
assert.Equal(t, "job-os-linux", job2LinuxJob.ConcurrencyGroup) assert.Equal(t, "job-os-linux", job2LinuxJob.ConcurrencyGroup)
}) })
} }
@ -701,27 +716,20 @@ jobs:
}) })
_ = session.MakeRequest(t, req, http.StatusOK) _ = session.MakeRequest(t, req, http.StatusOK)
run2_2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2.ID})
assert.Equal(t, actions_model.StatusWaiting, run2_2.Status)
req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1), map[string]string{ req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1), map[string]string{
"_csrf": GetUserCSRFToken(t, session), "_csrf": GetUserCSRFToken(t, session),
}) })
_ = session.MakeRequest(t, req, http.StatusOK) _ = session.MakeRequest(t, req, http.StatusOK)
task6 := runner.fetchTask(t) task6 := runner.fetchTask(t)
_, _, run2_2 := getTaskAndJobAndRunByTaskID(t, task6.Id) _, _, run3 := getTaskAndJobAndRunByTaskID(t, task6.Id)
assert.Equal(t, "workflow-dispatch-v1.22", run2_2.ConcurrencyGroup)
runner.fetchNoTask(t) // cannot fetch task because task2 is not completed
runner.execTask(t, task6, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS,
})
task7 := runner.fetchTask(t)
_, _, run3 := getTaskAndJobAndRunByTaskID(t, task7.Id)
assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup) assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup)
runner.execTask(t, task7, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS, run2_2 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2_2.ID})
}) assert.Equal(t, actions_model.StatusCancelled, run2_2.Status) // cancelled by run3
}) })
} }
@ -852,27 +860,20 @@ jobs:
}) })
_ = session.MakeRequest(t, req, http.StatusOK) _ = session.MakeRequest(t, req, http.StatusOK)
run2_2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2.ID})
assert.Equal(t, actions_model.StatusWaiting, run2_2.Status)
req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/jobs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1, 1), map[string]string{ req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/jobs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1, 1), map[string]string{
"_csrf": GetUserCSRFToken(t, session), "_csrf": GetUserCSRFToken(t, session),
}) })
_ = session.MakeRequest(t, req, http.StatusOK) _ = session.MakeRequest(t, req, http.StatusOK)
task6 := runner.fetchTask(t) task6 := runner.fetchTask(t)
_, _, run2_2 := getTaskAndJobAndRunByTaskID(t, task6.Id) _, _, run3 := getTaskAndJobAndRunByTaskID(t, task6.Id)
assert.Equal(t, "workflow-dispatch-v1.22", run2_2.ConcurrencyGroup)
runner.fetchNoTask(t) // cannot fetch task because task2 is not completed
runner.execTask(t, task6, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS,
})
task7 := runner.fetchTask(t)
_, _, run3 := getTaskAndJobAndRunByTaskID(t, task7.Id)
assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup) assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup)
runner.execTask(t, task7, &mockTaskOutcome{
result: runnerv1.Result_RESULT_SUCCESS, run2_2 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2_2.ID})
}) assert.Equal(t, actions_model.StatusCancelled, run2_2.Status) // cancelled by run3
}) })
} }