diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index 4961b9bca..c272f95ed 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -316,3 +316,136 @@ func representProduceResponse(data map[string]interface{}) []interface{} { return rep } + +func representFetchRequest(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representRequestHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topics, _ := json.Marshal(payload["Topics"].([]interface{})) + replicaId := "" + if payload["ReplicaId"] != nil { + replicaId = fmt.Sprintf("%d", int(payload["ReplicaId"].(float64))) + } + maxBytes := "" + if payload["MaxBytes"] != nil { + maxBytes = fmt.Sprintf("%d", int(payload["MaxBytes"].(float64))) + } + isolationLevel := "" + if payload["IsolationLevel"] != nil { + isolationLevel = fmt.Sprintf("%d", int(payload["IsolationLevel"].(float64))) + } + sessionId := "" + if payload["SessionId"] != nil { + sessionId = fmt.Sprintf("%d", int(payload["SessionId"].(float64))) + } + sessionEpoch := "" + if payload["SessionEpoch"] != nil { + sessionEpoch = fmt.Sprintf("%d", int(payload["SessionEpoch"].(float64))) + } + forgottenTopicsData := "" + if payload["ForgottenTopicsData"] != nil { + x, _ := json.Marshal(payload["ForgottenTopicsData"].(map[string]interface{})) + forgottenTopicsData = string(x) + } + rackId := "" + if payload["RackId"] != nil { + rackId = fmt.Sprintf("%d", int(payload["RackId"].(float64))) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Replica ID", + "value": replicaId, + }, + { + "name": "Maximum Wait (ms)", + "value": fmt.Sprintf("%d", int(payload["MaxWaitMs"].(float64))), + }, + { + "name": "Minimum Bytes", + "value": fmt.Sprintf("%d", int(payload["MinBytes"].(float64))), + }, + { + "name": "Maximum Bytes", + "value": maxBytes, + }, + { + "name": "Isolation Level", + "value": isolationLevel, + }, + { + "name": "Session ID", + "value": sessionId, + }, + { + "name": "Session Epoch", + "value": sessionEpoch, + }, + { + "name": "Topics", + "value": string(topics), + }, + { + "name": "Forgotten Topics Data", + "value": forgottenTopicsData, + }, + { + "name": "Rack ID", + "value": rackId, + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} + +func representFetchResponse(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representResponseHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + responses, _ := json.Marshal(payload["Responses"].([]interface{})) + throttleTimeMs := "" + if payload["ThrottleTimeMs"] != nil { + throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) + } + errorCode := "" + if payload["ErrorCode"] != nil { + errorCode = fmt.Sprintf("%d", int(payload["ErrorCode"].(float64))) + } + sessionId := "" + if payload["SessionId"] != nil { + sessionId = fmt.Sprintf("%d", int(payload["SessionId"].(float64))) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Throttle Time (ms)", + "value": throttleTimeMs, + }, + { + "name": "Error Code", + "value": errorCode, + }, + { + "name": "Session ID", + "value": sessionId, + }, + { + "name": "Responses", + "value": string(responses), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 2d84c09cf..f89b5abbf 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -183,6 +183,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) { case Produce: repRequest = representProduceRequest(reqDetails) repResponse = representProduceResponse(resDetails) + case Fetch: + repRequest = representFetchRequest(reqDetails) + repResponse = representFetchResponse(resDetails) break }