mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-23 06:48:47 +00:00
156 lines
3.6 KiB
Go
156 lines
3.6 KiB
Go
package tap
|
|
|
|
import (
|
|
"os"
|
|
"bufio"
|
|
"io"
|
|
"io/ioutil"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
/* TcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses.
|
|
* The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection.
|
|
* An TcpReader object is unidirectional: it parses either a client stream or a server stream.
|
|
* Implements io.Reader interface (Read)
|
|
*/
|
|
type tcpReader struct {
|
|
ident string
|
|
tcpID *api.TcpID
|
|
isClosed bool
|
|
isClient bool
|
|
isOutgoing bool
|
|
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
|
data []byte
|
|
progress *api.ReadProgress
|
|
captureTime time.Time
|
|
parent *tcpStream
|
|
packetsSeen uint
|
|
extension *api.Extension
|
|
emitter api.Emitter
|
|
counterPair *api.CounterPair
|
|
reqResMatcher api.RequestResponseMatcher
|
|
sync.Mutex
|
|
}
|
|
|
|
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) *tcpReader {
|
|
return &tcpReader{
|
|
msgQueue: msgQueue,
|
|
progress: progress,
|
|
ident: ident,
|
|
tcpID: tcpId,
|
|
captureTime: captureTime,
|
|
parent: parent,
|
|
isClient: isClient,
|
|
isOutgoing: isOutgoing,
|
|
extension: extension,
|
|
emitter: emitter,
|
|
counterPair: counterPair,
|
|
reqResMatcher: reqResMatcher,
|
|
}
|
|
}
|
|
|
|
func (reader *tcpReader) run(options *api.TrafficFilteringOptions, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
b := bufio.NewReader(reader)
|
|
|
|
if os.Getenv("MIZU_TAPPER_NO_DISSECTORS") == "true" {
|
|
io.ReadAll(b)
|
|
return
|
|
}
|
|
|
|
err := reader.extension.Dissector.Dissect(b, reader, options)
|
|
if err != nil {
|
|
_, err = io.Copy(ioutil.Discard, reader)
|
|
if err != nil {
|
|
logger.Log.Errorf("%v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (reader *tcpReader) close() {
|
|
reader.Lock()
|
|
if !reader.isClosed {
|
|
reader.isClosed = true
|
|
close(reader.msgQueue)
|
|
}
|
|
reader.Unlock()
|
|
}
|
|
|
|
func (reader *tcpReader) sendMsgIfNotClosed(msg api.TcpReaderDataMsg) {
|
|
reader.Lock()
|
|
if !reader.isClosed {
|
|
reader.msgQueue <- msg
|
|
}
|
|
reader.Unlock()
|
|
}
|
|
|
|
func (reader *tcpReader) Read(p []byte) (int, error) {
|
|
var msg api.TcpReaderDataMsg
|
|
|
|
ok := true
|
|
for ok && len(reader.data) == 0 {
|
|
msg, ok = <-reader.msgQueue
|
|
if msg != nil {
|
|
reader.data = msg.GetBytes()
|
|
reader.captureTime = msg.GetTimestamp()
|
|
}
|
|
|
|
if len(reader.data) > 0 {
|
|
reader.packetsSeen += 1
|
|
}
|
|
}
|
|
if !ok || len(reader.data) == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
|
|
l := copy(p, reader.data)
|
|
reader.data = reader.data[l:]
|
|
reader.progress.Feed(l)
|
|
|
|
return l, nil
|
|
}
|
|
|
|
func (reader *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
|
|
return reader.reqResMatcher
|
|
}
|
|
|
|
func (reader *tcpReader) GetIsClient() bool {
|
|
return reader.isClient
|
|
}
|
|
|
|
func (reader *tcpReader) GetReadProgress() *api.ReadProgress {
|
|
return reader.progress
|
|
}
|
|
|
|
func (reader *tcpReader) GetParent() api.TcpStream {
|
|
return reader.parent
|
|
}
|
|
|
|
func (reader *tcpReader) GetTcpID() *api.TcpID {
|
|
return reader.tcpID
|
|
}
|
|
|
|
func (reader *tcpReader) GetCounterPair() *api.CounterPair {
|
|
return reader.counterPair
|
|
}
|
|
|
|
func (reader *tcpReader) GetCaptureTime() time.Time {
|
|
return reader.captureTime
|
|
}
|
|
|
|
func (reader *tcpReader) GetEmitter() api.Emitter {
|
|
return reader.emitter
|
|
}
|
|
|
|
func (reader *tcpReader) GetIsClosed() bool {
|
|
return reader.isClosed
|
|
}
|
|
|
|
func (reader *tcpReader) GetExtension() *api.Extension {
|
|
return reader.extension
|
|
}
|