mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-08-12 18:57:06 +00:00
Merge pull request #1830 from bradrydzewski/feature/mq
merge the feature/mq branch
This commit is contained in:
commit
db5b8e83ee
@ -3,7 +3,7 @@ workspace:
|
|||||||
path: src/github.com/drone/drone
|
path: src/github.com/drone/drone
|
||||||
|
|
||||||
pipeline:
|
pipeline:
|
||||||
backend:
|
test:
|
||||||
image: golang:1.6
|
image: golang:1.6
|
||||||
environment:
|
environment:
|
||||||
- GO15VENDOREXPERIMENT=1
|
- GO15VENDOREXPERIMENT=1
|
||||||
@ -23,7 +23,7 @@ pipeline:
|
|||||||
event: push
|
event: push
|
||||||
|
|
||||||
publish:
|
publish:
|
||||||
image: s3
|
image: plugins/s3
|
||||||
acl: public-read
|
acl: public-read
|
||||||
bucket: downloads.drone.io
|
bucket: downloads.drone.io
|
||||||
source: release/**/*.*
|
source: release/**/*.*
|
||||||
@ -32,9 +32,9 @@ pipeline:
|
|||||||
branch: master
|
branch: master
|
||||||
|
|
||||||
docker:
|
docker:
|
||||||
|
image: plugins/docker
|
||||||
repo: drone/drone
|
repo: drone/drone
|
||||||
tag: [ "0.5.0", "0.5" ]
|
tag: [ "0.5", "0.5.0", "0.5.0-rc" ]
|
||||||
storage_driver: overlay
|
|
||||||
when:
|
when:
|
||||||
branch: master
|
branch: master
|
||||||
event: push
|
event: push
|
||||||
|
@ -1 +1 @@
|
|||||||
eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICBiYWNrZW5kOgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBzMwogICAgYWNsOiBwdWJsaWMtcmVhZAogICAgYnVja2V0OiBkb3dubG9hZHMuZHJvbmUuaW8KICAgIHNvdXJjZTogcmVsZWFzZS8qKi8qLioKICAgIHdoZW46CiAgICAgIGV2ZW50OiBwdXNoCiAgICAgIGJyYW5jaDogbWFzdGVyCgogIGRvY2tlcjoKICAgIHJlcG86IGRyb25lL2Ryb25lCiAgICB0YWc6IFsgIjAuNS4wIiwgIjAuNSIgXQogICAgc3RvcmFnZV9kcml2ZXI6IG92ZXJsYXkKICAgIHdoZW46CiAgICAgIGJyYW5jaDogbWFzdGVyCiAgICAgIGV2ZW50OiBwdXNoCgpzZXJ2aWNlczoKICBwb3N0Z3JlczoKICAgIGltYWdlOiBwb3N0Z3Jlczo5LjQuNQogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gUE9TVEdSRVNfVVNFUj1wb3N0Z3JlcwogIG15c3FsOgogICAgaW1hZ2U6IG15c3FsOjUuNi4yNwogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gTVlTUUxfREFUQUJBU0U9dGVzdAogICAgICAtIE1ZU1FMX0FMTE9XX0VNUFRZX1BBU1NXT1JEPXllcwo.kQIwqIgs7PnoKIGmzJ6hlbWTbV5zK0w4HVWsux79P3s
|
eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICB0ZXN0OgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBwbHVnaW5zL3MzCiAgICBhY2w6IHB1YmxpYy1yZWFkCiAgICBidWNrZXQ6IGRvd25sb2Fkcy5kcm9uZS5pbwogICAgc291cmNlOiByZWxlYXNlLyoqLyouKgogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKICAgICAgYnJhbmNoOiBtYXN0ZXIKCiAgZG9ja2VyOgogICAgaW1hZ2U6IHBsdWdpbnMvZG9ja2VyCiAgICByZXBvOiBkcm9uZS9kcm9uZQogICAgdGFnOiBbICIwLjUiLCAiMC41LjAiLCAiMC41LjAtcmMiIF0KICAgIHdoZW46CiAgICAgIGJyYW5jaDogbWFzdGVyCiAgICAgIGV2ZW50OiBwdXNoCgpzZXJ2aWNlczoKICBwb3N0Z3JlczoKICAgIGltYWdlOiBwb3N0Z3Jlczo5LjQuNQogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gUE9TVEdSRVNfVVNFUj1wb3N0Z3JlcwogIG15c3FsOgogICAgaW1hZ2U6IG15c3FsOjUuNi4yNwogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gTVlTUUxfREFUQUJBU0U9dGVzdAogICAgICAtIE1ZU1FMX0FMTE9XX0VNUFRZX1BBU1NXT1JEPXllcwo.0lD1m6yILbU8ZrSJcZv7Y1CcGEG5zIaJma1C1lUTc7o
|
2
Makefile
2
Makefile
@ -19,6 +19,8 @@ deps_backend:
|
|||||||
go get -u golang.org/x/tools/cmd/cover
|
go get -u golang.org/x/tools/cmd/cover
|
||||||
go get -u github.com/jteeuwen/go-bindata/...
|
go get -u github.com/jteeuwen/go-bindata/...
|
||||||
go get -u github.com/elazarl/go-bindata-assetfs/...
|
go get -u github.com/elazarl/go-bindata-assetfs/...
|
||||||
|
go get -u github.com/drone/mq/...
|
||||||
|
go get -u github.com/tidwall/redlog
|
||||||
|
|
||||||
gen: gen_template gen_migrations
|
gen: gen_template gen_migrations
|
||||||
|
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
|
|
||||||
"github.com/drone/drone/build"
|
"github.com/drone/drone/build"
|
||||||
"github.com/drone/drone/model"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/queue"
|
|
||||||
"github.com/drone/drone/version"
|
"github.com/drone/drone/version"
|
||||||
"github.com/drone/drone/yaml"
|
"github.com/drone/drone/yaml"
|
||||||
"github.com/drone/drone/yaml/expander"
|
"github.com/drone/drone/yaml/expander"
|
||||||
@ -48,7 +47,7 @@ func (a *Agent) Poll() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error {
|
func (a *Agent) Run(payload *model.Work, cancel <-chan bool) error {
|
||||||
|
|
||||||
payload.Job.Status = model.StatusRunning
|
payload.Job.Status = model.StatusRunning
|
||||||
payload.Job.Started = time.Now().Unix()
|
payload.Job.Started = time.Now().Unix()
|
||||||
@ -90,7 +89,7 @@ func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) {
|
func (a *Agent) prep(w *model.Work) (*yaml.Config, error) {
|
||||||
|
|
||||||
envs := toEnv(w)
|
envs := toEnv(w)
|
||||||
w.Yaml = expander.ExpandString(w.Yaml, envs)
|
w.Yaml = expander.ExpandString(w.Yaml, envs)
|
||||||
@ -155,8 +154,6 @@ func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) {
|
|||||||
transform.CommandTransform(conf)
|
transform.CommandTransform(conf)
|
||||||
transform.ImagePull(conf, a.Pull)
|
transform.ImagePull(conf, a.Pull)
|
||||||
transform.ImageTag(conf)
|
transform.ImageTag(conf)
|
||||||
transform.ImageName(conf)
|
|
||||||
transform.ImageNamespace(conf, a.Namespace)
|
|
||||||
if err := transform.ImageEscalate(conf, a.Escalate); err != nil {
|
if err := transform.ImageEscalate(conf, a.Escalate); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -172,7 +169,7 @@ func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) {
|
|||||||
return conf, nil
|
return conf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) error {
|
func (a *Agent) exec(spec *yaml.Config, payload *model.Work, cancel <-chan bool) error {
|
||||||
|
|
||||||
conf := build.Config{
|
conf := build.Config{
|
||||||
Engine: a.Engine,
|
Engine: a.Engine,
|
||||||
@ -231,7 +228,7 @@ func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toEnv(w *queue.Work) map[string]string {
|
func toEnv(w *model.Work) map[string]string {
|
||||||
envs := map[string]string{
|
envs := map[string]string{
|
||||||
"CI": "drone",
|
"CI": "drone",
|
||||||
"DRONE": "true",
|
"DRONE": "true",
|
||||||
|
@ -1,25 +1,22 @@
|
|||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/drone/drone/build"
|
"github.com/drone/drone/build"
|
||||||
"github.com/drone/drone/client"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/queue"
|
"github.com/drone/mq/logger"
|
||||||
|
"github.com/drone/mq/stomp"
|
||||||
)
|
)
|
||||||
|
|
||||||
// UpdateFunc handles buid pipeline status updates.
|
// UpdateFunc handles buid pipeline status updates.
|
||||||
type UpdateFunc func(*queue.Work)
|
type UpdateFunc func(*model.Work)
|
||||||
|
|
||||||
// LoggerFunc handles buid pipeline logging updates.
|
// LoggerFunc handles buid pipeline logging updates.
|
||||||
type LoggerFunc func(*build.Line)
|
type LoggerFunc func(*build.Line)
|
||||||
|
|
||||||
var NoopUpdateFunc = func(*queue.Work) {}
|
var NoopUpdateFunc = func(*model.Work) {}
|
||||||
|
|
||||||
var TermLoggerFunc = func(line *build.Line) {
|
var TermLoggerFunc = func(line *build.Line) {
|
||||||
fmt.Println(line)
|
fmt.Println(line)
|
||||||
@ -27,65 +24,44 @@ var TermLoggerFunc = func(line *build.Line) {
|
|||||||
|
|
||||||
// NewClientUpdater returns an updater that sends updated build details
|
// NewClientUpdater returns an updater that sends updated build details
|
||||||
// to the drone server.
|
// to the drone server.
|
||||||
func NewClientUpdater(client client.Client) UpdateFunc {
|
func NewClientUpdater(client *stomp.Client) UpdateFunc {
|
||||||
return func(w *queue.Work) {
|
return func(w *model.Work) {
|
||||||
for {
|
err := client.SendJSON("/queue/updates", w)
|
||||||
err := client.Push(w)
|
if err != nil {
|
||||||
if err == nil {
|
logger.Warningf("Error updating %s/%s#%d.%d. %s",
|
||||||
return
|
|
||||||
}
|
|
||||||
logrus.Errorf("Error updating %s/%s#%d.%d. Retry in 30s. %s",
|
|
||||||
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
|
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
|
||||||
logrus.Infof("Retry update in 30s")
|
}
|
||||||
time.Sleep(time.Second * 30)
|
if w.Job.Status != model.StatusRunning {
|
||||||
|
var dest = fmt.Sprintf("/topic/logs.%d", w.Job.ID)
|
||||||
|
var opts = []stomp.MessageOption{
|
||||||
|
stomp.WithHeader("eof", "true"),
|
||||||
|
stomp.WithRetain("all"),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := client.Send(dest, []byte("eof"), opts...); err != nil {
|
||||||
|
logger.Warningf("Error sending eof %s/%s#%d.%d. %s",
|
||||||
|
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStreamLogger(stream client.StreamWriter, w io.Writer, limit int64) LoggerFunc {
|
func NewClientLogger(client *stomp.Client, id int64, limit int64) LoggerFunc {
|
||||||
var err error
|
|
||||||
var size int64
|
|
||||||
return func(line *build.Line) {
|
|
||||||
|
|
||||||
|
var size int64
|
||||||
|
var dest = fmt.Sprintf("/topic/logs.%d", id)
|
||||||
|
var opts = []stomp.MessageOption{
|
||||||
|
stomp.WithRetain("all"),
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(line *build.Line) {
|
||||||
if size > limit {
|
if size > limit {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if err := client.SendJSON(dest, line, opts...); err != nil {
|
||||||
// TODO remove this double-serialization
|
|
||||||
linejson, _ := json.Marshal(line)
|
|
||||||
w.Write(linejson)
|
|
||||||
w.Write([]byte{'\n'})
|
|
||||||
|
|
||||||
if err = stream.WriteJSON(line); err != nil {
|
|
||||||
logrus.Errorf("Error streaming build logs. %s", err)
|
logrus.Errorf("Error streaming build logs. %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
size += int64(len(line.Out))
|
size += int64(len(line.Out))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClientLogger(client client.Client, id int64, rc io.ReadCloser, wc io.WriteCloser, limit int64) LoggerFunc {
|
|
||||||
var once sync.Once
|
|
||||||
var size int64
|
|
||||||
return func(line *build.Line) {
|
|
||||||
// annoying hack to only start streaming once the first line is written
|
|
||||||
once.Do(func() {
|
|
||||||
go func() {
|
|
||||||
err := client.Stream(id, rc)
|
|
||||||
if err != nil && err != io.ErrClosedPipe {
|
|
||||||
logrus.Errorf("Error streaming build logs. %s", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
})
|
|
||||||
|
|
||||||
if size > limit {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
linejson, _ := json.Marshal(line)
|
|
||||||
wc.Write(linejson)
|
|
||||||
wc.Write([]byte{'\n'})
|
|
||||||
|
|
||||||
size += int64(len(line.Out))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -175,7 +175,6 @@ func (p *Pipeline) exec(c *yaml.Container) error {
|
|||||||
}
|
}
|
||||||
p.containers = append(p.containers, name)
|
p.containers = append(p.containers, name)
|
||||||
|
|
||||||
logrus.Debugf("wait.add(1) for %s logs", name)
|
|
||||||
p.wait.Add(1)
|
p.wait.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -183,7 +182,6 @@ func (p *Pipeline) exec(c *yaml.Container) error {
|
|||||||
logrus.Errorln("recover writing build output", r)
|
logrus.Errorln("recover writing build output", r)
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("wait.done() for %s logs", name)
|
|
||||||
p.wait.Done()
|
p.wait.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -217,7 +215,6 @@ func (p *Pipeline) exec(c *yaml.Container) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("wait.add(1) for %s exit code", name)
|
|
||||||
p.wait.Add(1)
|
p.wait.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -225,7 +222,6 @@ func (p *Pipeline) exec(c *yaml.Container) error {
|
|||||||
logrus.Errorln("recover writing exit code to output", r)
|
logrus.Errorln("recover writing exit code to output", r)
|
||||||
}
|
}
|
||||||
p.wait.Done()
|
p.wait.Done()
|
||||||
logrus.Debugf("wait.done() for %s exit code", name)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
p.pipe <- &Line{
|
p.pipe <- &Line{
|
||||||
|
40
bus/bus.go
40
bus/bus.go
@ -1,40 +0,0 @@
|
|||||||
package bus
|
|
||||||
|
|
||||||
//go:generate mockery -name Bus -output mock -case=underscore
|
|
||||||
|
|
||||||
import "golang.org/x/net/context"
|
|
||||||
|
|
||||||
// Bus represents an event bus implementation that
|
|
||||||
// allows a publisher to broadcast Event notifications
|
|
||||||
// to a list of subscribers.
|
|
||||||
type Bus interface {
|
|
||||||
// Publish broadcasts an event to all subscribers.
|
|
||||||
Publish(*Event)
|
|
||||||
|
|
||||||
// Subscribe adds the channel to the list of
|
|
||||||
// subscribers. Each subscriber in the list will
|
|
||||||
// receive broadcast events.
|
|
||||||
Subscribe(chan *Event)
|
|
||||||
|
|
||||||
// Unsubscribe removes the channel from the list
|
|
||||||
// of subscribers.
|
|
||||||
Unsubscribe(chan *Event)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish broadcasts an event to all subscribers.
|
|
||||||
func Publish(c context.Context, event *Event) {
|
|
||||||
FromContext(c).Publish(event)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe adds the channel to the list of
|
|
||||||
// subscribers. Each subscriber in the list will
|
|
||||||
// receive broadcast events.
|
|
||||||
func Subscribe(c context.Context, eventc chan *Event) {
|
|
||||||
FromContext(c).Subscribe(eventc)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unsubscribe removes the channel from the
|
|
||||||
// list of subscribers.
|
|
||||||
func Unsubscribe(c context.Context, eventc chan *Event) {
|
|
||||||
FromContext(c).Unsubscribe(eventc)
|
|
||||||
}
|
|
@ -1,46 +0,0 @@
|
|||||||
package bus
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type eventbus struct {
|
|
||||||
sync.Mutex
|
|
||||||
subs map[chan *Event]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates a simple event bus that manages a list of
|
|
||||||
// subscribers to which events are published.
|
|
||||||
func New() Bus {
|
|
||||||
return newEventbus()
|
|
||||||
}
|
|
||||||
|
|
||||||
func newEventbus() *eventbus {
|
|
||||||
return &eventbus{
|
|
||||||
subs: make(map[chan *Event]bool),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *eventbus) Subscribe(c chan *Event) {
|
|
||||||
b.Lock()
|
|
||||||
b.subs[c] = true
|
|
||||||
b.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *eventbus) Unsubscribe(c chan *Event) {
|
|
||||||
b.Lock()
|
|
||||||
delete(b.subs, c)
|
|
||||||
b.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *eventbus) Publish(event *Event) {
|
|
||||||
b.Lock()
|
|
||||||
defer b.Unlock()
|
|
||||||
|
|
||||||
for s := range b.subs {
|
|
||||||
go func(c chan *Event) {
|
|
||||||
defer recover()
|
|
||||||
c <- event
|
|
||||||
}(s)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,73 +0,0 @@
|
|||||||
package bus
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/drone/drone/model"
|
|
||||||
. "github.com/franela/goblin"
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestBus(t *testing.T) {
|
|
||||||
g := Goblin(t)
|
|
||||||
g.Describe("Event bus", func() {
|
|
||||||
|
|
||||||
g.It("Should unsubscribe", func() {
|
|
||||||
c := new(gin.Context)
|
|
||||||
b := newEventbus()
|
|
||||||
ToContext(c, b)
|
|
||||||
|
|
||||||
c1 := make(chan *Event)
|
|
||||||
c2 := make(chan *Event)
|
|
||||||
Subscribe(c, c1)
|
|
||||||
Subscribe(c, c2)
|
|
||||||
|
|
||||||
g.Assert(len(b.subs)).Equal(2)
|
|
||||||
})
|
|
||||||
|
|
||||||
g.It("Should subscribe", func() {
|
|
||||||
c := new(gin.Context)
|
|
||||||
b := newEventbus()
|
|
||||||
ToContext(c, b)
|
|
||||||
|
|
||||||
c1 := make(chan *Event)
|
|
||||||
c2 := make(chan *Event)
|
|
||||||
Subscribe(c, c1)
|
|
||||||
Subscribe(c, c2)
|
|
||||||
|
|
||||||
g.Assert(len(b.subs)).Equal(2)
|
|
||||||
|
|
||||||
Unsubscribe(c, c1)
|
|
||||||
Unsubscribe(c, c2)
|
|
||||||
|
|
||||||
g.Assert(len(b.subs)).Equal(0)
|
|
||||||
})
|
|
||||||
|
|
||||||
g.It("Should publish", func() {
|
|
||||||
c := new(gin.Context)
|
|
||||||
b := New()
|
|
||||||
ToContext(c, b)
|
|
||||||
|
|
||||||
e1 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{})
|
|
||||||
e2 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{})
|
|
||||||
c1 := make(chan *Event)
|
|
||||||
|
|
||||||
Subscribe(c, c1)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
var r1, r2 *Event
|
|
||||||
go func() {
|
|
||||||
r1 = <-c1
|
|
||||||
r2 = <-c1
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
Publish(c, e1)
|
|
||||||
Publish(c, e2)
|
|
||||||
wg.Wait()
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
|
@ -1,21 +0,0 @@
|
|||||||
package bus
|
|
||||||
|
|
||||||
import "golang.org/x/net/context"
|
|
||||||
|
|
||||||
const key = "bus"
|
|
||||||
|
|
||||||
// Setter defines a context that enables setting values.
|
|
||||||
type Setter interface {
|
|
||||||
Set(string, interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// FromContext returns the Bus associated with this context.
|
|
||||||
func FromContext(c context.Context) Bus {
|
|
||||||
return c.Value(key).(Bus)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ToContext adds the Bus to this context if it supports
|
|
||||||
// the Setter interface.
|
|
||||||
func ToContext(c Setter, b Bus) {
|
|
||||||
c.Set(key, b)
|
|
||||||
}
|
|
@ -4,7 +4,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/drone/drone/model"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/queue"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Client is used to communicate with a Drone server.
|
// Client is used to communicate with a Drone server.
|
||||||
@ -103,27 +102,4 @@ type Client interface {
|
|||||||
|
|
||||||
// AgentList returns a list of build agents.
|
// AgentList returns a list of build agents.
|
||||||
AgentList() ([]*model.Agent, error)
|
AgentList() ([]*model.Agent, error)
|
||||||
|
|
||||||
//
|
|
||||||
// below items for Queue (internal use only)
|
|
||||||
//
|
|
||||||
|
|
||||||
// Pull pulls work from the server queue.
|
|
||||||
Pull(os, arch string) (*queue.Work, error)
|
|
||||||
|
|
||||||
// Push pushes an update to the server.
|
|
||||||
Push(*queue.Work) error
|
|
||||||
|
|
||||||
// Stream streams the build logs to the server.
|
|
||||||
Stream(int64, io.ReadCloser) error
|
|
||||||
|
|
||||||
LogStream(int64) (StreamWriter, error)
|
|
||||||
|
|
||||||
LogPost(int64, io.ReadCloser) error
|
|
||||||
|
|
||||||
// Wait waits for the job to the complete.
|
|
||||||
Wait(int64) *Wait
|
|
||||||
|
|
||||||
// Ping the server
|
|
||||||
Ping() error
|
|
||||||
}
|
}
|
||||||
|
@ -12,10 +12,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/drone/drone/model"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/queue"
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
"golang.org/x/net/context"
|
|
||||||
"golang.org/x/net/context/ctxhttp"
|
|
||||||
"golang.org/x/oauth2"
|
"golang.org/x/oauth2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -323,110 +319,6 @@ func (c *client) AgentList() ([]*model.Agent, error) {
|
|||||||
return out, err
|
return out, err
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// below items for Queue (internal use only)
|
|
||||||
//
|
|
||||||
|
|
||||||
// Pull pulls work from the server queue.
|
|
||||||
func (c *client) Pull(os, arch string) (*queue.Work, error) {
|
|
||||||
out := new(queue.Work)
|
|
||||||
uri := fmt.Sprintf(pathPull, c.base, os, arch)
|
|
||||||
err := c.post(uri, nil, out)
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push pushes an update to the server.
|
|
||||||
func (c *client) Push(p *queue.Work) error {
|
|
||||||
uri := fmt.Sprintf(pathPush, c.base, p.Job.ID)
|
|
||||||
err := c.post(uri, p, nil)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ping pings the server.
|
|
||||||
func (c *client) Ping() error {
|
|
||||||
uri := fmt.Sprintf(pathPing, c.base)
|
|
||||||
err := c.post(uri, nil, nil)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stream streams the build logs to the server.
|
|
||||||
func (c *client) Stream(id int64, rc io.ReadCloser) error {
|
|
||||||
uri := fmt.Sprintf(pathStream, c.base, id)
|
|
||||||
err := c.post(uri, rc, nil)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// LogPost sends the full build logs to the server.
|
|
||||||
func (c *client) LogPost(id int64, rc io.ReadCloser) error {
|
|
||||||
uri := fmt.Sprintf(pathLogs, c.base, id)
|
|
||||||
return c.post(uri, rc, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// StreamWriter implements a special writer for streaming log entries to the
|
|
||||||
// central Drone server. The standard implementation is the gorilla.Connection.
|
|
||||||
type StreamWriter interface {
|
|
||||||
Close() error
|
|
||||||
WriteJSON(interface{}) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// LogStream streams the build logs to the server.
|
|
||||||
func (c *client) LogStream(id int64) (StreamWriter, error) {
|
|
||||||
rawurl := fmt.Sprintf(pathLogsAuth, c.base, id, c.token)
|
|
||||||
uri, err := url.Parse(rawurl)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if uri.Scheme == "https" {
|
|
||||||
uri.Scheme = "wss"
|
|
||||||
} else {
|
|
||||||
uri.Scheme = "ws"
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO need TLS client configuration
|
|
||||||
|
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(uri.String(), nil)
|
|
||||||
return conn, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait watches and waits for the build to cancel or finish.
|
|
||||||
func (c *client) Wait(id int64) *Wait {
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
return &Wait{id, c, ctx, cancel}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Wait struct {
|
|
||||||
id int64
|
|
||||||
client *client
|
|
||||||
|
|
||||||
ctx context.Context
|
|
||||||
cancel context.CancelFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Wait) Done() (*model.Job, error) {
|
|
||||||
uri := fmt.Sprintf(pathWait, w.client.base, w.id)
|
|
||||||
req, err := w.client.createRequest(uri, "POST", nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := ctxhttp.Do(w.ctx, w.client.client, req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
job := &model.Job{}
|
|
||||||
err = json.NewDecoder(res.Body).Decode(&job)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return job, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Wait) Cancel() {
|
|
||||||
w.cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// http request helper functions
|
// http request helper functions
|
||||||
//
|
//
|
||||||
|
@ -3,17 +3,19 @@ package agent
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/drone/drone/client"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/shared/token"
|
"github.com/drone/mq/logger"
|
||||||
"github.com/samalba/dockerclient"
|
"github.com/drone/mq/stomp"
|
||||||
|
"github.com/tidwall/redlog"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/codegangsta/cli"
|
"github.com/codegangsta/cli"
|
||||||
"strings"
|
"github.com/samalba/dockerclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AgentCmd is the exported command for starting the drone agent.
|
// AgentCmd is the exported command for starting the drone agent.
|
||||||
@ -57,17 +59,11 @@ var AgentCmd = cli.Command{
|
|||||||
Usage: "docker architecture system",
|
Usage: "docker architecture system",
|
||||||
Value: "amd64",
|
Value: "amd64",
|
||||||
},
|
},
|
||||||
cli.StringFlag{
|
|
||||||
EnvVar: "DRONE_STORAGE_DRIVER",
|
|
||||||
Name: "drone-storage-driver",
|
|
||||||
Usage: "docker storage driver",
|
|
||||||
Value: "overlay",
|
|
||||||
},
|
|
||||||
cli.StringFlag{
|
cli.StringFlag{
|
||||||
EnvVar: "DRONE_SERVER",
|
EnvVar: "DRONE_SERVER",
|
||||||
Name: "drone-server",
|
Name: "drone-server",
|
||||||
Usage: "drone server address",
|
Usage: "drone server address",
|
||||||
Value: "http://localhost:8000",
|
Value: "ws://localhost:8000/ws/broker",
|
||||||
},
|
},
|
||||||
cli.StringFlag{
|
cli.StringFlag{
|
||||||
EnvVar: "DRONE_TOKEN",
|
EnvVar: "DRONE_TOKEN",
|
||||||
@ -102,6 +98,11 @@ var AgentCmd = cli.Command{
|
|||||||
Usage: "drone timeout due to log inactivity",
|
Usage: "drone timeout due to log inactivity",
|
||||||
Value: time.Minute * 5,
|
Value: time.Minute * 5,
|
||||||
},
|
},
|
||||||
|
cli.StringFlag{
|
||||||
|
EnvVar: "DRONE_FILTER",
|
||||||
|
Name: "filter",
|
||||||
|
Usage: "filter jobs processed by this agent",
|
||||||
|
},
|
||||||
cli.IntFlag{
|
cli.IntFlag{
|
||||||
EnvVar: "DRONE_MAX_LOGS",
|
EnvVar: "DRONE_MAX_LOGS",
|
||||||
Name: "max-log-size",
|
Name: "max-log-size",
|
||||||
@ -137,30 +138,31 @@ var AgentCmd = cli.Command{
|
|||||||
|
|
||||||
func start(c *cli.Context) {
|
func start(c *cli.Context) {
|
||||||
|
|
||||||
|
log := redlog.New(os.Stderr)
|
||||||
|
log.SetLevel(0)
|
||||||
|
logger.SetLogger(log)
|
||||||
|
|
||||||
// debug level if requested by user
|
// debug level if requested by user
|
||||||
if c.Bool("debug") {
|
if c.Bool("debug") {
|
||||||
logrus.SetLevel(logrus.DebugLevel)
|
logrus.SetLevel(logrus.DebugLevel)
|
||||||
|
|
||||||
|
log.SetLevel(1)
|
||||||
} else {
|
} else {
|
||||||
logrus.SetLevel(logrus.WarnLevel)
|
logrus.SetLevel(logrus.WarnLevel)
|
||||||
}
|
}
|
||||||
|
|
||||||
var accessToken string
|
var accessToken string
|
||||||
if c.String("drone-secret") != "" {
|
if c.String("drone-secret") != "" {
|
||||||
secretToken := c.String("drone-secret")
|
// secretToken := c.String("drone-secret")
|
||||||
accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken)
|
accessToken = c.String("drone-secret")
|
||||||
|
// accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken)
|
||||||
} else {
|
} else {
|
||||||
accessToken = c.String("drone-token")
|
accessToken = c.String("drone-token")
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Infof("Connecting to %s with token %s",
|
logger.Noticef("connecting to server%s", c.String("drone-server"))
|
||||||
c.String("drone-server"),
|
|
||||||
accessToken,
|
|
||||||
)
|
|
||||||
|
|
||||||
client := client.NewClientToken(
|
server := strings.TrimRight(c.String("drone-server"), "/")
|
||||||
strings.TrimRight(c.String("drone-server"), "/"),
|
|
||||||
accessToken,
|
|
||||||
)
|
|
||||||
|
|
||||||
tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path"))
|
tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path"))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -171,19 +173,15 @@ func start(c *cli.Context) {
|
|||||||
logrus.Fatal(err)
|
logrus.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
var client *stomp.Client
|
||||||
for {
|
|
||||||
if err := client.Ping(); err != nil {
|
handler := func(m *stomp.Message) {
|
||||||
logrus.Warnf("unable to ping the server. %s", err.Error())
|
running.Add(1)
|
||||||
}
|
defer func() {
|
||||||
time.Sleep(c.Duration("ping"))
|
running.Done()
|
||||||
}
|
client.Ack(m.Ack)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for i := 0; i < c.Int("docker-max-procs"); i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
r := pipeline{
|
r := pipeline{
|
||||||
drone: client,
|
drone: client,
|
||||||
docker: docker,
|
docker: docker,
|
||||||
@ -196,17 +194,55 @@ func start(c *cli.Context) {
|
|||||||
logs: int64(c.Int("max-log-size")) * 1000000,
|
logs: int64(c.Int("max-log-size")) * 1000000,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for {
|
|
||||||
if err := r.run(); err != nil {
|
work := new(model.Work)
|
||||||
dur := c.Duration("backoff")
|
m.Unmarshal(work)
|
||||||
logrus.Warnf("reconnect in %v. %s", dur, err.Error())
|
r.run(work)
|
||||||
time.Sleep(dur)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
handleSignals()
|
handleSignals()
|
||||||
wg.Wait()
|
|
||||||
|
backoff := c.Duration("backoff")
|
||||||
|
|
||||||
|
for {
|
||||||
|
// dial the drone server to establish a TCP connection.
|
||||||
|
client, err = stomp.Dial(server)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warningf("connection failed, retry in %v. %s", backoff, err)
|
||||||
|
<-time.After(backoff)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
opts := []stomp.MessageOption{
|
||||||
|
stomp.WithCredentials("x-token", accessToken),
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize the stomp session and authenticate.
|
||||||
|
if err = client.Connect(opts...); err != nil {
|
||||||
|
logger.Warningf("session failed, retry in %v", backoff, err)
|
||||||
|
<-time.After(backoff)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
opts = []stomp.MessageOption{
|
||||||
|
stomp.WithAck("client"),
|
||||||
|
stomp.WithPrefetch(
|
||||||
|
c.Int("docker-max-procs"),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
if filter := c.String("filter"); filter != "" {
|
||||||
|
opts = append(opts, stomp.WithSelector(filter))
|
||||||
|
}
|
||||||
|
|
||||||
|
// subscribe to the pending build queue.
|
||||||
|
client.Subscribe("/queue/pending", stomp.HandlerFunc(func(m *stomp.Message) {
|
||||||
|
go handler(m) // HACK until we a channel based Subscribe implementation
|
||||||
|
}), opts...)
|
||||||
|
|
||||||
|
logger.Noticef("connection establish, ready to process builds.")
|
||||||
|
<-client.Done()
|
||||||
|
|
||||||
|
logger.Warningf("connection interrupted, attempting to reconnect.")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// tracks running builds
|
// tracks running builds
|
||||||
@ -220,10 +256,10 @@ func handleSignals() {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
<-c
|
<-c
|
||||||
logrus.Debugln("SIGTERM received.")
|
logger.Warningf("SIGTERM received.")
|
||||||
logrus.Debugln("wait for running builds to finish.")
|
logger.Warningf("wait for running builds to finish.")
|
||||||
running.Wait()
|
running.Wait()
|
||||||
logrus.Debugln("done.")
|
logger.Warningf("done.")
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -1,14 +1,13 @@
|
|||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"io/ioutil"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/drone/drone/agent"
|
"github.com/drone/drone/agent"
|
||||||
"github.com/drone/drone/build/docker"
|
"github.com/drone/drone/build/docker"
|
||||||
"github.com/drone/drone/client"
|
"github.com/drone/drone/model"
|
||||||
|
"github.com/drone/mq/stomp"
|
||||||
|
|
||||||
"github.com/samalba/dockerclient"
|
"github.com/samalba/dockerclient"
|
||||||
)
|
)
|
||||||
@ -23,20 +22,16 @@ type config struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type pipeline struct {
|
type pipeline struct {
|
||||||
drone client.Client
|
drone *stomp.Client
|
||||||
docker dockerclient.Client
|
docker dockerclient.Client
|
||||||
config config
|
config config
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *pipeline) run() error {
|
func (r *pipeline) run(w *model.Work) {
|
||||||
w, err := r.drone.Pull("linux", "amd64")
|
|
||||||
if err != nil {
|
// defer func() {
|
||||||
return err
|
// // r.drone.Ack(id, opts)
|
||||||
}
|
// }()
|
||||||
running.Add(1)
|
|
||||||
defer func() {
|
|
||||||
running.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
logrus.Infof("Starting build %s/%s#%d.%d",
|
logrus.Infof("Starting build %s/%s#%d.%d",
|
||||||
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
|
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
|
||||||
@ -44,24 +39,9 @@ func (r *pipeline) run() error {
|
|||||||
cancel := make(chan bool, 1)
|
cancel := make(chan bool, 1)
|
||||||
engine := docker.NewClient(r.docker)
|
engine := docker.NewClient(r.docker)
|
||||||
|
|
||||||
// streaming the logs
|
|
||||||
// rc, wc := io.Pipe()
|
|
||||||
// defer func() {
|
|
||||||
// wc.Close()
|
|
||||||
// rc.Close()
|
|
||||||
// }()
|
|
||||||
|
|
||||||
var buf bytes.Buffer
|
|
||||||
|
|
||||||
stream, err := r.drone.LogStream(w.Job.ID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
a := agent.Agent{
|
a := agent.Agent{
|
||||||
Update: agent.NewClientUpdater(r.drone),
|
Update: agent.NewClientUpdater(r.drone),
|
||||||
// Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs),
|
Logger: agent.NewClientLogger(r.drone, w.Job.ID, r.config.logs),
|
||||||
Logger: agent.NewStreamLogger(stream, &buf, r.config.logs),
|
|
||||||
Engine: engine,
|
Engine: engine,
|
||||||
Timeout: r.config.timeout,
|
Timeout: r.config.timeout,
|
||||||
Platform: r.config.platform,
|
Platform: r.config.platform,
|
||||||
@ -70,27 +50,34 @@ func (r *pipeline) run() error {
|
|||||||
Pull: r.config.pull,
|
Pull: r.config.pull,
|
||||||
}
|
}
|
||||||
|
|
||||||
// signal for canceling the build.
|
cancelFunc := func(m *stomp.Message) {
|
||||||
wait := r.drone.Wait(w.Job.ID)
|
defer m.Release()
|
||||||
defer wait.Cancel()
|
|
||||||
go func() {
|
id := m.Header.GetInt64("job-id")
|
||||||
if _, err := wait.Done(); err == nil {
|
if id == w.Job.ID {
|
||||||
cancel <- true
|
cancel <- true
|
||||||
logrus.Infof("Cancel build %s/%s#%d.%d",
|
logrus.Infof("Cancel build %s/%s#%d.%d",
|
||||||
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
|
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// signal for canceling the build.
|
||||||
|
sub, err := r.drone.Subscribe("/topic/cancel", stomp.HandlerFunc(cancelFunc))
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Error subscribing to /topic/cancel. %s", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
r.drone.Unsubscribe(sub)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
a.Run(w, cancel)
|
a.Run(w, cancel)
|
||||||
|
|
||||||
if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil {
|
// if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil {
|
||||||
logrus.Errorf("Error sending logs for %s/%s#%d.%d",
|
// logrus.Errorf("Error sending logs for %s/%s#%d.%d",
|
||||||
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
|
// w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
|
||||||
}
|
// }
|
||||||
stream.Close()
|
// stream.Close()
|
||||||
|
|
||||||
logrus.Infof("Finished build %s/%s#%d.%d",
|
logrus.Infof("Finished build %s/%s#%d.%d",
|
||||||
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
|
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -13,7 +13,6 @@ import (
|
|||||||
"github.com/drone/drone/agent"
|
"github.com/drone/drone/agent"
|
||||||
"github.com/drone/drone/build/docker"
|
"github.com/drone/drone/build/docker"
|
||||||
"github.com/drone/drone/model"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/queue"
|
|
||||||
"github.com/drone/drone/yaml"
|
"github.com/drone/drone/yaml"
|
||||||
|
|
||||||
"github.com/codegangsta/cli"
|
"github.com/codegangsta/cli"
|
||||||
@ -340,7 +339,7 @@ func exec(c *cli.Context) error {
|
|||||||
Pull: c.Bool("pull"),
|
Pull: c.Bool("pull"),
|
||||||
}
|
}
|
||||||
|
|
||||||
payload := &queue.Work{
|
payload := &model.Work{
|
||||||
Yaml: string(file),
|
Yaml: string(file),
|
||||||
Verified: c.BoolT("yaml.verified"),
|
Verified: c.BoolT("yaml.verified"),
|
||||||
Signed: c.BoolT("yaml.signed"),
|
Signed: c.BoolT("yaml.signed"),
|
||||||
|
@ -6,10 +6,10 @@ import (
|
|||||||
|
|
||||||
"github.com/drone/drone/router"
|
"github.com/drone/drone/router"
|
||||||
"github.com/drone/drone/router/middleware"
|
"github.com/drone/drone/router/middleware"
|
||||||
"github.com/gin-gonic/contrib/ginrus"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/codegangsta/cli"
|
"github.com/codegangsta/cli"
|
||||||
|
"github.com/gin-gonic/contrib/ginrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
var serverCmd = cli.Command{
|
var serverCmd = cli.Command{
|
||||||
@ -288,13 +288,11 @@ func server(c *cli.Context) error {
|
|||||||
ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true),
|
ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true),
|
||||||
middleware.Version,
|
middleware.Version,
|
||||||
middleware.Config(c),
|
middleware.Config(c),
|
||||||
middleware.Queue(c),
|
|
||||||
middleware.Stream(c),
|
|
||||||
middleware.Bus(c),
|
|
||||||
middleware.Cache(c),
|
middleware.Cache(c),
|
||||||
middleware.Store(c),
|
middleware.Store(c),
|
||||||
middleware.Remote(c),
|
middleware.Remote(c),
|
||||||
middleware.Agents(c),
|
middleware.Agents(c),
|
||||||
|
middleware.Broker(c),
|
||||||
)
|
)
|
||||||
|
|
||||||
// start the server with tls enabled
|
// start the server with tls enabled
|
||||||
|
@ -1,6 +1,4 @@
|
|||||||
package bus
|
package model
|
||||||
|
|
||||||
import "github.com/drone/drone/model"
|
|
||||||
|
|
||||||
// EventType defines the possible types of build events.
|
// EventType defines the possible types of build events.
|
||||||
type EventType string
|
type EventType string
|
||||||
@ -15,14 +13,14 @@ const (
|
|||||||
// Event represents a build event.
|
// Event represents a build event.
|
||||||
type Event struct {
|
type Event struct {
|
||||||
Type EventType `json:"type"`
|
Type EventType `json:"type"`
|
||||||
Repo model.Repo `json:"repo"`
|
Repo Repo `json:"repo"`
|
||||||
Build model.Build `json:"build"`
|
Build Build `json:"build"`
|
||||||
Job model.Job `json:"job"`
|
Job Job `json:"job"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEvent creates a new Event for the build, using copies of
|
// NewEvent creates a new Event for the build, using copies of
|
||||||
// the build data to avoid possible mutation or race conditions.
|
// the build data to avoid possible mutation or race conditions.
|
||||||
func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event {
|
func NewEvent(t EventType, r *Repo, b *Build, j *Job) *Event {
|
||||||
return &Event{
|
return &Event{
|
||||||
Type: t,
|
Type: t,
|
||||||
Repo: *r,
|
Repo: *r,
|
||||||
@ -31,7 +29,7 @@ func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBuildEvent(t EventType, r *model.Repo, b *model.Build) *Event {
|
func NewBuildEvent(t EventType, r *Repo, b *Build) *Event {
|
||||||
return &Event{
|
return &Event{
|
||||||
Type: t,
|
Type: t,
|
||||||
Repo: *r,
|
Repo: *r,
|
19
model/work.go
Normal file
19
model/work.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
// Work represents an item for work to be
|
||||||
|
// processed by a worker.
|
||||||
|
type Work struct {
|
||||||
|
Signed bool `json:"signed"`
|
||||||
|
Verified bool `json:"verified"`
|
||||||
|
Yaml string `json:"config"`
|
||||||
|
YamlEnc string `json:"secret"`
|
||||||
|
Repo *Repo `json:"repo"`
|
||||||
|
Build *Build `json:"build"`
|
||||||
|
BuildLast *Build `json:"build_last"`
|
||||||
|
Job *Job `json:"job"`
|
||||||
|
Netrc *Netrc `json:"netrc"`
|
||||||
|
Keys *Key `json:"keys"`
|
||||||
|
System *System `json:"system"`
|
||||||
|
Secrets []*Secret `json:"secrets"`
|
||||||
|
User *User `json:"user"`
|
||||||
|
}
|
@ -1,23 +0,0 @@
|
|||||||
package queue
|
|
||||||
|
|
||||||
import (
|
|
||||||
"golang.org/x/net/context"
|
|
||||||
)
|
|
||||||
|
|
||||||
const key = "queue"
|
|
||||||
|
|
||||||
// Setter defines a context that enables setting values.
|
|
||||||
type Setter interface {
|
|
||||||
Set(string, interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// FromContext returns the Queue associated with this context.
|
|
||||||
func FromContext(c context.Context) Queue {
|
|
||||||
return c.Value(key).(Queue)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ToContext adds the Queue to this context if it supports
|
|
||||||
// the Setter interface.
|
|
||||||
func ToContext(c Setter, q Queue) {
|
|
||||||
c.Set(key, q)
|
|
||||||
}
|
|
@ -1,67 +0,0 @@
|
|||||||
package queue
|
|
||||||
|
|
||||||
//go:generate mockery -name Queue -output mock -case=underscore
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ErrNotFound indicates the requested work item does not
|
|
||||||
// exist in the queue.
|
|
||||||
var ErrNotFound = errors.New("queue item not found")
|
|
||||||
|
|
||||||
type Queue interface {
|
|
||||||
// Publish inserts work at the tail of this queue, waiting for
|
|
||||||
// space to become available if the queue is full.
|
|
||||||
Publish(*Work) error
|
|
||||||
|
|
||||||
// Remove removes the specified work item from this queue,
|
|
||||||
// if it is present.
|
|
||||||
Remove(*Work) error
|
|
||||||
|
|
||||||
// PullClose retrieves and removes the head of this queue,
|
|
||||||
// waiting if necessary until work becomes available.
|
|
||||||
Pull() *Work
|
|
||||||
|
|
||||||
// PullClose retrieves and removes the head of this queue,
|
|
||||||
// waiting if necessary until work becomes available. The
|
|
||||||
// CloseNotifier should be provided to clone the channel
|
|
||||||
// if the subscribing client terminates its connection.
|
|
||||||
PullClose(CloseNotifier) *Work
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish inserts work at the tail of this queue, waiting for
|
|
||||||
// space to become available if the queue is full.
|
|
||||||
func Publish(c context.Context, w *Work) error {
|
|
||||||
return FromContext(c).Publish(w)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove removes the specified work item from this queue,
|
|
||||||
// if it is present.
|
|
||||||
func Remove(c context.Context, w *Work) error {
|
|
||||||
return FromContext(c).Remove(w)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pull retrieves and removes the head of this queue,
|
|
||||||
// waiting if necessary until work becomes available.
|
|
||||||
func Pull(c context.Context) *Work {
|
|
||||||
return FromContext(c).Pull()
|
|
||||||
}
|
|
||||||
|
|
||||||
// PullClose retrieves and removes the head of this queue,
|
|
||||||
// waiting if necessary until work becomes available. The
|
|
||||||
// CloseNotifier should be provided to clone the channel
|
|
||||||
// if the subscribing client terminates its connection.
|
|
||||||
func PullClose(c context.Context, cn CloseNotifier) *Work {
|
|
||||||
return FromContext(c).PullClose(cn)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CloseNotifier defines a datastructure that is capable of notifying
|
|
||||||
// a subscriber when its connection is closed.
|
|
||||||
type CloseNotifier interface {
|
|
||||||
// CloseNotify returns a channel that receives a single value
|
|
||||||
// when the client connection has gone away.
|
|
||||||
CloseNotify() <-chan bool
|
|
||||||
}
|
|
@ -1,85 +0,0 @@
|
|||||||
package queue
|
|
||||||
|
|
||||||
import "sync"
|
|
||||||
|
|
||||||
type queue struct {
|
|
||||||
sync.Mutex
|
|
||||||
|
|
||||||
items map[*Work]struct{}
|
|
||||||
itemc chan *Work
|
|
||||||
}
|
|
||||||
|
|
||||||
func New() Queue {
|
|
||||||
return newQueue()
|
|
||||||
}
|
|
||||||
|
|
||||||
func newQueue() *queue {
|
|
||||||
return &queue{
|
|
||||||
items: make(map[*Work]struct{}),
|
|
||||||
itemc: make(chan *Work, 999),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *queue) Publish(work *Work) error {
|
|
||||||
q.Lock()
|
|
||||||
q.items[work] = struct{}{}
|
|
||||||
q.Unlock()
|
|
||||||
q.itemc <- work
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *queue) Remove(work *Work) error {
|
|
||||||
q.Lock()
|
|
||||||
defer q.Unlock()
|
|
||||||
|
|
||||||
_, ok := q.items[work]
|
|
||||||
if !ok {
|
|
||||||
return ErrNotFound
|
|
||||||
}
|
|
||||||
var items []*Work
|
|
||||||
|
|
||||||
// loop through and drain all items
|
|
||||||
// from the
|
|
||||||
drain:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case item := <-q.itemc:
|
|
||||||
items = append(items, item)
|
|
||||||
default:
|
|
||||||
break drain
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// re-add all items to the queue except
|
|
||||||
// the item we're trying to remove
|
|
||||||
for _, item := range items {
|
|
||||||
if item == work {
|
|
||||||
delete(q.items, work)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
q.itemc <- item
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *queue) Pull() *Work {
|
|
||||||
work := <-q.itemc
|
|
||||||
q.Lock()
|
|
||||||
delete(q.items, work)
|
|
||||||
q.Unlock()
|
|
||||||
return work
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *queue) PullClose(cn CloseNotifier) *Work {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-cn.CloseNotify():
|
|
||||||
return nil
|
|
||||||
case work := <-q.itemc:
|
|
||||||
q.Lock()
|
|
||||||
delete(q.items, work)
|
|
||||||
q.Unlock()
|
|
||||||
return work
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,93 +0,0 @@
|
|||||||
package queue
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
. "github.com/franela/goblin"
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestBuild(t *testing.T) {
|
|
||||||
g := Goblin(t)
|
|
||||||
g.Describe("Queue", func() {
|
|
||||||
|
|
||||||
g.It("Should publish item", func() {
|
|
||||||
c := new(gin.Context)
|
|
||||||
q := newQueue()
|
|
||||||
ToContext(c, q)
|
|
||||||
|
|
||||||
w1 := &Work{}
|
|
||||||
w2 := &Work{}
|
|
||||||
Publish(c, w1)
|
|
||||||
Publish(c, w2)
|
|
||||||
g.Assert(len(q.items)).Equal(2)
|
|
||||||
g.Assert(len(q.itemc)).Equal(2)
|
|
||||||
})
|
|
||||||
|
|
||||||
g.It("Should remove item", func() {
|
|
||||||
c := new(gin.Context)
|
|
||||||
q := newQueue()
|
|
||||||
ToContext(c, q)
|
|
||||||
|
|
||||||
w1 := &Work{}
|
|
||||||
w2 := &Work{}
|
|
||||||
w3 := &Work{}
|
|
||||||
Publish(c, w1)
|
|
||||||
Publish(c, w2)
|
|
||||||
Publish(c, w3)
|
|
||||||
Remove(c, w2)
|
|
||||||
g.Assert(len(q.items)).Equal(2)
|
|
||||||
g.Assert(len(q.itemc)).Equal(2)
|
|
||||||
|
|
||||||
g.Assert(Pull(c)).Equal(w1)
|
|
||||||
g.Assert(Pull(c)).Equal(w3)
|
|
||||||
g.Assert(Remove(c, w2)).Equal(ErrNotFound)
|
|
||||||
})
|
|
||||||
|
|
||||||
g.It("Should pull item", func() {
|
|
||||||
c := new(gin.Context)
|
|
||||||
q := New()
|
|
||||||
ToContext(c, q)
|
|
||||||
|
|
||||||
cn := new(closeNotifier)
|
|
||||||
cn.closec = make(chan bool, 1)
|
|
||||||
w1 := &Work{}
|
|
||||||
w2 := &Work{}
|
|
||||||
|
|
||||||
Publish(c, w1)
|
|
||||||
g.Assert(Pull(c)).Equal(w1)
|
|
||||||
|
|
||||||
Publish(c, w2)
|
|
||||||
g.Assert(PullClose(c, cn)).Equal(w2)
|
|
||||||
})
|
|
||||||
|
|
||||||
g.It("Should cancel pulling item", func() {
|
|
||||||
c := new(gin.Context)
|
|
||||||
q := New()
|
|
||||||
ToContext(c, q)
|
|
||||||
|
|
||||||
cn := new(closeNotifier)
|
|
||||||
cn.closec = make(chan bool, 1)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
go func() {
|
|
||||||
wg.Add(1)
|
|
||||||
g.Assert(PullClose(c, cn) == nil).IsTrue()
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
cn.closec <- true
|
|
||||||
}()
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
type closeNotifier struct {
|
|
||||||
closec chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *closeNotifier) CloseNotify() <-chan bool {
|
|
||||||
return c.closec
|
|
||||||
}
|
|
@ -1,21 +0,0 @@
|
|||||||
package queue
|
|
||||||
|
|
||||||
import "github.com/drone/drone/model"
|
|
||||||
|
|
||||||
// Work represents an item for work to be
|
|
||||||
// processed by a worker.
|
|
||||||
type Work struct {
|
|
||||||
Signed bool `json:"signed"`
|
|
||||||
Verified bool `json:"verified"`
|
|
||||||
Yaml string `json:"config"`
|
|
||||||
YamlEnc string `json:"secret"`
|
|
||||||
Repo *model.Repo `json:"repo"`
|
|
||||||
Build *model.Build `json:"build"`
|
|
||||||
BuildLast *model.Build `json:"build_last"`
|
|
||||||
Job *model.Job `json:"job"`
|
|
||||||
Netrc *model.Netrc `json:"netrc"`
|
|
||||||
Keys *model.Key `json:"keys"`
|
|
||||||
System *model.System `json:"system"`
|
|
||||||
Secrets []*model.Secret `json:"secrets"`
|
|
||||||
User *model.User `json:"user"`
|
|
||||||
}
|
|
52
router/middleware/broker.go
Normal file
52
router/middleware/broker.go
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
handlers "github.com/drone/drone/server"
|
||||||
|
|
||||||
|
"github.com/codegangsta/cli"
|
||||||
|
"github.com/drone/mq/server"
|
||||||
|
"github.com/drone/mq/stomp"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
serverKey = "broker"
|
||||||
|
clientKey = "stomp.client" // mirrored from stomp/context
|
||||||
|
)
|
||||||
|
|
||||||
|
// Broker is a middleware function that initializes the broker
|
||||||
|
// and adds the broker client to the request context.
|
||||||
|
func Broker(cli *cli.Context) gin.HandlerFunc {
|
||||||
|
secret := cli.String("agent-secret")
|
||||||
|
if secret == "" {
|
||||||
|
logrus.Fatalf("failed to generate token from DRONE_SECRET")
|
||||||
|
}
|
||||||
|
|
||||||
|
broker := server.NewServer(
|
||||||
|
server.WithCredentials("x-token", secret),
|
||||||
|
)
|
||||||
|
client := broker.Client()
|
||||||
|
|
||||||
|
var once sync.Once
|
||||||
|
return func(c *gin.Context) {
|
||||||
|
c.Set(serverKey, broker)
|
||||||
|
c.Set(clientKey, client)
|
||||||
|
once.Do(func() {
|
||||||
|
// this is some really hacky stuff
|
||||||
|
// turns out I need to do some refactoring
|
||||||
|
// don't judge!
|
||||||
|
// will fix in 0.6 release
|
||||||
|
ctx := c.Copy()
|
||||||
|
client.Connect(
|
||||||
|
stomp.WithCredentials("x-token", secret),
|
||||||
|
)
|
||||||
|
client.Subscribe("/queue/updates", stomp.HandlerFunc(func(m *stomp.Message) {
|
||||||
|
go handlers.HandleUpdate(ctx, m.Copy())
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -1,17 +0,0 @@
|
|||||||
package middleware
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/drone/drone/bus"
|
|
||||||
|
|
||||||
"github.com/codegangsta/cli"
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Bus is a middleware function that initializes the Event Bus and attaches to
|
|
||||||
// the context of every http.Request.
|
|
||||||
func Bus(cli *cli.Context) gin.HandlerFunc {
|
|
||||||
v := bus.New()
|
|
||||||
return func(c *gin.Context) {
|
|
||||||
bus.ToContext(c, v)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,17 +0,0 @@
|
|||||||
package middleware
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/drone/drone/queue"
|
|
||||||
|
|
||||||
"github.com/codegangsta/cli"
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Queue is a middleware function that initializes the Queue and attaches to
|
|
||||||
// the context of every http.Request.
|
|
||||||
func Queue(cli *cli.Context) gin.HandlerFunc {
|
|
||||||
v := queue.New()
|
|
||||||
return func(c *gin.Context) {
|
|
||||||
queue.ToContext(c, v)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,17 +0,0 @@
|
|||||||
package middleware
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/drone/drone/stream"
|
|
||||||
|
|
||||||
"github.com/codegangsta/cli"
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Stream is a middleware function that initializes the Stream and attaches to
|
|
||||||
// the context of every http.Request.
|
|
||||||
func Stream(cli *cli.Context) gin.HandlerFunc {
|
|
||||||
v := stream.New()
|
|
||||||
return func(c *gin.Context) {
|
|
||||||
stream.ToContext(c, v)
|
|
||||||
}
|
|
||||||
}
|
|
@ -113,17 +113,9 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
|
|||||||
e.POST("/hook", server.PostHook)
|
e.POST("/hook", server.PostHook)
|
||||||
e.POST("/api/hook", server.PostHook)
|
e.POST("/api/hook", server.PostHook)
|
||||||
|
|
||||||
stream := e.Group("/api/stream")
|
|
||||||
{
|
|
||||||
stream.Use(session.SetRepo())
|
|
||||||
stream.Use(session.SetPerm())
|
|
||||||
stream.Use(session.MustPull)
|
|
||||||
|
|
||||||
stream.GET("/:owner/:name", server.GetRepoEvents)
|
|
||||||
stream.GET("/:owner/:name/:build/:number", server.GetStream)
|
|
||||||
}
|
|
||||||
ws := e.Group("/ws")
|
ws := e.Group("/ws")
|
||||||
{
|
{
|
||||||
|
ws.GET("/broker", server.Broker)
|
||||||
ws.GET("/feed", server.EventStream)
|
ws.GET("/feed", server.EventStream)
|
||||||
ws.GET("/logs/:owner/:name/:build/:number",
|
ws.GET("/logs/:owner/:name/:build/:number",
|
||||||
session.SetRepo(),
|
session.SetRepo(),
|
||||||
@ -152,20 +144,6 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
|
|||||||
agents.GET("", server.GetAgents)
|
agents.GET("", server.GetAgents)
|
||||||
}
|
}
|
||||||
|
|
||||||
queue := e.Group("/api/queue")
|
|
||||||
{
|
|
||||||
queue.Use(session.AuthorizeAgent)
|
|
||||||
queue.POST("/pull", server.Pull)
|
|
||||||
queue.POST("/pull/:os/:arch", server.Pull)
|
|
||||||
queue.POST("/wait/:id", server.Wait)
|
|
||||||
queue.POST("/stream/:id", server.Stream)
|
|
||||||
queue.POST("/status/:id", server.Update)
|
|
||||||
queue.POST("/ping", server.Ping)
|
|
||||||
|
|
||||||
queue.POST("/logs/:id", server.PostLogs)
|
|
||||||
queue.GET("/logs/:id", server.WriteLogs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// DELETE THESE
|
// DELETE THESE
|
||||||
// gitlab := e.Group("/gitlab/:owner/:name")
|
// gitlab := e.Group("/gitlab/:owner/:name")
|
||||||
// {
|
// {
|
||||||
|
13
server/broker.go
Normal file
13
server/broker.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Broker handles connections to the embedded message broker.
|
||||||
|
func Broker(c *gin.Context) {
|
||||||
|
broker := c.MustGet("broker").(http.Handler)
|
||||||
|
broker.ServeHTTP(c.Writer, c.Request)
|
||||||
|
}
|
@ -1,22 +1,23 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/drone/drone/bus"
|
|
||||||
"github.com/drone/drone/queue"
|
|
||||||
"github.com/drone/drone/remote"
|
"github.com/drone/drone/remote"
|
||||||
"github.com/drone/drone/shared/httputil"
|
"github.com/drone/drone/shared/httputil"
|
||||||
"github.com/drone/drone/store"
|
"github.com/drone/drone/store"
|
||||||
"github.com/drone/drone/stream"
|
"github.com/drone/drone/yaml"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/square/go-jose"
|
"github.com/square/go-jose"
|
||||||
|
|
||||||
"github.com/drone/drone/model"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/router/middleware/session"
|
"github.com/drone/drone/router/middleware/session"
|
||||||
|
"github.com/drone/mq/stomp"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetBuilds(c *gin.Context) {
|
func GetBuilds(c *gin.Context) {
|
||||||
@ -112,7 +113,7 @@ func GetBuildLogs(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c.Header("Content-Type", "application/json")
|
c.Header("Content-Type", "application/json")
|
||||||
stream.Copy(c.Writer, r)
|
copyLogs(c.Writer, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeleteBuild(c *gin.Context) {
|
func DeleteBuild(c *gin.Context) {
|
||||||
@ -148,7 +149,14 @@ func DeleteBuild(c *gin.Context) {
|
|||||||
job.ExitCode = 137
|
job.ExitCode = 137
|
||||||
store.UpdateBuildJob(c, build, job)
|
store.UpdateBuildJob(c, build, job)
|
||||||
|
|
||||||
bus.Publish(c, bus.NewEvent(bus.Cancelled, repo, build, job))
|
client := stomp.MustFromContext(c)
|
||||||
|
client.SendJSON("/topic/cancel", model.Event{
|
||||||
|
Type: model.Cancelled,
|
||||||
|
Repo: *repo,
|
||||||
|
Build: *build,
|
||||||
|
Job: *job,
|
||||||
|
}, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10)))
|
||||||
|
|
||||||
c.String(204, "")
|
c.String(204, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -294,7 +302,7 @@ func PostBuild(c *gin.Context) {
|
|||||||
last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID)
|
last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID)
|
||||||
secs, err := store.GetMergedSecretList(c, repo)
|
secs, err := store.GetMergedSecretList(c, repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
|
log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var signed bool
|
var signed bool
|
||||||
@ -319,9 +327,19 @@ func PostBuild(c *gin.Context) {
|
|||||||
|
|
||||||
log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified)
|
log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified)
|
||||||
|
|
||||||
bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build))
|
client := stomp.MustFromContext(c)
|
||||||
|
client.SendJSON("/topic/events", model.Event{
|
||||||
|
Type: model.Enqueued,
|
||||||
|
Repo: *repo,
|
||||||
|
Build: *build,
|
||||||
|
},
|
||||||
|
stomp.WithHeader("repo", repo.FullName),
|
||||||
|
stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)),
|
||||||
|
)
|
||||||
|
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
queue.Publish(c, &queue.Work{
|
broker, _ := stomp.FromContext(c)
|
||||||
|
broker.SendJSON("/queue/pending", &model.Work{
|
||||||
Signed: signed,
|
Signed: signed,
|
||||||
Verified: verified,
|
Verified: verified,
|
||||||
User: user,
|
User: user,
|
||||||
@ -333,7 +351,15 @@ func PostBuild(c *gin.Context) {
|
|||||||
Yaml: string(raw),
|
Yaml: string(raw),
|
||||||
Secrets: secs,
|
Secrets: secs,
|
||||||
System: &model.System{Link: httputil.GetURL(c.Request)},
|
System: &model.System{Link: httputil.GetURL(c.Request)},
|
||||||
})
|
},
|
||||||
|
stomp.WithHeader(
|
||||||
|
"platform",
|
||||||
|
yaml.ParsePlatformDefault(raw, "linux/amd64"),
|
||||||
|
),
|
||||||
|
stomp.WithHeaders(
|
||||||
|
yaml.ParseLabel(raw),
|
||||||
|
),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -345,3 +371,20 @@ func GetBuildQueue(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
c.JSON(200, out)
|
c.JSON(200, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copyLogs copies the stream from the source to the destination in valid JSON
|
||||||
|
// format. This converts the logs, which are per-line JSON objects, to a
|
||||||
|
// proper JSON array.
|
||||||
|
func copyLogs(dest io.Writer, src io.Reader) error {
|
||||||
|
io.WriteString(dest, "[")
|
||||||
|
|
||||||
|
scanner := bufio.NewScanner(src)
|
||||||
|
for scanner.Scan() {
|
||||||
|
io.WriteString(dest, scanner.Text())
|
||||||
|
io.WriteString(dest, ",\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
io.WriteString(dest, "{}]")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -3,19 +3,19 @@ package server
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/square/go-jose"
|
"github.com/square/go-jose"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/drone/drone/bus"
|
|
||||||
"github.com/drone/drone/model"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/queue"
|
|
||||||
"github.com/drone/drone/remote"
|
"github.com/drone/drone/remote"
|
||||||
"github.com/drone/drone/shared/httputil"
|
"github.com/drone/drone/shared/httputil"
|
||||||
"github.com/drone/drone/shared/token"
|
"github.com/drone/drone/shared/token"
|
||||||
"github.com/drone/drone/store"
|
"github.com/drone/drone/store"
|
||||||
"github.com/drone/drone/yaml"
|
"github.com/drone/drone/yaml"
|
||||||
|
"github.com/drone/mq/stomp"
|
||||||
)
|
)
|
||||||
|
|
||||||
var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`)
|
var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`)
|
||||||
@ -208,12 +208,22 @@ func PostHook(c *gin.Context) {
|
|||||||
last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID)
|
last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID)
|
||||||
secs, err := store.GetMergedSecretList(c, repo)
|
secs, err := store.GetMergedSecretList(c, repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
|
log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build))
|
client := stomp.MustFromContext(c)
|
||||||
|
client.SendJSON("/topic/events", model.Event{
|
||||||
|
Type: model.Enqueued,
|
||||||
|
Repo: *repo,
|
||||||
|
Build: *build,
|
||||||
|
},
|
||||||
|
stomp.WithHeader("repo", repo.FullName),
|
||||||
|
stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)),
|
||||||
|
)
|
||||||
|
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
queue.Publish(c, &queue.Work{
|
broker, _ := stomp.FromContext(c)
|
||||||
|
broker.SendJSON("/queue/pending", &model.Work{
|
||||||
Signed: build.Signed,
|
Signed: build.Signed,
|
||||||
Verified: build.Verified,
|
Verified: build.Verified,
|
||||||
User: user,
|
User: user,
|
||||||
@ -225,7 +235,15 @@ func PostHook(c *gin.Context) {
|
|||||||
Yaml: string(raw),
|
Yaml: string(raw),
|
||||||
Secrets: secs,
|
Secrets: secs,
|
||||||
System: &model.System{Link: httputil.GetURL(c.Request)},
|
System: &model.System{Link: httputil.GetURL(c.Request)},
|
||||||
})
|
},
|
||||||
|
stomp.WithHeader(
|
||||||
|
"platform",
|
||||||
|
yaml.ParsePlatformDefault(raw, "linux/amd64"),
|
||||||
|
),
|
||||||
|
stomp.WithHeaders(
|
||||||
|
yaml.ParseLabel(raw),
|
||||||
|
),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
320
server/queue.go
320
server/queue.go
@ -1,80 +1,46 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/drone/drone/bus"
|
|
||||||
"github.com/drone/drone/model"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/queue"
|
|
||||||
"github.com/drone/drone/remote"
|
"github.com/drone/drone/remote"
|
||||||
"github.com/drone/drone/store"
|
"github.com/drone/drone/store"
|
||||||
"github.com/drone/drone/stream"
|
"github.com/drone/mq/stomp"
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Pull is a long request that polls and attemts to pull work off the queue stack.
|
// newline defines a newline constant to separate lines in the build output
|
||||||
func Pull(c *gin.Context) {
|
var newline = []byte{'\n'}
|
||||||
logrus.Debugf("Agent %s connected.", c.ClientIP())
|
|
||||||
|
|
||||||
w := queue.PullClose(c, c.Writer)
|
// upgrader defines the default behavior for upgrading the websocket.
|
||||||
if w == nil {
|
var upgrader = websocket.Upgrader{
|
||||||
logrus.Debugf("Agent %s could not pull work.", c.ClientIP())
|
ReadBufferSize: 1024,
|
||||||
} else {
|
WriteBufferSize: 1024,
|
||||||
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
// setup the channel to stream logs
|
return true
|
||||||
if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil {
|
},
|
||||||
logrus.Errorf("Unable to create stream. %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(202, w)
|
|
||||||
|
|
||||||
logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d",
|
|
||||||
c.ClientIP(),
|
|
||||||
w.Repo.Owner,
|
|
||||||
w.Repo.Name,
|
|
||||||
w.Build.Number,
|
|
||||||
w.Job.Number,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait is a long request that polls and waits for cancelled build requests.
|
// HandleUpdate handles build updates from the agent and persists to the database.
|
||||||
func Wait(c *gin.Context) {
|
func HandleUpdate(c context.Context, message *stomp.Message) {
|
||||||
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
defer func() {
|
||||||
if err != nil {
|
message.Release()
|
||||||
c.String(500, "Invalid input. %s", err)
|
if r := recover(); r != nil {
|
||||||
return
|
err := r.(error)
|
||||||
|
logrus.Errorf("Panic recover: broker update handler: %s", err)
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
eventc := make(chan *bus.Event, 1)
|
work := new(model.Work)
|
||||||
|
if err := message.Unmarshal(work); err != nil {
|
||||||
bus.Subscribe(c, eventc)
|
|
||||||
defer bus.Unsubscribe(c, eventc)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-eventc:
|
|
||||||
if event.Job.ID == id && event.Type == bus.Cancelled {
|
|
||||||
c.JSON(200, event.Job)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-c.Writer.CloseNotify():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update handles build updates from the agent and persists to the database.
|
|
||||||
func Update(c *gin.Context) {
|
|
||||||
work := &queue.Work{}
|
|
||||||
if err := c.BindJSON(work); err != nil {
|
|
||||||
logrus.Errorf("Invalid input. %s", err)
|
logrus.Errorf("Invalid input. %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -85,12 +51,12 @@ func Update(c *gin.Context) {
|
|||||||
// empty values if we just saved what was coming in the http.Request body.
|
// empty values if we just saved what was coming in the http.Request body.
|
||||||
build, err := store.GetBuild(c, work.Build.ID)
|
build, err := store.GetBuild(c, work.Build.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.String(404, "Unable to find build. %s", err)
|
logrus.Errorf("Unable to find build. %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
job, err := store.GetJob(c, work.Job.ID)
|
job, err := store.GetJob(c, work.Job.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.String(404, "Unable to find job. %s", err)
|
logrus.Errorf("Unable to find job. %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
build.Started = work.Build.Started
|
build.Started = work.Build.Started
|
||||||
@ -117,189 +83,81 @@ func Update(c *gin.Context) {
|
|||||||
|
|
||||||
ok, err := store.UpdateBuildJob(c, build, job)
|
ok, err := store.UpdateBuildJob(c, build, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.String(500, "Unable to update job. %s", err)
|
logrus.Errorf("Unable to update job. %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok && build.Status != model.StatusRunning {
|
if ok {
|
||||||
// get the user because we transfer the user form the server to agent
|
// get the user because we transfer the user form the server to agent
|
||||||
// and back we lose the token which does not get serialized to json.
|
// and back we lose the token which does not get serialized to json.
|
||||||
user, err := store.GetUser(c, work.User.ID)
|
user, uerr := store.GetUser(c, work.User.ID)
|
||||||
if err != nil {
|
if uerr != nil {
|
||||||
c.String(500, "Unable to find user. %s", err)
|
logrus.Errorf("Unable to find user. %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
remote.Status(c, user, work.Repo, build,
|
remote.Status(c, user, work.Repo, build,
|
||||||
fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number))
|
fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number))
|
||||||
}
|
}
|
||||||
|
|
||||||
if build.Status == model.StatusRunning {
|
client := stomp.MustFromContext(c)
|
||||||
bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job))
|
err = client.SendJSON("/topic/events", model.Event{
|
||||||
} else {
|
Type: func() model.EventType {
|
||||||
bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job))
|
// HACK we don't even really care about the event type.
|
||||||
|
// so we should just simplify how events are triggered.
|
||||||
|
if job.Status == model.StatusRunning {
|
||||||
|
return model.Started
|
||||||
}
|
}
|
||||||
|
return model.Finished
|
||||||
c.JSON(200, work)
|
}(),
|
||||||
}
|
Repo: *work.Repo,
|
||||||
|
Build: *build,
|
||||||
// Stream streams the logs to disk or memory for broadcasing to listeners. Once
|
Job: *job,
|
||||||
// the stream is closed it is moved to permanent storage in the database.
|
|
||||||
func Stream(c *gin.Context) {
|
|
||||||
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
c.String(500, "Invalid input. %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
key := c.Param("id")
|
|
||||||
logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key)
|
|
||||||
|
|
||||||
wc, err := stream.Writer(c, key)
|
|
||||||
if err != nil {
|
|
||||||
c.String(500, "Failed to create stream writer. %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
wc.Close()
|
|
||||||
stream.Delete(c, key)
|
|
||||||
}()
|
|
||||||
|
|
||||||
io.Copy(wc, c.Request.Body)
|
|
||||||
|
|
||||||
rc, err := stream.Reader(c, key)
|
|
||||||
if err != nil {
|
|
||||||
c.String(500, "Failed to create stream reader. %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer recover()
|
|
||||||
store.WriteLog(c, &model.Job{ID: id}, rc)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
wc.Close()
|
|
||||||
wg.Wait()
|
|
||||||
c.String(200, "")
|
|
||||||
|
|
||||||
logrus.Debugf("Agent %s wrote stream to database", c.ClientIP())
|
|
||||||
}
|
|
||||||
|
|
||||||
func Ping(c *gin.Context) {
|
|
||||||
agent, err := store.GetAgentAddr(c, c.ClientIP())
|
|
||||||
if err == nil {
|
|
||||||
agent.Updated = time.Now().Unix()
|
|
||||||
err = store.UpdateAgent(c, agent)
|
|
||||||
} else {
|
|
||||||
err = store.CreateAgent(c, &model.Agent{
|
|
||||||
Address: c.ClientIP(),
|
|
||||||
Platform: "linux/amd64",
|
|
||||||
Capacity: 2,
|
|
||||||
Created: time.Now().Unix(),
|
|
||||||
Updated: time.Now().Unix(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("Unable to register agent. %s", err.Error())
|
|
||||||
}
|
|
||||||
c.String(200, "PONG")
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// Below are alternate implementations for the Queue that use websockets.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
// PostLogs handles an http request from the agent to post build logs. These
|
|
||||||
// logs are posted at the end of the build process.
|
|
||||||
func PostLogs(c *gin.Context) {
|
|
||||||
id, _ := strconv.ParseInt(c.Param("id"), 10, 64)
|
|
||||||
job, err := store.GetJob(c, id)
|
|
||||||
if err != nil {
|
|
||||||
c.String(404, "Cannot upload logs. %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := store.WriteLog(c, job, c.Request.Body); err != nil {
|
|
||||||
c.String(500, "Cannot persist logs", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.String(200, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteLogs handles an http request from the agent to stream build logs from
|
|
||||||
// the agent to the server to enable real time streamings to the client.
|
|
||||||
func WriteLogs(c *gin.Context) {
|
|
||||||
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
c.String(500, "Invalid input. %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
||||||
if err != nil {
|
|
||||||
c.String(500, "Cannot upgrade to websocket. %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
wc, err := stream.Writer(c, stream.ToKey(id))
|
|
||||||
if err != nil {
|
|
||||||
c.String(500, "Cannot create stream writer. %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
wc.Close()
|
|
||||||
stream.Delete(c, stream.ToKey(id))
|
|
||||||
}()
|
|
||||||
|
|
||||||
var msg []byte
|
|
||||||
for {
|
|
||||||
_, msg, err = conn.ReadMessage()
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
wc.Write(msg)
|
|
||||||
wc.Write(newline)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil && err != io.EOF {
|
|
||||||
c.String(500, "Error reading logs. %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
//
|
|
||||||
// rc, err := stream.Reader(c, stream.ToKey(id))
|
|
||||||
// if err != nil {
|
|
||||||
// c.String(500, "Failed to create stream reader. %s", err)
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// wg := sync.WaitGroup{}
|
|
||||||
// wg.Add(1)
|
|
||||||
//
|
|
||||||
// go func() {
|
|
||||||
// defer recover()
|
|
||||||
// store.WriteLog(c, &model.Job{ID: id}, rc)
|
|
||||||
// wg.Done()
|
|
||||||
// }()
|
|
||||||
//
|
|
||||||
// wc.Close()
|
|
||||||
// wg.Wait()
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// newline defines a newline constant to separate lines in the build output
|
|
||||||
var newline = []byte{'\n'}
|
|
||||||
|
|
||||||
// upgrader defines the default behavior for upgrading the websocket.
|
|
||||||
var upgrader = websocket.Upgrader{
|
|
||||||
ReadBufferSize: 1024,
|
|
||||||
WriteBufferSize: 1024,
|
|
||||||
CheckOrigin: func(r *http.Request) bool {
|
|
||||||
return true
|
|
||||||
},
|
},
|
||||||
|
stomp.WithHeader("repo", work.Repo.FullName),
|
||||||
|
stomp.WithHeader("private", strconv.FormatBool(work.Repo.IsPrivate)),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Unable to publish to /topic/events. %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if job.Status == model.StatusRunning {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
var sub []byte
|
||||||
|
|
||||||
|
done := make(chan bool)
|
||||||
|
dest := fmt.Sprintf("/topic/logs.%d", job.ID)
|
||||||
|
sub, err = client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) {
|
||||||
|
defer m.Release()
|
||||||
|
if m.Header.GetBool("eof") {
|
||||||
|
done <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
buf.Write(m.Body)
|
||||||
|
buf.WriteByte('\n')
|
||||||
|
}))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Unable to read logs from broker. %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
client.Unsubscribe(sub)
|
||||||
|
client.Send(dest, []byte{}, stomp.WithRetain("remove"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(30 * time.Second):
|
||||||
|
logrus.Errorf("Unable to read logs from broker. Timeout. %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.WriteLog(c, job, &buf); err != nil {
|
||||||
|
logrus.Errorf("Unable to write logs to store. %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
190
server/stream.go
190
server/stream.go
@ -1,121 +1,21 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"fmt"
|
||||||
"encoding/json"
|
|
||||||
"io"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/drone/drone/bus"
|
|
||||||
"github.com/drone/drone/cache"
|
"github.com/drone/drone/cache"
|
||||||
"github.com/drone/drone/model"
|
"github.com/drone/drone/model"
|
||||||
"github.com/drone/drone/router/middleware/session"
|
"github.com/drone/drone/router/middleware/session"
|
||||||
"github.com/drone/drone/store"
|
"github.com/drone/drone/store"
|
||||||
"github.com/drone/drone/stream"
|
"github.com/drone/mq/stomp"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/manucorporat/sse"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetRepoEvents will upgrade the connection to a Websocket and will stream
|
|
||||||
// event updates to the browser.
|
|
||||||
func GetRepoEvents(c *gin.Context) {
|
|
||||||
repo := session.Repo(c)
|
|
||||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
|
||||||
|
|
||||||
eventc := make(chan *bus.Event, 1)
|
|
||||||
bus.Subscribe(c, eventc)
|
|
||||||
defer func() {
|
|
||||||
bus.Unsubscribe(c, eventc)
|
|
||||||
close(eventc)
|
|
||||||
logrus.Infof("closed event stream")
|
|
||||||
}()
|
|
||||||
|
|
||||||
c.Stream(func(w io.Writer) bool {
|
|
||||||
select {
|
|
||||||
case event := <-eventc:
|
|
||||||
if event == nil {
|
|
||||||
logrus.Infof("nil event received")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(bradrydzewski) This is a super hacky workaround until we improve
|
|
||||||
// the actual bus. Having a per-call database event is just plain stupid.
|
|
||||||
if event.Repo.FullName == repo.FullName {
|
|
||||||
|
|
||||||
var payload = struct {
|
|
||||||
model.Build
|
|
||||||
Jobs []*model.Job `json:"jobs"`
|
|
||||||
}{}
|
|
||||||
payload.Build = event.Build
|
|
||||||
payload.Jobs, _ = store.GetJobList(c, &event.Build)
|
|
||||||
data, _ := json.Marshal(&payload)
|
|
||||||
|
|
||||||
sse.Encode(w, sse.Event{
|
|
||||||
Event: "message",
|
|
||||||
Data: string(data),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
case <-c.Writer.CloseNotify():
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetStream(c *gin.Context) {
|
|
||||||
|
|
||||||
repo := session.Repo(c)
|
|
||||||
buildn, _ := strconv.Atoi(c.Param("build"))
|
|
||||||
jobn, _ := strconv.Atoi(c.Param("number"))
|
|
||||||
|
|
||||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
|
||||||
|
|
||||||
build, err := store.GetBuildNumber(c, repo, buildn)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Debugln("stream cannot get build number.", err)
|
|
||||||
c.AbortWithError(404, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
job, err := store.GetJobNumber(c, build, jobn)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Debugln("stream cannot get job number.", err)
|
|
||||||
c.AbortWithError(404, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
rc, err := stream.Reader(c, stream.ToKey(job.ID))
|
|
||||||
if err != nil {
|
|
||||||
c.AbortWithError(404, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-c.Writer.CloseNotify()
|
|
||||||
rc.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
var line int
|
|
||||||
var scanner = bufio.NewScanner(rc)
|
|
||||||
for scanner.Scan() {
|
|
||||||
line++
|
|
||||||
var err = sse.Encode(c.Writer, sse.Event{
|
|
||||||
Id: strconv.Itoa(line),
|
|
||||||
Event: "message",
|
|
||||||
Data: scanner.Text(),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
c.Writer.Flush()
|
|
||||||
}
|
|
||||||
|
|
||||||
logrus.Debugf("Closed stream %s#%d", repo.FullName, build.Number)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// Time allowed to write the file to the client.
|
// Time allowed to write the file to the client.
|
||||||
writeWait = 5 * time.Second
|
writeWait = 5 * time.Second
|
||||||
@ -165,28 +65,34 @@ func LogStream(c *gin.Context) {
|
|||||||
ticker := time.NewTicker(pingPeriod)
|
ticker := time.NewTicker(pingPeriod)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
rc, err := stream.Reader(c, stream.ToKey(job.ID))
|
logs := make(chan []byte)
|
||||||
|
done := make(chan bool)
|
||||||
|
dest := fmt.Sprintf("/topic/logs.%d", job.ID)
|
||||||
|
client, _ := stomp.FromContext(c)
|
||||||
|
sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) {
|
||||||
|
if m.Header.GetBool("eof") {
|
||||||
|
done <- true
|
||||||
|
} else {
|
||||||
|
logs <- m.Body
|
||||||
|
}
|
||||||
|
m.Release()
|
||||||
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.AbortWithError(404, err)
|
logrus.Errorf("Unable to read logs from broker. %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
quitc := make(chan bool)
|
|
||||||
defer func() {
|
defer func() {
|
||||||
quitc <- true
|
client.Unsubscribe(sub)
|
||||||
close(quitc)
|
close(done)
|
||||||
rc.Close()
|
close(logs)
|
||||||
ws.Close()
|
|
||||||
logrus.Debug("Successfully closed websocket")
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer func() {
|
|
||||||
recover()
|
|
||||||
}()
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-quitc:
|
case buf := <-logs:
|
||||||
|
ws.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
|
ws.WriteMessage(websocket.TextMessage, buf)
|
||||||
|
case <-done:
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
|
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
|
||||||
@ -195,18 +101,6 @@ func LogStream(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
|
||||||
var scanner = bufio.NewScanner(rc)
|
|
||||||
var b []byte
|
|
||||||
for scanner.Scan() {
|
|
||||||
b = scanner.Bytes()
|
|
||||||
if len(b) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
ws.SetWriteDeadline(time.Now().Add(writeWait))
|
|
||||||
ws.WriteMessage(websocket.TextMessage, b)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventStream produces the User event stream, sending all repository, build
|
// EventStream produces the User event stream, sending all repository, build
|
||||||
@ -227,20 +121,34 @@ func EventStream(c *gin.Context) {
|
|||||||
repo, _ = cache.GetRepoMap(c, user)
|
repo, _ = cache.GetRepoMap(c, user)
|
||||||
}
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(pingPeriod)
|
eventc := make(chan []byte, 10)
|
||||||
quitc := make(chan bool)
|
quitc := make(chan bool)
|
||||||
eventc := make(chan *bus.Event, 10)
|
tick := time.NewTicker(pingPeriod)
|
||||||
bus.Subscribe(c, eventc)
|
|
||||||
defer func() {
|
defer func() {
|
||||||
ticker.Stop()
|
tick.Stop()
|
||||||
bus.Unsubscribe(c, eventc)
|
|
||||||
quitc <- true
|
|
||||||
close(quitc)
|
|
||||||
close(eventc)
|
|
||||||
ws.Close()
|
ws.Close()
|
||||||
logrus.Debug("Successfully closed websocket")
|
logrus.Debug("Successfully closed websocket")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
client := stomp.MustFromContext(c)
|
||||||
|
sub, err := client.Subscribe("/topic/events", stomp.HandlerFunc(func(m *stomp.Message) {
|
||||||
|
name := m.Header.GetString("repo")
|
||||||
|
priv := m.Header.GetBool("private")
|
||||||
|
if repo[name] || !priv {
|
||||||
|
eventc <- m.Body
|
||||||
|
}
|
||||||
|
m.Release()
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Unable to read logs from broker. %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
client.Unsubscribe(sub)
|
||||||
|
close(quitc)
|
||||||
|
close(eventc)
|
||||||
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
recover()
|
recover()
|
||||||
@ -249,15 +157,13 @@ func EventStream(c *gin.Context) {
|
|||||||
select {
|
select {
|
||||||
case <-quitc:
|
case <-quitc:
|
||||||
return
|
return
|
||||||
case event := <-eventc:
|
case event, ok := <-eventc:
|
||||||
if event == nil {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if repo[event.Repo.FullName] || !event.Repo.IsPrivate {
|
|
||||||
ws.SetWriteDeadline(time.Now().Add(writeWait))
|
ws.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
ws.WriteJSON(event)
|
ws.WriteMessage(websocket.TextMessage, event)
|
||||||
}
|
case <-tick.C:
|
||||||
case <-ticker.C:
|
|
||||||
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
|
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -1,21 +0,0 @@
|
|||||||
package stream
|
|
||||||
|
|
||||||
import "golang.org/x/net/context"
|
|
||||||
|
|
||||||
const key = "stream"
|
|
||||||
|
|
||||||
// Setter defines a context that enables setting values.
|
|
||||||
type Setter interface {
|
|
||||||
Set(string, interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// FromContext returns the Stream associated with this context.
|
|
||||||
func FromContext(c context.Context) Stream {
|
|
||||||
return c.Value(key).(Stream)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ToContext adds the Stream to this context if it supports the
|
|
||||||
// Setter interface.
|
|
||||||
func ToContext(c Setter, s Stream) {
|
|
||||||
c.Set(key, s)
|
|
||||||
}
|
|
@ -1,54 +0,0 @@
|
|||||||
package stream
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io"
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
|
||||||
|
|
||||||
type reader struct {
|
|
||||||
w *writer
|
|
||||||
off int
|
|
||||||
closed uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read reads from the Buffer
|
|
||||||
func (r *reader) Read(p []byte) (n int, err error) {
|
|
||||||
r.w.RLock()
|
|
||||||
defer r.w.RUnlock()
|
|
||||||
|
|
||||||
var m int
|
|
||||||
|
|
||||||
for len(p) > 0 {
|
|
||||||
|
|
||||||
m, _ = bytes.NewReader(r.w.buffer.Bytes()[r.off:]).Read(p)
|
|
||||||
n += m
|
|
||||||
r.off += n
|
|
||||||
|
|
||||||
if n > 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.w.Closed() {
|
|
||||||
err = io.EOF
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if r.Closed() {
|
|
||||||
err = io.EOF
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
r.w.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *reader) Close() error {
|
|
||||||
atomic.StoreUint32(&r.closed, 1)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *reader) Closed() bool {
|
|
||||||
return atomic.LoadUint32(&r.closed) != 0
|
|
||||||
}
|
|
@ -1,7 +0,0 @@
|
|||||||
package stream
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func TetsReader(t *testing.T) {
|
|
||||||
t.Skip() //TODO(bradrydzewski) implement reader tests
|
|
||||||
}
|
|
@ -1,60 +0,0 @@
|
|||||||
package stream
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"io"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Stream manages the stream of build logs.
|
|
||||||
type Stream interface {
|
|
||||||
Create(string) error
|
|
||||||
Delete(string) error
|
|
||||||
Reader(string) (io.ReadCloser, error)
|
|
||||||
Writer(string) (io.WriteCloser, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create creates a new stream.
|
|
||||||
func Create(c context.Context, key string) error {
|
|
||||||
return FromContext(c).Create(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reader opens the stream for reading.
|
|
||||||
func Reader(c context.Context, key string) (io.ReadCloser, error) {
|
|
||||||
return FromContext(c).Reader(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Writer opens the stream for writing.
|
|
||||||
func Writer(c context.Context, key string) (io.WriteCloser, error) {
|
|
||||||
return FromContext(c).Writer(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete deletes the stream by key.
|
|
||||||
func Delete(c context.Context, key string) error {
|
|
||||||
return FromContext(c).Delete(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ToKey is a helper function that converts a unique identifier
|
|
||||||
// of type int64 into a string.
|
|
||||||
func ToKey(i int64) string {
|
|
||||||
return strconv.FormatInt(i, 10)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy copies the stream from the source to the destination in valid JSON
|
|
||||||
// format. This converts the logs, which are per-line JSON objects, to a
|
|
||||||
// proper JSON array.
|
|
||||||
func Copy(dest io.Writer, src io.Reader) error {
|
|
||||||
io.WriteString(dest, "[")
|
|
||||||
|
|
||||||
scanner := bufio.NewScanner(src)
|
|
||||||
for scanner.Scan() {
|
|
||||||
io.WriteString(dest, scanner.Text())
|
|
||||||
io.WriteString(dest, ",\n")
|
|
||||||
}
|
|
||||||
|
|
||||||
io.WriteString(dest, "{}]")
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,72 +0,0 @@
|
|||||||
package stream
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type stream struct {
|
|
||||||
sync.Mutex
|
|
||||||
writers map[string]*writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns a new in-memory implementation of Stream.
|
|
||||||
func New() Stream {
|
|
||||||
return &stream{
|
|
||||||
writers: map[string]*writer{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reader returns an io.Reader for reading from to the stream.
|
|
||||||
func (s *stream) Reader(name string) (io.ReadCloser, error) {
|
|
||||||
s.Lock()
|
|
||||||
defer s.Unlock()
|
|
||||||
|
|
||||||
if !s.exists(name) {
|
|
||||||
return nil, fmt.Errorf("stream: cannot read stream %s, not found", name)
|
|
||||||
}
|
|
||||||
return s.writers[name].Reader()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Writer returns an io.WriteCloser for writing to the stream.
|
|
||||||
func (s *stream) Writer(name string) (io.WriteCloser, error) {
|
|
||||||
s.Lock()
|
|
||||||
defer s.Unlock()
|
|
||||||
|
|
||||||
if !s.exists(name) {
|
|
||||||
return nil, fmt.Errorf("stream: cannot write stream %s, not found", name)
|
|
||||||
}
|
|
||||||
return s.writers[name], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create creates a new stream.
|
|
||||||
func (s *stream) Create(name string) error {
|
|
||||||
s.Lock()
|
|
||||||
defer s.Unlock()
|
|
||||||
|
|
||||||
if s.exists(name) {
|
|
||||||
return fmt.Errorf("stream: cannot create stream %s, already exists", name)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.writers[name] = newWriter()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete deletes the stream by key.
|
|
||||||
func (s *stream) Delete(name string) error {
|
|
||||||
s.Lock()
|
|
||||||
defer s.Unlock()
|
|
||||||
|
|
||||||
if !s.exists(name) {
|
|
||||||
return fmt.Errorf("stream: cannot delete stream %s, not found", name)
|
|
||||||
}
|
|
||||||
w := s.writers[name]
|
|
||||||
delete(s.writers, name)
|
|
||||||
return w.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *stream) exists(name string) bool {
|
|
||||||
_, exists := s.writers[name]
|
|
||||||
return exists
|
|
||||||
}
|
|
@ -1,7 +0,0 @@
|
|||||||
package stream
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func TetsStream(t *testing.T) {
|
|
||||||
t.Skip() //TODO(bradrydzewski) implement stream tests
|
|
||||||
}
|
|
@ -1,52 +0,0 @@
|
|||||||
package stream
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
|
||||||
|
|
||||||
type writer struct {
|
|
||||||
sync.RWMutex
|
|
||||||
*sync.Cond
|
|
||||||
|
|
||||||
buffer bytes.Buffer
|
|
||||||
closed uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
func newWriter() *writer {
|
|
||||||
var w writer
|
|
||||||
w.Cond = sync.NewCond(w.RWMutex.RLocker())
|
|
||||||
return &w
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *writer) Write(p []byte) (n int, err error) {
|
|
||||||
defer w.Broadcast()
|
|
||||||
w.Lock()
|
|
||||||
defer w.Unlock()
|
|
||||||
if w.Closed() {
|
|
||||||
return 0, io.EOF
|
|
||||||
}
|
|
||||||
return w.buffer.Write(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *writer) Reader() (io.ReadCloser, error) {
|
|
||||||
return &reader{w: w}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *writer) Wait() {
|
|
||||||
if !w.Closed() {
|
|
||||||
w.Cond.Wait()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *writer) Close() error {
|
|
||||||
atomic.StoreUint32(&w.closed, 1)
|
|
||||||
w.Cond.Broadcast()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *writer) Closed() bool {
|
|
||||||
return atomic.LoadUint32(&w.closed) != 0
|
|
||||||
}
|
|
@ -1,7 +0,0 @@
|
|||||||
package stream
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func TetsWriter(t *testing.T) {
|
|
||||||
t.Skip() //TODO(bradrydzewski) implement writer tests
|
|
||||||
}
|
|
26
yaml/label.go
Normal file
26
yaml/label.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package yaml
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gopkg.in/yaml.v2"
|
||||||
|
|
||||||
|
"github.com/drone/drone/yaml/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ParseLabel parses the labels section of the Yaml document.
|
||||||
|
func ParseLabel(in []byte) map[string]string {
|
||||||
|
out := struct {
|
||||||
|
Labels types.MapEqualSlice `yaml:"labels"`
|
||||||
|
}{}
|
||||||
|
|
||||||
|
yaml.Unmarshal(in, &out)
|
||||||
|
labels := out.Labels.Map()
|
||||||
|
if labels == nil {
|
||||||
|
labels = make(map[string]string)
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseLabelString parses the labels section of the Yaml document.
|
||||||
|
func ParseLabelString(in string) map[string]string {
|
||||||
|
return ParseLabel([]byte(in))
|
||||||
|
}
|
32
yaml/label_test.go
Normal file
32
yaml/label_test.go
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
package yaml
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/franela/goblin"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLabel(t *testing.T) {
|
||||||
|
|
||||||
|
g := goblin.Goblin(t)
|
||||||
|
g.Describe("Label parser", func() {
|
||||||
|
|
||||||
|
g.It("Should parse empty yaml", func() {
|
||||||
|
labels := ParseLabelString("")
|
||||||
|
g.Assert(len(labels)).Equal(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
g.It("Should parse slice", func() {
|
||||||
|
labels := ParseLabelString("labels: [foo=bar, baz=boo]")
|
||||||
|
g.Assert(len(labels)).Equal(2)
|
||||||
|
g.Assert(labels["foo"]).Equal("bar")
|
||||||
|
g.Assert(labels["baz"]).Equal("boo")
|
||||||
|
})
|
||||||
|
|
||||||
|
g.It("Should parse map", func() {
|
||||||
|
labels := ParseLabelString("labels: {foo: bar, baz: boo}")
|
||||||
|
g.Assert(labels["foo"]).Equal("bar")
|
||||||
|
g.Assert(labels["baz"]).Equal("boo")
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
26
yaml/platform.go
Normal file
26
yaml/platform.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package yaml
|
||||||
|
|
||||||
|
import "gopkg.in/yaml.v2"
|
||||||
|
|
||||||
|
// ParsePlatform parses the platform section of the Yaml document.
|
||||||
|
func ParsePlatform(in []byte) string {
|
||||||
|
out := struct {
|
||||||
|
Platform string `yaml:"platform"`
|
||||||
|
}{}
|
||||||
|
|
||||||
|
yaml.Unmarshal(in, &out)
|
||||||
|
return out.Platform
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParsePlatformString parses the platform section of the Yaml document.
|
||||||
|
func ParsePlatformString(in string) string {
|
||||||
|
return ParsePlatform([]byte(in))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParsePlatformDefault parses the platform section of the Yaml document.
|
||||||
|
func ParsePlatformDefault(in []byte, platform string) string {
|
||||||
|
if p := ParsePlatform([]byte(in)); p != "" {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
return platform
|
||||||
|
}
|
@ -6,8 +6,11 @@ const clone = "clone"
|
|||||||
|
|
||||||
// Clone transforms the Yaml to include a clone step.
|
// Clone transforms the Yaml to include a clone step.
|
||||||
func Clone(c *yaml.Config, plugin string) error {
|
func Clone(c *yaml.Config, plugin string) error {
|
||||||
if plugin == "" {
|
switch plugin {
|
||||||
plugin = "git"
|
case "", "git":
|
||||||
|
plugin = "plugins/git:latest"
|
||||||
|
case "hg":
|
||||||
|
plugin = "plugins/hg:latest"
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, p := range c.Pipeline {
|
for _, p := range c.Pipeline {
|
||||||
|
@ -34,28 +34,6 @@ func ImageTag(conf *yaml.Config) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ImageName transforms the Yaml to replace underscores with dashes.
|
|
||||||
func ImageName(conf *yaml.Config) error {
|
|
||||||
for _, image := range conf.Pipeline {
|
|
||||||
image.Image = strings.Replace(image.Image, "_", "-", -1)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ImageNamespace transforms the Yaml to use a default namepsace for plugins.
|
|
||||||
func ImageNamespace(conf *yaml.Config, namespace string) error {
|
|
||||||
for _, image := range conf.Pipeline {
|
|
||||||
if strings.Contains(image.Image, "/") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !isPlugin(image) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
image.Image = filepath.Join(namespace, image.Image)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ImageEscalate transforms the Yaml to automatically enable privileged mode
|
// ImageEscalate transforms the Yaml to automatically enable privileged mode
|
||||||
// for a subset of white-listed plugins matching the given patterns.
|
// for a subset of white-listed plugins matching the given patterns.
|
||||||
func ImageEscalate(conf *yaml.Config, patterns []string) error {
|
func ImageEscalate(conf *yaml.Config, patterns []string) error {
|
||||||
|
@ -128,35 +128,5 @@ func Test_normalize(t *testing.T) {
|
|||||||
g.Assert(c.Pipeline[0].Image).Equal("golang:1.5")
|
g.Assert(c.Pipeline[0].Image).Equal("golang:1.5")
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
g.Describe("plugins", func() {
|
|
||||||
|
|
||||||
g.It("should prepend namespace", func() {
|
|
||||||
c := newConfig(&yaml.Container{
|
|
||||||
Image: "slack",
|
|
||||||
})
|
|
||||||
|
|
||||||
ImageNamespace(c, "plugins")
|
|
||||||
g.Assert(c.Pipeline[0].Image).Equal("plugins/slack")
|
|
||||||
})
|
|
||||||
|
|
||||||
g.It("should not override existing namespace", func() {
|
|
||||||
c := newConfig(&yaml.Container{
|
|
||||||
Image: "index.docker.io/drone/git",
|
|
||||||
})
|
|
||||||
|
|
||||||
ImageNamespace(c, "plugins")
|
|
||||||
g.Assert(c.Pipeline[0].Image).Equal("index.docker.io/drone/git")
|
|
||||||
})
|
|
||||||
|
|
||||||
g.It("should replace underscores with dashes", func() {
|
|
||||||
c := newConfig(&yaml.Container{
|
|
||||||
Image: "gh_pages",
|
|
||||||
})
|
|
||||||
|
|
||||||
ImageName(c)
|
|
||||||
g.Assert(c.Pipeline[0].Image).Equal("gh-pages")
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user