Add Emitter interface to the API and send OutputChannelItem(s) to OutputChannel

This commit is contained in:
M. Mert Yildiran 2021-08-18 10:30:55 +03:00
parent 2d188c2dc9
commit 9fd069a4ff
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
7 changed files with 42 additions and 21 deletions

View File

@ -2,6 +2,7 @@ package api
import (
"bufio"
"fmt"
"plugin"
"time"
)
@ -51,5 +52,21 @@ type OutputChannelItem struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, callback func(item *OutputChannelItem))
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter)
}
type Emitting struct {
OutputChannel chan *OutputChannelItem
}
type Emitter interface {
Emit(item *OutputChannelItem)
}
func (e *Emitting) Emit(item *OutputChannelItem) {
fmt.Printf("item: %+v\n", item)
fmt.Printf("item.Data: %+v\n", item.Data)
fmt.Printf("item.Data.Request.Orig: %v\n", item.Data.Request.Orig)
fmt.Printf("item.Data.Response.Orig: %v\n", item.Data.Response.Orig)
e.OutputChannel <- item
}

View File

@ -23,7 +23,7 @@ func (d dissecting) Ping() {
log.Printf("pong AMQP\n")
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(item *api.OutputChannelItem)) {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
// TODO: Implement
}

View File

@ -11,7 +11,7 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error {
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage()
if err != nil {
return err
@ -45,13 +45,13 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func
}
if item != nil {
Emit(item)
emitter.Emit(item)
}
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emitter) error {
requestCounter++
req, err := http.ReadRequest(b)
if err != nil {
@ -73,12 +73,12 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *
)
item := reqResMatcher.registerRequest(ident, req, time.Now())
if item != nil {
Emit(item)
emitter.Emit(item)
}
return nil
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emitter) error {
responseCounter++
res, err := http.ReadResponse(b, nil)
if err != nil {
@ -99,7 +99,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *
)
item := reqResMatcher.registerResponse(ident, res, time.Now())
if item != nil {
Emit(item)
emitter.Emit(item)
}
return nil
}

View File

@ -32,7 +32,7 @@ func (d dissecting) Ping() {
log.Printf("pong HTTP\n")
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil {
@ -51,7 +51,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Em
for {
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, Emit)
err = handleHTTP2Stream(grpcAssembler, tcpID, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@ -59,7 +59,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Em
continue
}
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, Emit)
err = handleHTTP1ClientStream(b, tcpID, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@ -67,7 +67,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Em
continue
}
} else {
err = handleHTTP1ServerStream(b, tcpID, Emit)
err = handleHTTP1ServerStream(b, tcpID, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {

View File

@ -23,7 +23,7 @@ func (d dissecting) Ping() {
log.Printf("pong Kafka\n")
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(item *api.OutputChannelItem)) {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
// TODO: Implement
}

View File

@ -302,9 +302,13 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
log.Fatal(err)
}
var emitter api.Emitter = &api.Emitting{
OutputChannel: outputItems,
}
// Set up assembly
streamFactory := &tcpStreamFactory{
OutputChannelItem: outputItems,
Emitter: emitter,
}
streamPool := tcpassembly.NewStreamPool(streamFactory)
assembler := tcpassembly.NewAssembler(streamPool)

View File

@ -15,7 +15,7 @@ import (
type tcpStreamFactory struct {
outbountLinkWriter *OutboundLinkWriter
OutputChannelItem chan *api.OutputChannelItem
Emitter api.Emitter
}
const checkTLSPacketAmount = 100
@ -33,22 +33,22 @@ func Emit(item *api.OutputChannelItem) {
log.Printf("Emit item: %+v\n", item)
}
func (h *tcpStream) clientRun(tcpID *api.TcpID) {
func (h *tcpStream) clientRun(tcpID *api.TcpID, emitter api.Emitter) {
b := bufio.NewReader(&h.r)
for _, extension := range extensions {
if containsPort(extension.OutboundPorts, h.transport.Dst().String()) {
extension.Dissector.Ping()
extension.Dissector.Dissect(b, true, tcpID, Emit)
extension.Dissector.Dissect(b, true, tcpID, emitter)
}
}
}
func (h *tcpStream) serverRun(tcpID *api.TcpID) {
func (h *tcpStream) serverRun(tcpID *api.TcpID, emitter api.Emitter) {
b := bufio.NewReader(&h.r)
for _, extension := range extensions {
if containsPort(extension.OutboundPorts, h.transport.Src().String()) {
extension.Dissector.Ping()
extension.Dissector.Dissect(b, false, tcpID, Emit)
extension.Dissector.Dissect(b, false, tcpID, emitter)
}
}
}
@ -67,9 +67,9 @@ func (h *tcpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream
DstPort: transport.Dst().String(),
}
if containsPort(allOutboundPorts, transport.Dst().String()) {
go stream.clientRun(tcpID)
go stream.clientRun(tcpID, h.Emitter)
} else if containsPort(allOutboundPorts, transport.Src().String()) {
go stream.serverRun(tcpID)
go stream.serverRun(tcpID, h.Emitter)
}
return &stream.r
}