Move the request-response matcher's scope from global-level to TCP stream-level (#793)

* Create a new request-response matcher for each TCP stream

* Fix the `ident` formats in request-response matchers

* Don't sort the items in the HTTP tests

* Update tap/extensions/kafka/matcher.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Temporarily change the bucket folder to the new expected

* Bring back the `deleteOlderThan` method

* Use `api.RequestResponseMatcher` instead of `interface{}` as type

* Use `api.RequestResponseMatcher` instead of `interface{}` as type (more)

* Update the key format comments

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
This commit is contained in:
M. Mert Yıldıran 2022-02-14 18:16:58 +03:00 committed by GitHub
parent 71c04d20ef
commit 041223b558
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 104 additions and 81 deletions

View File

@ -42,7 +42,6 @@ type Extension struct {
Protocol *Protocol Protocol *Protocol
Path string Path string
Dissector Dissector Dissector Dissector
MatcherMap *sync.Map
} }
type ConnectionInfo struct { type ConnectionInfo struct {
@ -62,7 +61,6 @@ type TcpID struct {
} }
type CounterPair struct { type CounterPair struct {
StreamId int64
Request uint Request uint
Response uint Response uint
sync.Mutex sync.Mutex
@ -100,10 +98,15 @@ type SuperIdentifier struct {
type Dissector interface { type Dissector interface {
Register(*Extension) Register(*Extension)
Ping() Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher
}
type RequestResponseMatcher interface {
GetMap() *sync.Map
} }
type Emitting struct { type Emitting struct {

View File

@ -22,6 +22,7 @@ type Cleaner struct {
connectionTimeout time.Duration connectionTimeout time.Duration
stats CleanerStats stats CleanerStats
statsMutex sync.Mutex statsMutex sync.Mutex
streamsMap *tcpStreamMap
} }
func (cl *Cleaner) clean() { func (cl *Cleaner) clean() {
@ -32,10 +33,15 @@ func (cl *Cleaner) clean() {
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock() cl.assemblerMutex.Unlock()
for _, extension := range extensions { cl.streamsMap.streams.Range(func(k, v interface{}) bool {
deleted := deleteOlderThan(extension.MatcherMap, startCleanTime.Add(-cl.connectionTimeout)) reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher
cl.stats.deleted += deleted if reqResMatcher == nil {
return true
} }
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
return true
})
cl.statsMutex.Lock() cl.statsMutex.Lock()
logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())

View File

@ -42,7 +42,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request" const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b} r := AmqpReader{b}
var remaining int var remaining int
@ -300,6 +300,10 @@ func (d dissecting) Macros() map[string]string {
} }
} }
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return nil
}
var Dissector dissecting var Dissector dissecting
func NewDissector() api.Dissector { func NewDissector() api.Dissector {

View File

@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect: test-pull-expect:
@mkdir -p expect @mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/http/\* expect @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect

View File

@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = "" item.ConnectionInfo.ClientPort = ""
} }
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error { func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage() streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil { if err != nil {
return err return err
@ -58,7 +58,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
switch messageHTTP1 := messageHTTP1.(type) { switch messageHTTP1 := messageHTTP1.(type) {
case http.Request: case http.Request:
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%s->%s %s->%s %d %s", "%s_%s_%s_%s_%d_%s",
tcpID.SrcIP, tcpID.SrcIP,
tcpID.DstIP, tcpID.DstIP,
tcpID.SrcPort, tcpID.SrcPort,
@ -78,7 +78,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
} }
case http.Response: case http.Response:
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%s->%s %s->%s %d %s", "%s_%s_%s_%s_%d_%s",
tcpID.DstIP, tcpID.DstIP,
tcpID.SrcIP, tcpID.SrcIP,
tcpID.DstPort, tcpID.DstPort,
@ -110,7 +110,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
return nil return nil
} }
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b) req, err = http.ReadRequest(b)
if err != nil { if err != nil {
return return
@ -130,8 +130,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s", "%s_%s_%s_%s_%d_%s",
counterPair.StreamId,
tcpID.SrcIP, tcpID.SrcIP,
tcpID.DstIP, tcpID.DstIP,
tcpID.SrcPort, tcpID.SrcPort,
@ -153,7 +152,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return return
} }
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) { func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response var res *http.Response
res, err = http.ReadResponse(b, nil) res, err = http.ReadResponse(b, nil)
if err != nil { if err != nil {
@ -174,8 +173,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s", "%s_%s_%s_%s_%d_%s",
counterPair.StreamId,
tcpID.DstIP, tcpID.DstIP,
tcpID.SrcIP, tcpID.SrcIP,
tcpID.DstPort, tcpID.DstPort,

View File

@ -84,14 +84,15 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) { func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &http11protocol extension.Protocol = &http11protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
} }
func (d dissecting) Ping() { func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name) log.Printf("pong %s", http11protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient) isHTTP2, _ := checkIsHTTP2Connection(b, isClient)
@ -124,7 +125,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
} }
if isHTTP2 { if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options) err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@ -133,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
superIdentifier.Protocol = &http11protocol superIdentifier.Protocol = &http11protocol
} else if isClient { } else if isClient {
var req *http.Request var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options) switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@ -144,7 +145,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1 // In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 { if switchingProtocolsHTTP2 {
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%s->%s %s->%s 1 %s", "%s_%s_%s_%s_1_%s",
tcpID.SrcIP, tcpID.SrcIP,
tcpID.DstIP, tcpID.DstIP,
tcpID.SrcPort, tcpID.SrcPort,
@ -164,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
} }
} }
} else { } else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options) switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@ -472,6 +473,10 @@ func (d dissecting) Macros() map[string]string {
} }
} }
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting var Dissector dissecting
func NewDissector() api.Dissector { func NewDissector() api.Dissector {

View File

@ -11,7 +11,6 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"sort"
"testing" "testing"
"time" "time"
@ -39,7 +38,6 @@ func TestRegister(t *testing.T) {
extension := &api.Extension{} extension := &api.Extension{}
dissector.Register(extension) dissector.Register(extension)
assert.Equal(t, "http", extension.Protocol.Name) assert.Equal(t, "http", extension.Protocol.Name)
assert.NotNil(t, extension.MatcherMap)
} }
func TestMacros(t *testing.T) { func TestMacros(t *testing.T) {
@ -123,7 +121,8 @@ func TestDissect(t *testing.T) {
SrcPort: "1", SrcPort: "1",
DstPort: "2", DstPort: "2",
} }
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options) reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@ -141,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options) err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@ -155,14 +154,6 @@ func TestDissect(t *testing.T) {
stop <- true stop <- true
sort.Slice(items, func(i, j int) bool {
iMarshaled, err := json.Marshal(items[i])
assert.Nil(t, err)
jMarshaled, err := json.Marshal(items[j])
assert.Nil(t, err)
return len(iMarshaled) < len(jMarshaled)
})
marshaled, err := json.Marshal(items) marshaled, err := json.Marshal(items)
assert.Nil(t, err) assert.Nil(t, err)

View File

@ -8,16 +8,17 @@ import (
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
var reqResMatcher = createResponseRequestMatcher() // global // Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{incremental_counter}_{proto_ident}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
type requestResponseMatcher struct { type requestResponseMatcher struct {
openMessagesMap *sync.Map openMessagesMap *sync.Map
} }
func createResponseRequestMatcher() requestResponseMatcher { func createResponseRequestMatcher() api.RequestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher }
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
} }
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem { func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {

View File

@ -33,27 +33,27 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) { func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &_protocol extension.Protocol = &_protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
} }
func (d dissecting) Ping() { func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name) log.Printf("pong %s", _protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for { for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol") return errors.New("Identified by another protocol")
} }
if isClient { if isClient {
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer) _, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher)
if err != nil { if err != nil {
return err return err
} }
superIdentifier.Protocol = &_protocol superIdentifier.Protocol = &_protocol
} else { } else {
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter) err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
if err != nil { if err != nil {
return err return err
} }
@ -215,6 +215,10 @@ func (d dissecting) Macros() map[string]string {
} }
} }
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting var Dissector dissecting
func NewDissector() api.Dissector { func NewDissector() api.Dissector {

View File

@ -3,9 +3,10 @@ package kafka
import ( import (
"sync" "sync"
"time" "time"
"github.com/up9inc/mizu/tap/api"
) )
var reqResMatcher = CreateResponseRequestMatcher() // global
const maxTry int = 3000 const maxTry int = 3000
type RequestResponsePair struct { type RequestResponsePair struct {
@ -13,14 +14,17 @@ type RequestResponsePair struct {
Response Response Response Response
} }
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id} // Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id}
type requestResponseMatcher struct { type requestResponseMatcher struct {
openMessagesMap *sync.Map openMessagesMap *sync.Map
} }
func CreateResponseRequestMatcher() requestResponseMatcher { func createResponseRequestMatcher() api.RequestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher }
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
} }
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair { func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {

View File

@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"` CaptureTime time.Time `json:"captureTime"`
} }
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) { func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4} d := &decoder{reader: r, remain: 4}
size := d.readInt32() size := d.readInt32()
@ -214,8 +214,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
} }
key := fmt.Sprintf( key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d", "%s_%s_%s_%s_%d",
counterPair.StreamId,
tcpID.SrcIP, tcpID.SrcIP,
tcpID.SrcPort, tcpID.SrcPort,
tcpID.DstIP, tcpID.DstIP,

View File

@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"` CaptureTime time.Time `json:"captureTime"`
} }
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) { func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4} d := &decoder{reader: r, remain: 4}
size := d.readInt32() size := d.readInt32()
@ -44,8 +44,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
} }
key := fmt.Sprintf( key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d", "%s_%s_%s_%s_%d",
counterPair.StreamId,
tcpID.DstIP, tcpID.DstIP,
tcpID.DstPort, tcpID.DstPort,
tcpID.SrcIP, tcpID.SrcIP,

View File

@ -6,15 +6,14 @@ import (
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error { func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock() counterPair.Lock()
counterPair.Request++ counterPair.Request++
requestCounter := counterPair.Request requestCounter := counterPair.Request
counterPair.Unlock() counterPair.Unlock()
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d", "%s_%s_%s_%s_%d",
counterPair.StreamId,
tcpID.SrcIP, tcpID.SrcIP,
tcpID.DstIP, tcpID.DstIP,
tcpID.SrcPort, tcpID.SrcPort,
@ -36,15 +35,14 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
return nil return nil
} }
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error { func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock() counterPair.Lock()
counterPair.Response++ counterPair.Response++
responseCounter := counterPair.Response responseCounter := counterPair.Response
counterPair.Unlock() counterPair.Unlock()
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d", "%s_%s_%s_%s_%d",
counterPair.StreamId,
tcpID.DstIP, tcpID.DstIP,
tcpID.SrcIP, tcpID.SrcIP,
tcpID.DstPort, tcpID.DstPort,

View File

@ -32,14 +32,14 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) { func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &protocol extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
} }
func (d dissecting) Ping() { func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name) log.Printf("pong %s", protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{ is := &RedisInputStream{
Reader: b, Reader: b,
Buf: make([]byte, 8192), Buf: make([]byte, 8192),
@ -52,9 +52,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
} }
if isClient { if isClient {
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket) err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else { } else {
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket) err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} }
if err != nil { if err != nil {
@ -127,6 +127,10 @@ func (d dissecting) Macros() map[string]string {
} }
} }
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting var Dissector dissecting
func NewDissector() api.Dissector { func NewDissector() api.Dissector {

View File

@ -7,16 +7,17 @@ import (
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
var reqResMatcher = createResponseRequestMatcher() // global // Key is `{src_ip}_{dst_ip}_{src_ip}_{src_port}_{incremental_counter}`
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
type requestResponseMatcher struct { type requestResponseMatcher struct {
openMessagesMap *sync.Map openMessagesMap *sync.Map
} }
func createResponseRequestMatcher() requestResponseMatcher { func createResponseRequestMatcher() api.RequestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher }
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
} }
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem { func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {

View File

@ -210,6 +210,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem)
assemblerMutex: &assembler.assemblerMutex, assemblerMutex: &assembler.assemblerMutex,
cleanPeriod: cleanPeriod, cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout, connectionTimeout: staleConnectionTimeout,
streamsMap: streamsMap,
} }
cleaner.start() cleaner.start()

View File

@ -47,6 +47,7 @@ type tcpReader struct {
extension *api.Extension extension *api.Extension
emitter api.Emitter emitter api.Emitter
counterPair *api.CounterPair counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex sync.Mutex
} }
@ -94,7 +95,7 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) { func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
b := bufio.NewReader(h) b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions) err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil { if err != nil {
_, err = io.Copy(ioutil.Discard, b) _, err = io.Copy(ioutil.Discard, b)
if err != nil { if err != nil {

View File

@ -30,6 +30,7 @@ type tcpStreamFactory struct {
type tcpStreamWrapper struct { type tcpStreamWrapper struct {
stream *tcpStream stream *tcpStream
reqResMatcher api.RequestResponseMatcher
createdAt time.Time createdAt time.Time
} }
@ -81,8 +82,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
if stream.isTapTarget { if stream.isTapTarget {
stream.id = factory.streamsMap.nextId() stream.id = factory.streamsMap.nextId()
for i, extension := range extensions { for i, extension := range extensions {
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
counterPair := &api.CounterPair{ counterPair := &api.CounterPair{
StreamId: stream.id,
Request: 0, Request: 0,
Response: 0, Response: 0,
} }
@ -103,6 +104,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension, extension: extension,
emitter: factory.Emitter, emitter: factory.Emitter,
counterPair: counterPair, counterPair: counterPair,
reqResMatcher: reqResMatcher,
}) })
stream.servers = append(stream.servers, tcpReader{ stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg), msgQueue: make(chan tcpReaderDataMsg),
@ -121,10 +123,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension, extension: extension,
emitter: factory.Emitter, emitter: factory.Emitter,
counterPair: counterPair, counterPair: counterPair,
reqResMatcher: reqResMatcher,
}) })
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{ factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
stream: stream, stream: stream,
reqResMatcher: reqResMatcher,
createdAt: time.Now(), createdAt: time.Now(),
}) })