Implement the AMQP ExchangeDeclare

This commit is contained in:
M. Mert Yildiran 2021-08-21 15:59:44 +03:00
parent 81eb36e21e
commit 448f9d0836
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
2 changed files with 86 additions and 16 deletions

View File

@ -435,7 +435,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
"value": strconv.FormatBool(event["Exclusive"].(bool)),
},
{
"name": "AutoDelete",
"name": "Auto Delete",
"value": strconv.FormatBool(event["AutoDelete"].(bool)),
},
{
@ -468,20 +468,87 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
return rep
}
func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) {
return
fmt.Printf(
"[%s] Exchange: %s, Type: %s, Passive: %t, Durable: %t, AutoDelete: %t, Internal: %t, NoWait: %t, Arguments: %v\n",
exchangeMethodMap[10],
eventExchangeDeclare.Exchange,
eventExchangeDeclare.Type,
eventExchangeDeclare.Passive,
eventExchangeDeclare.Durable,
eventExchangeDeclare.AutoDelete,
eventExchangeDeclare.Internal,
eventExchangeDeclare.NoWait,
eventExchangeDeclare.Arguments,
)
func emitExchangeDeclare(event ExchangeDeclare, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "exchange_declare",
Data: &AMQPWrapper{
Method: exchangeMethodMap[10],
Url: event.Exchange,
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representExchangeDeclare(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]map[string]string{
{
"name": "Exchange",
"value": event["Exchange"].(string),
},
{
"name": "Type",
"value": event["Type"].(string),
},
{
"name": "Passive",
"value": strconv.FormatBool(event["Passive"].(bool)),
},
{
"name": "Durable",
"value": strconv.FormatBool(event["Durable"].(bool)),
},
{
"name": "Auto Delete",
"value": strconv.FormatBool(event["AutoDelete"].(bool)),
},
{
"name": "Internal",
"value": strconv.FormatBool(event["Internal"].(bool)),
},
{
"name": "NoWait",
"value": strconv.FormatBool(event["NoWait"].(bool)),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Details",
"data": string(details),
})
if event["Arguments"] != nil {
headers := make([]map[string]string, 0)
for name, value := range event["Arguments"].(map[string]interface{}) {
headers = append(headers, map[string]string{
"name": name,
"value": value.(string),
})
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"title": "Arguments",
"data": string(headersMarshaled),
})
}
return rep
}
func printEventConnectionStart(eventConnectionStart ConnectionStart) {

View File

@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
NoWait: m.NoWait,
Arguments: m.Arguments,
}
printEventExchangeDeclare(*eventExchangeDeclare)
emitExchangeDeclare(*eventExchangeDeclare, connectionInfo, emitter)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@ -212,6 +212,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
switch request["method"] {
case basicMethodMap[40]:
case basicMethodMap[60]:
case exchangeMethodMap[10]:
summary = reqDetails["Exchange"].(string)
case queueMethodMap[10]:
summary = reqDetails["Queue"].(string)
@ -280,6 +281,8 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
repRequest = representBasicDeliver(details)
case queueMethodMap[10]:
repRequest = representQueueDeclare(details)
case exchangeMethodMap[10]:
repRequest = representExchangeDeclare(details)
break
}
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})