mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-05 18:46:16 +00:00
* Spawn only two Goroutines per TCP stream * Fix the linter error * Use `isProtocolIdentified` method instead * Fix the `Read` method of `tcpReader` * Remove unnecessary `append` * Copy to buffer only a message is received * Remove `exhaustBuffer` field and add `rewind` function * Rename `buffer` field to `pastData` * Update tap/tcp_reader.go Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> * Use `copy` instead of assignment * No lint * #run_acceptance_tests * Fix `rewind` #run_acceptance_tests * Fix the buffering algorithm #run_acceptance_tests * Add `TODO` * Fix the problems in AMQP and Kafka #run_acceptance_tests * Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests * Have a single `*bytes.Buffer` * Revert "Have a single `*bytes.Buffer`" This reverts commit fad96a288a.
* Revert "Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests" This reverts commit 0fc70bffe2.
* Fix the early timing out issue #run_acceptance_tests * Remove `NewBytes()` method * Update the `NewTcpReader` method signature #run_acceptance_tests * #run_acceptance_tests * #run_acceptance_tests * #run_acceptance_tests Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
81 lines
1.8 KiB
Go
81 lines
1.8 KiB
Go
package kafka
|
|
|
|
import (
	"io"
	"sync"
	"time"

	"github.com/up9inc/mizu/tap/api"
)
|
|
|
|
type tcpReader struct {
|
|
ident string
|
|
tcpID *api.TcpID
|
|
isClosed bool
|
|
isClient bool
|
|
isOutgoing bool
|
|
progress *api.ReadProgress
|
|
captureTime time.Time
|
|
parent api.TcpStream
|
|
extension *api.Extension
|
|
emitter api.Emitter
|
|
counterPair *api.CounterPair
|
|
reqResMatcher api.RequestResponseMatcher
|
|
sync.Mutex
|
|
}
|
|
|
|
func NewTcpReader(progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
|
|
return &tcpReader{
|
|
progress: progress,
|
|
ident: ident,
|
|
tcpID: tcpId,
|
|
captureTime: captureTime,
|
|
parent: parent,
|
|
isClient: isClient,
|
|
isOutgoing: isOutgoing,
|
|
extension: extension,
|
|
emitter: emitter,
|
|
counterPair: counterPair,
|
|
reqResMatcher: reqResMatcher,
|
|
}
|
|
}
|
|
|
|
func (reader *tcpReader) Read(p []byte) (int, error) {
|
|
return 0, nil
|
|
}
|
|
|
|
func (reader *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
|
|
return reader.reqResMatcher
|
|
}
|
|
|
|
func (reader *tcpReader) GetIsClient() bool {
|
|
return reader.isClient
|
|
}
|
|
|
|
func (reader *tcpReader) GetReadProgress() *api.ReadProgress {
|
|
return reader.progress
|
|
}
|
|
|
|
func (reader *tcpReader) GetParent() api.TcpStream {
|
|
return reader.parent
|
|
}
|
|
|
|
func (reader *tcpReader) GetTcpID() *api.TcpID {
|
|
return reader.tcpID
|
|
}
|
|
|
|
func (reader *tcpReader) GetCounterPair() *api.CounterPair {
|
|
return reader.counterPair
|
|
}
|
|
|
|
func (reader *tcpReader) GetCaptureTime() time.Time {
|
|
return reader.captureTime
|
|
}
|
|
|
|
func (reader *tcpReader) GetEmitter() api.Emitter {
|
|
return reader.emitter
|
|
}
|
|
|
|
func (reader *tcpReader) GetIsClosed() bool {
|
|
return reader.isClosed
|
|
}
|