diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index c69c30488..4961b9bca 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -79,10 +79,14 @@ func representMetadataRequest(data map[string]interface{}) []interface{} { rep = representRequestHeader(data, rep) payload := data["Payload"].(map[string]interface{}) - topics, _ := json.Marshal(payload["Topics"].([]interface{})) + topics := "" allowAutoTopicCreation := "" includeClusterAuthorizedOperations := "" includeTopicAuthorizedOperations := "" + if payload["Topics"] != nil { + x, _ := json.Marshal(payload["Topics"].([]interface{})) + topics = string(x) + } if payload["AllowAutoTopicCreation"] != nil { allowAutoTopicCreation = strconv.FormatBool(payload["AllowAutoTopicCreation"].(bool)) } @@ -95,7 +99,7 @@ func representMetadataRequest(data map[string]interface{}) []interface{} { repPayload, _ := json.Marshal([]map[string]string{ { "name": "Topics", - "value": string(topics), + "value": topics, }, { "name": "Allow Auto Topic Creation", @@ -244,3 +248,71 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} { return rep } + +func representProduceRequest(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representRequestHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topicData, _ := json.Marshal(payload["TopicData"].([]interface{})) + transactionalID := "" + if payload["TransactionalID"] != nil { + transactionalID = payload["TransactionalID"].(string) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Transactional ID", + "value": transactionalID, + }, + { + "name": "Required Acknowledgements", + "value": fmt.Sprintf("%d", int(payload["RequiredAcks"].(float64))), + }, + { + "name": "Timeout", + "value": fmt.Sprintf("%d", int(payload["Timeout"].(float64))), + }, + { + "name": "Topic Data", + "value": string(topicData), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} + +func representProduceResponse(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representResponseHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + responses, _ := json.Marshal(payload["Responses"].([]interface{})) + throttleTimeMs := "" + if payload["ThrottleTimeMs"] != nil { + throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Responses", + "value": string(responses), + }, + { + "name": "Throttle Time (ms)", + "value": throttleTimeMs, + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index ccb44e469..2d84c09cf 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -180,6 +180,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) { case ApiVersions: repRequest = representApiVersionsRequest(reqDetails) repResponse = representApiVersionsResponse(resDetails) + case Produce: + repRequest = representProduceRequest(reqDetails) + repResponse = representProduceResponse(resDetails) break }