Call a function pointer to emit dissected data back to the tap package

This commit is contained in:
M. Mert Yildiran 2021-08-18 08:11:32 +03:00
parent b1f20fea1e
commit e1267b6c44
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
6 changed files with 33 additions and 32 deletions

View File

@ -44,5 +44,5 @@ type RequestResponsePair struct {
type Dissector interface { type Dissector interface {
Register(*Extension) Register(*Extension)
Ping() Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID) *RequestResponsePair Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, callback func(reqResPair *RequestResponsePair))
} }

View File

@ -13,19 +13,18 @@ func init() {
type dissecting string type dissecting string
func (g dissecting) Register(extension *api.Extension) { func (d dissecting) Register(extension *api.Extension) {
extension.Name = "amqp" extension.Name = "amqp"
extension.OutboundPorts = []string{"5671", "5672"} extension.OutboundPorts = []string{"5671", "5672"}
extension.InboundPorts = []string{} extension.InboundPorts = []string{}
} }
func (g dissecting) Ping() { func (d dissecting) Ping() {
log.Printf("pong AMQP\n") log.Printf("pong AMQP\n")
} }
func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(reqResPair *api.RequestResponsePair)) {
// TODO: Implement // TODO: Implement
return nil
} }
var Dissector dissecting var Dissector dissecting

View File

@ -11,10 +11,10 @@ import (
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID) (*api.RequestResponsePair, error) { func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage() streamID, messageHTTP1, err := grpcAssembler.readMessage()
if err != nil { if err != nil {
return nil, err return err
} }
var reqResPair *api.RequestResponsePair var reqResPair *api.RequestResponsePair
@ -45,18 +45,18 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID) (*api.Req
} }
if reqResPair != nil { if reqResPair != nil {
return reqResPair, nil Emit(reqResPair)
} }
return nil, nil return nil
} }
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestResponsePair, error) { func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error {
requestCounter++ requestCounter++
req, err := http.ReadRequest(b) req, err := http.ReadRequest(b)
if err != nil { if err != nil {
log.Println("Error reading stream:", err) log.Println("Error reading stream:", err)
return nil, err return err
} else { } else {
body, _ := ioutil.ReadAll(req.Body) body, _ := ioutil.ReadAll(req.Body)
req.Body.Close() req.Body.Close()
@ -73,17 +73,17 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestRes
) )
reqResPair := reqResMatcher.registerRequest(ident, req, time.Now()) reqResPair := reqResMatcher.registerRequest(ident, req, time.Now())
if reqResPair != nil { if reqResPair != nil {
fmt.Printf("reqResPair: %+v\n", reqResPair) Emit(reqResPair)
} }
return reqResPair, nil return nil
} }
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestResponsePair, error) { func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error {
responseCounter++ responseCounter++
res, err := http.ReadResponse(b, nil) res, err := http.ReadResponse(b, nil)
if err != nil { if err != nil {
log.Println("Error reading stream:", err) log.Println("Error reading stream:", err)
return nil, err return err
} else { } else {
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
res.Body.Close() res.Body.Close()
@ -99,7 +99,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestRes
) )
reqResPair := reqResMatcher.registerResponse(ident, res, time.Now()) reqResPair := reqResMatcher.registerResponse(ident, res, time.Now())
if reqResPair != nil { if reqResPair != nil {
fmt.Printf("reqResPair: %+v\n", reqResPair) Emit(reqResPair)
} }
return reqResPair, nil return nil
} }

View File

@ -20,17 +20,17 @@ func init() {
type dissecting string type dissecting string
func (g dissecting) Register(extension *api.Extension) { func (d dissecting) Register(extension *api.Extension) {
extension.Name = "http" extension.Name = "http"
extension.OutboundPorts = []string{"80", "8080", "443"} extension.OutboundPorts = []string{"80", "8080", "443"}
extension.InboundPorts = []string{} extension.InboundPorts = []string{}
} }
func (g dissecting) Ping() { func (d dissecting) Ping() {
log.Printf("pong HTTP\n") log.Printf("pong HTTP\n")
} }
func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient) isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil { if err != nil {
@ -47,11 +47,9 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a
grpcAssembler = createGrpcAssembler(b) grpcAssembler = createGrpcAssembler(b)
} }
var reqResPair *api.RequestResponsePair
for { for {
if isHTTP2 { if isHTTP2 {
reqResPair, err = handleHTTP2Stream(grpcAssembler, tcpID) err = handleHTTP2Stream(grpcAssembler, tcpID, Emit)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@ -59,7 +57,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a
continue continue
} }
} else if isClient { } else if isClient {
reqResPair, err = handleHTTP1ClientStream(b, tcpID) err = handleHTTP1ClientStream(b, tcpID, Emit)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@ -67,7 +65,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a
continue continue
} }
} else { } else {
reqResPair, err = handleHTTP1ServerStream(b, tcpID) err = handleHTTP1ServerStream(b, tcpID, Emit)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@ -76,7 +74,6 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a
} }
} }
} }
return reqResPair
} }
var Dissector dissecting var Dissector dissecting

View File

@ -13,19 +13,18 @@ func init() {
type dissecting string type dissecting string
func (g dissecting) Register(extension *api.Extension) { func (d dissecting) Register(extension *api.Extension) {
extension.Name = "kafka" extension.Name = "kafka"
extension.OutboundPorts = []string{"9092"} extension.OutboundPorts = []string{"9092"}
extension.InboundPorts = []string{} extension.InboundPorts = []string{}
} }
func (g dissecting) Ping() { func (d dissecting) Ping() {
log.Printf("pong Kafka\n") log.Printf("pong Kafka\n")
} }
func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(reqResPair *api.RequestResponsePair)) {
// TODO: Implement // TODO: Implement
return nil
} }
var Dissector dissecting var Dissector dissecting

View File

@ -28,12 +28,18 @@ func containsPort(ports []string, port string) bool {
return false return false
} }
func Emit(reqResPair *api.RequestResponsePair) {
log.Printf("Emit reqResPair: %+v\n", reqResPair)
log.Printf("Emit reqResPair.Request.Orig: %v\n", reqResPair.Request.Orig)
log.Printf("Emit reqResPair.Response.Orig: %v\n", reqResPair.Response.Orig)
}
func (h *tcpStream) clientRun(tcpID *api.TcpID) { func (h *tcpStream) clientRun(tcpID *api.TcpID) {
b := bufio.NewReader(&h.r) b := bufio.NewReader(&h.r)
for _, extension := range extensions { for _, extension := range extensions {
if containsPort(extension.OutboundPorts, h.transport.Dst().String()) { if containsPort(extension.OutboundPorts, h.transport.Dst().String()) {
extension.Dissector.Ping() extension.Dissector.Ping()
extension.Dissector.Dissect(b, true, tcpID) extension.Dissector.Dissect(b, true, tcpID, Emit)
} }
} }
} }
@ -43,7 +49,7 @@ func (h *tcpStream) serverRun(tcpID *api.TcpID) {
for _, extension := range extensions { for _, extension := range extensions {
if containsPort(extension.OutboundPorts, h.transport.Src().String()) { if containsPort(extension.OutboundPorts, h.transport.Src().String()) {
extension.Dissector.Ping() extension.Dissector.Ping()
extension.Dissector.Dissect(b, false, tcpID) extension.Dissector.Dissect(b, false, tcpID, Emit)
} }
} }
} }