From 381502cea5f50fd9efa98fa72dfc7ab56f881474 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sun, 22 Aug 2021 13:58:09 +0300 Subject: [PATCH] Implement `Analyze`, `Summarize` methods of Kafka --- tap/extensions/kafka/go.mod | 2 +- tap/extensions/kafka/go.sum | 6 + tap/extensions/kafka/helpers.go | 18 +++ tap/extensions/kafka/main.go | 116 +++++++++++++++++- tap/extensions/kafka/request.go | 1 + tap/extensions/kafka/response.go | 37 +++++- .../HarEntryViewer/HAREntryViewer.tsx | 10 +- 7 files changed, 179 insertions(+), 11 deletions(-) create mode 100644 tap/extensions/kafka/helpers.go diff --git a/tap/extensions/kafka/go.mod b/tap/extensions/kafka/go.mod index c3808e9ab..113627f94 100644 --- a/tap/extensions/kafka/go.mod +++ b/tap/extensions/kafka/go.mod @@ -3,7 +3,7 @@ module github.com/up9inc/mizu/tap/extensions/kafka go 1.16 require ( - github.com/segmentio/kafka-go v0.4.17 // indirect + github.com/segmentio/kafka-go v0.4.17 github.com/up9inc/mizu/tap/api v0.0.0 ) diff --git a/tap/extensions/kafka/go.sum b/tap/extensions/kafka/go.sum index 9aeb88d28..32124810d 100644 --- a/tap/extensions/kafka/go.sum +++ b/tap/extensions/kafka/go.sum @@ -1,13 +1,18 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -24,6 +29,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go new file mode 100644 index 000000000..d8b6f73d1 --- /dev/null +++ b/tap/extensions/kafka/helpers.go @@ -0,0 +1,18 @@ +package main + +import ( + "encoding/json" +) + +type KafkaPayload struct { + Type string + Data interface{} +} + +type KafkaPayloader interface { + MarshalJSON() ([]byte, error) +} + +func (h KafkaPayload) MarshalJSON() ([]byte, error) { + return json.Marshal(h.Data) +} diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 74b096d4e..67f1d9ea2 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -2,6 +2,8 @@ package main import ( "bufio" + "encoding/json" + "fmt" "log" "github.com/up9inc/mizu/tap/api" @@ -13,7 +15,7 @@ var _protocol api.Protocol = api.Protocol{ Abbreviation: "KAFKA", BackgroundColor: "#000000", ForegroundColor: "#ffffff", - FontSize: 12, + FontSize: 11, ReferenceLink: "https://kafka.apache.org/protocol", Ports: []string{"9092"}, } @@ -43,13 +45,117 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { - // TODO: Implement - return nil + request := item.Pair.Request.Payload.(map[string]interface{}) + response := item.Pair.Response.Payload.(map[string]interface{}) + entryBytes, _ := json.Marshal(item.Pair) + service := fmt.Sprintf("kafka") + apiKey := ApiKey(request["ApiKey"].(float64)) + + summary := "" + switch apiKey { + case Metadata: + _topics := request["Payload"].(map[string]interface{})["Topics"] + if _topics == nil { + break + } + topics := _topics.([]interface{}) + for _, topic := range topics { + summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string)) + } + if len(summary) > 0 { + summary = summary[:len(summary)-2] + } + break + case ApiVersions: + summary = request["ClientID"].(string) + break + case Produce: + topics := request["Payload"].(map[string]interface{})["TopicData"].([]interface{}) + for _, topic := range topics { + summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string)) + } + if len(summary) > 0 { + summary = summary[:len(summary)-2] + } + break + case Fetch: + topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{}) + for _, topic := range topics { + summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string)) + } + if len(summary) > 0 { + summary = summary[:len(summary)-2] + } + break + case ListOffsets: + topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{}) + for _, topic := range topics { + summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string)) + } + if len(summary) > 0 { + summary = summary[:len(summary)-2] + } + break + case CreateTopics: + topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{}) + for _, topic := range topics { + summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string)) + } + if len(summary) > 0 { + summary = summary[:len(summary)-2] + } + break + case DeleteTopics: + topicNames := request["TopicNames"].([]string) + for _, name := range topicNames { + summary += fmt.Sprintf("%s, ", name) + } + break + } + + return &api.MizuEntry{ + ProtocolName: _protocol.Name, + EntryId: entryId, + Entry: string(entryBytes), + Url: fmt.Sprintf("%s%s", service, summary), + Method: apiNames[apiKey], + Status: 0, + RequestSenderIp: item.ConnectionInfo.ClientIP, + Service: service, + Timestamp: item.Timestamp, + Path: summary, + ResolvedSource: resolvedSource, + ResolvedDestination: resolvedDestination, + SourceIp: item.ConnectionInfo.ClientIP, + DestinationIp: item.ConnectionInfo.ServerIP, + SourcePort: item.ConnectionInfo.ClientPort, + DestinationPort: item.ConnectionInfo.ServerPort, + IsOutgoing: item.ConnectionInfo.IsOutgoing, + } } func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { - // TODO: Implement - return nil + return &api.BaseEntryDetails{ + Id: entry.EntryId, + Protocol: _protocol, + Url: entry.Url, + RequestSenderIp: entry.RequestSenderIp, + Service: entry.Service, + Summary: entry.Path, + StatusCode: entry.Status, + Method: entry.Method, + Timestamp: entry.Timestamp, + SourceIp: entry.SourceIp, + DestinationIp: entry.DestinationIp, + SourcePort: entry.SourcePort, + DestinationPort: entry.DestinationPort, + IsOutgoing: entry.IsOutgoing, + Latency: 0, + Rules: api.ApplicableRules{ + Latency: 0, + Status: false, + }, + } } func (d dissecting) Represent(entry string) ([]byte, error) { diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index f6a9d04fd..bfee59182 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -143,6 +143,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 } mt.(messageType).decode(d, valueOf(fetchRequest)) payload = fetchRequest + break case ListOffsets: var mt interface{} var listOffsetsRequest interface{} diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 43ecb15a2..f48bfbef3 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -6,6 +6,7 @@ import ( "io" "log" "reflect" + "time" "github.com/up9inc/mizu/tap/api" ) @@ -245,8 +246,40 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error break } - reqResPair.debug() - // emitter.Emit(item) + // reqResPair.debug() + + connectionInfo := &api.ConnectionInfo{ + ClientIP: tcpID.SrcIP, + ClientPort: tcpID.SrcPort, + ServerIP: tcpID.DstIP, + ServerPort: tcpID.DstPort, + IsOutgoing: true, + } + + item := &api.OutputChannelItem{ + Protocol: _protocol, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + ConnectionInfo: connectionInfo, + Pair: &api.RequestResponsePair{ + Request: api.GenericMessage{ + IsRequest: true, + CaptureTime: time.Now(), + Payload: KafkaPayload{ + Type: "kafka_request", + Data: reqResPair.Request, + }, + }, + Response: api.GenericMessage{ + IsRequest: false, + CaptureTime: time.Now(), + Payload: KafkaPayload{ + Type: "kafka_response", + Data: reqResPair.Response, + }, + }, + }, + } + emitter.Emit(item) if i := int(apiKey); i < 0 || i >= len(apiTypes) { err = fmt.Errorf("unsupported api key: %d", i) diff --git a/ui/src/components/HarEntryViewer/HAREntryViewer.tsx b/ui/src/components/HarEntryViewer/HAREntryViewer.tsx index 9244f7b4f..1d623b6c6 100644 --- a/ui/src/components/HarEntryViewer/HAREntryViewer.tsx +++ b/ui/src/components/HarEntryViewer/HAREntryViewer.tsx @@ -29,8 +29,6 @@ const SectionsRepresentation: React.FC = ({data, color}) => { } const AutoRepresentation: React.FC = ({representation, color, isResponseMocked}) => { - const {request, response} = JSON.parse(representation); - const rulesMatched = [] const TABS = [ {tab: 'request'}, @@ -42,9 +40,15 @@ const AutoRepresentation: React.FC = ({representation, color, isResponseMoc tab: 'Rules', }, ]; - const [currentTab, setCurrentTab] = useState(TABS[0].tab); + // Don't fail even if `representation` is an empty string + if (representation.length === 0) { + return <>; + } + + const {request, response} = JSON.parse(representation); + return
{