diff --git a/agent/pkg/controllers/entries_controller.go b/agent/pkg/controllers/entries_controller.go index 0b4dfa702..ce639eb26 100644 --- a/agent/pkg/controllers/entries_controller.go +++ b/agent/pkg/controllers/entries_controller.go @@ -237,10 +237,11 @@ func GetEntry(c *gin.Context) { // }) // } extension := extensionsMap[entryData.ProtocolName] - protocol, representation, _ := extension.Dissector.Represent(&entryData) + protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData) c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{ Protocol: protocol, Representation: string(representation), + BodySize: bodySize, Data: entryData, }) } diff --git a/tap/api/api.go b/tap/api/api.go index 65674628f..8bf04e5fc 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -68,13 +68,17 @@ type OutputChannelItem struct { Pair *RequestResponsePair } +type SuperTimer struct { + CaptureTime time.Time +} + type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, emitter Emitter) error + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, emitter Emitter) error Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry Summarize(entry *MizuEntry) *BaseEntryDetails - Represent(entry *MizuEntry) (Protocol, []byte, error) + Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error) } type Emitting struct { @@ -103,6 +107,7 @@ type MizuEntry struct { RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"` Service string `json:"service" gorm:"column:service"` Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` + ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"` Path string `json:"path" gorm:"column:path"` ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` @@ -117,6 +122,7 @@ type MizuEntry struct { type MizuEntryWrapper struct { Protocol Protocol `json:"protocol"` Representation string `json:"representation"` + BodySize int64 `json:"bodySize"` Data MizuEntry `json:"data"` } diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index b4eab04a4..71514c74b 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -93,10 +93,10 @@ type AMQPWrapper struct { Details interface{} `json:"details"` } -func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, emitter api.Emitter) { +func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) { request := &api.GenericMessage{ IsRequest: true, - CaptureTime: time.Now(), + CaptureTime: captureTime, Payload: AMQPPayload{ Data: &AMQPWrapper{ Method: method, @@ -107,7 +107,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap } item := &api.OutputChannelItem{ Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Timestamp: captureTime.UnixNano() / int64(time.Millisecond), ConnectionInfo: connectionInfo, Pair: &api.RequestResponsePair{ Request: *request, diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 19b5cd913..af372b77d 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -41,7 +41,7 @@ func (d dissecting) Ping() { const amqpRequest string = "amqp_request" -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error { r := AmqpReader{b} var remaining int @@ -110,10 +110,10 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co switch lastMethodFrameMessage.(type) { case *BasicPublish: eventBasicPublish.Body = f.Body - emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, emitter) + emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter) case *BasicDeliver: eventBasicDeliver.Body = f.Body - emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, emitter) + emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter) default: } @@ -134,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co NoWait: m.NoWait, Arguments: m.Arguments, } - emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, emitter) + emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter) case *BasicConsume: eventBasicConsume := &BasicConsume{ @@ -146,7 +146,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co NoWait: m.NoWait, Arguments: m.Arguments, } - emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, emitter) + emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter) case *BasicDeliver: eventBasicDeliver.ConsumerTag = m.ConsumerTag @@ -165,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co NoWait: m.NoWait, Arguments: m.Arguments, } - emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, emitter) + emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) case *ExchangeDeclare: eventExchangeDeclare := &ExchangeDeclare{ @@ -178,7 +178,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co NoWait: m.NoWait, Arguments: m.Arguments, } - emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, emitter) + emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) case *ConnectionStart: eventConnectionStart := &ConnectionStart{ @@ -188,7 +188,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co Mechanisms: m.Mechanisms, Locales: m.Locales, } - emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, emitter) + emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) case *ConnectionClose: eventConnectionClose := &ConnectionClose{ @@ -197,7 +197,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co ClassId: m.ClassId, MethodId: m.MethodId, } - emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, emitter) + emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter) default: @@ -264,6 +264,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve RequestSenderIp: item.ConnectionInfo.ClientIP, Service: service, Timestamp: item.Timestamp, + ElapsedTime: 0, Path: summary, ResolvedSource: resolvedSource, ResolvedDestination: resolvedDestination, @@ -300,7 +301,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { } } -func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) { +func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) { + p = protocol + bodySize = 0 var root map[string]interface{} json.Unmarshal([]byte(entry.Entry), &root) representation := make(map[string]interface{}, 0) @@ -334,8 +337,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error break } representation["request"] = repRequest - object, err := json.Marshal(representation) - return protocol, object, err + object, err = json.Marshal(representation) + return } var Dissector dissecting diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index ef615f8f6..fbdeb85e0 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -7,14 +7,13 @@ import ( "io" "io/ioutil" "net/http" - "time" "github.com/romana/rlog" "github.com/up9inc/mizu/tap/api" ) -func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error { +func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error { streamID, messageHTTP1, err := grpcAssembler.readMessage() if err != nil { return err @@ -32,7 +31,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a tcpID.DstPort, streamID, ) - item = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now()) + item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.SrcIP, @@ -51,7 +50,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a tcpID.SrcPort, streamID, ) - item = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now()) + item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.DstIP, @@ -71,7 +70,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error { req, err := http.ReadRequest(b) if err != nil { // log.Println("Error reading stream:", err) @@ -99,7 +98,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api tcpID.DstPort, counterPair.Request, ) - item := reqResMatcher.registerRequest(ident, req, time.Now()) + item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.SrcIP, @@ -113,7 +112,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api return nil } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { +func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error { res, err := http.ReadResponse(b, nil) if err != nil { // log.Println("Error reading stream:", err) @@ -149,7 +148,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api tcpID.SrcPort, counterPair.Response, ) - item := reqResMatcher.registerResponse(ident, res, time.Now()) + item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.DstIP, diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 033a17648..d77737f36 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -7,6 +7,7 @@ import ( "io" "log" "net/url" + "time" "github.com/romana/rlog" @@ -59,7 +60,7 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error { ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) isHTTP2, err := checkIsHTTP2Connection(b, isClient) if err != nil { @@ -79,7 +80,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co success := false for { if isHTTP2 { - err = handleHTTP2Stream(grpcAssembler, tcpID, emitter) + err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -88,7 +89,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } success = true } else if isClient { - err = handleHTTP1ClientStream(b, tcpID, counterPair, emitter) + err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -97,7 +98,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } success = true } else { - err = handleHTTP1ServerStream(b, tcpID, counterPair, emitter) + err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -161,6 +162,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } else if resolvedSource != "" { service = SetHostname(service, resolvedSource) } + + elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds() entryBytes, _ := json.Marshal(item.Pair) return &api.MizuEntry{ ProtocolName: protocol.Name, @@ -173,6 +176,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve RequestSenderIp: item.ConnectionInfo.ClientIP, Service: service, Timestamp: item.Timestamp, + ElapsedTime: elapsedTime, Path: path, ResolvedSource: resolvedSource, ResolvedDestination: resolvedDestination, @@ -214,9 +218,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { } } -func representRequest(request map[string]interface{}) []interface{} { - repRequest := make([]interface{}, 0) - +func representRequest(request map[string]interface{}) (repRequest []interface{}) { details, _ := json.Marshal([]map[string]string{ { "name": "Method", @@ -299,11 +301,13 @@ func representRequest(request map[string]interface{}) []interface{} { } } - return repRequest + return } -func representResponse(response map[string]interface{}) []interface{} { - repResponse := make([]interface{}, 0) +func representResponse(response map[string]interface{}) (repResponse []interface{}, bodySize int64) { + repResponse = make([]interface{}, 0) + + bodySize = int64(response["bodySize"].(float64)) details, _ := json.Marshal([]map[string]string{ { @@ -316,7 +320,7 @@ func representResponse(response map[string]interface{}) []interface{} { }, { "name": "Body Size", - "value": fmt.Sprintf("%g bytes", response["bodySize"].(float64)), + "value": fmt.Sprintf("%d bytes", bodySize), }, }) repResponse = append(repResponse, map[string]string{ @@ -356,11 +360,10 @@ func representResponse(response map[string]interface{}) []interface{} { }) } - return repResponse + return } -func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) { - var p api.Protocol +func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) { if entry.ProtocolVersion == "2.0" { p = http2Protocol } else { @@ -374,11 +377,11 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error reqDetails := request["details"].(map[string]interface{}) resDetails := response["details"].(map[string]interface{}) repRequest := representRequest(reqDetails) - repResponse := representResponse(resDetails) + repResponse, bodySize := representResponse(resDetails) representation["request"] = repRequest representation["response"] = repResponse - object, err := json.Marshal(representation) - return p, object, err + object, err = json.Marshal(representation) + return } var Dissector dissecting diff --git a/tap/extensions/http/matcher.go b/tap/extensions/http/matcher.go index 9692b5461..b53793717 100644 --- a/tap/extensions/http/matcher.go +++ b/tap/extensions/http/matcher.go @@ -85,7 +85,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response * func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage) *api.OutputChannelItem { return &api.OutputChannelItem{ Protocol: protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Timestamp: requestHTTPMessage.CaptureTime.UnixNano() / int64(time.Millisecond), ConnectionInfo: nil, Pair: &api.RequestResponsePair{ Request: *requestHTTPMessage, diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 46b9b710b..068b04605 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "time" "github.com/up9inc/mizu/tap/api" ) @@ -37,15 +38,15 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", _protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error { for { if isClient { - _, _, err := ReadRequest(b, tcpID) + _, _, err := ReadRequest(b, tcpID, superTimer) if err != nil { return err } } else { - err := ReadResponse(b, tcpID, emitter) + err := ReadResponse(b, tcpID, superTimer, emitter) if err != nil { return err } @@ -131,6 +132,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve } request["url"] = summary + elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds() entryBytes, _ := json.Marshal(item.Pair) return &api.MizuEntry{ ProtocolName: _protocol.Name, @@ -143,6 +145,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve RequestSenderIp: item.ConnectionInfo.ClientIP, Service: service, Timestamp: item.Timestamp, + ElapsedTime: elapsedTime, Path: summary, ResolvedSource: resolvedSource, ResolvedDestination: resolvedDestination, @@ -178,7 +181,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { } } -func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) { +func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) { + p = _protocol + bodySize = 0 var root map[string]interface{} json.Unmarshal([]byte(entry.Entry), &root) representation := make(map[string]interface{}, 0) @@ -224,8 +229,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error representation["request"] = repRequest representation["response"] = repResponse - object, err := json.Marshal(representation) - return _protocol, object, err + object, err = json.Marshal(representation) + return } var Dissector dissecting diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index cb4ac4748..b7ac67d7d 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "reflect" + "time" "github.com/up9inc/mizu/tap/api" ) @@ -15,9 +16,10 @@ type Request struct { CorrelationID int32 ClientID string Payload interface{} + CaptureTime time.Time } -func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) { +func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -213,6 +215,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16 ApiVersion: apiVersion, CorrelationID: correlationID, ClientID: clientID, + CaptureTime: superTimer.CaptureTime, Payload: payload, } diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index ac4debf8a..574efa8a2 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -13,9 +13,10 @@ type Response struct { Size int32 CorrelationID int32 Payload interface{} + CaptureTime time.Time } -func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error) { +func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -39,6 +40,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error Size: size, CorrelationID: correlationID, Payload: payload, + CaptureTime: superTimer.CaptureTime, } key := fmt.Sprintf( @@ -258,12 +260,12 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error item := &api.OutputChannelItem{ Protocol: _protocol, - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond), ConnectionInfo: connectionInfo, Pair: &api.RequestResponsePair{ Request: api.GenericMessage{ IsRequest: true, - CaptureTime: time.Now(), + CaptureTime: reqResPair.Request.CaptureTime, Payload: KafkaPayload{ Data: &KafkaWrapper{ Method: apiNames[apiKey], @@ -274,7 +276,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error }, Response: api.GenericMessage{ IsRequest: false, - CaptureTime: time.Now(), + CaptureTime: reqResPair.Response.CaptureTime, Payload: KafkaPayload{ Data: &KafkaWrapper{ Method: apiNames[apiKey], diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index 3e276adbb..83aa7b1e1 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -51,7 +51,7 @@ type tcpReader struct { isOutgoing bool msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload data []byte - captureTime time.Time + superTimer *api.SuperTimer parent *tcpStream messageCount uint packetsSeen uint @@ -69,7 +69,7 @@ func (h *tcpReader) Read(p []byte) (int, error) { msg, ok = <-h.msgQueue h.data = msg.bytes - h.captureTime = msg.timestamp + h.superTimer.CaptureTime = msg.timestamp if len(h.data) > 0 { h.packetsSeen += 1 } @@ -96,7 +96,7 @@ func (h *tcpReader) Read(p []byte) (int, error) { func (h *tcpReader) run(wg *sync.WaitGroup) { defer wg.Done() b := bufio.NewReader(h) - err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.emitter) + err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.emitter) if err != nil { io.Copy(ioutil.Discard, b) } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 55ab74558..657e2f9ef 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -144,13 +144,14 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass // This is where we pass the reassembled information onwards // This channel is read by an tcpReader object statsTracker.incReassembledTcpPayloadsCount() + timestamp := ac.GetCaptureInfo().Timestamp if dir == reassembly.TCPDirClientToServer { for _, reader := range t.clients { - reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + reader.msgQueue <- tcpReaderDataMsg{data, timestamp} } } else { for _, reader := range t.servers { - reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + reader.msgQueue <- tcpReaderDataMsg{data, timestamp} } } } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 900adf842..9f85949ce 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -54,8 +54,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T Response: 0, } stream.clients = append(stream.clients, tcpReader{ - msgQueue: make(chan tcpReaderDataMsg), - ident: fmt.Sprintf("%s %s", net, transport), + msgQueue: make(chan tcpReaderDataMsg), + superTimer: &api.SuperTimer{}, + ident: fmt.Sprintf("%s %s", net, transport), tcpID: &api.TcpID{ SrcIP: srcIp, DstIP: dstIp, @@ -71,8 +72,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T counterPair: counterPair, }) stream.servers = append(stream.servers, tcpReader{ - msgQueue: make(chan tcpReaderDataMsg), - ident: fmt.Sprintf("%s %s", net, transport), + msgQueue: make(chan tcpReaderDataMsg), + superTimer: &api.SuperTimer{}, + ident: fmt.Sprintf("%s %s", net, transport), tcpID: &api.TcpID{ SrcIP: net.Dst().String(), DstIP: net.Src().String(), diff --git a/ui/src/components/EntryDetailed.tsx b/ui/src/components/EntryDetailed.tsx index f7040f35c..9527876fc 100644 --- a/ui/src/components/EntryDetailed.tsx +++ b/ui/src/components/EntryDetailed.tsx @@ -32,7 +32,7 @@ interface EntryDetailedProps { export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`; -const EntryTitle: React.FC = ({protocol, data}) => { +const EntryTitle: React.FC = ({protocol, data, bodySize, elapsedTime}) => { const classes = useStyles(); const {response} = JSON.parse(data.entry); @@ -40,7 +40,8 @@ const EntryTitle: React.FC = ({protocol, data}) => { return
- {response.payload &&
{formatSize(response.payload.bodySize)}
} + {response.payload &&
{formatSize(bodySize)}
} +
{Math.round(elapsedTime)}ms
{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied
; @@ -63,7 +64,12 @@ const EntrySummary: React.FC = ({data}) => { export const EntryDetailed: React.FC = ({entryData}) => { return <> - + {entryData.data && } <> {entryData.data && }