Implement the AMQP ConnectionStart

This commit is contained in:
M. Mert Yildiran 2021-08-21 16:16:55 +03:00
parent 448f9d0836
commit 8fa1506969
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
2 changed files with 91 additions and 12 deletions

View File

@ -551,17 +551,87 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
return rep return rep
} }
func printEventConnectionStart(eventConnectionStart ConnectionStart) { func emitConnectionStart(event ConnectionStart, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
return request := &api.GenericMessage{
fmt.Printf( IsRequest: true,
"[%s] Version: %d.%d, ServerProperties: %v, Mechanisms: %s, Locales: %s\n", CaptureTime: time.Now(),
connectionMethodMap[10], Payload: AMQPPayload{
eventConnectionStart.VersionMajor, Type: "connection_start",
eventConnectionStart.VersionMinor, Data: &AMQPWrapper{
eventConnectionStart.ServerProperties, Method: connectionMethodMap[10],
eventConnectionStart.Mechanisms, Url: fmt.Sprintf("%s.%s", string(event.VersionMajor), string(event.VersionMinor)),
eventConnectionStart.Locales, Details: event,
) },
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representConnectionStart(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
details, _ := json.Marshal([]map[string]string{
{
"name": "Version Major",
"value": fmt.Sprintf("%g", event["VersionMajor"].(float64)),
},
{
"name": "Version Minor",
"value": fmt.Sprintf("%g", event["VersionMinor"].(float64)),
},
{
"name": "Mechanisms",
"value": event["Mechanisms"].(string),
},
{
"name": "Locales",
"value": event["Locales"].(string),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Details",
"data": string(details),
})
if event["ServerProperties"] != nil {
headers := make([]map[string]string, 0)
for name, value := range event["ServerProperties"].(map[string]interface{}) {
var outcome string
switch value.(type) {
case string:
outcome = value.(string)
break
case map[string]interface{}:
x, _ := json.Marshal(value)
outcome = string(x)
break
default:
panic("Unknown data type for the server property!")
}
headers = append(headers, map[string]string{
"name": name,
"value": outcome,
})
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"title": "Server Properties",
"data": string(headersMarshaled),
})
}
return rep
} }
func printEventConnectionClose(eventConnectionClose ConnectionClose) { func printEventConnectionClose(eventConnectionClose ConnectionClose) {

View File

@ -181,7 +181,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
Mechanisms: m.Mechanisms, Mechanisms: m.Mechanisms,
Locales: m.Locales, Locales: m.Locales,
} }
printEventConnectionStart(*eventConnectionStart) emitConnectionStart(*eventConnectionStart, connectionInfo, emitter)
case *ConnectionClose: case *ConnectionClose:
eventConnectionClose := &ConnectionClose{ eventConnectionClose := &ConnectionClose{
@ -214,9 +214,13 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
case basicMethodMap[60]: case basicMethodMap[60]:
case exchangeMethodMap[10]: case exchangeMethodMap[10]:
summary = reqDetails["Exchange"].(string) summary = reqDetails["Exchange"].(string)
break
case queueMethodMap[10]: case queueMethodMap[10]:
summary = reqDetails["Queue"].(string) summary = reqDetails["Queue"].(string)
break break
case connectionMethodMap[10]:
summary = fmt.Sprintf("%g.%g", reqDetails["VersionMajor"].(float64), reqDetails["VersionMinor"].(float64))
break
} }
return &api.MizuEntry{ return &api.MizuEntry{
@ -279,11 +283,16 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
break break
case basicMethodMap[60]: case basicMethodMap[60]:
repRequest = representBasicDeliver(details) repRequest = representBasicDeliver(details)
break
case queueMethodMap[10]: case queueMethodMap[10]:
repRequest = representQueueDeclare(details) repRequest = representQueueDeclare(details)
break
case exchangeMethodMap[10]: case exchangeMethodMap[10]:
repRequest = representExchangeDeclare(details) repRequest = representExchangeDeclare(details)
break break
case connectionMethodMap[10]:
repRequest = representConnectionStart(details)
break
} }
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) // response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
// repRequest := representRequest(request) // repRequest := representRequest(request)