Fix the problems in AMQP and Kafka #run_acceptance_tests

This commit is contained in:
M. Mert Yildiran
2022-05-13 00:33:12 +03:00
parent f77f8bc9c2
commit e7f3b020a3
2 changed files with 21 additions and 12 deletions

View File

@@ -89,7 +89,7 @@ func (reader *tcpReader) rewind() {
reader.exhaustBuffer = true reader.exhaustBuffer = true
// Reset the data and msgBuffer from the master record // Reset the data and msgBuffer from the master record
reader.data = []byte{} reader.data = make([]byte, 0)
reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster)) reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster))
copy(reader.msgBuffer, reader.msgBufferMaster) copy(reader.msgBuffer, reader.msgBufferMaster)
@@ -97,18 +97,26 @@ func (reader *tcpReader) rewind() {
reader.progress.Reset() reader.progress.Reset()
} }
func (reader *tcpReader) populateData(msg api.TcpReaderDataMsg) {
reader.data = msg.GetBytes()
reader.captureTime = msg.GetTimestamp()
}
func (reader *tcpReader) Read(p []byte) (int, error) { func (reader *tcpReader) Read(p []byte) (int, error) {
var msg api.TcpReaderDataMsg var msg api.TcpReaderDataMsg
// TODO: There are problems in AMQP and Kafka
if reader.exhaustBuffer && len(reader.data) == 0 { if reader.exhaustBuffer && len(reader.data) == 0 {
if len(reader.msgBuffer) > 0 { if len(reader.msgBuffer) > 0 {
// Pop first message // Pop first message
if len(reader.msgBuffer) > 1 {
msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:] msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:]
} else {
msg = reader.msgBuffer[0]
reader.msgBuffer = make([]api.TcpReaderDataMsg, 0)
}
// Get the bytes // Get the bytes
reader.data = msg.GetBytes() reader.populateData(msg)
reader.captureTime = msg.GetTimestamp()
// Set exhaustBuffer to false if we exhaust the msgBuffer // Set exhaustBuffer to false if we exhaust the msgBuffer
if len(reader.msgBuffer) == 0 { if len(reader.msgBuffer) == 0 {
@@ -124,10 +132,13 @@ func (reader *tcpReader) Read(p []byte) (int, error) {
for ok && len(reader.data) == 0 { for ok && len(reader.data) == 0 {
msg, ok = <-reader.msgQueue msg, ok = <-reader.msgQueue
if msg != nil { if msg != nil {
reader.data = msg.GetBytes() reader.populateData(msg)
reader.captureTime = msg.GetTimestamp()
if !reader.isProtocolIdentified() { if !reader.isProtocolIdentified() {
reader.msgBufferMaster = append(reader.msgBufferMaster, msg) reader.msgBufferMaster = append(
reader.msgBufferMaster,
NewTcpReaderDataMsg(msg.GetBytes(), msg.GetTimestamp()),
)
} }
} }

View File

@@ -71,10 +71,8 @@ func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
t.protocol = protocol t.protocol = protocol
// Clean the buffers // Clean the buffers
t.client.msgBuffer = []api.TcpReaderDataMsg{} t.client.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
t.client.msgBufferMaster = []api.TcpReaderDataMsg{} t.server.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
t.server.msgBuffer = []api.TcpReaderDataMsg{}
t.server.msgBufferMaster = []api.TcpReaderDataMsg{}
} }
func (t *tcpStream) GetOrigin() api.Capture { func (t *tcpStream) GetOrigin() api.Capture {