diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index 0315745a0..0b4a92513 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -2,6 +2,7 @@ package tap import ( "bufio" + "bytes" "io" "sync" "time" @@ -21,9 +22,8 @@ type tcpReader struct { isClient bool isOutgoing bool msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload - msgBuffer []api.TcpReaderDataMsg - msgBufferMaster []api.TcpReaderDataMsg - exhaustBuffer bool + msgBuffer *bytes.Buffer + msgBufferMaster *bytes.Buffer data []byte progress *api.ReadProgress captureTime time.Time @@ -37,15 +37,17 @@ type tcpReader struct { func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, emitter api.Emitter) *tcpReader { return &tcpReader{ - msgQueue: msgQueue, - progress: progress, - ident: ident, - tcpID: tcpId, - captureTime: captureTime, - parent: parent, - isClient: isClient, - isOutgoing: isOutgoing, - emitter: emitter, + msgQueue: msgQueue, + msgBuffer: bytes.NewBuffer(make([]byte, 0)), + msgBufferMaster: bytes.NewBuffer(make([]byte, 0)), + progress: progress, + ident: ident, + tcpID: tcpId, + captureTime: captureTime, + parent: parent, + isClient: isClient, + isOutgoing: isOutgoing, + emitter: emitter, } } @@ -85,60 +87,32 @@ func (reader *tcpReader) isProtocolIdentified() bool { } func (reader *tcpReader) rewind() { - // Tell Read to exhaust the msgBuffer - reader.exhaustBuffer = true - // Reset the data and msgBuffer from the master record reader.data = make([]byte, 0) - reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster)) - copy(reader.msgBuffer, reader.msgBufferMaster) + buffer := reader.msgBufferMaster.Bytes() + reader.msgBuffer = bytes.NewBuffer(make([]byte, 0)) + reader.msgBuffer.Write(buffer) // Reset the read progress reader.progress.Reset() } -func (reader *tcpReader) populateData(msg api.TcpReaderDataMsg) { - reader.data = msg.GetBytes() - reader.captureTime = msg.GetTimestamp() -} - func (reader *tcpReader) Read(p []byte) (int, error) { - var msg api.TcpReaderDataMsg - - if reader.exhaustBuffer && len(reader.data) == 0 { - if len(reader.msgBuffer) > 0 { - // Pop first message - if len(reader.msgBuffer) > 1 { - msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:] - } else { - msg = reader.msgBuffer[0] - reader.msgBuffer = make([]api.TcpReaderDataMsg, 0) - } - - // Get the bytes - reader.populateData(msg) - - // Set exhaustBuffer to false if we exhaust the msgBuffer - if len(reader.msgBuffer) == 0 { - reader.exhaustBuffer = false - } - } else { - // Buffer is empty - reader.exhaustBuffer = false - } + if reader.msgBuffer.Len() > 0 { + reader.data = reader.msgBuffer.Bytes() + reader.msgBuffer = bytes.NewBuffer(make([]byte, 0)) } + var msg api.TcpReaderDataMsg ok := true for ok && len(reader.data) == 0 { msg, ok = <-reader.msgQueue if msg != nil { - reader.populateData(msg) + reader.data = msg.GetBytes() + reader.captureTime = msg.GetTimestamp() if !reader.isProtocolIdentified() { - reader.msgBufferMaster = append( - reader.msgBufferMaster, - NewTcpReaderDataMsg(msg.GetBytes(), msg.GetTimestamp()), - ) + reader.msgBufferMaster.Write(reader.data) } } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index d63982920..04d78e6d7 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -1,6 +1,7 @@ package tap import ( + "bytes" "sync" "time" @@ -71,8 +72,8 @@ func (t *tcpStream) SetProtocol(protocol *api.Protocol) { t.protocol = protocol // Clean the buffers - t.client.msgBufferMaster = make([]api.TcpReaderDataMsg, 0) - t.server.msgBufferMaster = make([]api.TcpReaderDataMsg, 0) + t.client.msgBufferMaster = bytes.NewBuffer(make([]byte, 0)) + t.server.msgBufferMaster = bytes.NewBuffer(make([]byte, 0)) } func (t *tcpStream) GetOrigin() api.Capture {