mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-05 10:41:36 +00:00
* Add unit tests for Kafka dissector * Return `io.EOF` if request or response header size is zero * Sort the slice in `representMapAsTable` * Remove the dead code * Remove more dead code * Remove more dead code * Fix `dissector.Analyze` call Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
66 lines
1.6 KiB
Go
66 lines
1.6 KiB
Go
package kafka
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
type RequestResponsePair struct {
|
|
Request Request
|
|
Response Response
|
|
}
|
|
|
|
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id}
|
|
type requestResponseMatcher struct {
|
|
openMessagesMap *sync.Map
|
|
maxTry int
|
|
}
|
|
|
|
func createResponseRequestMatcher() api.RequestResponseMatcher {
|
|
return &requestResponseMatcher{openMessagesMap: &sync.Map{}, maxTry: 3000}
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
|
|
return matcher.openMessagesMap
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) SetMaxTry(value int) {
|
|
matcher.maxTry = value
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {
|
|
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
|
// Check for a situation that only occurs when a Kafka broker is initiating
|
|
switch v := response.(type) {
|
|
case *Response:
|
|
return matcher.preparePair(request, v)
|
|
}
|
|
}
|
|
|
|
matcher.openMessagesMap.Store(key, request)
|
|
return nil
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) registerResponse(key string, response *Response) *RequestResponsePair {
|
|
try := 0
|
|
for {
|
|
try++
|
|
if try > matcher.maxTry {
|
|
return nil
|
|
}
|
|
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
|
return matcher.preparePair(request.(*Request), response)
|
|
}
|
|
time.Sleep(1 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) preparePair(request *Request, response *Response) *RequestResponsePair {
|
|
return &RequestResponsePair{
|
|
Request: *request,
|
|
Response: *response,
|
|
}
|
|
}
|