diff --git a/tap/api/api.go b/tap/api/api.go index 132450f60..edc801714 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -44,5 +44,5 @@ type RequestResponsePair struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID) *RequestResponsePair + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, callback func(reqResPair *RequestResponsePair)) } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 8e4dbf397..e3f7b907b 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -13,19 +13,18 @@ func init() { type dissecting string -func (g dissecting) Register(extension *api.Extension) { +func (d dissecting) Register(extension *api.Extension) { extension.Name = "amqp" extension.OutboundPorts = []string{"5671", "5672"} extension.InboundPorts = []string{} } -func (g dissecting) Ping() { +func (d dissecting) Ping() { log.Printf("pong AMQP\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(reqResPair *api.RequestResponsePair)) { // TODO: Implement - return nil } var Dissector dissecting diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 0327832af..db6408748 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -11,10 +11,10 @@ import ( "github.com/up9inc/mizu/tap/api" ) -func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID) (*api.RequestResponsePair, error) { +func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error { streamID, messageHTTP1, err := grpcAssembler.readMessage() if err != nil { - return nil, err + return err } var reqResPair *api.RequestResponsePair @@ -45,18 +45,18 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID) (*api.Req } if reqResPair != nil { - return reqResPair, nil + Emit(reqResPair) } - return nil, nil + return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestResponsePair, error) { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error { requestCounter++ req, err := http.ReadRequest(b) if err != nil { log.Println("Error reading stream:", err) - return nil, err + return err } else { body, _ := ioutil.ReadAll(req.Body) req.Body.Close() @@ -73,17 +73,17 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestRes ) reqResPair := reqResMatcher.registerRequest(ident, req, time.Now()) if reqResPair != nil { - fmt.Printf("reqResPair: %+v\n", reqResPair) + Emit(reqResPair) } - return reqResPair, nil + return nil } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestResponsePair, error) { +func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error { responseCounter++ res, err := http.ReadResponse(b, nil) if err != nil { log.Println("Error reading stream:", err) - return nil, err + return err } else { body, _ := ioutil.ReadAll(res.Body) res.Body.Close() @@ -99,7 +99,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestRes ) reqResPair := reqResMatcher.registerResponse(ident, res, time.Now()) if reqResPair != nil { - fmt.Printf("reqResPair: %+v\n", reqResPair) + Emit(reqResPair) } - return reqResPair, nil + return nil } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 693383bdf..0c2c6d58f 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -20,17 +20,17 @@ func init() { type dissecting string -func (g dissecting) Register(extension *api.Extension) { +func (d dissecting) Register(extension *api.Extension) { extension.Name = "http" extension.OutboundPorts = []string{"80", "8080", "443"} extension.InboundPorts = []string{} } -func (g dissecting) Ping() { +func (d dissecting) Ping() { log.Printf("pong HTTP\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) { ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) isHTTP2, err := checkIsHTTP2Connection(b, isClient) if err != nil { @@ -47,11 +47,9 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a grpcAssembler = createGrpcAssembler(b) } - var reqResPair *api.RequestResponsePair - for { if isHTTP2 { - reqResPair, err = handleHTTP2Stream(grpcAssembler, tcpID) + err = handleHTTP2Stream(grpcAssembler, tcpID, Emit) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -59,7 +57,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a continue } } else if isClient { - reqResPair, err = handleHTTP1ClientStream(b, tcpID) + err = handleHTTP1ClientStream(b, tcpID, Emit) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -67,7 +65,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a continue } } else { - reqResPair, err = handleHTTP1ServerStream(b, tcpID) + err = handleHTTP1ServerStream(b, tcpID, Emit) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -76,7 +74,6 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a } } } - return reqResPair } var Dissector dissecting diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index a1a78937f..6aca561a3 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -13,19 +13,18 @@ func init() { type dissecting string -func (g dissecting) Register(extension *api.Extension) { +func (d dissecting) Register(extension *api.Extension) { extension.Name = "kafka" extension.OutboundPorts = []string{"9092"} extension.InboundPorts = []string{} } -func (g dissecting) Ping() { +func (d dissecting) Ping() { log.Printf("pong Kafka\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(reqResPair *api.RequestResponsePair)) { // TODO: Implement - return nil } var Dissector dissecting diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 6cbd2d699..88c84f9b3 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -28,12 +28,18 @@ func containsPort(ports []string, port string) bool { return false } +func Emit(reqResPair *api.RequestResponsePair) { + log.Printf("Emit reqResPair: %+v\n", reqResPair) + log.Printf("Emit reqResPair.Request.Orig: %v\n", reqResPair.Request.Orig) + log.Printf("Emit reqResPair.Response.Orig: %v\n", reqResPair.Response.Orig) +} + func (h *tcpStream) clientRun(tcpID *api.TcpID) { b := bufio.NewReader(&h.r) for _, extension := range extensions { if containsPort(extension.OutboundPorts, h.transport.Dst().String()) { extension.Dissector.Ping() - extension.Dissector.Dissect(b, true, tcpID) + extension.Dissector.Dissect(b, true, tcpID, Emit) } } } @@ -43,7 +49,7 @@ func (h *tcpStream) serverRun(tcpID *api.TcpID) { for _, extension := range extensions { if containsPort(extension.OutboundPorts, h.transport.Src().String()) { extension.Dissector.Ping() - extension.Dissector.Dissect(b, false, tcpID) + extension.Dissector.Dissect(b, false, tcpID, Emit) } } }