Implement the representations for Kafka Fetch

This commit is contained in:
M. Mert Yildiran 2021-08-22 16:35:45 +03:00
parent 3d16ccf055
commit 26ad50f379
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
2 changed files with 136 additions and 0 deletions

View File

@ -316,3 +316,136 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
return rep
}
func representFetchRequest(data map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
replicaId := ""
if payload["ReplicaId"] != nil {
replicaId = fmt.Sprintf("%d", int(payload["ReplicaId"].(float64)))
}
maxBytes := ""
if payload["MaxBytes"] != nil {
maxBytes = fmt.Sprintf("%d", int(payload["MaxBytes"].(float64)))
}
isolationLevel := ""
if payload["IsolationLevel"] != nil {
isolationLevel = fmt.Sprintf("%d", int(payload["IsolationLevel"].(float64)))
}
sessionId := ""
if payload["SessionId"] != nil {
sessionId = fmt.Sprintf("%d", int(payload["SessionId"].(float64)))
}
sessionEpoch := ""
if payload["SessionEpoch"] != nil {
sessionEpoch = fmt.Sprintf("%d", int(payload["SessionEpoch"].(float64)))
}
forgottenTopicsData := ""
if payload["ForgottenTopicsData"] != nil {
x, _ := json.Marshal(payload["ForgottenTopicsData"].(map[string]interface{}))
forgottenTopicsData = string(x)
}
rackId := ""
if payload["RackId"] != nil {
rackId = fmt.Sprintf("%d", int(payload["RackId"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "Replica ID",
"value": replicaId,
},
{
"name": "Maximum Wait (ms)",
"value": fmt.Sprintf("%d", int(payload["MaxWaitMs"].(float64))),
},
{
"name": "Minimum Bytes",
"value": fmt.Sprintf("%d", int(payload["MinBytes"].(float64))),
},
{
"name": "Maximum Bytes",
"value": maxBytes,
},
{
"name": "Isolation Level",
"value": isolationLevel,
},
{
"name": "Session ID",
"value": sessionId,
},
{
"name": "Session Epoch",
"value": sessionEpoch,
},
{
"name": "Topics",
"value": string(topics),
},
{
"name": "Forgotten Topics Data",
"value": forgottenTopicsData,
},
{
"name": "Rack ID",
"value": rackId,
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Payload",
"data": string(repPayload),
})
return rep
}
func representFetchResponse(data map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
responses, _ := json.Marshal(payload["Responses"].([]interface{}))
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
}
errorCode := ""
if payload["ErrorCode"] != nil {
errorCode = fmt.Sprintf("%d", int(payload["ErrorCode"].(float64)))
}
sessionId := ""
if payload["SessionId"] != nil {
sessionId = fmt.Sprintf("%d", int(payload["SessionId"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
},
{
"name": "Error Code",
"value": errorCode,
},
{
"name": "Session ID",
"value": sessionId,
},
{
"name": "Responses",
"value": string(responses),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Payload",
"data": string(repPayload),
})
return rep
}

View File

@ -183,6 +183,9 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
case Produce:
repRequest = representProduceRequest(reqDetails)
repResponse = representProduceResponse(resDetails)
case Fetch:
repRequest = representFetchRequest(reqDetails)
repResponse = representFetchResponse(resDetails)
break
}