Refactor the AMQP and Kafka implementations to create the summary string only inside the Analyze method

This commit is contained in:
M. Mert Yildiran 2021-08-22 15:52:56 +03:00
parent 983c583f1b
commit 2d0d583a93
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
7 changed files with 50 additions and 207 deletions

View File

@ -93,15 +93,15 @@ type AMQPWrapper struct {
Details interface{} `json:"details"`
}
func emitBasicPublish(event BasicPublish, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "basic_publish",
Type: _type,
Data: &AMQPWrapper{
Method: basicMethodMap[40],
Url: event.Exchange,
Method: method,
Url: "",
Details: event,
},
},
@ -287,31 +287,6 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
return rep
}
func emitBasicDeliver(event BasicDeliver, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "basic_deliver",
Data: &AMQPWrapper{
Method: basicMethodMap[60],
Url: event.Exchange,
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representBasicDeliver(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
@ -389,31 +364,6 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
return rep
}
func emitQueueDeclare(event QueueDeclare, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "queue_declare",
Data: &AMQPWrapper{
Method: queueMethodMap[10],
Url: event.Queue,
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representQueueDeclare(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
@ -468,31 +418,6 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
return rep
}
func emitExchangeDeclare(event ExchangeDeclare, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "exchange_declare",
Data: &AMQPWrapper{
Method: exchangeMethodMap[10],
Url: event.Exchange,
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representExchangeDeclare(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
@ -551,31 +476,6 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
return rep
}
func emitConnectionStart(event ConnectionStart, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "connection_start",
Data: &AMQPWrapper{
Method: connectionMethodMap[10],
Url: fmt.Sprintf("%d.%d", event.VersionMajor, event.VersionMinor),
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representConnectionStart(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
@ -634,31 +534,6 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
return rep
}
func emitConnectionClose(event ConnectionClose, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "connection_close",
Data: &AMQPWrapper{
Method: connectionMethodMap[50],
Url: event.ReplyText,
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representConnectionClose(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
@ -689,31 +564,6 @@ func representConnectionClose(event map[string]interface{}) []interface{} {
return rep
}
func emitQueueBind(event QueueBind, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "queue_bind",
Data: &AMQPWrapper{
Method: queueMethodMap[20],
Url: event.Queue,
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representQueueBind(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
@ -760,31 +610,6 @@ func representQueueBind(event map[string]interface{}) []interface{} {
return rep
}
func emitBasicConsume(event BasicConsume, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: AMQPPayload{
Type: "basic_consume",
Data: &AMQPWrapper{
Method: basicMethodMap[20],
Url: event.Queue,
Details: event,
},
},
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
Response: api.GenericMessage{},
},
}
emitter.Emit(item)
}
func representBasicConsume(event map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)

View File

@ -36,6 +36,8 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
r := AmqpReader{b}
@ -104,10 +106,10 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
emitBasicPublish(*eventBasicPublish, connectionInfo, emitter)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, emitter)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
emitBasicDeliver(*eventBasicDeliver, connectionInfo, emitter)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, emitter)
default:
}
@ -128,7 +130,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitQueueBind(*eventQueueBind, connectionInfo, emitter)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, emitter)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@ -140,7 +142,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitBasicConsume(*eventBasicConsume, connectionInfo, emitter)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, emitter)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@ -159,7 +161,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitQueueDeclare(*eventQueueDeclare, connectionInfo, emitter)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, emitter)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@ -172,7 +174,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitExchangeDeclare(*eventExchangeDeclare, connectionInfo, emitter)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, emitter)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@ -182,7 +184,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
Mechanisms: m.Mechanisms,
Locales: m.Locales,
}
emitConnectionStart(*eventConnectionStart, connectionInfo, emitter)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, emitter)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@ -191,7 +193,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
ClassId: m.ClassId,
MethodId: m.MethodId,
}
emitConnectionClose(*eventConnectionClose, connectionInfo, emitter)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, emitter)
default:
@ -206,7 +208,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
entryBytes, _ := json.Marshal(item.Pair)
service := fmt.Sprintf("amqp")
summary := ""
@ -241,6 +242,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
break
}
request["url"] = summary
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
EntryId: entryId,

View File

@ -17,7 +17,7 @@ type AMQPPayloader interface {
func (h AMQPPayload) MarshalJSON() ([]byte, error) {
return json.Marshal(h.Data)
// switch h.Type {
// case "basic_publish":
// case "amqp_request":
// return json.Marshal(h.Data)
// default:
// panic(fmt.Sprintf("AMQP payload cannot be marshaled: %s\n", h.Type))

View File

@ -19,6 +19,12 @@ func (h KafkaPayload) MarshalJSON() ([]byte, error) {
return json.Marshal(h.Data)
}
type KafkaWrapper struct {
Method string `json:"method"`
Url string `json:"url"`
Details interface{} `json:"details"`
}
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
requestHeader, _ := json.Marshal([]map[string]string{
{

View File

@ -46,14 +46,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
request := item.Pair.Request.Payload.(map[string]interface{})
entryBytes, _ := json.Marshal(item.Pair)
reqDetails := request["details"].(map[string]interface{})
service := fmt.Sprintf("kafka")
apiKey := ApiKey(request["ApiKey"].(float64))
apiKey := ApiKey(reqDetails["ApiKey"].(float64))
summary := ""
switch apiKey {
case Metadata:
_topics := request["Payload"].(map[string]interface{})["Topics"]
_topics := reqDetails["Payload"].(map[string]interface{})["Topics"]
if _topics == nil {
break
}
@ -66,10 +66,10 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
break
case ApiVersions:
summary = request["ClientID"].(string)
summary = reqDetails["ClientID"].(string)
break
case Produce:
topics := request["Payload"].(map[string]interface{})["TopicData"].([]interface{})
topics := reqDetails["Payload"].(map[string]interface{})["TopicData"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string))
}
@ -78,7 +78,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
break
case Fetch:
topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{})
topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string))
}
@ -87,7 +87,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
break
case ListOffsets:
topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{})
topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
}
@ -96,7 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
break
case CreateTopics:
topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{})
topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
}
@ -105,13 +105,15 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
break
case DeleteTopics:
topicNames := request["TopicNames"].([]string)
topicNames := reqDetails["TopicNames"].([]string)
for _, name := range topicNames {
summary += fmt.Sprintf("%s, ", name)
}
break
}
request["url"] = summary
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: _protocol.Name,
EntryId: entryId,
@ -163,17 +165,17 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
representation := make(map[string]interface{}, 0)
request := root["request"].(map[string]interface{})["payload"].(map[string]interface{})
response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
// fmt.Printf("\n\nrequest: %+v\n", request)
// fmt.Printf("response: %+v\n", response)
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
apiKey := ApiKey(request["ApiKey"].(float64))
apiKey := ApiKey(reqDetails["ApiKey"].(float64))
var repRequest []interface{}
var repResponse []interface{}
switch apiKey {
case Metadata:
repRequest = representMetadataRequest(request)
repResponse = representMetadataResponse(response)
repRequest = representMetadataRequest(reqDetails)
repResponse = representMetadataResponse(resDetails)
break
}

View File

@ -266,7 +266,11 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
CaptureTime: time.Now(),
Payload: KafkaPayload{
Type: "kafka_request",
Data: reqResPair.Request,
Data: &KafkaWrapper{
Method: apiNames[apiKey],
Url: "",
Details: reqResPair.Request,
},
},
},
Response: api.GenericMessage{
@ -274,7 +278,11 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
CaptureTime: time.Now(),
Payload: KafkaPayload{
Type: "kafka_response",
Data: reqResPair.Response,
Data: &KafkaWrapper{
Method: apiNames[apiKey],
Url: "",
Details: reqResPair.Response,
},
},
},
},

View File

@ -56,9 +56,8 @@ const HarEntrySummary: React.FC<any> = ({har}) => {
const {log: {entries}} = har;
const {response, request} = JSON.parse(entries[0].entry);
return <div className={classes.entrySummary}>
{response.payload && <div style={{marginRight: 8}}>
{response.payload && response.payload.status && <div style={{marginRight: 8}}>
<StatusCode statusCode={response.payload.status}/>
</div>}
<div style={{flexGrow: 1, overflow: 'hidden'}}>