kubeshark/tap/extensions/amqp/main.go
2022-01-25 05:10:32 +03:00

335 lines
9.0 KiB
Go

package amqp
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"strconv"
"github.com/up9inc/mizu/tap/api"
)
// protocol is the descriptor this extension hands to the host in Register and
// stamps on every entry built by Analyze. Dissect also compares against its
// address to tell whether a stream has been claimed by AMQP or by another
// protocol.
var protocol = api.Protocol{
	Name:            "amqp",
	LongName:        "Advanced Message Queuing Protocol 0-9-1",
	Abbreviation:    "AMQP",
	Macro:           "amqp",
	Version:         "0-9-1",
	BackgroundColor: "#ff6600",
	ForegroundColor: "#ffffff",
	FontSize:        12,
	ReferenceLink:   "https://www.rabbitmq.com/amqp-0-9-1-reference.html",
	Ports:           []string{"5671", "5672"},
	Priority:        1,
}
// init logs a single line when the package is loaded; it performs no other
// setup.
func init() {
log.Println("Initializing AMQP extension...")
}
// dissecting is the receiver type for the AMQP dissector methods in this file.
// None of the methods visible here read its string value.
type dissecting string
// Register points extension.Protocol at the package-level AMQP protocol
// descriptor. The pointer (not a copy) is stored, so the extension shares the
// same value Dissect compares against.
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &protocol
}
// Ping logs "pong" followed by the protocol name.
func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
// amqpRequest is the second argument Dissect passes to every emitAMQP call.
// NOTE(review): presumably a tag marking the emitted item as a request —
// confirm against emitAMQP.
const amqpRequest string = "amqp_request"
// Dissect reads AMQP frames from b in a loop and emits one item through
// emitter for each reported event.
//
// Reported events:
//   - Method frames QueueBind, BasicConsume, QueueDeclare, ExchangeDeclare,
//     ConnectionStart and ConnectionClose are emitted as soon as they are read.
//   - BasicPublish and BasicDeliver are content-bearing: their method fields
//     and the following header frame's properties are staged, and an item is
//     emitted for every body frame that follows.
//
// Parameters:
//   - b: buffered reader over the stream being dissected.
//   - tcpID: supplies the source/destination address and port copied into the
//     ConnectionInfo attached to every emitted item.
//   - superTimer: its CaptureTime is passed along with every emitted item.
//   - superIdentifier: shared protocol claim. Its Protocol is set to the AMQP
//     descriptor right before each emit, and is checked at the top of every
//     iteration so dissection stops once another protocol has claimed the
//     stream.
//   - emitter: sink handed to emitAMQP.
//   - isClient, counterPair, options: not used by this implementation.
//
// The function never returns nil. It returns an error either when the stream
// has been identified as another protocol or when ReadFrame reports io.EOF.
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
	r := AmqpReader{b}

	connectionInfo := &api.ConnectionInfo{
		ClientIP:   tcpID.SrcIP,
		ClientPort: tcpID.SrcPort,
		ServerIP:   tcpID.DstIP,
		ServerPort: tcpID.DstPort,
		IsOutgoing: true,
	}

	// Staging areas for the two content-bearing methods. They are filled in
	// across the method, header and body frames and copied by value on emit.
	eventBasicPublish := &BasicPublish{}
	eventBasicDeliver := &BasicDeliver{}

	// The most recent method decides how subsequent header and body frames
	// are interpreted.
	var lastMethodFrameMessage Message

	for {
		if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
			return errors.New("Identified by another protocol")
		}

		frame, err := r.ReadFrame()
		// The stream must be consumed until EOF before returning. errors.Is
		// also recognizes a wrapped io.EOF, which a plain == comparison would
		// miss and then spin on forever.
		if errors.Is(err, io.EOF) {
			return errors.New("AMQP EOF")
		}
		// Non-EOF errors are not handled here and reading continues.
		// NOTE(review): confirm ReadFrame returns a nil frame alongside such
		// errors so that nothing stale is processed below.

		switch f := frame.(type) {
		case *HeartbeatFrame:
			// Heartbeats carry nothing to report.

		case *HeaderFrame:
			// Content header: keep its properties for the pending
			// publish/deliver; headers following any other method are ignored.
			switch lastMethodFrameMessage.(type) {
			case *BasicPublish:
				eventBasicPublish.Properties = f.Properties
			case *BasicDeliver:
				eventBasicDeliver.Properties = f.Properties
			}

		case *BodyFrame:
			// One item is emitted per body frame, carrying that frame's
			// payload only. No cross-frame buffer is kept: the previous
			// implementation appended every payload to a slice that was never
			// read or reset, which grew without bound on long-lived
			// connections.
			// NOTE(review): content split over several body frames therefore
			// yields several items, each with a partial body.
			switch lastMethodFrameMessage.(type) {
			case *BasicPublish:
				eventBasicPublish.Body = f.Body
				superIdentifier.Protocol = &protocol
				emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
			case *BasicDeliver:
				eventBasicDeliver.Body = f.Body
				superIdentifier.Protocol = &protocol
				emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
			}

		case *MethodFrame:
			lastMethodFrameMessage = f.Method

			switch m := f.Method.(type) {
			case *BasicPublish:
				// Staged only; emitted once a body frame arrives.
				eventBasicPublish.Exchange = m.Exchange
				eventBasicPublish.RoutingKey = m.RoutingKey
				eventBasicPublish.Mandatory = m.Mandatory
				eventBasicPublish.Immediate = m.Immediate

			case *QueueBind:
				eventQueueBind := QueueBind{
					Queue:      m.Queue,
					Exchange:   m.Exchange,
					RoutingKey: m.RoutingKey,
					NoWait:     m.NoWait,
					Arguments:  m.Arguments,
				}
				superIdentifier.Protocol = &protocol
				emitAMQP(eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)

			case *BasicConsume:
				eventBasicConsume := BasicConsume{
					Queue:       m.Queue,
					ConsumerTag: m.ConsumerTag,
					NoLocal:     m.NoLocal,
					NoAck:       m.NoAck,
					Exclusive:   m.Exclusive,
					NoWait:      m.NoWait,
					Arguments:   m.Arguments,
				}
				superIdentifier.Protocol = &protocol
				emitAMQP(eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)

			case *BasicDeliver:
				// Staged only; emitted once a body frame arrives.
				eventBasicDeliver.ConsumerTag = m.ConsumerTag
				eventBasicDeliver.DeliveryTag = m.DeliveryTag
				eventBasicDeliver.Redelivered = m.Redelivered
				eventBasicDeliver.Exchange = m.Exchange
				eventBasicDeliver.RoutingKey = m.RoutingKey

			case *QueueDeclare:
				eventQueueDeclare := QueueDeclare{
					Queue:      m.Queue,
					Passive:    m.Passive,
					Durable:    m.Durable,
					AutoDelete: m.AutoDelete,
					Exclusive:  m.Exclusive,
					NoWait:     m.NoWait,
					Arguments:  m.Arguments,
				}
				superIdentifier.Protocol = &protocol
				emitAMQP(eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)

			case *ExchangeDeclare:
				eventExchangeDeclare := ExchangeDeclare{
					Exchange:   m.Exchange,
					Type:       m.Type,
					Passive:    m.Passive,
					Durable:    m.Durable,
					AutoDelete: m.AutoDelete,
					Internal:   m.Internal,
					NoWait:     m.NoWait,
					Arguments:  m.Arguments,
				}
				superIdentifier.Protocol = &protocol
				emitAMQP(eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)

			case *ConnectionStart:
				eventConnectionStart := ConnectionStart{
					VersionMajor:     m.VersionMajor,
					VersionMinor:     m.VersionMinor,
					ServerProperties: m.ServerProperties,
					Mechanisms:       m.Mechanisms,
					Locales:          m.Locales,
				}
				superIdentifier.Protocol = &protocol
				emitAMQP(eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)

			case *ConnectionClose:
				eventConnectionClose := ConnectionClose{
					ReplyCode: m.ReplyCode,
					ReplyText: m.ReplyText,
					ClassId:   m.ClassId,
					MethodId:  m.MethodId,
				}
				superIdentifier.Protocol = &protocol
				emitAMQP(eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
			}

		default:
			// Unknown or nil frame: nothing to report.
		}
	}
}
// Analyze turns an emitted AMQP item into an api.Entry.
//
// The request payload is expected to be a map holding a "method" name and a
// "details" map; a payload of any other shape makes the type assertions below
// panic. A one-line summary is picked from the details according to the
// method: the exchange, the queue, the "major.minor" version, or the reply
// text. For a method not listed here the summary stays empty.
//
// Two side effects on the payload: the summary is stored under request["url"]
// and the method name is copied into the details map, which then becomes the
// entry's Request.
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
	request := item.Pair.Request.Payload.(map[string]interface{})
	reqDetails := request["details"].(map[string]interface{})

	var summary string
	switch request["method"] {
	case basicMethodMap[40], basicMethodMap[60], exchangeMethodMap[10]:
		summary = reqDetails["exchange"].(string)
	case queueMethodMap[10]:
		summary = reqDetails["queue"].(string)
	case connectionMethodMap[10]:
		// Version numbers are asserted as float64 before being formatted
		// as integers.
		major := strconv.Itoa(int(reqDetails["versionMajor"].(float64)))
		minor := strconv.Itoa(int(reqDetails["versionMinor"].(float64)))
		summary = fmt.Sprintf("%s.%s", major, minor)
	case connectionMethodMap[50]:
		summary = reqDetails["replyText"].(string)
	case queueMethodMap[20], basicMethodMap[20]:
		summary = reqDetails["queue"].(string)
	}

	request["url"] = summary
	reqDetails["method"] = request["method"]

	source := &api.TCP{
		Name: resolvedSource,
		IP:   item.ConnectionInfo.ClientIP,
		Port: item.ConnectionInfo.ClientPort,
	}
	destination := &api.TCP{
		Name: resolvedDestination,
		IP:   item.ConnectionInfo.ServerIP,
		Port: item.ConnectionInfo.ServerPort,
	}

	return &api.Entry{
		Protocol:    protocol,
		Source:      source,
		Destination: destination,
		Outgoing:    item.ConnectionInfo.IsOutgoing,
		Request:     reqDetails,
		Method:      request["method"].(string),
		Status:      0,
		Timestamp:   item.Timestamp,
		StartTime:   item.Pair.Request.CaptureTime,
		ElapsedTime: 0,
		Summary:     summary,
		IsOutgoing:  item.ConnectionInfo.IsOutgoing,
	}
}
// Represent builds the JSON representation of a stored AMQP request.
//
// request["method"] must be a string (the assertion panics otherwise); it
// selects the represent* helper that produces the request section. The result
// is marshalled as an object with a single "request" key, whose value is null
// when the method is not one of the handled ones. response is not used, and
// the returned bodySize is always 0.
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
	var repRequest []interface{}

	switch method := request["method"].(string); method {
	case basicMethodMap[40]:
		repRequest = representBasicPublish(request)
	case basicMethodMap[60]:
		repRequest = representBasicDeliver(request)
	case queueMethodMap[10]:
		repRequest = representQueueDeclare(request)
	case exchangeMethodMap[10]:
		repRequest = representExchangeDeclare(request)
	case connectionMethodMap[10]:
		repRequest = representConnectionStart(request)
	case connectionMethodMap[50]:
		repRequest = representConnectionClose(request)
	case queueMethodMap[20]:
		repRequest = representQueueBind(request)
	case basicMethodMap[20]:
		repRequest = representBasicConsume(request)
	}

	object, err = json.Marshal(map[string]interface{}{
		"request": repRequest,
	})
	return object, 0, err
}
// Macros returns the query macros contributed by this extension: a single
// "amqp" macro that expands to a filter on the protocol name.
func (d dissecting) Macros() map[string]string {
	macros := make(map[string]string, 1)
	macros[`amqp`] = fmt.Sprintf(`proto.name == "%s"`, protocol.Name)
	return macros
}
// Dissector is the package-level dissector value, left at its zero value; it
// is what NewDissector returns.
var Dissector dissecting
// NewDissector returns the package-level Dissector as an api.Dissector. Every
// call returns the same value; nothing is allocated.
func NewDissector() api.Dissector {
return Dissector
}