diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 221895ad4..ba760f4ef 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -578,6 +578,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac "-i", "any", "--tap", "--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp), + "--nodefrag" } if tapOutgoing { mizuCmd = append(mizuCmd, "--anydirection") diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go new file mode 100644 index 000000000..35d62e3b8 --- /dev/null +++ b/tap/extensions/amqp/helpers.go @@ -0,0 +1,186 @@ +package main + +import "fmt" + +var connectionMethodMap = map[int]string{ + 10: "connection start", + 11: "connection start-ok", + 20: "connection secure", + 21: "connection secure-ok", + 30: "connection tune", + 31: "connection tune-ok", + 40: "connection open", + 41: "connection open-ok", + 50: "connection close", + 51: "connection close-ok", + 60: "connection blocked", + 61: "connection unblocked", +} + +var channelMethodMap = map[int]string{ + 10: "channel open", + 11: "channel open-ok", + 20: "channel flow", + 21: "channel flow-ok", + 40: "channel close", + 41: "channel close-ok", +} + +var exchangeMethodMap = map[int]string{ + 10: "exchange declare", + 11: "exchange declare-ok", + 20: "exchange delete", + 21: "exchange delete-ok", + 30: "exchange bind", + 31: "exchange bind-ok", + 40: "exchange unbind", + 51: "exchange unbind-ok", +} + +var queueMethodMap = map[int]string{ + 10: "queue declare", + 11: "queue declare-ok", + 20: "queue bind", + 21: "queue bind-ok", + 50: "queue unbind", + 51: "queue unbind-ok", + 30: "queue purge", + 31: "queue purge-ok", + 40: "queue delete", + 41: "queue delete-ok", +} + +var basicMethodMap = map[int]string{ + 10: "basic qos", + 11: "basic qos-ok", + 20: "basic consume", + 21: "basic consume-ok", + 30: "basic cancel", + 31: "basic cancel-ok", + 40: "basic publish", + 50: "basic return", + 60: "basic deliver", + 70: "basic get", + 71: "basic get-ok", + 72: "basic get-empty", + 80: "basic ack", + 90: "basic reject", + 100: "basic recover-async", + 110: "basic recover", + 111: "basic recover-ok", + 120: "basic nack", +} + +var txMethodMap = map[int]string{ + 10: "tx select", + 11: "tx select-ok", + 20: "tx commit", + 21: "tx commit-ok", + 30: "tx rollback", + 31: "tx rollback-ok", +} + +func printEventBasicPublish(eventBasicPublish BasicPublish) { + fmt.Printf( + "[%s] Exchange: %s, RoutingKey: %s, Mandatory: %t, Immediate: %t, Properties: %v, Body: %s\n", + basicMethodMap[40], + eventBasicPublish.Exchange, + eventBasicPublish.RoutingKey, + eventBasicPublish.Mandatory, + eventBasicPublish.Immediate, + eventBasicPublish.Properties, + eventBasicPublish.Body, + ) +} + +func printEventBasicDeliver(eventBasicDeliver BasicDeliver) { + fmt.Printf( + "[%s] ConsumerTag: %s, DeliveryTag: %d, Redelivered: %t, Exchange: %s, RoutingKey: %s, Properties: %v, Body: %s\n", + basicMethodMap[60], + eventBasicDeliver.ConsumerTag, + eventBasicDeliver.DeliveryTag, + eventBasicDeliver.Redelivered, + eventBasicDeliver.Exchange, + eventBasicDeliver.RoutingKey, + eventBasicDeliver.Properties, + eventBasicDeliver.Body, + ) +} + +func printEventQueueDeclare(eventQueueDeclare QueueDeclare) { + fmt.Printf( + "[%s] Queue: %s, Passive: %t, Durable: %t, AutoDelete: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n", + queueMethodMap[10], + eventQueueDeclare.Queue, + eventQueueDeclare.Passive, + eventQueueDeclare.Durable, + eventQueueDeclare.AutoDelete, + eventQueueDeclare.Exclusive, + eventQueueDeclare.NoWait, + eventQueueDeclare.Arguments, + ) +} + +func printEventExchangeDeclare(eventExchangeDeclare ExchangeDeclare) { + fmt.Printf( + "[%s] Exchange: %s, Type: %s, Passive: %t, Durable: %t, AutoDelete: %t, Internal: %t, NoWait: %t, Arguments: %v\n", + exchangeMethodMap[10], + eventExchangeDeclare.Exchange, + eventExchangeDeclare.Type, + eventExchangeDeclare.Passive, + eventExchangeDeclare.Durable, + eventExchangeDeclare.AutoDelete, + eventExchangeDeclare.Internal, + eventExchangeDeclare.NoWait, + eventExchangeDeclare.Arguments, + ) +} + +func printEventConnectionStart(eventConnectionStart ConnectionStart) { + fmt.Printf( + "[%s] Version: %d.%d, ServerProperties: %v, Mechanisms: %s, Locales: %s\n", + connectionMethodMap[10], + eventConnectionStart.VersionMajor, + eventConnectionStart.VersionMinor, + eventConnectionStart.ServerProperties, + eventConnectionStart.Mechanisms, + eventConnectionStart.Locales, + ) +} + +func printEventConnectionClose(eventConnectionClose ConnectionClose) { + fmt.Printf( + "[%s] ReplyCode: %d, ReplyText: %s, ClassId: %d, MethodId: %d\n", + connectionMethodMap[50], + eventConnectionClose.ReplyCode, + eventConnectionClose.ReplyText, + eventConnectionClose.ClassId, + eventConnectionClose.MethodId, + ) +} + +func printEventQueueBind(eventQueueBind QueueBind) { + fmt.Printf( + "[%s] Queue: %s, Exchange: %s, RoutingKey: %s, NoWait: %t, Arguments: %v\n", + queueMethodMap[20], + eventQueueBind.Queue, + eventQueueBind.Exchange, + eventQueueBind.RoutingKey, + eventQueueBind.NoWait, + eventQueueBind.Arguments, + ) +} + +func printEventBasicConsume(eventBasicConsume BasicConsume) { + fmt.Printf( + "[%s] Queue: %s, ConsumerTag: %s, NoLocal: %t, NoAck: %t, Exclusive: %t, NoWait: %t, Arguments: %v\n", + basicMethodMap[20], + eventBasicConsume.Queue, + eventBasicConsume.ConsumerTag, + eventBasicConsume.NoLocal, + eventBasicConsume.NoAck, + eventBasicConsume.Exclusive, + eventBasicConsume.NoWait, + eventBasicConsume.Arguments, + ) +} diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index b7c4a21b4..c7ddc5cbb 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "io" "log" "github.com/up9inc/mizu/tap/api" @@ -33,7 +34,162 @@ func (d dissecting) Ping() { } func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { - // TODO: Implement + r := AmqpReader{b} + + var remaining int + var header *HeaderFrame + var body []byte + + eventBasicPublish := &BasicPublish{ + Exchange: "", + RoutingKey: "", + Mandatory: false, + Immediate: false, + Body: nil, + Properties: Properties{}, + } + + eventBasicDeliver := &BasicDeliver{ + ConsumerTag: "", + DeliveryTag: 0, + Redelivered: false, + Exchange: "", + RoutingKey: "", + Properties: Properties{}, + Body: nil, + } + + var lastMethodFrameMessage Message + + for { + frame, err := r.ReadFrame() + if err == io.EOF { + // We must read until we see an EOF... very important! + return + } else if err != nil { + // log.Println("Error reading stream", h.net, h.transport, ":", err) + } + + switch f := frame.(type) { + case *HeartbeatFrame: + // drop + + case *HeaderFrame: + // start content state + header = f + remaining = int(header.Size) + switch lastMethodFrameMessage.(type) { + case *BasicPublish: + eventBasicPublish.Properties = header.Properties + case *BasicDeliver: + eventBasicDeliver.Properties = header.Properties + default: + } + + case *BodyFrame: + // continue until terminated + body = append(body, f.Body...) + remaining -= len(f.Body) + switch lastMethodFrameMessage.(type) { + case *BasicPublish: + eventBasicPublish.Body = f.Body + printEventBasicPublish(*eventBasicPublish) + case *BasicDeliver: + eventBasicDeliver.Body = f.Body + printEventBasicDeliver(*eventBasicDeliver) + default: + } + + case *MethodFrame: + lastMethodFrameMessage = f.Method + switch m := f.Method.(type) { + case *BasicPublish: + eventBasicPublish.Exchange = m.Exchange + eventBasicPublish.RoutingKey = m.RoutingKey + eventBasicPublish.Mandatory = m.Mandatory + eventBasicPublish.Immediate = m.Immediate + + case *QueueBind: + eventQueueBind := &QueueBind{ + Queue: m.Queue, + Exchange: m.Exchange, + RoutingKey: m.RoutingKey, + NoWait: m.NoWait, + Arguments: m.Arguments, + } + printEventQueueBind(*eventQueueBind) + + case *BasicConsume: + eventBasicConsume := &BasicConsume{ + Queue: m.Queue, + ConsumerTag: m.ConsumerTag, + NoLocal: m.NoLocal, + NoAck: m.NoAck, + Exclusive: m.Exclusive, + NoWait: m.NoWait, + Arguments: m.Arguments, + } + printEventBasicConsume(*eventBasicConsume) + + case *BasicDeliver: + eventBasicDeliver.ConsumerTag = m.ConsumerTag + eventBasicDeliver.DeliveryTag = m.DeliveryTag + eventBasicDeliver.Redelivered = m.Redelivered + eventBasicDeliver.Exchange = m.Exchange + eventBasicDeliver.RoutingKey = m.RoutingKey + + case *QueueDeclare: + eventQueueDeclare := &QueueDeclare{ + Queue: m.Queue, + Passive: m.Passive, + Durable: m.Durable, + AutoDelete: m.AutoDelete, + Exclusive: m.Exclusive, + NoWait: m.NoWait, + Arguments: m.Arguments, + } + printEventQueueDeclare(*eventQueueDeclare) + + case *ExchangeDeclare: + eventExchangeDeclare := &ExchangeDeclare{ + Exchange: m.Exchange, + Type: m.Type, + Passive: m.Passive, + Durable: m.Durable, + AutoDelete: m.AutoDelete, + Internal: m.Internal, + NoWait: m.NoWait, + Arguments: m.Arguments, + } + printEventExchangeDeclare(*eventExchangeDeclare) + + case *ConnectionStart: + eventConnectionStart := &ConnectionStart{ + VersionMajor: m.VersionMajor, + VersionMinor: m.VersionMinor, + ServerProperties: m.ServerProperties, + Mechanisms: m.Mechanisms, + Locales: m.Locales, + } + printEventConnectionStart(*eventConnectionStart) + + case *ConnectionClose: + eventConnectionClose := &ConnectionClose{ + ReplyCode: m.ReplyCode, + ReplyText: m.ReplyText, + ClassId: m.ClassId, + MethodId: m.MethodId, + } + printEventConnectionClose(*eventConnectionClose) + + default: + + } + + default: + // fmt.Printf("unexpected frame: %+v\n", f) + } + } } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 113951d03..6433ad20f 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -244,7 +244,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem, allExtensionPor appPortsStr := os.Getenv(AppPortsEnvVar) var appPorts []int if appPortsStr == "" { - rlog.Info("Received empty/no APP_PORTS env var! only listening to ports: %v!", allExtensionPorts) + rlog.Info("Received empty/no APP_PORTS env var! only listening to ports:", allExtensionPorts) appPorts = make([]int, 0) } else { appPorts = parseAppPorts(appPortsStr)