Merge branch 'feat/extension-api' of github.com:up9inc/mizu into feat/extension-api

This commit is contained in:
Roee Gadot
2021-08-25 17:37:52 +03:00
5 changed files with 30 additions and 22 deletions

View File

@@ -42,20 +42,6 @@ type TcpID struct {
Ident string
}
func (t *TcpID) Swap() *TcpID {
srcIP := t.SrcIP
dstIP := t.DstIP
srcPort := t.SrcPort
dstPort := t.DstPort
return &TcpID{
SrcIP: dstIP,
SrcPort: dstPort,
DstIP: srcIP,
DstPort: srcPort,
}
}
type GenericMessage struct {
IsRequest bool `json:"is_request"`
CaptureTime time.Time `json:"capture_time"`

View File

@@ -91,7 +91,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
}
success = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID.Swap(), emitter)
err = handleHTTP1ClientStream(b, tcpID, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {

View File

@@ -104,8 +104,7 @@ func (h *tcpReader) run(wg *sync.WaitGroup) {
for _, extension := range extensions {
for _, isClient := range []bool{true, false} {
r.Reset(data)
b := bufio.NewReader(r)
extension.Dissector.Dissect(b, isClient, h.tcpID, h.Emitter)
extension.Dissector.Dissect(bufio.NewReader(r), isClient, h.tcpID, h.Emitter)
}
}
}

View File

@@ -23,6 +23,8 @@ type tcpStream struct {
isDNS bool
reader tcpReader
isTapTarget bool
client tcpReader
server tcpReader
urls []string
ident string
sync.Mutex
@@ -143,7 +145,11 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount()
t.reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
if dir == reassembly.TCPDirClientToServer {
t.client.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
} else {
t.server.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
}
}
}
}
@@ -151,7 +157,8 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
if t.isTapTarget {
close(t.reader.msgQueue)
close(t.client.msgQueue)
close(t.server.msgQueue)
}
// do not remove the connection to allow last ACK
return false

View File

@@ -50,7 +50,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
optchecker: reassembly.NewTCPOptionCheck(),
}
if stream.isTapTarget {
stream.reader = tcpReader{
stream.client = tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
@@ -65,9 +65,25 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
outboundLinkWriter: factory.outboundLinkWriter,
Emitter: factory.Emitter,
}
factory.wg.Add(1)
stream.server = tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outbountLinkWriter,
Emitter: factory.Emitter,
}
factory.wg.Add(2)
// Start reading from channel stream.reader.bytes
go stream.reader.run(&factory.wg)
go stream.client.run(&factory.wg)
go stream.server.run(&factory.wg)
}
return stream
}