diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index 4dcd99118..ec16443f5 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -93,15 +93,15 @@ type AMQPWrapper struct { Details interface{} `json:"details"` } -func emitBasicPublish(event BasicPublish, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { +func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { request := &api.GenericMessage{ IsRequest: true, CaptureTime: time.Now(), Payload: AMQPPayload{ - Type: "basic_publish", + Type: _type, Data: &AMQPWrapper{ - Method: basicMethodMap[40], - Url: event.Exchange, + Method: method, + Url: "", Details: event, }, }, @@ -287,31 +287,6 @@ func representBasicPublish(event map[string]interface{}) []interface{} { return rep } -func emitBasicDeliver(event BasicDeliver, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { - request := &api.GenericMessage{ - IsRequest: true, - CaptureTime: time.Now(), - Payload: AMQPPayload{ - Type: "basic_deliver", - Data: &AMQPWrapper{ - Method: basicMethodMap[60], - Url: event.Exchange, - Details: event, - }, - }, - } - item := &api.OutputChannelItem{ - Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - ConnectionInfo: connectionInfo, - Pair: &api.RequestResponsePair{ - Request: *request, - Response: api.GenericMessage{}, - }, - } - emitter.Emit(item) -} - func representBasicDeliver(event map[string]interface{}) []interface{} { rep := make([]interface{}, 0) @@ -389,31 +364,6 @@ func representBasicDeliver(event map[string]interface{}) []interface{} { return rep } -func emitQueueDeclare(event QueueDeclare, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { - request := &api.GenericMessage{ - IsRequest: true, - CaptureTime: time.Now(), - Payload: AMQPPayload{ - Type: "queue_declare", - Data: &AMQPWrapper{ - Method: queueMethodMap[10], - Url: event.Queue, - Details: event, - }, - }, - } - item := &api.OutputChannelItem{ - Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - ConnectionInfo: connectionInfo, - Pair: &api.RequestResponsePair{ - Request: *request, - Response: api.GenericMessage{}, - }, - } - emitter.Emit(item) -} - func representQueueDeclare(event map[string]interface{}) []interface{} { rep := make([]interface{}, 0) @@ -468,31 +418,6 @@ func representQueueDeclare(event map[string]interface{}) []interface{} { return rep } -func emitExchangeDeclare(event ExchangeDeclare, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { - request := &api.GenericMessage{ - IsRequest: true, - CaptureTime: time.Now(), - Payload: AMQPPayload{ - Type: "exchange_declare", - Data: &AMQPWrapper{ - Method: exchangeMethodMap[10], - Url: event.Exchange, - Details: event, - }, - }, - } - item := &api.OutputChannelItem{ - Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - ConnectionInfo: connectionInfo, - Pair: &api.RequestResponsePair{ - Request: *request, - Response: api.GenericMessage{}, - }, - } - emitter.Emit(item) -} - func representExchangeDeclare(event map[string]interface{}) []interface{} { rep := make([]interface{}, 0) @@ -551,31 +476,6 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} { return rep } -func emitConnectionStart(event ConnectionStart, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { - request := &api.GenericMessage{ - IsRequest: true, - CaptureTime: time.Now(), - Payload: AMQPPayload{ - Type: "connection_start", - Data: &AMQPWrapper{ - Method: connectionMethodMap[10], - Url: fmt.Sprintf("%d.%d", event.VersionMajor, event.VersionMinor), - Details: event, - }, - }, - } - item := &api.OutputChannelItem{ - Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - ConnectionInfo: connectionInfo, - Pair: &api.RequestResponsePair{ - Request: *request, - Response: api.GenericMessage{}, - }, - } - emitter.Emit(item) -} - func representConnectionStart(event map[string]interface{}) []interface{} { rep := make([]interface{}, 0) @@ -634,31 +534,6 @@ func representConnectionStart(event map[string]interface{}) []interface{} { return rep } -func emitConnectionClose(event ConnectionClose, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { - request := &api.GenericMessage{ - IsRequest: true, - CaptureTime: time.Now(), - Payload: AMQPPayload{ - Type: "connection_close", - Data: &AMQPWrapper{ - Method: connectionMethodMap[50], - Url: event.ReplyText, - Details: event, - }, - }, - } - item := &api.OutputChannelItem{ - Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - ConnectionInfo: connectionInfo, - Pair: &api.RequestResponsePair{ - Request: *request, - Response: api.GenericMessage{}, - }, - } - emitter.Emit(item) -} - func representConnectionClose(event map[string]interface{}) []interface{} { rep := make([]interface{}, 0) @@ -689,31 +564,6 @@ func representConnectionClose(event map[string]interface{}) []interface{} { return rep } -func emitQueueBind(event QueueBind, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { - request := &api.GenericMessage{ - IsRequest: true, - CaptureTime: time.Now(), - Payload: AMQPPayload{ - Type: "queue_bind", - Data: &AMQPWrapper{ - Method: queueMethodMap[20], - Url: event.Queue, - Details: event, - }, - }, - } - item := &api.OutputChannelItem{ - Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - ConnectionInfo: connectionInfo, - Pair: &api.RequestResponsePair{ - Request: *request, - Response: api.GenericMessage{}, - }, - } - emitter.Emit(item) -} - func representQueueBind(event map[string]interface{}) []interface{} { rep := make([]interface{}, 0) @@ -760,31 +610,6 @@ func representQueueBind(event map[string]interface{}) []interface{} { return rep } -func emitBasicConsume(event BasicConsume, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { - request := &api.GenericMessage{ - IsRequest: true, - CaptureTime: time.Now(), - Payload: AMQPPayload{ - Type: "basic_consume", - Data: &AMQPWrapper{ - Method: basicMethodMap[20], - Url: event.Queue, - Details: event, - }, - }, - } - item := &api.OutputChannelItem{ - Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - ConnectionInfo: connectionInfo, - Pair: &api.RequestResponsePair{ - Request: *request, - Response: api.GenericMessage{}, - }, - } - emitter.Emit(item) -} - func representBasicConsume(event map[string]interface{}) []interface{} { rep := make([]interface{}, 0) diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 24e8e9b29..45cf9bbf1 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -36,6 +36,8 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", protocol.Name) } +const amqpRequest string = "amqp_request" + func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { r := AmqpReader{b} @@ -104,10 +106,10 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em switch lastMethodFrameMessage.(type) { case *BasicPublish: eventBasicPublish.Body = f.Body - emitBasicPublish(*eventBasicPublish, connectionInfo, emitter) + emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, emitter) case *BasicDeliver: eventBasicDeliver.Body = f.Body - emitBasicDeliver(*eventBasicDeliver, connectionInfo, emitter) + emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, emitter) default: } @@ -128,7 +130,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em NoWait: m.NoWait, Arguments: m.Arguments, } - emitQueueBind(*eventQueueBind, connectionInfo, emitter) + emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, emitter) case *BasicConsume: eventBasicConsume := &BasicConsume{ @@ -140,7 +142,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em NoWait: m.NoWait, Arguments: m.Arguments, } - emitBasicConsume(*eventBasicConsume, connectionInfo, emitter) + emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, emitter) case *BasicDeliver: eventBasicDeliver.ConsumerTag = m.ConsumerTag @@ -159,7 +161,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em NoWait: m.NoWait, Arguments: m.Arguments, } - emitQueueDeclare(*eventQueueDeclare, connectionInfo, emitter) + emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, emitter) case *ExchangeDeclare: eventExchangeDeclare := &ExchangeDeclare{ @@ -172,7 +174,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em NoWait: m.NoWait, Arguments: m.Arguments, } - emitExchangeDeclare(*eventExchangeDeclare, connectionInfo, emitter) + emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, emitter) case *ConnectionStart: eventConnectionStart := &ConnectionStart{ @@ -182,7 +184,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em Mechanisms: m.Mechanisms, Locales: m.Locales, } - emitConnectionStart(*eventConnectionStart, connectionInfo, emitter) + emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, emitter) case *ConnectionClose: eventConnectionClose := &ConnectionClose{ @@ -191,7 +193,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em ClassId: m.ClassId, MethodId: m.MethodId, } - emitConnectionClose(*eventConnectionClose, connectionInfo, emitter) + emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, emitter) default: @@ -206,7 +208,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { request := item.Pair.Request.Payload.(map[string]interface{}) reqDetails := request["details"].(map[string]interface{}) - entryBytes, _ := json.Marshal(item.Pair) service := fmt.Sprintf("amqp") summary := "" @@ -241,6 +242,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve break } + request["url"] = summary + entryBytes, _ := json.Marshal(item.Pair) return &api.MizuEntry{ ProtocolName: protocol.Name, EntryId: entryId, diff --git a/tap/extensions/amqp/structs.go b/tap/extensions/amqp/structs.go index 3a384a512..49c4e921d 100644 --- a/tap/extensions/amqp/structs.go +++ b/tap/extensions/amqp/structs.go @@ -17,7 +17,7 @@ type AMQPPayloader interface { func (h AMQPPayload) MarshalJSON() ([]byte, error) { return json.Marshal(h.Data) // switch h.Type { - // case "basic_publish": + // case "amqp_request": // return json.Marshal(h.Data) // default: // panic(fmt.Sprintf("AMQP payload cannot be marshaled: %s\n", h.Type)) diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index f5fac3307..4913b0759 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -19,6 +19,12 @@ func (h KafkaPayload) MarshalJSON() ([]byte, error) { return json.Marshal(h.Data) } +type KafkaWrapper struct { + Method string `json:"method"` + Url string `json:"url"` + Details interface{} `json:"details"` +} + func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} { requestHeader, _ := json.Marshal([]map[string]string{ { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 913195412..5d6477291 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -46,14 +46,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { request := item.Pair.Request.Payload.(map[string]interface{}) - entryBytes, _ := json.Marshal(item.Pair) + reqDetails := request["details"].(map[string]interface{}) service := fmt.Sprintf("kafka") - apiKey := ApiKey(request["ApiKey"].(float64)) + apiKey := ApiKey(reqDetails["ApiKey"].(float64)) summary := "" switch apiKey { case Metadata: - _topics := request["Payload"].(map[string]interface{})["Topics"] + _topics := reqDetails["Payload"].(map[string]interface{})["Topics"] if _topics == nil { break } @@ -66,10 +66,10 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } break case ApiVersions: - summary = request["ClientID"].(string) + summary = reqDetails["ClientID"].(string) break case Produce: - topics := request["Payload"].(map[string]interface{})["TopicData"].([]interface{}) + topics := reqDetails["Payload"].(map[string]interface{})["TopicData"].([]interface{}) for _, topic := range topics { summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string)) } @@ -78,7 +78,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } break case Fetch: - topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{}) + topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{}) for _, topic := range topics { summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string)) } @@ -87,7 +87,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } break case ListOffsets: - topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{}) + topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{}) for _, topic := range topics { summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string)) } @@ -96,7 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } break case CreateTopics: - topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{}) + topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{}) for _, topic := range topics { summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string)) } @@ -105,13 +105,15 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } break case DeleteTopics: - topicNames := request["TopicNames"].([]string) + topicNames := reqDetails["TopicNames"].([]string) for _, name := range topicNames { summary += fmt.Sprintf("%s, ", name) } break } + request["url"] = summary + entryBytes, _ := json.Marshal(item.Pair) return &api.MizuEntry{ ProtocolName: _protocol.Name, EntryId: entryId, @@ -163,17 +165,17 @@ func (d dissecting) Represent(entry string) ([]byte, error) { representation := make(map[string]interface{}, 0) request := root["request"].(map[string]interface{})["payload"].(map[string]interface{}) response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) - // fmt.Printf("\n\nrequest: %+v\n", request) - // fmt.Printf("response: %+v\n", response) + reqDetails := request["details"].(map[string]interface{}) + resDetails := response["details"].(map[string]interface{}) - apiKey := ApiKey(request["ApiKey"].(float64)) + apiKey := ApiKey(reqDetails["ApiKey"].(float64)) var repRequest []interface{} var repResponse []interface{} switch apiKey { case Metadata: - repRequest = representMetadataRequest(request) - repResponse = representMetadataResponse(response) + repRequest = representMetadataRequest(reqDetails) + repResponse = representMetadataResponse(resDetails) break } diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index f48bfbef3..75f1ff458 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -266,7 +266,11 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error CaptureTime: time.Now(), Payload: KafkaPayload{ Type: "kafka_request", - Data: reqResPair.Request, + Data: &KafkaWrapper{ + Method: apiNames[apiKey], + Url: "", + Details: reqResPair.Request, + }, }, }, Response: api.GenericMessage{ @@ -274,7 +278,11 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error CaptureTime: time.Now(), Payload: KafkaPayload{ Type: "kafka_response", - Data: reqResPair.Response, + Data: &KafkaWrapper{ + Method: apiNames[apiKey], + Url: "", + Details: reqResPair.Response, + }, }, }, }, diff --git a/ui/src/components/HarEntryDetailed.tsx b/ui/src/components/HarEntryDetailed.tsx index a7ad371d5..85c1c561e 100644 --- a/ui/src/components/HarEntryDetailed.tsx +++ b/ui/src/components/HarEntryDetailed.tsx @@ -56,9 +56,8 @@ const HarEntrySummary: React.FC = ({har}) => { const {log: {entries}} = har; const {response, request} = JSON.parse(entries[0].entry); - return
- {response.payload &&
+ {response.payload && response.payload.status &&
}