diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 942ae56fc..233521827 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -33,7 +33,11 @@ func (d dissecting) Ping() { } func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { - // TODO: Implement + if isClient { + ReadRequest(b, tcpID) + } else { + ReadResponse(b, tcpID, emitter) + } } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { diff --git a/tap/extensions/kafka/matcher.go b/tap/extensions/kafka/matcher.go index 17b86b48d..e5f2b2e4e 100644 --- a/tap/extensions/kafka/matcher.go +++ b/tap/extensions/kafka/matcher.go @@ -3,9 +3,11 @@ package main import ( "log" "sync" + "time" ) var reqResMatcher = CreateResponseRequestMatcher() // global +const maxTry int = 3000 type RequestResponsePair struct { Request Request @@ -27,17 +29,22 @@ func (matcher *requestResponseMatcher) registerRequest(key string, request *Requ return matcher.preparePair(request, response.(*Response)) } - matcher.openMessagesMap.Store(key, &request) + matcher.openMessagesMap.Store(key, request) return nil } func (matcher *requestResponseMatcher) registerResponse(key string, response *Response) *RequestResponsePair { - if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { - return matcher.preparePair(request.(*Request), response) + try := 0 + for { + try++ + if try > maxTry { + return nil + } + if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { + return matcher.preparePair(request.(*Request), response) + } + time.Sleep(1 * time.Millisecond) } - - matcher.openMessagesMap.Store(key, &response) - return nil } func (matcher *requestResponseMatcher) preparePair(request *Request, response *Response) *RequestResponsePair { @@ -47,8 +54,19 @@ func (matcher *requestResponseMatcher) preparePair(request *Request, response *R } } -func (reqResPair *RequestResponsePair) print() { - log.Printf("----------------\n") - reqResPair.Request.print() - reqResPair.Response.print() +func (reqResPair *RequestResponsePair) debug() { + req := reqResPair.Request + res := reqResPair.Response + log.Printf( + "\n----------------\n> Request [%d]\nApiKey: %v\nApiVersion: %v\nCorrelationID: %v\nClientID: %v\nPayload: %+v\n> Response [%d]\nCorrelationID: %v\nPayload: %+v\n", + req.Size, + req.ApiKey, + req.ApiVersion, + req.CorrelationID, + req.ClientID, + req.Payload, + res.Size, + res.CorrelationID, + res.Payload, + ) } diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index 4a585aace..f6a9d04fd 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -18,22 +18,13 @@ type Request struct { Payload interface{} } -func (req *Request) print() { - log.Printf("> Request [%d]\n", req.Size) - log.Printf("ApiKey: %v\n", req.ApiKey) - log.Printf("ApiVersion: %v\n", req.ApiVersion) - log.Printf("CorrelationID: %v\n", req.CorrelationID) - log.Printf("ClientID: %v\n", req.ClientID) - log.Printf("Payload: %+v\n", req.Payload) -} - func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() if err = d.err; err != nil { err = dontExpectEOF(err) - return + return 0, 0, err } d.remain = int(size) @@ -44,18 +35,18 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 if i := int(apiKey); i < 0 || i >= len(apiTypes) { err = fmt.Errorf("unsupported api key: %d", i) - return + return apiKey, 0, err } if err = d.err; err != nil { err = dontExpectEOF(err) - return + return apiKey, 0, err } t := &apiTypes[apiKey] if t == nil { err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) - return + return apiKey, 0, err } var payload interface{} @@ -226,12 +217,11 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 tcpID.DstPort, correlationID, ) - // fmt.Printf("key: %v\n", key) reqResMatcher.registerRequest(key, request) d.discardAll() - return + return apiKey, apiVersion, nil } func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error { diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 65ae9334d..6931d822d 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "io" "log" @@ -15,19 +16,13 @@ type Response struct { Payload interface{} } -func (res *Response) print() { - log.Printf("> Response [%d]\n", res.Size) - log.Printf("CorrelationID: %v\n", res.CorrelationID) - log.Printf("Payload: %+v\n", res.Payload) -} - -func ReadResponse(r io.Reader, tcpID *api.TcpID) (err error) { +func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() if err = d.err; err != nil { err = dontExpectEOF(err) - return + return err } d.remain = int(size) @@ -41,14 +36,17 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID) (err error) { key := fmt.Sprintf( "%s:%s->%s:%s::%d", - tcpID.SrcIP, - tcpID.SrcPort, tcpID.DstIP, tcpID.DstPort, + tcpID.SrcIP, + tcpID.SrcPort, correlationID, ) - // fmt.Printf("key: %v\n", key) reqResPair := reqResMatcher.registerResponse(key, response) + if reqResPair == nil { + d.discardAll() + return errors.New("Couldn't match a Kafka response to a Kafka request in 3 seconds!") + } apiKey := reqResPair.Request.ApiKey apiVersion := reqResPair.Request.ApiVersion @@ -245,22 +243,23 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID) (err error) { break } - reqResPair.print() + reqResPair.debug() + // emitter.Emit(item) if i := int(apiKey); i < 0 || i >= len(apiTypes) { err = fmt.Errorf("unsupported api key: %d", i) - return + return err } t := &apiTypes[apiKey] if t == nil { err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) - return + return err } d.discardAll() - return + return nil } func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error {