From d4adc04c38f57de2d234d31982cc69e141ae81cb Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sat, 21 Aug 2021 12:02:28 +0300 Subject: [PATCH] Implement the AMQP `BasicPublish` and fix some issues in the UI when the response payload is missing --- tap/extensions/amqp/helpers.go | 44 ++++++++++- tap/extensions/amqp/main.go | 74 +++++++++++++++++-- tap/extensions/amqp/structs.go | 25 +++++++ ui/src/components/HarEntryDetailed.tsx | 8 +- .../HarEntryViewer/HAREntryViewer.tsx | 34 +++++---- 5 files changed, 156 insertions(+), 29 deletions(-) create mode 100644 tap/extensions/amqp/structs.go diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 35d62e3b8..8d4c64a57 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -1,6 +1,11 @@ package main -import "fmt" +import ( + "fmt" + "time" + + "github.com/up9inc/mizu/tap/api" +) var connectionMethodMap = map[int]string{ 10: "connection start", @@ -80,7 +85,37 @@ var txMethodMap = map[int]string{ 31: "tx rollback-ok", } +type AMQPWrapper struct { + Method string + Details interface{} +} + +func emitBasicPublish(eventBasicPublish BasicPublish, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { + request := &api.GenericMessage{ + IsRequest: true, + CaptureTime: time.Now(), + Payload: AMQPPayload{ + Type: "basic_publish", + Data: &AMQPWrapper{ + Method: "Basic Publish", + Details: eventBasicPublish, + }, + }, + } + item := &api.OutputChannelItem{ + Protocol: protocol, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + ConnectionInfo: nil, + Pair: &api.RequestResponsePair{ + Request: *request, + Response: api.GenericMessage{}, + }, + } + emitter.Emit(item) +} + func printEventBasicPublish(eventBasicPublish BasicPublish) { + return fmt.Printf( "[%s] Exchange: %s, RoutingKey: %s, Mandatory: %t, Immediate: %t, Properties: %v, Body: %s\n", basicMethodMap[40], @@ -94,6 +129,7 @@ func printEventBasicPublish(eventBasicPublish BasicPublish) { } func printEventBasicDeliver(eventBasicDeliver BasicDeliver) { + return fmt.Printf( "[%s] ConsumerTag: %s, DeliveryTag: %d, Redelivered: %t, Exchange: %s, RoutingKey: %s, Properties: %v, Body: %s\n", basicMethodMap[60], @@ -108,6 +144,7 @@ func printEventBasicDeliver(eventBasicDeliver BasicDeliver) { } func printEventQueueDeclare(eventQueueDeclare QueueDeclare) { + return fmt.Printf( "[%s] Queue: %s, Passive: %t, Durable: %t, AutoDelete: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n", queueMethodMap[10], @@ -122,6 +159,7 @@ func printEventQueueDeclare(eventQueueDeclare QueueDeclare) { } func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) { + return fmt.Printf( "[%s] Exchange: %s, Type: %s, Passive: %t, Durable: %t, AutoDelete: %t, Internal: %t, NoWait: %t, Arguments: %v\n", exchangeMethodMap[10], @@ -137,6 +175,7 @@ func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) { } func printEventConnectionStart(eventConnectionStart ConnectionStart) { + return fmt.Printf( "[%s] Version: %d.%d, ServerProperties: %v, Mechanisms: %s, Locales: %s\n", connectionMethodMap[10], @@ -149,6 +188,7 @@ func printEventConnectionStart(eventConnectionStart ConnectionStart) { } func printEventConnectionClose(eventConnectionClose ConnectionClose) { + return fmt.Printf( "[%s] ReplyCode: %d, ReplyText: %s, ClassId: %d, MethodId: %d\n", connectionMethodMap[50], @@ -160,6 +200,7 @@ func printEventConnectionClose(eventConnectionClose ConnectionClose) { } func printEventQueueBind(eventQueueBind QueueBind) { + return fmt.Printf( "[%s] Queue: %s, Exchange: %s, RoutingKey: %s, NoWait: %t, Arguments: %v\n", queueMethodMap[20], @@ -172,6 +213,7 @@ func printEventQueueBind(eventQueueBind QueueBind) { } func printEventBasicConsume(eventBasicConsume BasicConsume) { + return fmt.Printf( "[%s] Queue: %s, ConsumerTag: %s, NoLocal: %t, NoAck: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n", basicMethodMap[20], diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index c7ddc5cbb..e8fd30d94 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -2,6 +2,8 @@ package main import ( "bufio" + "encoding/json" + "fmt" "io" "log" @@ -10,7 +12,7 @@ import ( var protocol api.Protocol = api.Protocol{ Name: "amqp", - LongName: "Advanced Message Queuing Protocol", + LongName: "Advanced Message Queuing Protocol 0-9-1", Abbreviation: "AMQP", BackgroundColor: "#ff6600", ForegroundColor: "#ffffff", @@ -40,6 +42,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em var header *HeaderFrame var body []byte + connectionInfo := &api.ConnectionInfo{ + ClientIP: tcpID.SrcIP, + ClientPort: tcpID.SrcPort, + ServerIP: tcpID.DstIP, + ServerPort: tcpID.DstPort, + IsOutgoing: true, + } + eventBasicPublish := &BasicPublish{ Exchange: "", RoutingKey: "", @@ -94,6 +104,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em case *BasicPublish: eventBasicPublish.Body = f.Body printEventBasicPublish(*eventBasicPublish) + emitBasicPublish(*eventBasicPublish, connectionInfo, emitter) case *BasicDeliver: eventBasicDeliver.Body = f.Body printEventBasicDeliver(*eventBasicDeliver) @@ -193,18 +204,67 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { - // TODO: Implement - return nil + request := item.Pair.Request.Payload.(map[string]interface{}) + reqDetails := request["Details"].(map[string]interface{}) + entryBytes, _ := json.Marshal(item.Pair) + service := fmt.Sprintf("amqp") + return &api.MizuEntry{ + ProtocolName: protocol.Name, + EntryId: entryId, + Entry: string(entryBytes), + Url: fmt.Sprintf("%s%s", service, reqDetails["Exchange"].(string)), + Method: request["Method"].(string), + Status: 0, + RequestSenderIp: "", + Service: service, + Timestamp: item.Timestamp, + Path: reqDetails["Exchange"].(string), + ResolvedSource: resolvedSource, + ResolvedDestination: resolvedDestination, + SourceIp: "", + DestinationIp: "", + SourcePort: "", + DestinationPort: "", + IsOutgoing: true, + } + } func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { - // TODO: Implement - return nil + return &api.BaseEntryDetails{ + Id: entry.EntryId, + Protocol: protocol, + Url: entry.Url, + RequestSenderIp: entry.RequestSenderIp, + Service: entry.Service, + Summary: entry.Path, + StatusCode: entry.Status, + Method: entry.Method, + Timestamp: entry.Timestamp, + SourceIp: entry.SourceIp, + DestinationIp: entry.DestinationIp, + SourcePort: entry.SourcePort, + DestinationPort: entry.DestinationPort, + IsOutgoing: entry.IsOutgoing, + Latency: 0, + Rules: api.ApplicableRules{ + Latency: 0, + Status: false, + }, + } } func (d dissecting) Represent(entry string) ([]byte, error) { - // TODO: Implement - return nil, nil + var root map[string]interface{} + json.Unmarshal([]byte(entry), &root) + representation := make(map[string]interface{}, 0) + // request := root["request"].(map[string]interface{})["payload"].(map[string]interface{}) + // response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) + // repRequest := representRequest(request) + // repResponse := representResponse(response) + // representation["request"] = repRequest + // representation["response"] = repResponse + return json.Marshal(representation) } var Dissector dissecting diff --git a/tap/extensions/amqp/structs.go b/tap/extensions/amqp/structs.go new file mode 100644 index 000000000..67a645883 --- /dev/null +++ b/tap/extensions/amqp/structs.go @@ -0,0 +1,25 @@ +package main + +import ( + "encoding/json" + "fmt" +) + +type AMQPPayload struct { + Type string + Method string + Data interface{} +} + +type AMQPPayloader interface { + MarshalJSON() ([]byte, error) +} + +func (h AMQPPayload) MarshalJSON() ([]byte, error) { + switch h.Type { + case "basic_publish": + return json.Marshal(h.Data) + default: + panic(fmt.Sprintf("AMQP payload cannot be marshaled: %s\n", h.Type)) + } +} diff --git a/ui/src/components/HarEntryDetailed.tsx b/ui/src/components/HarEntryDetailed.tsx index 62430341e..a7ad371d5 100644 --- a/ui/src/components/HarEntryDetailed.tsx +++ b/ui/src/components/HarEntryDetailed.tsx @@ -39,13 +39,12 @@ const HarEntryTitle: React.FC = ({protocol, har}) => { const {log: {entries}} = har; const {response} = JSON.parse(entries[0].entry); - const {bodySize} = response.payload; return
-
{formatSize(bodySize)}
+ {response.payload &&
{formatSize(response.payload.bodySize)}
}
{'rulesMatched' in entries[0] ? entries[0].rulesMatched?.length : '0'} Rules Applied
; @@ -56,12 +55,11 @@ const HarEntrySummary: React.FC = ({har}) => { const {log: {entries}} = har; const {response, request} = JSON.parse(entries[0].entry); - const {status} = response.payload; return
- {status &&
- + {response.payload &&
+
}
diff --git a/ui/src/components/HarEntryViewer/HAREntryViewer.tsx b/ui/src/components/HarEntryViewer/HAREntryViewer.tsx index 141f12204..56070dd53 100644 --- a/ui/src/components/HarEntryViewer/HAREntryViewer.tsx +++ b/ui/src/components/HarEntryViewer/HAREntryViewer.tsx @@ -6,22 +6,24 @@ import {HAREntryTableSection, HAREntryBodySection, HAREntryTablePolicySection} f const SectionsRepresentation: React.FC = ({data, color}) => { const sections = [] - data.forEach((row) => { - switch (row.type) { - case "table": - sections.push( - - ) - break; - case "body": - sections.push( - - ) - break; - default: - break; - } - }); + if (data !== undefined) { + data.forEach((row) => { + switch (row.type) { + case "table": + sections.push( + + ) + break; + case "body": + sections.push( + + ) + break; + default: + break; + } + }); + } return <>{sections}; }