mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-30 07:51:10 +00:00
Implement the AMQP QueueBind
This commit is contained in:
parent
21cac50ccb
commit
1a89e3219c
@ -287,7 +287,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
|
||||
return rep
|
||||
}
|
||||
|
||||
func emitBasicDeliver(event BasicPublish, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
|
||||
func emitBasicDeliver(event BasicDeliver, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
|
||||
request := &api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: time.Now(),
|
||||
@ -689,17 +689,75 @@ func representConnectionClose(event map[string]interface{}) []interface{} {
|
||||
return rep
|
||||
}
|
||||
|
||||
func printEventQueueBind(eventQueueBind QueueBind) {
|
||||
return
|
||||
fmt.Printf(
|
||||
"[%s] Queue: %s, Exchange: %s, RoutingKey: %s, NoWait: %t, Arguments: %v\n",
|
||||
queueMethodMap[20],
|
||||
eventQueueBind.Queue,
|
||||
eventQueueBind.Exchange,
|
||||
eventQueueBind.RoutingKey,
|
||||
eventQueueBind.NoWait,
|
||||
eventQueueBind.Arguments,
|
||||
)
|
||||
func emitQueueBind(event QueueBind, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
|
||||
request := &api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: time.Now(),
|
||||
Payload: AMQPPayload{
|
||||
Type: "queue_bind",
|
||||
Data: &AMQPWrapper{
|
||||
Method: queueMethodMap[20],
|
||||
Url: event.Queue,
|
||||
Details: event,
|
||||
},
|
||||
},
|
||||
}
|
||||
item := &api.OutputChannelItem{
|
||||
Protocol: protocol,
|
||||
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||
ConnectionInfo: connectionInfo,
|
||||
Pair: &api.RequestResponsePair{
|
||||
Request: *request,
|
||||
Response: api.GenericMessage{},
|
||||
},
|
||||
}
|
||||
emitter.Emit(item)
|
||||
}
|
||||
|
||||
func representQueueBind(event map[string]interface{}) []interface{} {
|
||||
rep := make([]interface{}, 0)
|
||||
|
||||
details, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
"name": "Queue",
|
||||
"value": event["Queue"].(string),
|
||||
},
|
||||
{
|
||||
"name": "Exchange",
|
||||
"value": event["Exchange"].(string),
|
||||
},
|
||||
{
|
||||
"name": "RoutingKey",
|
||||
"value": event["RoutingKey"].(string),
|
||||
},
|
||||
{
|
||||
"name": "NoWait",
|
||||
"value": strconv.FormatBool(event["NoWait"].(bool)),
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
|
||||
if event["Arguments"] != nil {
|
||||
headers := make([]map[string]string, 0)
|
||||
for name, value := range event["Arguments"].(map[string]interface{}) {
|
||||
headers = append(headers, map[string]string{
|
||||
"name": name,
|
||||
"value": value.(string),
|
||||
})
|
||||
}
|
||||
headersMarshaled, _ := json.Marshal(headers)
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"title": "Arguments",
|
||||
"data": string(headersMarshaled),
|
||||
})
|
||||
}
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
func printEventBasicConsume(eventBasicConsume BasicConsume) {
|
||||
|
@ -106,7 +106,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
|
||||
emitBasicPublish(*eventBasicPublish, connectionInfo, emitter)
|
||||
case *BasicDeliver:
|
||||
eventBasicDeliver.Body = f.Body
|
||||
emitBasicDeliver(*eventBasicPublish, connectionInfo, emitter)
|
||||
emitBasicDeliver(*eventBasicDeliver, connectionInfo, emitter)
|
||||
default:
|
||||
}
|
||||
|
||||
@ -127,7 +127,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
printEventQueueBind(*eventQueueBind)
|
||||
emitQueueBind(*eventQueueBind, connectionInfo, emitter)
|
||||
|
||||
case *BasicConsume:
|
||||
eventBasicConsume := &BasicConsume{
|
||||
@ -228,6 +228,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
case connectionMethodMap[50]:
|
||||
summary = reqDetails["ReplyText"].(string)
|
||||
break
|
||||
case queueMethodMap[20]:
|
||||
summary = reqDetails["Queue"].(string)
|
||||
break
|
||||
}
|
||||
|
||||
return &api.MizuEntry{
|
||||
@ -303,6 +306,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
|
||||
case connectionMethodMap[50]:
|
||||
repRequest = representConnectionClose(details)
|
||||
break
|
||||
case queueMethodMap[20]:
|
||||
repRequest = representQueueBind(details)
|
||||
break
|
||||
}
|
||||
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||
// repRequest := representRequest(request)
|
||||
|
Loading…
Reference in New Issue
Block a user