From 60a6be677f72fff4bf6dd391f529df173f129e5a Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sun, 22 Aug 2021 22:44:11 +0300 Subject: [PATCH] Fix the read errors that freezes the sniffer in HTTP and Kafka --- tap/extensions/http/main.go | 3 +++ tap/extensions/kafka/decode.go | 4 ++-- tap/extensions/kafka/helpers.go | 8 ++++++-- tap/extensions/kafka/request.go | 16 +++++++++++----- tap/extensions/kafka/response.go | 5 +++-- tap/tcp_reader.go | 6 +++--- 6 files changed, 28 insertions(+), 14 deletions(-) diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index a74db0d3f..2797e5562 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -78,6 +78,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { + io.ReadAll(b) rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err) continue } @@ -86,6 +87,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { + io.ReadAll(b) rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err) continue } @@ -94,6 +96,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { + io.ReadAll(b) rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err) continue } diff --git a/tap/extensions/kafka/decode.go b/tap/extensions/kafka/decode.go index 6dd46599f..04a083a03 100644 --- a/tap/extensions/kafka/decode.go +++ b/tap/extensions/kafka/decode.go @@ -103,7 +103,7 @@ func (d *decoder) decodeCompactBytes(v value) { } func (d *decoder) decodeArray(v value, elemType reflect.Type, decodeElem decodeFunc) { - if n := d.readInt32(); n < 0 { + if n := d.readInt32(); n < 0 || n > 65535 { v.setArray(array{}) } else { a := makeArray(elemType, int(n)) @@ -115,7 +115,7 @@ func (d *decoder) decodeArray(v value, elemType reflect.Type, decodeElem decodeF } func (d *decoder) decodeCompactArray(v value, elemType reflect.Type, decodeElem decodeFunc) { - if n := d.readUnsignedVarInt(); n < 1 { + if n := d.readUnsignedVarInt(); n < 1 || n > 65535 { v.setArray(array{}) } else { a := makeArray(elemType, int(n-1)) diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index da40d62d1..d95044cb8 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -221,7 +221,11 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} { rep = representResponseHeader(data, rep) payload := data["Payload"].(map[string]interface{}) - apiKeys, _ := json.Marshal(payload["ApiKeys"].([]interface{})) + apiKeys := "" + if payload["TopicNames"] != nil { + x, _ := json.Marshal(payload["ApiKeys"].([]interface{})) + apiKeys = string(x) + } throttleTimeMs := "" if payload["ThrottleTimeMs"] != nil { throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64))) @@ -233,7 +237,7 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} { }, { "name": "ApiKeys", - "value": string(apiKeys), + "value": apiKeys, }, { "name": "Throttle Time (ms)", diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index bfee59182..80ebeccfb 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "io" "log" @@ -33,20 +34,24 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 correlationID := d.readInt32() clientID := d.readString() + if apiKey == UpdateMetadata { + return + } + if i := int(apiKey); i < 0 || i >= len(apiTypes) { err = fmt.Errorf("unsupported api key: %d", i) - return apiKey, 0, err + return apiKey, apiVersion, err } if err = d.err; err != nil { err = dontExpectEOF(err) - return apiKey, 0, err + return apiKey, apiVersion, err } t := &apiTypes[apiKey] if t == nil { err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) - return apiKey, 0, err + return apiKey, apiVersion, err } var payload interface{} @@ -197,8 +202,9 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 mt.(messageType).decode(d, valueOf(deleteTopicsRequest)) payload = deleteTopicsRequest default: - log.Printf("[WARNING] (Request) Not implemented: %s\n", apiKey) - break + msg := fmt.Sprintf("[WARNING] (Request) Not implemented: %s\n", apiKey) + log.Printf(msg) + return apiKey, 0, errors.New(msg) } request := &Request{ diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 119825da9..85152fa20 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -242,8 +242,9 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error mt.(messageType).decode(d, valueOf(deleteTopicsResponse)) reqResPair.Response.Payload = deleteTopicsResponse default: - log.Printf("[WARNING] (Response) Not implemented: %s\n", apiKey) - break + msg := fmt.Sprintf("[WARNING] (Response) Not implemented: %s\n", apiKey) + log.Printf(msg) + return errors.New(msg) } connectionInfo := &api.ConnectionInfo{ diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index e33f1eefa..b85552f1a 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -4,7 +4,6 @@ import ( "bufio" "fmt" "io" - "strconv" "sync" "time" @@ -75,8 +74,9 @@ func (h *tcpReader) Read(p []byte) (int, error) { err := clientHello.Unmarshall(msg.bytes) if err == nil { fmt.Printf("Detected TLS client hello with SNI %s\n", clientHello.SNI) - numericPort, _ := strconv.Atoi(h.tcpID.DstPort) - h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol) + // TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error. + // numericPort, _ := strconv.Atoi(h.tcpID.DstPort) + // h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol) } } }