diff --git a/tap/extensions/kafka/go.mod b/tap/extensions/kafka/go.mod index 80734ef05..c3808e9ab 100644 --- a/tap/extensions/kafka/go.mod +++ b/tap/extensions/kafka/go.mod @@ -3,7 +3,8 @@ module github.com/up9inc/mizu/tap/extensions/kafka go 1.16 require ( - github.com/up9inc/mizu/tap/api v0.0.0 + github.com/segmentio/kafka-go v0.4.17 // indirect + github.com/up9inc/mizu/tap/api v0.0.0 ) replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api diff --git a/tap/extensions/kafka/go.sum b/tap/extensions/kafka/go.sum new file mode 100644 index 000000000..9aeb88d28 --- /dev/null +++ b/tap/extensions/kafka/go.sum @@ -0,0 +1,29 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY= +github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 745b5332e..942ae56fc 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -7,7 +7,7 @@ import ( "github.com/up9inc/mizu/tap/api" ) -var protocol api.Protocol = api.Protocol{ +var _protocol api.Protocol = api.Protocol{ Name: "kafka", LongName: "Apache Kafka Protocol", Abbreviation: "KAFKA", @@ -25,11 +25,11 @@ func init() { type dissecting string func (d dissecting) Register(extension *api.Extension) { - extension.Protocol = protocol + extension.Protocol = _protocol } func (d dissecting) Ping() { - log.Printf("pong %s\n", protocol.Name) + log.Printf("pong %s\n", _protocol.Name) } func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { diff --git a/tap/extensions/kafka/matcher.go b/tap/extensions/kafka/matcher.go index 83be3401b..17b86b48d 100644 --- a/tap/extensions/kafka/matcher.go +++ b/tap/extensions/kafka/matcher.go @@ -2,8 +2,7 @@ package main import ( "log" - - cmap "github.com/orcaman/concurrent-map" + "sync" ) var reqResMatcher = CreateResponseRequestMatcher() // global @@ -15,29 +14,29 @@ type RequestResponsePair struct { // Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id} type requestResponseMatcher struct { - openMessagesMap cmap.ConcurrentMap + openMessagesMap sync.Map } func CreateResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()} + newMatcher := &requestResponseMatcher{openMessagesMap: sync.Map{}} return *newMatcher } func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair { - if response, found := matcher.openMessagesMap.Pop(key); found { + if response, found := matcher.openMessagesMap.LoadAndDelete(key); found { return matcher.preparePair(request, response.(*Response)) } - matcher.openMessagesMap.Set(key, request) + matcher.openMessagesMap.Store(key, &request) return nil } func (matcher *requestResponseMatcher) registerResponse(key string, response *Response) *RequestResponsePair { - if request, found := matcher.openMessagesMap.Pop(key); found { + if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { return matcher.preparePair(request.(*Request), response) } - matcher.openMessagesMap.Set(key, response) + matcher.openMessagesMap.Store(key, &response) return nil } diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index a4f196897..4a585aace 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -6,7 +6,7 @@ import ( "log" "reflect" - "github.com/google/gopacket" + "github.com/up9inc/mizu/tap/api" ) type Request struct { @@ -27,7 +27,7 @@ func (req *Request) print() { log.Printf("Payload: %+v\n", req.Payload) } -func ReadRequest(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (apiKey ApiKey, apiVersion int16, err error) { +func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -220,10 +220,10 @@ func ReadRequest(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (apiKe key := fmt.Sprintf( "%s:%s->%s:%s::%d", - net.Src().String(), - transport.Src().String(), - net.Dst().String(), - transport.Dst().String(), + tcpID.SrcIP, + tcpID.SrcPort, + tcpID.DstIP, + tcpID.DstPort, correlationID, ) // fmt.Printf("key: %v\n", key) diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 31a1935f9..65ae9334d 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -6,7 +6,7 @@ import ( "log" "reflect" - "github.com/google/gopacket" + "github.com/up9inc/mizu/tap/api" ) type Response struct { @@ -21,7 +21,7 @@ func (res *Response) print() { log.Printf("Payload: %+v\n", res.Payload) } -func ReadResponse(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (err error) { +func ReadResponse(r io.Reader, tcpID *api.TcpID) (err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -41,10 +41,10 @@ func ReadResponse(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (err key := fmt.Sprintf( "%s:%s->%s:%s::%d", - net.Src().String(), - transport.Src().String(), - net.Dst().String(), - transport.Dst().String(), + tcpID.SrcIP, + tcpID.SrcPort, + tcpID.DstIP, + tcpID.DstPort, correlationID, ) // fmt.Printf("key: %v\n", key)