diff --git a/tap/api/api.go b/tap/api/api.go index 31289737b..132450f60 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -3,6 +3,7 @@ package api import ( "bufio" "plugin" + "time" ) type Extension struct { @@ -29,8 +30,19 @@ type TcpID struct { DstPort string } +type GenericMessage struct { + IsRequest bool + CaptureTime time.Time + Orig interface{} +} + +type RequestResponsePair struct { + Request GenericMessage `json:"request"` + Response GenericMessage `json:"response"` +} + type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID) interface{} + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID) *RequestResponsePair } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 15012e5e6..8e4dbf397 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -23,7 +23,7 @@ func (g dissecting) Ping() { log.Printf("pong AMQP\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) interface{} { +func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { // TODO: Implement return nil } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index b83cb55cd..9f6e5f2d7 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -33,7 +33,7 @@ func (g dissecting) Ping() { log.Printf("pong HTTP\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) interface{} { +func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { for { if isClient { requestCounter++ @@ -79,10 +79,11 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) in ) reqResPair := reqResMatcher.registerResponse(ident, res, time.Now()) if reqResPair != nil { - log.Printf("YES REQRES MATCHED!\n") + return reqResPair } } } + return nil } var Dissector dissecting diff --git a/tap/extensions/http/matcher.go b/tap/extensions/http/matcher.go index fa4db76ec..fa98038af 100644 --- a/tap/extensions/http/matcher.go +++ b/tap/extensions/http/matcher.go @@ -6,21 +6,12 @@ import ( "strings" "sync" "time" + + "github.com/up9inc/mizu/tap/api" ) var reqResMatcher = createResponseRequestMatcher() // global -type requestResponsePair struct { - Request httpMessage `json:"request"` - Response httpMessage `json:"response"` -} - -type httpMessage struct { - isRequest bool - captureTime time.Time - orig interface{} -} - // Key is {client_addr}:{client_port}->{dest_addr}:{dest_port} type requestResponseMatcher struct { openMessagesMap sync.Map @@ -31,21 +22,21 @@ func createResponseRequestMatcher() requestResponseMatcher { return *newMatcher } -func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time) *requestResponsePair { +func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time) *api.RequestResponsePair { split := splitIdent(ident) key := genKey(split) // fmt.Printf(">>> request key: %v\n", key) - requestHTTPMessage := httpMessage{ - isRequest: true, - captureTime: captureTime, - orig: request, + requestHTTPMessage := api.GenericMessage{ + IsRequest: true, + CaptureTime: captureTime, + Orig: request, } if response, found := matcher.openMessagesMap.LoadAndDelete(key); found { - // Type assertion always succeeds because all of the map's values are of httpMessage type - responseHTTPMessage := response.(*httpMessage) - if responseHTTPMessage.isRequest { + // Type assertion always succeeds because all of the map's values are of api.GenericMessage type + responseHTTPMessage := response.(*api.GenericMessage) + if responseHTTPMessage.IsRequest { SilentError("Request-Duplicate", "Got duplicate request with same identifier") return nil } @@ -58,21 +49,21 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht return nil } -func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time) *requestResponsePair { +func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time) *api.RequestResponsePair { split := splitIdent(ident) key := genKey(split) // fmt.Printf(">>> response key: %v\n", key) - responseHTTPMessage := httpMessage{ - isRequest: false, - captureTime: captureTime, - orig: response, + responseHTTPMessage := api.GenericMessage{ + IsRequest: false, + CaptureTime: captureTime, + Orig: response, } if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { - // Type assertion always succeeds because all of the map's values are of httpMessage type - requestHTTPMessage := request.(*httpMessage) - if !requestHTTPMessage.isRequest { + // Type assertion always succeeds because all of the map's values are of api.GenericMessage type + requestHTTPMessage := request.(*api.GenericMessage) + if !requestHTTPMessage.IsRequest { SilentError("Response-Duplicate", "Got duplicate response with same identifier") return nil } @@ -85,8 +76,8 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response * return nil } -func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *requestResponsePair { - return &requestResponsePair{ +func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage) *api.RequestResponsePair { + return &api.RequestResponsePair{ Request: *requestHTTPMessage, Response: *responseHTTPMessage, } @@ -106,8 +97,8 @@ func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int { numDeleted := 0 matcher.openMessagesMap.Range(func(key interface{}, value interface{}) bool { - message, _ := value.(*httpMessage) - if message.captureTime.Before(t) { + message, _ := value.(*api.GenericMessage) + if message.CaptureTime.Before(t) { matcher.openMessagesMap.Delete(key) numDeleted++ } diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 10e63195a..a1a78937f 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -23,7 +23,7 @@ func (g dissecting) Ping() { log.Printf("pong Kafka\n") } -func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) interface{} { +func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.RequestResponsePair { // TODO: Implement return nil } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 39dcddb32..6405dd500 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -3,6 +3,7 @@ package tap import ( "bufio" "fmt" + "log" "sync" "github.com/romana/rlog" @@ -45,13 +46,14 @@ func (h *tcpStream) serverRun(tcpID *api.TcpID) { for _, extension := range extensions { if containsPort(extension.OutboundPorts, h.transport.Src().String()) { extension.Dissector.Ping() - extension.Dissector.Dissect(b, false, tcpID) + reqResPair := extension.Dissector.Dissect(b, false, tcpID) + log.Printf("reqResPair: %+v\n", reqResPair) } } } func (h *tcpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream { - fmt.Printf("* NEW: %s %s\n", net, transport) + log.Printf("* NEW: %s %s\n", net, transport) stream := &tcpStream{ net: net, transport: transport,