diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index 854834385..0315745a0 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -89,7 +89,7 @@ func (reader *tcpReader) rewind() { reader.exhaustBuffer = true // Reset the data and msgBuffer from the master record - reader.data = []byte{} + reader.data = make([]byte, 0) reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster)) copy(reader.msgBuffer, reader.msgBufferMaster) @@ -97,18 +97,26 @@ func (reader *tcpReader) rewind() { reader.progress.Reset() } +func (reader *tcpReader) populateData(msg api.TcpReaderDataMsg) { + reader.data = msg.GetBytes() + reader.captureTime = msg.GetTimestamp() +} + func (reader *tcpReader) Read(p []byte) (int, error) { var msg api.TcpReaderDataMsg - // TODO: There are problems in AMQP and Kafka if reader.exhaustBuffer && len(reader.data) == 0 { if len(reader.msgBuffer) > 0 { // Pop first message - msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:] + if len(reader.msgBuffer) > 1 { + msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:] + } else { + msg = reader.msgBuffer[0] + reader.msgBuffer = make([]api.TcpReaderDataMsg, 0) + } // Get the bytes - reader.data = msg.GetBytes() - reader.captureTime = msg.GetTimestamp() + reader.populateData(msg) // Set exhaustBuffer to false if we exhaust the msgBuffer if len(reader.msgBuffer) == 0 { @@ -124,10 +132,13 @@ func (reader *tcpReader) Read(p []byte) (int, error) { for ok && len(reader.data) == 0 { msg, ok = <-reader.msgQueue if msg != nil { - reader.data = msg.GetBytes() - reader.captureTime = msg.GetTimestamp() + reader.populateData(msg) + if !reader.isProtocolIdentified() { - reader.msgBufferMaster = append(reader.msgBufferMaster, msg) + reader.msgBufferMaster = append( + reader.msgBufferMaster, + NewTcpReaderDataMsg(msg.GetBytes(), msg.GetTimestamp()), + ) } } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 35c25f5ad..d63982920 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -71,10 +71,8 @@ func (t *tcpStream) SetProtocol(protocol *api.Protocol) { t.protocol = protocol // Clean the buffers - t.client.msgBuffer = []api.TcpReaderDataMsg{} - t.client.msgBufferMaster = []api.TcpReaderDataMsg{} - t.server.msgBuffer = []api.TcpReaderDataMsg{} - t.server.msgBufferMaster = []api.TcpReaderDataMsg{} + t.client.msgBufferMaster = make([]api.TcpReaderDataMsg, 0) + t.server.msgBufferMaster = make([]api.TcpReaderDataMsg, 0) } func (t *tcpStream) GetOrigin() api.Capture {