Change the Dissect method signature to have *TcpReader as an argument

This commit is contained in:
M. Mert Yildiran 2022-04-20 13:06:46 +03:00
parent 960ba644cd
commit ea85b0b082
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
11 changed files with 209 additions and 98 deletions

View File

@ -132,7 +132,7 @@ func (p *ReadProgress) Current() (n int) {
type Dissector interface { type Dissector interface {
Register(*Extension) Register(*Extension)
Ping() Ping()
Dissect(b *bufio.Reader, progress *ReadProgress, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *shared.TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error Dissect(b *bufio.Reader, reader *TcpReader, options *shared.TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error)

View File

@ -40,45 +40,45 @@ type TcpReader struct {
sync.Mutex sync.Mutex
} }
func (h *TcpReader) Read(p []byte) (int, error) { func (reader *TcpReader) Read(p []byte) (int, error) {
var msg TcpReaderDataMsg var msg TcpReaderDataMsg
ok := true ok := true
for ok && len(h.data) == 0 { for ok && len(reader.data) == 0 {
msg, ok = <-h.MsgQueue msg, ok = <-reader.MsgQueue
h.data = msg.bytes reader.data = msg.bytes
h.SuperTimer.CaptureTime = msg.timestamp reader.SuperTimer.CaptureTime = msg.timestamp
if len(h.data) > 0 { if len(reader.data) > 0 {
h.packetsSeen += 1 reader.packetsSeen += 1
} }
} }
if !ok || len(h.data) == 0 { if !ok || len(reader.data) == 0 {
return 0, io.EOF return 0, io.EOF
} }
l := copy(p, h.data) l := copy(p, reader.data)
h.data = h.data[l:] reader.data = reader.data[l:]
h.Progress.Feed(l) reader.Progress.Feed(l)
return l, nil return l, nil
} }
func (h *TcpReader) Close() { func (reader *TcpReader) Close() {
h.Lock() reader.Lock()
if !h.isClosed { if !reader.isClosed {
h.isClosed = true reader.isClosed = true
close(h.MsgQueue) close(reader.MsgQueue)
} }
h.Unlock() reader.Unlock()
} }
func (h *TcpReader) Run(filteringOptions *shared.TrafficFilteringOptions, wg *sync.WaitGroup) { func (reader *TcpReader) Run(options *shared.TrafficFilteringOptions, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
b := bufio.NewReader(h) b := bufio.NewReader(reader)
err := h.Extension.Dissector.Dissect(b, h.Progress, h.Parent.Origin, h.IsClient, h.TcpID, h.CounterPair, h.SuperTimer, h.Parent.SuperIdentifier, h.Emitter, filteringOptions, h.ReqResMatcher) err := reader.Extension.Dissector.Dissect(b, reader, options)
if err != nil { if err != nil {
_, err = io.Copy(ioutil.Discard, b) _, err = io.Copy(ioutil.Discard, reader)
if err != nil { if err != nil {
logger.Log.Errorf("%v", err) logger.Log.Errorf("%v", err)
} }

View File

@ -40,17 +40,17 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request" const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *shared.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *shared.TrafficFilteringOptions) error {
r := AmqpReader{b} r := AmqpReader{b}
var remaining int var remaining int
var header *HeaderFrame var header *HeaderFrame
connectionInfo := &api.ConnectionInfo{ connectionInfo := &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: reader.TcpID.SrcIP,
ClientPort: tcpID.SrcPort, ClientPort: reader.TcpID.SrcPort,
ServerIP: tcpID.DstIP, ServerIP: reader.TcpID.DstIP,
ServerPort: tcpID.DstPort, ServerPort: reader.TcpID.DstPort,
IsOutgoing: true, IsOutgoing: true,
} }
@ -76,7 +76,7 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
var lastMethodFrameMessage Message var lastMethodFrameMessage Message
for { for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol { if reader.Parent.SuperIdentifier.Protocol != nil && reader.Parent.SuperIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol") return errors.New("Identified by another protocol")
} }
@ -113,12 +113,12 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
switch lastMethodFrameMessage.(type) { switch lastMethodFrameMessage.(type) {
case *BasicPublish: case *BasicPublish:
eventBasicPublish.Body = f.Body eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol reader.Parent.SuperIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.Body = f.Body eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol reader.Parent.SuperIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
} }
case *MethodFrame: case *MethodFrame:
@ -138,8 +138,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol reader.Parent.SuperIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *BasicConsume: case *BasicConsume:
eventBasicConsume := &BasicConsume{ eventBasicConsume := &BasicConsume{
@ -151,8 +151,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol reader.Parent.SuperIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag eventBasicDeliver.ConsumerTag = m.ConsumerTag
@ -171,8 +171,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol reader.Parent.SuperIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *ExchangeDeclare: case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{ eventExchangeDeclare := &ExchangeDeclare{
@ -185,8 +185,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol reader.Parent.SuperIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *ConnectionStart: case *ConnectionStart:
eventConnectionStart := &ConnectionStart{ eventConnectionStart := &ConnectionStart{
@ -196,8 +196,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
Mechanisms: m.Mechanisms, Mechanisms: m.Mechanisms,
Locales: m.Locales, Locales: m.Locales,
} }
superIdentifier.Protocol = &protocol reader.Parent.SuperIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *ConnectionClose: case *ConnectionClose:
eventConnectionClose := &ConnectionClose{ eventConnectionClose := &ConnectionClose{
@ -206,8 +206,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
ClassId: m.ClassId, ClassId: m.ClassId,
MethodId: m.MethodId, MethodId: m.MethodId,
} }
superIdentifier.Protocol = &protocol reader.Parent.SuperIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
} }
default: default:

View File

@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/api/diagnose"
) )
const ( const (
@ -84,7 +85,7 @@ func TestDissect(t *testing.T) {
// Channel to verify the output // Channel to verify the output
itemChannel := make(chan *api.OutputChannelItem) itemChannel := make(chan *api.OutputChannelItem)
var emitter api.Emitter = &api.Emitting{ var emitter api.Emitter = &api.Emitting{
AppStats: &api.AppStats{}, AppStats: &diagnose.AppStats{},
OutputChannel: itemChannel, OutputChannel: itemChannel,
} }
@ -123,7 +124,19 @@ func TestDissect(t *testing.T) {
DstPort: "2", DstPort: "2",
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader := &api.TcpReader{
Progress: &api.ReadProgress{},
Parent: &api.TcpStream{
Origin: api.Pcap,
SuperIdentifier: superIdentifier,
},
IsClient: true,
TcpID: tcpIDClient,
SuperTimer: &api.SuperTimer{},
Emitter: emitter,
ReqResMatcher: reqResMatcher,
}
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@ -141,7 +154,19 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader = &api.TcpReader{
Progress: &api.ReadProgress{},
Parent: &api.TcpStream{
Origin: api.Pcap,
SuperIdentifier: superIdentifier,
},
IsClient: false,
TcpID: tcpIDServer,
SuperTimer: &api.SuperTimer{},
Emitter: emitter,
ReqResMatcher: reqResMatcher,
}
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }

View File

@ -87,15 +87,15 @@ func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name) log.Printf("pong %s", http11protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *shared.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *shared.TrafficFilteringOptions) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher) reqResMatcher := reader.ReqResMatcher.(*requestResponseMatcher)
var err error var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient) isHTTP2, _ := checkIsHTTP2Connection(b, reader.IsClient)
var http2Assembler *Http2Assembler var http2Assembler *Http2Assembler
if isHTTP2 { if isHTTP2 {
err = prepareHTTP2Connection(b, isClient) err = prepareHTTP2Connection(b, reader.IsClient)
if err != nil { if err != nil {
return err return err
} }
@ -106,74 +106,74 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
for { for {
if switchingProtocolsHTTP2 { if switchingProtocolsHTTP2 {
switchingProtocolsHTTP2 = false switchingProtocolsHTTP2 = false
isHTTP2, err = checkIsHTTP2Connection(b, isClient) isHTTP2, err = checkIsHTTP2Connection(b, reader.IsClient)
if err != nil { if err != nil {
break break
} }
err = prepareHTTP2Connection(b, isClient) err = prepareHTTP2Connection(b, reader.IsClient)
if err != nil { if err != nil {
break break
} }
http2Assembler = createHTTP2Assembler(b) http2Assembler = createHTTP2Assembler(b)
} }
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &http11protocol { if reader.Parent.SuperIdentifier.Protocol != nil && reader.Parent.SuperIdentifier.Protocol != &http11protocol {
return errors.New("Identified by another protocol") return errors.New("Identified by another protocol")
} }
if isHTTP2 { if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, progress, capture, tcpID, superTimer, emitter, options, reqResMatcher) err = handleHTTP2Stream(http2Assembler, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.SuperTimer, reader.Emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
continue continue
} }
superIdentifier.Protocol = &http11protocol reader.Parent.SuperIdentifier.Protocol = &http11protocol
} else if isClient { } else if reader.IsClient {
var req *http.Request var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
continue continue
} }
superIdentifier.Protocol = &http11protocol reader.Parent.SuperIdentifier.Protocol = &http11protocol
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1 // In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 { if switchingProtocolsHTTP2 {
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%s_%s_%s_%s_1_%s", "%s_%s_%s_%s_1_%s",
tcpID.SrcIP, reader.TcpID.SrcIP,
tcpID.DstIP, reader.TcpID.DstIP,
tcpID.SrcPort, reader.TcpID.SrcPort,
tcpID.DstPort, reader.TcpID.DstPort,
"HTTP2", "HTTP2",
) )
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor) item := reqResMatcher.registerRequest(ident, req, reader.SuperTimer.CaptureTime, reader.Progress.Current(), req.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: reader.TcpID.SrcIP,
ClientPort: tcpID.SrcPort, ClientPort: reader.TcpID.SrcPort,
ServerIP: tcpID.DstIP, ServerIP: reader.TcpID.DstIP,
ServerPort: tcpID.DstPort, ServerPort: reader.TcpID.DstPort,
IsOutgoing: true, IsOutgoing: true,
} }
item.Capture = capture item.Capture = reader.Parent.Origin
filterAndEmit(item, emitter, options) filterAndEmit(item, reader.Emitter, options)
} }
} }
} else { } else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
continue continue
} }
superIdentifier.Protocol = &http11protocol reader.Parent.SuperIdentifier.Protocol = &http11protocol
} }
} }
if superIdentifier.Protocol == nil { if reader.Parent.SuperIdentifier.Protocol == nil {
return err return err
} }

View File

@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/api/diagnose"
) )
const ( const (
@ -86,7 +87,7 @@ func TestDissect(t *testing.T) {
// Channel to verify the output // Channel to verify the output
itemChannel := make(chan *api.OutputChannelItem) itemChannel := make(chan *api.OutputChannelItem)
var emitter api.Emitter = &api.Emitting{ var emitter api.Emitter = &api.Emitting{
AppStats: &api.AppStats{}, AppStats: &diagnose.AppStats{},
OutputChannel: itemChannel, OutputChannel: itemChannel,
} }
@ -125,7 +126,19 @@ func TestDissect(t *testing.T) {
DstPort: "2", DstPort: "2",
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader := &api.TcpReader{
Progress: &api.ReadProgress{},
Parent: &api.TcpStream{
Origin: api.Pcap,
SuperIdentifier: superIdentifier,
},
IsClient: true,
TcpID: tcpIDClient,
SuperTimer: &api.SuperTimer{},
Emitter: emitter,
ReqResMatcher: reqResMatcher,
}
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@ -143,7 +156,19 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader = &api.TcpReader{
Progress: &api.ReadProgress{},
Parent: &api.TcpStream{
Origin: api.Pcap,
SuperIdentifier: superIdentifier,
},
IsClient: false,
TcpID: tcpIDServer,
SuperTimer: &api.SuperTimer{},
Emitter: emitter,
ReqResMatcher: reqResMatcher,
}
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }

View File

@ -36,25 +36,25 @@ func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name) log.Printf("pong %s", _protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *shared.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *shared.TrafficFilteringOptions) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher) reqResMatcher := reader.ReqResMatcher.(*requestResponseMatcher)
for { for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { if reader.Parent.SuperIdentifier.Protocol != nil && reader.Parent.SuperIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol") return errors.New("Identified by another protocol")
} }
if isClient { if reader.IsClient {
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher) _, _, err := ReadRequest(b, reader.TcpID, reader.CounterPair, reader.SuperTimer, reqResMatcher)
if err != nil { if err != nil {
return err return err
} }
superIdentifier.Protocol = &_protocol reader.Parent.SuperIdentifier.Protocol = &_protocol
} else { } else {
err := ReadResponse(b, capture, tcpID, counterPair, superTimer, emitter, reqResMatcher) err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, reqResMatcher)
if err != nil { if err != nil {
return err return err
} }
superIdentifier.Protocol = &_protocol reader.Parent.SuperIdentifier.Protocol = &_protocol
} }
} }
} }

View File

@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/api/diagnose"
) )
const ( const (
@ -84,7 +85,7 @@ func TestDissect(t *testing.T) {
// Channel to verify the output // Channel to verify the output
itemChannel := make(chan *api.OutputChannelItem) itemChannel := make(chan *api.OutputChannelItem)
var emitter api.Emitter = &api.Emitting{ var emitter api.Emitter = &api.Emitting{
AppStats: &api.AppStats{}, AppStats: &diagnose.AppStats{},
OutputChannel: itemChannel, OutputChannel: itemChannel,
} }
@ -124,7 +125,19 @@ func TestDissect(t *testing.T) {
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10) reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader := &api.TcpReader{
Progress: &api.ReadProgress{},
Parent: &api.TcpStream{
Origin: api.Pcap,
SuperIdentifier: superIdentifier,
},
IsClient: true,
TcpID: tcpIDClient,
SuperTimer: &api.SuperTimer{},
Emitter: emitter,
ReqResMatcher: reqResMatcher,
}
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }
@ -142,7 +155,19 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader = &api.TcpReader{
Progress: &api.ReadProgress{},
Parent: &api.TcpStream{
Origin: api.Pcap,
SuperIdentifier: superIdentifier,
},
IsClient: false,
TcpID: tcpIDServer,
SuperTimer: &api.SuperTimer{},
Emitter: emitter,
ReqResMatcher: reqResMatcher,
}
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }

View File

@ -35,8 +35,8 @@ func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name) log.Printf("pong %s", protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *shared.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *shared.TrafficFilteringOptions) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher) reqResMatcher := reader.ReqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{ is := &RedisInputStream{
Reader: b, Reader: b,
Buf: make([]byte, 8192), Buf: make([]byte, 8192),
@ -48,10 +48,10 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
return err return err
} }
if isClient { if reader.IsClient {
err = handleClientStream(progress, capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) err = handleClientStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, redisPacket, reqResMatcher)
} else { } else {
err = handleServerStream(progress, capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) err = handleServerStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, redisPacket, reqResMatcher)
} }
if err != nil { if err != nil {

View File

@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/api/diagnose"
) )
const ( const (
@ -85,7 +86,7 @@ func TestDissect(t *testing.T) {
// Channel to verify the output // Channel to verify the output
itemChannel := make(chan *api.OutputChannelItem) itemChannel := make(chan *api.OutputChannelItem)
var emitter api.Emitter = &api.Emitting{ var emitter api.Emitter = &api.Emitting{
AppStats: &api.AppStats{}, AppStats: &diagnose.AppStats{},
OutputChannel: itemChannel, OutputChannel: itemChannel,
} }
@ -124,7 +125,19 @@ func TestDissect(t *testing.T) {
DstPort: "2", DstPort: "2",
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader := &api.TcpReader{
Progress: &api.ReadProgress{},
Parent: &api.TcpStream{
Origin: api.Pcap,
SuperIdentifier: superIdentifier,
},
IsClient: true,
TcpID: tcpIDClient,
SuperTimer: &api.SuperTimer{},
Emitter: emitter,
ReqResMatcher: reqResMatcher,
}
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }
@ -142,7 +155,19 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader = &api.TcpReader{
Progress: &api.ReadProgress{},
Parent: &api.TcpStream{
Origin: api.Pcap,
SuperIdentifier: superIdentifier,
},
IsClient: false,
TcpID: tcpIDServer,
SuperTimer: &api.SuperTimer{},
Emitter: emitter,
ReqResMatcher: reqResMatcher,
}
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }

View File

@ -149,7 +149,6 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
doneHandler: func(r *tlsReader) { doneHandler: func(r *tlsReader) {
p.closeReader(key, r) p.closeReader(key, r)
}, },
progress: &api.ReadProgress{},
} }
tcpid := p.buildTcpId(chunk, ip, port) tcpid := p.buildTcpId(chunk, ip, port)
@ -167,8 +166,20 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid
tlsEmitter *tlsEmitter, options *shared.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) { tlsEmitter *tlsEmitter, options *shared.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
b := bufio.NewReader(reader) b := bufio.NewReader(reader)
err := extension.Dissector.Dissect(b, reader.progress, api.Ebpf, isRequest, tcpid, &api.CounterPair{}, tcpReader := &api.TcpReader{
&api.SuperTimer{}, &api.SuperIdentifier{}, tlsEmitter, options, reqResMatcher) Progress: reader.progress,
Parent: &api.TcpStream{
Origin: api.Ebpf,
SuperIdentifier: &api.SuperIdentifier{},
},
IsClient: isRequest,
TcpID: tcpid,
SuperTimer: &api.SuperTimer{},
Emitter: tlsEmitter,
ReqResMatcher: reqResMatcher,
}
err := extension.Dissector.Dissect(b, tcpReader, options)
if err != nil { if err != nil {
logger.Log.Warningf("Error dissecting TLS %v - %v", tcpid, err) logger.Log.Warningf("Error dissecting TLS %v - %v", tcpid, err)