diff --git a/tap/extensions/kafka/Makefile b/tap/extensions/kafka/Makefile index a7774b084..5a4c0f843 100644 --- a/tap/extensions/kafka/Makefile +++ b/tap/extensions/kafka/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/kafka/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect9/kafka/\* expect diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index aa714e7c6..24e061de2 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -3,13 +3,14 @@ package kafka import ( "encoding/json" "fmt" - "golang.org/x/text/cases" - "golang.org/x/text/language" "reflect" "sort" "strconv" "strings" + "golang.org/x/text/cases" + "golang.org/x/text/language" + "github.com/fatih/camelcase" "github.com/ohler55/ojg/jp" "github.com/ohler55/ojg/oj" @@ -36,9 +37,14 @@ type KafkaWrapper struct { func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} { requestHeader, _ := json.Marshal([]api.TableData{ + { + Name: "ApiKeyName", + Value: data["apiKeyName"].(string), + Selector: `request.apiKeyName`, + }, { Name: "ApiKey", - Value: apiNames[int(data["apiKey"].(float64))], + Value: int(data["apiKey"].(float64)), Selector: `request.apiKey`, }, { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 151e498d7..0996ad5d1 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -96,8 +96,8 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { statusQuery := "" apiKey := ApiKey(entry.Request["apiKey"].(float64)) - method := apiNames[apiKey] - methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64))) + method := entry.Request["apiKeyName"].(string) + methodQuery := fmt.Sprintf(`request.apiKeyName == "%s"`, method) summary := "" summaryQuery := "" diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index 511efa022..deb332296 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -11,6 +11,7 @@ import ( type Request struct { Size int32 `json:"size"` + ApiKeyName string `json:"apiKeyName"` ApiKey ApiKey `json:"apiKey"` ApiVersion int16 `json:"apiVersion"` CorrelationID int32 `json:"correlationID"` @@ -202,6 +203,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, ca request := &Request{ Size: size, + ApiKeyName: apiNames[apiKey], ApiKey: apiKey, ApiVersion: apiVersion, CorrelationID: correlationID,