From 4db8e8902be80c6cb4a4bb04c2c39ed4a2b8c3b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Sun, 16 Jan 2022 09:36:29 +0300 Subject: [PATCH] Add support of displaying nested data structures of Kafka in the right-pane (#643) * Handle nested `topicData` in `representProduceRequest` * Handle nested `topics` in `representCreateTopicsRequest` and `representCreateTopicsResponse` * Handle nested `responses` in `representProduceResponse` * Handle nested `topics` in `representFetchRequest` and nested `responses` in `representFetchResponse` * Introduce `ignoreKeys` argument to `representMapAsTable` and ignore the keys based on that argument * Bring back the `nil` checks --- tap/extensions/kafka/go.mod | 2 + tap/extensions/kafka/go.sum | 4 + tap/extensions/kafka/helpers.go | 277 +++++++++++++++++++++++++------- 3 files changed, 222 insertions(+), 61 deletions(-) diff --git a/tap/extensions/kafka/go.mod b/tap/extensions/kafka/go.mod index 113627f94..cdb75609a 100644 --- a/tap/extensions/kafka/go.mod +++ b/tap/extensions/kafka/go.mod @@ -3,6 +3,8 @@ module github.com/up9inc/mizu/tap/extensions/kafka go 1.16 require ( + github.com/fatih/camelcase v1.0.0 + github.com/ohler55/ojg v1.12.12 github.com/segmentio/kafka-go v0.4.17 github.com/up9inc/mizu/tap/api v0.0.0 ) diff --git a/tap/extensions/kafka/go.sum b/tap/extensions/kafka/go.sum index 70ec03308..e3d62474b 100644 --- a/tap/extensions/kafka/go.sum +++ b/tap/extensions/kafka/go.sum @@ -1,6 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8= +github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc= github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= @@ -16,6 +18,8 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/ohler55/ojg v1.12.12 h1:hepbQFn7GHAecTPmwS3j5dCiOLsOpzPLvhiqnlAVAoE= +github.com/ohler55/ojg v1.12.12/go.mod h1:LBbIVRAgoFbYBXQhRhuEpaJIqq+goSO63/FQ+nyJU88= github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index dffe30bd8..68264c077 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -3,8 +3,13 @@ package main import ( "encoding/json" "fmt" + "reflect" "strconv" + "strings" + "github.com/fatih/camelcase" + "github.com/ohler55/ojg/jp" + "github.com/ohler55/ojg/oj" "github.com/up9inc/mizu/tap/api" ) @@ -289,17 +294,12 @@ func representProduceRequest(data map[string]interface{}) []interface{} { rep = representRequestHeader(data, rep) payload := data["payload"].(map[string]interface{}) - topicData := "" - _topicData := payload["topicData"] - if _topicData != nil { - x, _ := json.Marshal(_topicData.([]interface{})) - topicData = string(x) - } + topicData := payload["topicData"] transactionalID := "" if payload["transactionalID"] != nil { transactionalID = payload["transactionalID"].(string) } - repPayload, _ := json.Marshal([]api.TableData{ + repTransactionDetails, _ := json.Marshal([]api.TableData{ { Name: "Transactional ID", Value: transactionalID, @@ -315,18 +315,73 @@ func representProduceRequest(data map[string]interface{}) []interface{} { Value: fmt.Sprintf("%d", int(payload["timeout"].(float64))), Selector: `request.payload.timeout`, }, - { - Name: "Topic Data", - Value: topicData, - Selector: `request.payload.topicData`, - }, }) rep = append(rep, api.SectionData{ Type: api.TABLE, - Title: "Payload", - Data: string(repPayload), + Title: "Transaction Details", + Data: string(repTransactionDetails), }) + if topicData != nil { + for _, _topic := range topicData.([]interface{}) { + topic := _topic.(map[string]interface{}) + topicName := topic["topic"].(string) + partitions := topic["partitions"].(map[string]interface{}) + partitionsJson, err := json.Marshal(partitions) + if err != nil { + return rep + } + + repPartitions, _ := json.Marshal([]api.TableData{ + { + Name: "Length", + Value: partitions["length"], + Selector: `request.payload.transactionalID`, + }, + }) + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Partitions (topic: %s)", topicName), + Data: string(repPartitions), + }) + + obj, err := oj.ParseString(string(partitionsJson)) + recordBatchPath, err := jp.ParseString(`partitionData.records.recordBatch`) + recordBatchresults := recordBatchPath.Get(obj) + if len(recordBatchresults) > 0 { + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Record Batch (topic: %s)", topicName), + Data: representMapAsTable(recordBatchresults[0].(map[string]interface{}), `request.payload.topicData.partitions.partitionData.records.recordBatch`, []string{"record"}), + }) + } + + recordsPath, err := jp.ParseString(`partitionData.records.recordBatch.record`) + recordsResults := recordsPath.Get(obj) + if len(recordsResults) > 0 { + records := recordsResults[0].([]interface{}) + for i, _record := range records { + record := _record.(map[string]interface{}) + value := record["value"] + delete(record, "value") + + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName), + Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}), + }) + + rep = append(rep, api.SectionData{ + Type: api.BODY, + Title: fmt.Sprintf("Record [%d] Value", i), + Data: value.(string), + Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i), + }) + } + } + } + } + return rep } @@ -336,21 +391,12 @@ func representProduceResponse(data map[string]interface{}) []interface{} { rep = representResponseHeader(data, rep) payload := data["payload"].(map[string]interface{}) - responses := "" - if payload["responses"] != nil { - _responses, _ := json.Marshal(payload["responses"].([]interface{})) - responses = string(_responses) - } + responses := payload["responses"] throttleTimeMs := "" if payload["throttleTimeMs"] != nil { throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64))) } repPayload, _ := json.Marshal([]api.TableData{ - { - Name: "Responses", - Value: string(responses), - Selector: `response.payload.responses`, - }, { Name: "Throttle Time (ms)", Value: throttleTimeMs, @@ -359,10 +405,31 @@ func representProduceResponse(data map[string]interface{}) []interface{} { }) rep = append(rep, api.SectionData{ Type: api.TABLE, - Title: "Payload", + Title: "Transaction Details", Data: string(repPayload), }) + if responses != nil { + for i, _response := range responses.([]interface{}) { + response := _response.(map[string]interface{}) + + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Response [%d]", i), + Data: representMapAsTable(response, fmt.Sprintf(`response.payload.responses[%d]`, i), []string{"partitionResponses"}), + }) + + for j, _partitionResponse := range response["partitionResponses"].([]interface{}) { + partitionResponse := _partitionResponse.(map[string]interface{}) + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Response [%d] Partition Response [%d]", i, j), + Data: representMapAsTable(partitionResponse, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d]`, i, j), []string{}), + }) + } + } + } + return rep } @@ -372,11 +439,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} { rep = representRequestHeader(data, rep) payload := data["payload"].(map[string]interface{}) - topics := "" - if payload["topics"] != nil { - _topics, _ := json.Marshal(payload["topics"].([]interface{})) - topics = string(_topics) - } + topics := payload["topics"] replicaId := "" if payload["replicaId"] != nil { replicaId = fmt.Sprintf("%d", int(payload["replicaId"].(float64))) @@ -442,11 +505,6 @@ func representFetchRequest(data map[string]interface{}) []interface{} { Value: sessionEpoch, Selector: `request.payload.sessionEpoch`, }, - { - Name: "Topics", - Value: topics, - Selector: `request.payload.topics`, - }, { Name: "Forgotten Topics Data", Value: forgottenTopicsData, @@ -460,10 +518,26 @@ func representFetchRequest(data map[string]interface{}) []interface{} { }) rep = append(rep, api.SectionData{ Type: api.TABLE, - Title: "Payload", + Title: "Transaction Details", Data: string(repPayload), }) + if topics != nil { + for i, _topic := range topics.([]interface{}) { + topic := _topic.(map[string]interface{}) + topicName := topic["topic"].(string) + for j, _partition := range topic["partitions"].([]interface{}) { + partition := _partition.(map[string]interface{}) + + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Partition [%d] (topic: %s)", j, topicName), + Data: representMapAsTable(partition, fmt.Sprintf(`request.payload.topics[%d].partitions[%d]`, i, j), []string{}), + }) + } + } + } + return rep } @@ -473,11 +547,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} { rep = representResponseHeader(data, rep) payload := data["payload"].(map[string]interface{}) - responses := "" - if payload["responses"] != nil { - _responses, _ := json.Marshal(payload["responses"].([]interface{})) - responses = string(_responses) - } + responses := payload["responses"] throttleTimeMs := "" if payload["throttleTimeMs"] != nil { throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64))) @@ -506,18 +576,56 @@ func representFetchResponse(data map[string]interface{}) []interface{} { Value: sessionId, Selector: `response.payload.sessionId`, }, - { - Name: "Responses", - Value: responses, - Selector: `response.payload.responses`, - }, }) rep = append(rep, api.SectionData{ Type: api.TABLE, - Title: "Payload", + Title: "Transaction Details", Data: string(repPayload), }) + if responses != nil { + for i, _response := range responses.([]interface{}) { + response := _response.(map[string]interface{}) + topicName := response["topic"].(string) + + for j, _partitionResponse := range response["partitionResponses"].([]interface{}) { + partitionResponse := _partitionResponse.(map[string]interface{}) + recordSet := partitionResponse["recordSet"].(map[string]interface{}) + + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Response [%d] Partition Response [%d] (topic: %s)", i, j, topicName), + Data: representMapAsTable(partitionResponse, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d]`, i, j), []string{"recordSet"}), + }) + + recordBatch := recordSet["recordBatch"].(map[string]interface{}) + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record Batch (topic: %s)", i, j, topicName), + Data: representMapAsTable(recordBatch, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch`, i, j), []string{"record"}), + }) + + for k, _record := range recordBatch["record"].([]interface{}) { + record := _record.(map[string]interface{}) + value := record["value"] + + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName), + Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}), + }) + + rep = append(rep, api.SectionData{ + Type: api.BODY, + Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName), + Data: value.(string), + Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k), + }) + } + } + } + } + return rep } @@ -591,17 +699,11 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} { rep = representRequestHeader(data, rep) payload := data["payload"].(map[string]interface{}) - topics, _ := json.Marshal(payload["topics"].([]interface{})) validateOnly := "" if payload["validateOnly"] != nil { validateOnly = strconv.FormatBool(payload["validateOnly"].(bool)) } repPayload, _ := json.Marshal([]api.TableData{ - { - Name: "Topics", - Value: string(topics), - Selector: `request.payload.topics`, - }, { Name: "Timeout (ms)", Value: fmt.Sprintf("%d", int(payload["timeoutMs"].(float64))), @@ -615,10 +717,20 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} { }) rep = append(rep, api.SectionData{ Type: api.TABLE, - Title: "Payload", + Title: "Transaction Details", Data: string(repPayload), }) + for i, _topic := range payload["topics"].([]interface{}) { + topic := _topic.(map[string]interface{}) + + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Topic [%d]", i), + Data: representMapAsTable(topic, fmt.Sprintf(`request.payload.topics[%d]`, i), []string{}), + }) + } + return rep } @@ -628,7 +740,6 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} { rep = representResponseHeader(data, rep) payload := data["payload"].(map[string]interface{}) - topics, _ := json.Marshal(payload["topics"].([]interface{})) throttleTimeMs := "" if payload["throttleTimeMs"] != nil { throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64))) @@ -639,18 +750,23 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} { Value: throttleTimeMs, Selector: `response.payload.throttleTimeMs`, }, - { - Name: "Topics", - Value: string(topics), - Selector: `response.payload.topics`, - }, }) rep = append(rep, api.SectionData{ Type: api.TABLE, - Title: "Payload", + Title: "Transaction Details", Data: string(repPayload), }) + for i, _topic := range payload["topics"].([]interface{}) { + topic := _topic.(map[string]interface{}) + + rep = append(rep, api.SectionData{ + Type: api.TABLE, + Title: fmt.Sprintf("Topic [%d]", i), + Data: representMapAsTable(topic, fmt.Sprintf(`response.payload.topics[%d]`, i), []string{}), + }) + } + return rep } @@ -727,3 +843,42 @@ func representDeleteTopicsResponse(data map[string]interface{}) []interface{} { return rep } + +func contains(s []string, str string) bool { + for _, v := range s { + if v == str { + return true + } + } + + return false +} + +func representMapAsTable(mapData map[string]interface{}, selectorPrefix string, ignoreKeys []string) (representation string) { + var table []api.TableData + for key, value := range mapData { + if contains(ignoreKeys, key) { + continue + } + switch reflect.ValueOf(value).Kind() { + case reflect.Map: + fallthrough + case reflect.Slice: + x, err := json.Marshal(value) + value = string(x) + if err != nil { + continue + } + } + selector := fmt.Sprintf("%s[\"%s\"]", selectorPrefix, key) + table = append(table, api.TableData{ + Name: strings.Join(camelcase.Split(strings.Title(key)), " "), + Value: value, + Selector: selector, + }) + } + + obj, _ := json.Marshal(table) + representation = string(obj) + return +}