diff --git a/tee-listeners/recorder.go b/tee-listeners/recorder.go index a10b93b..f564a70 100644 --- a/tee-listeners/recorder.go +++ b/tee-listeners/recorder.go @@ -20,6 +20,7 @@ type Recorder struct { buffer bytes.Buffer serverInitMessage *common.ServerInit sessionStartWritten bool + segmentChan chan *common.RfbSegment } func getNowMillisec() int { @@ -40,6 +41,16 @@ func NewRecorder(saveFilePath string) *Recorder { fmt.Printf("unable to open file: %s, error: %v", saveFilePath, err) return nil } + + //buffer the channel so we don't halt the proxying flow for slow writes when under pressure + rec.segmentChan = make(chan *common.RfbSegment, 100) + go func() { + for { + data := <-rec.segmentChan + rec.HandleRfbSegment(data) + } + }() + return &rec } @@ -104,11 +115,24 @@ func (r *Recorder) writeStartSession(initMsg *common.ServerInit) error { } func (r *Recorder) Consume(data *common.RfbSegment) error { + + select { + case r.segmentChan <- data: + default: + fmt.Println("error: recorder queue is full") + } + + return nil +} + +func (r *Recorder) HandleRfbSegment(data *common.RfbSegment) error { + switch data.SegmentType { case common.SegmentMessageSeparator: if !r.sessionStartWritten { r.writeStartSession(r.serverInitMessage) } + switch common.ServerMessageType(data.UpcomingObjectType) { case common.FramebufferUpdate: r.writeToDisk() @@ -128,8 +152,8 @@ func (r *Recorder) Consume(data *common.RfbSegment) error { r.serverInitMessage = data.Message.(*common.ServerInit) case common.SegmentFullyParsedClientMessage: clientMsg := data.Message.(common.ClientMessage) - switch clientMsg.Type() { + switch clientMsg.Type() { case common.SetPixelFormatMsgType: clientMsg := data.Message.(*server.SetPixelFormat) r.serverInitMessage.PixelFormat = clientMsg.PF @@ -138,7 +162,7 @@ func (r *Recorder) Consume(data *common.RfbSegment) error { } default: - return errors.New("undefined RfbSegment type") + //return errors.New("undefined RfbSegment type") } return nil }