mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-22 22:39:40 +00:00
Use *bytes.Buffer
instead of []api.TcpReaderDataMsg
#run_acceptance_tests
This commit is contained in:
parent
e7f3b020a3
commit
0fc70bffe2
@ -2,6 +2,7 @@ package tap
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -21,9 +22,8 @@ type tcpReader struct {
|
|||||||
isClient bool
|
isClient bool
|
||||||
isOutgoing bool
|
isOutgoing bool
|
||||||
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||||
msgBuffer []api.TcpReaderDataMsg
|
msgBuffer *bytes.Buffer
|
||||||
msgBufferMaster []api.TcpReaderDataMsg
|
msgBufferMaster *bytes.Buffer
|
||||||
exhaustBuffer bool
|
|
||||||
data []byte
|
data []byte
|
||||||
progress *api.ReadProgress
|
progress *api.ReadProgress
|
||||||
captureTime time.Time
|
captureTime time.Time
|
||||||
@ -37,15 +37,17 @@ type tcpReader struct {
|
|||||||
|
|
||||||
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, emitter api.Emitter) *tcpReader {
|
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, emitter api.Emitter) *tcpReader {
|
||||||
return &tcpReader{
|
return &tcpReader{
|
||||||
msgQueue: msgQueue,
|
msgQueue: msgQueue,
|
||||||
progress: progress,
|
msgBuffer: bytes.NewBuffer(make([]byte, 0)),
|
||||||
ident: ident,
|
msgBufferMaster: bytes.NewBuffer(make([]byte, 0)),
|
||||||
tcpID: tcpId,
|
progress: progress,
|
||||||
captureTime: captureTime,
|
ident: ident,
|
||||||
parent: parent,
|
tcpID: tcpId,
|
||||||
isClient: isClient,
|
captureTime: captureTime,
|
||||||
isOutgoing: isOutgoing,
|
parent: parent,
|
||||||
emitter: emitter,
|
isClient: isClient,
|
||||||
|
isOutgoing: isOutgoing,
|
||||||
|
emitter: emitter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,60 +87,32 @@ func (reader *tcpReader) isProtocolIdentified() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (reader *tcpReader) rewind() {
|
func (reader *tcpReader) rewind() {
|
||||||
// Tell Read to exhaust the msgBuffer
|
|
||||||
reader.exhaustBuffer = true
|
|
||||||
|
|
||||||
// Reset the data and msgBuffer from the master record
|
// Reset the data and msgBuffer from the master record
|
||||||
reader.data = make([]byte, 0)
|
reader.data = make([]byte, 0)
|
||||||
reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster))
|
buffer := reader.msgBufferMaster.Bytes()
|
||||||
copy(reader.msgBuffer, reader.msgBufferMaster)
|
reader.msgBuffer = bytes.NewBuffer(make([]byte, 0))
|
||||||
|
reader.msgBuffer.Write(buffer)
|
||||||
|
|
||||||
// Reset the read progress
|
// Reset the read progress
|
||||||
reader.progress.Reset()
|
reader.progress.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (reader *tcpReader) populateData(msg api.TcpReaderDataMsg) {
|
|
||||||
reader.data = msg.GetBytes()
|
|
||||||
reader.captureTime = msg.GetTimestamp()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (reader *tcpReader) Read(p []byte) (int, error) {
|
func (reader *tcpReader) Read(p []byte) (int, error) {
|
||||||
var msg api.TcpReaderDataMsg
|
if reader.msgBuffer.Len() > 0 {
|
||||||
|
reader.data = reader.msgBuffer.Bytes()
|
||||||
if reader.exhaustBuffer && len(reader.data) == 0 {
|
reader.msgBuffer = bytes.NewBuffer(make([]byte, 0))
|
||||||
if len(reader.msgBuffer) > 0 {
|
|
||||||
// Pop first message
|
|
||||||
if len(reader.msgBuffer) > 1 {
|
|
||||||
msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:]
|
|
||||||
} else {
|
|
||||||
msg = reader.msgBuffer[0]
|
|
||||||
reader.msgBuffer = make([]api.TcpReaderDataMsg, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the bytes
|
|
||||||
reader.populateData(msg)
|
|
||||||
|
|
||||||
// Set exhaustBuffer to false if we exhaust the msgBuffer
|
|
||||||
if len(reader.msgBuffer) == 0 {
|
|
||||||
reader.exhaustBuffer = false
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Buffer is empty
|
|
||||||
reader.exhaustBuffer = false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var msg api.TcpReaderDataMsg
|
||||||
ok := true
|
ok := true
|
||||||
for ok && len(reader.data) == 0 {
|
for ok && len(reader.data) == 0 {
|
||||||
msg, ok = <-reader.msgQueue
|
msg, ok = <-reader.msgQueue
|
||||||
if msg != nil {
|
if msg != nil {
|
||||||
reader.populateData(msg)
|
reader.data = msg.GetBytes()
|
||||||
|
reader.captureTime = msg.GetTimestamp()
|
||||||
|
|
||||||
if !reader.isProtocolIdentified() {
|
if !reader.isProtocolIdentified() {
|
||||||
reader.msgBufferMaster = append(
|
reader.msgBufferMaster.Write(reader.data)
|
||||||
reader.msgBufferMaster,
|
|
||||||
NewTcpReaderDataMsg(msg.GetBytes(), msg.GetTimestamp()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package tap
|
package tap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -71,8 +72,8 @@ func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
|
|||||||
t.protocol = protocol
|
t.protocol = protocol
|
||||||
|
|
||||||
// Clean the buffers
|
// Clean the buffers
|
||||||
t.client.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
|
t.client.msgBufferMaster = bytes.NewBuffer(make([]byte, 0))
|
||||||
t.server.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
|
t.server.msgBufferMaster = bytes.NewBuffer(make([]byte, 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetOrigin() api.Capture {
|
func (t *tcpStream) GetOrigin() api.Capture {
|
||||||
|
Loading…
Reference in New Issue
Block a user