diff --git a/tap/api/api.go b/tap/api/api.go index 9ddc0f4cd..e40fe8c98 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -42,20 +42,6 @@ type TcpID struct { Ident string } -func (t *TcpID) Swap() *TcpID { - srcIP := t.SrcIP - dstIP := t.DstIP - srcPort := t.SrcPort - dstPort := t.DstPort - - return &TcpID{ - SrcIP: dstIP, - SrcPort: dstPort, - DstIP: srcIP, - DstPort: srcPort, - } -} - type GenericMessage struct { IsRequest bool `json:"is_request"` CaptureTime time.Time `json:"capture_time"` diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 7797e6a52..080e51bbf 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -91,7 +91,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } success = true } else if isClient { - err = handleHTTP1ClientStream(b, tcpID.Swap(), emitter) + err = handleHTTP1ClientStream(b, tcpID, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index c5aa3b26d..687e9c317 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -104,8 +104,7 @@ func (h *tcpReader) run(wg *sync.WaitGroup) { for _, extension := range extensions { for _, isClient := range []bool{true, false} { r.Reset(data) - b := bufio.NewReader(r) - extension.Dissector.Dissect(b, isClient, h.tcpID, h.Emitter) + extension.Dissector.Dissect(bufio.NewReader(r), isClient, h.tcpID, h.Emitter) } } } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index fbcbb76f0..e55995ee9 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -23,6 +23,8 @@ type tcpStream struct { isDNS bool reader tcpReader isTapTarget bool + client tcpReader + server tcpReader urls []string ident string sync.Mutex @@ -143,7 +145,11 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass // This is where we pass the reassembled information onwards // This channel is read by an tcpReader object statsTracker.incReassembledTcpPayloadsCount() - t.reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + if dir == reassembly.TCPDirClientToServer { + t.client.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + } else { + t.server.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + } } } } @@ -151,7 +157,8 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { Trace("%s: Connection closed", t.ident) if t.isTapTarget { - close(t.reader.msgQueue) + close(t.client.msgQueue) + close(t.server.msgQueue) } // do not remove the connection to allow last ACK return false diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index cb4e4068e..d1ada633a 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -50,7 +50,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T optchecker: reassembly.NewTCPOptionCheck(), } if stream.isTapTarget { - stream.reader = tcpReader{ + stream.client = tcpReader{ msgQueue: make(chan tcpReaderDataMsg), ident: fmt.Sprintf("%s %s", net, transport), tcpID: &api.TcpID{ @@ -65,9 +65,25 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T outboundLinkWriter: factory.outboundLinkWriter, Emitter: factory.Emitter, } - factory.wg.Add(1) + stream.server = tcpReader{ + msgQueue: make(chan tcpReaderDataMsg), + ident: fmt.Sprintf("%s %s", net, transport), + tcpID: &api.TcpID{ + SrcIP: net.Dst().String(), + DstIP: net.Src().String(), + SrcPort: transport.Dst().String(), + DstPort: transport.Src().String(), + }, + parent: stream, + isClient: true, + isOutgoing: props.isOutgoing, + outboundLinkWriter: factory.outbountLinkWriter, + Emitter: factory.Emitter, + } + factory.wg.Add(2) // Start reading from channel stream.reader.bytes - go stream.reader.run(&factory.wg) + go stream.client.run(&factory.wg) + go stream.server.run(&factory.wg) } return stream }