add TestAbandonConcurrentRun

This commit is contained in:
Zettat123 2025-07-25 16:47:54 -06:00
parent 6a9eb9e1e7
commit 42c2ca2c91
2 changed files with 101 additions and 1 deletions

View File

@ -120,7 +120,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
func CancelAbandonedJobs(ctx context.Context) error {
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
Statuses: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusBlocked},
UpdatedBefore: timeutil.TimeStamp(time.Now().Add(-setting.Actions.AbandonedJobTimeout).Unix()),
UpdatedBefore: timeutil.TimeStampNow().AddDuration(-setting.Actions.AbandonedJobTimeout),
})
if err != nil {
log.Warn("find abandoned tasks: %v", err)

View File

@ -16,6 +16,7 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
@ -1247,6 +1248,105 @@ jobs:
})
}
func TestAbandonConcurrentRun(t *testing.T) {
onGiteaRun(t, func(t *testing.T, u *url.URL) {
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
user2Session := loginUser(t, user2.Name)
user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
apiRepo := createActionsTestRepo(t, user2Token, "actions-concurrency", false)
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
user2APICtx := NewAPITestContext(t, repo.OwnerName, repo.Name, auth_model.AccessTokenScopeWriteRepository)
defer doAPIDeleteRepository(user2APICtx)(t)
runner := newMockRunner()
runner.registerAsRepoRunner(t, repo.OwnerName, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
wf1TreePath := ".gitea/workflows/workflow-1.yml"
wf1FileContent := `name: Workflow-1
on:
push:
paths:
- '.gitea/workflows/workflow-1.yml'
concurrency:
group: test-group
jobs:
wf1-job1:
runs-on: ubuntu-latest
steps:
- run: echo 'wf1-job1'
wf1-job2:
runs-on: customized-runner
steps:
- run: echo 'wf1-job1'
`
wf2TreePath := ".gitea/workflows/workflow-2.yml"
wf2FileContent := `name: Workflow-2
on:
push:
paths:
- '.gitea/workflows/workflow-2.yml'
concurrency:
group: test-group
jobs:
wf2-job1:
runs-on: ubuntu-latest
steps:
- run: echo 'wf2-job1'
`
// push workflow1
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
createWorkflowFile(t, user2Token, repo.OwnerName, repo.Name, wf1TreePath, opts1)
// fetch wf1-job1
w1j1Task := runner.fetchTask(t)
_, _, run1 := getTaskAndJobAndRunByTaskID(t, w1j1Task.Id)
assert.Equal(t, "test-group", run1.ConcurrencyGroup)
assert.Equal(t, actions_model.StatusRunning, run1.Status)
// query wf1-job2 from db and check its status
w1j2Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: run1.ID, JobID: "wf1-job2"})
// wf1-job2 is waiting but no runner will run it
assert.Equal(t, actions_model.StatusWaiting, w1j2Job.Status)
time.Sleep(time.Second)
now := time.Now()
time.Sleep(time.Second)
// push workflow2
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent)
createWorkflowFile(t, user2Token, repo.OwnerName, repo.Name, wf2TreePath, opts2)
// query run2 from db and check its status
run2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "workflow-2.yml"})
// run2 is blocked because it is blocked by workflow1's concurrency group "test-group"
assert.Equal(t, actions_model.StatusBlocked, run2.Status)
// mock time
fakeNow := now.Add(setting.Actions.AbandonedJobTimeout)
timeutil.MockSet(fakeNow)
defer timeutil.MockUnset()
// call CancelAbandonedJobs manually
assert.NoError(t, actions_service.CancelAbandonedJobs(t.Context()))
// check the status of wf1-job2
w1j2Job = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: w1j2Job.ID})
assert.Equal(t, actions_model.StatusCancelled, w1j2Job.Status)
// check the status of run1
run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run1.ID})
assert.Equal(t, actions_model.StatusCancelled, run1.Status)
// fetch wf2-job1 and check
w2j1Task := runner.fetchTask(t)
_, w2j1Job, run2 := getTaskAndJobAndRunByTaskID(t, w2j1Task.Id)
assert.Equal(t, "test-group", run2.ConcurrencyGroup)
assert.Equal(t, "wf2-job1", w2j1Job.JobID)
assert.Equal(t, actions_model.StatusRunning, run2.Status)
assert.Equal(t, actions_model.StatusRunning, w2j1Job.Status)
})
}
func TestRunAndJobWithSameConcurrencyGroup(t *testing.T) {
onGiteaRun(t, func(t *testing.T, u *url.URL) {
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})