mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-28 09:10:09 +00:00
* Remove `tcpStreamWrapper` struct * Refactor `tap` module and move some of the code to `tap/api` module * Move `TrafficFilteringOptions` struct to `shared` module * Change the `Dissect` method signature to have `*TcpReader` as an argument * Add `CloseOtherProtocolDissectors` method and use it to synchronously close the other protocol dissectors * Run `go mod tidy` in `cli` module * Rename `SuperIdentifier` struct to `ProtoIdentifier` * Remove `SuperTimer` struct * Bring back `CloseTimedoutTcpStreamChannels` method * Run `go mod tidy` everywhere * Remove `GOGC` environment variable from tapper * Fix the tests * Bring back `debug.FreeOSMemory()` call * Make `CloseOtherProtocolDissectors` method mutexed * Revert "Remove `GOGC` environment variable from tapper" This reverts commitcfc2484bbb
. * Bring back the removed `checksum`, `nooptcheck` and `ignorefsmerr` flags * Define a bunch of interfaces and don't export any new structs from `tap/api` * Keep the interfaces in `tap/api` but move the structs to `tap/tcp` * Fix the unit tests by depending on `github.com/up9inc/mizu/tap` * Use the modified `tlsEmitter` * Define `TlsChunk` interface and make `tlsReader` implement `TcpReader` * Remove unused fields in `tlsReader` * Define `ReassemblyStream` interface and separate `gopacket` specific fields to `tcpReassemblyStream` struct, so that `tap/api` doesn't depend on `gopacket` * Remove the unused fields * Make `tlsPoller` implement `TcpStream` interface and remove the call to `NewTcpStreamDummy` method * Remove unused fields from `tlsPoller` * Remove almost all of the setter methods in `TcpReader` and `TcpStream` interface and remove `TlsChunk` interface * Revert "Revert "Remove `GOGC` environment variable from tapper"" This reverts commit ab2b9a803b
. * Revert "Bring back `debug.FreeOSMemory()` call" This reverts commit1cce863bbb
. * Remove excess comment * Fix acceptance tests (`logger` module) #run_acceptance_tests * Bring back `github.com/patrickmn/go-cache` * Fix `NewTcpStream` method signature * Put `tcpReader` and `tcpStream` mocks into protocol dissectors to remove `github.com/up9inc/mizu/tap` dependency * Fix AMQP tests * Revert960ba644cd
* Revert `go.mod` and `go.sum` files in protocol dissectors * Fix the comment position * Revert `AppStatsInst` change * Fix indent * Fix CLI build * Fix linter error * Fix error msg * Revert some of the changes in `chunk.go`
346 lines
11 KiB
Go
346 lines
11 KiB
Go
package amqp
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
var protocol api.Protocol = api.Protocol{
|
|
Name: "amqp",
|
|
LongName: "Advanced Message Queuing Protocol 0-9-1",
|
|
Abbreviation: "AMQP",
|
|
Macro: "amqp",
|
|
Version: "0-9-1",
|
|
BackgroundColor: "#ff6600",
|
|
ForegroundColor: "#ffffff",
|
|
FontSize: 12,
|
|
ReferenceLink: "https://www.rabbitmq.com/amqp-0-9-1-reference.html",
|
|
Ports: []string{"5671", "5672"},
|
|
Priority: 1,
|
|
}
|
|
|
|
// dissecting is the receiver type that carries the AMQP dissector methods.
type dissecting string
|
|
|
|
func (d dissecting) Register(extension *api.Extension) {
|
|
extension.Protocol = &protocol
|
|
}
|
|
|
|
func (d dissecting) Ping() {
|
|
log.Printf("pong %s", protocol.Name)
|
|
}
|
|
|
|
// amqpRequest is the message type passed to emitAMQP for every emitted event.
const amqpRequest string = "amqp_request"
|
|
|
|
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
|
|
r := AmqpReader{b}
|
|
|
|
var remaining int
|
|
var header *HeaderFrame
|
|
|
|
connectionInfo := &api.ConnectionInfo{
|
|
ClientIP: reader.GetTcpID().SrcIP,
|
|
ClientPort: reader.GetTcpID().SrcPort,
|
|
ServerIP: reader.GetTcpID().DstIP,
|
|
ServerPort: reader.GetTcpID().DstPort,
|
|
IsOutgoing: true,
|
|
}
|
|
|
|
eventBasicPublish := &BasicPublish{
|
|
Exchange: "",
|
|
RoutingKey: "",
|
|
Mandatory: false,
|
|
Immediate: false,
|
|
Body: nil,
|
|
Properties: Properties{},
|
|
}
|
|
|
|
eventBasicDeliver := &BasicDeliver{
|
|
ConsumerTag: "",
|
|
DeliveryTag: 0,
|
|
Redelivered: false,
|
|
Exchange: "",
|
|
RoutingKey: "",
|
|
Properties: Properties{},
|
|
Body: nil,
|
|
}
|
|
|
|
var lastMethodFrameMessage Message
|
|
|
|
for {
|
|
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &protocol {
|
|
return errors.New("Identified by another protocol")
|
|
}
|
|
|
|
frame, err := r.ReadFrame()
|
|
if err == io.EOF {
|
|
// We must read until we see an EOF... very important!
|
|
return nil
|
|
}
|
|
|
|
switch f := frame.(type) {
|
|
case *HeartbeatFrame:
|
|
// drop
|
|
|
|
case *HeaderFrame:
|
|
// start content state
|
|
header = f
|
|
remaining = int(header.Size)
|
|
|
|
// Workaround for `Time.MarshalJSON: year outside of range [0,9999]` error
|
|
if header.Properties.Timestamp.Year() > 9999 {
|
|
header.Properties.Timestamp = time.Time{}.UTC()
|
|
}
|
|
|
|
switch lastMethodFrameMessage.(type) {
|
|
case *BasicPublish:
|
|
eventBasicPublish.Properties = header.Properties
|
|
case *BasicDeliver:
|
|
eventBasicDeliver.Properties = header.Properties
|
|
}
|
|
|
|
case *BodyFrame:
|
|
// continue until terminated
|
|
remaining -= len(f.Body)
|
|
switch lastMethodFrameMessage.(type) {
|
|
case *BasicPublish:
|
|
eventBasicPublish.Body = f.Body
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
|
case *BasicDeliver:
|
|
eventBasicDeliver.Body = f.Body
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
|
}
|
|
|
|
case *MethodFrame:
|
|
lastMethodFrameMessage = f.Method
|
|
switch m := f.Method.(type) {
|
|
case *BasicPublish:
|
|
eventBasicPublish.Exchange = m.Exchange
|
|
eventBasicPublish.RoutingKey = m.RoutingKey
|
|
eventBasicPublish.Mandatory = m.Mandatory
|
|
eventBasicPublish.Immediate = m.Immediate
|
|
|
|
case *QueueBind:
|
|
eventQueueBind := &QueueBind{
|
|
Queue: m.Queue,
|
|
Exchange: m.Exchange,
|
|
RoutingKey: m.RoutingKey,
|
|
NoWait: m.NoWait,
|
|
Arguments: m.Arguments,
|
|
}
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
|
|
|
case *BasicConsume:
|
|
eventBasicConsume := &BasicConsume{
|
|
Queue: m.Queue,
|
|
ConsumerTag: m.ConsumerTag,
|
|
NoLocal: m.NoLocal,
|
|
NoAck: m.NoAck,
|
|
Exclusive: m.Exclusive,
|
|
NoWait: m.NoWait,
|
|
Arguments: m.Arguments,
|
|
}
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
|
|
|
case *BasicDeliver:
|
|
eventBasicDeliver.ConsumerTag = m.ConsumerTag
|
|
eventBasicDeliver.DeliveryTag = m.DeliveryTag
|
|
eventBasicDeliver.Redelivered = m.Redelivered
|
|
eventBasicDeliver.Exchange = m.Exchange
|
|
eventBasicDeliver.RoutingKey = m.RoutingKey
|
|
|
|
case *QueueDeclare:
|
|
eventQueueDeclare := &QueueDeclare{
|
|
Queue: m.Queue,
|
|
Passive: m.Passive,
|
|
Durable: m.Durable,
|
|
AutoDelete: m.AutoDelete,
|
|
Exclusive: m.Exclusive,
|
|
NoWait: m.NoWait,
|
|
Arguments: m.Arguments,
|
|
}
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
|
|
|
case *ExchangeDeclare:
|
|
eventExchangeDeclare := &ExchangeDeclare{
|
|
Exchange: m.Exchange,
|
|
Type: m.Type,
|
|
Passive: m.Passive,
|
|
Durable: m.Durable,
|
|
AutoDelete: m.AutoDelete,
|
|
Internal: m.Internal,
|
|
NoWait: m.NoWait,
|
|
Arguments: m.Arguments,
|
|
}
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
|
|
|
case *ConnectionStart:
|
|
eventConnectionStart := &ConnectionStart{
|
|
VersionMajor: m.VersionMajor,
|
|
VersionMinor: m.VersionMinor,
|
|
ServerProperties: m.ServerProperties,
|
|
Mechanisms: m.Mechanisms,
|
|
Locales: m.Locales,
|
|
}
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
|
|
|
case *ConnectionClose:
|
|
eventConnectionClose := &ConnectionClose{
|
|
ReplyCode: m.ReplyCode,
|
|
ReplyText: m.ReplyText,
|
|
ClassId: m.ClassId,
|
|
MethodId: m.MethodId,
|
|
}
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
|
}
|
|
|
|
default:
|
|
// log.Printf("unexpected frame: %+v", f)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
|
request := item.Pair.Request.Payload.(map[string]interface{})
|
|
reqDetails := request["details"].(map[string]interface{})
|
|
|
|
reqDetails["method"] = request["method"]
|
|
return &api.Entry{
|
|
Protocol: protocol,
|
|
Capture: item.Capture,
|
|
Source: &api.TCP{
|
|
Name: resolvedSource,
|
|
IP: item.ConnectionInfo.ClientIP,
|
|
Port: item.ConnectionInfo.ClientPort,
|
|
},
|
|
Destination: &api.TCP{
|
|
Name: resolvedDestination,
|
|
IP: item.ConnectionInfo.ServerIP,
|
|
Port: item.ConnectionInfo.ServerPort,
|
|
},
|
|
Namespace: namespace,
|
|
Outgoing: item.ConnectionInfo.IsOutgoing,
|
|
Request: reqDetails,
|
|
RequestSize: item.Pair.Request.CaptureSize,
|
|
Timestamp: item.Timestamp,
|
|
StartTime: item.Pair.Request.CaptureTime,
|
|
ElapsedTime: 0,
|
|
}
|
|
|
|
}
|
|
|
|
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
|
summary := ""
|
|
summaryQuery := ""
|
|
method := entry.Request["method"].(string)
|
|
methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
|
|
switch method {
|
|
case basicMethodMap[40]:
|
|
summary = entry.Request["exchange"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
|
case basicMethodMap[60]:
|
|
summary = entry.Request["exchange"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
|
case exchangeMethodMap[10]:
|
|
summary = entry.Request["exchange"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
|
case queueMethodMap[10]:
|
|
summary = entry.Request["queue"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
|
case connectionMethodMap[10]:
|
|
versionMajor := int(entry.Request["versionMajor"].(float64))
|
|
versionMinor := int(entry.Request["versionMinor"].(float64))
|
|
summary = fmt.Sprintf(
|
|
"%s.%s",
|
|
strconv.Itoa(versionMajor),
|
|
strconv.Itoa(versionMinor),
|
|
)
|
|
summaryQuery = fmt.Sprintf(`request.versionMajor == %d and request.versionMinor == %d`, versionMajor, versionMinor)
|
|
case connectionMethodMap[50]:
|
|
summary = entry.Request["replyText"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.replyText == "%s"`, summary)
|
|
case queueMethodMap[20]:
|
|
summary = entry.Request["queue"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
|
case basicMethodMap[20]:
|
|
summary = entry.Request["queue"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
|
}
|
|
|
|
return &api.BaseEntry{
|
|
Id: entry.Id,
|
|
Protocol: entry.Protocol,
|
|
Capture: entry.Capture,
|
|
Summary: summary,
|
|
SummaryQuery: summaryQuery,
|
|
Status: 0,
|
|
StatusQuery: "",
|
|
Method: method,
|
|
MethodQuery: methodQuery,
|
|
Timestamp: entry.Timestamp,
|
|
Source: entry.Source,
|
|
Destination: entry.Destination,
|
|
IsOutgoing: entry.Outgoing,
|
|
Latency: entry.ElapsedTime,
|
|
Rules: entry.Rules,
|
|
ContractStatus: entry.ContractStatus,
|
|
}
|
|
}
|
|
|
|
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
|
|
representation := make(map[string]interface{})
|
|
var repRequest []interface{}
|
|
switch request["method"].(string) {
|
|
case basicMethodMap[40]:
|
|
repRequest = representBasicPublish(request)
|
|
case basicMethodMap[60]:
|
|
repRequest = representBasicDeliver(request)
|
|
case queueMethodMap[10]:
|
|
repRequest = representQueueDeclare(request)
|
|
case exchangeMethodMap[10]:
|
|
repRequest = representExchangeDeclare(request)
|
|
case connectionMethodMap[10]:
|
|
repRequest = representConnectionStart(request)
|
|
case connectionMethodMap[50]:
|
|
repRequest = representConnectionClose(request)
|
|
case queueMethodMap[20]:
|
|
repRequest = representQueueBind(request)
|
|
case basicMethodMap[20]:
|
|
repRequest = representBasicConsume(request)
|
|
}
|
|
representation["request"] = repRequest
|
|
object, err = json.Marshal(representation)
|
|
return
|
|
}
|
|
|
|
func (d dissecting) Macros() map[string]string {
|
|
return map[string]string{
|
|
`amqp`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
|
|
}
|
|
}
|
|
|
|
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
|
|
return nil
|
|
}
|
|
|
|
// Dissector is the package-level dissecting value handed out by NewDissector.
var Dissector dissecting
|
|
|
|
func NewDissector() api.Dissector {
|
|
return Dissector
|
|
}
|