mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-27 21:38:06 +00:00
* Create a new request-response matcher for each TCP stream * Fix the `ident` formats in request-response matchers * Don't sort the items in the HTTP tests * Update tap/extensions/kafka/matcher.go Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com> * Temporarily change the bucket folder to the new expected * Bring back the `deleteOlderThan` method * Use `api.RequestResponseMatcher` instead of `interface{}` as type * Use `api.RequestResponseMatcher` instead of `interface{}` as type (more) * Update the key format comments Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
66 lines
1.5 KiB
Go
66 lines
1.5 KiB
Go
package redis
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
|
|
counterPair.Lock()
|
|
counterPair.Request++
|
|
requestCounter := counterPair.Request
|
|
counterPair.Unlock()
|
|
|
|
ident := fmt.Sprintf(
|
|
"%s_%s_%s_%s_%d",
|
|
tcpID.SrcIP,
|
|
tcpID.DstIP,
|
|
tcpID.SrcPort,
|
|
tcpID.DstPort,
|
|
requestCounter,
|
|
)
|
|
|
|
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
|
|
if item != nil {
|
|
item.ConnectionInfo = &api.ConnectionInfo{
|
|
ClientIP: tcpID.SrcIP,
|
|
ClientPort: tcpID.SrcPort,
|
|
ServerIP: tcpID.DstIP,
|
|
ServerPort: tcpID.DstPort,
|
|
IsOutgoing: true,
|
|
}
|
|
emitter.Emit(item)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
|
|
counterPair.Lock()
|
|
counterPair.Response++
|
|
responseCounter := counterPair.Response
|
|
counterPair.Unlock()
|
|
|
|
ident := fmt.Sprintf(
|
|
"%s_%s_%s_%s_%d",
|
|
tcpID.DstIP,
|
|
tcpID.SrcIP,
|
|
tcpID.DstPort,
|
|
tcpID.SrcPort,
|
|
responseCounter,
|
|
)
|
|
|
|
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
|
|
if item != nil {
|
|
item.ConnectionInfo = &api.ConnectionInfo{
|
|
ClientIP: tcpID.DstIP,
|
|
ClientPort: tcpID.DstPort,
|
|
ServerIP: tcpID.SrcIP,
|
|
ServerPort: tcpID.SrcPort,
|
|
IsOutgoing: false,
|
|
}
|
|
emitter.Emit(item)
|
|
}
|
|
return nil
|
|
}
|