From efe8e849e44dd7d79950cd6211ce85630fee8024 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Wed, 25 Aug 2021 19:46:46 +0300 Subject: [PATCH] Fix the memory leaks in AMQP and Kafka dissectors --- tap/extensions/amqp/read.go | 5 +++++ tap/extensions/kafka/main.go | 3 --- tap/extensions/kafka/request.go | 4 ++++ tap/extensions/kafka/response.go | 4 ++++ 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/tap/extensions/amqp/read.go b/tap/extensions/amqp/read.go index 5457a2c5d..fc1c78b6c 100644 --- a/tap/extensions/amqp/read.go +++ b/tap/extensions/amqp/read.go @@ -9,6 +9,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "time" ) @@ -54,6 +55,10 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) { channel := binary.BigEndian.Uint16(scratch[1:3]) size := binary.BigEndian.Uint32(scratch[3:7]) + if size > (1000000 * 128) { + return nil, fmt.Errorf("An AMQP message cannot be bigger than 128MB") + } + switch typ { case frameMethod: if frame, err = r.parseMethodFrame(channel, size); err != nil { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index caa111e51..c3e9ac748 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -4,7 +4,6 @@ import ( "bufio" "encoding/json" "fmt" - "io" "log" "github.com/up9inc/mizu/tap/api" @@ -42,13 +41,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em if isClient { _, _, err := ReadRequest(b, tcpID) if err != nil { - io.ReadAll(b) break } } else { err := ReadResponse(b, tcpID, emitter) if err != nil { - io.ReadAll(b) break } } diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index 2b9fbf9d8..4847bf777 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -21,6 +21,10 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 d := &decoder{reader: r, remain: 4} size := d.readInt32() + if size > 1000000 { + return 0, 0, fmt.Errorf("A Kafka message cannot be bigger than 1MB") + } + if err = d.err; err != nil { err = dontExpectEOF(err) return 0, 0, err diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 8b0821264..bf7133e12 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -19,6 +19,10 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error d := &decoder{reader: r, remain: 4} size := d.readInt32() + if size > 1000000 { + return fmt.Errorf("A Kafka message cannot be bigger than 1MB") + } + if err = d.err; err != nil { err = dontExpectEOF(err) return err