diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 8dc9219cf..124fc9b1a 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -4,6 +4,7 @@ import ( "bufio" "encoding/json" "fmt" + "io" "log" "github.com/up9inc/mizu/tap/api" @@ -40,11 +41,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em if isClient { _, _, err := ReadRequest(b, tcpID) if err != nil { + io.ReadAll(b) break } } else { err := ReadResponse(b, tcpID, emitter) if err != nil { + io.ReadAll(b) break } } diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index 80ebeccfb..2b9fbf9d8 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -1,10 +1,8 @@ package main import ( - "errors" "fmt" "io" - "log" "reflect" "github.com/up9inc/mizu/tap/api" @@ -34,10 +32,6 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 correlationID := d.readInt32() clientID := d.readString() - if apiKey == UpdateMetadata { - return - } - if i := int(apiKey); i < 0 || i >= len(apiTypes) { err = fmt.Errorf("unsupported api key: %d", i) return apiKey, apiVersion, err @@ -202,9 +196,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 mt.(messageType).decode(d, valueOf(deleteTopicsRequest)) payload = deleteTopicsRequest default: - msg := fmt.Sprintf("[WARNING] (Request) Not implemented: %s\n", apiKey) - log.Printf(msg) - return apiKey, 0, errors.New(msg) + return apiKey, 0, fmt.Errorf("(Request) Not implemented: %s", apiKey) } request := &Request{ diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 85152fa20..f465bc283 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -1,10 +1,8 @@ package main import ( - "errors" "fmt" "io" - "log" "reflect" "time" @@ -45,10 +43,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error ) reqResPair := reqResMatcher.registerResponse(key, response) if reqResPair == nil { - d.discardAll() - msg := "Couldn't match a Kafka response to a Kafka request in 3 seconds!" - log.Printf("[WARNING] %s\n", msg) - return errors.New(msg) + return fmt.Errorf("Couldn't match a Kafka response to a Kafka request in 3 seconds!") } apiKey := reqResPair.Request.ApiKey apiVersion := reqResPair.Request.ApiVersion @@ -242,9 +237,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error mt.(messageType).decode(d, valueOf(deleteTopicsResponse)) reqResPair.Response.Payload = deleteTopicsResponse default: - msg := fmt.Sprintf("[WARNING] (Response) Not implemented: %s\n", apiKey) - log.Printf(msg) - return errors.New(msg) + return fmt.Errorf("(Response) Not implemented: %s", apiKey) } connectionInfo := &api.ConnectionInfo{