From 21cac50ccb1440def2c3d222c24ae64a6fad8d8c Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sat, 21 Aug 2021 16:28:44 +0300 Subject: [PATCH] Implement the AMQP `ConnectionClose` --- tap/extensions/amqp/helpers.go | 63 ++++++++++++++++++++++++++++------ tap/extensions/amqp/main.go | 14 ++++++-- 2 files changed, 65 insertions(+), 12 deletions(-) diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 1c06e71c3..1ae6751da 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -634,16 +634,59 @@ func representConnectionStart(event map[string]interface{}) []interface{} { return rep } -func printEventConnectionClose(eventConnectionClose ConnectionClose) { - return - fmt.Printf( - "[%s] ReplyCode: %d, ReplyText: %s, ClassId: %d, MethodId: %d\n", - connectionMethodMap[50], - eventConnectionClose.ReplyCode, - eventConnectionClose.ReplyText, - eventConnectionClose.ClassId, - eventConnectionClose.MethodId, - ) +func emitConnectionClose(event ConnectionClose, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { + request := &api.GenericMessage{ + IsRequest: true, + CaptureTime: time.Now(), + Payload: AMQPPayload{ + Type: "connection_close", + Data: &AMQPWrapper{ + Method: connectionMethodMap[50], + Url: event.ReplyText, + Details: event, + }, + }, + } + item := &api.OutputChannelItem{ + Protocol: protocol, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + ConnectionInfo: connectionInfo, + Pair: &api.RequestResponsePair{ + Request: *request, + Response: api.GenericMessage{}, + }, + } + emitter.Emit(item) +} + +func representConnectionClose(event map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + details, _ := json.Marshal([]map[string]string{ + { + "name": "Reply Code", + "value": fmt.Sprintf("%g", event["ReplyCode"].(float64)), + }, + { + "name": "Reply Text", + "value": event["ReplyText"].(string), + }, + { + "name": "Class ID", + "value": fmt.Sprintf("%g", event["ClassId"].(float64)), + }, + { + "name": "Method ID", + "value": fmt.Sprintf("%g", event["MethodId"].(float64)), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Details", + "data": string(details), + }) + + return rep } func printEventQueueBind(eventQueueBind QueueBind) { diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 4f6f3555c..3bdcafb72 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -190,7 +190,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em ClassId: m.ClassId, MethodId: m.MethodId, } - printEventConnectionClose(*eventConnectionClose) + emitConnectionClose(*eventConnectionClose, connectionInfo, emitter) default: @@ -208,10 +208,14 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve entryBytes, _ := json.Marshal(item.Pair) service := fmt.Sprintf("amqp") - var summary string + summary := "" switch request["method"] { case basicMethodMap[40]: + summary = reqDetails["Exchange"].(string) + break case basicMethodMap[60]: + summary = reqDetails["Exchange"].(string) + break case exchangeMethodMap[10]: summary = reqDetails["Exchange"].(string) break @@ -221,6 +225,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve case connectionMethodMap[10]: summary = fmt.Sprintf("%g.%g", reqDetails["VersionMajor"].(float64), reqDetails["VersionMinor"].(float64)) break + case connectionMethodMap[50]: + summary = reqDetails["ReplyText"].(string) + break } return &api.MizuEntry{ @@ -293,6 +300,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) { case connectionMethodMap[10]: repRequest = representConnectionStart(details) break + case connectionMethodMap[50]: + repRequest = representConnectionClose(details) + break } // response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) // repRequest := representRequest(request)