diff --git a/tap/api/api.go b/tap/api/api.go index 7ce5d7ad5..64dbb8f4e 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -132,7 +132,7 @@ func (p *ReadProgress) Current() (n int) { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, progress *ReadProgress, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *shared.TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error + Dissect(b *bufio.Reader, reader *TcpReader, options *shared.TrafficFilteringOptions) error Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry Summarize(entry *Entry) *BaseEntry Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) diff --git a/tap/api/tcp_reader.go b/tap/api/tcp_reader.go index 75eba5a4d..115c846d8 100644 --- a/tap/api/tcp_reader.go +++ b/tap/api/tcp_reader.go @@ -40,45 +40,45 @@ type TcpReader struct { sync.Mutex } -func (h *TcpReader) Read(p []byte) (int, error) { +func (reader *TcpReader) Read(p []byte) (int, error) { var msg TcpReaderDataMsg ok := true - for ok && len(h.data) == 0 { - msg, ok = <-h.MsgQueue - h.data = msg.bytes + for ok && len(reader.data) == 0 { + msg, ok = <-reader.MsgQueue + reader.data = msg.bytes - h.SuperTimer.CaptureTime = msg.timestamp - if len(h.data) > 0 { - h.packetsSeen += 1 + reader.SuperTimer.CaptureTime = msg.timestamp + if len(reader.data) > 0 { + reader.packetsSeen += 1 } } - if !ok || len(h.data) == 0 { + if !ok || len(reader.data) == 0 { return 0, io.EOF } - l := copy(p, h.data) - h.data = h.data[l:] - h.Progress.Feed(l) + l := copy(p, reader.data) + reader.data = reader.data[l:] + reader.Progress.Feed(l) return l, nil } -func (h *TcpReader) Close() { - h.Lock() - if !h.isClosed { - h.isClosed = true - close(h.MsgQueue) +func (reader *TcpReader) Close() { + reader.Lock() + if !reader.isClosed { + reader.isClosed = true + close(reader.MsgQueue) } - h.Unlock() + reader.Unlock() } -func (h *TcpReader) Run(filteringOptions *shared.TrafficFilteringOptions, wg *sync.WaitGroup) { +func (reader *TcpReader) Run(options *shared.TrafficFilteringOptions, wg *sync.WaitGroup) { defer wg.Done() - b := bufio.NewReader(h) - err := h.Extension.Dissector.Dissect(b, h.Progress, h.Parent.Origin, h.IsClient, h.TcpID, h.CounterPair, h.SuperTimer, h.Parent.SuperIdentifier, h.Emitter, filteringOptions, h.ReqResMatcher) + b := bufio.NewReader(reader) + err := reader.Extension.Dissector.Dissect(b, reader, options) if err != nil { - _, err = io.Copy(ioutil.Discard, b) + _, err = io.Copy(ioutil.Discard, reader) if err != nil { logger.Log.Errorf("%v", err) } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index efcc5cea4..5e3c76700 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -40,17 +40,17 @@ func (d dissecting) Ping() { const amqpRequest string = "amqp_request" -func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *shared.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { +func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *shared.TrafficFilteringOptions) error { r := AmqpReader{b} var remaining int var header *HeaderFrame connectionInfo := &api.ConnectionInfo{ - ClientIP: tcpID.SrcIP, - ClientPort: tcpID.SrcPort, - ServerIP: tcpID.DstIP, - ServerPort: tcpID.DstPort, + ClientIP: reader.TcpID.SrcIP, + ClientPort: reader.TcpID.SrcPort, + ServerIP: reader.TcpID.DstIP, + ServerPort: reader.TcpID.DstPort, IsOutgoing: true, } @@ -76,7 +76,7 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture var lastMethodFrameMessage Message for { - if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol { + if reader.Parent.SuperIdentifier.Protocol != nil && reader.Parent.SuperIdentifier.Protocol != &protocol { return errors.New("Identified by another protocol") } @@ -113,12 +113,12 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture switch lastMethodFrameMessage.(type) { case *BasicPublish: eventBasicPublish.Body = f.Body - superIdentifier.Protocol = &protocol - emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) + reader.Parent.SuperIdentifier.Protocol = &protocol + emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicDeliver: eventBasicDeliver.Body = f.Body - superIdentifier.Protocol = &protocol - emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) + reader.Parent.SuperIdentifier.Protocol = &protocol + emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) } case *MethodFrame: @@ -138,8 +138,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture NoWait: m.NoWait, Arguments: m.Arguments, } - superIdentifier.Protocol = &protocol - emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) + reader.Parent.SuperIdentifier.Protocol = &protocol + emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicConsume: eventBasicConsume := &BasicConsume{ @@ -151,8 +151,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture NoWait: m.NoWait, Arguments: m.Arguments, } - superIdentifier.Protocol = &protocol - emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) + reader.Parent.SuperIdentifier.Protocol = &protocol + emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicDeliver: eventBasicDeliver.ConsumerTag = m.ConsumerTag @@ -171,8 +171,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture NoWait: m.NoWait, Arguments: m.Arguments, } - superIdentifier.Protocol = &protocol - emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) + reader.Parent.SuperIdentifier.Protocol = &protocol + emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ExchangeDeclare: eventExchangeDeclare := &ExchangeDeclare{ @@ -185,8 +185,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture NoWait: m.NoWait, Arguments: m.Arguments, } - superIdentifier.Protocol = &protocol - emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) + reader.Parent.SuperIdentifier.Protocol = &protocol + emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ConnectionStart: eventConnectionStart := &ConnectionStart{ @@ -196,8 +196,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture Mechanisms: m.Mechanisms, Locales: m.Locales, } - superIdentifier.Protocol = &protocol - emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) + reader.Parent.SuperIdentifier.Protocol = &protocol + emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ConnectionClose: eventConnectionClose := &ConnectionClose{ @@ -206,8 +206,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture ClassId: m.ClassId, MethodId: m.MethodId, } - superIdentifier.Protocol = &protocol - emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) + reader.Parent.SuperIdentifier.Protocol = &protocol + emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) } default: diff --git a/tap/extensions/amqp/main_test.go b/tap/extensions/amqp/main_test.go index 557bb3661..5b5c51618 100644 --- a/tap/extensions/amqp/main_test.go +++ b/tap/extensions/amqp/main_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap/api" + "github.com/up9inc/mizu/tap/api/diagnose" ) const ( @@ -84,7 +85,7 @@ func TestDissect(t *testing.T) { // Channel to verify the output itemChannel := make(chan *api.OutputChannelItem) var emitter api.Emitter = &api.Emitting{ - AppStats: &api.AppStats{}, + AppStats: &diagnose.AppStats{}, OutputChannel: itemChannel, } @@ -123,7 +124,19 @@ func TestDissect(t *testing.T) { DstPort: "2", } reqResMatcher := dissector.NewResponseRequestMatcher() - err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + reader := &api.TcpReader{ + Progress: &api.ReadProgress{}, + Parent: &api.TcpStream{ + Origin: api.Pcap, + SuperIdentifier: superIdentifier, + }, + IsClient: true, + TcpID: tcpIDClient, + SuperTimer: &api.SuperTimer{}, + Emitter: emitter, + ReqResMatcher: reqResMatcher, + } + err = dissector.Dissect(bufferClient, reader, options) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } @@ -141,7 +154,19 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + reader = &api.TcpReader{ + Progress: &api.ReadProgress{}, + Parent: &api.TcpStream{ + Origin: api.Pcap, + SuperIdentifier: superIdentifier, + }, + IsClient: false, + TcpID: tcpIDServer, + SuperTimer: &api.SuperTimer{}, + Emitter: emitter, + ReqResMatcher: reqResMatcher, + } + err = dissector.Dissect(bufferServer, reader, options) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 95ab67fa3..7d7f7a114 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -87,15 +87,15 @@ func (d dissecting) Ping() { log.Printf("pong %s", http11protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *shared.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { - reqResMatcher := _reqResMatcher.(*requestResponseMatcher) +func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *shared.TrafficFilteringOptions) error { + reqResMatcher := reader.ReqResMatcher.(*requestResponseMatcher) var err error - isHTTP2, _ := checkIsHTTP2Connection(b, isClient) + isHTTP2, _ := checkIsHTTP2Connection(b, reader.IsClient) var http2Assembler *Http2Assembler if isHTTP2 { - err = prepareHTTP2Connection(b, isClient) + err = prepareHTTP2Connection(b, reader.IsClient) if err != nil { return err } @@ -106,74 +106,74 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture for { if switchingProtocolsHTTP2 { switchingProtocolsHTTP2 = false - isHTTP2, err = checkIsHTTP2Connection(b, isClient) + isHTTP2, err = checkIsHTTP2Connection(b, reader.IsClient) if err != nil { break } - err = prepareHTTP2Connection(b, isClient) + err = prepareHTTP2Connection(b, reader.IsClient) if err != nil { break } http2Assembler = createHTTP2Assembler(b) } - if superIdentifier.Protocol != nil && superIdentifier.Protocol != &http11protocol { + if reader.Parent.SuperIdentifier.Protocol != nil && reader.Parent.SuperIdentifier.Protocol != &http11protocol { return errors.New("Identified by another protocol") } if isHTTP2 { - err = handleHTTP2Stream(http2Assembler, progress, capture, tcpID, superTimer, emitter, options, reqResMatcher) + err = handleHTTP2Stream(http2Assembler, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.SuperTimer, reader.Emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { continue } - superIdentifier.Protocol = &http11protocol - } else if isClient { + reader.Parent.SuperIdentifier.Protocol = &http11protocol + } else if reader.IsClient { var req *http.Request - switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) + switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { continue } - superIdentifier.Protocol = &http11protocol + reader.Parent.SuperIdentifier.Protocol = &http11protocol // In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1 if switchingProtocolsHTTP2 { ident := fmt.Sprintf( "%s_%s_%s_%s_1_%s", - tcpID.SrcIP, - tcpID.DstIP, - tcpID.SrcPort, - tcpID.DstPort, + reader.TcpID.SrcIP, + reader.TcpID.DstIP, + reader.TcpID.SrcPort, + reader.TcpID.DstPort, "HTTP2", ) - item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor) + item := reqResMatcher.registerRequest(ident, req, reader.SuperTimer.CaptureTime, reader.Progress.Current(), req.ProtoMinor) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ - ClientIP: tcpID.SrcIP, - ClientPort: tcpID.SrcPort, - ServerIP: tcpID.DstIP, - ServerPort: tcpID.DstPort, + ClientIP: reader.TcpID.SrcIP, + ClientPort: reader.TcpID.SrcPort, + ServerIP: reader.TcpID.DstIP, + ServerPort: reader.TcpID.DstPort, IsOutgoing: true, } - item.Capture = capture - filterAndEmit(item, emitter, options) + item.Capture = reader.Parent.Origin + filterAndEmit(item, reader.Emitter, options) } } } else { - switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) + switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { continue } - superIdentifier.Protocol = &http11protocol + reader.Parent.SuperIdentifier.Protocol = &http11protocol } } - if superIdentifier.Protocol == nil { + if reader.Parent.SuperIdentifier.Protocol == nil { return err } diff --git a/tap/extensions/http/main_test.go b/tap/extensions/http/main_test.go index 3c2ee1726..9da02029c 100644 --- a/tap/extensions/http/main_test.go +++ b/tap/extensions/http/main_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap/api" + "github.com/up9inc/mizu/tap/api/diagnose" ) const ( @@ -86,7 +87,7 @@ func TestDissect(t *testing.T) { // Channel to verify the output itemChannel := make(chan *api.OutputChannelItem) var emitter api.Emitter = &api.Emitting{ - AppStats: &api.AppStats{}, + AppStats: &diagnose.AppStats{}, OutputChannel: itemChannel, } @@ -125,7 +126,19 @@ func TestDissect(t *testing.T) { DstPort: "2", } reqResMatcher := dissector.NewResponseRequestMatcher() - err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + reader := &api.TcpReader{ + Progress: &api.ReadProgress{}, + Parent: &api.TcpStream{ + Origin: api.Pcap, + SuperIdentifier: superIdentifier, + }, + IsClient: true, + TcpID: tcpIDClient, + SuperTimer: &api.SuperTimer{}, + Emitter: emitter, + ReqResMatcher: reqResMatcher, + } + err = dissector.Dissect(bufferClient, reader, options) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } @@ -143,7 +156,19 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + reader = &api.TcpReader{ + Progress: &api.ReadProgress{}, + Parent: &api.TcpStream{ + Origin: api.Pcap, + SuperIdentifier: superIdentifier, + }, + IsClient: false, + TcpID: tcpIDServer, + SuperTimer: &api.SuperTimer{}, + Emitter: emitter, + ReqResMatcher: reqResMatcher, + } + err = dissector.Dissect(bufferServer, reader, options) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 05d844477..7d6caf283 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -36,25 +36,25 @@ func (d dissecting) Ping() { log.Printf("pong %s", _protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *shared.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { - reqResMatcher := _reqResMatcher.(*requestResponseMatcher) +func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *shared.TrafficFilteringOptions) error { + reqResMatcher := reader.ReqResMatcher.(*requestResponseMatcher) for { - if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { + if reader.Parent.SuperIdentifier.Protocol != nil && reader.Parent.SuperIdentifier.Protocol != &_protocol { return errors.New("Identified by another protocol") } - if isClient { - _, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher) + if reader.IsClient { + _, _, err := ReadRequest(b, reader.TcpID, reader.CounterPair, reader.SuperTimer, reqResMatcher) if err != nil { return err } - superIdentifier.Protocol = &_protocol + reader.Parent.SuperIdentifier.Protocol = &_protocol } else { - err := ReadResponse(b, capture, tcpID, counterPair, superTimer, emitter, reqResMatcher) + err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, reqResMatcher) if err != nil { return err } - superIdentifier.Protocol = &_protocol + reader.Parent.SuperIdentifier.Protocol = &_protocol } } } diff --git a/tap/extensions/kafka/main_test.go b/tap/extensions/kafka/main_test.go index 8ca267adf..ef30267d5 100644 --- a/tap/extensions/kafka/main_test.go +++ b/tap/extensions/kafka/main_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap/api" + "github.com/up9inc/mizu/tap/api/diagnose" ) const ( @@ -84,7 +85,7 @@ func TestDissect(t *testing.T) { // Channel to verify the output itemChannel := make(chan *api.OutputChannelItem) var emitter api.Emitter = &api.Emitting{ - AppStats: &api.AppStats{}, + AppStats: &diagnose.AppStats{}, OutputChannel: itemChannel, } @@ -124,7 +125,19 @@ func TestDissect(t *testing.T) { } reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher.SetMaxTry(10) - err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + reader := &api.TcpReader{ + Progress: &api.ReadProgress{}, + Parent: &api.TcpStream{ + Origin: api.Pcap, + SuperIdentifier: superIdentifier, + }, + IsClient: true, + TcpID: tcpIDClient, + SuperTimer: &api.SuperTimer{}, + Emitter: emitter, + ReqResMatcher: reqResMatcher, + } + err = dissector.Dissect(bufferClient, reader, options) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { log.Println(err) } @@ -142,7 +155,19 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + reader = &api.TcpReader{ + Progress: &api.ReadProgress{}, + Parent: &api.TcpStream{ + Origin: api.Pcap, + SuperIdentifier: superIdentifier, + }, + IsClient: false, + TcpID: tcpIDServer, + SuperTimer: &api.SuperTimer{}, + Emitter: emitter, + ReqResMatcher: reqResMatcher, + } + err = dissector.Dissect(bufferServer, reader, options) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { log.Println(err) } diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go index 60ed3dfca..f3143e862 100644 --- a/tap/extensions/redis/main.go +++ b/tap/extensions/redis/main.go @@ -35,8 +35,8 @@ func (d dissecting) Ping() { log.Printf("pong %s", protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *shared.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { - reqResMatcher := _reqResMatcher.(*requestResponseMatcher) +func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *shared.TrafficFilteringOptions) error { + reqResMatcher := reader.ReqResMatcher.(*requestResponseMatcher) is := &RedisInputStream{ Reader: b, Buf: make([]byte, 8192), @@ -48,10 +48,10 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture return err } - if isClient { - err = handleClientStream(progress, capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) + if reader.IsClient { + err = handleClientStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, redisPacket, reqResMatcher) } else { - err = handleServerStream(progress, capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) + err = handleServerStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, redisPacket, reqResMatcher) } if err != nil { diff --git a/tap/extensions/redis/main_test.go b/tap/extensions/redis/main_test.go index 69910d869..4231d1841 100644 --- a/tap/extensions/redis/main_test.go +++ b/tap/extensions/redis/main_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap/api" + "github.com/up9inc/mizu/tap/api/diagnose" ) const ( @@ -85,7 +86,7 @@ func TestDissect(t *testing.T) { // Channel to verify the output itemChannel := make(chan *api.OutputChannelItem) var emitter api.Emitter = &api.Emitting{ - AppStats: &api.AppStats{}, + AppStats: &diagnose.AppStats{}, OutputChannel: itemChannel, } @@ -124,7 +125,19 @@ func TestDissect(t *testing.T) { DstPort: "2", } reqResMatcher := dissector.NewResponseRequestMatcher() - err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + reader := &api.TcpReader{ + Progress: &api.ReadProgress{}, + Parent: &api.TcpStream{ + Origin: api.Pcap, + SuperIdentifier: superIdentifier, + }, + IsClient: true, + TcpID: tcpIDClient, + SuperTimer: &api.SuperTimer{}, + Emitter: emitter, + ReqResMatcher: reqResMatcher, + } + err = dissector.Dissect(bufferClient, reader, options) if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { log.Println(err) } @@ -142,7 +155,19 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + reader = &api.TcpReader{ + Progress: &api.ReadProgress{}, + Parent: &api.TcpStream{ + Origin: api.Pcap, + SuperIdentifier: superIdentifier, + }, + IsClient: false, + TcpID: tcpIDServer, + SuperTimer: &api.SuperTimer{}, + Emitter: emitter, + ReqResMatcher: reqResMatcher, + } + err = dissector.Dissect(bufferServer, reader, options) if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { log.Println(err) } diff --git a/tap/tlstapper/tls_poller.go b/tap/tlstapper/tls_poller.go index 1926d3646..dffce5118 100644 --- a/tap/tlstapper/tls_poller.go +++ b/tap/tlstapper/tls_poller.go @@ -149,7 +149,6 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k doneHandler: func(r *tlsReader) { p.closeReader(key, r) }, - progress: &api.ReadProgress{}, } tcpid := p.buildTcpId(chunk, ip, port) @@ -167,8 +166,20 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid tlsEmitter *tlsEmitter, options *shared.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) { b := bufio.NewReader(reader) - err := extension.Dissector.Dissect(b, reader.progress, api.Ebpf, isRequest, tcpid, &api.CounterPair{}, - &api.SuperTimer{}, &api.SuperIdentifier{}, tlsEmitter, options, reqResMatcher) + tcpReader := &api.TcpReader{ + Progress: reader.progress, + Parent: &api.TcpStream{ + Origin: api.Ebpf, + SuperIdentifier: &api.SuperIdentifier{}, + }, + IsClient: isRequest, + TcpID: tcpid, + SuperTimer: &api.SuperTimer{}, + Emitter: tlsEmitter, + ReqResMatcher: reqResMatcher, + } + + err := extension.Dissector.Dissect(b, tcpReader, options) if err != nil { logger.Log.Warningf("Error dissecting TLS %v - %v", tcpid, err)