From d0f0e187cba22b7d1a261e63cf1f4e3c587f5783 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Wed, 25 Aug 2021 15:39:03 +0300 Subject: [PATCH] Make the TCP reader consists of a single Go routine (instead of two) and try to dissect in both client and server mode by rewinding --- tap/api/api.go | 14 ++++++++- tap/extensions/amqp/main.go | 6 ++-- tap/extensions/http/handlers.go | 7 +++-- tap/extensions/http/main.go | 15 +++++++--- tap/extensions/kafka/main.go | 4 ++- tap/tcp_reader.go | 37 +++++++++-------------- tap/tcp_stream.go | 13 ++------ tap/tcp_stream_factory.go | 53 ++++++++------------------------- 8 files changed, 64 insertions(+), 85 deletions(-) diff --git a/tap/api/api.go b/tap/api/api.go index 57a1ac994..483a78800 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -41,6 +41,18 @@ type TcpID struct { Ident string } +func (t *TcpID) Swap() { + srcIP := t.SrcIP + dstIP := t.DstIP + srcPort := t.SrcPort + dstPort := t.DstPort + + t.SrcIP = dstIP + t.SrcPort = dstPort + t.DstIP = srcIP + t.DstPort = srcPort +} + type GenericMessage struct { IsRequest bool `json:"is_request"` CaptureTime time.Time `json:"capture_time"` @@ -62,7 +74,7 @@ type OutputChannelItem struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) error Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry Summarize(entry *MizuEntry) *BaseEntryDetails Represent(entry *MizuEntry) (Protocol, []byte, error) diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 9dbf60391..f9e68383d 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -39,7 +39,7 @@ func (d dissecting) Ping() { const amqpRequest string = "amqp_request" -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error { r := AmqpReader{b} var remaining int @@ -79,7 +79,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em frame, err := r.ReadFrame() if err == io.EOF { // We must read until we see an EOF... very important! - return + return nil } else if err != nil { // log.Println("Error reading stream", h.net, h.transport, ":", err) } @@ -204,6 +204,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em // log.Printf("unexpected frame: %+v\n", f) } } + + return nil } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 9f9bb3eb6..572815ed0 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "net/http" "time" @@ -78,7 +77,8 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit requestCounter++ req, err := http.ReadRequest(b) if err != nil { - log.Println("Error reading stream:", err) + requestCounter-- + // log.Println("Error reading stream:", err) return err } @@ -120,7 +120,8 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit responseCounter++ res, err := http.ReadResponse(b, nil) if err != nil { - log.Println("Error reading stream:", err) + responseCounter-- + // log.Println("Error reading stream:", err) return err } var req string diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 3d26584d1..d09f6acda 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -60,7 +60,7 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error { ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) isHTTP2, err := checkIsHTTP2Connection(b, isClient) if err != nil { @@ -77,36 +77,43 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em grpcAssembler = createGrpcAssembler(b) } + success := false for { if isHTTP2 { err = handleHTTP2Stream(grpcAssembler, tcpID, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - io.ReadAll(b) rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err) continue } + success = true } else if isClient { + tcpID.Swap() err = handleHTTP1ClientStream(b, tcpID, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - io.ReadAll(b) rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err) continue } + success = true } else { err = handleHTTP1ServerStream(b, tcpID, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - io.ReadAll(b) rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err) continue } + success = true } } + + if !success { + return err + } + return nil } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 124fc9b1a..6d4aef037 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -36,7 +36,7 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", _protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error { for { if isClient { _, _, err := ReadRequest(b, tcpID) @@ -52,6 +52,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } } } + + return nil } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index eb5d2a764..c0dbd563b 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -2,6 +2,7 @@ package tap import ( "bufio" + "bytes" "fmt" "io" "sync" @@ -89,29 +90,19 @@ func (h *tcpReader) Read(p []byte) (int, error) { return l, nil } -func containsPort(ports []string, port string) bool { - for _, x := range ports { - if x == port { - return true - } - } - return false -} - func (h *tcpReader) run(wg *sync.WaitGroup) { defer wg.Done() - var port string - if h.isClient { - port = h.tcpID.DstPort - } else { - port = h.tcpID.SrcPort - } - b := bufio.NewReader(h) - // TODO: maybe check for kafka and amqp and when it is not one of those pass it to the HTTP? - // because it will check for the ports that we checked in the "isTapTarget" - for _, extension := range extensions { - if containsPort(extension.Protocol.Ports, port) { - extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter) - } - } + + data, _ := io.ReadAll(h) + r := bytes.NewReader(data) + + b := bufio.NewReader(r) + + extensions[1].Dissector.Dissect(b, true, h.tcpID, h.Emitter) + + r.Reset(data) + + b = bufio.NewReader(r) + + extensions[1].Dissector.Dissect(b, false, h.tcpID, h.Emitter) } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 7c6df1172..fbcbb76f0 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -21,10 +21,8 @@ type tcpStream struct { optchecker reassembly.TCPOptionCheck net, transport gopacket.Flow isDNS bool + reader tcpReader isTapTarget bool - reversed bool - client tcpReader - server tcpReader urls []string ident string sync.Mutex @@ -145,11 +143,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass // This is where we pass the reassembled information onwards // This channel is read by an tcpReader object statsTracker.incReassembledTcpPayloadsCount() - if dir == reassembly.TCPDirClientToServer && !t.reversed { - t.client.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} - } else { - t.server.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} - } + t.reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} } } } @@ -157,8 +151,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { Trace("%s: Connection closed", t.ident) if t.isTapTarget { - close(t.client.msgQueue) - close(t.server.msgQueue) + close(t.reader.msgQueue) } // do not remove the connection to allow last ACK return false diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 887869da9..2a8f228b1 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -45,13 +45,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T transport: transport, isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53, isTapTarget: isTapTarget, - reversed: props.reversed, tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions), ident: fmt.Sprintf("%s:%s", net, transport), optchecker: reassembly.NewTCPOptionCheck(), } if stream.isTapTarget { - stream.client = tcpReader{ + stream.reader = tcpReader{ msgQueue: make(chan tcpReaderDataMsg), ident: fmt.Sprintf("%s %s", net, transport), tcpID: &api.TcpID{ @@ -66,24 +65,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T outboundLinkWriter: factory.outbountLinkWriter, Emitter: factory.Emitter, } - stream.server = tcpReader{ - msgQueue: make(chan tcpReaderDataMsg), - ident: fmt.Sprintf("%s %s", net.Reverse(), transport.Reverse()), - tcpID: &api.TcpID{ - SrcIP: net.Dst().String(), - DstIP: net.Src().String(), - SrcPort: transport.Dst().String(), - DstPort: transport.Src().String(), - }, - parent: stream, - isOutgoing: props.isOutgoing, - outboundLinkWriter: factory.outbountLinkWriter, - Emitter: factory.Emitter, - } - factory.wg.Add(2) - // Start reading from channels stream.client.bytes and stream.server.bytes - go stream.client.run(&factory.wg) - go stream.server.run(&factory.wg) + factory.wg.Add(1) + // Start reading from channel stream.reader.bytes + go stream.reader.run(&factory.wg) } return stream } @@ -93,42 +77,30 @@ func (factory *tcpStreamFactory) WaitGoRoutines() { } func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, srcPort string, dstPort string, allExtensionPorts []string) *streamProps { - reversed := false if hostMode { - // TODO: Implement reversed for the `hostMode` + // TODO: Bring back `filterAuthorities` + return &streamProps{isTapTarget: true, isOutgoing: false} if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) == true { rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort)) - return &streamProps{isTapTarget: true, isOutgoing: false, reversed: reversed} + return &streamProps{isTapTarget: true, isOutgoing: false} } else if inArrayString(gSettings.filterAuthorities, dstIP) == true { rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP)) - return &streamProps{isTapTarget: true, isOutgoing: false, reversed: reversed} + return &streamProps{isTapTarget: true, isOutgoing: false} } else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true { rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s", srcIP)) - return &streamProps{isTapTarget: true, isOutgoing: true, reversed: reversed} + return &streamProps{isTapTarget: true, isOutgoing: true} } - return &streamProps{isTapTarget: false, reversed: reversed} + return &streamProps{isTapTarget: false} } else { - // TODO: Bring back `filterPorts` as a string if it's really needed - // (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) - isTappedPort := containsPort(allExtensionPorts, dstPort) - if !isTappedPort && containsPort(allExtensionPorts, srcPort) { - isTappedPort = true - reversed = true - } - if !isTappedPort { - rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost1 %s", dstPort)) - return &streamProps{isTapTarget: false, isOutgoing: false, reversed: reversed} - } - isOutgoing := !inArrayString(ownIps, dstIP) if !*anydirection && isOutgoing { rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost2")) - return &streamProps{isTapTarget: false, isOutgoing: isOutgoing, reversed: reversed} + return &streamProps{isTapTarget: false, isOutgoing: isOutgoing} } rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort)) - return &streamProps{isTapTarget: true, reversed: reversed} + return &streamProps{isTapTarget: true} } } @@ -143,5 +115,4 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor type streamProps struct { isTapTarget bool isOutgoing bool - reversed bool }