From b0121269173d056b52aa0a0bcbc77b6f28c1e1d4 Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Wed, 9 Jul 2025 12:27:34 +0200 Subject: [PATCH] fix: rerun all jobs did not respect concurrency --- routers/web/repo/actions/view.go | 42 +++-- tests/integration/actions_concurrency_test.go | 151 ++++++++++++++++++ 2 files changed, 182 insertions(+), 11 deletions(-) diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 4f59485d19f..4665d027ee2 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -420,26 +420,45 @@ func Rerun(ctx *context_module.Context) { return } - // reset run's start and stop time when it is done - if run.Status.IsDone() { - run.PreviousDuration = run.Duration() - run.Started = 0 - run.Stopped = 0 - if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration"); err != nil { - ctx.ServerError("UpdateRun", err) - return - } - } + // TODO evaluate concurrency expression again, vars may change after the run is done + // check run (workflow-level) concurrency job, jobs := getRunJobs(ctx, runIndex, jobIndex) if ctx.Written() { return } + var blockRunByConcurrency bool + + // reset run's start and stop time when it is done + if run.Status.IsDone() { + run.PreviousDuration = run.Duration() + run.Started = 0 + run.Stopped = 0 + + blockRunByConcurrency, err = actions_model.ShouldBlockRunByConcurrency(ctx, run) + if err != nil { + ctx.ServerError("ShouldBlockRunByConcurrency", err) + return + } + if blockRunByConcurrency { + run.Status = actions_model.StatusBlocked + } else if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil { + ctx.ServerError("cancel jobs", err) + return + } else { + run.Status = actions_model.StatusRunning + } + if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status"); err != nil { + ctx.ServerError("UpdateRun", err) + return + } + } + if jobIndexStr == "" { // rerun all jobs for _, j := range jobs { // if the job has needs, it should be set to "blocked" status to wait for other jobs - shouldBlock := len(j.Needs) > 0 + shouldBlock := len(j.Needs) > 0 || blockRunByConcurrency if err := rerunJob(ctx, j, shouldBlock); err != nil { ctx.ServerError("RerunJob", err) return @@ -453,6 +472,7 @@ func Rerun(ctx *context_module.Context) { for _, j := range rerunJobs { // jobs other than the specified one should be set to "blocked" status + // TODO respect blockRunByConcurrency here? shouldBlock := j.JobID != job.JobID if err := rerunJob(ctx, j, shouldBlock); err != nil { ctx.ServerError("RerunJob", err) diff --git a/tests/integration/actions_concurrency_test.go b/tests/integration/actions_concurrency_test.go index 30c36b4024f..8ec4930f8c8 100644 --- a/tests/integration/actions_concurrency_test.go +++ b/tests/integration/actions_concurrency_test.go @@ -574,6 +574,157 @@ jobs: }) } +func TestWorkflowDispatchRerunAllJobsConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + wf1TreePath := ".gitea/workflows/workflow-dispatch-concurrency.yml" + wf1FileContent := `name: workflow-dispatch-concurrency +on: + workflow_dispatch: + inputs: + appVersion: + description: 'APP version' + required: true + default: 'v1.23' + type: choice + options: + - v1.21 + - v1.22 + - v1.23 + cancel: + description: 'Cancel running workflows' + required: false + type: boolean + default: false +concurrency: + group: workflow-dispatch-${{ inputs.appVersion }} + cancel-in-progress: ${{ inputs.cancel }} +jobs: + job: + runs-on: ubuntu-latest + steps: + - run: echo 'workflow dispatch job' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + + // run the workflow with appVersion=v1.21 and cancel=false + urlStr := fmt.Sprintf("/%s/%s/actions/run?workflow=%s", user2.Name, repo.Name, "workflow-dispatch-concurrency.yml") + req := NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.21", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task1 := runner.fetchTask(t) + _, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id) + assert.Equal(t, "workflow-dispatch-v1.21", run1.ConcurrencyGroup) + + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task2 := runner.fetchTask(t) + _, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run2.ConcurrencyGroup) + + // run the workflow with appVersion=v1.22 and cancel=false again + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + + runner.fetchNoTask(t) // cannot fetch task because task2 is not completed + + // run the workflow with appVersion=v1.22 and cancel=true + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + "cancel": "on", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task4 := runner.fetchTask(t) + _, _, run4 := getTaskAndJobAndRunByTaskID(t, task4.Id) + assert.Equal(t, actions_model.StatusRunning, run4.Status) + assert.Equal(t, "workflow-dispatch-v1.22", run4.ConcurrencyGroup) + _, _, run2 = getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, actions_model.StatusCancelled, run2.Status) + + runner.execTask(t, task4, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // rerun cancel true scenario + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run4.Index), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + task5 := runner.fetchTask(t) + _, _, run4_1 := getTaskAndJobAndRunByTaskID(t, task5.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run4_1.ConcurrencyGroup) + assert.Equal(t, run4.ID, run4_1.ID) + _, _, run2_1 := getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, actions_model.StatusCancelled, run2_1.Status) + + runner.execTask(t, task5, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_CANCELLED, + }) + + // rerun cancel false scenario + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + task6 := runner.fetchTask(t) + _, _, run2_2 := getTaskAndJobAndRunByTaskID(t, task6.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run2_2.ConcurrencyGroup) + + runner.fetchNoTask(t) // cannot fetch task because task2 is not completed + + runner.execTask(t, task6, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + task7 := runner.fetchTask(t) + _, _, run3 := getTaskAndJobAndRunByTaskID(t, task7.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup) + runner.execTask(t, task7, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + }) +} + func TestScheduleConcurrency(t *testing.T) { onGiteaRun(t, func(t *testing.T, u *url.URL) { user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})