diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 88ae2439d..0a547b738 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -389,19 +389,83 @@ func representBasicDeliver(event map[string]interface{}) []interface{} { return rep } -func printEventQueueDeclare(eventQueueDeclare QueueDeclare) { - return - fmt.Printf( - "[%s] Queue: %s, Passive: %t, Durable: %t, AutoDelete: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n", - queueMethodMap[10], - eventQueueDeclare.Queue, - eventQueueDeclare.Passive, - eventQueueDeclare.Durable, - eventQueueDeclare.AutoDelete, - eventQueueDeclare.Exclusive, - eventQueueDeclare.NoWait, - eventQueueDeclare.Arguments, - ) +func emitQueueDeclare(event QueueDeclare, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { + request := &api.GenericMessage{ + IsRequest: true, + CaptureTime: time.Now(), + Payload: AMQPPayload{ + Type: "queue_declare", + Data: &AMQPWrapper{ + Method: queueMethodMap[10], + Url: event.Queue, + Details: event, + }, + }, + } + item := &api.OutputChannelItem{ + Protocol: protocol, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + ConnectionInfo: connectionInfo, + Pair: &api.RequestResponsePair{ + Request: *request, + Response: api.GenericMessage{}, + }, + } + emitter.Emit(item) +} + +func representQueueDeclare(event map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + details, _ := json.Marshal([]map[string]string{ + { + "name": "Queue", + "value": event["Queue"].(string), + }, + { + "name": "Passive", + "value": strconv.FormatBool(event["Passive"].(bool)), + }, + { + "name": "Durable", + "value": strconv.FormatBool(event["Durable"].(bool)), + }, + { + "name": "Exclusive", + "value": strconv.FormatBool(event["Exclusive"].(bool)), + }, + { + "name": "AutoDelete", + "value": strconv.FormatBool(event["AutoDelete"].(bool)), + }, + { + "name": "NoWait", + "value": strconv.FormatBool(event["NoWait"].(bool)), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Details", + "data": string(details), + }) + + if event["Arguments"] != nil { + headers := make([]map[string]string, 0) + for name, value := range event["Arguments"].(map[string]interface{}) { + headers = append(headers, map[string]string{ + "name": name, + "value": value.(string), + }) + } + headersMarshaled, _ := json.Marshal(headers) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Arguments", + "data": string(headersMarshaled), + }) + } + + return rep } func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) { diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 7b7e2bc18..51c674e54 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -158,7 +158,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em NoWait: m.NoWait, Arguments: m.Arguments, } - printEventQueueDeclare(*eventQueueDeclare) + emitQueueDeclare(*eventQueueDeclare, connectionInfo, emitter) case *ExchangeDeclare: eventExchangeDeclare := &ExchangeDeclare{ @@ -207,17 +207,28 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve reqDetails := request["details"].(map[string]interface{}) entryBytes, _ := json.Marshal(item.Pair) service := fmt.Sprintf("amqp") + + var summary string + switch request["method"] { + case basicMethodMap[40]: + case basicMethodMap[60]: + summary = reqDetails["Exchange"].(string) + case queueMethodMap[10]: + summary = reqDetails["Queue"].(string) + break + } + return &api.MizuEntry{ ProtocolName: protocol.Name, EntryId: entryId, Entry: string(entryBytes), - Url: fmt.Sprintf("%s%s", service, reqDetails["Exchange"].(string)), + Url: fmt.Sprintf("%s%s", service, summary), Method: request["method"].(string), Status: 0, RequestSenderIp: item.ConnectionInfo.ClientIP, Service: service, Timestamp: item.Timestamp, - Path: reqDetails["Exchange"].(string), + Path: summary, ResolvedSource: resolvedSource, ResolvedDestination: resolvedDestination, SourceIp: item.ConnectionInfo.ClientIP, @@ -267,6 +278,8 @@ func (d dissecting) Represent(entry string) ([]byte, error) { break case basicMethodMap[60]: repRequest = representBasicDeliver(details) + case queueMethodMap[10]: + repRequest = representQueueDeclare(details) break } // response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})