mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-25 13:43:01 +00:00
Implement the AMQP QueueDeclare
This commit is contained in:
parent
5effd97c27
commit
81eb36e21e
@ -389,19 +389,83 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
|
|||||||
return rep
|
return rep
|
||||||
}
|
}
|
||||||
|
|
||||||
func printEventQueueDeclare(eventQueueDeclare QueueDeclare) {
|
func emitQueueDeclare(event QueueDeclare, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
|
||||||
return
|
request := &api.GenericMessage{
|
||||||
fmt.Printf(
|
IsRequest: true,
|
||||||
"[%s] Queue: %s, Passive: %t, Durable: %t, AutoDelete: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n",
|
CaptureTime: time.Now(),
|
||||||
queueMethodMap[10],
|
Payload: AMQPPayload{
|
||||||
eventQueueDeclare.Queue,
|
Type: "queue_declare",
|
||||||
eventQueueDeclare.Passive,
|
Data: &AMQPWrapper{
|
||||||
eventQueueDeclare.Durable,
|
Method: queueMethodMap[10],
|
||||||
eventQueueDeclare.AutoDelete,
|
Url: event.Queue,
|
||||||
eventQueueDeclare.Exclusive,
|
Details: event,
|
||||||
eventQueueDeclare.NoWait,
|
},
|
||||||
eventQueueDeclare.Arguments,
|
},
|
||||||
)
|
}
|
||||||
|
item := &api.OutputChannelItem{
|
||||||
|
Protocol: protocol,
|
||||||
|
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||||
|
ConnectionInfo: connectionInfo,
|
||||||
|
Pair: &api.RequestResponsePair{
|
||||||
|
Request: *request,
|
||||||
|
Response: api.GenericMessage{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
emitter.Emit(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
func representQueueDeclare(event map[string]interface{}) []interface{} {
|
||||||
|
rep := make([]interface{}, 0)
|
||||||
|
|
||||||
|
details, _ := json.Marshal([]map[string]string{
|
||||||
|
{
|
||||||
|
"name": "Queue",
|
||||||
|
"value": event["Queue"].(string),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Passive",
|
||||||
|
"value": strconv.FormatBool(event["Passive"].(bool)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Durable",
|
||||||
|
"value": strconv.FormatBool(event["Durable"].(bool)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Exclusive",
|
||||||
|
"value": strconv.FormatBool(event["Exclusive"].(bool)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "AutoDelete",
|
||||||
|
"value": strconv.FormatBool(event["AutoDelete"].(bool)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "NoWait",
|
||||||
|
"value": strconv.FormatBool(event["NoWait"].(bool)),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
rep = append(rep, map[string]string{
|
||||||
|
"type": "table",
|
||||||
|
"title": "Details",
|
||||||
|
"data": string(details),
|
||||||
|
})
|
||||||
|
|
||||||
|
if event["Arguments"] != nil {
|
||||||
|
headers := make([]map[string]string, 0)
|
||||||
|
for name, value := range event["Arguments"].(map[string]interface{}) {
|
||||||
|
headers = append(headers, map[string]string{
|
||||||
|
"name": name,
|
||||||
|
"value": value.(string),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
headersMarshaled, _ := json.Marshal(headers)
|
||||||
|
rep = append(rep, map[string]string{
|
||||||
|
"type": "table",
|
||||||
|
"title": "Arguments",
|
||||||
|
"data": string(headersMarshaled),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return rep
|
||||||
}
|
}
|
||||||
|
|
||||||
func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) {
|
func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) {
|
||||||
|
@ -158,7 +158,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
|
|||||||
NoWait: m.NoWait,
|
NoWait: m.NoWait,
|
||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
printEventQueueDeclare(*eventQueueDeclare)
|
emitQueueDeclare(*eventQueueDeclare, connectionInfo, emitter)
|
||||||
|
|
||||||
case *ExchangeDeclare:
|
case *ExchangeDeclare:
|
||||||
eventExchangeDeclare := &ExchangeDeclare{
|
eventExchangeDeclare := &ExchangeDeclare{
|
||||||
@ -207,17 +207,28 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
|||||||
reqDetails := request["details"].(map[string]interface{})
|
reqDetails := request["details"].(map[string]interface{})
|
||||||
entryBytes, _ := json.Marshal(item.Pair)
|
entryBytes, _ := json.Marshal(item.Pair)
|
||||||
service := fmt.Sprintf("amqp")
|
service := fmt.Sprintf("amqp")
|
||||||
|
|
||||||
|
var summary string
|
||||||
|
switch request["method"] {
|
||||||
|
case basicMethodMap[40]:
|
||||||
|
case basicMethodMap[60]:
|
||||||
|
summary = reqDetails["Exchange"].(string)
|
||||||
|
case queueMethodMap[10]:
|
||||||
|
summary = reqDetails["Queue"].(string)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
return &api.MizuEntry{
|
return &api.MizuEntry{
|
||||||
ProtocolName: protocol.Name,
|
ProtocolName: protocol.Name,
|
||||||
EntryId: entryId,
|
EntryId: entryId,
|
||||||
Entry: string(entryBytes),
|
Entry: string(entryBytes),
|
||||||
Url: fmt.Sprintf("%s%s", service, reqDetails["Exchange"].(string)),
|
Url: fmt.Sprintf("%s%s", service, summary),
|
||||||
Method: request["method"].(string),
|
Method: request["method"].(string),
|
||||||
Status: 0,
|
Status: 0,
|
||||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||||
Service: service,
|
Service: service,
|
||||||
Timestamp: item.Timestamp,
|
Timestamp: item.Timestamp,
|
||||||
Path: reqDetails["Exchange"].(string),
|
Path: summary,
|
||||||
ResolvedSource: resolvedSource,
|
ResolvedSource: resolvedSource,
|
||||||
ResolvedDestination: resolvedDestination,
|
ResolvedDestination: resolvedDestination,
|
||||||
SourceIp: item.ConnectionInfo.ClientIP,
|
SourceIp: item.ConnectionInfo.ClientIP,
|
||||||
@ -267,6 +278,8 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
|
|||||||
break
|
break
|
||||||
case basicMethodMap[60]:
|
case basicMethodMap[60]:
|
||||||
repRequest = representBasicDeliver(details)
|
repRequest = representBasicDeliver(details)
|
||||||
|
case queueMethodMap[10]:
|
||||||
|
repRequest = representQueueDeclare(details)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
|
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||||
|
Loading…
Reference in New Issue
Block a user