From 041223b5586df171607b6feadb911eb7becbceb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Mon, 14 Feb 2022 18:16:58 +0300 Subject: [PATCH] Move the request-response matcher's scope from global-level to TCP stream-level (#793) * Create a new request-response matcher for each TCP stream * Fix the `ident` formats in request-response matchers * Don't sort the items in the HTTP tests * Update tap/extensions/kafka/matcher.go Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com> * Temporarily change the bucket folder to the new expected * Bring back the `deleteOlderThan` method * Use `api.RequestResponseMatcher` instead of `interface{}` as type * Use `api.RequestResponseMatcher` instead of `interface{}` as type (more) * Update the key format comments Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com> --- tap/api/api.go | 15 +++++++++------ tap/cleaner.go | 12 +++++++++--- tap/extensions/amqp/main.go | 6 +++++- tap/extensions/http/Makefile | 2 +- tap/extensions/http/handlers.go | 16 +++++++--------- tap/extensions/http/main.go | 17 +++++++++++------ tap/extensions/http/main_test.go | 15 +++------------ tap/extensions/http/matcher.go | 13 +++++++------ tap/extensions/kafka/main.go | 12 ++++++++---- tap/extensions/kafka/matcher.go | 14 +++++++++----- tap/extensions/kafka/request.go | 5 ++--- tap/extensions/kafka/response.go | 5 ++--- tap/extensions/redis/handlers.go | 10 ++++------ tap/extensions/redis/main.go | 12 ++++++++---- tap/extensions/redis/matcher.go | 13 +++++++------ tap/passive_tapper.go | 1 + tap/tcp_reader.go | 3 ++- tap/tcp_stream_factory.go | 14 +++++++++----- 18 files changed, 104 insertions(+), 81 deletions(-) diff --git a/tap/api/api.go b/tap/api/api.go index 52c3cac11..19feed2ce 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -39,10 +39,9 @@ type TCP struct { } type Extension struct { - Protocol *Protocol - Path string - Dissector Dissector - MatcherMap *sync.Map + Protocol *Protocol + Path string + Dissector Dissector } type ConnectionInfo struct { @@ -62,7 +61,6 @@ type TcpID struct { } type CounterPair struct { - StreamId int64 Request uint Response uint sync.Mutex @@ -100,10 +98,15 @@ type SuperIdentifier struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) Macros() map[string]string + NewResponseRequestMatcher() RequestResponseMatcher +} + +type RequestResponseMatcher interface { + GetMap() *sync.Map } type Emitting struct { diff --git a/tap/cleaner.go b/tap/cleaner.go index 61be717f3..cdf2acf20 100644 --- a/tap/cleaner.go +++ b/tap/cleaner.go @@ -22,6 +22,7 @@ type Cleaner struct { connectionTimeout time.Duration stats CleanerStats statsMutex sync.Mutex + streamsMap *tcpStreamMap } func (cl *Cleaner) clean() { @@ -32,10 +33,15 @@ func (cl *Cleaner) clean() { flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) cl.assemblerMutex.Unlock() - for _, extension := range extensions { - deleted := deleteOlderThan(extension.MatcherMap, startCleanTime.Add(-cl.connectionTimeout)) + cl.streamsMap.streams.Range(func(k, v interface{}) bool { + reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher + if reqResMatcher == nil { + return true + } + deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout)) cl.stats.deleted += deleted - } + return true + }) cl.statsMutex.Lock() logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 9ca023a4f..581b9b4af 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -42,7 +42,7 @@ func (d dissecting) Ping() { const amqpRequest string = "amqp_request" -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { r := AmqpReader{b} var remaining int @@ -300,6 +300,10 @@ func (d dissecting) Macros() map[string]string { } } +func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { + return nil +} + var Dissector dissecting func NewDissector() api.Dissector { diff --git a/tap/extensions/http/Makefile b/tap/extensions/http/Makefile index 529cc27ef..253910d58 100644 --- a/tap/extensions/http/Makefile +++ b/tap/extensions/http/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/http/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 8d084be77..7ffa5fee5 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) { item.ConnectionInfo.ClientPort = "" } -func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error { streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage() if err != nil { return err @@ -58,7 +58,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi switch messageHTTP1 := messageHTTP1.(type) { case http.Request: ident := fmt.Sprintf( - "%s->%s %s->%s %d %s", + "%s_%s_%s_%s_%d_%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, @@ -78,7 +78,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi } case http.Response: ident := fmt.Sprintf( - "%s->%s %s->%s %d %s", + "%s_%s_%s_%s_%d_%s", tcpID.DstIP, tcpID.SrcIP, tcpID.DstPort, @@ -110,7 +110,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { req, err = http.ReadRequest(b) if err != nil { return @@ -130,8 +130,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind ident := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d_%s", - counterPair.StreamId, + "%s_%s_%s_%s_%d_%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, @@ -153,7 +152,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api return } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) { +func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) { var res *http.Response res, err = http.ReadResponse(b, nil) if err != nil { @@ -174,8 +173,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind ident := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d_%s", - counterPair.StreamId, + "%s_%s_%s_%s_%d_%s", tcpID.DstIP, tcpID.SrcIP, tcpID.DstPort, diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 28c8a1744..32664cb61 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -84,14 +84,15 @@ type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &http11protocol - extension.MatcherMap = reqResMatcher.openMessagesMap } func (d dissecting) Ping() { log.Printf("pong %s", http11protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { + reqResMatcher := _reqResMatcher.(*requestResponseMatcher) + var err error isHTTP2, _ := checkIsHTTP2Connection(b, isClient) @@ -124,7 +125,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } if isHTTP2 { - err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options) + err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -133,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co superIdentifier.Protocol = &http11protocol } else if isClient { var req *http.Request - switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options) + switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -144,7 +145,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co // In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1 if switchingProtocolsHTTP2 { ident := fmt.Sprintf( - "%s->%s %s->%s 1 %s", + "%s_%s_%s_%s_1_%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, @@ -164,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } } else { - switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options) + switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -472,6 +473,10 @@ func (d dissecting) Macros() map[string]string { } } +func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { + return createResponseRequestMatcher() +} + var Dissector dissecting func NewDissector() api.Dissector { diff --git a/tap/extensions/http/main_test.go b/tap/extensions/http/main_test.go index 97cbc6430..f1934a994 100644 --- a/tap/extensions/http/main_test.go +++ b/tap/extensions/http/main_test.go @@ -11,7 +11,6 @@ import ( "os" "path" "path/filepath" - "sort" "testing" "time" @@ -39,7 +38,6 @@ func TestRegister(t *testing.T) { extension := &api.Extension{} dissector.Register(extension) assert.Equal(t, "http", extension.Protocol.Name) - assert.NotNil(t, extension.MatcherMap) } func TestMacros(t *testing.T) { @@ -123,7 +121,8 @@ func TestDissect(t *testing.T) { SrcPort: "1", DstPort: "2", } - err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options) + reqResMatcher := dissector.NewResponseRequestMatcher() + err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } @@ -141,7 +140,7 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options) + err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } @@ -155,14 +154,6 @@ func TestDissect(t *testing.T) { stop <- true - sort.Slice(items, func(i, j int) bool { - iMarshaled, err := json.Marshal(items[i]) - assert.Nil(t, err) - jMarshaled, err := json.Marshal(items[j]) - assert.Nil(t, err) - return len(iMarshaled) < len(jMarshaled) - }) - marshaled, err := json.Marshal(items) assert.Nil(t, err) diff --git a/tap/extensions/http/matcher.go b/tap/extensions/http/matcher.go index 0a28a65ac..67c09136a 100644 --- a/tap/extensions/http/matcher.go +++ b/tap/extensions/http/matcher.go @@ -8,16 +8,17 @@ import ( "github.com/up9inc/mizu/tap/api" ) -var reqResMatcher = createResponseRequestMatcher() // global - -// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter} +// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{incremental_counter}_{proto_ident} type requestResponseMatcher struct { openMessagesMap *sync.Map } -func createResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} - return *newMatcher +func createResponseRequestMatcher() api.RequestResponseMatcher { + return &requestResponseMatcher{openMessagesMap: &sync.Map{}} +} + +func (matcher *requestResponseMatcher) GetMap() *sync.Map { + return matcher.openMessagesMap } func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 84a764064..3fdadd364 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -33,27 +33,27 @@ type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &_protocol - extension.MatcherMap = reqResMatcher.openMessagesMap } func (d dissecting) Ping() { log.Printf("pong %s", _protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { + reqResMatcher := _reqResMatcher.(*requestResponseMatcher) for { if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { return errors.New("Identified by another protocol") } if isClient { - _, _, err := ReadRequest(b, tcpID, counterPair, superTimer) + _, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher) if err != nil { return err } superIdentifier.Protocol = &_protocol } else { - err := ReadResponse(b, tcpID, counterPair, superTimer, emitter) + err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher) if err != nil { return err } @@ -215,6 +215,10 @@ func (d dissecting) Macros() map[string]string { } } +func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { + return createResponseRequestMatcher() +} + var Dissector dissecting func NewDissector() api.Dissector { diff --git a/tap/extensions/kafka/matcher.go b/tap/extensions/kafka/matcher.go index 8bf8914bd..d5c77618a 100644 --- a/tap/extensions/kafka/matcher.go +++ b/tap/extensions/kafka/matcher.go @@ -3,9 +3,10 @@ package kafka import ( "sync" "time" + + "github.com/up9inc/mizu/tap/api" ) -var reqResMatcher = CreateResponseRequestMatcher() // global const maxTry int = 3000 type RequestResponsePair struct { @@ -13,14 +14,17 @@ type RequestResponsePair struct { Response Response } -// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id} +// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id} type requestResponseMatcher struct { openMessagesMap *sync.Map } -func CreateResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} - return *newMatcher +func createResponseRequestMatcher() api.RequestResponseMatcher { + return &requestResponseMatcher{openMessagesMap: &sync.Map{}} +} + +func (matcher *requestResponseMatcher) GetMap() *sync.Map { + return matcher.openMessagesMap } func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair { diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index 982312936..362d9e1df 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -19,7 +19,7 @@ type Request struct { CaptureTime time.Time `json:"captureTime"` } -func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) { +func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -214,8 +214,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su } key := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d", - counterPair.StreamId, + "%s_%s_%s_%s_%d", tcpID.SrcIP, tcpID.SrcPort, tcpID.DstIP, diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index dd9909034..0eb7950c7 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -16,7 +16,7 @@ type Response struct { CaptureTime time.Time `json:"captureTime"` } -func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) { +func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -44,8 +44,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s } key := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d", - counterPair.StreamId, + "%s_%s_%s_%s_%d", tcpID.DstIP, tcpID.DstPort, tcpID.SrcIP, diff --git a/tap/extensions/redis/handlers.go b/tap/extensions/redis/handlers.go index a4a3a3858..c4eb7985e 100644 --- a/tap/extensions/redis/handlers.go +++ b/tap/extensions/redis/handlers.go @@ -6,15 +6,14 @@ import ( "github.com/up9inc/mizu/tap/api" ) -func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error { +func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error { counterPair.Lock() counterPair.Request++ requestCounter := counterPair.Request counterPair.Unlock() ident := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d", - counterPair.StreamId, + "%s_%s_%s_%s_%d", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, @@ -36,15 +35,14 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim return nil } -func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error { +func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error { counterPair.Lock() counterPair.Response++ responseCounter := counterPair.Response counterPair.Unlock() ident := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d", - counterPair.StreamId, + "%s_%s_%s_%s_%d", tcpID.DstIP, tcpID.SrcIP, tcpID.DstPort, diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go index 6de87739a..c27cd431f 100644 --- a/tap/extensions/redis/main.go +++ b/tap/extensions/redis/main.go @@ -32,14 +32,14 @@ type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &protocol - extension.MatcherMap = reqResMatcher.openMessagesMap } func (d dissecting) Ping() { log.Printf("pong %s", protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { + reqResMatcher := _reqResMatcher.(*requestResponseMatcher) is := &RedisInputStream{ Reader: b, Buf: make([]byte, 8192), @@ -52,9 +52,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } if isClient { - err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket) + err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) } else { - err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket) + err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) } if err != nil { @@ -127,6 +127,10 @@ func (d dissecting) Macros() map[string]string { } } +func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { + return createResponseRequestMatcher() +} + var Dissector dissecting func NewDissector() api.Dissector { diff --git a/tap/extensions/redis/matcher.go b/tap/extensions/redis/matcher.go index 66e1c423b..e63c2f4b2 100644 --- a/tap/extensions/redis/matcher.go +++ b/tap/extensions/redis/matcher.go @@ -7,16 +7,17 @@ import ( "github.com/up9inc/mizu/tap/api" ) -var reqResMatcher = createResponseRequestMatcher() // global - -// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}` +// Key is `{src_ip}_{dst_ip}_{src_ip}_{src_port}_{incremental_counter}` type requestResponseMatcher struct { openMessagesMap *sync.Map } -func createResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} - return *newMatcher +func createResponseRequestMatcher() api.RequestResponseMatcher { + return &requestResponseMatcher{openMessagesMap: &sync.Map{}} +} + +func (matcher *requestResponseMatcher) GetMap() *sync.Map { + return matcher.openMessagesMap } func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem { diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 2a0a142ce..2779295e9 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -210,6 +210,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) assemblerMutex: &assembler.assemblerMutex, cleanPeriod: cleanPeriod, connectionTimeout: staleConnectionTimeout, + streamsMap: streamsMap, } cleaner.start() diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index bacda8ac8..ceb94e98e 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -47,6 +47,7 @@ type tcpReader struct { extension *api.Extension emitter api.Emitter counterPair *api.CounterPair + reqResMatcher api.RequestResponseMatcher sync.Mutex } @@ -94,7 +95,7 @@ func (h *tcpReader) Close() { func (h *tcpReader) run(wg *sync.WaitGroup) { defer wg.Done() b := bufio.NewReader(h) - err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions) + err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher) if err != nil { _, err = io.Copy(ioutil.Discard, b) if err != nil { diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index ce56dcec8..9073e013b 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -29,8 +29,9 @@ type tcpStreamFactory struct { } type tcpStreamWrapper struct { - stream *tcpStream - createdAt time.Time + stream *tcpStream + reqResMatcher api.RequestResponseMatcher + createdAt time.Time } func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory { @@ -81,8 +82,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T if stream.isTapTarget { stream.id = factory.streamsMap.nextId() for i, extension := range extensions { + reqResMatcher := extension.Dissector.NewResponseRequestMatcher() counterPair := &api.CounterPair{ - StreamId: stream.id, Request: 0, Response: 0, } @@ -103,6 +104,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T extension: extension, emitter: factory.Emitter, counterPair: counterPair, + reqResMatcher: reqResMatcher, }) stream.servers = append(stream.servers, tcpReader{ msgQueue: make(chan tcpReaderDataMsg), @@ -121,11 +123,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T extension: extension, emitter: factory.Emitter, counterPair: counterPair, + reqResMatcher: reqResMatcher, }) factory.streamsMap.Store(stream.id, &tcpStreamWrapper{ - stream: stream, - createdAt: time.Now(), + stream: stream, + reqResMatcher: reqResMatcher, + createdAt: time.Now(), }) factory.wg.Add(2)