Implement Dissect method of Kafka and adapt request-response pair matcher to asynchronous client-server stream

This commit is contained in:
M. Mert Yildiran 2021-08-22 10:44:53 +03:00
parent 6d69bdbc13
commit f6a532a5b5
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
4 changed files with 52 additions and 41 deletions

View File

@ -33,7 +33,11 @@ func (d dissecting) Ping() {
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
// TODO: Implement
if isClient {
ReadRequest(b, tcpID)
} else {
ReadResponse(b, tcpID, emitter)
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {

View File

@ -3,9 +3,11 @@ package main
import (
"log"
"sync"
"time"
)
var reqResMatcher = CreateResponseRequestMatcher() // global
const maxTry int = 3000
type RequestResponsePair struct {
Request Request
@ -27,17 +29,22 @@ func (matcher *requestResponseMatcher) registerRequest(key string, request *Requ
return matcher.preparePair(request, response.(*Response))
}
matcher.openMessagesMap.Store(key, &request)
matcher.openMessagesMap.Store(key, request)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(key string, response *Response) *RequestResponsePair {
try := 0
for {
try++
if try > maxTry {
return nil
}
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
return matcher.preparePair(request.(*Request), response)
}
matcher.openMessagesMap.Store(key, &response)
return nil
time.Sleep(1 * time.Millisecond)
}
}
func (matcher *requestResponseMatcher) preparePair(request *Request, response *Response) *RequestResponsePair {
@ -47,8 +54,19 @@ func (matcher *requestResponseMatcher) preparePair(request *Request, response *R
}
}
func (reqResPair *RequestResponsePair) print() {
log.Printf("----------------\n")
reqResPair.Request.print()
reqResPair.Response.print()
func (reqResPair *RequestResponsePair) debug() {
req := reqResPair.Request
res := reqResPair.Response
log.Printf(
"\n----------------\n> Request [%d]\nApiKey: %v\nApiVersion: %v\nCorrelationID: %v\nClientID: %v\nPayload: %+v\n> Response [%d]\nCorrelationID: %v\nPayload: %+v\n",
req.Size,
req.ApiKey,
req.ApiVersion,
req.CorrelationID,
req.ClientID,
req.Payload,
res.Size,
res.CorrelationID,
res.Payload,
)
}

View File

@ -18,22 +18,13 @@ type Request struct {
Payload interface{}
}
func (req *Request) print() {
log.Printf("> Request [%d]\n", req.Size)
log.Printf("ApiKey: %v\n", req.ApiKey)
log.Printf("ApiVersion: %v\n", req.ApiVersion)
log.Printf("CorrelationID: %v\n", req.CorrelationID)
log.Printf("ClientID: %v\n", req.ClientID)
log.Printf("Payload: %+v\n", req.Payload)
}
func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
if err = d.err; err != nil {
err = dontExpectEOF(err)
return
return 0, 0, err
}
d.remain = int(size)
@ -44,18 +35,18 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
err = fmt.Errorf("unsupported api key: %d", i)
return
return apiKey, 0, err
}
if err = d.err; err != nil {
err = dontExpectEOF(err)
return
return apiKey, 0, err
}
t := &apiTypes[apiKey]
if t == nil {
err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
return
return apiKey, 0, err
}
var payload interface{}
@ -226,12 +217,11 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
tcpID.DstPort,
correlationID,
)
// fmt.Printf("key: %v\n", key)
reqResMatcher.registerRequest(key, request)
d.discardAll()
return
return apiKey, apiVersion, nil
}
func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error {

View File

@ -1,6 +1,7 @@
package main
import (
"errors"
"fmt"
"io"
"log"
@ -15,19 +16,13 @@ type Response struct {
Payload interface{}
}
func (res *Response) print() {
log.Printf("> Response [%d]\n", res.Size)
log.Printf("CorrelationID: %v\n", res.CorrelationID)
log.Printf("Payload: %+v\n", res.Payload)
}
func ReadResponse(r io.Reader, tcpID *api.TcpID) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
if err = d.err; err != nil {
err = dontExpectEOF(err)
return
return err
}
d.remain = int(size)
@ -41,14 +36,17 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID) (err error) {
key := fmt.Sprintf(
"%s:%s->%s:%s::%d",
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,
tcpID.DstPort,
tcpID.SrcIP,
tcpID.SrcPort,
correlationID,
)
// fmt.Printf("key: %v\n", key)
reqResPair := reqResMatcher.registerResponse(key, response)
if reqResPair == nil {
d.discardAll()
return errors.New("Couldn't match a Kafka response to a Kafka request in 3 seconds!")
}
apiKey := reqResPair.Request.ApiKey
apiVersion := reqResPair.Request.ApiVersion
@ -245,22 +243,23 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID) (err error) {
break
}
reqResPair.print()
reqResPair.debug()
// emitter.Emit(item)
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
err = fmt.Errorf("unsupported api key: %d", i)
return
return err
}
t := &apiTypes[apiKey]
if t == nil {
err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
return
return err
}
d.discardAll()
return
return nil
}
func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error {