|
|
|
|
@@ -3,8 +3,13 @@ package main
|
|
|
|
|
import (
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"reflect"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
|
|
"github.com/fatih/camelcase"
|
|
|
|
|
"github.com/ohler55/ojg/jp"
|
|
|
|
|
"github.com/ohler55/ojg/oj"
|
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@@ -289,17 +294,12 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
|
|
|
|
|
rep = representRequestHeader(data, rep)
|
|
|
|
|
|
|
|
|
|
payload := data["payload"].(map[string]interface{})
|
|
|
|
|
topicData := ""
|
|
|
|
|
_topicData := payload["topicData"]
|
|
|
|
|
if _topicData != nil {
|
|
|
|
|
x, _ := json.Marshal(_topicData.([]interface{}))
|
|
|
|
|
topicData = string(x)
|
|
|
|
|
}
|
|
|
|
|
topicData := payload["topicData"]
|
|
|
|
|
transactionalID := ""
|
|
|
|
|
if payload["transactionalID"] != nil {
|
|
|
|
|
transactionalID = payload["transactionalID"].(string)
|
|
|
|
|
}
|
|
|
|
|
repPayload, _ := json.Marshal([]api.TableData{
|
|
|
|
|
repTransactionDetails, _ := json.Marshal([]api.TableData{
|
|
|
|
|
{
|
|
|
|
|
Name: "Transactional ID",
|
|
|
|
|
Value: transactionalID,
|
|
|
|
|
@@ -315,18 +315,73 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
|
|
|
|
|
Value: fmt.Sprintf("%d", int(payload["timeout"].(float64))),
|
|
|
|
|
Selector: `request.payload.timeout`,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Name: "Topic Data",
|
|
|
|
|
Value: topicData,
|
|
|
|
|
Selector: `request.payload.topicData`,
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: "Payload",
|
|
|
|
|
Data: string(repPayload),
|
|
|
|
|
Title: "Transaction Details",
|
|
|
|
|
Data: string(repTransactionDetails),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if topicData != nil {
|
|
|
|
|
for _, _topic := range topicData.([]interface{}) {
|
|
|
|
|
topic := _topic.(map[string]interface{})
|
|
|
|
|
topicName := topic["topic"].(string)
|
|
|
|
|
partitions := topic["partitions"].(map[string]interface{})
|
|
|
|
|
partitionsJson, err := json.Marshal(partitions)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return rep
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
repPartitions, _ := json.Marshal([]api.TableData{
|
|
|
|
|
{
|
|
|
|
|
Name: "Length",
|
|
|
|
|
Value: partitions["length"],
|
|
|
|
|
Selector: `request.payload.transactionalID`,
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Partitions (topic: %s)", topicName),
|
|
|
|
|
Data: string(repPartitions),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
obj, err := oj.ParseString(string(partitionsJson))
|
|
|
|
|
recordBatchPath, err := jp.ParseString(`partitionData.records.recordBatch`)
|
|
|
|
|
recordBatchresults := recordBatchPath.Get(obj)
|
|
|
|
|
if len(recordBatchresults) > 0 {
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Record Batch (topic: %s)", topicName),
|
|
|
|
|
Data: representMapAsTable(recordBatchresults[0].(map[string]interface{}), `request.payload.topicData.partitions.partitionData.records.recordBatch`, []string{"record"}),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
recordsPath, err := jp.ParseString(`partitionData.records.recordBatch.record`)
|
|
|
|
|
recordsResults := recordsPath.Get(obj)
|
|
|
|
|
if len(recordsResults) > 0 {
|
|
|
|
|
records := recordsResults[0].([]interface{})
|
|
|
|
|
for i, _record := range records {
|
|
|
|
|
record := _record.(map[string]interface{})
|
|
|
|
|
value := record["value"]
|
|
|
|
|
delete(record, "value")
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
|
|
|
|
|
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.BODY,
|
|
|
|
|
Title: fmt.Sprintf("Record [%d] Value", i),
|
|
|
|
|
Data: value.(string),
|
|
|
|
|
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return rep
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -336,21 +391,12 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
|
|
|
|
|
rep = representResponseHeader(data, rep)
|
|
|
|
|
|
|
|
|
|
payload := data["payload"].(map[string]interface{})
|
|
|
|
|
responses := ""
|
|
|
|
|
if payload["responses"] != nil {
|
|
|
|
|
_responses, _ := json.Marshal(payload["responses"].([]interface{}))
|
|
|
|
|
responses = string(_responses)
|
|
|
|
|
}
|
|
|
|
|
responses := payload["responses"]
|
|
|
|
|
throttleTimeMs := ""
|
|
|
|
|
if payload["throttleTimeMs"] != nil {
|
|
|
|
|
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
|
|
|
|
|
}
|
|
|
|
|
repPayload, _ := json.Marshal([]api.TableData{
|
|
|
|
|
{
|
|
|
|
|
Name: "Responses",
|
|
|
|
|
Value: string(responses),
|
|
|
|
|
Selector: `response.payload.responses`,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Name: "Throttle Time (ms)",
|
|
|
|
|
Value: throttleTimeMs,
|
|
|
|
|
@@ -359,10 +405,31 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
|
|
|
|
|
})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: "Payload",
|
|
|
|
|
Title: "Transaction Details",
|
|
|
|
|
Data: string(repPayload),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if responses != nil {
|
|
|
|
|
for i, _response := range responses.([]interface{}) {
|
|
|
|
|
response := _response.(map[string]interface{})
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Response [%d]", i),
|
|
|
|
|
Data: representMapAsTable(response, fmt.Sprintf(`response.payload.responses[%d]`, i), []string{"partitionResponses"}),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
for j, _partitionResponse := range response["partitionResponses"].([]interface{}) {
|
|
|
|
|
partitionResponse := _partitionResponse.(map[string]interface{})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Response [%d] Partition Response [%d]", i, j),
|
|
|
|
|
Data: representMapAsTable(partitionResponse, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d]`, i, j), []string{}),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return rep
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -372,11 +439,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
|
|
|
|
|
rep = representRequestHeader(data, rep)
|
|
|
|
|
|
|
|
|
|
payload := data["payload"].(map[string]interface{})
|
|
|
|
|
topics := ""
|
|
|
|
|
if payload["topics"] != nil {
|
|
|
|
|
_topics, _ := json.Marshal(payload["topics"].([]interface{}))
|
|
|
|
|
topics = string(_topics)
|
|
|
|
|
}
|
|
|
|
|
topics := payload["topics"]
|
|
|
|
|
replicaId := ""
|
|
|
|
|
if payload["replicaId"] != nil {
|
|
|
|
|
replicaId = fmt.Sprintf("%d", int(payload["replicaId"].(float64)))
|
|
|
|
|
@@ -442,11 +505,6 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
|
|
|
|
|
Value: sessionEpoch,
|
|
|
|
|
Selector: `request.payload.sessionEpoch`,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Name: "Topics",
|
|
|
|
|
Value: topics,
|
|
|
|
|
Selector: `request.payload.topics`,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Name: "Forgotten Topics Data",
|
|
|
|
|
Value: forgottenTopicsData,
|
|
|
|
|
@@ -460,10 +518,26 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
|
|
|
|
|
})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: "Payload",
|
|
|
|
|
Title: "Transaction Details",
|
|
|
|
|
Data: string(repPayload),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if topics != nil {
|
|
|
|
|
for i, _topic := range topics.([]interface{}) {
|
|
|
|
|
topic := _topic.(map[string]interface{})
|
|
|
|
|
topicName := topic["topic"].(string)
|
|
|
|
|
for j, _partition := range topic["partitions"].([]interface{}) {
|
|
|
|
|
partition := _partition.(map[string]interface{})
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Partition [%d] (topic: %s)", j, topicName),
|
|
|
|
|
Data: representMapAsTable(partition, fmt.Sprintf(`request.payload.topics[%d].partitions[%d]`, i, j), []string{}),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return rep
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -473,11 +547,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
|
|
|
|
|
rep = representResponseHeader(data, rep)
|
|
|
|
|
|
|
|
|
|
payload := data["payload"].(map[string]interface{})
|
|
|
|
|
responses := ""
|
|
|
|
|
if payload["responses"] != nil {
|
|
|
|
|
_responses, _ := json.Marshal(payload["responses"].([]interface{}))
|
|
|
|
|
responses = string(_responses)
|
|
|
|
|
}
|
|
|
|
|
responses := payload["responses"]
|
|
|
|
|
throttleTimeMs := ""
|
|
|
|
|
if payload["throttleTimeMs"] != nil {
|
|
|
|
|
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
|
|
|
|
|
@@ -506,18 +576,56 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
|
|
|
|
|
Value: sessionId,
|
|
|
|
|
Selector: `response.payload.sessionId`,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Name: "Responses",
|
|
|
|
|
Value: responses,
|
|
|
|
|
Selector: `response.payload.responses`,
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: "Payload",
|
|
|
|
|
Title: "Transaction Details",
|
|
|
|
|
Data: string(repPayload),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if responses != nil {
|
|
|
|
|
for i, _response := range responses.([]interface{}) {
|
|
|
|
|
response := _response.(map[string]interface{})
|
|
|
|
|
topicName := response["topic"].(string)
|
|
|
|
|
|
|
|
|
|
for j, _partitionResponse := range response["partitionResponses"].([]interface{}) {
|
|
|
|
|
partitionResponse := _partitionResponse.(map[string]interface{})
|
|
|
|
|
recordSet := partitionResponse["recordSet"].(map[string]interface{})
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Response [%d] Partition Response [%d] (topic: %s)", i, j, topicName),
|
|
|
|
|
Data: representMapAsTable(partitionResponse, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d]`, i, j), []string{"recordSet"}),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
recordBatch := recordSet["recordBatch"].(map[string]interface{})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record Batch (topic: %s)", i, j, topicName),
|
|
|
|
|
Data: representMapAsTable(recordBatch, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch`, i, j), []string{"record"}),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
for k, _record := range recordBatch["record"].([]interface{}) {
|
|
|
|
|
record := _record.(map[string]interface{})
|
|
|
|
|
value := record["value"]
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
|
|
|
|
|
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.BODY,
|
|
|
|
|
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
|
|
|
|
|
Data: value.(string),
|
|
|
|
|
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return rep
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -591,17 +699,11 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
|
|
|
|
|
rep = representRequestHeader(data, rep)
|
|
|
|
|
|
|
|
|
|
payload := data["payload"].(map[string]interface{})
|
|
|
|
|
topics, _ := json.Marshal(payload["topics"].([]interface{}))
|
|
|
|
|
validateOnly := ""
|
|
|
|
|
if payload["validateOnly"] != nil {
|
|
|
|
|
validateOnly = strconv.FormatBool(payload["validateOnly"].(bool))
|
|
|
|
|
}
|
|
|
|
|
repPayload, _ := json.Marshal([]api.TableData{
|
|
|
|
|
{
|
|
|
|
|
Name: "Topics",
|
|
|
|
|
Value: string(topics),
|
|
|
|
|
Selector: `request.payload.topics`,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Name: "Timeout (ms)",
|
|
|
|
|
Value: fmt.Sprintf("%d", int(payload["timeoutMs"].(float64))),
|
|
|
|
|
@@ -615,10 +717,20 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
|
|
|
|
|
})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: "Payload",
|
|
|
|
|
Title: "Transaction Details",
|
|
|
|
|
Data: string(repPayload),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
for i, _topic := range payload["topics"].([]interface{}) {
|
|
|
|
|
topic := _topic.(map[string]interface{})
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Topic [%d]", i),
|
|
|
|
|
Data: representMapAsTable(topic, fmt.Sprintf(`request.payload.topics[%d]`, i), []string{}),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return rep
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -628,7 +740,6 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
|
|
|
|
|
rep = representResponseHeader(data, rep)
|
|
|
|
|
|
|
|
|
|
payload := data["payload"].(map[string]interface{})
|
|
|
|
|
topics, _ := json.Marshal(payload["topics"].([]interface{}))
|
|
|
|
|
throttleTimeMs := ""
|
|
|
|
|
if payload["throttleTimeMs"] != nil {
|
|
|
|
|
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
|
|
|
|
|
@@ -639,18 +750,23 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
|
|
|
|
|
Value: throttleTimeMs,
|
|
|
|
|
Selector: `response.payload.throttleTimeMs`,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Name: "Topics",
|
|
|
|
|
Value: string(topics),
|
|
|
|
|
Selector: `response.payload.topics`,
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: "Payload",
|
|
|
|
|
Title: "Transaction Details",
|
|
|
|
|
Data: string(repPayload),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
for i, _topic := range payload["topics"].([]interface{}) {
|
|
|
|
|
topic := _topic.(map[string]interface{})
|
|
|
|
|
|
|
|
|
|
rep = append(rep, api.SectionData{
|
|
|
|
|
Type: api.TABLE,
|
|
|
|
|
Title: fmt.Sprintf("Topic [%d]", i),
|
|
|
|
|
Data: representMapAsTable(topic, fmt.Sprintf(`response.payload.topics[%d]`, i), []string{}),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return rep
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -727,3 +843,42 @@ func representDeleteTopicsResponse(data map[string]interface{}) []interface{} {
|
|
|
|
|
|
|
|
|
|
return rep
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func contains(s []string, str string) bool {
|
|
|
|
|
for _, v := range s {
|
|
|
|
|
if v == str {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func representMapAsTable(mapData map[string]interface{}, selectorPrefix string, ignoreKeys []string) (representation string) {
|
|
|
|
|
var table []api.TableData
|
|
|
|
|
for key, value := range mapData {
|
|
|
|
|
if contains(ignoreKeys, key) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
switch reflect.ValueOf(value).Kind() {
|
|
|
|
|
case reflect.Map:
|
|
|
|
|
fallthrough
|
|
|
|
|
case reflect.Slice:
|
|
|
|
|
x, err := json.Marshal(value)
|
|
|
|
|
value = string(x)
|
|
|
|
|
if err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
selector := fmt.Sprintf("%s[\"%s\"]", selectorPrefix, key)
|
|
|
|
|
table = append(table, api.TableData{
|
|
|
|
|
Name: strings.Join(camelcase.Split(strings.Title(key)), " "),
|
|
|
|
|
Value: value,
|
|
|
|
|
Selector: selector,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
obj, _ := json.Marshal(table)
|
|
|
|
|
representation = string(obj)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|