Fix the interface conversion errors in Kafka (#333)

This commit is contained in:
M. Mert Yıldıran 2021-10-08 07:35:20 +03:00 committed by GitHub
parent 77710cc411
commit eb4a541376
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 14 deletions

View File

@ -130,8 +130,16 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep) rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{}) payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{})) topics := ""
brokers, _ := json.Marshal(payload["Brokers"].([]interface{})) if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
topics = string(_topics)
}
brokers := ""
if payload["Brokers"] != nil {
_brokers, _ := json.Marshal(payload["Brokers"].([]interface{}))
brokers = string(_brokers)
}
controllerID := "" controllerID := ""
clusterID := "" clusterID := ""
throttleTimeMs := "" throttleTimeMs := ""
@ -155,7 +163,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
}, },
{ {
"name": "Brokers", "name": "Brokers",
"value": string(brokers), "value": brokers,
}, },
{ {
"name": "Cluster ID", "name": "Cluster ID",
@ -167,7 +175,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
}, },
{ {
"name": "Topics", "name": "Topics",
"value": string(topics), "value": topics,
}, },
{ {
"name": "Cluster Authorized Operations", "name": "Cluster Authorized Operations",
@ -303,7 +311,11 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep) rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{}) payload := data["Payload"].(map[string]interface{})
responses, _ := json.Marshal(payload["Responses"].([]interface{})) responses := ""
if payload["Responses"] != nil {
_responses, _ := json.Marshal(payload["Responses"].([]interface{}))
responses = string(_responses)
}
throttleTimeMs := "" throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil { if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
@ -333,7 +345,11 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep) rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{}) payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{})) topics := ""
if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
topics = string(_topics)
}
replicaId := "" replicaId := ""
if payload["ReplicaId"] != nil { if payload["ReplicaId"] != nil {
replicaId = fmt.Sprintf("%d", int(payload["ReplicaId"].(float64))) replicaId = fmt.Sprintf("%d", int(payload["ReplicaId"].(float64)))
@ -361,7 +377,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
} }
rackId := "" rackId := ""
if payload["RackId"] != nil { if payload["RackId"] != nil {
rackId = fmt.Sprintf("%d", int(payload["RackId"].(float64))) rackId = payload["RackId"].(string)
} }
repPayload, _ := json.Marshal([]map[string]string{ repPayload, _ := json.Marshal([]map[string]string{
{ {
@ -394,7 +410,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
}, },
{ {
"name": "Topics", "name": "Topics",
"value": string(topics), "value": topics,
}, },
{ {
"name": "Forgotten Topics Data", "name": "Forgotten Topics Data",
@ -420,7 +436,11 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep) rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{}) payload := data["Payload"].(map[string]interface{})
responses, _ := json.Marshal(payload["Responses"].([]interface{})) responses := ""
if payload["Responses"] != nil {
_responses, _ := json.Marshal(payload["Responses"].([]interface{}))
responses = string(_responses)
}
throttleTimeMs := "" throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil { if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
@ -448,7 +468,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
}, },
{ {
"name": "Responses", "name": "Responses",
"value": string(responses), "value": responses,
}, },
}) })
rep = append(rep, map[string]string{ rep = append(rep, map[string]string{
@ -466,7 +486,11 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep) rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{}) payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{})) topics := ""
if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
topics = string(_topics)
}
repPayload, _ := json.Marshal([]map[string]string{ repPayload, _ := json.Marshal([]map[string]string{
{ {
"name": "Replica ID", "name": "Replica ID",
@ -474,7 +498,7 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
}, },
{ {
"name": "Topics", "name": "Topics",
"value": string(topics), "value": topics,
}, },
}) })
rep = append(rep, map[string]string{ rep = append(rep, map[string]string{

View File

@ -104,7 +104,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
} }
break break
case Fetch: case Fetch:
topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{}) _topics := reqDetails["Payload"].(map[string]interface{})["Topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics { for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string)) summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string))
} }
@ -113,7 +117,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
} }
break break
case ListOffsets: case ListOffsets:
topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{}) _topics := reqDetails["Payload"].(map[string]interface{})["Topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics { for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string)) summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
} }