mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-01 18:47:39 +00:00
Implement the representations for Kafka Metadata
, RequestHeader
and ResponseHeader
This commit is contained in:
@@ -2,6 +2,8 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type KafkaPayload struct {
|
||||
@@ -16,3 +18,156 @@ type KafkaPayloader interface {
|
||||
func (h KafkaPayload) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(h.Data)
|
||||
}
|
||||
|
||||
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
|
||||
requestHeader, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
"name": "ApiKey",
|
||||
"value": apiNames[int(data["ApiKey"].(float64))],
|
||||
},
|
||||
{
|
||||
"name": "ApiVersion",
|
||||
"value": fmt.Sprintf("%d", int(data["ApiVersion"].(float64))),
|
||||
},
|
||||
{
|
||||
"name": "Client ID",
|
||||
"value": data["ClientID"].(string),
|
||||
},
|
||||
{
|
||||
"name": "Correlation ID",
|
||||
"value": fmt.Sprintf("%d", int(data["CorrelationID"].(float64))),
|
||||
},
|
||||
{
|
||||
"name": "Size",
|
||||
"value": fmt.Sprintf("%d", int(data["Size"].(float64))),
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"title": "Request Header",
|
||||
"data": string(requestHeader),
|
||||
})
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
func representResponseHeader(data map[string]interface{}, rep []interface{}) []interface{} {
|
||||
requestHeader, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
"name": "Correlation ID",
|
||||
"value": fmt.Sprintf("%d", int(data["CorrelationID"].(float64))),
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"title": "Response Header",
|
||||
"data": string(requestHeader),
|
||||
})
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
func representMetadataRequest(data map[string]interface{}) []interface{} {
|
||||
rep := make([]interface{}, 0)
|
||||
|
||||
rep = representRequestHeader(data, rep)
|
||||
|
||||
payload := data["Payload"].(map[string]interface{})
|
||||
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
|
||||
allowAutoTopicCreation := ""
|
||||
includeClusterAuthorizedOperations := ""
|
||||
includeTopicAuthorizedOperations := ""
|
||||
if payload["AllowAutoTopicCreation"] != nil {
|
||||
allowAutoTopicCreation = strconv.FormatBool(payload["AllowAutoTopicCreation"].(bool))
|
||||
}
|
||||
if payload["IncludeClusterAuthorizedOperations"] != nil {
|
||||
includeClusterAuthorizedOperations = strconv.FormatBool(payload["IncludeClusterAuthorizedOperations"].(bool))
|
||||
}
|
||||
if payload["IncludeTopicAuthorizedOperations"] != nil {
|
||||
includeTopicAuthorizedOperations = strconv.FormatBool(payload["IncludeTopicAuthorizedOperations"].(bool))
|
||||
}
|
||||
repPayload, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
"name": "Topics",
|
||||
"value": string(topics),
|
||||
},
|
||||
{
|
||||
"name": "Allow Auto Topic Creation",
|
||||
"value": allowAutoTopicCreation,
|
||||
},
|
||||
{
|
||||
"name": "Include Cluster Authorized Operations",
|
||||
"value": includeClusterAuthorizedOperations,
|
||||
},
|
||||
{
|
||||
"name": "Include Topic Authorized Operations",
|
||||
"value": includeTopicAuthorizedOperations,
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
|
||||
return rep
|
||||
}
|
||||
|
||||
func representMetadataResponse(data map[string]interface{}) []interface{} {
|
||||
rep := make([]interface{}, 0)
|
||||
|
||||
rep = representResponseHeader(data, rep)
|
||||
|
||||
payload := data["Payload"].(map[string]interface{})
|
||||
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
|
||||
brokers, _ := json.Marshal(payload["Brokers"].([]interface{}))
|
||||
controllerID := ""
|
||||
clusterID := ""
|
||||
throttleTimeMs := ""
|
||||
clusterAuthorizedOperations := ""
|
||||
if payload["ControllerID"] != nil {
|
||||
controllerID = fmt.Sprintf("%d", int(payload["ControllerID"].(float64)))
|
||||
}
|
||||
if payload["ClusterID"] != nil {
|
||||
clusterID = payload["ClusterID"].(string)
|
||||
}
|
||||
if payload["ThrottleTimeMs"] != nil {
|
||||
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
|
||||
}
|
||||
if payload["ClusterAuthorizedOperations"] != nil {
|
||||
clusterAuthorizedOperations = fmt.Sprintf("%d", int(payload["ClusterAuthorizedOperations"].(float64)))
|
||||
}
|
||||
repPayload, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
"name": "Throttle Time (ms)",
|
||||
"value": throttleTimeMs,
|
||||
},
|
||||
{
|
||||
"name": "Brokers",
|
||||
"value": string(brokers),
|
||||
},
|
||||
{
|
||||
"name": "Cluster ID",
|
||||
"value": clusterID,
|
||||
},
|
||||
{
|
||||
"name": "Controller ID",
|
||||
"value": controllerID,
|
||||
},
|
||||
{
|
||||
"name": "Topics",
|
||||
"value": string(topics),
|
||||
},
|
||||
{
|
||||
"name": "Cluster Authorized Operations",
|
||||
"value": clusterAuthorizedOperations,
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
|
||||
return rep
|
||||
}
|
||||
|
@@ -46,7 +46,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
|
||||
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
response := item.Pair.Response.Payload.(map[string]interface{})
|
||||
entryBytes, _ := json.Marshal(item.Pair)
|
||||
service := fmt.Sprintf("kafka")
|
||||
apiKey := ApiKey(request["ApiKey"].(float64))
|
||||
@@ -159,8 +158,28 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(entry string) ([]byte, error) {
|
||||
// TODO: Implement
|
||||
return nil, nil
|
||||
var root map[string]interface{}
|
||||
json.Unmarshal([]byte(entry), &root)
|
||||
representation := make(map[string]interface{}, 0)
|
||||
request := root["request"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||
response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||
// fmt.Printf("\n\nrequest: %+v\n", request)
|
||||
// fmt.Printf("response: %+v\n", response)
|
||||
|
||||
apiKey := ApiKey(request["ApiKey"].(float64))
|
||||
|
||||
var repRequest []interface{}
|
||||
var repResponse []interface{}
|
||||
switch apiKey {
|
||||
case Metadata:
|
||||
repRequest = representMetadataRequest(request)
|
||||
repResponse = representMetadataResponse(response)
|
||||
break
|
||||
}
|
||||
|
||||
representation["request"] = repRequest
|
||||
representation["response"] = repResponse
|
||||
return json.Marshal(representation)
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
Reference in New Issue
Block a user