making the recorder async

This commit is contained in:
amit bezalel 2017-07-07 23:46:05 +03:00
parent 7882e7f051
commit d40b7670d5

View File

@ -20,6 +20,7 @@ type Recorder struct {
buffer bytes.Buffer buffer bytes.Buffer
serverInitMessage *common.ServerInit serverInitMessage *common.ServerInit
sessionStartWritten bool sessionStartWritten bool
segmentChan chan *common.RfbSegment
} }
func getNowMillisec() int { func getNowMillisec() int {
@ -40,6 +41,16 @@ func NewRecorder(saveFilePath string) *Recorder {
fmt.Printf("unable to open file: %s, error: %v", saveFilePath, err) fmt.Printf("unable to open file: %s, error: %v", saveFilePath, err)
return nil return nil
} }
//buffer the channel so we don't halt the proxying flow for slow writes when under pressure
rec.segmentChan = make(chan *common.RfbSegment, 100)
go func() {
for {
data := <-rec.segmentChan
rec.HandleRfbSegment(data)
}
}()
return &rec return &rec
} }
@ -104,11 +115,24 @@ func (r *Recorder) writeStartSession(initMsg *common.ServerInit) error {
} }
func (r *Recorder) Consume(data *common.RfbSegment) error { func (r *Recorder) Consume(data *common.RfbSegment) error {
select {
case r.segmentChan <- data:
default:
fmt.Println("error: recorder queue is full")
}
return nil
}
func (r *Recorder) HandleRfbSegment(data *common.RfbSegment) error {
switch data.SegmentType { switch data.SegmentType {
case common.SegmentMessageSeparator: case common.SegmentMessageSeparator:
if !r.sessionStartWritten { if !r.sessionStartWritten {
r.writeStartSession(r.serverInitMessage) r.writeStartSession(r.serverInitMessage)
} }
switch common.ServerMessageType(data.UpcomingObjectType) { switch common.ServerMessageType(data.UpcomingObjectType) {
case common.FramebufferUpdate: case common.FramebufferUpdate:
r.writeToDisk() r.writeToDisk()
@ -128,8 +152,8 @@ func (r *Recorder) Consume(data *common.RfbSegment) error {
r.serverInitMessage = data.Message.(*common.ServerInit) r.serverInitMessage = data.Message.(*common.ServerInit)
case common.SegmentFullyParsedClientMessage: case common.SegmentFullyParsedClientMessage:
clientMsg := data.Message.(common.ClientMessage) clientMsg := data.Message.(common.ClientMessage)
switch clientMsg.Type() {
switch clientMsg.Type() {
case common.SetPixelFormatMsgType: case common.SetPixelFormatMsgType:
clientMsg := data.Message.(*server.SetPixelFormat) clientMsg := data.Message.(*server.SetPixelFormat)
r.serverInitMessage.PixelFormat = clientMsg.PF r.serverInitMessage.PixelFormat = clientMsg.PF
@ -138,7 +162,7 @@ func (r *Recorder) Consume(data *common.RfbSegment) error {
} }
default: default:
return errors.New("undefined RfbSegment type") //return errors.New("undefined RfbSegment type")
} }
return nil return nil
} }