Fix the build errors in Kafka Go files

This commit is contained in:
M. Mert Yildiran
2021-08-22 09:03:36 +03:00
parent a598256576
commit 6d69bdbc13
6 changed files with 53 additions and 24 deletions

View File

@@ -3,7 +3,8 @@ module github.com/up9inc/mizu/tap/extensions/kafka
go 1.16
require (
github.com/up9inc/mizu/tap/api v0.0.0
github.com/segmentio/kafka-go v0.4.17 // indirect
github.com/up9inc/mizu/tap/api v0.0.0
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api

View File

@@ -0,0 +1,29 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY=
github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -7,7 +7,7 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var protocol api.Protocol = api.Protocol{
var _protocol api.Protocol = api.Protocol{
Name: "kafka",
LongName: "Apache Kafka Protocol",
Abbreviation: "KAFKA",
@@ -25,11 +25,11 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = protocol
extension.Protocol = _protocol
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
log.Printf("pong %s\n", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {

View File

@@ -2,8 +2,7 @@ package main
import (
"log"
cmap "github.com/orcaman/concurrent-map"
"sync"
)
var reqResMatcher = CreateResponseRequestMatcher() // global
@@ -15,29 +14,29 @@ type RequestResponsePair struct {
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id}
type requestResponseMatcher struct {
openMessagesMap cmap.ConcurrentMap
openMessagesMap sync.Map
}
func CreateResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()}
newMatcher := &requestResponseMatcher{openMessagesMap: sync.Map{}}
return *newMatcher
}
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {
if response, found := matcher.openMessagesMap.Pop(key); found {
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
return matcher.preparePair(request, response.(*Response))
}
matcher.openMessagesMap.Set(key, request)
matcher.openMessagesMap.Store(key, &request)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(key string, response *Response) *RequestResponsePair {
if request, found := matcher.openMessagesMap.Pop(key); found {
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
return matcher.preparePair(request.(*Request), response)
}
matcher.openMessagesMap.Set(key, response)
matcher.openMessagesMap.Store(key, &response)
return nil
}

View File

@@ -6,7 +6,7 @@ import (
"log"
"reflect"
"github.com/google/gopacket"
"github.com/up9inc/mizu/tap/api"
)
type Request struct {
@@ -27,7 +27,7 @@ func (req *Request) print() {
log.Printf("Payload: %+v\n", req.Payload)
}
func ReadRequest(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -220,10 +220,10 @@ func ReadRequest(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (apiKe
key := fmt.Sprintf(
"%s:%s->%s:%s::%d",
net.Src().String(),
transport.Src().String(),
net.Dst().String(),
transport.Dst().String(),
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,
tcpID.DstPort,
correlationID,
)
// fmt.Printf("key: %v\n", key)

View File

@@ -6,7 +6,7 @@ import (
"log"
"reflect"
"github.com/google/gopacket"
"github.com/up9inc/mizu/tap/api"
)
type Response struct {
@@ -21,7 +21,7 @@ func (res *Response) print() {
log.Printf("Payload: %+v\n", res.Payload)
}
func ReadResponse(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -41,10 +41,10 @@ func ReadResponse(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (err
key := fmt.Sprintf(
"%s:%s->%s:%s::%d",
net.Src().String(),
transport.Src().String(),
net.Dst().String(),
transport.Dst().String(),
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,
tcpID.DstPort,
correlationID,
)
// fmt.Printf("key: %v\n", key)