Implement the representations for Kafka ListOffsets, CreateTopics and DeleteTopics

This commit is contained in:
M. Mert Yildiran 2021-08-22 17:05:36 +03:00
parent 26ad50f379
commit 58b744bc6c
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
3 changed files with 206 additions and 3 deletions

View File

@ -449,3 +449,192 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
return rep
}
func representListOffsetsRequest(data map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "Replica ID",
"value": fmt.Sprintf("%d", int(payload["ReplicaId"].(float64))),
},
{
"name": "Topics",
"value": string(topics),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Payload",
"data": string(repPayload),
})
return rep
}
func representListOffsetsResponse(data map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
},
{
"name": "Topics",
"value": string(topics),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Payload",
"data": string(repPayload),
})
return rep
}
func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
validateOnly := ""
if payload["ValidateOnly"] != nil {
validateOnly = strconv.FormatBool(payload["ValidateOnly"].(bool))
}
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "Topics",
"value": string(topics),
},
{
"name": "Timeout (ms)",
"value": fmt.Sprintf("%d", int(payload["TimeoutMs"].(float64))),
},
{
"name": "Validate Only",
"value": validateOnly,
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Payload",
"data": string(repPayload),
})
return rep
}
func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
},
{
"name": "Topics",
"value": string(topics),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Payload",
"data": string(repPayload),
})
return rep
}
func representDeleteTopicsRequest(data map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics := ""
if payload["Topics"] != nil {
x, _ := json.Marshal(payload["Topics"].([]interface{}))
topics = string(x)
}
topicNames := ""
if payload["TopicNames"] != nil {
x, _ := json.Marshal(payload["TopicNames"].([]interface{}))
topicNames = string(x)
}
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "TopicNames",
"value": string(topicNames),
},
{
"name": "Topics",
"value": string(topics),
},
{
"name": "Timeout (ms)",
"value": fmt.Sprintf("%d", int(payload["TimeoutMs"].(float64))),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Payload",
"data": string(repPayload),
})
return rep
}
func representDeleteTopicsResponse(data map[string]interface{}) []interface{} {
rep := make([]interface{}, 0)
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
responses, _ := json.Marshal(payload["Responses"].([]interface{}))
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
},
{
"name": "Responses",
"value": string(responses),
},
})
rep = append(rep, map[string]string{
"type": "table",
"title": "Payload",
"data": string(repPayload),
})
return rep
}

View File

@ -180,13 +180,27 @@ func (d dissecting) Represent(entry string) ([]byte, error) {
case ApiVersions:
repRequest = representApiVersionsRequest(reqDetails)
repResponse = representApiVersionsResponse(resDetails)
break
case Produce:
repRequest = representProduceRequest(reqDetails)
repResponse = representProduceResponse(resDetails)
break
case Fetch:
repRequest = representFetchRequest(reqDetails)
repResponse = representFetchResponse(resDetails)
break
case ListOffsets:
repRequest = representListOffsetsRequest(reqDetails)
repResponse = representListOffsetsResponse(resDetails)
break
case CreateTopics:
repRequest = representCreateTopicsRequest(reqDetails)
repResponse = representCreateTopicsResponse(resDetails)
break
case DeleteTopics:
repRequest = representDeleteTopicsRequest(reqDetails)
repResponse = representDeleteTopicsResponse(resDetails)
break
}
representation["request"] = repRequest

View File

@ -939,7 +939,7 @@ type CreateTopicsResponseV7 struct {
type DeleteTopicsRequestV0 struct {
TopicNames []string
TimemoutMs int32
TimeoutMs int32
}
// DeleteTopics Request (Version: 6)
@ -950,8 +950,8 @@ type DeleteTopicsRequestTopicV6 struct {
}
type DeleteTopicsRequestV6 struct {
Topics []DeleteTopicsRequestTopicV6
TimemoutMs int32
Topics []DeleteTopicsRequestTopicV6
TimeoutMs int32
}
// DeleteTopics Response (Version: 0)