diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index d8b6f73d1..f5fac3307 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -2,6 +2,8 @@ package main import ( "encoding/json" + "fmt" + "strconv" ) type KafkaPayload struct { @@ -16,3 +18,156 @@ type KafkaPayloader interface { func (h KafkaPayload) MarshalJSON() ([]byte, error) { return json.Marshal(h.Data) } + +func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} { + requestHeader, _ := json.Marshal([]map[string]string{ + { + "name": "ApiKey", + "value": apiNames[int(data["ApiKey"].(float64))], + }, + { + "name": "ApiVersion", + "value": fmt.Sprintf("%d", int(data["ApiVersion"].(float64))), + }, + { + "name": "Client ID", + "value": data["ClientID"].(string), + }, + { + "name": "Correlation ID", + "value": fmt.Sprintf("%d", int(data["CorrelationID"].(float64))), + }, + { + "name": "Size", + "value": fmt.Sprintf("%d", int(data["Size"].(float64))), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Request Header", + "data": string(requestHeader), + }) + + return rep +} + +func representResponseHeader(data map[string]interface{}, rep []interface{}) []interface{} { + requestHeader, _ := json.Marshal([]map[string]string{ + { + "name": "Correlation ID", + "value": fmt.Sprintf("%d", int(data["CorrelationID"].(float64))), + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Response Header", + "data": string(requestHeader), + }) + + return rep +} + +func representMetadataRequest(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representRequestHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topics, _ := json.Marshal(payload["Topics"].([]interface{})) + allowAutoTopicCreation := "" + includeClusterAuthorizedOperations := "" + includeTopicAuthorizedOperations := "" + if payload["AllowAutoTopicCreation"] != nil { + allowAutoTopicCreation = strconv.FormatBool(payload["AllowAutoTopicCreation"].(bool)) + } + if payload["IncludeClusterAuthorizedOperations"] != nil { + includeClusterAuthorizedOperations = strconv.FormatBool(payload["IncludeClusterAuthorizedOperations"].(bool)) + } + if payload["IncludeTopicAuthorizedOperations"] != nil { + includeTopicAuthorizedOperations = strconv.FormatBool(payload["IncludeTopicAuthorizedOperations"].(bool)) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Topics", + "value": string(topics), + }, + { + "name": "Allow Auto Topic Creation", + "value": allowAutoTopicCreation, + }, + { + "name": "Include Cluster Authorized Operations", + "value": includeClusterAuthorizedOperations, + }, + { + "name": "Include Topic Authorized Operations", + "value": includeTopicAuthorizedOperations, + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} + +func representMetadataResponse(data map[string]interface{}) []interface{} { + rep := make([]interface{}, 0) + + rep = representResponseHeader(data, rep) + + payload := data["Payload"].(map[string]interface{}) + topics, _ := json.Marshal(payload["Topics"].([]interface{})) + brokers, _ := json.Marshal(payload["Brokers"].([]interface{})) + controllerID := "" + clusterID := "" + throttleTimeMs := "" + clusterAuthorizedOperations := "" + if payload["ControllerID"] != nil { + controllerID = fmt.Sprintf("%d", int(payload["ControllerID"].(float64))) + } + if payload["ClusterID"] != nil { + clusterID = payload["ClusterID"].(string) + } + if payload["ThrottleTimeMs"] != nil { + throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) + } + if payload["ClusterAuthorizedOperations"] != nil { + clusterAuthorizedOperations = fmt.Sprintf("%d", int(payload["ClusterAuthorizedOperations"].(float64))) + } + repPayload, _ := json.Marshal([]map[string]string{ + { + "name": "Throttle Time (ms)", + "value": throttleTimeMs, + }, + { + "name": "Brokers", + "value": string(brokers), + }, + { + "name": "Cluster ID", + "value": clusterID, + }, + { + "name": "Controller ID", + "value": controllerID, + }, + { + "name": "Topics", + "value": string(topics), + }, + { + "name": "Cluster Authorized Operations", + "value": clusterAuthorizedOperations, + }, + }) + rep = append(rep, map[string]string{ + "type": "table", + "title": "Payload", + "data": string(repPayload), + }) + + return rep +} diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 67f1d9ea2..913195412 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -46,7 +46,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { request := item.Pair.Request.Payload.(map[string]interface{}) - response := item.Pair.Response.Payload.(map[string]interface{}) entryBytes, _ := json.Marshal(item.Pair) service := fmt.Sprintf("kafka") apiKey := ApiKey(request["ApiKey"].(float64)) @@ -159,8 +158,28 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { } func (d dissecting) Represent(entry string) ([]byte, error) { - // TODO: Implement - return nil, nil + var root map[string]interface{} + json.Unmarshal([]byte(entry), &root) + representation := make(map[string]interface{}, 0) + request := root["request"].(map[string]interface{})["payload"].(map[string]interface{}) + response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) + // fmt.Printf("\n\nrequest: %+v\n", request) + // fmt.Printf("response: %+v\n", response) + + apiKey := ApiKey(request["ApiKey"].(float64)) + + var repRequest []interface{} + var repResponse []interface{} + switch apiKey { + case Metadata: + repRequest = representMetadataRequest(request) + repResponse = representMetadataResponse(response) + break + } + + representation["request"] = repRequest + representation["response"] = repResponse + return json.Marshal(representation) } var Dissector dissecting