improved streaming w/ websocket. potential for auto re-connect, resume

This commit is contained in:
Brad Rydzewski
2015-05-17 23:07:10 -07:00
parent 73ed4f08d5
commit a14d21f5b9
4 changed files with 55 additions and 130 deletions

View File

@@ -87,7 +87,7 @@
// subscribes to the build otuput. // subscribes to the build otuput.
logs.subscribe(fullName, number, step, function(data){ logs.subscribe(fullName, number, step, function(data){
term.innerHTML += convert.toHtml(data); term.innerHTML += convert.toHtml(data.replace("\\n","\n"));
if (tail) { if (tail) {
// scrolls to the bottom of the page if enabled // scrolls to the bottom of the page if enabled
$window.scrollTo(0, $window.document.body.scrollHeight); $window.scrollTo(0, $window.document.body.scrollHeight);

View File

@@ -20,31 +20,34 @@
}; };
var callback, var callback,
websocket, events,
token = localStorage.getItem('access_token'); token = localStorage.getItem('access_token');
this.subscribe = function (repoName, number, step, _callback) { this.subscribe = function (repoName, number, step, _callback) {
callback = _callback; callback = _callback;
var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'), var route = ['/api/stream/', repoName, '/', number, '/', step, '?access_token=', token].join('')
route = [proto, "://", $window.location.host, '/api/stream/', repoName, '/', number, '/', step, '?access_token=', token].join(''); events = new EventSource(route, { withCredentials: true });
events.onmessage = function (event) {
websocket = new WebSocket(route);
websocket.onmessage = function (event) {
if (callback !== undefined) { if (callback !== undefined) {
callback(event.data); callback(event.data);
} }
}; };
websocket.onclose = function (event) { events.onerror = function (event) {
console.log('logs websocket closed'); callback = undefined;
if (events !== undefined) {
events.close();
events = undefined;
}
console.log('user event stream closed due to error.', event);
}; };
}; };
this.unsubscribe = function () { this.unsubscribe = function () {
callback = undefined; callback = undefined;
if (websocket !== undefined) { if (events !== undefined) {
websocket.close(); events.close();
websocket = undefined; events = undefined;
} }
}; };
} }

View File

@@ -224,6 +224,7 @@ func (r *Runner) Logs(build *common.Build) (io.ReadCloser, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
// verify the container is running. if not we'll // verify the container is running. if not we'll
// do an exponential backoff and attempt to wait // do an exponential backoff and attempt to wait
if !info.State.Running { if !info.State.Running {
@@ -242,16 +243,17 @@ func (r *Runner) Logs(build *common.Build) (io.ReadCloser, error) {
} }
} }
rc, err := client.ContainerLogs(info.Id, logOptsTail) return client.ContainerLogs(info.Id, logOptsTail)
if err != nil { // rc, err := client.ContainerLogs(info.Id, logOptsTail)
return nil, err // if err != nil {
} // return nil, err
pr, pw := io.Pipe() // }
go func() { // pr, pw := io.Pipe()
defer rc.Close() // go func() {
docker.StdCopy(pw, pw, rc) // defer rc.Close()
}() // docker.StdCopy(pw, pw, rc)
return pr, nil // }()
// return pr, nil
} }
func cname(build *common.Build) string { func cname(build *common.Build) string {

View File

@@ -1,39 +1,17 @@
package server package server
import ( import (
"bufio"
"encoding/json"
"io" "io"
"net/http"
"strconv" "strconv"
"time"
"github.com/drone/drone/pkg/bus" "github.com/drone/drone/pkg/bus"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/drone/drone/pkg/docker"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gorilla/websocket" "github.com/manucorporat/sse"
// "github.com/koding/websocketproxy"
) )
const (
// Time allowed to write the message to the client.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the client.
pongWait = 60 * time.Second
// Send pings to client with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
// GetRepoEvents will upgrade the connection to a Websocket and will stream // GetRepoEvents will upgrade the connection to a Websocket and will stream
// event updates to the browser. // event updates to the browser.
func GetRepoEvents(c *gin.Context) { func GetRepoEvents(c *gin.Context) {
@@ -45,6 +23,7 @@ func GetRepoEvents(c *gin.Context) {
bus_.Subscribe(eventc) bus_.Subscribe(eventc)
defer func() { defer func() {
bus_.Unsubscribe(eventc) bus_.Unsubscribe(eventc)
close(eventc)
log.Infof("closed event stream") log.Infof("closed event stream")
}() }()
@@ -57,9 +36,10 @@ func GetRepoEvents(c *gin.Context) {
} }
if event.Kind == bus.EventRepo && if event.Kind == bus.EventRepo &&
event.Name == repo.FullName { event.Name == repo.FullName {
d := map[string]interface{}{} sse.Encode(w, sse.Event{
json.Unmarshal(event.Msg, &d) Event: "message",
c.SSEvent("message", d) Data: string(event.Msg),
})
} }
case <-c.Writer.CloseNotify(): case <-c.Writer.CloseNotify():
return false return false
@@ -75,11 +55,7 @@ func GetStream(c *gin.Context) {
commitseq, _ := strconv.Atoi(c.Params.ByName("build")) commitseq, _ := strconv.Atoi(c.Params.ByName("build"))
buildseq, _ := strconv.Atoi(c.Params.ByName("number")) buildseq, _ := strconv.Atoi(c.Params.ByName("number"))
// agent, err := store.BuildAgent(repo.FullName, build) c.Writer.Header().Set("Content-Type", "text/event-stream")
// if err != nil {
// c.Fail(404, err)
// return
// }
commit, err := store.CommitSeq(repo, commitseq) commit, err := store.CommitSeq(repo, commitseq)
if err != nil { if err != nil {
@@ -97,89 +73,33 @@ func GetStream(c *gin.Context) {
c.Fail(404, err) c.Fail(404, err)
return return
} }
go func() {
<-c.Writer.CloseNotify()
rc.Close()
}()
// upgrade the websocket rw := &StreamWriter{c.Writer, 0}
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.Fail(400, err)
return
}
var ticker = time.NewTicker(pingPeriod)
var out = make(chan []byte)
defer func() { defer func() {
log.Infof("closed stdout websocket") log.Infof("closed log stream")
ticker.Stop()
rc.Close() rc.Close()
ws.Close()
}() }()
go func() { docker.StdCopy(rw, rw, rc)
for {
select {
case <-c.Writer.CloseNotify():
rc.Close()
ws.Close()
return
case line := <-out:
ws.WriteMessage(websocket.TextMessage, line)
case <-ticker.C:
ws.SetWriteDeadline(time.Now().Add(writeWait))
err := ws.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
rc.Close()
ws.Close()
return
}
}
}
}()
go func() {
rd := bufio.NewReader(rc)
for {
str, err := rd.ReadBytes('\n')
if err != nil {
break
}
if len(str) == 0 {
break
}
out <- str
}
rc.Close()
ws.Close()
}()
readWebsocket(ws)
// url_, err := url.Parse("ws://" + agent.Addr)
// if err != nil {
// c.Fail(500, err)
// return
// }
// url_.Path = fmt.Sprintf("/stream/%s/%v/%v", repo.FullName, build, task)
// proxy := websocketproxy.NewProxy(url_)
// proxy.ServeHTTP(c.Writer, c.Request)
// log.Debugf("closed websocket")
} }
// readWebsocket will block while reading the websocket data type StreamWriter struct {
func readWebsocket(ws *websocket.Conn) { writer gin.ResponseWriter
defer ws.Close() count int
ws.SetReadLimit(512) }
ws.SetReadDeadline(time.Now().Add(pongWait))
ws.SetPongHandler(func(string) error { func (w *StreamWriter) Write(data []byte) (int, error) {
ws.SetReadDeadline(time.Now().Add(pongWait)) var err = sse.Encode(w.writer, sse.Event{
return nil Id: strconv.Itoa(w.count),
Event: "message",
Data: string(data),
}) })
for { w.writer.Flush()
_, _, err := ws.ReadMessage() w.count += len(data)
if err != nil { return len(data), err
break
}
}
} }