diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go index 1b871109008..f4acab10356 100644 --- a/services/actions/clear_tasks.go +++ b/services/actions/clear_tasks.go @@ -120,7 +120,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { func CancelAbandonedJobs(ctx context.Context) error { jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ Statuses: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusBlocked}, - UpdatedBefore: timeutil.TimeStamp(time.Now().Add(-setting.Actions.AbandonedJobTimeout).Unix()), + UpdatedBefore: timeutil.TimeStampNow().AddDuration(-setting.Actions.AbandonedJobTimeout), }) if err != nil { log.Warn("find abandoned tasks: %v", err) diff --git a/tests/integration/actions_concurrency_test.go b/tests/integration/actions_concurrency_test.go index c3a22cf8c15..1115b054226 100644 --- a/tests/integration/actions_concurrency_test.go +++ b/tests/integration/actions_concurrency_test.go @@ -16,6 +16,7 @@ import ( repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/models/unittest" user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/setting" api "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" @@ -1247,6 +1248,105 @@ jobs: }) } +func TestAbandonConcurrentRun(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + user2Session := loginUser(t, user2.Name) + user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, user2Token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + user2APICtx := NewAPITestContext(t, repo.OwnerName, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(user2APICtx)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, repo.OwnerName, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + wf1TreePath := ".gitea/workflows/workflow-1.yml" + wf1FileContent := `name: Workflow-1 +on: + push: + paths: + - '.gitea/workflows/workflow-1.yml' +concurrency: + group: test-group +jobs: + wf1-job1: + runs-on: ubuntu-latest + steps: + - run: echo 'wf1-job1' + wf1-job2: + runs-on: customized-runner + steps: + - run: echo 'wf1-job1' +` + + wf2TreePath := ".gitea/workflows/workflow-2.yml" + wf2FileContent := `name: Workflow-2 +on: + push: + paths: + - '.gitea/workflows/workflow-2.yml' +concurrency: + group: test-group +jobs: + wf2-job1: + runs-on: ubuntu-latest + steps: + - run: echo 'wf2-job1' +` + // push workflow1 + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) + createWorkflowFile(t, user2Token, repo.OwnerName, repo.Name, wf1TreePath, opts1) + + // fetch wf1-job1 + w1j1Task := runner.fetchTask(t) + _, _, run1 := getTaskAndJobAndRunByTaskID(t, w1j1Task.Id) + assert.Equal(t, "test-group", run1.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, run1.Status) + // query wf1-job2 from db and check its status + w1j2Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: run1.ID, JobID: "wf1-job2"}) + // wf1-job2 is waiting but no runner will run it + assert.Equal(t, actions_model.StatusWaiting, w1j2Job.Status) + + time.Sleep(time.Second) + now := time.Now() + time.Sleep(time.Second) + + // push workflow2 + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent) + createWorkflowFile(t, user2Token, repo.OwnerName, repo.Name, wf2TreePath, opts2) + + // query run2 from db and check its status + run2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "workflow-2.yml"}) + // run2 is blocked because it is blocked by workflow1's concurrency group "test-group" + assert.Equal(t, actions_model.StatusBlocked, run2.Status) + + // mock time + fakeNow := now.Add(setting.Actions.AbandonedJobTimeout) + timeutil.MockSet(fakeNow) + defer timeutil.MockUnset() + + // call CancelAbandonedJobs manually + assert.NoError(t, actions_service.CancelAbandonedJobs(t.Context())) + + // check the status of wf1-job2 + w1j2Job = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: w1j2Job.ID}) + assert.Equal(t, actions_model.StatusCancelled, w1j2Job.Status) + // check the status of run1 + run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run1.ID}) + assert.Equal(t, actions_model.StatusCancelled, run1.Status) + + // fetch wf2-job1 and check + w2j1Task := runner.fetchTask(t) + _, w2j1Job, run2 := getTaskAndJobAndRunByTaskID(t, w2j1Task.Id) + assert.Equal(t, "test-group", run2.ConcurrencyGroup) + assert.Equal(t, "wf2-job1", w2j1Job.JobID) + assert.Equal(t, actions_model.StatusRunning, run2.Status) + assert.Equal(t, actions_model.StatusRunning, w2j1Job.Status) + }) +} + func TestRunAndJobWithSameConcurrencyGroup(t *testing.T) { onGiteaRun(t, func(t *testing.T, u *url.URL) { user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})