Move cncd/{logging,pubsub,queue}/ to server/{logging,pubsub,queue}/ (#346)

* Move cncd/{logging,pubsub,queue}/ to server/{logging,pubsub,queue}/

* Update REAMDEs and include history

Co-authored-by: Anbraten <anton@ju60.de>

Co-authored-by: Anbraten <anton@ju60.de>
This commit is contained in:
Jacob Floyd
2021-09-23 15:29:09 -05:00
committed by GitHub
parent 780c902a6b
commit a0d008e071
30 changed files with 51 additions and 33 deletions

29
server/queue/LICENSE Normal file
View File

@@ -0,0 +1,29 @@
BSD 3-Clause License
Copyright (c) 2017, Brad Rydzewski
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

12
server/queue/README.md Normal file
View File

@@ -0,0 +1,12 @@
# queue package
Go package provides a common interface for working with task queues.
## History
This was originally published in: https://github.com/cncd/queue
Then it was included in: https://github.com/drone-ci/drone/cncd/queue
## Documentation:
https://godoc.org/github.com/woodpecker-ci/woodpecker/server/queue

383
server/queue/fifo.go Normal file
View File

@@ -0,0 +1,383 @@
package queue
import (
"container/list"
"context"
"log"
"runtime"
"sync"
"time"
"github.com/sirupsen/logrus"
)
const (
StatusSkipped = "skipped"
StatusSuccess = "success"
StatusFailure = "failure"
)
type entry struct {
item *Task
done chan bool
retry int
error error
deadline time.Time
}
type worker struct {
filter Filter
channel chan *Task
}
type fifo struct {
sync.Mutex
workers map[*worker]struct{}
running map[string]*entry
pending *list.List
waitingOnDeps *list.List
extension time.Duration
paused bool
}
// New returns a new fifo queue.
func New() Queue {
return &fifo{
workers: map[*worker]struct{}{},
running: map[string]*entry{},
pending: list.New(),
waitingOnDeps: list.New(),
extension: time.Minute * 10,
paused: false,
}
}
// Push pushes an item to the tail of this queue.
func (q *fifo) Push(c context.Context, task *Task) error {
q.Lock()
q.pending.PushBack(task)
q.Unlock()
go q.process()
return nil
}
// Push pushes an item to the tail of this queue.
func (q *fifo) PushAtOnce(c context.Context, tasks []*Task) error {
q.Lock()
for _, task := range tasks {
q.pending.PushBack(task)
}
q.Unlock()
go q.process()
return nil
}
// Poll retrieves and removes the head of this queue.
func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) {
q.Lock()
w := &worker{
channel: make(chan *Task, 1),
filter: f,
}
q.workers[w] = struct{}{}
q.Unlock()
go q.process()
for {
select {
case <-c.Done():
q.Lock()
delete(q.workers, w)
q.Unlock()
return nil, nil
case t := <-w.channel:
return t, nil
}
}
}
// Done signals that the item is done executing.
func (q *fifo) Done(c context.Context, id string, exitStatus string) error {
return q.finished([]string{id}, exitStatus, nil)
}
// Error signals that the item is done executing with error.
func (q *fifo) Error(c context.Context, id string, err error) error {
return q.finished([]string{id}, StatusFailure, err)
}
// Error signals that the item is done executing with error.
func (q *fifo) ErrorAtOnce(c context.Context, id []string, err error) error {
return q.finished(id, StatusFailure, err)
}
func (q *fifo) finished(ids []string, exitStatus string, err error) error {
q.Lock()
for _, id := range ids {
taskEntry, ok := q.running[id]
if ok {
taskEntry.error = err
close(taskEntry.done)
delete(q.running, id)
} else {
q.removeFromPending(id)
}
q.updateDepStatusInQueue(id, exitStatus)
}
q.Unlock()
return nil
}
// Evict removes a pending task from the queue.
func (q *fifo) Evict(c context.Context, id string) error {
return q.EvictAtOnce(c, []string{id})
}
// Evict removes a pending task from the queue.
func (q *fifo) EvictAtOnce(c context.Context, ids []string) error {
q.Lock()
defer q.Unlock()
for _, id := range ids {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task, ok := e.Value.(*Task)
if ok && task.ID == id {
q.pending.Remove(e)
return nil
}
}
}
return ErrNotFound
}
// Wait waits until the item is done executing.
func (q *fifo) Wait(c context.Context, id string) error {
q.Lock()
state := q.running[id]
q.Unlock()
if state != nil {
select {
case <-c.Done():
case <-state.done:
return state.error
}
}
return nil
}
// Extend extends the task execution deadline.
func (q *fifo) Extend(c context.Context, id string) error {
q.Lock()
defer q.Unlock()
state, ok := q.running[id]
if ok {
state.deadline = time.Now().Add(q.extension)
return nil
}
return ErrNotFound
}
// Info returns internal queue information.
func (q *fifo) Info(c context.Context) InfoT {
q.Lock()
stats := InfoT{}
stats.Stats.Workers = len(q.workers)
stats.Stats.Pending = q.pending.Len()
stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len()
stats.Stats.Running = len(q.running)
for e := q.pending.Front(); e != nil; e = e.Next() {
stats.Pending = append(stats.Pending, e.Value.(*Task))
}
for e := q.waitingOnDeps.Front(); e != nil; e = e.Next() {
stats.WaitingOnDeps = append(stats.WaitingOnDeps, e.Value.(*Task))
}
for _, entry := range q.running {
stats.Running = append(stats.Running, entry.item)
}
stats.Paused = q.paused
q.Unlock()
return stats
}
func (q *fifo) Pause() {
q.Lock()
q.paused = true
q.Unlock()
}
func (q *fifo) Resume() {
q.Lock()
q.paused = false
q.Unlock()
go q.process()
}
// helper function that loops through the queue and attempts to
// match the item to a single subscriber.
func (q *fifo) process() {
q.Lock()
defer q.Unlock()
if q.paused {
return
}
defer func() {
// the risk of panic is low. This code can probably be removed
// once the code has been used in real world installs without issue.
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Printf("queue: unexpected panic: %v\n%s", err, buf)
}
}()
q.resubmitExpiredBuilds()
q.filterWaiting()
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task := pending.Value.(*Task)
delete(q.workers, worker)
q.pending.Remove(pending)
q.running[task.ID] = &entry{
item: task,
done: make(chan bool),
deadline: time.Now().Add(q.extension),
}
worker.channel <- task
}
}
func (q *fifo) filterWaiting() {
// resubmits all waiting tasks to pending, deps may have cleared
var nextWaiting *list.Element
for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting {
nextWaiting = e.Next()
task := e.Value.(*Task)
q.pending.PushBack(task)
}
// rebuild waitingDeps
q.waitingOnDeps = list.New()
filtered := []*list.Element{}
var nextPending *list.Element
for e := q.pending.Front(); e != nil; e = nextPending {
nextPending = e.Next()
task := e.Value.(*Task)
if q.depsInQueue(task) {
logrus.Debugf("queue: waiting due to unmet dependencies %v", task.ID)
q.waitingOnDeps.PushBack(task)
filtered = append(filtered, e)
}
}
// filter waiting tasks
for _, f := range filtered {
q.pending.Remove(f)
}
}
func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
logrus.Debugf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)
for w := range q.workers {
if w.filter(task) {
logrus.Debugf("queue: assigned task: %v with deps %v", task.ID, task.Dependencies)
return e, w
}
}
}
return nil, nil
}
func (q *fifo) resubmitExpiredBuilds() {
for id, state := range q.running {
if time.Now().After(state.deadline) {
q.pending.PushFront(state.item)
delete(q.running, id)
close(state.done)
}
}
}
func (q *fifo) depsInQueue(task *Task) bool {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
possibleDep, ok := e.Value.(*Task)
logrus.Debugf("queue: pending right now: %v", possibleDep.ID)
for _, dep := range task.Dependencies {
if ok && possibleDep.ID == dep {
return true
}
}
}
for possibleDepID := range q.running {
logrus.Debugf("queue: running right now: %v", possibleDepID)
for _, dep := range task.Dependencies {
if possibleDepID == dep {
return true
}
}
}
return false
}
func (q *fifo) updateDepStatusInQueue(taskID string, status string) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
pending, ok := e.Value.(*Task)
for _, dep := range pending.Dependencies {
if ok && taskID == dep {
pending.DepStatus[dep] = status
}
}
}
for _, running := range q.running {
for _, dep := range running.item.Dependencies {
if taskID == dep {
running.item.DepStatus[dep] = status
}
}
}
next = nil
for e := q.waitingOnDeps.Front(); e != nil; e = next {
next = e.Next()
waiting, ok := e.Value.(*Task)
for _, dep := range waiting.Dependencies {
if ok && taskID == dep {
waiting.DepStatus[dep] = status
}
}
}
}
func (q *fifo) removeFromPending(taskID string) {
logrus.Debugf("queue: trying to remove %s", taskID)
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
if task.ID == taskID {
logrus.Debugf("queue: %s is removed from pending", taskID)
q.pending.Remove(e)
return
}
}
}

598
server/queue/fifo_test.go Normal file
View File

@@ -0,0 +1,598 @@
package queue
import (
"context"
"fmt"
"sync"
"testing"
"time"
)
var noContext = context.Background()
func TestFifo(t *testing.T) {
want := &Task{ID: "1"}
q := New()
q.Push(noContext, want)
info := q.Info(noContext)
if len(info.Pending) != 1 {
t.Errorf("expect task in pending queue")
return
}
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != want {
t.Errorf("expect task returned form queue")
return
}
info = q.Info(noContext)
if len(info.Pending) != 0 {
t.Errorf("expect task removed from pending queue")
return
}
if len(info.Running) != 1 {
t.Errorf("expect task in running queue")
return
}
q.Done(noContext, got.ID, StatusSuccess)
info = q.Info(noContext)
if len(info.Pending) != 0 {
t.Errorf("expect task removed from pending queue")
return
}
if len(info.Running) != 0 {
t.Errorf("expect task removed from running queue")
return
}
}
func TestFifoExpire(t *testing.T) {
want := &Task{ID: "1"}
q := New().(*fifo)
q.extension = 0
q.Push(noContext, want)
info := q.Info(noContext)
if len(info.Pending) != 1 {
t.Errorf("expect task in pending queue")
return
}
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != want {
t.Errorf("expect task returned form queue")
return
}
q.process()
if len(info.Pending) != 1 {
t.Errorf("expect task re-added to pending queue")
return
}
}
func TestFifoWait(t *testing.T) {
want := &Task{ID: "1"}
q := New().(*fifo)
q.Push(noContext, want)
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != want {
t.Errorf("expect task returned form queue")
return
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
q.Wait(noContext, got.ID)
wg.Done()
}()
<-time.After(time.Millisecond)
q.Done(noContext, got.ID, StatusSuccess)
wg.Wait()
}
func TestFifoEvict(t *testing.T) {
t1 := &Task{ID: "1"}
q := New()
q.Push(noContext, t1)
info := q.Info(noContext)
if len(info.Pending) != 1 {
t.Errorf("expect task in pending queue")
}
if err := q.Evict(noContext, t1.ID); err != nil {
t.Errorf("expect task evicted from queue")
}
info = q.Info(noContext)
if len(info.Pending) != 0 {
t.Errorf("expect pending queue has zero items")
}
if err := q.Evict(noContext, t1.ID); err != ErrNotFound {
t.Errorf("expect not found error when evicting item not in queue, got %s", err)
}
}
func TestFifoDependencies(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
}
q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task1})
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}
q.Done(noContext, got.ID, StatusSuccess)
got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
}
}
func TestFifoErrors(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
}
task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
RunOn: []string{"success", "failure"},
}
q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
}
if got.ShouldRun() {
t.Errorf("expect task2 should not run, since task1 failed")
return
}
got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
}
if !got.ShouldRun() {
t.Errorf("expect task3 should run, task1 failed, but task3 runs on failure too")
return
}
}
func TestFifoErrors2(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
}
task3 := &Task{
ID: "3",
Dependencies: []string{"1", "2"},
DepStatus: make(map[string]string),
}
q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
for i := 0; i < 2; i++ {
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 && got != task2 {
t.Errorf("expect task1 or task2 returned from queue as task3 depends on them")
return
}
if got != task1 {
q.Done(noContext, got.ID, StatusSuccess)
}
if got != task2 {
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
}
}
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
}
if got.ShouldRun() {
t.Errorf("expect task3 should not run, task1 succeeded but task2 failed")
return
}
}
func TestFifoErrorsMultiThread(t *testing.T) {
//logrus.SetLevel(logrus.DebugLevel)
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
}
task3 := &Task{
ID: "3",
Dependencies: []string{"1", "2"},
DepStatus: make(map[string]string),
}
q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
obtainedWorkCh := make(chan *Task)
for i := 0; i < 10; i++ {
go func(i int) {
for {
fmt.Printf("Worker %d started\n", i)
got, _ := q.Poll(noContext, func(*Task) bool { return true })
obtainedWorkCh <- got
}
}(i)
}
task1Processed := false
task2Processed := false
for {
select {
case got := <-obtainedWorkCh:
fmt.Println(got.ID)
if !task1Processed {
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 and task3 depends on it")
return
} else {
task1Processed = true
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
go func() {
for {
fmt.Printf("Worker spawned\n")
got, _ := q.Poll(noContext, func(*Task) bool { return true })
obtainedWorkCh <- got
}
}()
}
} else if !task2Processed {
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
} else {
task2Processed = true
q.Done(noContext, got.ID, StatusSuccess)
go func() {
for {
fmt.Printf("Worker spawned\n")
got, _ := q.Poll(noContext, func(*Task) bool { return true })
obtainedWorkCh <- got
}
}()
}
} else {
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
}
if got.ShouldRun() {
t.Errorf("expect task3 should not run, task1 succeeded but task2 failed")
return
} else {
return
}
}
case <-time.After(5 * time.Second):
info := q.Info(noContext)
fmt.Println(info.String())
t.Errorf("test timed out")
return
}
}
}
func TestFifoTransitiveErrors(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
}
task3 := &Task{
ID: "3",
Dependencies: []string{"2"},
DepStatus: make(map[string]string),
}
q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
}
if got.ShouldRun() {
t.Errorf("expect task2 should not run, since task1 failed")
return
}
q.Done(noContext, got.ID, StatusSkipped)
got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
}
if got.ShouldRun() {
t.Errorf("expect task3 should not run, task1 failed, thus task2 was skipped, task3 should be skipped too")
return
}
}
func TestFifoCancel(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
}
task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
RunOn: []string{"success", "failure"},
}
q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
_, _ = q.Poll(noContext, func(*Task) bool { return true })
q.Error(noContext, task1.ID, fmt.Errorf("cancelled"))
q.Error(noContext, task2.ID, fmt.Errorf("cancelled"))
q.Error(noContext, task3.ID, fmt.Errorf("cancelled"))
info := q.Info(noContext)
if len(info.Pending) != 0 {
t.Errorf("All pipelines should be cancelled")
return
}
}
func TestFifoPause(t *testing.T) {
task1 := &Task{
ID: "1",
}
q := New().(*fifo)
var wg sync.WaitGroup
wg.Add(1)
go func() {
_, _ = q.Poll(noContext, func(*Task) bool { return true })
wg.Done()
}()
q.Pause()
t0 := time.Now()
q.Push(noContext, task1)
time.Sleep(20 * time.Millisecond)
q.Resume()
wg.Wait()
t1 := time.Now()
if t1.Sub(t0) < 20*time.Millisecond {
t.Errorf("Should have waited til resume")
}
q.Pause()
q.Push(noContext, task1)
q.Resume()
_, _ = q.Poll(noContext, func(*Task) bool { return true })
}
func TestFifoPauseResume(t *testing.T) {
task1 := &Task{
ID: "1",
}
q := New().(*fifo)
q.Pause()
q.Push(noContext, task1)
q.Resume()
_, _ = q.Poll(noContext, func(*Task) bool { return true })
}
func TestWaitingVsPending(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
}
task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
RunOn: []string{"success", "failure"},
}
q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
got, _ := q.Poll(noContext, func(*Task) bool { return true })
info := q.Info(noContext)
if info.Stats.WaitingOnDeps != 2 {
t.Errorf("2 should wait on deps")
}
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
got, _ = q.Poll(noContext, func(*Task) bool { return true })
info = q.Info(noContext)
if info.Stats.WaitingOnDeps != 0 {
t.Errorf("0 should wait on deps")
}
if info.Stats.Pending != 1 {
t.Errorf("1 should wait for worker")
}
}
func TestShouldRun(t *testing.T) {
task := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusSuccess,
},
RunOn: []string{"failure"},
}
if task.ShouldRun() {
t.Errorf("expect task to not run, it runs on failure only")
return
}
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusSuccess,
},
RunOn: []string{"failure", "success"},
}
if !task.ShouldRun() {
t.Errorf("expect task to run")
return
}
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusFailure,
},
}
if task.ShouldRun() {
t.Errorf("expect task to not run")
return
}
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusSuccess,
},
RunOn: []string{"success"},
}
if !task.ShouldRun() {
t.Errorf("expect task to run")
return
}
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusFailure,
},
RunOn: []string{"failure"},
}
if !task.ShouldRun() {
t.Errorf("expect task to run")
return
}
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusSkipped,
},
}
if task.ShouldRun() {
t.Errorf("Tasked should not run if dependency is skipped")
return
}
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusSkipped,
},
RunOn: []string{"failure"},
}
if !task.ShouldRun() {
t.Errorf("On Failure tasks should run on skipped deps, something failed higher up the chain")
return
}
}

172
server/queue/queue.go Normal file
View File

@@ -0,0 +1,172 @@
package queue
import (
"context"
"errors"
"fmt"
"strings"
)
var (
// ErrCancel indicates the task was cancelled.
ErrCancel = errors.New("queue: task cancelled")
// ErrNotFound indicates the task was not found in the queue.
ErrNotFound = errors.New("queue: task not found")
)
// Task defines a unit of work in the queue.
type Task struct {
// ID identifies this task.
ID string `json:"id,omitempty"`
// Data is the actual data in the entry.
Data []byte `json:"data"`
// Labels represents the key-value pairs the entry is lebeled with.
Labels map[string]string `json:"labels,omitempty"`
// Task IDs this task depend
Dependencies []string
// Dependency's exit status
DepStatus map[string]string
// RunOn failure or success
RunOn []string
}
// ShouldRun tells if a task should be run or skipped, based on dependencies
func (t *Task) ShouldRun() bool {
if runsOnFailure(t.RunOn) && runsOnSuccess(t.RunOn) {
return true
}
if !runsOnFailure(t.RunOn) && runsOnSuccess(t.RunOn) {
for _, status := range t.DepStatus {
if StatusSuccess != status {
return false
}
}
return true
}
if runsOnFailure(t.RunOn) && !runsOnSuccess(t.RunOn) {
for _, status := range t.DepStatus {
if StatusSuccess == status {
return false
}
}
return true
}
return false
}
func (t *Task) String() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%s (%s) - %s", t.ID, t.Dependencies, t.DepStatus))
return sb.String()
}
func runsOnFailure(runsOn []string) bool {
for _, status := range runsOn {
if status == "failure" {
return true
}
}
return false
}
func runsOnSuccess(runsOn []string) bool {
if len(runsOn) == 0 {
return true
}
for _, status := range runsOn {
if status == "success" {
return true
}
}
return false
}
// InfoT provides runtime information.
type InfoT struct {
Pending []*Task `json:"pending"`
WaitingOnDeps []*Task `json:"waiting_on_deps"`
Running []*Task `json:"running"`
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
WaitingOnDeps int `json:"waiting_on_deps_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
} `json:"stats"`
Paused bool
}
func (t *InfoT) String() string {
var sb strings.Builder
for _, task := range t.Pending {
sb.WriteString("\t" + task.String())
}
for _, task := range t.Running {
sb.WriteString("\t" + task.String())
}
for _, task := range t.WaitingOnDeps {
sb.WriteString("\t" + task.String())
}
return sb.String()
}
// Filter filters tasks in the queue. If the Filter returns false,
// the Task is skipped and not returned to the subscriber.
type Filter func(*Task) bool
// Queue defines a task queue for scheduling tasks among
// a pool of workers.
type Queue interface {
// Push pushes a task to the tail of this queue.
Push(c context.Context, task *Task) error
// Push pushes a task to the tail of this queue.
PushAtOnce(c context.Context, tasks []*Task) error
// Poll retrieves and removes a task head of this queue.
Poll(c context.Context, f Filter) (*Task, error)
// Extend extends the deadline for a task.
Extend(c context.Context, id string) error
// Done signals the task is complete.
Done(c context.Context, exitStatus string, id string) error
// Error signals the task is complete with errors.
Error(c context.Context, id string, err error) error
// Error signals the task is complete with errors.
ErrorAtOnce(c context.Context, id []string, err error) error
// Evict removes a pending task from the queue.
Evict(c context.Context, id string) error
// Evict removes a pending task from the queue.
EvictAtOnce(c context.Context, id []string) error
// Wait waits until the task is complete.
Wait(c context.Context, id string) error
// Info returns internal queue information.
Info(c context.Context) InfoT
// Stops the queue from handing out new work items in Poll
Pause()
// Starts the queue again, Poll returns new items
Resume()
}

1
server/queue/worker.go Normal file
View File

@@ -0,0 +1 @@
package queue

View File

@@ -0,0 +1 @@
package queue