Make the TCP reader consists of a single Go routine (instead of two) and try to dissect in both client and server mode by rewinding

This commit is contained in:
M. Mert Yildiran
2021-08-25 15:39:03 +03:00
parent 942e7ff19a
commit d0f0e187cb
8 changed files with 64 additions and 85 deletions

View File

@@ -41,6 +41,18 @@ type TcpID struct {
Ident string
}
func (t *TcpID) Swap() {
srcIP := t.SrcIP
dstIP := t.DstIP
srcPort := t.SrcPort
dstPort := t.DstPort
t.SrcIP = dstIP
t.SrcPort = dstPort
t.DstIP = srcIP
t.DstPort = srcPort
}
type GenericMessage struct {
IsRequest bool `json:"is_request"`
CaptureTime time.Time `json:"capture_time"`
@@ -62,7 +74,7 @@ type OutputChannelItem struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter)
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) error
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(entry *MizuEntry) (Protocol, []byte, error)

View File

@@ -39,7 +39,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error {
r := AmqpReader{b}
var remaining int
@@ -79,7 +79,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
frame, err := r.ReadFrame()
if err == io.EOF {
// We must read until we see an EOF... very important!
return
return nil
} else if err != nil {
// log.Println("Error reading stream", h.net, h.transport, ":", err)
}
@@ -204,6 +204,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
// log.Printf("unexpected frame: %+v\n", f)
}
}
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"time"
@@ -78,7 +77,8 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit
requestCounter++
req, err := http.ReadRequest(b)
if err != nil {
log.Println("Error reading stream:", err)
requestCounter--
// log.Println("Error reading stream:", err)
return err
}
@@ -120,7 +120,8 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit
responseCounter++
res, err := http.ReadResponse(b, nil)
if err != nil {
log.Println("Error reading stream:", err)
responseCounter--
// log.Println("Error reading stream:", err)
return err
}
var req string

View File

@@ -60,7 +60,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil {
@@ -77,36 +77,43 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
grpcAssembler = createGrpcAssembler(b)
}
success := false
for {
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
io.ReadAll(b)
rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
} else if isClient {
tcpID.Swap()
err = handleHTTP1ClientStream(b, tcpID, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
io.ReadAll(b)
rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
} else {
err = handleHTTP1ServerStream(b, tcpID, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
io.ReadAll(b)
rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
}
}
if !success {
return err
}
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {

View File

@@ -36,7 +36,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error {
for {
if isClient {
_, _, err := ReadRequest(b, tcpID)
@@ -52,6 +52,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
}
}
}
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {

View File

@@ -2,6 +2,7 @@ package tap
import (
"bufio"
"bytes"
"fmt"
"io"
"sync"
@@ -89,29 +90,19 @@ func (h *tcpReader) Read(p []byte) (int, error) {
return l, nil
}
func containsPort(ports []string, port string) bool {
for _, x := range ports {
if x == port {
return true
}
}
return false
}
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
var port string
if h.isClient {
port = h.tcpID.DstPort
} else {
port = h.tcpID.SrcPort
}
b := bufio.NewReader(h)
// TODO: maybe check for kafka and amqp and when it is not one of those pass it to the HTTP?
// because it will check for the ports that we checked in the "isTapTarget"
for _, extension := range extensions {
if containsPort(extension.Protocol.Ports, port) {
extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter)
}
}
data, _ := io.ReadAll(h)
r := bytes.NewReader(data)
b := bufio.NewReader(r)
extensions[1].Dissector.Dissect(b, true, h.tcpID, h.Emitter)
r.Reset(data)
b = bufio.NewReader(r)
extensions[1].Dissector.Dissect(b, false, h.tcpID, h.Emitter)
}

View File

@@ -21,10 +21,8 @@ type tcpStream struct {
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
reader tcpReader
isTapTarget bool
reversed bool
client tcpReader
server tcpReader
urls []string
ident string
sync.Mutex
@@ -145,11 +143,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount()
if dir == reassembly.TCPDirClientToServer && !t.reversed {
t.client.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
} else {
t.server.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
}
t.reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
}
}
}
@@ -157,8 +151,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
if t.isTapTarget {
close(t.client.msgQueue)
close(t.server.msgQueue)
close(t.reader.msgQueue)
}
// do not remove the connection to allow last ACK
return false

View File

@@ -45,13 +45,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
reversed: props.reversed,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
}
if stream.isTapTarget {
stream.client = tcpReader{
stream.reader = tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
@@ -66,24 +65,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
outboundLinkWriter: factory.outbountLinkWriter,
Emitter: factory.Emitter,
}
stream.server = tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net.Reverse(), transport.Reverse()),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
parent: stream,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outbountLinkWriter,
Emitter: factory.Emitter,
}
factory.wg.Add(2)
// Start reading from channels stream.client.bytes and stream.server.bytes
go stream.client.run(&factory.wg)
go stream.server.run(&factory.wg)
factory.wg.Add(1)
// Start reading from channel stream.reader.bytes
go stream.reader.run(&factory.wg)
}
return stream
}
@@ -93,42 +77,30 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
}
func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, srcPort string, dstPort string, allExtensionPorts []string) *streamProps {
reversed := false
if hostMode {
// TODO: Implement reversed for the `hostMode`
// TODO: Bring back `filterAuthorities`
return &streamProps{isTapTarget: true, isOutgoing: false}
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) == true {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
return &streamProps{isTapTarget: true, isOutgoing: false, reversed: reversed}
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, dstIP) == true {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
return &streamProps{isTapTarget: true, isOutgoing: false, reversed: reversed}
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s", srcIP))
return &streamProps{isTapTarget: true, isOutgoing: true, reversed: reversed}
return &streamProps{isTapTarget: true, isOutgoing: true}
}
return &streamProps{isTapTarget: false, reversed: reversed}
return &streamProps{isTapTarget: false}
} else {
// TODO: Bring back `filterPorts` as a string if it's really needed
// (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort)))
isTappedPort := containsPort(allExtensionPorts, dstPort)
if !isTappedPort && containsPort(allExtensionPorts, srcPort) {
isTappedPort = true
reversed = true
}
if !isTappedPort {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost1 %s", dstPort))
return &streamProps{isTapTarget: false, isOutgoing: false, reversed: reversed}
}
isOutgoing := !inArrayString(ownIps, dstIP)
if !*anydirection && isOutgoing {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost2"))
return &streamProps{isTapTarget: false, isOutgoing: isOutgoing, reversed: reversed}
return &streamProps{isTapTarget: false, isOutgoing: isOutgoing}
}
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort))
return &streamProps{isTapTarget: true, reversed: reversed}
return &streamProps{isTapTarget: true}
}
}
@@ -143,5 +115,4 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor
type streamProps struct {
isTapTarget bool
isOutgoing bool
reversed bool
}