Fix the read errors that freezes the sniffer in HTTP and Kafka

This commit is contained in:
M. Mert Yildiran 2021-08-22 22:44:11 +03:00
parent 6f84138eb7
commit 60a6be677f
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
6 changed files with 28 additions and 14 deletions

View File

@ -78,6 +78,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
io.ReadAll(b)
rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err)
continue
}
@ -86,6 +87,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
io.ReadAll(b)
rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err)
continue
}
@ -94,6 +96,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
io.ReadAll(b)
rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err)
continue
}

View File

@ -103,7 +103,7 @@ func (d *decoder) decodeCompactBytes(v value) {
}
func (d *decoder) decodeArray(v value, elemType reflect.Type, decodeElem decodeFunc) {
if n := d.readInt32(); n < 0 {
if n := d.readInt32(); n < 0 || n > 65535 {
v.setArray(array{})
} else {
a := makeArray(elemType, int(n))
@ -115,7 +115,7 @@ func (d *decoder) decodeArray(v value, elemType reflect.Type, decodeElem decodeF
}
func (d *decoder) decodeCompactArray(v value, elemType reflect.Type, decodeElem decodeFunc) {
if n := d.readUnsignedVarInt(); n < 1 {
if n := d.readUnsignedVarInt(); n < 1 || n > 65535 {
v.setArray(array{})
} else {
a := makeArray(elemType, int(n-1))

View File

@ -221,7 +221,11 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
apiKeys, _ := json.Marshal(payload["ApiKeys"].([]interface{}))
apiKeys := ""
if payload["TopicNames"] != nil {
x, _ := json.Marshal(payload["ApiKeys"].([]interface{}))
apiKeys = string(x)
}
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
@ -233,7 +237,7 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} {
},
{
"name": "ApiKeys",
"value": string(apiKeys),
"value": apiKeys,
},
{
"name": "Throttle Time (ms)",

View File

@ -1,6 +1,7 @@
package main
import (
"errors"
"fmt"
"io"
"log"
@ -33,20 +34,24 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
correlationID := d.readInt32()
clientID := d.readString()
if apiKey == UpdateMetadata {
return
}
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
err = fmt.Errorf("unsupported api key: %d", i)
return apiKey, 0, err
return apiKey, apiVersion, err
}
if err = d.err; err != nil {
err = dontExpectEOF(err)
return apiKey, 0, err
return apiKey, apiVersion, err
}
t := &apiTypes[apiKey]
if t == nil {
err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
return apiKey, 0, err
return apiKey, apiVersion, err
}
var payload interface{}
@ -197,8 +202,9 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
mt.(messageType).decode(d, valueOf(deleteTopicsRequest))
payload = deleteTopicsRequest
default:
log.Printf("[WARNING] (Request) Not implemented: %s\n", apiKey)
break
msg := fmt.Sprintf("[WARNING] (Request) Not implemented: %s\n", apiKey)
log.Printf(msg)
return apiKey, 0, errors.New(msg)
}
request := &Request{

View File

@ -242,8 +242,9 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
mt.(messageType).decode(d, valueOf(deleteTopicsResponse))
reqResPair.Response.Payload = deleteTopicsResponse
default:
log.Printf("[WARNING] (Response) Not implemented: %s\n", apiKey)
break
msg := fmt.Sprintf("[WARNING] (Response) Not implemented: %s\n", apiKey)
log.Printf(msg)
return errors.New(msg)
}
connectionInfo := &api.ConnectionInfo{

View File

@ -4,7 +4,6 @@ import (
"bufio"
"fmt"
"io"
"strconv"
"sync"
"time"
@ -75,8 +74,9 @@ func (h *tcpReader) Read(p []byte) (int, error) {
err := clientHello.Unmarshall(msg.bytes)
if err == nil {
fmt.Printf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)
// TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error.
// numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
// h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)
}
}
}