Fix body size, receive (elapsed time) and timestamps (#258)

* Fix the HTTP body size (it's not applicable to AMQP and Kafka)

* Fix the elapsed time

* Change JSON fields from snake_case to camelCase
This commit is contained in:
M. Mert Yıldıran
2021-09-04 17:15:39 +03:00
committed by GitHub
parent 366c1d0c6c
commit 4e0ff74944
14 changed files with 98 additions and 67 deletions

View File

@@ -237,10 +237,11 @@ func GetEntry(c *gin.Context) {
// }) // })
// } // }
extension := extensionsMap[entryData.ProtocolName] extension := extensionsMap[entryData.ProtocolName]
protocol, representation, _ := extension.Dissector.Represent(&entryData) protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{ c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
Protocol: protocol, Protocol: protocol,
Representation: string(representation), Representation: string(representation),
BodySize: bodySize,
Data: entryData, Data: entryData,
}) })
} }

View File

@@ -68,13 +68,17 @@ type OutputChannelItem struct {
Pair *RequestResponsePair Pair *RequestResponsePair
} }
type SuperTimer struct {
CaptureTime time.Time
}
type Dissector interface { type Dissector interface {
Register(*Extension) Register(*Extension)
Ping() Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, emitter Emitter) error Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, emitter Emitter) error
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(entry *MizuEntry) (Protocol, []byte, error) Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
} }
type Emitting struct { type Emitting struct {
@@ -103,6 +107,7 @@ type MizuEntry struct {
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"` RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"` Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
Path string `json:"path" gorm:"column:path"` Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
@@ -117,6 +122,7 @@ type MizuEntry struct {
type MizuEntryWrapper struct { type MizuEntryWrapper struct {
Protocol Protocol `json:"protocol"` Protocol Protocol `json:"protocol"`
Representation string `json:"representation"` Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"` Data MizuEntry `json:"data"`
} }

View File

@@ -93,10 +93,10 @@ type AMQPWrapper struct {
Details interface{} `json:"details"` Details interface{} `json:"details"`
} }
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) {
request := &api.GenericMessage{ request := &api.GenericMessage{
IsRequest: true, IsRequest: true,
CaptureTime: time.Now(), CaptureTime: captureTime,
Payload: AMQPPayload{ Payload: AMQPPayload{
Data: &AMQPWrapper{ Data: &AMQPWrapper{
Method: method, Method: method,
@@ -107,7 +107,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap
} }
item := &api.OutputChannelItem{ item := &api.OutputChannelItem{
Protocol: protocol, Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond), Timestamp: captureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo, ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{ Pair: &api.RequestResponsePair{
Request: *request, Request: *request,

View File

@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request" const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
r := AmqpReader{b} r := AmqpReader{b}
var remaining int var remaining int
@@ -110,10 +110,10 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
switch lastMethodFrameMessage.(type) { switch lastMethodFrameMessage.(type) {
case *BasicPublish: case *BasicPublish:
eventBasicPublish.Body = f.Body eventBasicPublish.Body = f.Body
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, emitter) emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.Body = f.Body eventBasicDeliver.Body = f.Body
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, emitter) emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
default: default:
} }
@@ -134,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, emitter) emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicConsume: case *BasicConsume:
eventBasicConsume := &BasicConsume{ eventBasicConsume := &BasicConsume{
@@ -146,7 +146,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, emitter) emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -165,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, emitter) emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ExchangeDeclare: case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{ eventExchangeDeclare := &ExchangeDeclare{
@@ -178,7 +178,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, emitter) emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionStart: case *ConnectionStart:
eventConnectionStart := &ConnectionStart{ eventConnectionStart := &ConnectionStart{
@@ -188,7 +188,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Mechanisms: m.Mechanisms, Mechanisms: m.Mechanisms,
Locales: m.Locales, Locales: m.Locales,
} }
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, emitter) emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionClose: case *ConnectionClose:
eventConnectionClose := &ConnectionClose{ eventConnectionClose := &ConnectionClose{
@@ -197,7 +197,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
ClassId: m.ClassId, ClassId: m.ClassId,
MethodId: m.MethodId, MethodId: m.MethodId,
} }
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, emitter) emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
default: default:
@@ -264,6 +264,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP, RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service, Service: service,
Timestamp: item.Timestamp, Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary, Path: summary,
ResolvedSource: resolvedSource, ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination, ResolvedDestination: resolvedDestination,
@@ -300,7 +301,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
} }
} }
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) { func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = protocol
bodySize = 0
var root map[string]interface{} var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root) json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0) representation := make(map[string]interface{}, 0)
@@ -334,8 +337,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
break break
} }
representation["request"] = repRequest representation["request"] = repRequest
object, err := json.Marshal(representation) object, err = json.Marshal(representation)
return protocol, object, err return
} }
var Dissector dissecting var Dissector dissecting

View File

@@ -7,14 +7,13 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time"
"github.com/romana/rlog" "github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error { func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage() streamID, messageHTTP1, err := grpcAssembler.readMessage()
if err != nil { if err != nil {
return err return err
@@ -32,7 +31,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
tcpID.DstPort, tcpID.DstPort,
streamID, streamID,
) )
item = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now()) item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: tcpID.SrcIP,
@@ -51,7 +50,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
tcpID.SrcPort, tcpID.SrcPort,
streamID, streamID,
) )
item = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now()) item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP, ClientIP: tcpID.DstIP,
@@ -71,7 +70,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
return nil return nil
} }
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
req, err := http.ReadRequest(b) req, err := http.ReadRequest(b)
if err != nil { if err != nil {
// log.Println("Error reading stream:", err) // log.Println("Error reading stream:", err)
@@ -99,7 +98,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
tcpID.DstPort, tcpID.DstPort,
counterPair.Request, counterPair.Request,
) )
item := reqResMatcher.registerRequest(ident, req, time.Now()) item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: tcpID.SrcIP,
@@ -113,7 +112,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return nil return nil
} }
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
res, err := http.ReadResponse(b, nil) res, err := http.ReadResponse(b, nil)
if err != nil { if err != nil {
// log.Println("Error reading stream:", err) // log.Println("Error reading stream:", err)
@@ -149,7 +148,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
tcpID.SrcPort, tcpID.SrcPort,
counterPair.Response, counterPair.Response,
) )
item := reqResMatcher.registerResponse(ident, res, time.Now()) item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP, ClientIP: tcpID.DstIP,

View File

@@ -7,6 +7,7 @@ import (
"io" "io"
"log" "log"
"net/url" "net/url"
"time"
"github.com/romana/rlog" "github.com/romana/rlog"
@@ -59,7 +60,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name) log.Printf("pong %s\n", protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient) isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil { if err != nil {
@@ -79,7 +80,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
success := false success := false
for { for {
if isHTTP2 { if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, emitter) err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@@ -88,7 +89,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
} }
success = true success = true
} else if isClient { } else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, emitter) err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@@ -97,7 +98,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
} }
success = true success = true
} else { } else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, emitter) err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@@ -161,6 +162,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
} else if resolvedSource != "" { } else if resolvedSource != "" {
service = SetHostname(service, resolvedSource) service = SetHostname(service, resolvedSource)
} }
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair) entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{ return &api.MizuEntry{
ProtocolName: protocol.Name, ProtocolName: protocol.Name,
@@ -173,6 +176,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP, RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service, Service: service,
Timestamp: item.Timestamp, Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: path, Path: path,
ResolvedSource: resolvedSource, ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination, ResolvedDestination: resolvedDestination,
@@ -214,9 +218,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
} }
} }
func representRequest(request map[string]interface{}) []interface{} { func representRequest(request map[string]interface{}) (repRequest []interface{}) {
repRequest := make([]interface{}, 0)
details, _ := json.Marshal([]map[string]string{ details, _ := json.Marshal([]map[string]string{
{ {
"name": "Method", "name": "Method",
@@ -299,11 +301,13 @@ func representRequest(request map[string]interface{}) []interface{} {
} }
} }
return repRequest return
} }
func representResponse(response map[string]interface{}) []interface{} { func representResponse(response map[string]interface{}) (repResponse []interface{}, bodySize int64) {
repResponse := make([]interface{}, 0) repResponse = make([]interface{}, 0)
bodySize = int64(response["bodySize"].(float64))
details, _ := json.Marshal([]map[string]string{ details, _ := json.Marshal([]map[string]string{
{ {
@@ -316,7 +320,7 @@ func representResponse(response map[string]interface{}) []interface{} {
}, },
{ {
"name": "Body Size", "name": "Body Size",
"value": fmt.Sprintf("%g bytes", response["bodySize"].(float64)), "value": fmt.Sprintf("%d bytes", bodySize),
}, },
}) })
repResponse = append(repResponse, map[string]string{ repResponse = append(repResponse, map[string]string{
@@ -356,11 +360,10 @@ func representResponse(response map[string]interface{}) []interface{} {
}) })
} }
return repResponse return
} }
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) { func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
var p api.Protocol
if entry.ProtocolVersion == "2.0" { if entry.ProtocolVersion == "2.0" {
p = http2Protocol p = http2Protocol
} else { } else {
@@ -374,11 +377,11 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
reqDetails := request["details"].(map[string]interface{}) reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{}) resDetails := response["details"].(map[string]interface{})
repRequest := representRequest(reqDetails) repRequest := representRequest(reqDetails)
repResponse := representResponse(resDetails) repResponse, bodySize := representResponse(resDetails)
representation["request"] = repRequest representation["request"] = repRequest
representation["response"] = repResponse representation["response"] = repResponse
object, err := json.Marshal(representation) object, err = json.Marshal(representation)
return p, object, err return
} }
var Dissector dissecting var Dissector dissecting

View File

@@ -85,7 +85,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage) *api.OutputChannelItem { func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage) *api.OutputChannelItem {
return &api.OutputChannelItem{ return &api.OutputChannelItem{
Protocol: protocol, Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond), Timestamp: requestHTTPMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: nil, ConnectionInfo: nil,
Pair: &api.RequestResponsePair{ Pair: &api.RequestResponsePair{
Request: *requestHTTPMessage, Request: *requestHTTPMessage,

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"time"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
@@ -37,15 +38,15 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name) log.Printf("pong %s\n", _protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
for { for {
if isClient { if isClient {
_, _, err := ReadRequest(b, tcpID) _, _, err := ReadRequest(b, tcpID, superTimer)
if err != nil { if err != nil {
return err return err
} }
} else { } else {
err := ReadResponse(b, tcpID, emitter) err := ReadResponse(b, tcpID, superTimer, emitter)
if err != nil { if err != nil {
return err return err
} }
@@ -131,6 +132,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
} }
request["url"] = summary request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair) entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{ return &api.MizuEntry{
ProtocolName: _protocol.Name, ProtocolName: _protocol.Name,
@@ -143,6 +145,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP, RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service, Service: service,
Timestamp: item.Timestamp, Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary, Path: summary,
ResolvedSource: resolvedSource, ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination, ResolvedDestination: resolvedDestination,
@@ -178,7 +181,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
} }
} }
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) { func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = _protocol
bodySize = 0
var root map[string]interface{} var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root) json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0) representation := make(map[string]interface{}, 0)
@@ -224,8 +229,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
representation["request"] = repRequest representation["request"] = repRequest
representation["response"] = repResponse representation["response"] = repResponse
object, err := json.Marshal(representation) object, err = json.Marshal(representation)
return _protocol, object, err return
} }
var Dissector dissecting var Dissector dissecting

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io" "io"
"reflect" "reflect"
"time"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
@@ -15,9 +16,10 @@ type Request struct {
CorrelationID int32 CorrelationID int32
ClientID string ClientID string
Payload interface{} Payload interface{}
CaptureTime time.Time
} }
func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) { func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4} d := &decoder{reader: r, remain: 4}
size := d.readInt32() size := d.readInt32()
@@ -213,6 +215,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
ApiVersion: apiVersion, ApiVersion: apiVersion,
CorrelationID: correlationID, CorrelationID: correlationID,
ClientID: clientID, ClientID: clientID,
CaptureTime: superTimer.CaptureTime,
Payload: payload, Payload: payload,
} }

View File

@@ -13,9 +13,10 @@ type Response struct {
Size int32 Size int32
CorrelationID int32 CorrelationID int32
Payload interface{} Payload interface{}
CaptureTime time.Time
} }
func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error) { func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
d := &decoder{reader: r, remain: 4} d := &decoder{reader: r, remain: 4}
size := d.readInt32() size := d.readInt32()
@@ -39,6 +40,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
Size: size, Size: size,
CorrelationID: correlationID, CorrelationID: correlationID,
Payload: payload, Payload: payload,
CaptureTime: superTimer.CaptureTime,
} }
key := fmt.Sprintf( key := fmt.Sprintf(
@@ -258,12 +260,12 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
item := &api.OutputChannelItem{ item := &api.OutputChannelItem{
Protocol: _protocol, Protocol: _protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond), Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo, ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{ Pair: &api.RequestResponsePair{
Request: api.GenericMessage{ Request: api.GenericMessage{
IsRequest: true, IsRequest: true,
CaptureTime: time.Now(), CaptureTime: reqResPair.Request.CaptureTime,
Payload: KafkaPayload{ Payload: KafkaPayload{
Data: &KafkaWrapper{ Data: &KafkaWrapper{
Method: apiNames[apiKey], Method: apiNames[apiKey],
@@ -274,7 +276,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
}, },
Response: api.GenericMessage{ Response: api.GenericMessage{
IsRequest: false, IsRequest: false,
CaptureTime: time.Now(), CaptureTime: reqResPair.Response.CaptureTime,
Payload: KafkaPayload{ Payload: KafkaPayload{
Data: &KafkaWrapper{ Data: &KafkaWrapper{
Method: apiNames[apiKey], Method: apiNames[apiKey],

View File

@@ -51,7 +51,7 @@ type tcpReader struct {
isOutgoing bool isOutgoing bool
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte data []byte
captureTime time.Time superTimer *api.SuperTimer
parent *tcpStream parent *tcpStream
messageCount uint messageCount uint
packetsSeen uint packetsSeen uint
@@ -69,7 +69,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
msg, ok = <-h.msgQueue msg, ok = <-h.msgQueue
h.data = msg.bytes h.data = msg.bytes
h.captureTime = msg.timestamp h.superTimer.CaptureTime = msg.timestamp
if len(h.data) > 0 { if len(h.data) > 0 {
h.packetsSeen += 1 h.packetsSeen += 1
} }
@@ -96,7 +96,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
func (h *tcpReader) run(wg *sync.WaitGroup) { func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
b := bufio.NewReader(h) b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.emitter) err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.emitter)
if err != nil { if err != nil {
io.Copy(ioutil.Discard, b) io.Copy(ioutil.Discard, b)
} }

View File

@@ -144,13 +144,14 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
// This is where we pass the reassembled information onwards // This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object // This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount() statsTracker.incReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer { if dir == reassembly.TCPDirClientToServer {
for _, reader := range t.clients { for _, reader := range t.clients {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
} }
} else { } else {
for _, reader := range t.servers { for _, reader := range t.servers {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
} }
} }
} }

View File

@@ -54,8 +54,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
Response: 0, Response: 0,
} }
stream.clients = append(stream.clients, tcpReader{ stream.clients = append(stream.clients, tcpReader{
msgQueue: make(chan tcpReaderDataMsg), msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport), superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{ tcpID: &api.TcpID{
SrcIP: srcIp, SrcIP: srcIp,
DstIP: dstIp, DstIP: dstIp,
@@ -71,8 +72,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
counterPair: counterPair, counterPair: counterPair,
}) })
stream.servers = append(stream.servers, tcpReader{ stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg), msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport), superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{ tcpID: &api.TcpID{
SrcIP: net.Dst().String(), SrcIP: net.Dst().String(),
DstIP: net.Src().String(), DstIP: net.Src().String(),

View File

@@ -32,7 +32,7 @@ interface EntryDetailedProps {
export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`; export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`;
const EntryTitle: React.FC<any> = ({protocol, data}) => { const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
const classes = useStyles(); const classes = useStyles();
const {response} = JSON.parse(data.entry); const {response} = JSON.parse(data.entry);
@@ -40,7 +40,8 @@ const EntryTitle: React.FC<any> = ({protocol, data}) => {
return <div className={classes.entryTitle}> return <div className={classes.entryTitle}>
<Protocol protocol={protocol} horizontal={true}/> <Protocol protocol={protocol} horizontal={true}/>
<div style={{right: "30px", position: "absolute", display: "flex"}}> <div style={{right: "30px", position: "absolute", display: "flex"}}>
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(response.payload.bodySize)}</div>} {response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
<div style={{opacity: 0.5}}>{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied</div> <div style={{opacity: 0.5}}>{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied</div>
</div> </div>
</div>; </div>;
@@ -63,7 +64,12 @@ const EntrySummary: React.FC<any> = ({data}) => {
export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => { export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
return <> return <>
<EntryTitle protocol={entryData.protocol} data={entryData.data}/> <EntryTitle
protocol={entryData.protocol}
data={entryData.data}
bodySize={entryData.bodySize}
elapsedTime={entryData.data.elapsedTime}
/>
{entryData.data && <EntrySummary data={entryData.data}/>} {entryData.data && <EntrySummary data={entryData.data}/>}
<> <>
{entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.background_color}/>} {entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.background_color}/>}