mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-01 10:36:55 +00:00
* Implement the basis of AMQP request-response matching
* Fix `package.json`
* Add `ExchangeDeclareOk`
* Add `ConnectionCloseOk`
* Add `BasicConsumeOk`
* Add `QueueBindOk`
* Add `representEmptyResponse` and fix `BasicPublish` and `BasicDeliver`
* Fix ident and matcher, add `connectionOpen`, `channelOpen`, `connectionTune`, `basicCancel`
* Fix linter
* Fix the unit tests
* #run_acceptance_tests
* #run_acceptance_tests
* Fix the tests #run_acceptance_tests
* Log don't panic
* Don't skip AMQP acceptance tests #run_acceptance_tests
* Revert "Don't skip AMQP acceptance tests #run_acceptance_tests"
This reverts commit c60e9cf747.
* Remove `Details` section from `representEmpty`
* Add `This request or response has no data.` text
454 lines
14 KiB
Go
454 lines
14 KiB
Go
package amqp
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
var protocol = api.Protocol{
|
|
ProtocolSummary: api.ProtocolSummary{
|
|
Name: "amqp",
|
|
Version: "0-9-1",
|
|
Abbreviation: "AMQP",
|
|
},
|
|
LongName: "Advanced Message Queuing Protocol 0-9-1",
|
|
Macro: "amqp",
|
|
BackgroundColor: "#ff6600",
|
|
ForegroundColor: "#ffffff",
|
|
FontSize: 12,
|
|
ReferenceLink: "https://www.rabbitmq.com/amqp-0-9-1-reference.html",
|
|
Ports: []string{"5671", "5672"},
|
|
Priority: 1,
|
|
}
|
|
|
|
var protocolsMap = map[string]*api.Protocol{
|
|
protocol.ToString(): &protocol,
|
|
}
|
|
|
|
// dissecting is the receiver type on which this package's AMQP dissector
// methods (Register, Dissect, Analyze, Summarize, Represent, ...) are defined.
type dissecting string
|
|
|
|
func (d dissecting) Register(extension *api.Extension) {
|
|
extension.Protocol = &protocol
|
|
}
|
|
|
|
func (d dissecting) GetProtocols() map[string]*api.Protocol {
|
|
return protocolsMap
|
|
}
|
|
|
|
func (d dissecting) Ping() {
|
|
log.Printf("pong %s", protocol.Name)
|
|
}
|
|
|
|
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
|
|
r := AmqpReader{b}
|
|
|
|
var remaining int
|
|
var header *HeaderFrame
|
|
|
|
eventBasicPublish := &BasicPublish{
|
|
Exchange: "",
|
|
RoutingKey: "",
|
|
Mandatory: false,
|
|
Immediate: false,
|
|
Body: nil,
|
|
Properties: Properties{},
|
|
}
|
|
|
|
eventBasicDeliver := &BasicDeliver{
|
|
ConsumerTag: "",
|
|
DeliveryTag: 0,
|
|
Redelivered: false,
|
|
Exchange: "",
|
|
RoutingKey: "",
|
|
Properties: Properties{},
|
|
Body: nil,
|
|
}
|
|
|
|
var lastMethodFrameMessage Message
|
|
|
|
var ident string
|
|
isClient := reader.GetIsClient()
|
|
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
|
|
|
|
for {
|
|
frameVal, err := r.readFrame()
|
|
if err == io.EOF {
|
|
// We must read until we see an EOF... very important!
|
|
return err
|
|
}
|
|
|
|
switch f := frameVal.(type) {
|
|
case *HeartbeatFrame:
|
|
// drop
|
|
|
|
case *HeaderFrame:
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
|
|
// start content state
|
|
header = f
|
|
remaining = int(header.Size)
|
|
|
|
// Workaround for `Time.MarshalJSON: year outside of range [0,9999]` error
|
|
if header.Properties.Timestamp.Year() > 9999 {
|
|
header.Properties.Timestamp = time.Time{}.UTC()
|
|
}
|
|
|
|
switch lastMethodFrameMessage.(type) {
|
|
case *BasicPublish:
|
|
eventBasicPublish.Properties = header.Properties
|
|
case *BasicDeliver:
|
|
eventBasicDeliver.Properties = header.Properties
|
|
}
|
|
|
|
case *BodyFrame:
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
|
|
// continue until terminated
|
|
remaining -= len(f.Body)
|
|
switch lastMethodFrameMessage.(type) {
|
|
case *BasicPublish:
|
|
eventBasicPublish.Body = f.Body
|
|
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[40], *eventBasicPublish, reader)
|
|
reqResMatcher.emitEvent(!isClient, ident, emptyMethod, &emptyResponse{}, reader)
|
|
|
|
case *BasicDeliver:
|
|
eventBasicDeliver.Body = f.Body
|
|
reqResMatcher.emitEvent(!isClient, ident, basicMethodMap[60], *eventBasicDeliver, reader)
|
|
reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
|
|
}
|
|
|
|
case *MethodFrame:
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
|
|
lastMethodFrameMessage = f.Method
|
|
|
|
ident = getIdent(reader, f)
|
|
|
|
switch m := f.Method.(type) {
|
|
case *BasicPublish:
|
|
eventBasicPublish.Exchange = m.Exchange
|
|
eventBasicPublish.RoutingKey = m.RoutingKey
|
|
eventBasicPublish.Mandatory = m.Mandatory
|
|
eventBasicPublish.Immediate = m.Immediate
|
|
|
|
case *QueueBind:
|
|
eventQueueBind := &QueueBind{
|
|
Queue: m.Queue,
|
|
Exchange: m.Exchange,
|
|
RoutingKey: m.RoutingKey,
|
|
NoWait: m.NoWait,
|
|
Arguments: m.Arguments,
|
|
}
|
|
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[20], *eventQueueBind, reader)
|
|
|
|
case *QueueBindOk:
|
|
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[21], m, reader)
|
|
|
|
case *BasicConsume:
|
|
eventBasicConsume := &BasicConsume{
|
|
Queue: m.Queue,
|
|
ConsumerTag: m.ConsumerTag,
|
|
NoLocal: m.NoLocal,
|
|
NoAck: m.NoAck,
|
|
Exclusive: m.Exclusive,
|
|
NoWait: m.NoWait,
|
|
Arguments: m.Arguments,
|
|
}
|
|
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[20], *eventBasicConsume, reader)
|
|
|
|
case *BasicConsumeOk:
|
|
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[21], m, reader)
|
|
|
|
case *BasicDeliver:
|
|
eventBasicDeliver.ConsumerTag = m.ConsumerTag
|
|
eventBasicDeliver.DeliveryTag = m.DeliveryTag
|
|
eventBasicDeliver.Redelivered = m.Redelivered
|
|
eventBasicDeliver.Exchange = m.Exchange
|
|
eventBasicDeliver.RoutingKey = m.RoutingKey
|
|
|
|
case *QueueDeclare:
|
|
eventQueueDeclare := &QueueDeclare{
|
|
Queue: m.Queue,
|
|
Passive: m.Passive,
|
|
Durable: m.Durable,
|
|
AutoDelete: m.AutoDelete,
|
|
Exclusive: m.Exclusive,
|
|
NoWait: m.NoWait,
|
|
Arguments: m.Arguments,
|
|
}
|
|
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[10], *eventQueueDeclare, reader)
|
|
|
|
case *QueueDeclareOk:
|
|
reqResMatcher.emitEvent(isClient, ident, queueMethodMap[11], m, reader)
|
|
|
|
case *ExchangeDeclare:
|
|
eventExchangeDeclare := &ExchangeDeclare{
|
|
Exchange: m.Exchange,
|
|
Type: m.Type,
|
|
Passive: m.Passive,
|
|
Durable: m.Durable,
|
|
AutoDelete: m.AutoDelete,
|
|
Internal: m.Internal,
|
|
NoWait: m.NoWait,
|
|
Arguments: m.Arguments,
|
|
}
|
|
reqResMatcher.emitEvent(isClient, ident, exchangeMethodMap[10], *eventExchangeDeclare, reader)
|
|
|
|
case *ExchangeDeclareOk:
|
|
reqResMatcher.emitEvent(isClient, ident, exchangeMethodMap[11], m, reader)
|
|
|
|
case *ConnectionStart:
|
|
// In our tests, *ConnectionStart does not result in *ConnectionStartOk
|
|
reqResMatcher.emitEvent(!isClient, ident, connectionMethodMap[10], m, reader)
|
|
reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
|
|
|
|
case *ConnectionStartOk:
|
|
// In our tests, *ConnectionStart does not result in *ConnectionStartOk
|
|
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[11], m, reader)
|
|
|
|
case *ConnectionClose:
|
|
eventConnectionClose := &ConnectionClose{
|
|
ReplyCode: m.ReplyCode,
|
|
ReplyText: m.ReplyText,
|
|
ClassId: m.ClassId,
|
|
MethodId: m.MethodId,
|
|
}
|
|
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[50], *eventConnectionClose, reader)
|
|
|
|
case *ConnectionCloseOk:
|
|
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[51], m, reader)
|
|
|
|
case *connectionOpen:
|
|
eventConnectionOpen := &connectionOpen{
|
|
VirtualHost: m.VirtualHost,
|
|
}
|
|
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[40], *eventConnectionOpen, reader)
|
|
|
|
case *connectionOpenOk:
|
|
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[41], m, reader)
|
|
|
|
case *channelOpen:
|
|
reqResMatcher.emitEvent(isClient, ident, channelMethodMap[10], m, reader)
|
|
|
|
case *channelOpenOk:
|
|
reqResMatcher.emitEvent(isClient, ident, channelMethodMap[11], m, reader)
|
|
|
|
case *connectionTune:
|
|
// In our tests, *connectionTune does not result in *connectionTuneOk
|
|
reqResMatcher.emitEvent(!isClient, ident, connectionMethodMap[30], m, reader)
|
|
reqResMatcher.emitEvent(isClient, ident, emptyMethod, &emptyResponse{}, reader)
|
|
|
|
case *connectionTuneOk:
|
|
// In our tests, *connectionTune does not result in *connectionTuneOk
|
|
reqResMatcher.emitEvent(isClient, ident, connectionMethodMap[31], m, reader)
|
|
|
|
case *basicCancel:
|
|
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[30], m, reader)
|
|
|
|
case *basicCancelOk:
|
|
reqResMatcher.emitEvent(isClient, ident, basicMethodMap[31], m, reader)
|
|
}
|
|
|
|
default:
|
|
// log.Printf("unexpected frameVal: %+v", f)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
|
request := item.Pair.Request.Payload.(map[string]interface{})
|
|
response := item.Pair.Response.Payload.(map[string]interface{})
|
|
reqDetails := request["details"].(map[string]interface{})
|
|
resDetails := response["details"].(map[string]interface{})
|
|
|
|
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
|
if elapsedTime < 0 {
|
|
elapsedTime = 0
|
|
}
|
|
|
|
reqDetails["method"] = request["method"]
|
|
resDetails["method"] = response["method"]
|
|
return &api.Entry{
|
|
Protocol: protocol.ProtocolSummary,
|
|
Capture: item.Capture,
|
|
Source: &api.TCP{
|
|
Name: resolvedSource,
|
|
IP: item.ConnectionInfo.ClientIP,
|
|
Port: item.ConnectionInfo.ClientPort,
|
|
},
|
|
Destination: &api.TCP{
|
|
Name: resolvedDestination,
|
|
IP: item.ConnectionInfo.ServerIP,
|
|
Port: item.ConnectionInfo.ServerPort,
|
|
},
|
|
Namespace: namespace,
|
|
Outgoing: item.ConnectionInfo.IsOutgoing,
|
|
Request: reqDetails,
|
|
Response: resDetails,
|
|
RequestSize: item.Pair.Request.CaptureSize,
|
|
ResponseSize: item.Pair.Response.CaptureSize,
|
|
Timestamp: item.Timestamp,
|
|
StartTime: item.Pair.Request.CaptureTime,
|
|
ElapsedTime: elapsedTime,
|
|
}
|
|
|
|
}
|
|
|
|
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
|
summary := ""
|
|
summaryQuery := ""
|
|
method := entry.Request["method"].(string)
|
|
methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
|
|
switch method {
|
|
case basicMethodMap[40]:
|
|
summary = entry.Request["exchange"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
|
case basicMethodMap[60]:
|
|
summary = entry.Request["exchange"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
|
case exchangeMethodMap[10]:
|
|
summary = entry.Request["exchange"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
|
case queueMethodMap[10]:
|
|
summary = entry.Request["queue"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
|
case connectionMethodMap[10]:
|
|
versionMajor := int(entry.Request["versionMajor"].(float64))
|
|
versionMinor := int(entry.Request["versionMinor"].(float64))
|
|
summary = fmt.Sprintf(
|
|
"%s.%s",
|
|
strconv.Itoa(versionMajor),
|
|
strconv.Itoa(versionMinor),
|
|
)
|
|
summaryQuery = fmt.Sprintf(`request.versionMajor == %d and request.versionMinor == %d`, versionMajor, versionMinor)
|
|
case connectionMethodMap[50]:
|
|
summary = entry.Request["replyText"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.replyText == "%s"`, summary)
|
|
case queueMethodMap[20]:
|
|
summary = entry.Request["queue"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
|
case basicMethodMap[20]:
|
|
summary = entry.Request["queue"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
|
case connectionMethodMap[40]:
|
|
summary = entry.Request["virtualHost"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.virtualHost == "%s"`, summary)
|
|
case connectionMethodMap[30]:
|
|
summary = fmt.Sprintf("%g", entry.Request["channelMax"].(float64))
|
|
summaryQuery = fmt.Sprintf(`request.channelMax == "%s"`, summary)
|
|
case connectionMethodMap[31]:
|
|
summary = fmt.Sprintf("%g", entry.Request["channelMax"].(float64))
|
|
summaryQuery = fmt.Sprintf(`request.channelMax == "%s"`, summary)
|
|
case basicMethodMap[30]:
|
|
summary = entry.Request["consumerTag"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.consumerTag == "%s"`, summary)
|
|
case basicMethodMap[31]:
|
|
summary = entry.Request["consumerTag"].(string)
|
|
summaryQuery = fmt.Sprintf(`request.consumerTag == "%s"`, summary)
|
|
}
|
|
|
|
return &api.BaseEntry{
|
|
Id: entry.Id,
|
|
Protocol: *protocolsMap[entry.Protocol.ToString()],
|
|
Capture: entry.Capture,
|
|
Summary: summary,
|
|
SummaryQuery: summaryQuery,
|
|
Status: 0,
|
|
StatusQuery: "",
|
|
Method: method,
|
|
MethodQuery: methodQuery,
|
|
Timestamp: entry.Timestamp,
|
|
Source: entry.Source,
|
|
Destination: entry.Destination,
|
|
IsOutgoing: entry.Outgoing,
|
|
Latency: entry.ElapsedTime,
|
|
}
|
|
}
|
|
|
|
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
|
|
representation := make(map[string]interface{})
|
|
var repRequest []interface{}
|
|
var repResponse []interface{}
|
|
|
|
switch request["method"].(string) {
|
|
case basicMethodMap[40]:
|
|
repRequest = representBasicPublish(request)
|
|
case basicMethodMap[60]:
|
|
repRequest = representBasicDeliver(request)
|
|
case queueMethodMap[10]:
|
|
repRequest = representQueueDeclare(request)
|
|
case exchangeMethodMap[10]:
|
|
repRequest = representExchangeDeclare(request)
|
|
case connectionMethodMap[10]:
|
|
repRequest = representConnectionStart(request)
|
|
case connectionMethodMap[50]:
|
|
repRequest = representConnectionClose(request)
|
|
case queueMethodMap[20]:
|
|
repRequest = representQueueBind(request)
|
|
case basicMethodMap[20]:
|
|
repRequest = representBasicConsume(request)
|
|
case connectionMethodMap[40]:
|
|
repRequest = representConnectionOpen(request)
|
|
case channelMethodMap[10]:
|
|
repRequest = representEmpty(request)
|
|
case connectionMethodMap[30]:
|
|
repRequest = representConnectionTune(request)
|
|
case basicMethodMap[30]:
|
|
repRequest = representBasicCancel(request)
|
|
}
|
|
|
|
switch response["method"].(string) {
|
|
case queueMethodMap[11]:
|
|
repResponse = representQueueDeclareOk(response)
|
|
case exchangeMethodMap[11]:
|
|
repResponse = representEmpty(response)
|
|
case connectionMethodMap[11]:
|
|
repResponse = representConnectionStartOk(response)
|
|
case connectionMethodMap[51]:
|
|
repResponse = representEmpty(response)
|
|
case basicMethodMap[21]:
|
|
repResponse = representBasicConsumeOk(response)
|
|
case queueMethodMap[21]:
|
|
repResponse = representEmpty(response)
|
|
case connectionMethodMap[41]:
|
|
repResponse = representEmpty(response)
|
|
case channelMethodMap[11]:
|
|
repResponse = representEmpty(request)
|
|
case connectionMethodMap[31]:
|
|
repResponse = representConnectionTune(request)
|
|
case basicMethodMap[31]:
|
|
repResponse = representBasicCancelOk(request)
|
|
case emptyMethod:
|
|
repResponse = representEmpty(response)
|
|
}
|
|
|
|
representation["request"] = repRequest
|
|
representation["response"] = repResponse
|
|
object, err = json.Marshal(representation)
|
|
|
|
return
|
|
}
|
|
|
|
func (d dissecting) Macros() map[string]string {
|
|
return map[string]string{
|
|
`amqp`: fmt.Sprintf(`protocol.name == "%s"`, protocol.Name),
|
|
}
|
|
}
|
|
|
|
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
|
|
return createResponseRequestMatcher()
|
|
}
|
|
|
|
// Dissector is the package-level dissecting value; NewDissector returns it.
var Dissector dissecting
|
|
|
|
func NewDissector() api.Dissector {
|
|
return Dissector
|
|
}
|