From 58b744bc6ce917bf3506dd63f7e747da94c90e7f Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sun, 22 Aug 2021 17:05:36 +0300 Subject: [PATCH] Implement the representations for Kafka `ListOffsets`, `CreateTopics` and `DeleteTopics` --- tap/extensions/kafka/helpers.go | 189 ++++++++++++++++++++++++++++++++ tap/extensions/kafka/main.go | 14 +++ tap/extensions/kafka/structs.go | 6 +- 3 files changed, 206 insertions(+), 3 deletions(-) diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index c272f95ed..da40d62d1 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -449,3 +449,192 @@ func representFetchResponse(data map[string]interface{}) []interface{} { return rep } + +func representListOffsetsRequest(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representRequestHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topics, _ := json.Marshal(payload["Topics"].([]interface{})) + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Replica ID", + "value": fmt.Sprintf("%d", int(payload["ReplicaId"].(float64))), + }, + { + "name": "Topics", + "value": string(topics), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} + +func representListOffsetsResponse(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representResponseHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topics, _ := json.Marshal(payload["Topics"].([]interface{})) + throttleTimeMs := "" + if payload["ThrottleTimeMs"] != nil { + throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Throttle Time (ms)", + "value": throttleTimeMs, + }, + { + "name": "Topics", + "value": string(topics), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} + +func representCreateTopicsRequest(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representRequestHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topics, _ := json.Marshal(payload["Topics"].([]interface{})) + validateOnly := "" + if payload["ValidateOnly"] != nil { + validateOnly = strconv.FormatBool(payload["ValidateOnly"].(bool)) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Topics", + "value": string(topics), + }, + { + "name": "Timeout (ms)", + "value": fmt.Sprintf("%d", int(payload["TimeoutMs"].(float64))), + }, + { + "name": "Validate Only", + "value": validateOnly, + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} + +func representCreateTopicsResponse(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representResponseHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topics, _ := json.Marshal(payload["Topics"].([]interface{})) + throttleTimeMs := "" + if payload["ThrottleTimeMs"] != nil { + throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Throttle Time (ms)", + "value": throttleTimeMs, + }, + { + "name": "Topics", + "value": string(topics), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} + +func representDeleteTopicsRequest(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representRequestHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topics := "" + if payload["Topics"] != nil { + x, _ := json.Marshal(payload["Topics"].([]interface{})) + topics = string(x) + } + topicNames := "" + if payload["TopicNames"] != nil { + x, _ := json.Marshal(payload["TopicNames"].([]interface{})) + topicNames = string(x) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "TopicNames", + "value": string(topicNames), + }, + { + "name": "Topics", + "value": string(topics), + }, + { + "name": "Timeout (ms)", + "value": fmt.Sprintf("%d", int(payload["TimeoutMs"].(float64))), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} + +func representDeleteTopicsResponse(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representResponseHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + responses, _ := json.Marshal(payload["Responses"].([]interface{})) + throttleTimeMs := "" + if payload["ThrottleTimeMs"] != nil { + throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Throttle Time (ms)", + "value": throttleTimeMs, + }, + { + "name": "Responses", + "value": string(responses), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index f89b5abbf..7e48ce2cb 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -180,13 +180,27 @@ func (d dissecting) Represent(entry string) ([]byte, error) { case ApiVersions: repRequest = representApiVersionsRequest(reqDetails) repResponse = representApiVersionsResponse(resDetails) + break case Produce: repRequest = representProduceRequest(reqDetails) repResponse = representProduceResponse(resDetails) + break case Fetch: repRequest = representFetchRequest(reqDetails) repResponse = representFetchResponse(resDetails) break + case ListOffsets: + repRequest = representListOffsetsRequest(reqDetails) + repResponse = representListOffsetsResponse(resDetails) + break + case CreateTopics: + repRequest = representCreateTopicsRequest(reqDetails) + repResponse = representCreateTopicsResponse(resDetails) + break + case DeleteTopics: + repRequest = representDeleteTopicsRequest(reqDetails) + repResponse = representDeleteTopicsResponse(resDetails) + break } representation["request"] = repRequest diff --git a/tap/extensions/kafka/structs.go b/tap/extensions/kafka/structs.go index db3daae59..d9aa5c1cb 100644 --- a/tap/extensions/kafka/structs.go +++ b/tap/extensions/kafka/structs.go @@ -939,7 +939,7 @@ type CreateTopicsResponseV7 struct { type DeleteTopicsRequestV0 struct { TopicNames []string - TimemoutMs int32 + TimeoutMs int32 } // DeleteTopics Request (Version: 6) @@ -950,8 +950,8 @@ type DeleteTopicsRequestTopicV6 struct { } type DeleteTopicsRequestV6 struct { - Topics []DeleteTopicsRequestTopicV6 - TimemoutMs int32 + Topics []DeleteTopicsRequestTopicV6 + TimeoutMs int32 } // DeleteTopics Response (Version: 0)