From 1a89e3219c25398a658b153657d24db68f9053e8 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sat, 21 Aug 2021 16:38:05 +0300 Subject: [PATCH] Implement the AMQP `QueueBind` --- tap/extensions/amqp/helpers.go | 82 +++++++++++++++++++++++++++++----- tap/extensions/amqp/main.go | 10 ++++- 2 files changed, 78 insertions(+), 14 deletions(-) diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 1ae6751da..4dc83dcb0 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -287,7 +287,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} { return rep } -func emitBasicDeliver(event BasicPublish, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { +func emitBasicDeliver(event BasicDeliver, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { request := &api.GenericMessage{ IsRequest: true, CaptureTime: time.Now(), @@ -689,17 +689,75 @@ func representConnectionClose(event map[string]interface{}) []interface{} { return rep } -func printEventQueueBind(eventQueueBind QueueBind) { - return - fmt.Printf( - "[%s] Queue: %s, Exchange: %s, RoutingKey: %s, NoWait: %t, Arguments: %v\n", - queueMethodMap[20], - eventQueueBind.Queue, - eventQueueBind.Exchange, - eventQueueBind.RoutingKey, - eventQueueBind.NoWait, - eventQueueBind.Arguments, - ) +func emitQueueBind(event QueueBind, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { + request := &api.GenericMessage{ + IsRequest: true, + CaptureTime: time.Now(), + Payload: AMQPPayload{ + Type: "queue_bind", + Data: &AMQPWrapper{ + Method: queueMethodMap[20], + Url: event.Queue, + Details: event, + }, + }, + } + item := &api.OutputChannelItem{ + Protocol: protocol, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + ConnectionInfo: connectionInfo, + Pair: &api.RequestResponsePair{ + Request: *request, + Response: api.GenericMessage{}, + }, + } + emitter.Emit(item) +} + +func representQueueBind(event map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + details, _ := json.Marshal([]map[string]string{ + { + "name": "Queue", + "value": event["Queue"].(string), + }, + { + "name": "Exchange", + "value": event["Exchange"].(string), + }, + { + "name": "RoutingKey", + "value": event["RoutingKey"].(string), + }, + { + "name": "NoWait", + "value": strconv.FormatBool(event["NoWait"].(bool)), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Details", + "data": string(details), + }) + + if event["Arguments"] != nil { + headers := make([]map[string]string, 0) + for name, value := range event["Arguments"].(map[string]interface{}) { + headers = append(headers, map[string]string{ + "name": name, + "value": value.(string), + }) + } + headersMarshaled, _ := json.Marshal(headers) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Arguments", + "data": string(headersMarshaled), + }) + } + + return rep } func printEventBasicConsume(eventBasicConsume BasicConsume) { diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 3bdcafb72..12e2e7173 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -106,7 +106,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em emitBasicPublish(*eventBasicPublish, connectionInfo, emitter) case *BasicDeliver: eventBasicDeliver.Body = f.Body - emitBasicDeliver(*eventBasicPublish, connectionInfo, emitter) + emitBasicDeliver(*eventBasicDeliver, connectionInfo, emitter) default: } @@ -127,7 +127,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em NoWait: m.NoWait, Arguments: m.Arguments, } - printEventQueueBind(*eventQueueBind) + emitQueueBind(*eventQueueBind, connectionInfo, emitter) case *BasicConsume: eventBasicConsume := &BasicConsume{ @@ -228,6 +228,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve case connectionMethodMap[50]: summary = reqDetails["ReplyText"].(string) break + case queueMethodMap[20]: + summary = reqDetails["Queue"].(string) + break } return &api.MizuEntry{ @@ -303,6 +306,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) { case connectionMethodMap[50]: repRequest = representConnectionClose(details) break + case queueMethodMap[20]: + repRequest = representQueueBind(details) + break } // response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) // repRequest := representRequest(request)