mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-24 11:29:24 +00:00
Add support of displaying nested data structures of Kafka in the right-pane (#643)
* Handle nested `topicData` in `representProduceRequest` * Handle nested `topics` in `representCreateTopicsRequest` and `representCreateTopicsResponse` * Handle nested `responses` in `representProduceResponse` * Handle nested `topics` in `representFetchRequest` and nested `responses` in `representFetchResponse` * Introduce `ignoreKeys` argument to `representMapAsTable` and ignore the keys based on that argument * Bring back the `nil` checks
This commit is contained in:
parent
f5bacbd1ea
commit
4db8e8902b
@ -3,6 +3,8 @@ module github.com/up9inc/mizu/tap/extensions/kafka
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/fatih/camelcase v1.0.0
|
||||
github.com/ohler55/ojg v1.12.12
|
||||
github.com/segmentio/kafka-go v0.4.17
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
)
|
||||
|
@ -1,6 +1,8 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||
github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8=
|
||||
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
|
||||
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
|
||||
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
|
||||
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
||||
@ -16,6 +18,8 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/ohler55/ojg v1.12.12 h1:hepbQFn7GHAecTPmwS3j5dCiOLsOpzPLvhiqnlAVAoE=
|
||||
github.com/ohler55/ojg v1.12.12/go.mod h1:LBbIVRAgoFbYBXQhRhuEpaJIqq+goSO63/FQ+nyJU88=
|
||||
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
|
||||
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
|
@ -3,8 +3,13 @@ package main
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/fatih/camelcase"
|
||||
"github.com/ohler55/ojg/jp"
|
||||
"github.com/ohler55/ojg/oj"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
@ -289,17 +294,12 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
|
||||
rep = representRequestHeader(data, rep)
|
||||
|
||||
payload := data["payload"].(map[string]interface{})
|
||||
topicData := ""
|
||||
_topicData := payload["topicData"]
|
||||
if _topicData != nil {
|
||||
x, _ := json.Marshal(_topicData.([]interface{}))
|
||||
topicData = string(x)
|
||||
}
|
||||
topicData := payload["topicData"]
|
||||
transactionalID := ""
|
||||
if payload["transactionalID"] != nil {
|
||||
transactionalID = payload["transactionalID"].(string)
|
||||
}
|
||||
repPayload, _ := json.Marshal([]api.TableData{
|
||||
repTransactionDetails, _ := json.Marshal([]api.TableData{
|
||||
{
|
||||
Name: "Transactional ID",
|
||||
Value: transactionalID,
|
||||
@ -315,18 +315,73 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
|
||||
Value: fmt.Sprintf("%d", int(payload["timeout"].(float64))),
|
||||
Selector: `request.payload.timeout`,
|
||||
},
|
||||
{
|
||||
Name: "Topic Data",
|
||||
Value: topicData,
|
||||
Selector: `request.payload.topicData`,
|
||||
},
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: "Payload",
|
||||
Data: string(repPayload),
|
||||
Title: "Transaction Details",
|
||||
Data: string(repTransactionDetails),
|
||||
})
|
||||
|
||||
if topicData != nil {
|
||||
for _, _topic := range topicData.([]interface{}) {
|
||||
topic := _topic.(map[string]interface{})
|
||||
topicName := topic["topic"].(string)
|
||||
partitions := topic["partitions"].(map[string]interface{})
|
||||
partitionsJson, err := json.Marshal(partitions)
|
||||
if err != nil {
|
||||
return rep
|
||||
}
|
||||
|
||||
repPartitions, _ := json.Marshal([]api.TableData{
|
||||
{
|
||||
Name: "Length",
|
||||
Value: partitions["length"],
|
||||
Selector: `request.payload.transactionalID`,
|
||||
},
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Partitions (topic: %s)", topicName),
|
||||
Data: string(repPartitions),
|
||||
})
|
||||
|
||||
obj, err := oj.ParseString(string(partitionsJson))
|
||||
recordBatchPath, err := jp.ParseString(`partitionData.records.recordBatch`)
|
||||
recordBatchresults := recordBatchPath.Get(obj)
|
||||
if len(recordBatchresults) > 0 {
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Record Batch (topic: %s)", topicName),
|
||||
Data: representMapAsTable(recordBatchresults[0].(map[string]interface{}), `request.payload.topicData.partitions.partitionData.records.recordBatch`, []string{"record"}),
|
||||
})
|
||||
}
|
||||
|
||||
recordsPath, err := jp.ParseString(`partitionData.records.recordBatch.record`)
|
||||
recordsResults := recordsPath.Get(obj)
|
||||
if len(recordsResults) > 0 {
|
||||
records := recordsResults[0].([]interface{})
|
||||
for i, _record := range records {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
delete(record, "value")
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
|
||||
})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Record [%d] Value", i),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
@ -336,21 +391,12 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
|
||||
rep = representResponseHeader(data, rep)
|
||||
|
||||
payload := data["payload"].(map[string]interface{})
|
||||
responses := ""
|
||||
if payload["responses"] != nil {
|
||||
_responses, _ := json.Marshal(payload["responses"].([]interface{}))
|
||||
responses = string(_responses)
|
||||
}
|
||||
responses := payload["responses"]
|
||||
throttleTimeMs := ""
|
||||
if payload["throttleTimeMs"] != nil {
|
||||
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
|
||||
}
|
||||
repPayload, _ := json.Marshal([]api.TableData{
|
||||
{
|
||||
Name: "Responses",
|
||||
Value: string(responses),
|
||||
Selector: `response.payload.responses`,
|
||||
},
|
||||
{
|
||||
Name: "Throttle Time (ms)",
|
||||
Value: throttleTimeMs,
|
||||
@ -359,10 +405,31 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: "Payload",
|
||||
Title: "Transaction Details",
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
if responses != nil {
|
||||
for i, _response := range responses.([]interface{}) {
|
||||
response := _response.(map[string]interface{})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d]", i),
|
||||
Data: representMapAsTable(response, fmt.Sprintf(`response.payload.responses[%d]`, i), []string{"partitionResponses"}),
|
||||
})
|
||||
|
||||
for j, _partitionResponse := range response["partitionResponses"].([]interface{}) {
|
||||
partitionResponse := _partitionResponse.(map[string]interface{})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d]", i, j),
|
||||
Data: representMapAsTable(partitionResponse, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d]`, i, j), []string{}),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
@ -372,11 +439,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
|
||||
rep = representRequestHeader(data, rep)
|
||||
|
||||
payload := data["payload"].(map[string]interface{})
|
||||
topics := ""
|
||||
if payload["topics"] != nil {
|
||||
_topics, _ := json.Marshal(payload["topics"].([]interface{}))
|
||||
topics = string(_topics)
|
||||
}
|
||||
topics := payload["topics"]
|
||||
replicaId := ""
|
||||
if payload["replicaId"] != nil {
|
||||
replicaId = fmt.Sprintf("%d", int(payload["replicaId"].(float64)))
|
||||
@ -442,11 +505,6 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
|
||||
Value: sessionEpoch,
|
||||
Selector: `request.payload.sessionEpoch`,
|
||||
},
|
||||
{
|
||||
Name: "Topics",
|
||||
Value: topics,
|
||||
Selector: `request.payload.topics`,
|
||||
},
|
||||
{
|
||||
Name: "Forgotten Topics Data",
|
||||
Value: forgottenTopicsData,
|
||||
@ -460,10 +518,26 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: "Payload",
|
||||
Title: "Transaction Details",
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
if topics != nil {
|
||||
for i, _topic := range topics.([]interface{}) {
|
||||
topic := _topic.(map[string]interface{})
|
||||
topicName := topic["topic"].(string)
|
||||
for j, _partition := range topic["partitions"].([]interface{}) {
|
||||
partition := _partition.(map[string]interface{})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Partition [%d] (topic: %s)", j, topicName),
|
||||
Data: representMapAsTable(partition, fmt.Sprintf(`request.payload.topics[%d].partitions[%d]`, i, j), []string{}),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
@ -473,11 +547,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
|
||||
rep = representResponseHeader(data, rep)
|
||||
|
||||
payload := data["payload"].(map[string]interface{})
|
||||
responses := ""
|
||||
if payload["responses"] != nil {
|
||||
_responses, _ := json.Marshal(payload["responses"].([]interface{}))
|
||||
responses = string(_responses)
|
||||
}
|
||||
responses := payload["responses"]
|
||||
throttleTimeMs := ""
|
||||
if payload["throttleTimeMs"] != nil {
|
||||
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
|
||||
@ -506,18 +576,56 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
|
||||
Value: sessionId,
|
||||
Selector: `response.payload.sessionId`,
|
||||
},
|
||||
{
|
||||
Name: "Responses",
|
||||
Value: responses,
|
||||
Selector: `response.payload.responses`,
|
||||
},
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: "Payload",
|
||||
Title: "Transaction Details",
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
if responses != nil {
|
||||
for i, _response := range responses.([]interface{}) {
|
||||
response := _response.(map[string]interface{})
|
||||
topicName := response["topic"].(string)
|
||||
|
||||
for j, _partitionResponse := range response["partitionResponses"].([]interface{}) {
|
||||
partitionResponse := _partitionResponse.(map[string]interface{})
|
||||
recordSet := partitionResponse["recordSet"].(map[string]interface{})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] (topic: %s)", i, j, topicName),
|
||||
Data: representMapAsTable(partitionResponse, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d]`, i, j), []string{"recordSet"}),
|
||||
})
|
||||
|
||||
recordBatch := recordSet["recordBatch"].(map[string]interface{})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record Batch (topic: %s)", i, j, topicName),
|
||||
Data: representMapAsTable(recordBatch, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch`, i, j), []string{"record"}),
|
||||
})
|
||||
|
||||
for k, _record := range recordBatch["record"].([]interface{}) {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
|
||||
})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
@ -591,17 +699,11 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
|
||||
rep = representRequestHeader(data, rep)
|
||||
|
||||
payload := data["payload"].(map[string]interface{})
|
||||
topics, _ := json.Marshal(payload["topics"].([]interface{}))
|
||||
validateOnly := ""
|
||||
if payload["validateOnly"] != nil {
|
||||
validateOnly = strconv.FormatBool(payload["validateOnly"].(bool))
|
||||
}
|
||||
repPayload, _ := json.Marshal([]api.TableData{
|
||||
{
|
||||
Name: "Topics",
|
||||
Value: string(topics),
|
||||
Selector: `request.payload.topics`,
|
||||
},
|
||||
{
|
||||
Name: "Timeout (ms)",
|
||||
Value: fmt.Sprintf("%d", int(payload["timeoutMs"].(float64))),
|
||||
@ -615,10 +717,20 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: "Payload",
|
||||
Title: "Transaction Details",
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
for i, _topic := range payload["topics"].([]interface{}) {
|
||||
topic := _topic.(map[string]interface{})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Topic [%d]", i),
|
||||
Data: representMapAsTable(topic, fmt.Sprintf(`request.payload.topics[%d]`, i), []string{}),
|
||||
})
|
||||
}
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
@ -628,7 +740,6 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
|
||||
rep = representResponseHeader(data, rep)
|
||||
|
||||
payload := data["payload"].(map[string]interface{})
|
||||
topics, _ := json.Marshal(payload["topics"].([]interface{}))
|
||||
throttleTimeMs := ""
|
||||
if payload["throttleTimeMs"] != nil {
|
||||
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
|
||||
@ -639,18 +750,23 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
|
||||
Value: throttleTimeMs,
|
||||
Selector: `response.payload.throttleTimeMs`,
|
||||
},
|
||||
{
|
||||
Name: "Topics",
|
||||
Value: string(topics),
|
||||
Selector: `response.payload.topics`,
|
||||
},
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: "Payload",
|
||||
Title: "Transaction Details",
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
for i, _topic := range payload["topics"].([]interface{}) {
|
||||
topic := _topic.(map[string]interface{})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Topic [%d]", i),
|
||||
Data: representMapAsTable(topic, fmt.Sprintf(`response.payload.topics[%d]`, i), []string{}),
|
||||
})
|
||||
}
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
@ -727,3 +843,42 @@ func representDeleteTopicsResponse(data map[string]interface{}) []interface{} {
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
func contains(s []string, str string) bool {
|
||||
for _, v := range s {
|
||||
if v == str {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func representMapAsTable(mapData map[string]interface{}, selectorPrefix string, ignoreKeys []string) (representation string) {
|
||||
var table []api.TableData
|
||||
for key, value := range mapData {
|
||||
if contains(ignoreKeys, key) {
|
||||
continue
|
||||
}
|
||||
switch reflect.ValueOf(value).Kind() {
|
||||
case reflect.Map:
|
||||
fallthrough
|
||||
case reflect.Slice:
|
||||
x, err := json.Marshal(value)
|
||||
value = string(x)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
selector := fmt.Sprintf("%s[\"%s\"]", selectorPrefix, key)
|
||||
table = append(table, api.TableData{
|
||||
Name: strings.Join(camelcase.Split(strings.Title(key)), " "),
|
||||
Value: value,
|
||||
Selector: selector,
|
||||
})
|
||||
}
|
||||
|
||||
obj, _ := json.Marshal(table)
|
||||
representation = string(obj)
|
||||
return
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user