From eb4a5413768dcc2cbaf17301b6e75e8d0cf8d479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Fri, 8 Oct 2021 07:35:20 +0300 Subject: [PATCH] Fix the interface conversion errors in Kafka (#333) --- tap/extensions/kafka/helpers.go | 48 ++++++++++++++++++++++++--------- tap/extensions/kafka/main.go | 12 +++++++-- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index 00c41e49f..3a9a238eb 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -130,8 +130,16 @@ func representMetadataResponse(data map[string]interface{}) []interface{} { rep = representResponseHeader(data, rep) payload := data["Payload"].(map[string]interface{}) - topics, _ := json.Marshal(payload["Topics"].([]interface{})) - brokers, _ := json.Marshal(payload["Brokers"].([]interface{})) + topics := "" + if payload["Topics"] != nil { + _topics, _ := json.Marshal(payload["Topics"].([]interface{})) + topics = string(_topics) + } + brokers := "" + if payload["Brokers"] != nil { + _brokers, _ := json.Marshal(payload["Brokers"].([]interface{})) + brokers = string(_brokers) + } controllerID := "" clusterID := "" throttleTimeMs := "" @@ -155,7 +163,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} { }, { "name": "Brokers", - "value": string(brokers), + "value": brokers, }, { "name": "Cluster ID", @@ -167,7 +175,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} { }, { "name": "Topics", - "value": string(topics), + "value": topics, }, { "name": "Cluster Authorized Operations", @@ -303,7 +311,11 @@ func representProduceResponse(data map[string]interface{}) []interface{} { rep = representResponseHeader(data, rep) payload := data["Payload"].(map[string]interface{}) - responses, _ := json.Marshal(payload["Responses"].([]interface{})) + responses := "" + if payload["Responses"] != nil { + _responses, _ := json.Marshal(payload["Responses"].([]interface{})) + responses = string(_responses) + } throttleTimeMs := "" if payload["ThrottleTimeMs"] != nil { throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) @@ -333,7 +345,11 @@ func representFetchRequest(data map[string]interface{}) []interface{} { rep = representRequestHeader(data, rep) payload := data["Payload"].(map[string]interface{}) - topics, _ := json.Marshal(payload["Topics"].([]interface{})) + topics := "" + if payload["Topics"] != nil { + _topics, _ := json.Marshal(payload["Topics"].([]interface{})) + topics = string(_topics) + } replicaId := "" if payload["ReplicaId"] != nil { replicaId = fmt.Sprintf("%d", int(payload["ReplicaId"].(float64))) @@ -361,7 +377,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} { } rackId := "" if payload["RackId"] != nil { - rackId = fmt.Sprintf("%d", int(payload["RackId"].(float64))) + rackId = payload["RackId"].(string) } repPayload, _ := json.Marshal([]map[string]string{ { @@ -394,7 +410,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} { }, { "name": "Topics", - "value": string(topics), + "value": topics, }, { "name": "Forgotten Topics Data", @@ -420,7 +436,11 @@ func representFetchResponse(data map[string]interface{}) []interface{} { rep = representResponseHeader(data, rep) payload := data["Payload"].(map[string]interface{}) - responses, _ := json.Marshal(payload["Responses"].([]interface{})) + responses := "" + if payload["Responses"] != nil { + _responses, _ := json.Marshal(payload["Responses"].([]interface{})) + responses = string(_responses) + } throttleTimeMs := "" if payload["ThrottleTimeMs"] != nil { throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) @@ -448,7 +468,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} { }, { "name": "Responses", - "value": string(responses), + "value": responses, }, }) rep = append(rep, map[string]string{ @@ -466,7 +486,11 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} { rep = representRequestHeader(data, rep) payload := data["Payload"].(map[string]interface{}) - topics, _ := json.Marshal(payload["Topics"].([]interface{})) + topics := "" + if payload["Topics"] != nil { + _topics, _ := json.Marshal(payload["Topics"].([]interface{})) + topics = string(_topics) + } repPayload, _ := json.Marshal([]map[string]string{ { "name": "Replica ID", @@ -474,7 +498,7 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} { }, { "name": "Topics", - "value": string(topics), + "value": topics, }, }) rep = append(rep, map[string]string{ diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index ca9291c30..b677182d1 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -104,7 +104,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } break case Fetch: - topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{}) + _topics := reqDetails["Payload"].(map[string]interface{})["Topics"] + if _topics == nil { + break + } + topics := _topics.([]interface{}) for _, topic := range topics { summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string)) } @@ -113,7 +117,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } break case ListOffsets: - topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{}) + _topics := reqDetails["Payload"].(map[string]interface{})["Topics"] + if _topics == nil { + break + } + topics := _topics.([]interface{}) for _, topic := range topics { summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string)) }