Files
kubeshark/tap/extensions/amqp/main.go
M. Mert Yıldıran 1f2f63d11b Implement AMQP request-response matcher (#1091)
* Implement the basis of AMQP request-response matching

* Fix `package.json`

* Add `ExchangeDeclareOk`

* Add `ConnectionCloseOk`

* Add `BasicConsumeOk`

* Add `QueueBindOk`

* Add `representEmptyResponse` and fix `BasicPublish` and `BasicDeliver`

* Fix ident and matcher, add `connectionOpen`, `channelOpen`, `connectionTune`, `basicCancel`

* Fix linter

* Fix the unit tests

* #run_acceptance_tests

* #run_acceptance_tests

* Fix the tests #run_acceptance_tests

* Log don't panic

* Don't skip AMQP acceptance tests #run_acceptance_tests

* Revert "Don't skip AMQP acceptance tests #run_acceptance_tests"

This reverts commit c60e9cf747.

* Remove `Details` section from `representEmpty`

* Add `This request or response has no data.` text
2022-07-11 17:33:25 +03:00

454 lines
14 KiB
Go

package amqp
import (
"bufio"
"encoding/json"
"fmt"
"io"
"log"
"strconv"
"time"
"github.com/up9inc/mizu/tap/api"
)
var protocol = api.Protocol{
ProtocolSummary: api.ProtocolSummary{
Name: "amqp",
Version: "0-9-1",
Abbreviation: "AMQP",
},
LongName: "Advanced Message Queuing Protocol 0-9-1",
Macro: "amqp",
BackgroundColor: "#ff6600",
ForegroundColor: "#ffffff",
FontSize: 12,
ReferenceLink: "https://www.rabbitmq.com/amqp-0-9-1-reference.html",
Ports: []string{"5671", "5672"},
Priority: 1,
}
var protocolsMap = map[string]*api.Protocol{
protocol.ToString(): &protocol,
}
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &protocol
}
func (d dissecting) GetProtocols() map[string]*api.Protocol {
return protocolsMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
r := AmqpReader{b}
var remaining int
var header *HeaderFrame
eventBasicPublish := &BasicPublish{
Exchange: "",
RoutingKey: "",
Mandatory: false,
Immediate: false,
Body: nil,
Properties: Properties{},
}
eventBasicDeliver := &BasicDeliver{
ConsumerTag: "",
DeliveryTag: 0,
Redelivered: false,
Exchange: "",
RoutingKey: "",
Properties: Properties{},
Body: nil,
}
var lastMethodFrameMessage Message
var ident string
isClient := reader.GetIsClient()
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
for {
frameVal, err := r.readFrame()
if err == io.EOF {
// We must read until we see an EOF... very important!
return err
}
switch f := frameVal.(type) {
case *HeartbeatFrame:
// drop
case *HeaderFrame:
reader.GetParent().SetProtocol(&protocol)
// start content state
header = f
remaining = int(header.Size)
// Workaround for `Time.MarshalJSON: year outside of range [0,9999]` error
if header.Properties.Timestamp.Year() > 9999 {
header.Properties.Timestamp = time.Time{}.UTC()
}
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Properties = header.Properties
case *BasicDeliver:
eventBasicDeliver.Properties = header.Properties
}
case *BodyFrame:
reader.GetParent().SetProtocol(&protocol)
// continue until terminated
remaining -= len(f.Body)
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[40], *eventBasicPublish, reader)
reqResMatcher.emitEvent(!isClient, ident, emptyMethod, &emptyResponse{}, reader)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
reqResMatcher.emitEvent(!isClient, ident, basicMethodMap[60], *eventBasicDeliver, reader)
reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
}
case *MethodFrame:
reader.GetParent().SetProtocol(&protocol)
lastMethodFrameMessage = f.Method
ident = getIdent(reader, f)
switch m := f.Method.(type) {
case *BasicPublish:
eventBasicPublish.Exchange = m.Exchange
eventBasicPublish.RoutingKey = m.RoutingKey
eventBasicPublish.Mandatory = m.Mandatory
eventBasicPublish.Immediate = m.Immediate
case *QueueBind:
eventQueueBind := &QueueBind{
Queue: m.Queue,
Exchange: m.Exchange,
RoutingKey: m.RoutingKey,
NoWait: m.NoWait,
Arguments: m.Arguments,
}
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[20], *eventQueueBind, reader)
case *QueueBindOk:
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[21], m, reader)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
Queue: m.Queue,
ConsumerTag: m.ConsumerTag,
NoLocal: m.NoLocal,
NoAck: m.NoAck,
Exclusive: m.Exclusive,
NoWait: m.NoWait,
Arguments: m.Arguments,
}
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[20], *eventBasicConsume, reader)
case *BasicConsumeOk:
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[21], m, reader)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
eventBasicDeliver.DeliveryTag = m.DeliveryTag
eventBasicDeliver.Redelivered = m.Redelivered
eventBasicDeliver.Exchange = m.Exchange
eventBasicDeliver.RoutingKey = m.RoutingKey
case *QueueDeclare:
eventQueueDeclare := &QueueDeclare{
Queue: m.Queue,
Passive: m.Passive,
Durable: m.Durable,
AutoDelete: m.AutoDelete,
Exclusive: m.Exclusive,
NoWait: m.NoWait,
Arguments: m.Arguments,
}
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[10], *eventQueueDeclare, reader)
case *QueueDeclareOk:
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[11], m, reader)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
Exchange: m.Exchange,
Type: m.Type,
Passive: m.Passive,
Durable: m.Durable,
AutoDelete: m.AutoDelete,
Internal: m.Internal,
NoWait: m.NoWait,
Arguments: m.Arguments,
}
reqResMatcher.emitEvent(isClient, ident, exchangeMethodMap[10], *eventExchangeDeclare, reader)
case *ExchangeDeclareOk:
reqResMatcher.emitEvent(isClient, ident, exchangeMethodMap[11], m, reader)
case *ConnectionStart:
// In our tests, *ConnectionStart does not result in *ConnectionStartOk
reqResMatcher.emitEvent(!isClient, ident, connectionMethodMap[10], m, reader)
reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
case *ConnectionStartOk:
// In our tests, *ConnectionStart does not result in *ConnectionStartOk
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[11], m, reader)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
ReplyCode: m.ReplyCode,
ReplyText: m.ReplyText,
ClassId: m.ClassId,
MethodId: m.MethodId,
}
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[50], *eventConnectionClose, reader)
case *ConnectionCloseOk:
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[51], m, reader)
case *connectionOpen:
eventConnectionOpen := &connectionOpen{
VirtualHost: m.VirtualHost,
}
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[40], *eventConnectionOpen, reader)
case *connectionOpenOk:
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[41], m, reader)
case *channelOpen:
reqResMatcher.emitEvent(isClient, ident, channelMethodMap[10], m, reader)
case *channelOpenOk:
reqResMatcher.emitEvent(isClient, ident, channelMethodMap[11], m, reader)
case *connectionTune:
// In our tests, *connectionTune does not result in *connectionTuneOk
reqResMatcher.emitEvent(!isClient, ident, connectionMethodMap[30], m, reader)
reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
case *connectionTuneOk:
// In our tests, *connectionTune does not result in *connectionTuneOk
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[31], m, reader)
case *basicCancel:
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[30], m, reader)
case *basicCancelOk:
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[31], m, reader)
}
default:
// log.Printf("unexpected frameVal: %+v", f)
}
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
}
reqDetails["method"] = request["method"]
resDetails["method"] = response["method"]
return &api.Entry{
Protocol: protocol.ProtocolSummary,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
Port: item.ConnectionInfo.ClientPort,
},
Destination: &api.TCP{
Name: resolvedDestination,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
RequestSize: item.Pair.Request.CaptureSize,
ResponseSize: item.Pair.Response.CaptureSize,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
}
}
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
summary := ""
summaryQuery := ""
method := entry.Request["method"].(string)
methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
switch method {
case basicMethodMap[40]:
summary = entry.Request["exchange"].(string)
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
case basicMethodMap[60]:
summary = entry.Request["exchange"].(string)
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
case exchangeMethodMap[10]:
summary = entry.Request["exchange"].(string)
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
case queueMethodMap[10]:
summary = entry.Request["queue"].(string)
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
case connectionMethodMap[10]:
versionMajor := int(entry.Request["versionMajor"].(float64))
versionMinor := int(entry.Request["versionMinor"].(float64))
summary = fmt.Sprintf(
"%s.%s",
strconv.Itoa(versionMajor),
strconv.Itoa(versionMinor),
)
summaryQuery = fmt.Sprintf(`request.versionMajor == %d and request.versionMinor == %d`, versionMajor, versionMinor)
case connectionMethodMap[50]:
summary = entry.Request["replyText"].(string)
summaryQuery = fmt.Sprintf(`request.replyText == "%s"`, summary)
case queueMethodMap[20]:
summary = entry.Request["queue"].(string)
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
case basicMethodMap[20]:
summary = entry.Request["queue"].(string)
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
case connectionMethodMap[40]:
summary = entry.Request["virtualHost"].(string)
summaryQuery = fmt.Sprintf(`request.virtualHost == "%s"`, summary)
case connectionMethodMap[30]:
summary = fmt.Sprintf("%g", entry.Request["channelMax"].(float64))
summaryQuery = fmt.Sprintf(`request.channelMax == "%s"`, summary)
case connectionMethodMap[31]:
summary = fmt.Sprintf("%g", entry.Request["channelMax"].(float64))
summaryQuery = fmt.Sprintf(`request.channelMax == "%s"`, summary)
case basicMethodMap[30]:
summary = entry.Request["consumerTag"].(string)
summaryQuery = fmt.Sprintf(`request.consumerTag == "%s"`, summary)
case basicMethodMap[31]:
summary = entry.Request["consumerTag"].(string)
summaryQuery = fmt.Sprintf(`request.consumerTag == "%s"`, summary)
}
return &api.BaseEntry{
Id: entry.Id,
Protocol: *protocolsMap[entry.Protocol.ToString()],
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: 0,
StatusQuery: "",
Method: method,
MethodQuery: methodQuery,
Timestamp: entry.Timestamp,
Source: entry.Source,
Destination: entry.Destination,
IsOutgoing: entry.Outgoing,
Latency: entry.ElapsedTime,
}
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
representation := make(map[string]interface{})
var repRequest []interface{}
var repResponse []interface{}
switch request["method"].(string) {
case basicMethodMap[40]:
repRequest = representBasicPublish(request)
case basicMethodMap[60]:
repRequest = representBasicDeliver(request)
case queueMethodMap[10]:
repRequest = representQueueDeclare(request)
case exchangeMethodMap[10]:
repRequest = representExchangeDeclare(request)
case connectionMethodMap[10]:
repRequest = representConnectionStart(request)
case connectionMethodMap[50]:
repRequest = representConnectionClose(request)
case queueMethodMap[20]:
repRequest = representQueueBind(request)
case basicMethodMap[20]:
repRequest = representBasicConsume(request)
case connectionMethodMap[40]:
repRequest = representConnectionOpen(request)
case channelMethodMap[10]:
repRequest = representEmpty(request)
case connectionMethodMap[30]:
repRequest = representConnectionTune(request)
case basicMethodMap[30]:
repRequest = representBasicCancel(request)
}
switch response["method"].(string) {
case queueMethodMap[11]:
repResponse = representQueueDeclareOk(response)
case exchangeMethodMap[11]:
repResponse = representEmpty(response)
case connectionMethodMap[11]:
repResponse = representConnectionStartOk(response)
case connectionMethodMap[51]:
repResponse = representEmpty(response)
case basicMethodMap[21]:
repResponse = representBasicConsumeOk(response)
case queueMethodMap[21]:
repResponse = representEmpty(response)
case connectionMethodMap[41]:
repResponse = representEmpty(response)
case channelMethodMap[11]:
repResponse = representEmpty(request)
case connectionMethodMap[31]:
repResponse = representConnectionTune(request)
case basicMethodMap[31]:
repResponse = representBasicCancelOk(request)
case emptyMethod:
repResponse = representEmpty(response)
}
representation["request"] = repRequest
representation["response"] = repResponse
object, err = json.Marshal(representation)
return
}
func (d dissecting) Macros() map[string]string {
return map[string]string{
`amqp`: fmt.Sprintf(`protocol.name == "%s"`, protocol.Name),
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {
return Dissector
}