diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 132d3d654..0327832af 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -3,7 +3,6 @@ package main import ( "bufio" "fmt" - "io" "io/ioutil" "log" "net/http" @@ -52,13 +51,12 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID) (*api.Req return nil, nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) error { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestResponsePair, error) { requestCounter++ req, err := http.ReadRequest(b) - if err == io.EOF || err == io.ErrUnexpectedEOF { - return nil - } else if err != nil { + if err != nil { log.Println("Error reading stream:", err) + return nil, err } else { body, _ := ioutil.ReadAll(req.Body) req.Body.Close() @@ -73,17 +71,19 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) error { tcpID.DstPort, requestCounter, ) - reqResMatcher.registerRequest(ident, req, time.Now()) - return err + reqResPair := reqResMatcher.registerRequest(ident, req, time.Now()) + if reqResPair != nil { + fmt.Printf("reqResPair: %+v\n", reqResPair) + } + return reqResPair, nil } func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestResponsePair, error) { responseCounter++ res, err := http.ReadResponse(b, nil) - if err == io.EOF || err == io.ErrUnexpectedEOF { - return nil, nil - } else if err != nil { + if err != nil { log.Println("Error reading stream:", err) + return nil, err } else { body, _ := ioutil.ReadAll(res.Body) res.Body.Close() @@ -99,7 +99,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestRes ) reqResPair := reqResMatcher.registerResponse(ident, res, time.Now()) if reqResPair != nil { - return reqResPair, nil + fmt.Printf("reqResPair: %+v\n", reqResPair) } - return nil, err + return reqResPair, nil } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index f463ed604..693383bdf 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -47,12 +47,11 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a grpcAssembler = createGrpcAssembler(b) } + var reqResPair *api.RequestResponsePair + for { if isHTTP2 { - reqResPair, err := handleHTTP2Stream(grpcAssembler, tcpID) - if reqResPair != nil { - return reqResPair - } + reqResPair, err = handleHTTP2Stream(grpcAssembler, tcpID) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -60,7 +59,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a continue } } else if isClient { - err := handleHTTP1ClientStream(b, tcpID) + reqResPair, err = handleHTTP1ClientStream(b, tcpID) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -68,10 +67,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a continue } } else { - reqResPair, err := handleHTTP1ServerStream(b, tcpID) - if reqResPair != nil { - return reqResPair - } + reqResPair, err = handleHTTP1ServerStream(b, tcpID) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -80,7 +76,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a } } } - return nil + return reqResPair } var Dissector dissecting diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 6405dd500..6cbd2d699 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -4,7 +4,6 @@ import ( "bufio" "fmt" "log" - "sync" "github.com/romana/rlog" "github.com/up9inc/mizu/tap/api" @@ -15,8 +14,6 @@ import ( ) type tcpStreamFactory struct { - wg sync.WaitGroup - doHTTP bool outbountLinkWriter *OutboundLinkWriter } @@ -46,8 +43,7 @@ func (h *tcpStream) serverRun(tcpID *api.TcpID) { for _, extension := range extensions { if containsPort(extension.OutboundPorts, h.transport.Src().String()) { extension.Dissector.Ping() - reqResPair := extension.Dissector.Dissect(b, false, tcpID) - log.Printf("reqResPair: %+v\n", reqResPair) + extension.Dissector.Dissect(b, false, tcpID) } } } @@ -73,10 +69,6 @@ func (h *tcpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream return &stream.r } -func (factory *tcpStreamFactory) WaitGoRoutines() { - factory.wg.Wait() -} - func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int) *streamProps { if hostMode { if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {