Implement the AMQP BasicDeliver

This commit is contained in:
M. Mert Yildiran 2021-08-21 15:37:20 +03:00
parent 75899a9868
commit 5effd97c27
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
3 changed files with 166 additions and 63 deletions

View File

@ -118,29 +118,7 @@ func emitBasicPublish(event BasicPublish, connectionInfo *api.ConnectionInfo, em
emitter.Emit(item)
}
func representBasicPublish(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]map[string]string{
{
"name": "Exchange",
"value": event["Exchange"].(string),
},
{
"name": "Immediate",
"value": strconv.FormatBool(event["Immediate"].(bool)),
},
{
"name": "Mandatory",
"value": strconv.FormatBool(event["Mandatory"].(bool)),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "details",
"data": string(details),
})
func representProperties(properties map[string]interface{}, rep []interface{}) ([]interface{}, string, string) {
contentType := ""
contentEncoding := ""
deliveryMode := ""
@ -154,8 +132,6 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
userId := ""
appId := ""
properties := event["Properties"].(map[string]interface{})
if properties["ContentType"] != nil {
contentType = properties["ContentType"].(string)
}
@ -249,44 +225,168 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
"data": string(props),
})
headers := make([]map[string]string, 0)
for name, value := range properties["Headers"].(map[string]interface{}) {
headers = append(headers, map[string]string{
"name": name,
"value": value.(string),
})
}
headersMarshaled, _ := json.Marshal(headers)
return rep, contentType, contentEncoding
}
func representBasicPublish(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]map[string]string{
{
"name": "Exchange",
"value": event["Exchange"].(string),
},
{
"name": "Routing Key",
"value": event["RoutingKey"].(string),
},
{
"name": "Mandatory",
"value": strconv.FormatBool(event["Mandatory"].(bool)),
},
{
"name": "Immediate",
"value": strconv.FormatBool(event["Immediate"].(bool)),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Headers",
"data": string(headersMarshaled),
"title": "Details",
"data": string(details),
})
rep = append(rep, map[string]string{
"type": "body",
"title": "Body",
"encoding": contentEncoding,
"mime_type": contentType,
"data": event["Body"].(string),
}) // FIXME: `Body` value seems wrong
properties := event["Properties"].(map[string]interface{})
rep, contentType, contentEncoding := representProperties(properties, rep)
if properties["Headers"] != nil {
headers := make([]map[string]string, 0)
for name, value := range properties["Headers"].(map[string]interface{}) {
headers = append(headers, map[string]string{
"name": name,
"value": value.(string),
})
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"title": "Headers",
"data": string(headersMarshaled),
})
}
if event["Body"] != nil {
rep = append(rep, map[string]string{
"type": "body",
"title": "Body",
"encoding": contentEncoding,
"mime_type": contentType,
"data": event["Body"].(string),
}) // FIXME: `Body` value seems wrong
}
return rep
}
func printEventBasicDeliver(eventBasicDeliver BasicDeliver) {
return
fmt.Printf(
"[%s] ConsumerTag: %s, DeliveryTag: %d, Redelivered: %t, Exchange: %s, RoutingKey: %s, Properties: %v, Body: %s\n",
basicMethodMap[60],
eventBasicDeliver.ConsumerTag,
eventBasicDeliver.DeliveryTag,
eventBasicDeliver.Redelivered,
eventBasicDeliver.Exchange,
eventBasicDeliver.RoutingKey,
eventBasicDeliver.Properties,
eventBasicDeliver.Body,
)
func emitBasicDeliver(event BasicPublish, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "basic_deliver",
Data: &AMQPWrapper{
Method: basicMethodMap[60],
Url: event.Exchange,
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representBasicDeliver(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
consumerTag := ""
deliveryTag := ""
redelivered := ""
if event["ConsumerTag"] != nil {
consumerTag = event["ConsumerTag"].(string)
}
if event["DeliveryTag"] != nil {
deliveryTag = fmt.Sprintf("%g", event["DeliveryTag"].(float64))
}
if event["Redelivered"] != nil {
redelivered = strconv.FormatBool(event["Redelivered"].(bool))
}
details, _ := json.Marshal([]map[string]string{
{
"name": "Consumer Tag",
"value": consumerTag,
},
{
"name": "Delivery Tag",
"value": deliveryTag,
},
{
"name": "Redelivered",
"value": redelivered,
},
{
"name": "Exchange",
"value": event["Exchange"].(string),
},
{
"name": "Routing Key",
"value": event["RoutingKey"].(string),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Details",
"data": string(details),
})
properties := event["Properties"].(map[string]interface{})
rep, contentType, contentEncoding := representProperties(properties, rep)
if properties["Headers"] != nil {
headers := make([]map[string]string, 0)
for name, value := range properties["Headers"].(map[string]interface{}) {
headers = append(headers, map[string]string{
"name": name,
"value": value.(string),
})
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"title": "Headers",
"data": string(headersMarshaled),
})
}
if event["Body"] != nil {
rep = append(rep, map[string]string{
"type": "body",
"title": "Body",
"encoding": contentEncoding,
"mime_type": contentType,
"data": event["Body"].(string),
}) // FIXME: `Body` value seems wrong
}
return rep
}
func printEventQueueDeclare(eventQueueDeclare QueueDeclare) {

View File

@ -106,7 +106,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
emitBasicPublish(*eventBasicPublish, connectionInfo, emitter)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
printEventBasicDeliver(*eventBasicDeliver)
emitBasicDeliver(*eventBasicPublish, connectionInfo, emitter)
default:
}
@ -265,6 +265,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
case basicMethodMap[40]:
repRequest = representBasicPublish(details)
break
case basicMethodMap[60]:
repRequest = representBasicDeliver(details)
break
}
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
// repRequest := representRequest(request)

View File

@ -2,7 +2,6 @@ package main
import (
"encoding/json"
"fmt"
)
type AMQPPayload struct {
@ -16,10 +15,11 @@ type AMQPPayloader interface {
}
func (h AMQPPayload) MarshalJSON() ([]byte, error) {
switch h.Type {
case "basic_publish":
return json.Marshal(h.Data)
default:
panic(fmt.Sprintf("AMQP payload cannot be marshaled: %s\n", h.Type))
}
return json.Marshal(h.Data)
// switch h.Type {
// case "basic_publish":
// return json.Marshal(h.Data)
// default:
// panic(fmt.Sprintf("AMQP payload cannot be marshaled: %s\n", h.Type))
// }
}