package amqp import ( "bufio" "encoding/json" "errors" "fmt" "io" "log" "strconv" "github.com/up9inc/mizu/tap/api" ) var protocol api.Protocol = api.Protocol{ Name: "amqp", LongName: "Advanced Message Queuing Protocol 0-9-1", Abbreviation: "AMQP", Macro: "amqp", Version: "0-9-1", BackgroundColor: "#ff6600", ForegroundColor: "#ffffff", FontSize: 12, ReferenceLink: "https://www.rabbitmq.com/amqp-0-9-1-reference.html", Ports: []string{"5671", "5672"}, Priority: 1, } func init() { log.Println("Initializing AMQP extension...") } type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &protocol } func (d dissecting) Ping() { log.Printf("pong %s", protocol.Name) } const amqpRequest string = "amqp_request" func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { r := AmqpReader{b} var remaining int var header *HeaderFrame var body []byte connectionInfo := &api.ConnectionInfo{ ClientIP: tcpID.SrcIP, ClientPort: tcpID.SrcPort, ServerIP: tcpID.DstIP, ServerPort: tcpID.DstPort, IsOutgoing: true, } eventBasicPublish := &BasicPublish{ Exchange: "", RoutingKey: "", Mandatory: false, Immediate: false, Body: nil, Properties: Properties{}, } eventBasicDeliver := &BasicDeliver{ ConsumerTag: "", DeliveryTag: 0, Redelivered: false, Exchange: "", RoutingKey: "", Properties: Properties{}, Body: nil, } var lastMethodFrameMessage Message for { if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol { return errors.New("Identified by another protocol") } frame, err := r.ReadFrame() if err == io.EOF { // We must read until we see an EOF... very important! return errors.New("AMQP EOF") } switch f := frame.(type) { case *HeartbeatFrame: // drop case *HeaderFrame: // start content state header = f remaining = int(header.Size) switch lastMethodFrameMessage.(type) { case *BasicPublish: eventBasicPublish.Properties = header.Properties case *BasicDeliver: eventBasicDeliver.Properties = header.Properties default: frame = nil } case *BodyFrame: // continue until terminated body = append(body, f.Body...) remaining -= len(f.Body) switch lastMethodFrameMessage.(type) { case *BasicPublish: eventBasicPublish.Body = f.Body superIdentifier.Protocol = &protocol emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter) case *BasicDeliver: eventBasicDeliver.Body = f.Body superIdentifier.Protocol = &protocol emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter) default: body = nil frame = nil } case *MethodFrame: lastMethodFrameMessage = f.Method switch m := f.Method.(type) { case *BasicPublish: eventBasicPublish.Exchange = m.Exchange eventBasicPublish.RoutingKey = m.RoutingKey eventBasicPublish.Mandatory = m.Mandatory eventBasicPublish.Immediate = m.Immediate case *QueueBind: eventQueueBind := &QueueBind{ Queue: m.Queue, Exchange: m.Exchange, RoutingKey: m.RoutingKey, NoWait: m.NoWait, Arguments: m.Arguments, } superIdentifier.Protocol = &protocol emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter) case *BasicConsume: eventBasicConsume := &BasicConsume{ Queue: m.Queue, ConsumerTag: m.ConsumerTag, NoLocal: m.NoLocal, NoAck: m.NoAck, Exclusive: m.Exclusive, NoWait: m.NoWait, Arguments: m.Arguments, } superIdentifier.Protocol = &protocol emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter) case *BasicDeliver: eventBasicDeliver.ConsumerTag = m.ConsumerTag eventBasicDeliver.DeliveryTag = m.DeliveryTag eventBasicDeliver.Redelivered = m.Redelivered eventBasicDeliver.Exchange = m.Exchange eventBasicDeliver.RoutingKey = m.RoutingKey case *QueueDeclare: eventQueueDeclare := &QueueDeclare{ Queue: m.Queue, Passive: m.Passive, Durable: m.Durable, AutoDelete: m.AutoDelete, Exclusive: m.Exclusive, NoWait: m.NoWait, Arguments: m.Arguments, } superIdentifier.Protocol = &protocol emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) case *ExchangeDeclare: eventExchangeDeclare := &ExchangeDeclare{ Exchange: m.Exchange, Type: m.Type, Passive: m.Passive, Durable: m.Durable, AutoDelete: m.AutoDelete, Internal: m.Internal, NoWait: m.NoWait, Arguments: m.Arguments, } superIdentifier.Protocol = &protocol emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) case *ConnectionStart: eventConnectionStart := &ConnectionStart{ VersionMajor: m.VersionMajor, VersionMinor: m.VersionMinor, ServerProperties: m.ServerProperties, Mechanisms: m.Mechanisms, Locales: m.Locales, } superIdentifier.Protocol = &protocol emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) case *ConnectionClose: eventConnectionClose := &ConnectionClose{ ReplyCode: m.ReplyCode, ReplyText: m.ReplyText, ClassId: m.ClassId, MethodId: m.MethodId, } superIdentifier.Protocol = &protocol emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter) default: frame = nil } default: // log.Printf("unexpected frame: %+v", f) } } } func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { request := item.Pair.Request.Payload.(map[string]interface{}) reqDetails := request["details"].(map[string]interface{}) summary := "" switch request["method"] { case basicMethodMap[40]: summary = reqDetails["exchange"].(string) break case basicMethodMap[60]: summary = reqDetails["exchange"].(string) break case exchangeMethodMap[10]: summary = reqDetails["exchange"].(string) break case queueMethodMap[10]: summary = reqDetails["queue"].(string) break case connectionMethodMap[10]: summary = fmt.Sprintf( "%s.%s", strconv.Itoa(int(reqDetails["versionMajor"].(float64))), strconv.Itoa(int(reqDetails["versionMinor"].(float64))), ) break case connectionMethodMap[50]: summary = reqDetails["replyText"].(string) break case queueMethodMap[20]: summary = reqDetails["queue"].(string) break case basicMethodMap[20]: summary = reqDetails["queue"].(string) break } request["url"] = summary reqDetails["method"] = request["method"] return &api.Entry{ Protocol: protocol, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, Port: item.ConnectionInfo.ClientPort, }, Destination: &api.TCP{ Name: resolvedDestination, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Method: request["method"].(string), Status: 0, Timestamp: item.Timestamp, StartTime: item.Pair.Request.CaptureTime, ElapsedTime: 0, Summary: summary, IsOutgoing: item.ConnectionInfo.IsOutgoing, } } func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) { bodySize = 0 representation := make(map[string]interface{}, 0) var repRequest []interface{} switch request["method"].(string) { case basicMethodMap[40]: repRequest = representBasicPublish(request) break case basicMethodMap[60]: repRequest = representBasicDeliver(request) break case queueMethodMap[10]: repRequest = representQueueDeclare(request) break case exchangeMethodMap[10]: repRequest = representExchangeDeclare(request) break case connectionMethodMap[10]: repRequest = representConnectionStart(request) break case connectionMethodMap[50]: repRequest = representConnectionClose(request) break case queueMethodMap[20]: repRequest = representQueueBind(request) break case basicMethodMap[20]: repRequest = representBasicConsume(request) break } representation["request"] = repRequest object, err = json.Marshal(representation) return } func (d dissecting) Macros() map[string]string { return map[string]string{ `amqp`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name), } } var Dissector dissecting func NewDissector() api.Dissector { return Dissector }