mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-26 13:04:13 +00:00
* Add a PCAP based testbed * Fix typos * Download PCAPs from the Google Cloud bucket * Add a Python script to automate the PCAP testbed * Dump the test suite into a file named `suite.json` * Serialize entries separately * Dissect individual TCP streams one by one through separate PCAP files * Improve the reliability a little bit * Ditch the individual TCP streams idea * Fix some issues in Kafka * Print the total number of packets and TCP streams * Fix an interface conversion error in AMQP * Print the total number of returning items from the dissectors * Print the total number of returning items from the dissectors really * Fix a possible race condition * Do atomic increments just to be sure * Print the total number of Redis `Dissect` calls * Improve the request-response matching in Redis by including the TCP stream ID * Update the request-response pair matching key format in HTTP and Kafka * Rearrange the test suite * Add more queries to the test suite * Remove the debug prints * Add the assertions * Close the WebSocket connection faster * Make `MIZU_TEST` environment variable a shared constant * Add `test-lint` rule * Fix several issues in Kafka * Update the test suite * Add more queries * Fix the `test-lint` rule * Exit only after PCAP EOF * Add more queries * Update `suite.json` * Make the tests more stable * Revert the bad changes * Run `go mod tidy` on `tap/`
68 lines
1.5 KiB
Go
68 lines
1.5 KiB
Go
package redis
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
|
|
counterPair.Lock()
|
|
counterPair.Request++
|
|
requestCounter := counterPair.Request
|
|
counterPair.Unlock()
|
|
|
|
ident := fmt.Sprintf(
|
|
"%d_%s:%s_%s:%s_%d",
|
|
counterPair.StreamId,
|
|
tcpID.SrcIP,
|
|
tcpID.DstIP,
|
|
tcpID.SrcPort,
|
|
tcpID.DstPort,
|
|
requestCounter,
|
|
)
|
|
|
|
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
|
|
if item != nil {
|
|
item.ConnectionInfo = &api.ConnectionInfo{
|
|
ClientIP: tcpID.SrcIP,
|
|
ClientPort: tcpID.SrcPort,
|
|
ServerIP: tcpID.DstIP,
|
|
ServerPort: tcpID.DstPort,
|
|
IsOutgoing: true,
|
|
}
|
|
emitter.Emit(item)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
|
|
counterPair.Lock()
|
|
counterPair.Response++
|
|
responseCounter := counterPair.Response
|
|
counterPair.Unlock()
|
|
|
|
ident := fmt.Sprintf(
|
|
"%d_%s:%s_%s:%s_%d",
|
|
counterPair.StreamId,
|
|
tcpID.DstIP,
|
|
tcpID.SrcIP,
|
|
tcpID.DstPort,
|
|
tcpID.SrcPort,
|
|
responseCounter,
|
|
)
|
|
|
|
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
|
|
if item != nil {
|
|
item.ConnectionInfo = &api.ConnectionInfo{
|
|
ClientIP: tcpID.DstIP,
|
|
ClientPort: tcpID.DstPort,
|
|
ServerIP: tcpID.SrcIP,
|
|
ServerPort: tcpID.SrcPort,
|
|
IsOutgoing: false,
|
|
}
|
|
emitter.Emit(item)
|
|
}
|
|
return nil
|
|
}
|