diff --git a/tap/api/api.go b/tap/api/api.go index 4ef0b5d2f..f468a021a 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -58,8 +58,10 @@ type TcpID struct { } type CounterPair struct { + StreamId int64 Request uint Response uint + sync.Mutex } type GenericMessage struct { diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 21bf81158..3b0a5bd1e 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -362,7 +362,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} { for name, value := range properties["headers"].(map[string]interface{}) { headers = append(headers, api.TableData{ Name: name, - Value: value.(string), + Value: value, Selector: fmt.Sprintf(`request.properties.headers["%s"]`, name), }) } diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 094f07951..8d084be77 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -115,7 +115,10 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api if err != nil { return } + counterPair.Lock() counterPair.Request++ + requestCounter := counterPair.Request + counterPair.Unlock() // Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C) if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" { @@ -127,12 +130,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind ident := fmt.Sprintf( - "%s->%s %s->%s %d %s", + "%d_%s:%s_%s:%s_%d_%s", + counterPair.StreamId, tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort, - counterPair.Request, + requestCounter, "HTTP1", ) item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor) @@ -155,7 +159,10 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api if err != nil { return } + counterPair.Lock() counterPair.Response++ + responseCounter := counterPair.Response + counterPair.Unlock() // Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C) if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" { @@ -167,12 +174,13 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind ident := fmt.Sprintf( - "%s->%s %s->%s %d %s", + "%d_%s:%s_%s:%s_%d_%s", + counterPair.StreamId, tcpID.DstIP, tcpID.SrcIP, tcpID.DstPort, tcpID.SrcPort, - counterPair.Response, + responseCounter, "HTTP1", ) item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, res.ProtoMinor) diff --git a/tap/extensions/http/matcher.go b/tap/extensions/http/matcher.go index 0138b9b7d..0a28a65ac 100644 --- a/tap/extensions/http/matcher.go +++ b/tap/extensions/http/matcher.go @@ -1,9 +1,7 @@ package http import ( - "fmt" "net/http" - "strings" "sync" "time" @@ -23,9 +21,6 @@ func createResponseRequestMatcher() requestResponseMatcher { } func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem { - split := splitIdent(ident) - key := genKey(split) - requestHTTPMessage := api.GenericMessage{ IsRequest: true, CaptureTime: captureTime, @@ -35,7 +30,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht }, } - if response, found := matcher.openMessagesMap.LoadAndDelete(key); found { + if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found { // Type assertion always succeeds because all of the map's values are of api.GenericMessage type responseHTTPMessage := response.(*api.GenericMessage) if responseHTTPMessage.IsRequest { @@ -44,14 +39,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage, protoMinor) } - matcher.openMessagesMap.Store(key, &requestHTTPMessage) + matcher.openMessagesMap.Store(ident, &requestHTTPMessage) return nil } func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, protoMinor int) *api.OutputChannelItem { - split := splitIdent(ident) - key := genKey(split) - responseHTTPMessage := api.GenericMessage{ IsRequest: false, CaptureTime: captureTime, @@ -61,7 +53,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response * }, } - if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { + if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found { // Type assertion always succeeds because all of the map's values are of api.GenericMessage type requestHTTPMessage := request.(*api.GenericMessage) if !requestHTTPMessage.IsRequest { @@ -70,7 +62,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response * return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage, protoMinor) } - matcher.openMessagesMap.Store(key, &responseHTTPMessage) + matcher.openMessagesMap.Store(ident, &responseHTTPMessage) return nil } @@ -89,13 +81,3 @@ func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.Gener }, } } - -func splitIdent(ident string) []string { - ident = strings.Replace(ident, "->", " ", -1) - return strings.Split(ident, " ") -} - -func genKey(split []string) string { - key := fmt.Sprintf("%s:%s->%s:%s,%s%s", split[0], split[2], split[1], split[3], split[4], split[5]) - return key -} diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index 82c61407b..6bba27eb2 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -368,24 +368,26 @@ func representProduceRequest(data map[string]interface{}) []interface{} { } recordsResults := recordsPath.Get(obj) if len(recordsResults) > 0 { - records := recordsResults[0].([]interface{}) - for i, _record := range records { - record := _record.(map[string]interface{}) - value := record["value"] - delete(record, "value") + if recordsResults[0] != nil { + records := recordsResults[0].([]interface{}) + for i, _record := range records { + record := _record.(map[string]interface{}) + value := record["value"] + delete(record, "value") - rep = append(rep, api.SectionData{ - Type: api.TABLE, - Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName), - Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}), - }) + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName), + Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}), + }) - rep = append(rep, api.SectionData{ - Type: api.BODY, - Title: fmt.Sprintf("Record [%d] Value", i), - Data: value.(string), - Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i), - }) + rep = append(rep, api.SectionData{ + Type: api.BODY, + Title: fmt.Sprintf("Record [%d] Value", i), + Data: value.(string), + Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i), + }) + } } } } @@ -614,22 +616,24 @@ func representFetchResponse(data map[string]interface{}) []interface{} { Data: representMapAsTable(recordBatch, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch`, i, j), []string{"record"}), }) - for k, _record := range recordBatch["record"].([]interface{}) { - record := _record.(map[string]interface{}) - value := record["value"] + if recordBatch["record"] != nil { + for k, _record := range recordBatch["record"].([]interface{}) { + record := _record.(map[string]interface{}) + value := record["value"] - rep = append(rep, api.SectionData{ - Type: api.TABLE, - Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName), - Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}), - }) + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName), + Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}), + }) - rep = append(rep, api.SectionData{ - Type: api.BODY, - Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName), - Data: value.(string), - Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k), - }) + rep = append(rep, api.SectionData{ + Type: api.BODY, + Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName), + Data: value.(string), + Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k), + }) + } } } } @@ -730,6 +734,9 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} { Data: string(repPayload), }) + if payload["topics"] == nil { + return rep + } for i, _topic := range payload["topics"].([]interface{}) { topic := _topic.(map[string]interface{}) @@ -766,6 +773,9 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} { Data: string(repPayload), }) + if payload["topics"] == nil { + return rep + } for i, _topic := range payload["topics"].([]interface{}) { topic := _topic.(map[string]interface{}) diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 9aac34a5d..84a764064 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -47,13 +47,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } if isClient { - _, _, err := ReadRequest(b, tcpID, superTimer) + _, _, err := ReadRequest(b, tcpID, counterPair, superTimer) if err != nil { return err } superIdentifier.Protocol = &_protocol } else { - err := ReadResponse(b, tcpID, superTimer, emitter) + err := ReadResponse(b, tcpID, counterPair, superTimer, emitter) if err != nil { return err } @@ -120,7 +120,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, summary = summary[:len(summary)-2] } case CreateTopics: - topics := reqDetails["payload"].(map[string]interface{})["topics"].([]interface{}) + _topics := reqDetails["payload"].(map[string]interface{})["topics"] + if _topics == nil { + break + } + topics := _topics.([]interface{}) for _, topic := range topics { summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string)) } @@ -128,6 +132,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, summary = summary[:len(summary)-2] } case DeleteTopics: + if reqDetails["topicNames"] == nil { + break + } topicNames := reqDetails["topicNames"].([]string) for _, name := range topicNames { summary += fmt.Sprintf("%s, ", name) diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index 4d3819d3b..982312936 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -19,7 +19,7 @@ type Request struct { CaptureTime time.Time `json:"captureTime"` } -func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) { +func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -214,7 +214,8 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api } key := fmt.Sprintf( - "%s:%s->%s:%s::%d", + "%d_%s:%s_%s:%s_%d", + counterPair.StreamId, tcpID.SrcIP, tcpID.SrcPort, tcpID.DstIP, diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 44fc67b79..dd9909034 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -16,7 +16,7 @@ type Response struct { CaptureTime time.Time `json:"captureTime"` } -func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) { +func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -44,7 +44,8 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi } key := fmt.Sprintf( - "%s:%s->%s:%s::%d", + "%d_%s:%s_%s:%s_%d", + counterPair.StreamId, tcpID.DstIP, tcpID.DstPort, tcpID.SrcIP, diff --git a/tap/extensions/redis/handlers.go b/tap/extensions/redis/handlers.go index 76785e1a0..a4a3a3858 100644 --- a/tap/extensions/redis/handlers.go +++ b/tap/extensions/redis/handlers.go @@ -7,15 +7,21 @@ import ( ) func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error { + counterPair.Lock() counterPair.Request++ + requestCounter := counterPair.Request + counterPair.Unlock() + ident := fmt.Sprintf( - "%s->%s %s->%s %d", + "%d_%s:%s_%s:%s_%d", + counterPair.StreamId, tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort, - counterPair.Request, + requestCounter, ) + item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ @@ -31,15 +37,21 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim } func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error { + counterPair.Lock() counterPair.Response++ + responseCounter := counterPair.Response + counterPair.Unlock() + ident := fmt.Sprintf( - "%s->%s %s->%s %d", + "%d_%s:%s_%s:%s_%d", + counterPair.StreamId, tcpID.DstIP, tcpID.SrcIP, tcpID.DstPort, tcpID.SrcPort, - counterPair.Response, + responseCounter, ) + item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ diff --git a/tap/extensions/redis/matcher.go b/tap/extensions/redis/matcher.go index 4d1f3d86c..66e1c423b 100644 --- a/tap/extensions/redis/matcher.go +++ b/tap/extensions/redis/matcher.go @@ -1,8 +1,6 @@ package redis import ( - "fmt" - "strings" "sync" "time" @@ -11,7 +9,7 @@ import ( var reqResMatcher = createResponseRequestMatcher() // global -// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter} +// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}` type requestResponseMatcher struct { openMessagesMap *sync.Map } @@ -22,9 +20,6 @@ func createResponseRequestMatcher() requestResponseMatcher { } func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem { - split := splitIdent(ident) - key := genKey(split) - requestRedisMessage := api.GenericMessage{ IsRequest: true, CaptureTime: captureTime, @@ -37,7 +32,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re }, } - if response, found := matcher.openMessagesMap.LoadAndDelete(key); found { + if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found { // Type assertion always succeeds because all of the map's values are of api.GenericMessage type responseRedisMessage := response.(*api.GenericMessage) if responseRedisMessage.IsRequest { @@ -46,14 +41,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re return matcher.preparePair(&requestRedisMessage, responseRedisMessage) } - matcher.openMessagesMap.Store(key, &requestRedisMessage) + matcher.openMessagesMap.Store(ident, &requestRedisMessage) return nil } func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem { - split := splitIdent(ident) - key := genKey(split) - responseRedisMessage := api.GenericMessage{ IsRequest: false, CaptureTime: captureTime, @@ -66,7 +58,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response * }, } - if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { + if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found { // Type assertion always succeeds because all of the map's values are of api.GenericMessage type requestRedisMessage := request.(*api.GenericMessage) if !requestRedisMessage.IsRequest { @@ -75,7 +67,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response * return matcher.preparePair(requestRedisMessage, &responseRedisMessage) } - matcher.openMessagesMap.Store(key, &responseRedisMessage) + matcher.openMessagesMap.Store(ident, &responseRedisMessage) return nil } @@ -90,13 +82,3 @@ func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.Gene }, } } - -func splitIdent(ident string) []string { - ident = strings.Replace(ident, "->", " ", -1) - return strings.Split(ident, " ") -} - -func genKey(split []string) string { - key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4]) - return key -} diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 90f09b5d8..ce56dcec8 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -82,6 +82,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T stream.id = factory.streamsMap.nextId() for i, extension := range extensions { counterPair := &api.CounterPair{ + StreamId: stream.id, Request: 0, Response: 0, }