diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index c34ec8de6..9f213cd84 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -3,6 +3,7 @@ package main import ( "bufio" "encoding/json" + "errors" "fmt" "io" "log" @@ -80,9 +81,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em frame, err := r.ReadFrame() if err == io.EOF { // We must read until we see an EOF... very important! - return nil + return errors.New("AMQP EOF") } else if err != nil { - // log.Println("Error reading stream", h.net, h.transport, ":", err) + // return err } switch f := frame.(type) { @@ -206,7 +207,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } } - return nil + return errors.New("AMQP EOF") } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index c3e9ac748..6a953316b 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -41,12 +41,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em if isClient { _, _, err := ReadRequest(b, tcpID) if err != nil { - break + return err } } else { err := ReadResponse(b, tcpID, emitter) if err != nil { - break + return err } } } diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index 1cc7be82f..b5a5e29fb 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -103,6 +103,9 @@ func (h *tcpReader) run(wg *sync.WaitGroup, isClient bool) { for _, extension := range extensions { r.Reset(data) - extension.Dissector.Dissect(bufio.NewReader(r), isClient, h.tcpID, h.Emitter) + err := extension.Dissector.Dissect(bufio.NewReader(r), isClient, h.tcpID, h.Emitter) + if err == nil { + break + } } }