diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 9e18cad3a..88ae2439d 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -118,29 +118,7 @@ func emitBasicPublish(event BasicPublish, connectionInfo *api.ConnectionInfo, em emitter.Emit(item) } -func representBasicPublish(event map[string]interface{}) []interface{} { - rep := make([]interface{}, 0) - - details, _ := json.Marshal([]map[string]string{ - { - "name": "Exchange", - "value": event["Exchange"].(string), - }, - { - "name": "Immediate", - "value": strconv.FormatBool(event["Immediate"].(bool)), - }, - { - "name": "Mandatory", - "value": strconv.FormatBool(event["Mandatory"].(bool)), - }, - }) - rep = append(rep, map[string]string{ - "type": "table", - "title": "details", - "data": string(details), - }) - +func representProperties(properties map[string]interface{}, rep []interface{}) ([]interface{}, string, string) { contentType := "" contentEncoding := "" deliveryMode := "" @@ -154,8 +132,6 @@ func representBasicPublish(event map[string]interface{}) []interface{} { userId := "" appId := "" - properties := event["Properties"].(map[string]interface{}) - if properties["ContentType"] != nil { contentType = properties["ContentType"].(string) } @@ -249,44 +225,168 @@ func representBasicPublish(event map[string]interface{}) []interface{} { "data": string(props), }) - headers := make([]map[string]string, 0) - for name, value := range properties["Headers"].(map[string]interface{}) { - headers = append(headers, map[string]string{ - "name": name, - "value": value.(string), - }) - } - headersMarshaled, _ := json.Marshal(headers) + return rep, contentType, contentEncoding +} + +func representBasicPublish(event map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + details, _ := json.Marshal([]map[string]string{ + { + "name": "Exchange", + "value": event["Exchange"].(string), + }, + { + "name": "Routing Key", + "value": event["RoutingKey"].(string), + }, + { + "name": "Mandatory", + "value": strconv.FormatBool(event["Mandatory"].(bool)), + }, + { + "name": "Immediate", + "value": strconv.FormatBool(event["Immediate"].(bool)), + }, + }) rep = append(rep, map[string]string{ "type": "table", - "title": "Headers", - "data": string(headersMarshaled), + "title": "Details", + "data": string(details), }) - rep = append(rep, map[string]string{ - "type": "body", - "title": "Body", - "encoding": contentEncoding, - "mime_type": contentType, - "data": event["Body"].(string), - }) // FIXME: `Body` value seems wrong + properties := event["Properties"].(map[string]interface{}) + rep, contentType, contentEncoding := representProperties(properties, rep) + + if properties["Headers"] != nil { + headers := make([]map[string]string, 0) + for name, value := range properties["Headers"].(map[string]interface{}) { + headers = append(headers, map[string]string{ + "name": name, + "value": value.(string), + }) + } + headersMarshaled, _ := json.Marshal(headers) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Headers", + "data": string(headersMarshaled), + }) + } + + if event["Body"] != nil { + rep = append(rep, map[string]string{ + "type": "body", + "title": "Body", + "encoding": contentEncoding, + "mime_type": contentType, + "data": event["Body"].(string), + }) // FIXME: `Body` value seems wrong + } return rep } -func printEventBasicDeliver(eventBasicDeliver BasicDeliver) { - return - fmt.Printf( - "[%s] ConsumerTag: %s, DeliveryTag: %d, Redelivered: %t, Exchange: %s, RoutingKey: %s, Properties: %v, Body: %s\n", - basicMethodMap[60], - eventBasicDeliver.ConsumerTag, - eventBasicDeliver.DeliveryTag, - eventBasicDeliver.Redelivered, - eventBasicDeliver.Exchange, - eventBasicDeliver.RoutingKey, - eventBasicDeliver.Properties, - eventBasicDeliver.Body, - ) +func emitBasicDeliver(event BasicPublish, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { + request := &api.GenericMessage{ + IsRequest: true, + CaptureTime: time.Now(), + Payload: AMQPPayload{ + Type: "basic_deliver", + Data: &AMQPWrapper{ + Method: basicMethodMap[60], + Url: event.Exchange, + Details: event, + }, + }, + } + item := &api.OutputChannelItem{ + Protocol: protocol, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + ConnectionInfo: connectionInfo, + Pair: &api.RequestResponsePair{ + Request: *request, + Response: api.GenericMessage{}, + }, + } + emitter.Emit(item) +} + +func representBasicDeliver(event map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + consumerTag := "" + deliveryTag := "" + redelivered := "" + + if event["ConsumerTag"] != nil { + consumerTag = event["ConsumerTag"].(string) + } + if event["DeliveryTag"] != nil { + deliveryTag = fmt.Sprintf("%g", event["DeliveryTag"].(float64)) + } + if event["Redelivered"] != nil { + redelivered = strconv.FormatBool(event["Redelivered"].(bool)) + } + + details, _ := json.Marshal([]map[string]string{ + { + "name": "Consumer Tag", + "value": consumerTag, + }, + { + "name": "Delivery Tag", + "value": deliveryTag, + }, + { + "name": "Redelivered", + "value": redelivered, + }, + { + "name": "Exchange", + "value": event["Exchange"].(string), + }, + { + "name": "Routing Key", + "value": event["RoutingKey"].(string), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Details", + "data": string(details), + }) + + properties := event["Properties"].(map[string]interface{}) + rep, contentType, contentEncoding := representProperties(properties, rep) + + if properties["Headers"] != nil { + headers := make([]map[string]string, 0) + for name, value := range properties["Headers"].(map[string]interface{}) { + headers = append(headers, map[string]string{ + "name": name, + "value": value.(string), + }) + } + headersMarshaled, _ := json.Marshal(headers) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Headers", + "data": string(headersMarshaled), + }) + } + + if event["Body"] != nil { + rep = append(rep, map[string]string{ + "type": "body", + "title": "Body", + "encoding": contentEncoding, + "mime_type": contentType, + "data": event["Body"].(string), + }) // FIXME: `Body` value seems wrong + } + + return rep } func printEventQueueDeclare(eventQueueDeclare QueueDeclare) { diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 43bc17efb..7b7e2bc18 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -106,7 +106,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em emitBasicPublish(*eventBasicPublish, connectionInfo, emitter) case *BasicDeliver: eventBasicDeliver.Body = f.Body - printEventBasicDeliver(*eventBasicDeliver) + emitBasicDeliver(*eventBasicPublish, connectionInfo, emitter) default: } @@ -265,6 +265,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) { case basicMethodMap[40]: repRequest = representBasicPublish(details) break + case basicMethodMap[60]: + repRequest = representBasicDeliver(details) + break } // response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) // repRequest := representRequest(request) diff --git a/tap/extensions/amqp/structs.go b/tap/extensions/amqp/structs.go index 67a645883..3a384a512 100644 --- a/tap/extensions/amqp/structs.go +++ b/tap/extensions/amqp/structs.go @@ -2,7 +2,6 @@ package main import ( "encoding/json" - "fmt" ) type AMQPPayload struct { @@ -16,10 +15,11 @@ type AMQPPayloader interface { } func (h AMQPPayload) MarshalJSON() ([]byte, error) { - switch h.Type { - case "basic_publish": - return json.Marshal(h.Data) - default: - panic(fmt.Sprintf("AMQP payload cannot be marshaled: %s\n", h.Type)) - } + return json.Marshal(h.Data) + // switch h.Type { + // case "basic_publish": + // return json.Marshal(h.Data) + // default: + // panic(fmt.Sprintf("AMQP payload cannot be marshaled: %s\n", h.Type)) + // } }