Add --nodefrag flag to the tapper and bring in the main AMQP code

This commit is contained in:
M. Mert Yildiran 2021-08-21 10:44:08 +03:00
parent 5b3f63dd28
commit 0c2140e11b
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
4 changed files with 345 additions and 2 deletions

View File

@ -578,6 +578,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
"-i", "any",
"--tap",
"--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp),
"--nodefrag"
}
if tapOutgoing {
mizuCmd = append(mizuCmd, "--anydirection")

View File

@ -0,0 +1,186 @@
package main
import "fmt"
var connectionMethodMap = map[int]string{
10: "connection start",
11: "connection start-ok",
20: "connection secure",
21: "connection secure-ok",
30: "connection tune",
31: "connection tune-ok",
40: "connection open",
41: "connection open-ok",
50: "connection close",
51: "connection close-ok",
60: "connection blocked",
61: "connection unblocked",
}
var channelMethodMap = map[int]string{
10: "channel open",
11: "channel open-ok",
20: "channel flow",
21: "channel flow-ok",
40: "channel close",
41: "channel close-ok",
}
var exchangeMethodMap = map[int]string{
10: "exchange declare",
11: "exchange declare-ok",
20: "exchange delete",
21: "exchange delete-ok",
30: "exchange bind",
31: "exchange bind-ok",
40: "exchange unbind",
51: "exchange unbind-ok",
}
var queueMethodMap = map[int]string{
10: "queue declare",
11: "queue declare-ok",
20: "queue bind",
21: "queue bind-ok",
50: "queue unbind",
51: "queue unbind-ok",
30: "queue purge",
31: "queue purge-ok",
40: "queue delete",
41: "queue delete-ok",
}
var basicMethodMap = map[int]string{
10: "basic qos",
11: "basic qos-ok",
20: "basic consume",
21: "basic consume-ok",
30: "basic cancel",
31: "basic cancel-ok",
40: "basic publish",
50: "basic return",
60: "basic deliver",
70: "basic get",
71: "basic get-ok",
72: "basic get-empty",
80: "basic ack",
90: "basic reject",
100: "basic recover-async",
110: "basic recover",
111: "basic recover-ok",
120: "basic nack",
}
var txMethodMap = map[int]string{
10: "tx select",
11: "tx select-ok",
20: "tx commit",
21: "tx commit-ok",
30: "tx rollback",
31: "tx rollback-ok",
}
func printEventBasicPublish(eventBasicPublish BasicPublish) {
fmt.Printf(
"[%s] Exchange: %s, RoutingKey: %s, Mandatory: %t, Immediate: %t, Properties: %v, Body: %s\n",
basicMethodMap[40],
eventBasicPublish.Exchange,
eventBasicPublish.RoutingKey,
eventBasicPublish.Mandatory,
eventBasicPublish.Immediate,
eventBasicPublish.Properties,
eventBasicPublish.Body,
)
}
func printEventBasicDeliver(eventBasicDeliver BasicDeliver) {
fmt.Printf(
"[%s] ConsumerTag: %s, DeliveryTag: %d, Redelivered: %t, Exchange: %s, RoutingKey: %s, Properties: %v, Body: %s\n",
basicMethodMap[60],
eventBasicDeliver.ConsumerTag,
eventBasicDeliver.DeliveryTag,
eventBasicDeliver.Redelivered,
eventBasicDeliver.Exchange,
eventBasicDeliver.RoutingKey,
eventBasicDeliver.Properties,
eventBasicDeliver.Body,
)
}
func printEventQueueDeclare(eventQueueDeclare QueueDeclare) {
fmt.Printf(
"[%s] Queue: %s, Passive: %t, Durable: %t, AutoDelete: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n",
queueMethodMap[10],
eventQueueDeclare.Queue,
eventQueueDeclare.Passive,
eventQueueDeclare.Durable,
eventQueueDeclare.AutoDelete,
eventQueueDeclare.Exclusive,
eventQueueDeclare.NoWait,
eventQueueDeclare.Arguments,
)
}
func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) {
fmt.Printf(
"[%s] Exchange: %s, Type: %s, Passive: %t, Durable: %t, AutoDelete: %t, Internal: %t, NoWait: %t, Arguments: %v\n",
exchangeMethodMap[10],
eventExchangeDeclare.Exchange,
eventExchangeDeclare.Type,
eventExchangeDeclare.Passive,
eventExchangeDeclare.Durable,
eventExchangeDeclare.AutoDelete,
eventExchangeDeclare.Internal,
eventExchangeDeclare.NoWait,
eventExchangeDeclare.Arguments,
)
}
func printEventConnectionStart(eventConnectionStart ConnectionStart) {
fmt.Printf(
"[%s] Version: %d.%d, ServerProperties: %v, Mechanisms: %s, Locales: %s\n",
connectionMethodMap[10],
eventConnectionStart.VersionMajor,
eventConnectionStart.VersionMinor,
eventConnectionStart.ServerProperties,
eventConnectionStart.Mechanisms,
eventConnectionStart.Locales,
)
}
func printEventConnectionClose(eventConnectionClose ConnectionClose) {
fmt.Printf(
"[%s] ReplyCode: %d, ReplyText: %s, ClassId: %d, MethodId: %d\n",
connectionMethodMap[50],
eventConnectionClose.ReplyCode,
eventConnectionClose.ReplyText,
eventConnectionClose.ClassId,
eventConnectionClose.MethodId,
)
}
func printEventQueueBind(eventQueueBind QueueBind) {
fmt.Printf(
"[%s] Queue: %s, Exchange: %s, RoutingKey: %s, NoWait: %t, Arguments: %v\n",
queueMethodMap[20],
eventQueueBind.Queue,
eventQueueBind.Exchange,
eventQueueBind.RoutingKey,
eventQueueBind.NoWait,
eventQueueBind.Arguments,
)
}
func printEventBasicConsume(eventBasicConsume BasicConsume) {
fmt.Printf(
"[%s] Queue: %s, ConsumerTag: %s, NoLocal: %t, NoAck: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n",
basicMethodMap[20],
eventBasicConsume.Queue,
eventBasicConsume.ConsumerTag,
eventBasicConsume.NoLocal,
eventBasicConsume.NoAck,
eventBasicConsume.Exclusive,
eventBasicConsume.NoWait,
eventBasicConsume.Arguments,
)
}

View File

@ -2,6 +2,7 @@ package main
import (
"bufio"
"io"
"log"
"github.com/up9inc/mizu/tap/api"
@ -33,7 +34,162 @@ func (d dissecting) Ping() {
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
// TODO: Implement
r := AmqpReader{b}
var remaining int
var header *HeaderFrame
var body []byte
eventBasicPublish := &BasicPublish{
Exchange: "",
RoutingKey: "",
Mandatory: false,
Immediate: false,
Body: nil,
Properties: Properties{},
}
eventBasicDeliver := &BasicDeliver{
ConsumerTag: "",
DeliveryTag: 0,
Redelivered: false,
Exchange: "",
RoutingKey: "",
Properties: Properties{},
Body: nil,
}
var lastMethodFrameMessage Message
for {
frame, err := r.ReadFrame()
if err == io.EOF {
// We must read until we see an EOF... very important!
return
} else if err != nil {
// log.Println("Error reading stream", h.net, h.transport, ":", err)
}
switch f := frame.(type) {
case *HeartbeatFrame:
// drop
case *HeaderFrame:
// start content state
header = f
remaining = int(header.Size)
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Properties = header.Properties
case *BasicDeliver:
eventBasicDeliver.Properties = header.Properties
default:
}
case *BodyFrame:
// continue until terminated
body = append(body, f.Body...)
remaining -= len(f.Body)
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
printEventBasicPublish(*eventBasicPublish)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
printEventBasicDeliver(*eventBasicDeliver)
default:
}
case *MethodFrame:
lastMethodFrameMessage = f.Method
switch m := f.Method.(type) {
case *BasicPublish:
eventBasicPublish.Exchange = m.Exchange
eventBasicPublish.RoutingKey = m.RoutingKey
eventBasicPublish.Mandatory = m.Mandatory
eventBasicPublish.Immediate = m.Immediate
case *QueueBind:
eventQueueBind := &QueueBind{
Queue: m.Queue,
Exchange: m.Exchange,
RoutingKey: m.RoutingKey,
NoWait: m.NoWait,
Arguments: m.Arguments,
}
printEventQueueBind(*eventQueueBind)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
Queue: m.Queue,
ConsumerTag: m.ConsumerTag,
NoLocal: m.NoLocal,
NoAck: m.NoAck,
Exclusive: m.Exclusive,
NoWait: m.NoWait,
Arguments: m.Arguments,
}
printEventBasicConsume(*eventBasicConsume)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
eventBasicDeliver.DeliveryTag = m.DeliveryTag
eventBasicDeliver.Redelivered = m.Redelivered
eventBasicDeliver.Exchange = m.Exchange
eventBasicDeliver.RoutingKey = m.RoutingKey
case *QueueDeclare:
eventQueueDeclare := &QueueDeclare{
Queue: m.Queue,
Passive: m.Passive,
Durable: m.Durable,
AutoDelete: m.AutoDelete,
Exclusive: m.Exclusive,
NoWait: m.NoWait,
Arguments: m.Arguments,
}
printEventQueueDeclare(*eventQueueDeclare)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
Exchange: m.Exchange,
Type: m.Type,
Passive: m.Passive,
Durable: m.Durable,
AutoDelete: m.AutoDelete,
Internal: m.Internal,
NoWait: m.NoWait,
Arguments: m.Arguments,
}
printEventExchangeDeclare(*eventExchangeDeclare)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
VersionMajor: m.VersionMajor,
VersionMinor: m.VersionMinor,
ServerProperties: m.ServerProperties,
Mechanisms: m.Mechanisms,
Locales: m.Locales,
}
printEventConnectionStart(*eventConnectionStart)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
ReplyCode: m.ReplyCode,
ReplyText: m.ReplyText,
ClassId: m.ClassId,
MethodId: m.MethodId,
}
printEventConnectionClose(*eventConnectionClose)
default:
}
default:
// fmt.Printf("unexpected frame: %+v\n", f)
}
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {

View File

@ -244,7 +244,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem, allExtensionPor
appPortsStr := os.Getenv(AppPortsEnvVar)
var appPorts []int
if appPortsStr == "" {
rlog.Info("Received empty/no APP_PORTS env var! only listening to ports: %v!", allExtensionPorts)
rlog.Info("Received empty/no APP_PORTS env var! only listening to ports:", allExtensionPorts)
appPorts = make([]int, 0)
} else {
appPorts = parseAppPorts(appPortsStr)