From 3849d1e2b3623b31beb50f9d007e85cf8ba1ec02 Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Fri, 21 May 2021 12:50:19 +0200 Subject: [PATCH 1/3] Testcases for https://github.com/laszlocph/woodpecker/issues/200 --- cncd/queue/fifo_test.go | 125 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index 287107c6f..40dad4b4f 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -200,6 +200,131 @@ func TestFifoErrors(t *testing.T) { } } +func TestFifoErrors2(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + task2 := &Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: make(map[string]string), + } + + task3 := &Task{ + ID: "3", + Dependencies: []string{"1", "2"}, + DepStatus: make(map[string]string), + } + + q := New().(*fifo) + q.PushAtOnce(noContext, []*Task{task2, task3, task1}) + + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + if got != task1 { + t.Errorf("expect task1 returned from queue as task2 and task3 depends on it") + return + } + + q.Done(noContext, got.ID, StatusSuccess) + + got, _ = q.Poll(noContext, func(*Task) bool { return true }) + if got != task2 { + t.Errorf("expect task2 returned from queue") + return + } + + q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) + + got, _ = q.Poll(noContext, func(*Task) bool { return true }) + if got != task3 { + t.Errorf("expect task3 returned from queue") + return + } + + if got.ShouldRun() { + t.Errorf("expect task3 should not run, task1 succeeded but task2 failed") + return + } +} + +func TestFifoErrorsMultiThread(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + task2 := &Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: make(map[string]string), + } + + task3 := &Task{ + ID: "3", + Dependencies: []string{"1", "2"}, + DepStatus: make(map[string]string), + } + + q := New().(*fifo) + q.PushAtOnce(noContext, []*Task{task2, task3, task1}) + + obtainedWorkCh := make(chan *Task) + + for i := 0; i < 10; i++ { + go func(i int) { + for { + fmt.Printf("Worker %d started\n", i) + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + obtainedWorkCh <- got + } + }(i) + } + + task1Processed := false + task2Processed := false + + for { + select { + case got := <-obtainedWorkCh: + fmt.Println(got.ID) + + if !task1Processed { + if got != task1 { + t.Errorf("expect task1 returned from queue as task2 and task3 depends on it") + return + } else { + task1Processed = true + q.Done(noContext, got.ID, StatusSuccess) + } + } else if !task2Processed { + if got != task2 { + t.Errorf("expect task2 returned from queue") + return + } else { + task2Processed = true + q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) + } + } else { + if got != task3 { + t.Errorf("expect task3 returned from queue") + return + } + + if got.ShouldRun() { + t.Errorf("expect task3 should not run, task1 succeeded but task2 failed") + return + } else { + return + } + } + + case <-time.After(3 * time.Second): + t.Errorf("test timed out") + return + } + } +} + func TestFifoTransitiveErrors(t *testing.T) { task1 := &Task{ ID: "1", From 11417f0f7c3b26a3d2ab9f3844b8072e7d59435f Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Fri, 21 May 2021 12:54:55 +0200 Subject: [PATCH 2/3] Test times out --- cncd/queue/fifo_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index 40dad4b4f..4f2a7bae9 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -318,7 +318,7 @@ func TestFifoErrorsMultiThread(t *testing.T) { } } - case <-time.After(3 * time.Second): + case <-time.After(5 * time.Second): t.Errorf("test timed out") return } From 3cb8d06b786354a6cc37617c0991714637bf0109 Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Tue, 25 May 2021 13:05:31 +0200 Subject: [PATCH 3/3] Fix: setting dep status on all deps, not just the first --- cncd/queue/fifo.go | 4 +-- cncd/queue/fifo_test.go | 54 ++++++++++++++++++++++++++--------------- cncd/queue/queue.go | 26 ++++++++++++++++++++ 3 files changed, 62 insertions(+), 22 deletions(-) diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index 82ee36815..9576cf791 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -356,8 +356,8 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status string) { } } - var n *list.Element - for e := q.waitingOnDeps.Front(); e != nil; e = n { + next = nil + for e := q.waitingOnDeps.Front(); e != nil; e = next { next = e.Next() waiting, ok := e.Value.(*Task) for _, dep := range waiting.Dependencies { diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index 4f2a7bae9..e87829e83 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -207,8 +207,6 @@ func TestFifoErrors2(t *testing.T) { task2 := &Task{ ID: "2", - Dependencies: []string{"1"}, - DepStatus: make(map[string]string), } task3 := &Task{ @@ -220,23 +218,22 @@ func TestFifoErrors2(t *testing.T) { q := New().(*fifo) q.PushAtOnce(noContext, []*Task{task2, task3, task1}) + for i := 0; i < 2; i++ { + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + if got != task1 && got != task2{ + t.Errorf("expect task1 or task2 returned from queue as task3 depends on them") + return + } + + if got != task1 { + q.Done(noContext, got.ID, StatusSuccess) + } + if got != task2 { + q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) + } + } + got, _ := q.Poll(noContext, func(*Task) bool { return true }) - if got != task1 { - t.Errorf("expect task1 returned from queue as task2 and task3 depends on it") - return - } - - q.Done(noContext, got.ID, StatusSuccess) - - got, _ = q.Poll(noContext, func(*Task) bool { return true }) - if got != task2 { - t.Errorf("expect task2 returned from queue") - return - } - - q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) - - got, _ = q.Poll(noContext, func(*Task) bool { return true }) if got != task3 { t.Errorf("expect task3 returned from queue") return @@ -249,6 +246,7 @@ func TestFifoErrors2(t *testing.T) { } func TestFifoErrorsMultiThread(t *testing.T) { + //logrus.SetLevel(logrus.DebugLevel) task1 := &Task{ ID: "1", } @@ -294,7 +292,14 @@ func TestFifoErrorsMultiThread(t *testing.T) { return } else { task1Processed = true - q.Done(noContext, got.ID, StatusSuccess) + q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) + go func() { + for { + fmt.Printf("Worker spawned\n") + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + obtainedWorkCh <- got + } + }() } } else if !task2Processed { if got != task2 { @@ -302,7 +307,14 @@ func TestFifoErrorsMultiThread(t *testing.T) { return } else { task2Processed = true - q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) + q.Done(noContext, got.ID, StatusSuccess) + go func() { + for { + fmt.Printf("Worker spawned\n") + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + obtainedWorkCh <- got + } + }() } } else { if got != task3 { @@ -319,6 +331,8 @@ func TestFifoErrorsMultiThread(t *testing.T) { } case <-time.After(5 * time.Second): + info := q.Info(noContext) + fmt.Println(info.String()) t.Errorf("test timed out") return } diff --git a/cncd/queue/queue.go b/cncd/queue/queue.go index 29bccb20e..0485e786c 100644 --- a/cncd/queue/queue.go +++ b/cncd/queue/queue.go @@ -3,6 +3,8 @@ package queue import ( "context" "errors" + "fmt" + "strings" ) var ( @@ -61,6 +63,12 @@ func (t *Task) ShouldRun() bool { return false } +func (t *Task) String() string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("%s (%s) - %s", t.ID, t.Dependencies, t.DepStatus)) + return sb.String() +} + func runsOnFailure(runsOn []string) bool { for _, status := range runsOn { if status == "failure" { @@ -98,6 +106,24 @@ type InfoT struct { Paused bool } +func (t *InfoT) String() string { + var sb strings.Builder + + for _, task := range t.Pending { + sb.WriteString("\t" + task.String()) + } + + for _, task := range t.Running { + sb.WriteString("\t" + task.String()) + } + + for _, task := range t.WaitingOnDeps { + sb.WriteString("\t" + task.String()) + } + + return sb.String() +} + // Filter filters tasks in the queue. If the Filter returns false, // the Task is skipped and not returned to the subscriber. type Filter func(*Task) bool