Add queue details UI for admins (#1632)

# Changes
- Adds an admin view to see the whole work-queue of the server. 
- The admin can also pause / resume the queue. 
- The view is reloading data every 5 seconds automatically.
- The task model from queue got removed in favor of the one from models.
This commit is contained in:
Anbraten
2023-03-20 04:50:56 +01:00
committed by GitHub
parent 4d5c59556e
commit 2337f1854a
19 changed files with 432 additions and 265 deletions

View File

@@ -26,14 +26,8 @@ import (
"github.com/rs/zerolog/log"
)
const (
StatusSkipped = "skipped"
StatusSuccess = "success"
StatusFailure = "failure"
)
type entry struct {
item *Task
item *model.Task
done chan bool
error error
deadline time.Time
@@ -41,7 +35,7 @@ type entry struct {
type worker struct {
filter FilterFn
channel chan *Task
channel chan *model.Task
}
type fifo struct {
@@ -68,7 +62,7 @@ func New(_ context.Context) Queue {
}
// Push pushes an item to the tail of this queue.
func (q *fifo) Push(_ context.Context, task *Task) error {
func (q *fifo) Push(_ context.Context, task *model.Task) error {
q.Lock()
q.pending.PushBack(task)
q.Unlock()
@@ -77,7 +71,7 @@ func (q *fifo) Push(_ context.Context, task *Task) error {
}
// Push pushes an item to the tail of this queue.
func (q *fifo) PushAtOnce(_ context.Context, tasks []*Task) error {
func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
q.Lock()
for _, task := range tasks {
q.pending.PushBack(task)
@@ -88,10 +82,10 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*Task) error {
}
// Poll retrieves and removes the head of this queue.
func (q *fifo) Poll(c context.Context, f FilterFn) (*Task, error) {
func (q *fifo) Poll(c context.Context, f FilterFn) (*model.Task, error) {
q.Lock()
w := &worker{
channel: make(chan *Task, 1),
channel: make(chan *model.Task, 1),
filter: f,
}
q.workers[w] = struct{}{}
@@ -113,20 +107,20 @@ func (q *fifo) Poll(c context.Context, f FilterFn) (*Task, error) {
// Done signals that the item is done executing.
func (q *fifo) Done(_ context.Context, id string, exitStatus model.StatusValue) error {
return q.finished([]string{id}, string(exitStatus), nil)
return q.finished([]string{id}, exitStatus, nil)
}
// Error signals that the item is done executing with error.
func (q *fifo) Error(_ context.Context, id string, err error) error {
return q.finished([]string{id}, StatusFailure, err)
return q.finished([]string{id}, model.StatusFailure, err)
}
// Error signals that the item is done executing with error.
func (q *fifo) ErrorAtOnce(_ context.Context, id []string, err error) error {
return q.finished(id, StatusFailure, err)
return q.finished(id, model.StatusFailure, err)
}
func (q *fifo) finished(ids []string, exitStatus string, err error) error {
func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error {
q.Lock()
for _, id := range ids {
@@ -159,7 +153,7 @@ func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task, ok := e.Value.(*Task)
task, ok := e.Value.(*model.Task)
if ok && task.ID == id {
q.pending.Remove(e)
return nil
@@ -205,12 +199,13 @@ func (q *fifo) Info(_ context.Context) InfoT {
stats.Stats.Pending = q.pending.Len()
stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len()
stats.Stats.Running = len(q.running)
stats.Stats.Complete = 0 // TODO: implement this
for e := q.pending.Front(); e != nil; e = e.Next() {
stats.Pending = append(stats.Pending, e.Value.(*Task))
stats.Pending = append(stats.Pending, e.Value.(*model.Task))
}
for e := q.waitingOnDeps.Front(); e != nil; e = e.Next() {
stats.WaitingOnDeps = append(stats.WaitingOnDeps, e.Value.(*Task))
stats.WaitingOnDeps = append(stats.WaitingOnDeps, e.Value.(*model.Task))
}
for _, entry := range q.running {
stats.Running = append(stats.Running, entry.item)
@@ -258,7 +253,7 @@ func (q *fifo) process() {
q.resubmitExpiredPipelines()
q.filterWaiting()
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task := pending.Value.(*Task)
task := pending.Value.(*model.Task)
delete(q.workers, worker)
q.pending.Remove(pending)
q.running[task.ID] = &entry{
@@ -275,7 +270,7 @@ func (q *fifo) filterWaiting() {
var nextWaiting *list.Element
for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting {
nextWaiting = e.Next()
task := e.Value.(*Task)
task := e.Value.(*model.Task)
q.pending.PushBack(task)
}
@@ -285,7 +280,7 @@ func (q *fifo) filterWaiting() {
var nextPending *list.Element
for e := q.pending.Front(); e != nil; e = nextPending {
nextPending = e.Next()
task := e.Value.(*Task)
task := e.Value.(*model.Task)
if q.depsInQueue(task) {
log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID)
q.waitingOnDeps.PushBack(task)
@@ -303,7 +298,7 @@ func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
task := e.Value.(*model.Task)
log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)
for w := range q.workers {
@@ -327,11 +322,11 @@ func (q *fifo) resubmitExpiredPipelines() {
}
}
func (q *fifo) depsInQueue(task *Task) bool {
func (q *fifo) depsInQueue(task *model.Task) bool {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
possibleDep, ok := e.Value.(*Task)
possibleDep, ok := e.Value.(*model.Task)
log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID)
for _, dep := range task.Dependencies {
if ok && possibleDep.ID == dep {
@@ -350,11 +345,11 @@ func (q *fifo) depsInQueue(task *Task) bool {
return false
}
func (q *fifo) updateDepStatusInQueue(taskID, status string) {
func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
pending, ok := e.Value.(*Task)
pending, ok := e.Value.(*model.Task)
for _, dep := range pending.Dependencies {
if ok && taskID == dep {
pending.DepStatus[dep] = status
@@ -372,7 +367,7 @@ func (q *fifo) updateDepStatusInQueue(taskID, status string) {
for e := q.waitingOnDeps.Front(); e != nil; e = next {
next = e.Next()
waiting, ok := e.Value.(*Task)
waiting, ok := e.Value.(*model.Task)
for _, dep := range waiting.Dependencies {
if ok && taskID == dep {
waiting.DepStatus[dep] = status
@@ -386,7 +381,7 @@ func (q *fifo) removeFromPending(taskID string) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
task := e.Value.(*model.Task)
if task.ID == taskID {
log.Debug().Msgf("queue: %s is removed from pending", taskID)
q.pending.Remove(e)