Iterating on cancellation logic

This commit is contained in:
Laszlo Fogas 2019-09-16 15:18:15 +02:00
parent 6bae1b6c83
commit d8fe50dfa7
6 changed files with 95 additions and 77 deletions

View File

@ -99,41 +99,57 @@ func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) {
// Done signals that the item is done executing. // Done signals that the item is done executing.
func (q *fifo) Done(c context.Context, id string, exitStatus string) error { func (q *fifo) Done(c context.Context, id string, exitStatus string) error {
return q.finished(id, exitStatus, nil) return q.finished([]string{id}, exitStatus, nil)
} }
// Error signals that the item is done executing with error. // Error signals that the item is done executing with error.
func (q *fifo) Error(c context.Context, id string, err error) error { func (q *fifo) Error(c context.Context, id string, err error) error {
return q.finished([]string{id}, StatusFailure, err)
}
// ErrorAtOnce signals that the given items are done executing with error.
func (q *fifo) ErrorAtOnce(c context.Context, id []string, err error) error {
return q.finished(id, StatusFailure, err) return q.finished(id, StatusFailure, err)
} }
func (q *fifo) finished(id string, exitStatus string, err error) error { func (q *fifo) finished(ids []string, exitStatus string, err error) error {
q.Lock() q.Lock()
taskEntry, ok := q.running[id]
if ok { for _, id := range ids {
taskEntry.error = err taskEntry, ok := q.running[id]
close(taskEntry.done) if ok {
delete(q.running, id) taskEntry.error = err
} else { close(taskEntry.done)
q.removeFromPending(id) delete(q.running, id)
} else {
q.removeFromPending(id)
}
q.updateDepStatusInQueue(id, exitStatus)
} }
q.updateDepStatusInQueue(id, exitStatus)
q.Unlock() q.Unlock()
return nil return nil
} }
// Evict removes a pending task from the queue. // Evict removes a pending task from the queue.
func (q *fifo) Evict(c context.Context, id string) error { func (q *fifo) Evict(c context.Context, id string) error {
return q.EvictAtOnce(c, []string{id})
}
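// EvictAtOnce is the batch form of Evict: it takes a list of task ids and removes a matching pending task from the queue.
// NOTE(review): it returns as soon as one task has been removed, so any remaining ids are not evicted — confirm this is intended.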
func (q *fifo) EvictAtOnce(c context.Context, ids []string) error {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
var next *list.Element for _, id := range ids {
for e := q.pending.Front(); e != nil; e = next { var next *list.Element
next = e.Next() for e := q.pending.Front(); e != nil; e = next {
task, ok := e.Value.(*Task) next = e.Next()
if ok && task.ID == id { task, ok := e.Value.(*Task)
q.pending.Remove(e) if ok && task.ID == id {
return nil q.pending.Remove(e)
return nil
}
} }
} }
return ErrNotFound return ErrNotFound

View File

@ -123,9 +123,15 @@ type Queue interface {
// Error signals the task is complete with errors. // Error signals the task is complete with errors.
Error(c context.Context, id string, err error) error Error(c context.Context, id string, err error) error
// ErrorAtOnce signals the given tasks are complete with errors.
ErrorAtOnce(c context.Context, id []string, err error) error
// Evict removes a pending task from the queue. // Evict removes a pending task from the queue.
Evict(c context.Context, id string) error Evict(c context.Context, id string) error
// EvictAtOnce removes the pending tasks with the given ids from the queue.
EvictAtOnce(c context.Context, id []string) error
// Wait waits until the task is complete. // Wait waits until the task is complete.
Wait(c context.Context, id string) error Wait(c context.Context, id string) error

View File

@ -119,3 +119,14 @@ func (q *persistentQueue) Evict(c context.Context, id string) error {
} }
return err return err
} }
// EvictAtOnce evicts the given ids via the wrapped queue and, when that succeeds, deletes each id from the store.
func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error {
err := q.Queue.EvictAtOnce(c, ids)
if err == nil {
for _, id := range ids {
q.store.TaskDelete(id)
}
}
return err
}

View File

@ -113,7 +113,7 @@ func Load(mux *httptreemux.ContextMux, middleware ...gin.HandlerFunc) http.Handl
repo.POST("/move", session.MustRepoAdmin(), server.MoveRepo) repo.POST("/move", session.MustRepoAdmin(), server.MoveRepo)
repo.POST("/builds/:number", session.MustPush, server.PostBuild) repo.POST("/builds/:number", session.MustPush, server.PostBuild)
repo.DELETE("/builds/:number", session.MustRepoAdmin(), server.ZombieKill) repo.DELETE("/builds/:number", session.MustPush, server.DeleteBuild)
repo.POST("/builds/:number/approve", session.MustPush, server.PostApproval) repo.POST("/builds/:number/approve", session.MustPush, server.PostApproval)
repo.POST("/builds/:number/decline", session.MustPush, server.PostDecline) repo.POST("/builds/:number/decline", session.MustPush, server.PostDecline)
repo.DELETE("/builds/:number/:job", session.MustPush, server.DeleteBuild) repo.DELETE("/builds/:number/:job", session.MustPush, server.DeleteBuild)

View File

@ -160,8 +160,6 @@ func GetProcLogs(c *gin.Context) {
// DeleteBuild cancels a build // DeleteBuild cancels a build
func DeleteBuild(c *gin.Context) { func DeleteBuild(c *gin.Context) {
repo := session.Repo(c) repo := session.Repo(c)
// parse the build number from the request parameter.
num, _ := strconv.Atoi(c.Params.ByName("number")) num, _ := strconv.Atoi(c.Params.ByName("number"))
build, err := store.GetBuildNumber(c, repo, num) build, err := store.GetBuildNumber(c, repo, num)
@ -176,76 +174,63 @@ func DeleteBuild(c *gin.Context) {
return return
} }
cancelled := false if build.Status != model.StatusRunning && build.Status != model.StatusPending {
c.String(400, "Cannot cancel a non-running or non-pending build")
return
}
// First cancel/evict procs in the queue in one go
procToCancel := []string{}
procToEvict := []string{}
for _, proc := range procs { for _, proc := range procs {
if proc.PPID != 0 { if proc.PPID != 0 {
continue continue
} }
if proc.State == model.StatusRunning {
if proc.State != model.StatusRunning && proc.State != model.StatusPending { procToCancel = append(procToCancel, fmt.Sprint(proc.ID))
continue
} }
if proc.State == model.StatusPending {
// TODO cancel child procs procToEvict = append(procToEvict, fmt.Sprint(proc.ID))
if _, err = UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil {
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
} }
Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel)
cancelled = true
}
if !cancelled {
c.String(400, "Cannot cancel a non-running build")
return
}
c.String(204, "")
}
// ZombieKill kills zombie processes stuck in an infinite pending
// or running state. This can only be invoked by administrators and
// may have negative effects.
func ZombieKill(c *gin.Context) {
repo := session.Repo(c)
// parse the build number and job sequence number from
// the request parameter.
num, _ := strconv.Atoi(c.Params.ByName("number"))
build, err := store.GetBuildNumber(c, repo, num)
if err != nil {
c.AbortWithError(404, err)
return
}
procs, err := store.FromContext(c).ProcList(build)
if err != nil {
c.AbortWithError(404, err)
return
}
if build.Status != model.StatusRunning {
c.String(400, "Cannot force cancel a non-running build")
return
} }
Config.Services.Queue.EvictAtOnce(context.Background(), procToEvict)
Config.Services.Queue.ErrorAtOnce(context.Background(), procToEvict, queue.ErrCancel)
Config.Services.Queue.ErrorAtOnce(context.Background(), procToCancel, queue.ErrCancel)
// Then update the DB status for pending builds
// Running ones will be set when the agents stop on the cancel signal
for _, proc := range procs { for _, proc := range procs {
if proc.Running() { if proc.State == model.StatusPending {
if _, err := UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil { if proc.PPID != 0 {
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err) if _, err = UpdateProcToStatusSkipped(store.FromContext(c), *proc, 0); err != nil {
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
}
} else {
if _, err = UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil {
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
}
} }
} else {
store.FromContext(c).ProcUpdate(proc)
} }
Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel)
} }
if _, err := UpdateToStatusKilled(store.FromContext(c), *build); err != nil { killedBuild, err := UpdateToStatusKilled(store.FromContext(c), *build)
if err != nil {
c.AbortWithError(500, err) c.AbortWithError(500, err)
return return
} }
// For pending builds, we stream the UI the latest state.
// For running builds, the UI will be updated when the agents acknowledge the cancel
if build.Status == model.StatusPending {
procs, err = store.FromContext(c).ProcList(killedBuild)
if err != nil {
c.AbortWithError(404, err)
return
}
killedBuild.Procs = model.Tree(procs)
publishToTopic(c, killedBuild, repo, model.Cancelled)
}
c.String(204, "") c.String(204, "")
} }
@ -353,7 +338,7 @@ func PostApproval(c *gin.Context) {
} }
}() }()
publishToTopic(c, build, repo) publishToTopic(c, build, repo, model.Enqueued)
queueBuild(build, repo, buildItems) queueBuild(build, repo, buildItems)
} }
@ -557,7 +542,7 @@ func PostBuild(c *gin.Context) {
} }
c.JSON(202, build) c.JSON(202, build)
publishToTopic(c, build, repo) publishToTopic(c, build, repo, model.Enqueued)
queueBuild(build, repo, buildItems) queueBuild(build, repo, buildItems)
} }

View File

@ -285,7 +285,7 @@ func PostHook(c *gin.Context) {
} }
}() }()
publishToTopic(c, build, repo) publishToTopic(c, build, repo, model.Enqueued)
queueBuild(build, repo, buildItems) queueBuild(build, repo, buildItems)
} }
@ -360,7 +360,7 @@ func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig *remote.Fi
} }
// publishes message to UI clients // publishes message to UI clients
func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo) { func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo, event model.EventType) {
message := pubsub.Message{ message := pubsub.Message{
Labels: map[string]string{ Labels: map[string]string{
"repo": repo.FullName, "repo": repo.FullName,