diff --git a/tap/api/api.go b/tap/api/api.go index f0d854eb1..1c50df3c2 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -51,5 +51,5 @@ type OutputChannelItem struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, callback func(reqResPair *RequestResponsePair)) + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, callback func(item *OutputChannelItem)) } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 3e92e078b..381ee6245 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -13,19 +13,18 @@ func init() { type dissecting string -func (g dissecting) Register(extension *api.Extension) { +func (d dissecting) Register(extension *api.Extension) { extension.Name = "amqp" extension.OutboundPorts = []string{"5671", "5672"} extension.InboundPorts = []string{} } -func (g dissecting) Ping() { +func (d dissecting) Ping() { log.Printf("pong AMQP\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.OutputChannelItem { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(item *api.OutputChannelItem)) { // TODO: Implement - return nil } var Dissector dissecting diff --git a/tap/extensions/http/go.mod b/tap/extensions/http/go.mod index d1dc80f1a..e59ed5839 100644 --- a/tap/extensions/http/go.mod +++ b/tap/extensions/http/go.mod @@ -9,7 +9,7 @@ require ( golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073 golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d - golang.org/x/text v0.3.4 + golang.org/x/text v0.3.5 golang.org/x/tools v0.0.0-20210106214847-113979e3529a ) diff --git a/tap/extensions/http/go.sum b/tap/extensions/http/go.sum index d99c45d08..8107f3588 100644 --- a/tap/extensions/http/go.sum +++ b/tap/extensions/http/go.sum @@ -28,6 +28,8 @@ golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index db6408748..674c907a4 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -11,13 +11,13 @@ import ( "github.com/up9inc/mizu/tap/api" ) -func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error { +func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error { streamID, messageHTTP1, err := grpcAssembler.readMessage() if err != nil { return err } - var reqResPair *api.RequestResponsePair + var item *api.OutputChannelItem switch messageHTTP1 := messageHTTP1.(type) { case http.Request: @@ -30,7 +30,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func tcpID.DstPort, streamID, ) - reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now()) + item = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now()) case http.Response: responseCounter++ ident := fmt.Sprintf( @@ -41,17 +41,17 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func tcpID.SrcPort, streamID, ) - reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now()) + item = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now()) } - if reqResPair != nil { - Emit(reqResPair) + if item != nil { + Emit(item) } return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error { requestCounter++ req, err := http.ReadRequest(b) if err != nil { @@ -71,14 +71,14 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqRes tcpID.DstPort, requestCounter, ) - reqResPair := reqResMatcher.registerRequest(ident, req, time.Now()) - if reqResPair != nil { - Emit(reqResPair) + item := reqResMatcher.registerRequest(ident, req, time.Now()) + if item != nil { + Emit(item) } return nil } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error { +func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error { responseCounter++ res, err := http.ReadResponse(b, nil) if err != nil { @@ -97,9 +97,9 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqRes tcpID.SrcPort, responseCounter, ) - reqResPair := reqResMatcher.registerResponse(ident, res, time.Now()) - if reqResPair != nil { - Emit(reqResPair) + item := reqResMatcher.registerResponse(ident, res, time.Now()) + if item != nil { + Emit(item) } return nil } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index b7cf3b44f..6799d5879 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -4,10 +4,7 @@ import ( "bufio" "fmt" "io" - "io/ioutil" "log" - "net/http" - "time" "github.com/up9inc/mizu/tap/api" ) @@ -25,67 +22,60 @@ type dissecting string const ExtensionName = "http" -func (g dissecting) Register(extension *api.Extension) { +func (d dissecting) Register(extension *api.Extension) { extension.Name = ExtensionName extension.OutboundPorts = []string{"80", "8080", "443"} extension.InboundPorts = []string{} } -func (g dissecting) Ping() { +func (d dissecting) Ping() { log.Printf("pong HTTP\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.OutputChannelItem { - for { - if isClient { - requestCounter++ - req, err := http.ReadRequest(b) - if err == io.EOF || err == io.ErrUnexpectedEOF { - return nil - } else if err != nil { - log.Println("Error reading stream:", err) - } else { - body, _ := ioutil.ReadAll(req.Body) - req.Body.Close() - log.Printf("Received request: %+v with body: %+v\n", req, body) - } +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) { + ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) + isHTTP2, err := checkIsHTTP2Connection(b, isClient) + if err != nil { + SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", ident, err, err, err) + // Do something? + } - ident := fmt.Sprintf( - "%s->%s %s->%s %d", - tcpID.SrcIP, - tcpID.DstIP, - tcpID.SrcPort, - tcpID.DstPort, - requestCounter, - ) - reqResMatcher.registerRequest(ident, req, time.Now()) - } else { - responseCounter++ - res, err := http.ReadResponse(b, nil) + var grpcAssembler *GrpcAssembler + if isHTTP2 { + err := prepareHTTP2Connection(b, isClient) + if err != nil { + SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", ident, err, err, err) + } + grpcAssembler = createGrpcAssembler(b) + } + + for { + if isHTTP2 { + err = handleHTTP2Stream(grpcAssembler, tcpID, Emit) if err == io.EOF || err == io.ErrUnexpectedEOF { - return nil + break } else if err != nil { - log.Println("Error reading stream:", err) - } else { - body, _ := ioutil.ReadAll(res.Body) - res.Body.Close() - log.Printf("Received response: %+v with body: %+v\n", res, body) + SilentError("HTTP/2", "stream %s error: %s (%v,%+v)", ident, err, err, err) + continue } - ident := fmt.Sprintf( - "%s->%s %s->%s %d", - tcpID.DstIP, - tcpID.SrcIP, - tcpID.DstPort, - tcpID.SrcPort, - responseCounter, - ) - reqResPair := reqResMatcher.registerResponse(ident, res, time.Now()) - if reqResPair != nil { - return reqResPair + } else if isClient { + err = handleHTTP1ClientStream(b, tcpID, Emit) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } else if err != nil { + SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)", ident, err, err, err) + continue + } + } else { + err = handleHTTP1ServerStream(b, tcpID, Emit) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } else if err != nil { + SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)", ident, err, err, err) + continue } } } - return nil } var Dissector dissecting diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 4edb543b3..1385d842d 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -13,19 +13,18 @@ func init() { type dissecting string -func (g dissecting) Register(extension *api.Extension) { +func (d dissecting) Register(extension *api.Extension) { extension.Name = "kafka" extension.OutboundPorts = []string{"9092"} extension.InboundPorts = []string{} } -func (g dissecting) Ping() { +func (d dissecting) Ping() { log.Printf("pong Kafka\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.OutputChannelItem { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(item *api.OutputChannelItem)) { // TODO: Implement - return nil } var Dissector dissecting diff --git a/tap/go.mod b/tap/go.mod index 0df13d898..c0b84a753 100644 --- a/tap/go.mod +++ b/tap/go.mod @@ -9,7 +9,7 @@ require ( golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073 golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d - golang.org/x/text v0.3.4 + golang.org/x/text v0.3.5 golang.org/x/tools v0.0.0-20210106214847-113979e3529a ) diff --git a/tap/go.sum b/tap/go.sum index b852c21d6..a7ee32da5 100644 --- a/tap/go.sum +++ b/tap/go.sum @@ -31,6 +31,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 3a917e379..37d36da35 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -189,7 +189,7 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo { return c.CaptureInfo } -func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) () { +func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) { hostMode = opts.HostMode if GetMemoryProfilingEnabled() { diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 79ba4bb95..ad57d58ce 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -15,7 +15,7 @@ import ( type tcpStreamFactory struct { outbountLinkWriter *OutboundLinkWriter - OutputChannelItem chan *api.OutputChannelItem + OutputChannelItem chan *api.OutputChannelItem } const checkTLSPacketAmount = 100 @@ -29,10 +29,8 @@ func containsPort(ports []string, port string) bool { return false } -func Emit(reqResPair *api.RequestResponsePair) { - log.Printf("Emit reqResPair: %+v\n", reqResPair) - log.Printf("Emit reqResPair.Request.Orig: %v\n", reqResPair.Request.Orig) - log.Printf("Emit reqResPair.Response.Orig: %v\n", reqResPair.Response.Orig) +func Emit(item *api.OutputChannelItem) { + log.Printf("Emit item: %+v\n", item) } func (h *tcpStream) clientRun(tcpID *api.TcpID) {