mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-16 23:03:51 +00:00
Fix the buffering algorithm #run_acceptance_tests
This commit is contained in:
@@ -118,6 +118,11 @@ func (p *ReadProgress) Current() (n int) {
|
||||
return p.lastCurrent
|
||||
}
|
||||
|
||||
func (p *ReadProgress) Reset() {
|
||||
p.readBytes = 0
|
||||
p.lastCurrent = 0
|
||||
}
|
||||
|
||||
type Dissector interface {
|
||||
Register(*Extension)
|
||||
Ping()
|
||||
|
@@ -15,21 +15,23 @@ import (
|
||||
* Implements io.Reader interface (Read)
|
||||
*/
|
||||
type tcpReader struct {
|
||||
ident string
|
||||
tcpID *api.TcpID
|
||||
isClosed bool
|
||||
isClient bool
|
||||
isOutgoing bool
|
||||
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
pastData []byte
|
||||
data []byte
|
||||
progress *api.ReadProgress
|
||||
captureTime time.Time
|
||||
parent *tcpStream
|
||||
packetsSeen uint
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
ident string
|
||||
tcpID *api.TcpID
|
||||
isClosed bool
|
||||
isClient bool
|
||||
isOutgoing bool
|
||||
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
msgBuffer []api.TcpReaderDataMsg
|
||||
msgBufferMaster []api.TcpReaderDataMsg
|
||||
exhaustBuffer bool
|
||||
data []byte
|
||||
progress *api.ReadProgress
|
||||
captureTime time.Time
|
||||
parent *tcpStream
|
||||
packetsSeen uint
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
@@ -83,13 +85,40 @@ func (reader *tcpReader) isProtocolIdentified() bool {
|
||||
}
|
||||
|
||||
func (reader *tcpReader) rewind() {
|
||||
reader.data = make([]byte, len(reader.pastData))
|
||||
copy(reader.data, reader.pastData)
|
||||
// Tell Read to exhaust the msgBuffer
|
||||
reader.exhaustBuffer = true
|
||||
|
||||
// Reset the data and msgBuffer from the master record
|
||||
reader.data = []byte{}
|
||||
reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster))
|
||||
copy(reader.msgBuffer, reader.msgBufferMaster)
|
||||
|
||||
// Reset the read progress
|
||||
reader.progress.Reset()
|
||||
}
|
||||
|
||||
func (reader *tcpReader) Read(p []byte) (int, error) {
|
||||
var msg api.TcpReaderDataMsg
|
||||
|
||||
if reader.exhaustBuffer && len(reader.data) == 0 {
|
||||
if len(reader.msgBuffer) > 0 {
|
||||
// Pop first message
|
||||
msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:]
|
||||
|
||||
// Get the bytes
|
||||
reader.data = msg.GetBytes()
|
||||
reader.captureTime = msg.GetTimestamp()
|
||||
|
||||
// Set exhaustBuffer to false if we exhaust the msgBuffer
|
||||
if len(reader.msgBuffer) == 0 {
|
||||
reader.exhaustBuffer = false
|
||||
}
|
||||
} else {
|
||||
// Buffer is empty
|
||||
reader.exhaustBuffer = false
|
||||
}
|
||||
}
|
||||
|
||||
ok := true
|
||||
for ok && len(reader.data) == 0 {
|
||||
msg, ok = <-reader.msgQueue
|
||||
@@ -97,7 +126,7 @@ func (reader *tcpReader) Read(p []byte) (int, error) {
|
||||
reader.data = msg.GetBytes()
|
||||
reader.captureTime = msg.GetTimestamp()
|
||||
if !reader.isProtocolIdentified() {
|
||||
reader.pastData = append(reader.pastData, reader.data...)
|
||||
reader.msgBufferMaster = append(reader.msgBufferMaster, msg)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -69,8 +69,12 @@ func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
|
||||
|
||||
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
|
||||
t.protocol = protocol
|
||||
t.client.pastData = []byte{}
|
||||
t.server.pastData = []byte{}
|
||||
|
||||
// Clean the buffers
|
||||
t.client.msgBuffer = []api.TcpReaderDataMsg{}
|
||||
t.client.msgBufferMaster = []api.TcpReaderDataMsg{}
|
||||
t.server.msgBuffer = []api.TcpReaderDataMsg{}
|
||||
t.server.msgBufferMaster = []api.TcpReaderDataMsg{}
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetOrigin() api.Capture {
|
||||
|
Reference in New Issue
Block a user