diff --git a/tap/api/api.go b/tap/api/api.go index 1c50df3c2..678c87feb 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -2,6 +2,7 @@ package api import ( "bufio" + "fmt" "plugin" "time" ) @@ -51,5 +52,21 @@ type OutputChannelItem struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, callback func(item *OutputChannelItem)) + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) +} + +type Emitting struct { + OutputChannel chan *OutputChannelItem +} + +type Emitter interface { + Emit(item *OutputChannelItem) +} + +func (e *Emitting) Emit(item *OutputChannelItem) { + fmt.Printf("item: %+v\n", item) + fmt.Printf("item.Data: %+v\n", item.Data) + fmt.Printf("item.Data.Request.Orig: %v\n", item.Data.Request.Orig) + fmt.Printf("item.Data.Response.Orig: %v\n", item.Data.Response.Orig) + e.OutputChannel <- item } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 381ee6245..18e2107ab 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -23,7 +23,7 @@ func (d dissecting) Ping() { log.Printf("pong AMQP\n") } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(item *api.OutputChannelItem)) { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { // TODO: Implement } diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 674c907a4..01dcf3ca2 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -11,7 +11,7 @@ import ( "github.com/up9inc/mizu/tap/api" ) -func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error { +func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error { streamID, messageHTTP1, err := grpcAssembler.readMessage() if err != nil { return err @@ -45,13 +45,13 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func } if item != nil { - Emit(item) + emitter.Emit(item) } return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emitter) error { requestCounter++ req, err := http.ReadRequest(b) if err != nil { @@ -73,12 +73,12 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item * ) item := reqResMatcher.registerRequest(ident, req, time.Now()) if item != nil { - Emit(item) + emitter.Emit(item) } return nil } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error { +func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emitter) error { responseCounter++ res, err := http.ReadResponse(b, nil) if err != nil { @@ -99,7 +99,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item * ) item := reqResMatcher.registerResponse(ident, res, time.Now()) if item != nil { - Emit(item) + emitter.Emit(item) } return nil } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 6799d5879..2a1c642a3 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -32,7 +32,7 @@ func (d dissecting) Ping() { log.Printf("pong HTTP\n") } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) isHTTP2, err := checkIsHTTP2Connection(b, isClient) if err != nil { @@ -51,7 +51,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Em for { if isHTTP2 { - err = handleHTTP2Stream(grpcAssembler, tcpID, Emit) + err = handleHTTP2Stream(grpcAssembler, tcpID, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -59,7 +59,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Em continue } } else if isClient { - err = handleHTTP1ClientStream(b, tcpID, Emit) + err = handleHTTP1ClientStream(b, tcpID, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -67,7 +67,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Em continue } } else { - err = handleHTTP1ServerStream(b, tcpID, Emit) + err = handleHTTP1ServerStream(b, tcpID, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 1385d842d..60c5b4f85 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -23,7 +23,7 @@ func (d dissecting) Ping() { log.Printf("pong Kafka\n") } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(item *api.OutputChannelItem)) { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { // TODO: Implement } diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 37d36da35..a32444ef6 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -302,9 +302,13 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { log.Fatal(err) } + var emitter api.Emitter = &api.Emitting{ + OutputChannel: outputItems, + } + // Set up assembly streamFactory := &tcpStreamFactory{ - OutputChannelItem: outputItems, + Emitter: emitter, } streamPool := tcpassembly.NewStreamPool(streamFactory) assembler := tcpassembly.NewAssembler(streamPool) diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index ad57d58ce..41d39cd9e 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -15,7 +15,7 @@ import ( type tcpStreamFactory struct { outbountLinkWriter *OutboundLinkWriter - OutputChannelItem chan *api.OutputChannelItem + Emitter api.Emitter } const checkTLSPacketAmount = 100 @@ -33,22 +33,22 @@ func Emit(item *api.OutputChannelItem) { log.Printf("Emit item: %+v\n", item) } -func (h *tcpStream) clientRun(tcpID *api.TcpID) { +func (h *tcpStream) clientRun(tcpID *api.TcpID, emitter api.Emitter) { b := bufio.NewReader(&h.r) for _, extension := range extensions { if containsPort(extension.OutboundPorts, h.transport.Dst().String()) { extension.Dissector.Ping() - extension.Dissector.Dissect(b, true, tcpID, Emit) + extension.Dissector.Dissect(b, true, tcpID, emitter) } } } -func (h *tcpStream) serverRun(tcpID *api.TcpID) { +func (h *tcpStream) serverRun(tcpID *api.TcpID, emitter api.Emitter) { b := bufio.NewReader(&h.r) for _, extension := range extensions { if containsPort(extension.OutboundPorts, h.transport.Src().String()) { extension.Dissector.Ping() - extension.Dissector.Dissect(b, false, tcpID, Emit) + extension.Dissector.Dissect(b, false, tcpID, emitter) } } } @@ -67,9 +67,9 @@ func (h *tcpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream DstPort: transport.Dst().String(), } if containsPort(allOutboundPorts, transport.Dst().String()) { - go stream.clientRun(tcpID) + go stream.clientRun(tcpID, h.Emitter) } else if containsPort(allOutboundPorts, transport.Src().String()) { - go stream.serverRun(tcpID) + go stream.serverRun(tcpID, h.Emitter) } return &stream.r }