Task dependencies to survive restarts

This commit is contained in:
Laszlo Fogas
2019-06-25 12:07:36 +02:00
parent 84564f9ae5
commit be0d1d73fb
13 changed files with 114 additions and 13 deletions

View File

@@ -23,9 +23,11 @@ import (
// Task defines scheduled pipeline Task.
type Task struct {
ID string `meddler:"task_id"`
Data []byte `meddler:"task_data"`
Labels map[string]string `meddler:"task_labels,json"`
ID string `meddler:"task_id"`
Data []byte `meddler:"task_data"`
Labels map[string]string `meddler:"task_labels,json"`
Dependencies []string `meddler:"task_dependencies,json"`
RunOn []string `meddler:"task_run_on,json"`
}
// TaskStore defines storage for scheduled Tasks.
@@ -39,13 +41,18 @@ type TaskStore interface {
// ensures the task Queue can be restored when the system starts.
func WithTaskStore(q queue.Queue, s TaskStore) queue.Queue {
tasks, _ := s.TaskList()
toEnqueue := []*queue.Task{}
for _, task := range tasks {
q.Push(context.Background(), &queue.Task{
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
toEnqueue = append(toEnqueue, &queue.Task{
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
Dependencies: task.Dependencies,
RunOn: task.RunOn,
DepStatus: make(map[string]bool),
})
}
q.PushAtOnce(context.Background(), toEnqueue)
return &persistentQueue{q, s}
}
@@ -57,9 +64,11 @@ type persistentQueue struct {
// Push pushes a task to the tail of this queue.
func (q *persistentQueue) Push(c context.Context, task *queue.Task) error {
q.store.TaskInsert(&Task{
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
Dependencies: task.Dependencies,
RunOn: task.RunOn,
})
err := q.Queue.Push(c, task)
if err != nil {
@@ -72,9 +81,11 @@ func (q *persistentQueue) Push(c context.Context, task *queue.Task) error {
func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*queue.Task) error {
for _, task := range tasks {
q.store.TaskInsert(&Task{
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
Dependencies: task.Dependencies,
RunOn: task.RunOn,
})
}
err := q.Queue.PushAtOnce(c, tasks)