Improve request-response matching and fix some interface conversion errors (#778)

* Add a PCAP based testbed

* Fix typos

* Download PCAPs from the Google Cloud bucket

* Add a Python script to automate the PCAP testbed

* Dump the test suite into a file named `suite.json`

* Serialize entries separately

* Dissect individual TCP streams one by one through separate PCAP files

* Improve the reliability a little bit

* Ditch the individual TCP streams idea

* Fix some issues in Kafka

* Print the total number of packets and TCP streams

* Fix an interface conversion error in AMQP

* Print the total number of returning items from the dissectors

* Print the total number of returning items from the dissectors really

* Fix a possible race condition

* Do atomic increments just to be sure

* Print the total number of Redis `Dissect` calls

* Improve the request-response matching in Redis by including the TCP stream ID

* Update the request-response pair matching key format in HTTP and Kafka

* Rearrange the test suite

* Add more queries to the test suite

* Remove the debug prints

* Add the assertions

* Close the WebSocket connection faster

* Make `MIZU_TEST` enviroment variable a shared constant

* Add `test-lint` rule

* Fix several issues in Kafka

* Update the test suite

* Add more queries

* Fix the `test-lint` rule

* Exit only after PCAP EOF

* Add more queries

* Update `suite.json`

* Make the tests more stable

* Revert the bad changes

* Run `go mod tidy` on `tap/`
This commit is contained in:
M. Mert Yıldıran 2022-02-08 21:32:27 +03:00 committed by GitHub
parent 145004fe43
commit a42a0cd0b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 97 additions and 91 deletions

View File

@ -58,8 +58,10 @@ type TcpID struct {
}
type CounterPair struct {
StreamId int64
Request uint
Response uint
sync.Mutex
}
type GenericMessage struct {

View File

@ -362,7 +362,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
for name, value := range properties["headers"].(map[string]interface{}) {
headers = append(headers, api.TableData{
Name: name,
Value: value.(string),
Value: value,
Selector: fmt.Sprintf(`request.properties.headers["%s"]`, name),
})
}

View File

@ -115,7 +115,10 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
if err != nil {
return
}
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" {
@ -127,12 +130,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
counterPair.Request,
requestCounter,
"HTTP1",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor)
@ -155,7 +159,10 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
if err != nil {
return
}
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" {
@ -167,12 +174,13 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
counterPair.Response,
responseCounter,
"HTTP1",
)
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, res.ProtoMinor)

View File

@ -1,9 +1,7 @@
package http
import (
"fmt"
"net/http"
"strings"
"sync"
"time"
@ -23,9 +21,6 @@ func createResponseRequestMatcher() requestResponseMatcher {
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
requestHTTPMessage := api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
@ -35,7 +30,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
},
}
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
responseHTTPMessage := response.(*api.GenericMessage)
if responseHTTPMessage.IsRequest {
@ -44,14 +39,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage, protoMinor)
}
matcher.openMessagesMap.Store(key, &requestHTTPMessage)
matcher.openMessagesMap.Store(ident, &requestHTTPMessage)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
responseHTTPMessage := api.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
@ -61,7 +53,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
},
}
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
requestHTTPMessage := request.(*api.GenericMessage)
if !requestHTTPMessage.IsRequest {
@ -70,7 +62,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage, protoMinor)
}
matcher.openMessagesMap.Store(key, &responseHTTPMessage)
matcher.openMessagesMap.Store(ident, &responseHTTPMessage)
return nil
}
@ -89,13 +81,3 @@ func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.Gener
},
}
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s%s", split[0], split[2], split[1], split[3], split[4], split[5])
return key
}

View File

@ -368,24 +368,26 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
}
recordsResults := recordsPath.Get(obj)
if len(recordsResults) > 0 {
records := recordsResults[0].([]interface{})
for i, _record := range records {
record := _record.(map[string]interface{})
value := record["value"]
delete(record, "value")
if recordsResults[0] != nil {
records := recordsResults[0].([]interface{})
for i, _record := range records {
record := _record.(map[string]interface{})
value := record["value"]
delete(record, "value")
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
})
rep = append(rep, api.SectionData{
Type: api.BODY,
Title: fmt.Sprintf("Record [%d] Value", i),
Data: value.(string),
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
})
rep = append(rep, api.SectionData{
Type: api.BODY,
Title: fmt.Sprintf("Record [%d] Value", i),
Data: value.(string),
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
})
}
}
}
}
@ -614,22 +616,24 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
Data: representMapAsTable(recordBatch, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch`, i, j), []string{"record"}),
})
for k, _record := range recordBatch["record"].([]interface{}) {
record := _record.(map[string]interface{})
value := record["value"]
if recordBatch["record"] != nil {
for k, _record := range recordBatch["record"].([]interface{}) {
record := _record.(map[string]interface{})
value := record["value"]
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
})
rep = append(rep, api.SectionData{
Type: api.BODY,
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
Data: value.(string),
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
})
rep = append(rep, api.SectionData{
Type: api.BODY,
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
Data: value.(string),
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
})
}
}
}
}
@ -730,6 +734,9 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
Data: string(repPayload),
})
if payload["topics"] == nil {
return rep
}
for i, _topic := range payload["topics"].([]interface{}) {
topic := _topic.(map[string]interface{})
@ -766,6 +773,9 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
Data: string(repPayload),
})
if payload["topics"] == nil {
return rep
}
for i, _topic := range payload["topics"].([]interface{}) {
topic := _topic.(map[string]interface{})

View File

@ -47,13 +47,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
_, _, err := ReadRequest(b, tcpID, superTimer)
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, superTimer, emitter)
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
if err != nil {
return err
}
@ -120,7 +120,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
summary = summary[:len(summary)-2]
}
case CreateTopics:
topics := reqDetails["payload"].(map[string]interface{})["topics"].([]interface{})
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
}
@ -128,6 +132,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
summary = summary[:len(summary)-2]
}
case DeleteTopics:
if reqDetails["topicNames"] == nil {
break
}
topicNames := reqDetails["topicNames"].([]string)
for _, name := range topicNames {
summary += fmt.Sprintf("%s, ", name)

View File

@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@ -214,7 +214,8 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
}
key := fmt.Sprintf(
"%s:%s->%s:%s::%d",
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,

View File

@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@ -44,7 +44,8 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
}
key := fmt.Sprintf(
"%s:%s->%s:%s::%d",
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
tcpID.DstIP,
tcpID.DstPort,
tcpID.SrcIP,

View File

@ -7,15 +7,21 @@ import (
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
counterPair.Request,
requestCounter,
)
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
@ -31,15 +37,21 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
counterPair.Response,
responseCounter,
)
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{

View File

@ -1,8 +1,6 @@
package redis
import (
"fmt"
"strings"
"sync"
"time"
@ -11,7 +9,7 @@ import (
var reqResMatcher = createResponseRequestMatcher() // global
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
@ -22,9 +20,6 @@ func createResponseRequestMatcher() requestResponseMatcher {
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
requestRedisMessage := api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
@ -37,7 +32,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
},
}
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
responseRedisMessage := response.(*api.GenericMessage)
if responseRedisMessage.IsRequest {
@ -46,14 +41,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
return matcher.preparePair(&requestRedisMessage, responseRedisMessage)
}
matcher.openMessagesMap.Store(key, &requestRedisMessage)
matcher.openMessagesMap.Store(ident, &requestRedisMessage)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
responseRedisMessage := api.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
@ -66,7 +58,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
},
}
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
requestRedisMessage := request.(*api.GenericMessage)
if !requestRedisMessage.IsRequest {
@ -75,7 +67,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
return matcher.preparePair(requestRedisMessage, &responseRedisMessage)
}
matcher.openMessagesMap.Store(key, &responseRedisMessage)
matcher.openMessagesMap.Store(ident, &responseRedisMessage)
return nil
}
@ -90,13 +82,3 @@ func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.Gene
},
}
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
return key
}

View File

@ -82,6 +82,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
stream.id = factory.streamsMap.nextId()
for i, extension := range extensions {
counterPair := &api.CounterPair{
StreamId: stream.id,
Request: 0,
Response: 0,
}