mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-24 21:23:33 +00:00
Implement the AMQP ConnectionClose
This commit is contained in:
parent
8fa1506969
commit
21cac50ccb
@ -634,16 +634,59 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
|
|||||||
return rep
|
return rep
|
||||||
}
|
}
|
||||||
|
|
||||||
func printEventConnectionClose(eventConnectionClose ConnectionClose) {
|
func emitConnectionClose(event ConnectionClose, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
|
||||||
return
|
request := &api.GenericMessage{
|
||||||
fmt.Printf(
|
IsRequest: true,
|
||||||
"[%s] ReplyCode: %d, ReplyText: %s, ClassId: %d, MethodId: %d\n",
|
CaptureTime: time.Now(),
|
||||||
connectionMethodMap[50],
|
Payload: AMQPPayload{
|
||||||
eventConnectionClose.ReplyCode,
|
Type: "connection_close",
|
||||||
eventConnectionClose.ReplyText,
|
Data: &AMQPWrapper{
|
||||||
eventConnectionClose.ClassId,
|
Method: connectionMethodMap[50],
|
||||||
eventConnectionClose.MethodId,
|
Url: event.ReplyText,
|
||||||
)
|
Details: event,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
item := &api.OutputChannelItem{
|
||||||
|
Protocol: protocol,
|
||||||
|
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||||
|
ConnectionInfo: connectionInfo,
|
||||||
|
Pair: &api.RequestResponsePair{
|
||||||
|
Request: *request,
|
||||||
|
Response: api.GenericMessage{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
emitter.Emit(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
func representConnectionClose(event map[string]interface{}) []interface{} {
|
||||||
|
rep := make([]interface{}, 0)
|
||||||
|
|
||||||
|
details, _ := json.Marshal([]map[string]string{
|
||||||
|
{
|
||||||
|
"name": "Reply Code",
|
||||||
|
"value": fmt.Sprintf("%g", event["ReplyCode"].(float64)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Reply Text",
|
||||||
|
"value": event["ReplyText"].(string),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Class ID",
|
||||||
|
"value": fmt.Sprintf("%g", event["ClassId"].(float64)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Method ID",
|
||||||
|
"value": fmt.Sprintf("%g", event["MethodId"].(float64)),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
rep = append(rep, map[string]string{
|
||||||
|
"type": "table",
|
||||||
|
"title": "Details",
|
||||||
|
"data": string(details),
|
||||||
|
})
|
||||||
|
|
||||||
|
return rep
|
||||||
}
|
}
|
||||||
|
|
||||||
func printEventQueueBind(eventQueueBind QueueBind) {
|
func printEventQueueBind(eventQueueBind QueueBind) {
|
||||||
|
@ -190,7 +190,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
|
|||||||
ClassId: m.ClassId,
|
ClassId: m.ClassId,
|
||||||
MethodId: m.MethodId,
|
MethodId: m.MethodId,
|
||||||
}
|
}
|
||||||
printEventConnectionClose(*eventConnectionClose)
|
emitConnectionClose(*eventConnectionClose, connectionInfo, emitter)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
|
||||||
@ -208,10 +208,14 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
|||||||
entryBytes, _ := json.Marshal(item.Pair)
|
entryBytes, _ := json.Marshal(item.Pair)
|
||||||
service := fmt.Sprintf("amqp")
|
service := fmt.Sprintf("amqp")
|
||||||
|
|
||||||
var summary string
|
summary := ""
|
||||||
switch request["method"] {
|
switch request["method"] {
|
||||||
case basicMethodMap[40]:
|
case basicMethodMap[40]:
|
||||||
|
summary = reqDetails["Exchange"].(string)
|
||||||
|
break
|
||||||
case basicMethodMap[60]:
|
case basicMethodMap[60]:
|
||||||
|
summary = reqDetails["Exchange"].(string)
|
||||||
|
break
|
||||||
case exchangeMethodMap[10]:
|
case exchangeMethodMap[10]:
|
||||||
summary = reqDetails["Exchange"].(string)
|
summary = reqDetails["Exchange"].(string)
|
||||||
break
|
break
|
||||||
@ -221,6 +225,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
|||||||
case connectionMethodMap[10]:
|
case connectionMethodMap[10]:
|
||||||
summary = fmt.Sprintf("%g.%g", reqDetails["VersionMajor"].(float64), reqDetails["VersionMinor"].(float64))
|
summary = fmt.Sprintf("%g.%g", reqDetails["VersionMajor"].(float64), reqDetails["VersionMinor"].(float64))
|
||||||
break
|
break
|
||||||
|
case connectionMethodMap[50]:
|
||||||
|
summary = reqDetails["ReplyText"].(string)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return &api.MizuEntry{
|
return &api.MizuEntry{
|
||||||
@ -293,6 +300,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
|
|||||||
case connectionMethodMap[10]:
|
case connectionMethodMap[10]:
|
||||||
repRequest = representConnectionStart(details)
|
repRequest = representConnectionStart(details)
|
||||||
break
|
break
|
||||||
|
case connectionMethodMap[50]:
|
||||||
|
repRequest = representConnectionClose(details)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
|
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||||
// repRequest := representRequest(request)
|
// repRequest := representRequest(request)
|
||||||
|
Loading…
Reference in New Issue
Block a user