Implement the AMQP BasicPublish and fix some issues in the UI when the response payload is missing

This commit is contained in:
M. Mert Yildiran 2021-08-21 12:02:28 +03:00
parent 0c2140e11b
commit d4adc04c38
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
5 changed files with 156 additions and 29 deletions

View File

@ -1,6 +1,11 @@
package main
import "fmt"
import (
"fmt"
"time"
"github.com/up9inc/mizu/tap/api"
)
var connectionMethodMap = map[int]string{
10: "connection start",
@ -80,7 +85,37 @@ var txMethodMap = map[int]string{
31: "tx rollback-ok",
}
type AMQPWrapper struct {
Method string
Details interface{}
}
func emitBasicPublish(eventBasicPublish BasicPublish, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "basic_publish",
Data: &AMQPWrapper{
Method: "Basic Publish",
Details: eventBasicPublish,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: nil,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func printEventBasicPublish(eventBasicPublish BasicPublish) {
return
fmt.Printf(
"[%s] Exchange: %s, RoutingKey: %s, Mandatory: %t, Immediate: %t, Properties: %v, Body: %s\n",
basicMethodMap[40],
@ -94,6 +129,7 @@ func printEventBasicPublish(eventBasicPublish BasicPublish) {
}
func printEventBasicDeliver(eventBasicDeliver BasicDeliver) {
return
fmt.Printf(
"[%s] ConsumerTag: %s, DeliveryTag: %d, Redelivered: %t, Exchange: %s, RoutingKey: %s, Properties: %v, Body: %s\n",
basicMethodMap[60],
@ -108,6 +144,7 @@ func printEventBasicDeliver(eventBasicDeliver BasicDeliver) {
}
func printEventQueueDeclare(eventQueueDeclare QueueDeclare) {
return
fmt.Printf(
"[%s] Queue: %s, Passive: %t, Durable: %t, AutoDelete: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n",
queueMethodMap[10],
@ -122,6 +159,7 @@ func printEventQueueDeclare(eventQueueDeclare QueueDeclare) {
}
func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) {
return
fmt.Printf(
"[%s] Exchange: %s, Type: %s, Passive: %t, Durable: %t, AutoDelete: %t, Internal: %t, NoWait: %t, Arguments: %v\n",
exchangeMethodMap[10],
@ -137,6 +175,7 @@ func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) {
}
func printEventConnectionStart(eventConnectionStart ConnectionStart) {
return
fmt.Printf(
"[%s] Version: %d.%d, ServerProperties: %v, Mechanisms: %s, Locales: %s\n",
connectionMethodMap[10],
@ -149,6 +188,7 @@ func printEventConnectionStart(eventConnectionStart ConnectionStart) {
}
func printEventConnectionClose(eventConnectionClose ConnectionClose) {
return
fmt.Printf(
"[%s] ReplyCode: %d, ReplyText: %s, ClassId: %d, MethodId: %d\n",
connectionMethodMap[50],
@ -160,6 +200,7 @@ func printEventConnectionClose(eventConnectionClose ConnectionClose) {
}
func printEventQueueBind(eventQueueBind QueueBind) {
return
fmt.Printf(
"[%s] Queue: %s, Exchange: %s, RoutingKey: %s, NoWait: %t, Arguments: %v\n",
queueMethodMap[20],
@ -172,6 +213,7 @@ func printEventQueueBind(eventQueueBind QueueBind) {
}
func printEventBasicConsume(eventBasicConsume BasicConsume) {
return
fmt.Printf(
"[%s] Queue: %s, ConsumerTag: %s, NoLocal: %t, NoAck: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n",
basicMethodMap[20],

View File

@ -2,6 +2,8 @@ package main
import (
"bufio"
"encoding/json"
"fmt"
"io"
"log"
@ -10,7 +12,7 @@ import (
var protocol api.Protocol = api.Protocol{
Name: "amqp",
LongName: "Advanced Message Queuing Protocol",
LongName: "Advanced Message Queuing Protocol 0-9-1",
Abbreviation: "AMQP",
BackgroundColor: "#ff6600",
ForegroundColor: "#ffffff",
@ -40,6 +42,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
var header *HeaderFrame
var body []byte
connectionInfo := &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
eventBasicPublish := &BasicPublish{
Exchange: "",
RoutingKey: "",
@ -94,6 +104,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
case *BasicPublish:
eventBasicPublish.Body = f.Body
printEventBasicPublish(*eventBasicPublish)
emitBasicPublish(*eventBasicPublish, connectionInfo, emitter)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
printEventBasicDeliver(*eventBasicDeliver)
@ -193,18 +204,67 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
// TODO: Implement
return nil
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["Details"].(map[string]interface{})
entryBytes, _ := json.Marshal(item.Pair)
service := fmt.Sprintf("amqp")
return &api.MizuEntry{
ProtocolName: protocol.Name,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, reqDetails["Exchange"].(string)),
Method: request["Method"].(string),
Status: 0,
RequestSenderIp: "",
Service: service,
Timestamp: item.Timestamp,
Path: reqDetails["Exchange"].(string),
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: "",
DestinationIp: "",
SourcePort: "",
DestinationPort: "",
IsOutgoing: true,
}
}
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
// TODO: Implement
return nil
return &api.BaseEntryDetails{
Id: entry.EntryId,
Protocol: protocol,
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
Summary: entry.Path,
StatusCode: entry.Status,
Method: entry.Method,
Timestamp: entry.Timestamp,
SourceIp: entry.SourceIp,
DestinationIp: entry.DestinationIp,
SourcePort: entry.SourcePort,
DestinationPort: entry.DestinationPort,
IsOutgoing: entry.IsOutgoing,
Latency: 0,
Rules: api.ApplicableRules{
Latency: 0,
Status: false,
},
}
}
func (d dissecting) Represent(entry string) ([]byte, error) {
// TODO: Implement
return nil, nil
var root map[string]interface{}
json.Unmarshal([]byte(entry), &root)
representation := make(map[string]interface{}, 0)
// request := root["request"].(map[string]interface{})["payload"].(map[string]interface{})
// response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
// repRequest := representRequest(request)
// repResponse := representResponse(response)
// representation["request"] = repRequest
// representation["response"] = repResponse
return json.Marshal(representation)
}
var Dissector dissecting

View File

@ -0,0 +1,25 @@
package main
import (
"encoding/json"
"fmt"
)
type AMQPPayload struct {
Type string
Method string
Data interface{}
}
type AMQPPayloader interface {
MarshalJSON() ([]byte, error)
}
func (h AMQPPayload) MarshalJSON() ([]byte, error) {
switch h.Type {
case "basic_publish":
return json.Marshal(h.Data)
default:
panic(fmt.Sprintf("AMQP payload cannot be marshaled: %s\n", h.Type))
}
}

View File

@ -39,13 +39,12 @@ const HarEntryTitle: React.FC<any> = ({protocol, har}) => {
const {log: {entries}} = har;
const {response} = JSON.parse(entries[0].entry);
const {bodySize} = response.payload;
return <div className={classes.entryTitle}>
<Protocol protocol={protocol} horizontal={true}/>
<div style={{right: "30px", position: "absolute", display: "flex"}}>
<div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(response.payload.bodySize)}</div>}
<div style={{opacity: 0.5}}>{'rulesMatched' in entries[0] ? entries[0].rulesMatched?.length : '0'} Rules Applied</div>
</div>
</div>;
@ -56,12 +55,11 @@ const HarEntrySummary: React.FC<any> = ({har}) => {
const {log: {entries}} = har;
const {response, request} = JSON.parse(entries[0].entry);
const {status} = response.payload;
return <div className={classes.entrySummary}>
{status && <div style={{marginRight: 8}}>
<StatusCode statusCode={status}/>
{response.payload && <div style={{marginRight: 8}}>
<StatusCode statusCode={response.payload.status}/>
</div>}
<div style={{flexGrow: 1, overflow: 'hidden'}}>
<EndpointPath method={request?.payload.method} path={request?.payload.url}/>

View File

@ -6,22 +6,24 @@ import {HAREntryTableSection, HAREntryBodySection, HAREntryTablePolicySection} f
const SectionsRepresentation: React.FC<any> = ({data, color}) => {
const sections = []
data.forEach((row) => {
switch (row.type) {
case "table":
sections.push(
<HAREntryTableSection title={row.title} color={color} arrayToIterate={JSON.parse(row.data)}/>
)
break;
case "body":
sections.push(
<HAREntryBodySection color={color} content={row.data} encoding={row.encoding} contentType={row.mime_type}/>
)
break;
default:
break;
}
});
if (data !== undefined) {
data.forEach((row) => {
switch (row.type) {
case "table":
sections.push(
<HAREntryTableSection title={row.title} color={color} arrayToIterate={JSON.parse(row.data)}/>
)
break;
case "body":
sections.push(
<HAREntryBodySection color={color} content={row.data} encoding={row.encoding} contentType={row.mime_type}/>
)
break;
default:
break;
}
});
}
return <>{sections}</>;
}