mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-11 21:32:37 +00:00
59 lines
1.5 KiB
Go
59 lines
1.5 KiB
Go
package kafka
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
var reqResMatcher = CreateResponseRequestMatcher() // global
|
|
const maxTry int = 3000
|
|
|
|
type RequestResponsePair struct {
|
|
Request Request
|
|
Response Response
|
|
}
|
|
|
|
// requestResponseMatcher pairs requests with their responses by parking
// messages that are still waiting for their counterpart.
type requestResponseMatcher struct {
	// openMessagesMap holds the messages that have not been paired yet.
	// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id}
	openMessagesMap *sync.Map
}
|
|
|
|
func CreateResponseRequestMatcher() requestResponseMatcher {
|
|
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
|
return *newMatcher
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {
|
|
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
|
// Check for a situation that only occurs when a Kafka broker is initiating
|
|
switch response.(type) {
|
|
case *Response:
|
|
return matcher.preparePair(request, response.(*Response))
|
|
}
|
|
}
|
|
|
|
matcher.openMessagesMap.Store(key, request)
|
|
return nil
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) registerResponse(key string, response *Response) *RequestResponsePair {
|
|
try := 0
|
|
for {
|
|
try++
|
|
if try > maxTry {
|
|
return nil
|
|
}
|
|
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
|
return matcher.preparePair(request.(*Request), response)
|
|
}
|
|
time.Sleep(1 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) preparePair(request *Request, response *Response) *RequestResponsePair {
|
|
return &RequestResponsePair{
|
|
Request: *request,
|
|
Response: *response,
|
|
}
|
|
}
|