From 4d4003a9a16e69f3919cdbadbef6e95002714389 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Thu, 28 Apr 2016 14:10:32 -0700 Subject: [PATCH] moved 0.5 out of feature flag, removed deprecated 0.4 code and features --- .drone.yml | 2 +- api/build.go | 115 +++------ api/node.go | 80 ------ drone/drone.go | 2 +- drone/main.go | 62 ----- drone/server/server.go | 7 +- engine/bus.go | 48 ---- engine/bus_test.go | 50 ---- engine/context.go | 23 -- engine/engine.go | 444 ---------------------------------- engine/pool.go | 86 ------- engine/pool_test.go | 89 ------- engine/types.go | 24 -- engine/updater.go | 66 ----- engine/util.go | 35 --- engine/worker.go | 115 --------- model/node.go | 36 --- router/middleware/engine.go | 28 --- router/router.go | 34 +-- store/datastore/nodes.go | 48 ---- store/datastore/nodes_test.go | 101 -------- store/store.go | 35 --- template/amber/base.amber | 3 - web/hook.go | 104 +++----- web/pages.go | 29 +-- web/stream.go | 80 +++--- web/stream2.go | 121 --------- 27 files changed, 120 insertions(+), 1747 deletions(-) delete mode 100644 api/node.go delete mode 100644 drone/main.go delete mode 100644 engine/bus.go delete mode 100644 engine/bus_test.go delete mode 100644 engine/context.go delete mode 100644 engine/engine.go delete mode 100644 engine/pool.go delete mode 100644 engine/pool_test.go delete mode 100644 engine/types.go delete mode 100644 engine/updater.go delete mode 100644 engine/util.go delete mode 100644 engine/worker.go delete mode 100644 model/node.go delete mode 100644 router/middleware/engine.go delete mode 100644 store/datastore/nodes.go delete mode 100644 store/datastore/nodes_test.go delete mode 100644 web/stream2.go diff --git a/.drone.yml b/.drone.yml index 52b56f9bf..e78534349 100644 --- a/.drone.yml +++ b/.drone.yml @@ -35,7 +35,7 @@ publish: password: $$DOCKER_PASS email: $$DOCKER_EMAIL repo: drone/drone - tag: [ "latest", "0.4.2" ] + tag: [ "0.5.0" ] when: repo: drone/drone branch: master diff --git a/api/build.go b/api/build.go index d0bd37388..3ca688a44 100644 --- a/api/build.go +++ b/api/build.go @@ -5,14 +5,11 @@ import ( "io" "net/http" "os" - "path/filepath" "strconv" - "strings" "time" log "github.com/Sirupsen/logrus" "github.com/drone/drone/bus" - "github.com/drone/drone/engine" "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" @@ -33,10 +30,7 @@ func init() { if droneYml == "" { droneYml = ".drone.yml" } - droneSec = fmt.Sprintf("%s.sec", strings.TrimSuffix(droneYml, filepath.Ext(droneYml))) - if os.Getenv("CANARY") == "true" { - droneSec = fmt.Sprintf("%s.sig", droneYml) - } + droneSec = fmt.Sprintf("%s.sig", droneYml) } func GetBuilds(c *gin.Context) { @@ -135,7 +129,6 @@ func GetBuildLogs(c *gin.Context) { } func DeleteBuild(c *gin.Context) { - engine_ := engine.FromContext(c) repo := session.Repo(c) // parse the build number and job sequence number from @@ -155,17 +148,8 @@ func DeleteBuild(c *gin.Context) { return } - if os.Getenv("CANARY") == "true" { - bus.Publish(c, bus.NewEvent(bus.Cancelled, repo, build, job)) - return - } - - node, err := store.GetNode(c, job.NodeID) - if err != nil { - c.AbortWithError(404, err) - return - } - engine_.Cancel(build.ID, job.ID, node) + bus.Publish(c, bus.NewEvent(bus.Cancelled, repo, build, job)) + c.String(204, "") } func PostBuild(c *gin.Context) { @@ -218,7 +202,6 @@ func PostBuild(c *gin.Context) { log.Debugf("cannot find build secrets for %s. %s", repo.FullName, err) } - key, _ := store.GetKey(c, repo) netrc, err := remote_.Netrc(user, repo) if err != nil { log.Errorf("failure to generate netrc for %s. %s", repo.FullName, err) @@ -296,72 +279,42 @@ func PostBuild(c *gin.Context) { log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) } - // IMPORTANT. PLEASE READ - // - // The below code uses a feature flag to switch between the current - // build engine and the exerimental 0.5 build engine. This can be - // enabled using with the environment variable CANARY=true + var signed bool + var verified bool - if os.Getenv("CANARY") == "true" { - - var signed bool - var verified bool - - signature, err := jose.ParseSigned(string(sec)) + signature, err := jose.ParseSigned(string(sec)) + if err != nil { + log.Debugf("cannot parse .drone.yml.sig file. %s", err) + } else if len(sec) == 0 { + log.Debugf("cannot parse .drone.yml.sig file. empty file") + } else { + signed = true + output, err := signature.Verify([]byte(repo.Hash)) if err != nil { - log.Debugf("cannot parse .drone.yml.sig file. %s", err) - } else if len(sec) == 0 { - log.Debugf("cannot parse .drone.yml.sig file. empty file") + log.Debugf("cannot verify .drone.yml.sig file. %s", err) + } else if string(output) != string(raw) { + log.Debugf("cannot verify .drone.yml.sig file. no match. %q <> %q", string(output), string(raw)) } else { - signed = true - output, err := signature.Verify([]byte(repo.Hash)) - if err != nil { - log.Debugf("cannot verify .drone.yml.sig file. %s", err) - } else if string(output) != string(raw) { - log.Debugf("cannot verify .drone.yml.sig file. no match. %q <> %q", string(output), string(raw)) - } else { - verified = true - } + verified = true } - - log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) - - bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) - for _, job := range jobs { - queue.Publish(c, &queue.Work{ - Signed: signed, - Verified: verified, - User: user, - Repo: repo, - Build: build, - BuildLast: last, - Job: job, - Netrc: netrc, - Yaml: string(raw), - Secrets: secs, - System: &model.System{Link: httputil.GetURL(c.Request)}, - }) - } - return // EXIT NOT TO AVOID THE 0.4 ENGINE CODE BELOW } - engine_ := engine.FromContext(c) - go engine_.Schedule(c.Copy(), &engine.Task{ - User: user, - Repo: repo, - Build: build, - BuildPrev: last, - Jobs: jobs, - Keys: key, - Netrc: netrc, - Config: string(raw), - Secret: string(sec), - System: &model.System{ - Link: httputil.GetURL(c.Request), - Plugins: strings.Split(os.Getenv("PLUGIN_FILTER"), " "), - Globals: strings.Split(os.Getenv("PLUGIN_PARAMS"), " "), - Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "), - }, - }) + log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) + bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) + for _, job := range jobs { + queue.Publish(c, &queue.Work{ + Signed: signed, + Verified: verified, + User: user, + Repo: repo, + Build: build, + BuildLast: last, + Job: job, + Netrc: netrc, + Yaml: string(raw), + Secrets: secs, + System: &model.System{Link: httputil.GetURL(c.Request)}, + }) + } } diff --git a/api/node.go b/api/node.go deleted file mode 100644 index 592ec0c1b..000000000 --- a/api/node.go +++ /dev/null @@ -1,80 +0,0 @@ -package api - -import ( - "net/http" - "strconv" - - "github.com/gin-gonic/gin" - - "github.com/drone/drone/engine" - "github.com/drone/drone/model" - "github.com/drone/drone/store" -) - -func GetNodes(c *gin.Context) { - nodes, err := store.GetNodeList(c) - if err != nil { - c.String(400, err.Error()) - } else { - c.JSON(200, nodes) - } -} - -func GetNode(c *gin.Context) { - -} - -func PostNode(c *gin.Context) { - engine := engine.FromContext(c) - - in := struct { - Addr string `json:"address"` - Arch string `json:"architecture"` - Cert string `json:"cert"` - Key string `json:"key"` - CA string `json:"ca"` - }{} - err := c.Bind(&in) - if err != nil { - c.AbortWithStatus(http.StatusBadRequest) - return - } - - node := &model.Node{} - node.Addr = in.Addr - node.Cert = in.Cert - node.Key = in.Key - node.CA = in.CA - node.Arch = "linux_amd64" - - err = engine.Allocate(node) - if err != nil { - c.String(http.StatusBadRequest, err.Error()) - return - } - - err = store.CreateNode(c, node) - if err != nil { - c.AbortWithStatus(http.StatusInternalServerError) - return - } - - c.IndentedJSON(http.StatusOK, node) -} - -func DeleteNode(c *gin.Context) { - engine := engine.FromContext(c) - - id, _ := strconv.Atoi(c.Param("node")) - node, err := store.GetNode(c, int64(id)) - if err != nil { - c.AbortWithStatus(http.StatusNotFound) - return - } - err = store.DeleteNode(c, node) - if err != nil { - c.AbortWithStatus(http.StatusInternalServerError) - return - } - engine.Deallocate(node) -} diff --git a/drone/drone.go b/drone/drone.go index 2a92660da..cfc034d60 100644 --- a/drone/drone.go +++ b/drone/drone.go @@ -12,7 +12,7 @@ import ( _ "github.com/joho/godotenv/autoload" ) -func main2() { +func main() { envflag.Parse() app := cli.NewApp() diff --git a/drone/main.go b/drone/main.go deleted file mode 100644 index e08585855..000000000 --- a/drone/main.go +++ /dev/null @@ -1,62 +0,0 @@ -package main - -import ( - "net/http" - "os" - "time" - - "github.com/drone/drone/router" - "github.com/drone/drone/router/middleware" - - "github.com/Sirupsen/logrus" - "github.com/gin-gonic/contrib/ginrus" - "github.com/ianschenck/envflag" - _ "github.com/joho/godotenv/autoload" -) - -var ( - addr = envflag.String("SERVER_ADDR", ":8000", "") - cert = envflag.String("SERVER_CERT", "", "") - key = envflag.String("SERVER_KEY", "", "") - - debug = envflag.Bool("DEBUG", false, "") -) - -func main() { - if os.Getenv("CANARY") == "true" { - main2() - return - } - - envflag.Parse() - - // debug level if requested by user - if *debug { - logrus.SetLevel(logrus.DebugLevel) - } else { - logrus.SetLevel(logrus.WarnLevel) - } - - // setup the server and start the listener - handler := router.Load( - ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), - middleware.Version, - middleware.Queue(), - middleware.Stream(), - middleware.Bus(), - middleware.Cache(), - middleware.Store(), - middleware.Remote(), - middleware.Engine(), - ) - - if *cert != "" { - logrus.Fatal( - http.ListenAndServeTLS(*addr, *cert, *key, handler), - ) - } else { - logrus.Fatal( - http.ListenAndServe(*addr, handler), - ) - } -} diff --git a/drone/server/server.go b/drone/server/server.go index e22be9467..600a84fa9 100644 --- a/drone/server/server.go +++ b/drone/server/server.go @@ -86,7 +86,6 @@ func start(c *cli.Context) error { middleware.Cache(), middleware.Store(), middleware.Remote(), - middleware.Engine(), ) if c.String("server-cert") != "" { @@ -109,7 +108,7 @@ var agreement = ` You are attempting to use the unstable channel. This build is experimental and -has known bugs and compatibility issues, and is not intended for general use. +has known bugs and compatibility issues. It is not intended for general use. Please consider using the latest stable release instead: @@ -119,8 +118,8 @@ If you are attempting to build from source please use the latest stable tag: v0.4.2 -If you are interested in testing this experimental build and assisting with -development you will need to set the following environment variables to proceed: +If you are interested in testing this experimental build AND assisting with +development you may proceed by setting the following environment: I_UNDERSTAND_I_AM_USING_AN_UNSTABLE_VERSION=true I_AGREE_TO_FIX_BUGS_AND_NOT_FILE_BUGS=true diff --git a/engine/bus.go b/engine/bus.go deleted file mode 100644 index 3ab1ece03..000000000 --- a/engine/bus.go +++ /dev/null @@ -1,48 +0,0 @@ -package engine - -import ( - "sync" -) - -type eventbus struct { - sync.Mutex - subs map[chan *Event]bool -} - -// New creates a new eventbus that manages a list of -// subscribers to which events are published. -func newEventbus() *eventbus { - return &eventbus{ - subs: make(map[chan *Event]bool), - } -} - -// Subscribe adds the channel to the list of -// subscribers. Each subscriber in the list will -// receive broadcast events. -func (b *eventbus) subscribe(c chan *Event) { - b.Lock() - b.subs[c] = true - b.Unlock() -} - -// Unsubscribe removes the channel from the -// list of subscribers. -func (b *eventbus) unsubscribe(c chan *Event) { - b.Lock() - delete(b.subs, c) - b.Unlock() -} - -// Send dispatches a message to all subscribers. -func (b *eventbus) send(event *Event) { - b.Lock() - defer b.Unlock() - - for s := range b.subs { - go func(c chan *Event) { - defer recover() - c <- event - }(s) - } -} diff --git a/engine/bus_test.go b/engine/bus_test.go deleted file mode 100644 index 85b2e681e..000000000 --- a/engine/bus_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package engine - -import ( - "testing" - - . "github.com/franela/goblin" -) - -func TestBus(t *testing.T) { - g := Goblin(t) - g.Describe("Event bus", func() { - - g.It("Should unsubscribe", func() { - c1 := make(chan *Event) - c2 := make(chan *Event) - b := newEventbus() - b.subscribe(c1) - b.subscribe(c2) - g.Assert(len(b.subs)).Equal(2) - }) - - g.It("Should subscribe", func() { - c1 := make(chan *Event) - c2 := make(chan *Event) - b := newEventbus() - b.subscribe(c1) - b.subscribe(c2) - g.Assert(len(b.subs)).Equal(2) - b.unsubscribe(c1) - b.unsubscribe(c2) - g.Assert(len(b.subs)).Equal(0) - }) - - g.It("Should send", func() { - em := map[string]bool{"foo": true, "bar": true} - e1 := &Event{Name: "foo"} - e2 := &Event{Name: "bar"} - c := make(chan *Event) - b := newEventbus() - b.subscribe(c) - b.send(e1) - b.send(e2) - r1 := <-c - r2 := <-c - g.Assert(em[r1.Name]).Equal(true) - g.Assert(em[r2.Name]).Equal(true) - }) - }) - -} diff --git a/engine/context.go b/engine/context.go deleted file mode 100644 index 2321fa071..000000000 --- a/engine/context.go +++ /dev/null @@ -1,23 +0,0 @@ -package engine - -import ( - "golang.org/x/net/context" -) - -const key = "engine" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Engine associated with this context. -func FromContext(c context.Context) Engine { - return c.Value(key).(Engine) -} - -// ToContext adds the Engine to this context if it supports -// the Setter interface. -func ToContext(c Setter, engine Engine) { - c.Set(key, engine) -} diff --git a/engine/engine.go b/engine/engine.go deleted file mode 100644 index 32a2c6a10..000000000 --- a/engine/engine.go +++ /dev/null @@ -1,444 +0,0 @@ -package engine - -import ( - "bytes" - "crypto/tls" - "crypto/x509" - "errors" - "fmt" - "io" - "io/ioutil" - "os" - "runtime" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/docker/pkg/stdcopy" - "github.com/drone/drone/model" - "github.com/drone/drone/shared/docker" - "github.com/drone/drone/store" - "github.com/samalba/dockerclient" - "golang.org/x/net/context" -) - -type Engine interface { - Schedule(context.Context, *Task) - Cancel(int64, int64, *model.Node) error - Stream(int64, int64, *model.Node) (io.ReadCloser, error) - Deallocate(*model.Node) - Allocate(*model.Node) error - Subscribe(chan *Event) - Unsubscribe(chan *Event) -} - -var ( - // options to fetch the stdout and stderr logs - logOpts = &dockerclient.LogOptions{ - Stdout: true, - Stderr: true, - } - - // options to fetch the stdout and stderr logs - // by tailing the output. - logOptsTail = &dockerclient.LogOptions{ - Follow: true, - Stdout: true, - Stderr: true, - } - - // error when the system cannot find logs - errLogging = errors.New("Logs not available") -) - -type engine struct { - bus *eventbus - updater *updater - pool *pool - envs []string -} - -// Load creates a new build engine, loaded with registered nodes from the -// database. The registered nodes are added to the pool of nodes to immediately -// start accepting workloads. -func Load(s store.Store) Engine { - engine := &engine{} - engine.bus = newEventbus() - engine.pool = newPool() - engine.updater = &updater{engine.bus} - - // quick fix to propagate HTTP_PROXY variables - // throughout the build environment. - var proxyVars = []string{"HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy", "NO_PROXY", "no_proxy"} - for _, proxyVar := range proxyVars { - proxyVal := os.Getenv(proxyVar) - if len(proxyVal) != 0 { - engine.envs = append(engine.envs, proxyVar+"="+proxyVal) - } - } - - nodes, err := s.GetNodeList() - if err != nil { - log.Fatalf("failed to get nodes from database. %s", err) - } - for _, node := range nodes { - engine.pool.allocate(node) - log.Infof("registered docker daemon %s", node.Addr) - } - - return engine -} - -// Cancel cancels the job running on the specified Node. -func (e *engine) Cancel(build, job int64, node *model.Node) error { - client, err := newDockerClient(node.Addr, node.Cert, node.Key, node.CA) - if err != nil { - return err - } - - id := fmt.Sprintf("drone_build_%d_job_%d", build, job) - return client.StopContainer(id, 30) -} - -// Stream streams the job output from the specified Node. -func (e *engine) Stream(build, job int64, node *model.Node) (io.ReadCloser, error) { - client, err := newDockerClient(node.Addr, node.Cert, node.Key, node.CA) - if err != nil { - log.Errorf("cannot create Docker client for node %s", node.Addr) - return nil, err - } - - id := fmt.Sprintf("drone_build_%d_job_%d", build, job) - log.Debugf("streaming container logs %s", id) - return client.ContainerLogs(id, logOptsTail) -} - -// Subscribe subscribes the channel to all build events. -func (e *engine) Subscribe(c chan *Event) { - e.bus.subscribe(c) -} - -// Unsubscribe unsubscribes the channel from all build events. -func (e *engine) Unsubscribe(c chan *Event) { - e.bus.unsubscribe(c) -} - -func (e *engine) Allocate(node *model.Node) error { - - // run the full build! - client, err := newDockerClient(node.Addr, node.Cert, node.Key, node.CA) - if err != nil { - log.Errorf("error creating docker client %s. %s.", node.Addr, err) - return err - } - version, err := client.Version() - if err != nil { - log.Errorf("error connecting to docker daemon %s. %s.", node.Addr, err) - return err - } - - log.Infof("registered docker daemon %s running version %s", node.Addr, version.Version) - e.pool.allocate(node) - return nil -} - -func (e *engine) Deallocate(n *model.Node) { - nodes := e.pool.list() - for _, node := range nodes { - if node.ID == n.ID { - log.Infof("un-registered docker daemon %s", node.Addr) - e.pool.deallocate(node) - break - } - } -} - -func (e *engine) Schedule(c context.Context, req *Task) { - node := <-e.pool.reserve() - - // since we are probably running in a go-routine - // make sure we recover from any panics so that - // a bug doesn't crash the whole system. - defer func() { - if err := recover(); err != nil { - - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - log.Errorf("panic running build: %v\n%s", err, string(buf)) - } - e.pool.release(node) - }() - - // update the node that was allocated to each job - func(id int64) { - for _, job := range req.Jobs { - job.NodeID = id - store.UpdateJob(c, job) - } - }(node.ID) - - // run the full build! - client, err := newDockerClient(node.Addr, node.Cert, node.Key, node.CA) - if err != nil { - log.Errorln("error creating docker client", err) - } - - // update the build state if any of the sub-tasks - // had a non-success status - req.Build.Started = time.Now().UTC().Unix() - req.Build.Status = model.StatusRunning - e.updater.SetBuild(c, req) - - // run all bulid jobs - for _, job := range req.Jobs { - req.Job = job - e.runJob(c, req, e.updater, client) - } - - // update overall status based on each job - req.Build.Status = model.StatusSuccess - for _, job := range req.Jobs { - if job.Status != model.StatusSuccess { - req.Build.Status = job.Status - break - } - } - req.Build.Finished = time.Now().UTC().Unix() - err = e.updater.SetBuild(c, req) - if err != nil { - log.Errorf("error updating build completion status. %s", err) - } - - // run notifications - err = e.runJobNotify(req, client) - if err != nil { - log.Errorf("error executing notification step. %s", err) - } -} - -func newDockerClient(addr, cert, key, ca string) (dockerclient.Client, error) { - var tlc *tls.Config - - // create the Docket client TLS config - if len(cert) != 0 { - pem, err := tls.X509KeyPair([]byte(cert), []byte(key)) - if err != nil { - log.Errorf("error loading X509 key pair. %s.", err) - return dockerclient.NewDockerClient(addr, nil) - } - - // create the TLS configuration for secure - // docker communications. - tlc = &tls.Config{} - tlc.Certificates = []tls.Certificate{pem} - - // use the certificate authority if provided. - // else don't use a certificate authority and set - // skip verify to true - if len(ca) != 0 { - log.Infof("creating docker client %s with CA", addr) - pool := x509.NewCertPool() - pool.AppendCertsFromPEM([]byte(ca)) - tlc.RootCAs = pool - - } else { - log.Infof("creating docker client %s WITHOUT CA", addr) - tlc.InsecureSkipVerify = true - } - } - - // create the Docker client. In this version of Drone (alpha) - // we do not spread builds across clients, but this can and - // (probably) will change in the future. - return dockerclient.NewDockerClient(addr, tlc) -} - -func (e *engine) runJob(c context.Context, r *Task, updater *updater, client dockerclient.Client) error { - - name := fmt.Sprintf("drone_build_%d_job_%d", r.Build.ID, r.Job.ID) - - defer func() { - if r.Job.Status == model.StatusRunning { - r.Job.Status = model.StatusError - r.Job.Finished = time.Now().UTC().Unix() - r.Job.ExitCode = 255 - } - if r.Job.Status == model.StatusPending { - r.Job.Status = model.StatusError - r.Job.Started = time.Now().UTC().Unix() - r.Job.Finished = time.Now().UTC().Unix() - r.Job.ExitCode = 255 - } - updater.SetJob(c, r) - - client.KillContainer(name, "9") - client.RemoveContainer(name, true, true) - }() - - // marks the task as running - r.Job.Status = model.StatusRunning - r.Job.Started = time.Now().UTC().Unix() - - // encode the build payload to write to stdin - // when launching the build container - in, err := encodeToLegacyFormat(r) - if err != nil { - log.Errorf("failure to marshal work. %s", err) - return err - } - - // CREATE AND START BUILD - args := DefaultBuildArgs - if r.Build.Event == model.EventPull { - args = DefaultPullRequestArgs - } - args = append(args, "--") - args = append(args, string(in)) - - conf := &dockerclient.ContainerConfig{ - Image: DefaultAgent, - Entrypoint: DefaultEntrypoint, - Cmd: args, - Env: e.envs, - HostConfig: dockerclient.HostConfig{ - Binds: []string{"/var/run/docker.sock:/var/run/docker.sock"}, - MemorySwappiness: -1, - }, - Volumes: map[string]struct{}{ - "/var/run/docker.sock": {}, - }, - } - - log.Infof("preparing container %s", name) - client.PullImage(conf.Image, nil) - - _, err = docker.RunDaemon(client, conf, name) - if err != nil { - log.Errorf("error starting build container. %s", err) - return err - } - - // UPDATE STATUS - - err = updater.SetJob(c, r) - if err != nil { - log.Errorf("error updating job status as running. %s", err) - return err - } - - // WAIT FOR OUTPUT - info, builderr := docker.Wait(client, name) - - switch { - case info.State.Running: - // A build unblocked before actually being completed. - log.Errorf("incomplete build: %s", name) - r.Job.ExitCode = 1 - r.Job.Status = model.StatusError - case info.State.ExitCode == 128: - r.Job.ExitCode = info.State.ExitCode - r.Job.Status = model.StatusKilled - case info.State.ExitCode == 130: - r.Job.ExitCode = info.State.ExitCode - r.Job.Status = model.StatusKilled - case builderr != nil: - r.Job.Status = model.StatusError - case info.State.ExitCode != 0: - r.Job.ExitCode = info.State.ExitCode - r.Job.Status = model.StatusFailure - default: - r.Job.Status = model.StatusSuccess - } - - // send the logs to the datastore - var buf bytes.Buffer - rc, err := client.ContainerLogs(name, docker.LogOpts) - if err != nil && builderr != nil { - buf.WriteString("Error launching build") - buf.WriteString(builderr.Error()) - } else if err != nil { - buf.WriteString("Error launching build") - buf.WriteString(err.Error()) - log.Errorf("error opening connection to logs. %s", err) - return err - } else { - defer rc.Close() - stdcopy.StdCopy(&buf, &buf, io.LimitReader(rc, 5000000)) - } - - // update the task in the datastore - r.Job.Finished = time.Now().UTC().Unix() - err = updater.SetJob(c, r) - if err != nil { - log.Errorf("error updating job after completion. %s", err) - return err - } - - err = updater.SetLogs(c, r, ioutil.NopCloser(&buf)) - if err != nil { - log.Errorf("error updating logs. %s", err) - return err - } - - log.Debugf("completed job %d with status %s.", r.Job.ID, r.Job.Status) - return nil -} - -func (e *engine) runJobNotify(r *Task, client dockerclient.Client) error { - - name := fmt.Sprintf("drone_build_%d_notify", r.Build.ID) - - defer func() { - client.KillContainer(name, "9") - client.RemoveContainer(name, true, true) - }() - - // encode the build payload to write to stdin - // when launching the build container - in, err := encodeToLegacyFormat(r) - if err != nil { - log.Errorf("failure to marshal work. %s", err) - return err - } - - args := DefaultNotifyArgs - args = append(args, "--") - args = append(args, string(in)) - - conf := &dockerclient.ContainerConfig{ - Image: DefaultAgent, - Entrypoint: DefaultEntrypoint, - Cmd: args, - Env: e.envs, - HostConfig: dockerclient.HostConfig{ - Binds: []string{"/var/run/docker.sock:/var/run/docker.sock"}, - MemorySwappiness: -1, - }, - Volumes: map[string]struct{}{ - "/var/run/docker.sock": {}, - }, - } - - log.Infof("preparing container %s", name) - info, err := docker.Run(client, conf, name) - if err != nil { - log.Errorf("Error starting notification container %s. %s", name, err) - } - - // for debugging purposes we print a failed notification executions - // output to the logs. Otherwise we have no way to troubleshoot failed - // notifications. This is temporary code until I've come up with - // a better solution. - if info != nil && info.State.ExitCode != 0 && log.GetLevel() >= log.InfoLevel { - var buf bytes.Buffer - rc, err := client.ContainerLogs(name, docker.LogOpts) - if err == nil { - defer rc.Close() - stdcopy.StdCopy(&buf, &buf, io.LimitReader(rc, 50000)) - } - log.Infof("Notification container %s exited with %d", name, info.State.ExitCode) - log.Infoln(buf.String()) - } - - return err -} diff --git a/engine/pool.go b/engine/pool.go deleted file mode 100644 index a886b47de..000000000 --- a/engine/pool.go +++ /dev/null @@ -1,86 +0,0 @@ -package engine - -import ( - "sync" - - "github.com/drone/drone/model" -) - -type pool struct { - sync.Mutex - nodes map[*model.Node]bool - nodec chan *model.Node -} - -func newPool() *pool { - return &pool{ - nodes: make(map[*model.Node]bool), - nodec: make(chan *model.Node, 999), - } -} - -// Allocate allocates a node to the pool to -// be available to accept work. -func (p *pool) allocate(n *model.Node) bool { - if p.isAllocated(n) { - return false - } - - p.Lock() - p.nodes[n] = true - p.Unlock() - - p.nodec <- n - return true -} - -// IsAllocated is a helper function that returns -// true if the node is currently allocated to -// the pool. -func (p *pool) isAllocated(n *model.Node) bool { - p.Lock() - defer p.Unlock() - _, ok := p.nodes[n] - return ok -} - -// Deallocate removes the node from the pool of -// available nodes. If the node is currently -// reserved and performing work it will finish, -// but no longer be given new work. -func (p *pool) deallocate(n *model.Node) { - p.Lock() - defer p.Unlock() - delete(p.nodes, n) -} - -// List returns a list of all model.Nodes currently -// allocated to the pool. -func (p *pool) list() []*model.Node { - p.Lock() - defer p.Unlock() - - var nodes []*model.Node - for n := range p.nodes { - nodes = append(nodes, n) - } - return nodes -} - -// Reserve reserves the next available node to -// start doing work. Once work is complete, the -// node should be released back to the pool. -func (p *pool) reserve() <-chan *model.Node { - return p.nodec -} - -// Release releases the node back to the pool -// of available nodes. -func (p *pool) release(n *model.Node) bool { - if !p.isAllocated(n) { - return false - } - - p.nodec <- n - return true -} diff --git a/engine/pool_test.go b/engine/pool_test.go deleted file mode 100644 index 852847396..000000000 --- a/engine/pool_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package engine - -import ( - "testing" - - "github.com/drone/drone/model" - "github.com/franela/goblin" -) - -func TestPool(t *testing.T) { - - g := goblin.Goblin(t) - g.Describe("Pool", func() { - - g.It("Should allocate nodes", func() { - n := &model.Node{Addr: "unix:///var/run/docker.sock"} - pool := newPool() - pool.allocate(n) - g.Assert(len(pool.nodes)).Equal(1) - g.Assert(len(pool.nodec)).Equal(1) - g.Assert(pool.nodes[n]).Equal(true) - }) - - g.It("Should not re-allocate an allocated node", func() { - n := &model.Node{Addr: "unix:///var/run/docker.sock"} - pool := newPool() - g.Assert(pool.allocate(n)).Equal(true) - g.Assert(pool.allocate(n)).Equal(false) - }) - - g.It("Should reserve a node", func() { - n := &model.Node{Addr: "unix:///var/run/docker.sock"} - pool := newPool() - pool.allocate(n) - g.Assert(<-pool.reserve()).Equal(n) - }) - - g.It("Should release a node", func() { - n := &model.Node{Addr: "unix:///var/run/docker.sock"} - pool := newPool() - pool.allocate(n) - g.Assert(len(pool.nodec)).Equal(1) - g.Assert(<-pool.reserve()).Equal(n) - g.Assert(len(pool.nodec)).Equal(0) - pool.release(n) - g.Assert(len(pool.nodec)).Equal(1) - g.Assert(<-pool.reserve()).Equal(n) - g.Assert(len(pool.nodec)).Equal(0) - }) - - g.It("Should not release an unallocated node", func() { - n := &model.Node{Addr: "unix:///var/run/docker.sock"} - pool := newPool() - g.Assert(len(pool.nodes)).Equal(0) - g.Assert(len(pool.nodec)).Equal(0) - pool.release(n) - g.Assert(len(pool.nodes)).Equal(0) - g.Assert(len(pool.nodec)).Equal(0) - pool.release(nil) - g.Assert(len(pool.nodes)).Equal(0) - g.Assert(len(pool.nodec)).Equal(0) - }) - - g.It("Should list all allocated nodes", func() { - n1 := &model.Node{Addr: "unix:///var/run/docker.sock"} - n2 := &model.Node{Addr: "unix:///var/run/docker.sock"} - pool := newPool() - pool.allocate(n1) - pool.allocate(n2) - g.Assert(len(pool.nodes)).Equal(2) - g.Assert(len(pool.nodec)).Equal(2) - g.Assert(len(pool.list())).Equal(2) - }) - - g.It("Should remove a node", func() { - n1 := &model.Node{Addr: "unix:///var/run/docker.sock"} - n2 := &model.Node{Addr: "unix:///var/run/docker.sock"} - pool := newPool() - pool.allocate(n1) - pool.allocate(n2) - g.Assert(len(pool.nodes)).Equal(2) - pool.deallocate(n1) - pool.deallocate(n2) - g.Assert(len(pool.nodes)).Equal(0) - g.Assert(len(pool.list())).Equal(0) - }) - - }) -} diff --git a/engine/types.go b/engine/types.go deleted file mode 100644 index 8ba2dd3c3..000000000 --- a/engine/types.go +++ /dev/null @@ -1,24 +0,0 @@ -package engine - -import ( - "github.com/drone/drone/model" -) - -type Event struct { - Name string - Msg []byte -} - -type Task struct { - User *model.User `json:"-"` - Repo *model.Repo `json:"repo"` - Build *model.Build `json:"build"` - BuildPrev *model.Build `json:"build_last"` - Jobs []*model.Job `json:"-"` - Job *model.Job `json:"job"` - Keys *model.Key `json:"keys"` - Netrc *model.Netrc `json:"netrc"` - Config string `json:"config"` - Secret string `json:"secret"` - System *model.System `json:"system"` -} diff --git a/engine/updater.go b/engine/updater.go deleted file mode 100644 index 79c8454d3..000000000 --- a/engine/updater.go +++ /dev/null @@ -1,66 +0,0 @@ -package engine - -import ( - "encoding/json" - "fmt" - "io" - - "github.com/drone/drone/model" - "github.com/drone/drone/remote" - "github.com/drone/drone/store" - "golang.org/x/net/context" -) - -type updater struct { - bus *eventbus -} - -func (u *updater) SetBuild(c context.Context, r *Task) error { - err := store.UpdateBuild(c, r.Build) - if err != nil { - return err - } - - err = remote.FromContext(c).Status(r.User, r.Repo, r.Build, fmt.Sprintf("%s/%s/%d", r.System.Link, r.Repo.FullName, r.Build.Number)) - if err != nil { - // log err - } - - msg, err := json.Marshal(&payload{r.Build, r.Jobs}) - if err != nil { - return err - } - - u.bus.send(&Event{ - Name: r.Repo.FullName, - Msg: msg, - }) - return nil -} - -func (u *updater) SetJob(c context.Context, r *Task) error { - err := store.UpdateJob(c, r.Job) - if err != nil { - return err - } - - msg, err := json.Marshal(&payload{r.Build, r.Jobs}) - if err != nil { - return err - } - - u.bus.send(&Event{ - Name: r.Repo.FullName, - Msg: msg, - }) - return nil -} - -func (u *updater) SetLogs(c context.Context, r *Task, rc io.ReadCloser) error { - return store.WriteLog(c, r.Job, rc) -} - -type payload struct { - *model.Build - Jobs []*model.Job `json:"jobs"` -} diff --git a/engine/util.go b/engine/util.go deleted file mode 100644 index b8f9068bd..000000000 --- a/engine/util.go +++ /dev/null @@ -1,35 +0,0 @@ -package engine - -import ( - "encoding/json" -) - -func encodeToLegacyFormat(t *Task) ([]byte, error) { - // t.System.Plugins = append(t.System.Plugins, "plugins/*") - - // s := map[string]interface{}{} - // s["repo"] = t.Repo - // s["config"] = t.Config - // s["secret"] = t.Secret - // s["job"] = t.Job - // s["system"] = t.System - // s["workspace"] = map[string]interface{}{ - // "netrc": t.Netrc, - // "keys": t.Keys, - // } - // s["build"] = map[string]interface{}{ - // "number": t.Build.Number, - // "status": t.Build.Status, - // "head_commit": map[string]interface{}{ - // "sha": t.Build.Commit, - // "ref": t.Build.Ref, - // "branch": t.Build.Branch, - // "message": t.Build.Message, - // "author": map[string]interface{}{ - // "login": t.Build.Author, - // "email": t.Build.Email, - // }, - // }, - // } - return json.Marshal(t) -} diff --git a/engine/worker.go b/engine/worker.go deleted file mode 100644 index 9b3c8d8e9..000000000 --- a/engine/worker.go +++ /dev/null @@ -1,115 +0,0 @@ -package engine - -import ( - "fmt" - "io" - - "github.com/drone/drone/shared/docker" - "github.com/samalba/dockerclient" -) - -var ( - // name of the build agent container. - DefaultAgent = "drone/drone-exec:latest" - - // default name of the build agent executable - DefaultEntrypoint = []string{"/bin/drone-exec"} - - // default argument to invoke build steps - DefaultBuildArgs = []string{"--pull", "--cache", "--clone", "--build", "--deploy"} - - // default argument to invoke build steps - DefaultPullRequestArgs = []string{"--pull", "--cache", "--clone", "--build"} - - // default arguments to invoke notify steps - DefaultNotifyArgs = []string{"--pull", "--notify"} -) - -type worker struct { - client dockerclient.Client - build *dockerclient.ContainerInfo - notify *dockerclient.ContainerInfo -} - -func newWorker(client dockerclient.Client) *worker { - return &worker{client: client} -} - -// Build executes the clone, build and deploy steps. -func (w *worker) Build(name string, stdin []byte, pr bool) (_ int, err error) { - // the command line arguments passed into the - // build agent container. - args := DefaultBuildArgs - if pr { - args = DefaultPullRequestArgs - } - args = append(args, "--") - args = append(args, string(stdin)) - - conf := &dockerclient.ContainerConfig{ - Image: DefaultAgent, - Entrypoint: DefaultEntrypoint, - Cmd: args, - HostConfig: dockerclient.HostConfig{ - Binds: []string{"/var/run/docker.sock:/var/run/docker.sock"}, - }, - Volumes: map[string]struct{}{ - "/var/run/docker.sock": {}, - }, - } - - // TEMPORARY: always try to pull the new image for now - // since we'll be frequently updating the build image - // for the next few weeks - w.client.PullImage(conf.Image, nil) - - w.build, err = docker.Run(w.client, conf, name) - if err != nil { - return 1, err - } - if w.build.State.OOMKilled { - return 1, fmt.Errorf("OOMKill received") - } - return w.build.State.ExitCode, err -} - -// Notify executes the notification steps. -func (w *worker) Notify(stdin []byte) error { - - args := DefaultNotifyArgs - args = append(args, "--") - args = append(args, string(stdin)) - - conf := &dockerclient.ContainerConfig{ - Image: DefaultAgent, - Entrypoint: DefaultEntrypoint, - Cmd: args, - HostConfig: dockerclient.HostConfig{}, - } - - var err error - w.notify, err = docker.Run(w.client, conf, "") - return err -} - -// Logs returns a multi-reader that fetches the logs -// from the build and deploy agents. -func (w *worker) Logs() (io.ReadCloser, error) { - if w.build == nil { - return nil, errLogging - } - return w.client.ContainerLogs(w.build.Id, logOpts) -} - -// Remove stops and removes the build, deploy and -// notification agents created for the build task. -func (w *worker) Remove() { - if w.notify != nil { - w.client.KillContainer(w.notify.Id, "9") - w.client.RemoveContainer(w.notify.Id, true, true) - } - if w.build != nil { - w.client.KillContainer(w.build.Id, "9") - w.client.RemoveContainer(w.build.Id, true, true) - } -} diff --git a/model/node.go b/model/node.go deleted file mode 100644 index 03e6f8dfc..000000000 --- a/model/node.go +++ /dev/null @@ -1,36 +0,0 @@ -package model - -const ( - Freebsd_386 uint = iota - Freebsd_amd64 - Freebsd_arm - Linux_386 - Linux_amd64 - Linux_arm - Linux_arm64 - Solaris_amd64 - Windows_386 - Windows_amd64 -) - -var Archs = map[string]uint{ - "freebsd_386": Freebsd_386, - "freebsd_amd64": Freebsd_amd64, - "freebsd_arm": Freebsd_arm, - "linux_386": Linux_386, - "linux_amd64": Linux_amd64, - "linux_arm": Linux_arm, - "linux_arm64": Linux_arm64, - "solaris_amd64": Solaris_amd64, - "windows_386": Windows_386, - "windows_amd64": Windows_amd64, -} - -type Node struct { - ID int64 `meddler:"node_id,pk" json:"id"` - Addr string `meddler:"node_addr" json:"address"` - Arch string `meddler:"node_arch" json:"architecture"` - Cert string `meddler:"node_cert" json:"-"` - Key string `meddler:"node_key" json:"-"` - CA string `meddler:"node_ca" json:"-"` -} diff --git a/router/middleware/engine.go b/router/middleware/engine.go deleted file mode 100644 index 01da07067..000000000 --- a/router/middleware/engine.go +++ /dev/null @@ -1,28 +0,0 @@ -package middleware - -import ( - "sync" - - "github.com/drone/drone/engine" - "github.com/drone/drone/store" - - "github.com/gin-gonic/gin" -) - -// Engine is a middleware function that initializes the Engine and attaches to -// the context of every http.Request. -func Engine() gin.HandlerFunc { - var once sync.Once - var engine_ engine.Engine - - return func(c *gin.Context) { - - once.Do(func() { - store_ := store.FromContext(c) - engine_ = engine.Load(store_) - }) - - engine.ToContext(c, engine_) - c.Next() - } -} diff --git a/router/router.go b/router/router.go index 157e6f18e..32a13998f 100644 --- a/router/router.go +++ b/router/router.go @@ -2,7 +2,6 @@ package router import ( "net/http" - "os" "strings" "github.com/gin-gonic/gin" @@ -41,8 +40,6 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler { { settings.Use(session.MustUser()) settings.GET("/profile", web.ShowUser) - settings.GET("/people", session.MustAdmin(), web.ShowUsers) - settings.GET("/nodes", session.MustAdmin(), web.ShowNodes) } repo := e.Group("/repos/:owner/:name") { @@ -83,14 +80,6 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler { users.DELETE("/:login", api.DeleteUser) } - nodes := e.Group("/api/nodes") - { - nodes.Use(session.MustAdmin()) - nodes.GET("", api.GetNodes) - nodes.POST("", api.PostNode) - nodes.DELETE("/:node", api.DeleteNode) - } - repos := e.Group("/api/repos/:owner/:name") { repos.POST("", api.PostRepo) @@ -139,13 +128,8 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler { stream.Use(session.SetPerm()) stream.Use(session.MustPull) - if os.Getenv("CANARY") == "true" { - stream.GET("/:owner/:name", web.GetRepoEvents2) - stream.GET("/:owner/:name/:build/:number", web.GetStream2) - } else { - stream.GET("/:owner/:name", web.GetRepoEvents) - stream.GET("/:owner/:name/:build/:number", web.GetStream) - } + stream.GET("/:owner/:name", web.GetRepoEvents) + stream.GET("/:owner/:name/:build/:number", web.GetStream) } bots := e.Group("/bots") @@ -164,14 +148,12 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler { queue := e.Group("/api/queue") { - if os.Getenv("CANARY") == "true" { - queue.Use(middleware.AgentMust()) - queue.POST("/pull", api.Pull) - queue.POST("/pull/:os/:arch", api.Pull) - queue.POST("/wait/:id", api.Wait) - queue.POST("/stream/:id", api.Stream) - queue.POST("/status/:id", api.Update) - } + queue.Use(middleware.AgentMust()) + queue.POST("/pull", api.Pull) + queue.POST("/pull/:os/:arch", api.Pull) + queue.POST("/wait/:id", api.Wait) + queue.POST("/stream/:id", api.Stream) + queue.POST("/status/:id", api.Update) } gitlab := e.Group("/gitlab/:owner/:name") diff --git a/store/datastore/nodes.go b/store/datastore/nodes.go deleted file mode 100644 index 37e88652c..000000000 --- a/store/datastore/nodes.go +++ /dev/null @@ -1,48 +0,0 @@ -package datastore - -import ( - "github.com/drone/drone/model" - "github.com/russross/meddler" -) - -func (db *datastore) GetNode(id int64) (*model.Node, error) { - var node = new(model.Node) - var err = meddler.Load(db, nodeTable, node, id) - return node, err -} - -func (db *datastore) GetNodeList() ([]*model.Node, error) { - var nodes = []*model.Node{} - var err = meddler.QueryAll(db, &nodes, rebind(nodeListQuery)) - return nodes, err -} - -func (db *datastore) CreateNode(node *model.Node) error { - return meddler.Insert(db, nodeTable, node) -} - -func (db *datastore) UpdateNode(node *model.Node) error { - return meddler.Update(db, nodeTable, node) -} - -func (db *datastore) DeleteNode(node *model.Node) error { - var _, err = db.Exec(rebind(nodeDeleteStmt), node.ID) - return err -} - -const nodeTable = "nodes" - -const nodeListQuery = ` -SELECT * -FROM nodes -ORDER BY node_addr -` - -const nodeCountQuery = ` -SELECT COUNT(*) FROM nodes -` - -const nodeDeleteStmt = ` -DELETE FROM nodes -WHERE node_id=? -` diff --git a/store/datastore/nodes_test.go b/store/datastore/nodes_test.go deleted file mode 100644 index 3ae9898fd..000000000 --- a/store/datastore/nodes_test.go +++ /dev/null @@ -1,101 +0,0 @@ -package datastore - -import ( - "testing" - - "github.com/drone/drone/model" - "github.com/franela/goblin" -) - -func TestNodes(t *testing.T) { - db := openTest() - defer db.Close() - - s := From(db) - g := goblin.Goblin(t) - g.Describe("Nodes", func() { - - // before each test be sure to purge the package - // table data from the database. - g.BeforeEach(func() { - db.Exec("DELETE FROM nodes") - }) - - g.It("Should create a node", func() { - node := model.Node{ - Addr: "unix:///var/run/docker/docker.sock", - Arch: "linux_amd64", - } - err := s.CreateNode(&node) - g.Assert(err == nil).IsTrue() - g.Assert(node.ID != 0).IsTrue() - }) - - g.It("Should update a node", func() { - node := model.Node{ - Addr: "unix:///var/run/docker/docker.sock", - Arch: "linux_amd64", - } - err := s.CreateNode(&node) - g.Assert(err == nil).IsTrue() - g.Assert(node.ID != 0).IsTrue() - - node.Addr = "unix:///var/run/docker.sock" - - err1 := s.UpdateNode(&node) - getnode, err2 := s.GetNode(node.ID) - g.Assert(err1 == nil).IsTrue() - g.Assert(err2 == nil).IsTrue() - g.Assert(node.ID).Equal(getnode.ID) - g.Assert(node.Addr).Equal(getnode.Addr) - g.Assert(node.Arch).Equal(getnode.Arch) - }) - - g.It("Should get a node", func() { - node := model.Node{ - Addr: "unix:///var/run/docker/docker.sock", - Arch: "linux_amd64", - } - err := s.CreateNode(&node) - g.Assert(err == nil).IsTrue() - g.Assert(node.ID != 0).IsTrue() - - getnode, err := s.GetNode(node.ID) - g.Assert(err == nil).IsTrue() - g.Assert(node.ID).Equal(getnode.ID) - g.Assert(node.Addr).Equal(getnode.Addr) - g.Assert(node.Arch).Equal(getnode.Arch) - }) - - g.It("Should get a node list", func() { - node1 := model.Node{ - Addr: "unix:///var/run/docker/docker.sock", - Arch: "linux_amd64", - } - node2 := model.Node{ - Addr: "unix:///var/run/docker.sock", - Arch: "linux_386", - } - s.CreateNode(&node1) - s.CreateNode(&node2) - - nodes, err := s.GetNodeList() - g.Assert(err == nil).IsTrue() - g.Assert(len(nodes)).Equal(2) - }) - - g.It("Should delete a node", func() { - node := model.Node{ - Addr: "unix:///var/run/docker/docker.sock", - Arch: "linux_amd64", - } - err1 := s.CreateNode(&node) - err2 := s.DeleteNode(&node) - g.Assert(err1 == nil).IsTrue() - g.Assert(err2 == nil).IsTrue() - - _, err := s.GetNode(node.ID) - g.Assert(err == nil).IsFalse() - }) - }) -} diff --git a/store/store.go b/store/store.go index f0fd97968..a359a3edd 100644 --- a/store/store.go +++ b/store/store.go @@ -125,21 +125,6 @@ type Store interface { // WriteLog writes the job logs to the datastore. WriteLog(*model.Job, io.Reader) error - - // GetNode gets a build node from the datastore. - GetNode(id int64) (*model.Node, error) - - // GetNodeList gets a build node list from the datastore. - GetNodeList() ([]*model.Node, error) - - // CreateNode add a new build node to the datastore. - CreateNode(*model.Node) error - - // UpdateNode updates a build node in the datastore. - UpdateNode(*model.Node) error - - // DeleteNode removes a build node from the datastore. - DeleteNode(*model.Node) error } // GetUser gets a user by unique ID. @@ -343,23 +328,3 @@ func ReadLog(c context.Context, job *model.Job) (io.ReadCloser, error) { func WriteLog(c context.Context, job *model.Job, r io.Reader) error { return FromContext(c).WriteLog(job, r) } - -func GetNode(c context.Context, id int64) (*model.Node, error) { - return FromContext(c).GetNode(id) -} - -func GetNodeList(c context.Context) ([]*model.Node, error) { - return FromContext(c).GetNodeList() -} - -func CreateNode(c context.Context, node *model.Node) error { - return FromContext(c).CreateNode(node) -} - -func UpdateNode(c context.Context, node *model.Node) error { - return FromContext(c).UpdateNode(node) -} - -func DeleteNode(c context.Context, node *model.Node) error { - return FromContext(c).DeleteNode(node) -} diff --git a/template/amber/base.amber b/template/amber/base.amber index 491bf3cfc..2e864a264 100644 --- a/template/amber/base.amber +++ b/template/amber/base.amber @@ -34,9 +34,6 @@ html i.material-icons expand_more div.dropdown-menu.dropdown-menu-right a.dropdown-item[href="/settings/profile"] Profile - if User.Admin - a.dropdown-item[href="/settings/people"] People - a.dropdown-item[href="/settings/nodes"] Nodes a.dropdown-item[href="/logout"] Logout diff --git a/web/hook.go b/web/hook.go index 6544446b3..b61519949 100644 --- a/web/hook.go +++ b/web/hook.go @@ -3,16 +3,13 @@ package web import ( "fmt" "os" - "path/filepath" "regexp" - "strings" "github.com/gin-gonic/gin" "github.com/square/go-jose" log "github.com/Sirupsen/logrus" "github.com/drone/drone/bus" - "github.com/drone/drone/engine" "github.com/drone/drone/model" "github.com/drone/drone/queue" "github.com/drone/drone/remote" @@ -31,10 +28,7 @@ func init() { if droneYml == "" { droneYml = ".drone.yml" } - droneSec = fmt.Sprintf("%s.sec", strings.TrimSuffix(droneYml, filepath.Ext(droneYml))) - if os.Getenv("CANARY") == "true" { - droneSec = fmt.Sprintf("%s.sig", droneYml) - } + droneSec = fmt.Sprintf("%s.sig", droneYml) } var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`) @@ -168,8 +162,6 @@ func PostHook(c *gin.Context) { return } - key, _ := store.GetKey(c, repo) - // verify the branches can be built vs skipped branches := yaml.ParseBranch(raw) if !branches.Matches(build.Branch) && build.Event != model.EventTag && build.Event != model.EventDeploy { @@ -214,71 +206,43 @@ func PostHook(c *gin.Context) { log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) } - // IMPORTANT. PLEASE READ - // - // The below code uses a feature flag to switch between the current - // build engine and the exerimental 0.5 build engine. This can be - // enabled using with the environment variable CANARY=true + var signed bool + var verified bool - if os.Getenv("CANARY") == "true" { - - var signed bool - var verified bool - - signature, err := jose.ParseSigned(string(sec)) + signature, err := jose.ParseSigned(string(sec)) + if err != nil { + log.Debugf("cannot parse .drone.yml.sig file. %s", err) + } else if len(sec) == 0 { + log.Debugf("cannot parse .drone.yml.sig file. empty file") + } else { + signed = true + output, err := signature.Verify([]byte(repo.Hash)) if err != nil { - log.Debugf("cannot parse .drone.yml.sig file. %s", err) - } else if len(sec) == 0 { - log.Debugf("cannot parse .drone.yml.sig file. empty file") + log.Debugf("cannot verify .drone.yml.sig file. %s", err) + } else if string(output) != string(raw) { + log.Debugf("cannot verify .drone.yml.sig file. no match") } else { - signed = true - output, err := signature.Verify([]byte(repo.Hash)) - if err != nil { - log.Debugf("cannot verify .drone.yml.sig file. %s", err) - } else if string(output) != string(raw) { - log.Debugf("cannot verify .drone.yml.sig file. no match") - } else { - verified = true - } + verified = true } - - log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) - - bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) - for _, job := range jobs { - queue.Publish(c, &queue.Work{ - Signed: signed, - Verified: verified, - User: user, - Repo: repo, - Build: build, - BuildLast: last, - Job: job, - Netrc: netrc, - Yaml: string(raw), - Secrets: secs, - System: &model.System{Link: httputil.GetURL(c.Request)}, - }) - } - return // EXIT NOT TO AVOID THE 0.4 ENGINE CODE BELOW } - engine_ := engine.FromContext(c) - go engine_.Schedule(c.Copy(), &engine.Task{ - User: user, - Repo: repo, - Build: build, - BuildPrev: last, - Jobs: jobs, - Keys: key, - Netrc: netrc, - Config: string(raw), - Secret: string(sec), - System: &model.System{ - Link: httputil.GetURL(c.Request), - Plugins: strings.Split(os.Getenv("PLUGIN_FILTER"), " "), - Globals: strings.Split(os.Getenv("PLUGIN_PARAMS"), " "), - Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "), - }, - }) + log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) + + bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) + for _, job := range jobs { + queue.Publish(c, &queue.Work{ + Signed: signed, + Verified: verified, + User: user, + Repo: repo, + Build: build, + BuildLast: last, + Job: job, + Netrc: netrc, + Yaml: string(raw), + Secrets: secs, + System: &model.System{Link: httputil.GetURL(c.Request)}, + }) + } + } diff --git a/web/pages.go b/web/pages.go index 46da27304..8480c1e85 100644 --- a/web/pages.go +++ b/web/pages.go @@ -30,7 +30,7 @@ func ShowIndex(c *gin.Context) { } // filter to only show the currently active ones - activeRepos, err := store.GetRepoListOf(c,repos) + activeRepos, err := store.GetRepoListOf(c, repos) if err != nil { c.String(400, err.Error()) return @@ -83,26 +83,6 @@ func ShowUser(c *gin.Context) { }) } -func ShowUsers(c *gin.Context) { - user := session.User(c) - if !user.Admin { - c.AbortWithStatus(http.StatusForbidden) - return - } - users, _ := store.GetUserList(c) - - token, _ := token.New( - token.CsrfToken, - user.Login, - ).Sign(user.Hash) - - c.HTML(200, "users.html", gin.H{ - "User": user, - "Users": users, - "Csrf": token, - }) -} - func ShowRepo(c *gin.Context) { user := session.User(c) repo := session.Repo(c) @@ -227,10 +207,3 @@ func ShowBuild(c *gin.Context) { "Csrf": csrf, }) } - -func ShowNodes(c *gin.Context) { - user := session.User(c) - nodes, _ := store.GetNodeList(c) - token, _ := token.New(token.CsrfToken, user.Login).Sign(user.Hash) - c.HTML(http.StatusOK, "nodes.html", gin.H{"User": user, "Nodes": nodes, "Csrf": token}) -} diff --git a/web/stream.go b/web/stream.go index 623beb827..cceff005c 100644 --- a/web/stream.go +++ b/web/stream.go @@ -1,15 +1,18 @@ package web import ( + "bufio" + "encoding/json" "io" "strconv" "github.com/gin-gonic/gin" - "github.com/docker/docker/pkg/stdcopy" - "github.com/drone/drone/engine" + "github.com/drone/drone/bus" + "github.com/drone/drone/model" "github.com/drone/drone/router/middleware/session" "github.com/drone/drone/store" + "github.com/drone/drone/stream" log "github.com/Sirupsen/logrus" @@ -19,14 +22,13 @@ import ( // GetRepoEvents will upgrade the connection to a Websocket and will stream // event updates to the browser. func GetRepoEvents(c *gin.Context) { - engine_ := engine.FromContext(c) repo := session.Repo(c) c.Writer.Header().Set("Content-Type", "text/event-stream") - eventc := make(chan *engine.Event, 1) - engine_.Subscribe(eventc) + eventc := make(chan *bus.Event, 1) + bus.Subscribe(c, eventc) defer func() { - engine_.Unsubscribe(eventc) + bus.Unsubscribe(c, eventc) close(eventc) log.Infof("closed event stream") }() @@ -38,11 +40,22 @@ func GetRepoEvents(c *gin.Context) { log.Infof("nil event received") return false } - if event.Name == repo.FullName { - log.Debugf("received message %s", event.Name) + + // TODO(bradrydzewski) This is a super hacky workaround until we improve + // the actual bus. Having a per-call database event is just plain stupid. + if event.Repo.FullName == repo.FullName { + + var payload = struct { + model.Build + Jobs []*model.Job `json:"jobs"` + }{} + payload.Build = event.Build + payload.Jobs, _ = store.GetJobList(c, &event.Build) + data, _ := json.Marshal(&payload) + sse.Encode(w, sse.Event{ Event: "message", - Data: string(event.Msg), + Data: string(data), }) } case <-c.Writer.CloseNotify(): @@ -54,7 +67,6 @@ func GetRepoEvents(c *gin.Context) { func GetStream(c *gin.Context) { - engine_ := engine.FromContext(c) repo := session.Repo(c) buildn, _ := strconv.Atoi(c.Param("build")) jobn, _ := strconv.Atoi(c.Param("number")) @@ -73,48 +85,32 @@ func GetStream(c *gin.Context) { c.AbortWithError(404, err) return } - node, err := store.GetNode(c, job.NodeID) - if err != nil { - log.Debugln("stream cannot get node.", err) - c.AbortWithError(404, err) - return - } - rc, err := engine_.Stream(build.ID, job.ID, node) + rc, err := stream.Reader(c, stream.ToKey(job.ID)) if err != nil { c.AbortWithError(404, err) return } - defer func() { - rc.Close() - }() - go func() { - defer func() { - recover() - }() <-c.Writer.CloseNotify() rc.Close() }() - rw := &StreamWriter{c.Writer, 0} + var line int + var scanner = bufio.NewScanner(rc) + for scanner.Scan() { + line++ + var err = sse.Encode(c.Writer, sse.Event{ + Id: strconv.Itoa(line), + Event: "message", + Data: scanner.Text(), + }) + if err != nil { + break + } + c.Writer.Flush() + } - stdcopy.StdCopy(rw, rw, rc) -} - -type StreamWriter struct { - writer gin.ResponseWriter - count int -} - -func (w *StreamWriter) Write(data []byte) (int, error) { - var err = sse.Encode(w.writer, sse.Event{ - Id: strconv.Itoa(w.count), - Event: "message", - Data: string(data), - }) - w.writer.Flush() - w.count += len(data) - return len(data), err + log.Debugf("Closed stream %s#%d", repo.FullName, build.Number) } diff --git a/web/stream2.go b/web/stream2.go deleted file mode 100644 index 91067dd83..000000000 --- a/web/stream2.go +++ /dev/null @@ -1,121 +0,0 @@ -package web - -import ( - "bufio" - "encoding/json" - "io" - "strconv" - - "github.com/gin-gonic/gin" - - "github.com/drone/drone/bus" - "github.com/drone/drone/model" - "github.com/drone/drone/router/middleware/session" - "github.com/drone/drone/store" - "github.com/drone/drone/stream" - - log "github.com/Sirupsen/logrus" - - "github.com/manucorporat/sse" -) - -// IMPORTANT. PLEASE READ -// -// This file containers experimental streaming features for the 0.5 -// release. These can be enabled with the feature flag CANARY=true - -// GetRepoEvents will upgrade the connection to a Websocket and will stream -// event updates to the browser. -func GetRepoEvents2(c *gin.Context) { - repo := session.Repo(c) - c.Writer.Header().Set("Content-Type", "text/event-stream") - - eventc := make(chan *bus.Event, 1) - bus.Subscribe(c, eventc) - defer func() { - bus.Unsubscribe(c, eventc) - close(eventc) - log.Infof("closed event stream") - }() - - c.Stream(func(w io.Writer) bool { - select { - case event := <-eventc: - if event == nil { - log.Infof("nil event received") - return false - } - - // TODO(bradrydzewski) This is a super hacky workaround until we improve - // the actual bus. Having a per-call database event is just plain stupid. - if event.Repo.FullName == repo.FullName { - - var payload = struct { - model.Build - Jobs []*model.Job `json:"jobs"` - }{} - payload.Build = event.Build - payload.Jobs, _ = store.GetJobList(c, &event.Build) - data, _ := json.Marshal(&payload) - - sse.Encode(w, sse.Event{ - Event: "message", - Data: string(data), - }) - } - case <-c.Writer.CloseNotify(): - return false - } - return true - }) -} - -func GetStream2(c *gin.Context) { - - repo := session.Repo(c) - buildn, _ := strconv.Atoi(c.Param("build")) - jobn, _ := strconv.Atoi(c.Param("number")) - - c.Writer.Header().Set("Content-Type", "text/event-stream") - - build, err := store.GetBuildNumber(c, repo, buildn) - if err != nil { - log.Debugln("stream cannot get build number.", err) - c.AbortWithError(404, err) - return - } - job, err := store.GetJobNumber(c, build, jobn) - if err != nil { - log.Debugln("stream cannot get job number.", err) - c.AbortWithError(404, err) - return - } - - rc, err := stream.Reader(c, stream.ToKey(job.ID)) - if err != nil { - c.AbortWithError(404, err) - return - } - - go func() { - <-c.Writer.CloseNotify() - rc.Close() - }() - - var line int - var scanner = bufio.NewScanner(rc) - for scanner.Scan() { - line++ - var err = sse.Encode(c.Writer, sse.Event{ - Id: strconv.Itoa(line), - Event: "message", - Data: scanner.Text(), - }) - if err != nil { - break - } - c.Writer.Flush() - } - - log.Debugf("Closed stream %s#%d", repo.FullName, build.Number) -}