Implement Analyze, Summarize methods of Kafka

This commit is contained in:
M. Mert Yildiran 2021-08-22 13:58:09 +03:00
parent f6a260a8c9
commit 381502cea5
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
7 changed files with 179 additions and 11 deletions

View File

@ -3,7 +3,7 @@ module github.com/up9inc/mizu/tap/extensions/kafka
go 1.16
require (
github.com/segmentio/kafka-go v0.4.17 // indirect
github.com/segmentio/kafka-go v0.4.17
github.com/up9inc/mizu/tap/api v0.0.0
)

View File

@ -1,13 +1,18 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
@ -24,6 +29,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,18 @@
package main
import (
"encoding/json"
)
type KafkaPayload struct {
Type string
Data interface{}
}
type KafkaPayloader interface {
MarshalJSON() ([]byte, error)
}
func (h KafkaPayload) MarshalJSON() ([]byte, error) {
return json.Marshal(h.Data)
}

View File

@ -2,6 +2,8 @@ package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"github.com/up9inc/mizu/tap/api"
@ -13,7 +15,7 @@ var _protocol api.Protocol = api.Protocol{
Abbreviation: "KAFKA",
BackgroundColor: "#000000",
ForegroundColor: "#ffffff",
FontSize: 12,
FontSize: 11,
ReferenceLink: "https://kafka.apache.org/protocol",
Ports: []string{"9092"},
}
@ -43,13 +45,117 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
// TODO: Implement
return nil
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
entryBytes, _ := json.Marshal(item.Pair)
service := fmt.Sprintf("kafka")
apiKey := ApiKey(request["ApiKey"].(float64))
summary := ""
switch apiKey {
case Metadata:
_topics := request["Payload"].(map[string]interface{})["Topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case ApiVersions:
summary = request["ClientID"].(string)
break
case Produce:
topics := request["Payload"].(map[string]interface{})["TopicData"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case Fetch:
topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case ListOffsets:
topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case CreateTopics:
topics := request["Payload"].(map[string]interface{})["Topics"].([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case DeleteTopics:
topicNames := request["TopicNames"].([]string)
for _, name := range topicNames {
summary += fmt.Sprintf("%s, ", name)
}
break
}
return &api.MizuEntry{
ProtocolName: _protocol.Name,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
// TODO: Implement
return nil
return &api.BaseEntryDetails{
Id: entry.EntryId,
Protocol: _protocol,
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
Summary: entry.Path,
StatusCode: entry.Status,
Method: entry.Method,
Timestamp: entry.Timestamp,
SourceIp: entry.SourceIp,
DestinationIp: entry.DestinationIp,
SourcePort: entry.SourcePort,
DestinationPort: entry.DestinationPort,
IsOutgoing: entry.IsOutgoing,
Latency: 0,
Rules: api.ApplicableRules{
Latency: 0,
Status: false,
},
}
}
func (d dissecting) Represent(entry string) ([]byte, error) {

View File

@ -143,6 +143,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
}
mt.(messageType).decode(d, valueOf(fetchRequest))
payload = fetchRequest
break
case ListOffsets:
var mt interface{}
var listOffsetsRequest interface{}

View File

@ -6,6 +6,7 @@ import (
"io"
"log"
"reflect"
"time"
"github.com/up9inc/mizu/tap/api"
)
@ -245,8 +246,40 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
break
}
reqResPair.debug()
// emitter.Emit(item)
// reqResPair.debug()
connectionInfo := &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
item := &api.OutputChannelItem{
Protocol: _protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
Payload: KafkaPayload{
Type: "kafka_request",
Data: reqResPair.Request,
},
},
Response: api.GenericMessage{
IsRequest: false,
CaptureTime: time.Now(),
Payload: KafkaPayload{
Type: "kafka_response",
Data: reqResPair.Response,
},
},
},
}
emitter.Emit(item)
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
err = fmt.Errorf("unsupported api key: %d", i)

View File

@ -29,8 +29,6 @@ const SectionsRepresentation: React.FC<any> = ({data, color}) => {
}
const AutoRepresentation: React.FC<any> = ({representation, color, isResponseMocked}) => {
const {request, response} = JSON.parse(representation);
const rulesMatched = []
const TABS = [
{tab: 'request'},
@ -42,9 +40,15 @@ const AutoRepresentation: React.FC<any> = ({representation, color, isResponseMoc
tab: 'Rules',
},
];
const [currentTab, setCurrentTab] = useState(TABS[0].tab);
// Don't fail even if `representation` is an empty string
if (representation.length === 0) {
return <></>;
}
const {request, response} = JSON.parse(representation);
return <div className={styles.harEntry}>
{<div className={styles.body}>
<div className={styles.bodyHeader}>