mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-10-22 18:48:53 +00:00
Refactor pipeline parsing and forge refreshing (#2527)
- refactor pipeline parsing - do not parse the pipeline multiple times to perform filter checks, do this once and perform checks on the result directly - code deduplication - refactor forge token refreshing - move refreshing to a helper func to reduce code --------- Co-authored-by: Anbraten <anton@ju60.de>
This commit is contained in:
@@ -23,7 +23,6 @@ import (
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/server"
|
||||
"github.com/woodpecker-ci/woodpecker/server/forge"
|
||||
"github.com/woodpecker-ci/woodpecker/server/forge/types"
|
||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||
"github.com/woodpecker-ci/woodpecker/server/store"
|
||||
)
|
||||
@@ -37,118 +36,92 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline
|
||||
return nil, fmt.Errorf(msg)
|
||||
}
|
||||
|
||||
// if the forge has a refresh token, the current access token
|
||||
// If the forge has a refresh token, the current access token
|
||||
// may be stale. Therefore, we should refresh prior to dispatching
|
||||
// the pipeline.
|
||||
if refresher, ok := server.Config.Services.Forge.(forge.Refresher); ok {
|
||||
refreshed, err := refresher.Refresh(ctx, repoUser)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("failed to refresh oauth2 token for repoUser: %s", repoUser.Login)
|
||||
} else if refreshed {
|
||||
if err := _store.UpdateUser(repoUser); err != nil {
|
||||
log.Error().Err(err).Msgf("error while updating repoUser: %s", repoUser.Login)
|
||||
// move forward
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
forgeYamlConfigs []*types.FileMeta
|
||||
configFetchErr error
|
||||
filtered bool
|
||||
parseErr error
|
||||
)
|
||||
|
||||
// fetch the pipeline file from the forge
|
||||
configFetcher := forge.NewConfigFetcher(server.Config.Services.Forge, server.Config.Services.Timeout, server.Config.Services.ConfigService, repoUser, repo, pipeline)
|
||||
forgeYamlConfigs, configFetchErr = configFetcher.Fetch(ctx)
|
||||
if configFetchErr == nil {
|
||||
filtered, parseErr = checkIfFiltered(repo, pipeline, forgeYamlConfigs)
|
||||
if parseErr == nil {
|
||||
if filtered {
|
||||
err := ErrFiltered{Msg: "global when filter of all workflows do skip this pipeline"}
|
||||
log.Debug().Str("repo", repo.FullName).Msgf("%v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if zeroSteps(pipeline, forgeYamlConfigs) {
|
||||
err := ErrFiltered{Msg: "step conditions yield zero runnable steps"}
|
||||
log.Debug().Str("repo", repo.FullName).Msgf("%v", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
forge.Refresh(ctx, server.Config.Services.Forge, _store, repoUser)
|
||||
|
||||
// update some pipeline fields
|
||||
pipeline.RepoID = repo.ID
|
||||
pipeline.Status = model.StatusPending
|
||||
|
||||
// fetch the pipeline file from the forge
|
||||
configFetcher := forge.NewConfigFetcher(server.Config.Services.Forge, server.Config.Services.Timeout, server.Config.Services.ConfigService, repoUser, repo, pipeline)
|
||||
forgeYamlConfigs, configFetchErr := configFetcher.Fetch(ctx)
|
||||
|
||||
if configFetchErr != nil {
|
||||
log.Debug().Str("repo", repo.FullName).Err(configFetchErr).Msgf("cannot find config '%s' in '%s' with user: '%s'", repo.Config, pipeline.Ref, repoUser.Login)
|
||||
pipeline.Started = time.Now().Unix()
|
||||
pipeline.Finished = pipeline.Started
|
||||
pipeline.Status = model.StatusError
|
||||
pipeline.Error = fmt.Sprintf("pipeline definition not found in %s", repo.FullName)
|
||||
} else if parseErr != nil {
|
||||
log.Debug().Str("repo", repo.FullName).Err(parseErr).Msg("failed to parse yaml")
|
||||
pipeline.Started = time.Now().Unix()
|
||||
pipeline.Finished = pipeline.Started
|
||||
pipeline.Status = model.StatusError
|
||||
pipeline.Error = fmt.Sprintf("failed to parse pipeline: %s", parseErr.Error())
|
||||
} else {
|
||||
setGatedState(repo, pipeline)
|
||||
return nil, persistPipelineWithErr(ctx, _store, pipeline, repo, repoUser, fmt.Sprintf("pipeline definition not found in %s", repo.FullName))
|
||||
}
|
||||
|
||||
pipelineItems, parseErr := parsePipeline(_store, pipeline, repoUser, repo, forgeYamlConfigs, nil)
|
||||
if parseErr != nil {
|
||||
log.Debug().Str("repo", repo.FullName).Err(parseErr).Msg("failed to parse yaml")
|
||||
return nil, persistPipelineWithErr(ctx, _store, pipeline, repo, repoUser, fmt.Sprintf("failed to parse pipeline: %s", parseErr.Error()))
|
||||
}
|
||||
|
||||
if len(pipelineItems) == 0 {
|
||||
log.Debug().Str("repo", repo.FullName).Msg(ErrFiltered.Error())
|
||||
return nil, ErrFiltered
|
||||
}
|
||||
|
||||
setGatedState(repo, pipeline)
|
||||
|
||||
err = _store.CreatePipeline(pipeline)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failure to save pipeline for %s", repo.FullName)
|
||||
log.Error().Err(err).Msg(msg)
|
||||
return nil, fmt.Errorf(msg)
|
||||
msg := fmt.Errorf("failed to save pipeline for %s", repo.FullName)
|
||||
log.Error().Err(err).Msg(msg.Error())
|
||||
return nil, msg
|
||||
}
|
||||
|
||||
pipeline = setPipelineStepsOnPipeline(pipeline, pipelineItems)
|
||||
|
||||
// persist the pipeline config for historical correctness, restarts, etc
|
||||
var configs []*model.Config
|
||||
for _, forgeYamlConfig := range forgeYamlConfigs {
|
||||
_, err := findOrPersistPipelineConfig(_store, pipeline, forgeYamlConfig)
|
||||
config, err := findOrPersistPipelineConfig(_store, pipeline, forgeYamlConfig)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failure to find or persist pipeline config for %s", repo.FullName)
|
||||
msg := fmt.Sprintf("failed to find or persist pipeline config for %s", repo.FullName)
|
||||
log.Error().Err(err).Msg(msg)
|
||||
return nil, fmt.Errorf(msg)
|
||||
}
|
||||
configs = append(configs, config)
|
||||
}
|
||||
|
||||
if pipeline.Status == model.StatusError {
|
||||
if err := publishToTopic(ctx, pipeline, repo); err != nil {
|
||||
log.Error().Err(err).Msg("publishToTopic")
|
||||
}
|
||||
|
||||
updatePipelineStatus(ctx, pipeline, repo, repoUser)
|
||||
|
||||
return pipeline, nil
|
||||
}
|
||||
|
||||
pipeline, pipelineItems, err := createPipelineItems(ctx, _store, pipeline, repoUser, repo, forgeYamlConfigs, nil)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failure to createPipelineItems for %s", repo.FullName)
|
||||
// link pipeline to persisted configs
|
||||
if err := linkPipelineConfigs(_store, configs, pipeline.ID); err != nil {
|
||||
msg := fmt.Sprintf("failed to find or persist pipeline config for %s", repo.FullName)
|
||||
log.Error().Err(err).Msg(msg)
|
||||
return nil, fmt.Errorf(msg)
|
||||
}
|
||||
|
||||
if pipeline.Status == model.StatusBlocked {
|
||||
if err := publishToTopic(ctx, pipeline, repo); err != nil {
|
||||
log.Error().Err(err).Msg("publishToTopic")
|
||||
}
|
||||
|
||||
updatePipelineStatus(ctx, pipeline, repo, repoUser)
|
||||
|
||||
publishPipeline(ctx, pipeline, repo, repoUser)
|
||||
return pipeline, nil
|
||||
}
|
||||
|
||||
pipeline, err = start(ctx, _store, pipeline, repoUser, repo, pipelineItems)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failure to start pipeline for %s", repo.FullName)
|
||||
msg := fmt.Sprintf("failed to start pipeline for %s", repo.FullName)
|
||||
log.Error().Err(err).Msg(msg)
|
||||
return nil, fmt.Errorf(msg)
|
||||
}
|
||||
|
||||
return pipeline, nil
|
||||
}
|
||||
|
||||
func persistPipelineWithErr(ctx context.Context, _store store.Store, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User, err string) error {
|
||||
pipeline.Started = time.Now().Unix()
|
||||
pipeline.Finished = pipeline.Started
|
||||
pipeline.Status = model.StatusError
|
||||
pipeline.Error = err
|
||||
dbErr := _store.CreatePipeline(pipeline)
|
||||
if dbErr != nil {
|
||||
msg := fmt.Errorf("failed to save pipeline for %s", repo.FullName)
|
||||
log.Error().Err(dbErr).Msg(msg.Error())
|
||||
return msg
|
||||
}
|
||||
|
||||
publishPipeline(ctx, pipeline, repo, repoUser)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user