Migrate from SQLite to Basenine and introduce a new filtering syntax (#279)

* Fix the OOMKilled error by calling `debug.FreeOSMemory` periodically

* Remove `MAX_NUMBER_OF_GOROUTINES` environment variable

* Change the line

* Increase the default value of `TCP_STREAM_CHANNEL_TIMEOUT_MS` to `10000`

* Write the client and integrate to the new real-time database

* Refactor the WebSocket implementaiton for `/ws`

* Adapt the UI to the new filtering system

* Fix the rest of the issues in the UI

* Increase the buffer of the scanner

* Implement accessing single records

* Increase the buffer of another scanner

* Populate `Request` and `Response` fields of `MizuEntry`

* Add syntax highlighting for the query

* Add database to `Dockerfile`

* Fix some issues

* Update the `realtime_dbms` Git module commit hash

* Upgrade Gin version and print the query string

* Revert "Upgrade Gin version and print the query string"

This reverts commit aa09f904ee.

* Use WebSocket's itself to query instead of the query string

* Fix some errors related to conversion to HAR

* Fix the issues caused by the latest merge

* Fix the build error

* Fix PR validation GitHub workflow

* Replace the git submodule with latest Basenine version `0.1.0`

Remove `realtime_client.go` and use the official client library `github.com/up9inc/basenine/client/go` instead.

* Move Basenine host and port constants to `shared` module

* Reliably execute and wait for Basenine to become available

* Upgrade Basenine version

* Properly close WebSocket and data channel

* Fix the issues caused by the recent merge commit

* Clean up the TypeScript code

* Update `.gitignore`

* Limit the database size

* Add `Macros` method signature to `Dissector` interface and set the macros provided by the protocol extensions

* Run `go mod tidy` on `agent`

* Upgrade `github.com/up9inc/basenine/client/go` version

* Implement a mechanism to update the query using click events in the UI and use it for protocol macros

* Update the query on click to timestamps

* Fix some issues in the WebSocket and channel handling

* Update the query on clicks to status code

* Update the query on clicks to method, path and service

* Update the query on clicks to is outgoing, source and destination ports

* Add an API endpoint to validate the query against syntax errors

* Move the query background color state into `TrafficPage`

* Fix the logic in `setQuery`

* Display a toast message in case of a syntax error in the query

* Remove a call to `fmt.Printf`

* Upgrade Basenine version to `0.1.3`

* Fix an issue related to getting `MAX_ENTRIES_DB_BYTES` environment variable

* Have the `path` key in request details, in HTTP

* Rearrange the HTTP headers for the querying

* Do the same thing for `cookies` and `queryString`

* Update the query on click to table elements

Add the selectors for `TABLE` type representations in HTTP extension.

* Update the query on click to `bodySize` and `elapsedTime` in `EntryTitle`

* Add the selectors for `TABLE` type representations in AMQP extension

* Add the selectors for `TABLE` type representations in Kafka extension

* Add the selectors for `TABLE` type representations in Redis extension

* Define a struct in `tap/api.go` for the section representation data

* Add the selectors for `BODY` type representations

* Add `request.path` to the HTTP request details

* Change the summary string's field name from `path` to `summary`

* Introduce `queryable` CSS class for queryable UI elements and underline them on hover

* Instead of `N requests` at the bottom, make it `Displaying N results (queried X/Y)` and live update the values

Upgrade Basenine version to `0.2.0`.

* Verify the sha256sum of Basenine executable inside `Dockerfile`

* Pass the start time to web UI through WebSocket and always show the `EntriesList` footer

* Pipe the `stderr` of Basenine as well

* Fix the layout issues related to `CodeEditor` in the UI

* Use the correct `shasum` command in `Dockerfile`

* Upgrade Basenine version to `0.2.1`

* Limit the height of `CodeEditor` container

* Remove `Paused` enum `ConnectionStatus` in UI

* Fix the issue caused by the recent merge

* Add the filtering guide (cheatsheet)

* Update open cheatsheet button's title

* Update cheatsheet content

* Remove the old SQLite code, adapt the `--analyze` related code to Basenine

* Change the method signature of `NewEntry`

* Change the method signature of `Represent`

* Introduce `HTTPPair` field in `MizuEntry` specific to HTTP

* Remove `Entry`, `EntryId` and `EstimatedSizeBytes` fields from `MizuEntry`

Also remove the `getEstimatedEntrySizeBytes` method.

* Remove `gorm.io/gorm` dependency

* Remove unused `sensitiveDataFiltering` folder

* Increase the left margin of open cheatsheet button

* Add `overflow: auto` to the cheatsheet `Modal`

* Fix `GetEntry` method

* Fix the macro for gRPC

* Fix an interface conversion in case of AMQP

* Fix two more interface conversion errors in AMQP

* Make the `syncEntriesImpl` method blocking

* Fix a grammar mistake in the cheatsheet

* Adapt to the changes in the recent merge commit

* Improve the cheatsheet text

* Always display the timestamp in `en-US`

* Upgrade Basenine version to `0.2.2`

* Fix the order of closing Basenine connections and channels

* Don't close the Basenine channels at all

* Upgrade Basenine version to `0.2.3`

* Set the initial filter to `rlimit(100)`

* Make Basenine persistent

* Upgrade Basenine version to `0.2.4`

* Update `debug.Dockerfile`

* Fix a failing test

* Upgrade Basenine version to `0.2.5`

* Revert "Do not show play icon when disconnected (#428)"

This reverts commit 8af2e562f8.

* Upgrade Basenine version to `0.2.6`

* Make all non-informative things informative

* Make `100` a constant

* Use `===` in JavaScript no matter what

* Remove a forgotten `console.log`

* Add a comment and update the `query` in `syncEntriesImpl`

* Don't call `panic` in `GetEntry`

* Replace `panic` calls in `startBasenineServer` with `logger.Log.Panicf`

* Remove unnecessary `\n` characters in the logs
This commit is contained in:
M. Mert Yıldıran
2021-11-09 19:54:48 +03:00
committed by GitHub
parent 31d95c6557
commit d2fe3f6620
62 changed files with 3077 additions and 2327 deletions

View File

@@ -27,48 +27,54 @@ type KafkaWrapper struct {
}
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
requestHeader, _ := json.Marshal([]map[string]string{
requestHeader, _ := json.Marshal([]api.TableData{
{
"name": "ApiKey",
"value": apiNames[int(data["ApiKey"].(float64))],
Name: "ApiKey",
Value: apiNames[int(data["apiKey"].(float64))],
Selector: `request.apiKey`,
},
{
"name": "ApiVersion",
"value": fmt.Sprintf("%d", int(data["ApiVersion"].(float64))),
Name: "ApiVersion",
Value: fmt.Sprintf("%d", int(data["apiVersion"].(float64))),
Selector: `request.apiVersion`,
},
{
"name": "Client ID",
"value": data["ClientID"].(string),
Name: "Client ID",
Value: data["clientID"].(string),
Selector: `request.clientID`,
},
{
"name": "Correlation ID",
"value": fmt.Sprintf("%d", int(data["CorrelationID"].(float64))),
Name: "Correlation ID",
Value: fmt.Sprintf("%d", int(data["correlationID"].(float64))),
Selector: `request.correlationID`,
},
{
"name": "Size",
"value": fmt.Sprintf("%d", int(data["Size"].(float64))),
Name: "Size",
Value: fmt.Sprintf("%d", int(data["size"].(float64))),
Selector: `request.size`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Request Header",
"data": string(requestHeader),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Request Header",
Data: string(requestHeader),
})
return rep
}
func representResponseHeader(data map[string]interface{}, rep []interface{}) []interface{} {
requestHeader, _ := json.Marshal([]map[string]string{
requestHeader, _ := json.Marshal([]api.TableData{
{
"name": "Correlation ID",
"value": fmt.Sprintf("%d", int(data["CorrelationID"].(float64))),
Name: "Correlation ID",
Value: fmt.Sprintf("%d", int(data["correlationID"].(float64))),
Selector: `response.correlationID`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Response Header",
"data": string(requestHeader),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Response Header",
Data: string(requestHeader),
})
return rep
@@ -79,46 +85,50 @@ func representMetadataRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
topics := ""
allowAutoTopicCreation := ""
includeClusterAuthorizedOperations := ""
includeTopicAuthorizedOperations := ""
if payload["Topics"] != nil {
x, _ := json.Marshal(payload["Topics"].([]interface{}))
if payload["topics"] != nil {
x, _ := json.Marshal(payload["topics"].([]interface{}))
topics = string(x)
}
if payload["AllowAutoTopicCreation"] != nil {
allowAutoTopicCreation = strconv.FormatBool(payload["AllowAutoTopicCreation"].(bool))
if payload["allowAutoTopicCreation"] != nil {
allowAutoTopicCreation = strconv.FormatBool(payload["allowAutoTopicCreation"].(bool))
}
if payload["IncludeClusterAuthorizedOperations"] != nil {
includeClusterAuthorizedOperations = strconv.FormatBool(payload["IncludeClusterAuthorizedOperations"].(bool))
if payload["includeClusterAuthorizedOperations"] != nil {
includeClusterAuthorizedOperations = strconv.FormatBool(payload["includeClusterAuthorizedOperations"].(bool))
}
if payload["IncludeTopicAuthorizedOperations"] != nil {
includeTopicAuthorizedOperations = strconv.FormatBool(payload["IncludeTopicAuthorizedOperations"].(bool))
if payload["includeTopicAuthorizedOperations"] != nil {
includeTopicAuthorizedOperations = strconv.FormatBool(payload["includeTopicAuthorizedOperations"].(bool))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Topics",
"value": topics,
Name: "Topics",
Value: topics,
Selector: `request.payload.topics`,
},
{
"name": "Allow Auto Topic Creation",
"value": allowAutoTopicCreation,
Name: "Allow Auto Topic Creation",
Value: allowAutoTopicCreation,
Selector: `request.payload.allowAutoTopicCreation`,
},
{
"name": "Include Cluster Authorized Operations",
"value": includeClusterAuthorizedOperations,
Name: "Include Cluster Authorized Operations",
Value: includeClusterAuthorizedOperations,
Selector: `request.payload.includeClusterAuthorizedOperations`,
},
{
"name": "Include Topic Authorized Operations",
"value": includeTopicAuthorizedOperations,
Name: "Include Topic Authorized Operations",
Value: includeTopicAuthorizedOperations,
Selector: `request.payload.includeTopicAuthorizedOperations`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -129,63 +139,69 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
topics := ""
if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
if payload["topics"] != nil {
_topics, _ := json.Marshal(payload["topics"].([]interface{}))
topics = string(_topics)
}
brokers := ""
if payload["Brokers"] != nil {
_brokers, _ := json.Marshal(payload["Brokers"].([]interface{}))
if payload["brokers"] != nil {
_brokers, _ := json.Marshal(payload["brokers"].([]interface{}))
brokers = string(_brokers)
}
controllerID := ""
clusterID := ""
throttleTimeMs := ""
clusterAuthorizedOperations := ""
if payload["ControllerID"] != nil {
controllerID = fmt.Sprintf("%d", int(payload["ControllerID"].(float64)))
if payload["controllerID"] != nil {
controllerID = fmt.Sprintf("%d", int(payload["controllerID"].(float64)))
}
if payload["ClusterID"] != nil {
clusterID = payload["ClusterID"].(string)
if payload["clusterID"] != nil {
clusterID = payload["clusterID"].(string)
}
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
if payload["throttleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
}
if payload["ClusterAuthorizedOperations"] != nil {
clusterAuthorizedOperations = fmt.Sprintf("%d", int(payload["ClusterAuthorizedOperations"].(float64)))
if payload["clusterAuthorizedOperations"] != nil {
clusterAuthorizedOperations = fmt.Sprintf("%d", int(payload["clusterAuthorizedOperations"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
Name: "Throttle Time (ms)",
Value: throttleTimeMs,
Selector: `response.payload.throttleTimeMs`,
},
{
"name": "Brokers",
"value": brokers,
Name: "Brokers",
Value: brokers,
Selector: `response.payload.brokers`,
},
{
"name": "Cluster ID",
"value": clusterID,
Name: "Cluster ID",
Value: clusterID,
Selector: `response.payload.clusterID`,
},
{
"name": "Controller ID",
"value": controllerID,
Name: "Controller ID",
Value: controllerID,
Selector: `response.payload.controllerID`,
},
{
"name": "Topics",
"value": topics,
Name: "Topics",
Value: topics,
Selector: `response.payload.topics`,
},
{
"name": "Cluster Authorized Operations",
"value": clusterAuthorizedOperations,
Name: "Cluster Authorized Operations",
Value: clusterAuthorizedOperations,
Selector: `response.payload.clusterAuthorizedOperations`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -196,29 +212,31 @@ func representApiVersionsRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
clientSoftwareName := ""
clientSoftwareVersion := ""
if payload["ClientSoftwareName"] != nil {
clientSoftwareName = payload["ClientSoftwareName"].(string)
if payload["clientSoftwareName"] != nil {
clientSoftwareName = payload["clientSoftwareName"].(string)
}
if payload["ClientSoftwareVersion"] != nil {
clientSoftwareVersion = payload["ClientSoftwareVersion"].(string)
if payload["clientSoftwareVersion"] != nil {
clientSoftwareVersion = payload["clientSoftwareVersion"].(string)
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Client Software Name",
"value": clientSoftwareName,
Name: "Client Software Name",
Value: clientSoftwareName,
Selector: `request.payload.clientSoftwareName`,
},
{
"name": "Client Software Version",
"value": clientSoftwareVersion,
Name: "Client Software Version",
Value: clientSoftwareVersion,
Selector: `request.payload.clientSoftwareVersion`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -229,34 +247,37 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
apiKeys := ""
if payload["TopicNames"] != nil {
x, _ := json.Marshal(payload["ApiKeys"].([]interface{}))
if payload["apiKeys"] != nil {
x, _ := json.Marshal(payload["apiKeys"].([]interface{}))
apiKeys = string(x)
}
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
if payload["throttleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Error Code",
"value": fmt.Sprintf("%d", int(payload["ErrorCode"].(float64))),
Name: "Error Code",
Value: fmt.Sprintf("%d", int(payload["errorCode"].(float64))),
Selector: `response.payload.errorCode`,
},
{
"name": "ApiKeys",
"value": apiKeys,
Name: "ApiKeys",
Value: apiKeys,
Selector: `response.payload.apiKeys`,
},
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
Name: "Throttle Time (ms)",
Value: throttleTimeMs,
Selector: `response.payload.throttleTimeMs`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -267,39 +288,43 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
topicData := ""
_topicData := payload["TopicData"]
_topicData := payload["topicData"]
if _topicData != nil {
x, _ := json.Marshal(_topicData.([]interface{}))
topicData = string(x)
}
transactionalID := ""
if payload["TransactionalID"] != nil {
transactionalID = payload["TransactionalID"].(string)
if payload["transactionalID"] != nil {
transactionalID = payload["transactionalID"].(string)
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Transactional ID",
"value": transactionalID,
Name: "Transactional ID",
Value: transactionalID,
Selector: `request.payload.transactionalID`,
},
{
"name": "Required Acknowledgements",
"value": fmt.Sprintf("%d", int(payload["RequiredAcks"].(float64))),
Name: "Required Acknowledgements",
Value: fmt.Sprintf("%d", int(payload["requiredAcks"].(float64))),
Selector: `request.payload.requiredAcks`,
},
{
"name": "Timeout",
"value": fmt.Sprintf("%d", int(payload["Timeout"].(float64))),
Name: "Timeout",
Value: fmt.Sprintf("%d", int(payload["timeout"].(float64))),
Selector: `request.payload.timeout`,
},
{
"name": "Topic Data",
"value": topicData,
Name: "Topic Data",
Value: topicData,
Selector: `request.payload.topicData`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -310,30 +335,32 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
responses := ""
if payload["Responses"] != nil {
_responses, _ := json.Marshal(payload["Responses"].([]interface{}))
if payload["responses"] != nil {
_responses, _ := json.Marshal(payload["responses"].([]interface{}))
responses = string(_responses)
}
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
if payload["throttleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Responses",
"value": string(responses),
Name: "Responses",
Value: string(responses),
Selector: `response.payload.responses`,
},
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
Name: "Throttle Time (ms)",
Value: throttleTimeMs,
Selector: `response.payload.throttleTimeMs`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -344,87 +371,97 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
topics := ""
if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
if payload["topics"] != nil {
_topics, _ := json.Marshal(payload["topics"].([]interface{}))
topics = string(_topics)
}
replicaId := ""
if payload["ReplicaId"] != nil {
replicaId = fmt.Sprintf("%d", int(payload["ReplicaId"].(float64)))
if payload["replicaId"] != nil {
replicaId = fmt.Sprintf("%d", int(payload["replicaId"].(float64)))
}
maxBytes := ""
if payload["MaxBytes"] != nil {
maxBytes = fmt.Sprintf("%d", int(payload["MaxBytes"].(float64)))
if payload["maxBytes"] != nil {
maxBytes = fmt.Sprintf("%d", int(payload["maxBytes"].(float64)))
}
isolationLevel := ""
if payload["IsolationLevel"] != nil {
isolationLevel = fmt.Sprintf("%d", int(payload["IsolationLevel"].(float64)))
if payload["isolationLevel"] != nil {
isolationLevel = fmt.Sprintf("%d", int(payload["isolationLevel"].(float64)))
}
sessionId := ""
if payload["SessionId"] != nil {
sessionId = fmt.Sprintf("%d", int(payload["SessionId"].(float64)))
if payload["sessionId"] != nil {
sessionId = fmt.Sprintf("%d", int(payload["sessionId"].(float64)))
}
sessionEpoch := ""
if payload["SessionEpoch"] != nil {
sessionEpoch = fmt.Sprintf("%d", int(payload["SessionEpoch"].(float64)))
if payload["sessionEpoch"] != nil {
sessionEpoch = fmt.Sprintf("%d", int(payload["sessionEpoch"].(float64)))
}
forgottenTopicsData := ""
if payload["ForgottenTopicsData"] != nil {
x, _ := json.Marshal(payload["ForgottenTopicsData"].(map[string]interface{}))
if payload["forgottenTopicsData"] != nil {
x, _ := json.Marshal(payload["forgottenTopicsData"].(map[string]interface{}))
forgottenTopicsData = string(x)
}
rackId := ""
if payload["RackId"] != nil {
rackId = payload["RackId"].(string)
if payload["rackId"] != nil {
rackId = payload["rackId"].(string)
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Replica ID",
"value": replicaId,
Name: "Replica ID",
Value: replicaId,
Selector: `request.payload.replicaId`,
},
{
"name": "Maximum Wait (ms)",
"value": fmt.Sprintf("%d", int(payload["MaxWaitMs"].(float64))),
Name: "Maximum Wait (ms)",
Value: fmt.Sprintf("%d", int(payload["maxWaitMs"].(float64))),
Selector: `request.payload.maxWaitMs`,
},
{
"name": "Minimum Bytes",
"value": fmt.Sprintf("%d", int(payload["MinBytes"].(float64))),
Name: "Minimum Bytes",
Value: fmt.Sprintf("%d", int(payload["minBytes"].(float64))),
Selector: `request.payload.minBytes`,
},
{
"name": "Maximum Bytes",
"value": maxBytes,
Name: "Maximum Bytes",
Value: maxBytes,
Selector: `request.payload.maxBytes`,
},
{
"name": "Isolation Level",
"value": isolationLevel,
Name: "Isolation Level",
Value: isolationLevel,
Selector: `request.payload.isolationLevel`,
},
{
"name": "Session ID",
"value": sessionId,
Name: "Session ID",
Value: sessionId,
Selector: `request.payload.sessionId`,
},
{
"name": "Session Epoch",
"value": sessionEpoch,
Name: "Session Epoch",
Value: sessionEpoch,
Selector: `request.payload.sessionEpoch`,
},
{
"name": "Topics",
"value": topics,
Name: "Topics",
Value: topics,
Selector: `request.payload.topics`,
},
{
"name": "Forgotten Topics Data",
"value": forgottenTopicsData,
Name: "Forgotten Topics Data",
Value: forgottenTopicsData,
Selector: `request.payload.forgottenTopicsData`,
},
{
"name": "Rack ID",
"value": rackId,
Name: "Rack ID",
Value: rackId,
Selector: `request.payload.rackId`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -435,46 +472,50 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
responses := ""
if payload["Responses"] != nil {
_responses, _ := json.Marshal(payload["Responses"].([]interface{}))
if payload["responses"] != nil {
_responses, _ := json.Marshal(payload["responses"].([]interface{}))
responses = string(_responses)
}
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
if payload["throttleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
}
errorCode := ""
if payload["ErrorCode"] != nil {
errorCode = fmt.Sprintf("%d", int(payload["ErrorCode"].(float64)))
if payload["errorCode"] != nil {
errorCode = fmt.Sprintf("%d", int(payload["errorCode"].(float64)))
}
sessionId := ""
if payload["SessionId"] != nil {
sessionId = fmt.Sprintf("%d", int(payload["SessionId"].(float64)))
if payload["sessionId"] != nil {
sessionId = fmt.Sprintf("%d", int(payload["sessionId"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
Name: "Throttle Time (ms)",
Value: throttleTimeMs,
Selector: `response.payload.throttleTimeMs`,
},
{
"name": "Error Code",
"value": errorCode,
Name: "Error Code",
Value: errorCode,
Selector: `response.payload.errorCode`,
},
{
"name": "Session ID",
"value": sessionId,
Name: "Session ID",
Value: sessionId,
Selector: `response.payload.sessionId`,
},
{
"name": "Responses",
"value": responses,
Name: "Responses",
Value: responses,
Selector: `response.payload.responses`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -485,26 +526,28 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
topics := ""
if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
if payload["topics"] != nil {
_topics, _ := json.Marshal(payload["topics"].([]interface{}))
topics = string(_topics)
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Replica ID",
"value": fmt.Sprintf("%d", int(payload["ReplicaId"].(float64))),
Name: "Replica ID",
Value: fmt.Sprintf("%d", int(payload["replicaId"].(float64))),
Selector: `request.payload.replicaId`,
},
{
"name": "Topics",
"value": topics,
Name: "Topics",
Value: topics,
Selector: `request.payload.topics`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -515,26 +558,28 @@ func representListOffsetsResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
payload := data["payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["topics"].([]interface{}))
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
if payload["throttleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
Name: "Throttle Time (ms)",
Value: throttleTimeMs,
Selector: `response.payload.throttleTimeMs`,
},
{
"name": "Topics",
"value": string(topics),
Name: "Topics",
Value: string(topics),
Selector: `response.payload.topics`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -545,30 +590,33 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
payload := data["payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["topics"].([]interface{}))
validateOnly := ""
if payload["ValidateOnly"] != nil {
validateOnly = strconv.FormatBool(payload["ValidateOnly"].(bool))
if payload["validateOnly"] != nil {
validateOnly = strconv.FormatBool(payload["validateOnly"].(bool))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Topics",
"value": string(topics),
Name: "Topics",
Value: string(topics),
Selector: `request.payload.topics`,
},
{
"name": "Timeout (ms)",
"value": fmt.Sprintf("%d", int(payload["TimeoutMs"].(float64))),
Name: "Timeout (ms)",
Value: fmt.Sprintf("%d", int(payload["timeoutMs"].(float64))),
Selector: `request.payload.timeoutMs`,
},
{
"name": "Validate Only",
"value": validateOnly,
Name: "Validate Only",
Value: validateOnly,
Selector: `request.payload.validateOnly`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -579,26 +627,28 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
payload := data["payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["topics"].([]interface{}))
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
if payload["throttleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
Name: "Throttle Time (ms)",
Value: throttleTimeMs,
Selector: `response.payload.throttleTimeMs`,
},
{
"name": "Topics",
"value": string(topics),
Name: "Topics",
Value: string(topics),
Selector: `response.payload.topics`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -609,35 +659,38 @@ func representDeleteTopicsRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
payload := data["payload"].(map[string]interface{})
topics := ""
if payload["Topics"] != nil {
x, _ := json.Marshal(payload["Topics"].([]interface{}))
if payload["topics"] != nil {
x, _ := json.Marshal(payload["topics"].([]interface{}))
topics = string(x)
}
topicNames := ""
if payload["TopicNames"] != nil {
x, _ := json.Marshal(payload["TopicNames"].([]interface{}))
if payload["topicNames"] != nil {
x, _ := json.Marshal(payload["topicNames"].([]interface{}))
topicNames = string(x)
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "TopicNames",
"value": string(topicNames),
Name: "TopicNames",
Value: string(topicNames),
Selector: `request.payload.topicNames`,
},
{
"name": "Topics",
"value": string(topics),
Name: "Topics",
Value: string(topics),
Selector: `request.payload.topics`,
},
{
"name": "Timeout (ms)",
"value": fmt.Sprintf("%d", int(payload["TimeoutMs"].(float64))),
Name: "Timeout (ms)",
Value: fmt.Sprintf("%d", int(payload["timeoutMs"].(float64))),
Selector: `request.payload.timeoutMs`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep
@@ -648,26 +701,28 @@ func representDeleteTopicsResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
responses, _ := json.Marshal(payload["Responses"].([]interface{}))
payload := data["payload"].(map[string]interface{})
responses, _ := json.Marshal(payload["responses"].([]interface{}))
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
if payload["throttleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["throttleTimeMs"].(float64)))
}
repPayload, _ := json.Marshal([]map[string]string{
repPayload, _ := json.Marshal([]api.TableData{
{
"name": "Throttle Time (ms)",
"value": throttleTimeMs,
Name: "Throttle Time (ms)",
Value: throttleTimeMs,
Selector: `response.payload.throttleTimeMs`,
},
{
"name": "Responses",
"value": string(responses),
Name: "Responses",
Value: string(responses),
Selector: `response.payload.responses`,
},
})
rep = append(rep, map[string]string{
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: "Payload",
Data: string(repPayload),
})
return rep

View File

@@ -15,6 +15,7 @@ var _protocol api.Protocol = api.Protocol{
Name: "kafka",
LongName: "Apache Kafka Protocol",
Abbreviation: "KAFKA",
Macro: "kafka",
Version: "12",
BackgroundColor: "#000000",
ForegroundColor: "#ffffff",
@@ -61,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.MizuEntry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
service := "kafka"
@@ -70,76 +71,76 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
} else if resolvedSource != "" {
service = resolvedSource
}
apiKey := ApiKey(reqDetails["ApiKey"].(float64))
apiKey := ApiKey(reqDetails["apiKey"].(float64))
summary := ""
switch apiKey {
case Metadata:
_topics := reqDetails["Payload"].(map[string]interface{})["Topics"]
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case ApiVersions:
summary = reqDetails["ClientID"].(string)
summary = reqDetails["clientID"].(string)
break
case Produce:
_topics := reqDetails["Payload"].(map[string]interface{})["TopicData"]
_topics := reqDetails["payload"].(map[string]interface{})["topicData"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string))
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case Fetch:
_topics := reqDetails["Payload"].(map[string]interface{})["Topics"]
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string))
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case ListOffsets:
_topics := reqDetails["Payload"].(map[string]interface{})["Topics"]
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case CreateTopics:
topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{})
topics := reqDetails["payload"].(map[string]interface{})["topics"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case DeleteTopics:
topicNames := reqDetails["TopicNames"].([]string)
topicNames := reqDetails["topicNames"].([]string)
for _, name := range topicNames {
summary += fmt.Sprintf("%s, ", name)
}
@@ -148,44 +149,48 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: _protocol.Name,
ProtocolLongName: _protocol.LongName,
ProtocolAbbreviation: _protocol.Abbreviation,
ProtocolVersion: _protocol.Version,
ProtocolBackgroundColor: _protocol.BackgroundColor,
ProtocolForegroundColor: _protocol.ForegroundColor,
ProtocolFontSize: _protocol.FontSize,
ProtocolReferenceLink: _protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
Protocol: _protocol,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
Port: item.ConnectionInfo.ClientPort,
},
Destination: &api.TCP{
Name: resolvedDestination,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
Summary: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
return &api.BaseEntryDetails{
Id: entry.EntryId,
Id: entry.Id,
Protocol: _protocol,
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
Summary: entry.Path,
Summary: entry.Summary,
StatusCode: entry.Status,
Method: entry.Method,
Timestamp: entry.Timestamp,
@@ -202,49 +207,43 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = _protocol
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
protoOut = _protocol
bodySize = 0
var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0)
request := root["request"].(map[string]interface{})["payload"].(map[string]interface{})
response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
apiKey := ApiKey(reqDetails["ApiKey"].(float64))
apiKey := ApiKey(request["apiKey"].(float64))
var repRequest []interface{}
var repResponse []interface{}
switch apiKey {
case Metadata:
repRequest = representMetadataRequest(reqDetails)
repResponse = representMetadataResponse(resDetails)
repRequest = representMetadataRequest(request)
repResponse = representMetadataResponse(response)
break
case ApiVersions:
repRequest = representApiVersionsRequest(reqDetails)
repResponse = representApiVersionsResponse(resDetails)
repRequest = representApiVersionsRequest(request)
repResponse = representApiVersionsResponse(response)
break
case Produce:
repRequest = representProduceRequest(reqDetails)
repResponse = representProduceResponse(resDetails)
repRequest = representProduceRequest(request)
repResponse = representProduceResponse(response)
break
case Fetch:
repRequest = representFetchRequest(reqDetails)
repResponse = representFetchResponse(resDetails)
repRequest = representFetchRequest(request)
repResponse = representFetchResponse(response)
break
case ListOffsets:
repRequest = representListOffsetsRequest(reqDetails)
repResponse = representListOffsetsResponse(resDetails)
repRequest = representListOffsetsRequest(request)
repResponse = representListOffsetsResponse(response)
break
case CreateTopics:
repRequest = representCreateTopicsRequest(reqDetails)
repResponse = representCreateTopicsResponse(resDetails)
repRequest = representCreateTopicsRequest(request)
repResponse = representCreateTopicsResponse(response)
break
case DeleteTopics:
repRequest = representDeleteTopicsRequest(reqDetails)
repResponse = representDeleteTopicsResponse(resDetails)
repRequest = representDeleteTopicsRequest(request)
repResponse = representDeleteTopicsResponse(response)
break
}
@@ -254,4 +253,10 @@ func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []by
return
}
func (d dissecting) Macros() map[string]string {
return map[string]string{
`kafka`: fmt.Sprintf(`proto.abbr == "%s"`, _protocol.Abbreviation),
}
}
var Dissector dissecting

View File

@@ -10,13 +10,13 @@ import (
)
type Request struct {
Size int32
ApiKey ApiKey
ApiVersion int16
CorrelationID int32
ClientID string
Payload interface{}
CaptureTime time.Time
Size int32 `json:"size"`
ApiKey ApiKey `json:"apiKey"`
ApiVersion int16 `json:"apiVersion"`
CorrelationID int32 `json:"correlationID"`
ClientID string `json:"clientID"`
Payload interface{} `json:"payload"`
CaptureTime time.Time `json:"captureTime"`
}
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {

View File

@@ -10,10 +10,10 @@ import (
)
type Response struct {
Size int32
CorrelationID int32
Payload interface{}
CaptureTime time.Time
Size int32 `json:"size"`
CorrelationID int32 `json:"correlationID"`
Payload interface{} `json:"payload"`
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {

File diff suppressed because it is too large Load Diff