mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-25 15:54:43 +00:00
* Remove `tcpStreamWrapper` struct * Refactor `tap` module and move some of the code to `tap/api` module * Move `TrafficFilteringOptions` struct to `shared` module * Change the `Dissect` method signature to have `*TcpReader` as an argument * Add `CloseOtherProtocolDissectors` method and use it to synchronously close the other protocol dissectors * Run `go mod tidy` in `cli` module * Rename `SuperIdentifier` struct to `ProtoIdentifier` * Remove `SuperTimer` struct * Bring back `CloseTimedoutTcpStreamChannels` method * Run `go mod tidy` everywhere * Remove `GOGC` environment variable from tapper * Fix the tests * Bring back `debug.FreeOSMemory()` call * Make `CloseOtherProtocolDissectors` method mutexed * Revert "Remove `GOGC` environment variable from tapper" This reverts commitcfc2484bbb
. * Bring back the removed `checksum`, `nooptcheck` and `ignorefsmerr` flags * Define a bunch of interfaces and don't export any new structs from `tap/api` * Keep the interfaces in `tap/api` but move the structs to `tap/tcp` * Fix the unit tests by depending on `github.com/up9inc/mizu/tap` * Use the modified `tlsEmitter` * Define `TlsChunk` interface and make `tlsReader` implement `TcpReader` * Remove unused fields in `tlsReader` * Define `ReassemblyStream` interface and separate `gopacket` specififc fields to `tcpReassemblyStream` struct Such that make `tap/api` don't depend on `gopacket` * Remove the unused fields * Make `tlsPoller` implement `TcpStream` interface and remove the call to `NewTcpStreamDummy` method * Remove unused fields from `tlsPoller` * Remove almost all of the setter methods in `TcpReader` and `TcpStream` interface and remove `TlsChunk` interface * Revert "Revert "Remove `GOGC` environment variable from tapper"" This reverts commitab2b9a803b
. * Revert "Bring back `debug.FreeOSMemory()` call" This reverts commit1cce863bbb
. * Remove excess comment * Fix acceptance tests (`logger` module) #run_acceptance_tests * Bring back `github.com/patrickmn/go-cache` * Fix `NewTcpStream` method signature * Put `tcpReader` and `tcpStream` mocks into protocol dissectors to remove `github.com/up9inc/mizu/tap` dependency * Fix AMQP tests * Revert960ba644cd
* Revert `go.mod` and `go.sum` files in protocol dissectors * Fix the comment position * Revert `AppStatsInst` change * Fix indent * Fix CLI build * Fix linter error * Fix error msg * Revert some of the changes in `chunk.go`
265 lines
8.1 KiB
Go
265 lines
8.1 KiB
Go
package kafka
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
var _protocol api.Protocol = api.Protocol{
|
|
Name: "kafka",
|
|
LongName: "Apache Kafka Protocol",
|
|
Abbreviation: "KAFKA",
|
|
Macro: "kafka",
|
|
Version: "12",
|
|
BackgroundColor: "#000000",
|
|
ForegroundColor: "#ffffff",
|
|
FontSize: 11,
|
|
ReferenceLink: "https://kafka.apache.org/protocol",
|
|
Ports: []string{"9092"},
|
|
Priority: 2,
|
|
}
|
|
|
|
type dissecting string
|
|
|
|
func (d dissecting) Register(extension *api.Extension) {
|
|
extension.Protocol = &_protocol
|
|
}
|
|
|
|
func (d dissecting) Ping() {
|
|
log.Printf("pong %s", _protocol.Name)
|
|
}
|
|
|
|
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
|
|
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
|
|
for {
|
|
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &_protocol {
|
|
return errors.New("Identified by another protocol")
|
|
}
|
|
|
|
if reader.GetIsClient() {
|
|
_, _, err := ReadRequest(b, reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reqResMatcher)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reader.GetParent().SetProtocol(&_protocol)
|
|
} else {
|
|
err := ReadResponse(b, reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reader.GetEmitter(), reqResMatcher)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reader.GetParent().SetProtocol(&_protocol)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
|
request := item.Pair.Request.Payload.(map[string]interface{})
|
|
reqDetails := request["details"].(map[string]interface{})
|
|
|
|
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
|
if elapsedTime < 0 {
|
|
elapsedTime = 0
|
|
}
|
|
return &api.Entry{
|
|
Protocol: _protocol,
|
|
Capture: item.Capture,
|
|
Source: &api.TCP{
|
|
Name: resolvedSource,
|
|
IP: item.ConnectionInfo.ClientIP,
|
|
Port: item.ConnectionInfo.ClientPort,
|
|
},
|
|
Destination: &api.TCP{
|
|
Name: resolvedDestination,
|
|
IP: item.ConnectionInfo.ServerIP,
|
|
Port: item.ConnectionInfo.ServerPort,
|
|
},
|
|
Namespace: namespace,
|
|
Outgoing: item.ConnectionInfo.IsOutgoing,
|
|
Request: reqDetails,
|
|
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
|
|
RequestSize: item.Pair.Request.CaptureSize,
|
|
ResponseSize: item.Pair.Response.CaptureSize,
|
|
Timestamp: item.Timestamp,
|
|
StartTime: item.Pair.Request.CaptureTime,
|
|
ElapsedTime: elapsedTime,
|
|
}
|
|
}
|
|
|
|
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
|
status := 0
|
|
statusQuery := ""
|
|
|
|
apiKey := ApiKey(entry.Request["apiKey"].(float64))
|
|
method := apiNames[apiKey]
|
|
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
|
|
|
|
summary := ""
|
|
summaryQuery := ""
|
|
switch apiKey {
|
|
case Metadata:
|
|
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
|
if _topics == nil {
|
|
break
|
|
}
|
|
topics := _topics.([]interface{})
|
|
for i, topic := range topics {
|
|
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
|
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
|
}
|
|
if len(summary) > 0 {
|
|
summary = summary[:len(summary)-2]
|
|
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
|
}
|
|
case ApiVersions:
|
|
summary = entry.Request["clientID"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.clientID == "%s"`, summary)
|
|
case Produce:
|
|
_topics := entry.Request["payload"].(map[string]interface{})["topicData"]
|
|
if _topics == nil {
|
|
break
|
|
}
|
|
topics := _topics.([]interface{})
|
|
for i, topic := range topics {
|
|
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
|
summaryQuery += fmt.Sprintf(`request.payload.topicData[%d].topic == "%s" and`, i, summary)
|
|
}
|
|
if len(summary) > 0 {
|
|
summary = summary[:len(summary)-2]
|
|
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
|
}
|
|
case Fetch:
|
|
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
|
if _topics == nil {
|
|
break
|
|
}
|
|
topics := _topics.([]interface{})
|
|
for i, topic := range topics {
|
|
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
|
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].topic == "%s" and`, i, summary)
|
|
}
|
|
if len(summary) > 0 {
|
|
summary = summary[:len(summary)-2]
|
|
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
|
}
|
|
case ListOffsets:
|
|
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
|
if _topics == nil {
|
|
break
|
|
}
|
|
topics := _topics.([]interface{})
|
|
for i, topic := range topics {
|
|
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
|
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
|
}
|
|
if len(summary) > 0 {
|
|
summary = summary[:len(summary)-2]
|
|
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
|
}
|
|
case CreateTopics:
|
|
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
|
if _topics == nil {
|
|
break
|
|
}
|
|
topics := _topics.([]interface{})
|
|
for i, topic := range topics {
|
|
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
|
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
|
}
|
|
if len(summary) > 0 {
|
|
summary = summary[:len(summary)-2]
|
|
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
|
}
|
|
case DeleteTopics:
|
|
if entry.Request["topicNames"] == nil {
|
|
break
|
|
}
|
|
topicNames := entry.Request["topicNames"].([]string)
|
|
for i, name := range topicNames {
|
|
summary += fmt.Sprintf("%s, ", name)
|
|
summaryQuery += fmt.Sprintf(`request.topicNames[%d] == "%s" and`, i, summary)
|
|
}
|
|
if len(summary) > 0 {
|
|
summary = summary[:len(summary)-2]
|
|
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
|
}
|
|
}
|
|
|
|
return &api.BaseEntry{
|
|
Id: entry.Id,
|
|
Protocol: entry.Protocol,
|
|
Capture: entry.Capture,
|
|
Summary: summary,
|
|
SummaryQuery: summaryQuery,
|
|
Status: status,
|
|
StatusQuery: statusQuery,
|
|
Method: method,
|
|
MethodQuery: methodQuery,
|
|
Timestamp: entry.Timestamp,
|
|
Source: entry.Source,
|
|
Destination: entry.Destination,
|
|
IsOutgoing: entry.Outgoing,
|
|
Latency: entry.ElapsedTime,
|
|
Rules: entry.Rules,
|
|
ContractStatus: entry.ContractStatus,
|
|
}
|
|
}
|
|
|
|
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
|
|
representation := make(map[string]interface{})
|
|
|
|
apiKey := ApiKey(request["apiKey"].(float64))
|
|
|
|
var repRequest []interface{}
|
|
var repResponse []interface{}
|
|
switch apiKey {
|
|
case Metadata:
|
|
repRequest = representMetadataRequest(request)
|
|
repResponse = representMetadataResponse(response)
|
|
case ApiVersions:
|
|
repRequest = representApiVersionsRequest(request)
|
|
repResponse = representApiVersionsResponse(response)
|
|
case Produce:
|
|
repRequest = representProduceRequest(request)
|
|
repResponse = representProduceResponse(response)
|
|
case Fetch:
|
|
repRequest = representFetchRequest(request)
|
|
repResponse = representFetchResponse(response)
|
|
case ListOffsets:
|
|
repRequest = representListOffsetsRequest(request)
|
|
repResponse = representListOffsetsResponse(response)
|
|
case CreateTopics:
|
|
repRequest = representCreateTopicsRequest(request)
|
|
repResponse = representCreateTopicsResponse(response)
|
|
case DeleteTopics:
|
|
repRequest = representDeleteTopicsRequest(request)
|
|
repResponse = representDeleteTopicsResponse(response)
|
|
}
|
|
|
|
representation["request"] = repRequest
|
|
representation["response"] = repResponse
|
|
object, err = json.Marshal(representation)
|
|
return
|
|
}
|
|
|
|
func (d dissecting) Macros() map[string]string {
|
|
return map[string]string{
|
|
`kafka`: fmt.Sprintf(`proto.name == "%s"`, _protocol.Name),
|
|
}
|
|
}
|
|
|
|
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
|
|
return createResponseRequestMatcher()
|
|
}
|
|
|
|
var Dissector dissecting
|
|
|
|
func NewDissector() api.Dissector {
|
|
return Dissector
|
|
}
|