Files
kubeshark/tap/extensions/redis/handlers.go
M. Mert Yıldıran 041223b558 Move the request-response matcher's scope from global-level to TCP stream-level (#793)
* Create a new request-response matcher for each TCP stream

* Fix the `ident` formats in request-response matchers

* Don't sort the items in the HTTP tests

* Update tap/extensions/kafka/matcher.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Temporarily change the bucket folder to the new expected

* Bring back the `deleteOlderThan` method

* Use `api.RequestResponseMatcher` instead of `interface{}` as type

* Use `api.RequestResponseMatcher` instead of `interface{}` as type (more)

* Update the key format comments

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-14 18:16:58 +03:00

66 lines
1.5 KiB
Go

package redis
import (
"fmt"
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
ident := fmt.Sprintf(
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
requestCounter,
)
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
emitter.Emit(item)
}
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
ident := fmt.Sprintf(
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
responseCounter,
)
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
ClientPort: tcpID.DstPort,
ServerIP: tcpID.SrcIP,
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
emitter.Emit(item)
}
return nil
}