diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 4dc83dcb0..d6e841d11 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -760,17 +760,81 @@ func representQueueBind(event map[string]interface{}) []interface{} { return rep } -func printEventBasicConsume(eventBasicConsume BasicConsume) { - return - fmt.Printf( - "[%s] Queue: %s, ConsumerTag: %s, NoLocal: %t, NoAck: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n", - basicMethodMap[20], - eventBasicConsume.Queue, - eventBasicConsume.ConsumerTag, - eventBasicConsume.NoLocal, - eventBasicConsume.NoAck, - eventBasicConsume.Exclusive, - eventBasicConsume.NoWait, - eventBasicConsume.Arguments, - ) +func emitBasicConsume(event BasicConsume, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { + request := &api.GenericMessage{ + IsRequest: true, + CaptureTime: time.Now(), + Payload: AMQPPayload{ + Type: "basic_consume", + Data: &AMQPWrapper{ + Method: basicMethodMap[20], + Url: event.Queue, + Details: event, + }, + }, + } + item := &api.OutputChannelItem{ + Protocol: protocol, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + ConnectionInfo: connectionInfo, + Pair: &api.RequestResponsePair{ + Request: *request, + Response: api.GenericMessage{}, + }, + } + emitter.Emit(item) +} + +func representBasicConsume(event map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + details, _ := json.Marshal([]map[string]string{ + { + "name": "Queue", + "value": event["Queue"].(string), + }, + { + "name": "Consumer Tag", + "value": event["ConsumerTag"].(string), + }, + { + "name": "No Local", + "value": strconv.FormatBool(event["NoLocal"].(bool)), + }, + { + "name": "No Ack", + "value": strconv.FormatBool(event["NoAck"].(bool)), + }, + { + "name": "Exclusive", + "value": strconv.FormatBool(event["Exclusive"].(bool)), + }, + { + "name": "NoWait", + "value": strconv.FormatBool(event["NoWait"].(bool)), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Details", + "data": string(details), + }) + + if event["Arguments"] != nil { + headers := make([]map[string]string, 0) + for name, value := range event["Arguments"].(map[string]interface{}) { + headers = append(headers, map[string]string{ + "name": name, + "value": value.(string), + }) + } + headersMarshaled, _ := json.Marshal(headers) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Arguments", + "data": string(headersMarshaled), + }) + } + + return rep } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 12e2e7173..956d6f5fe 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -139,7 +139,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em NoWait: m.NoWait, Arguments: m.Arguments, } - printEventBasicConsume(*eventBasicConsume) + emitBasicConsume(*eventBasicConsume, connectionInfo, emitter) case *BasicDeliver: eventBasicDeliver.ConsumerTag = m.ConsumerTag @@ -231,6 +231,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve case queueMethodMap[20]: summary = reqDetails["Queue"].(string) break + case basicMethodMap[20]: + summary = reqDetails["Queue"].(string) + break } return &api.MizuEntry{ @@ -284,7 +287,7 @@ func (d dissecting) Represent(entry string) ([]byte, error) { json.Unmarshal([]byte(entry), &root) representation := make(map[string]interface{}, 0) request := root["request"].(map[string]interface{})["payload"].(map[string]interface{}) - log.Printf("request: %+v\n", request) + // log.Printf("request: %+v\n", request) var repRequest []interface{} details := request["details"].(map[string]interface{}) switch request["method"].(string) { @@ -309,6 +312,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) { case queueMethodMap[20]: repRequest = representQueueBind(details) break + case basicMethodMap[20]: + repRequest = representBasicConsume(details) + break } // response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) // repRequest := representRequest(request)