mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-17 16:08:12 +00:00
Remove SuperTimer
struct
This commit is contained in:
parent
3c6fe8393f
commit
75b44f9143
@ -106,10 +106,6 @@ type OutputChannelItem struct {
|
|||||||
Namespace string
|
Namespace string
|
||||||
}
|
}
|
||||||
|
|
||||||
type SuperTimer struct {
|
|
||||||
CaptureTime time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
type ProtoIdentifier struct {
|
type ProtoIdentifier struct {
|
||||||
Protocol *Protocol
|
Protocol *Protocol
|
||||||
IsClosedOthers bool
|
IsClosedOthers bool
|
||||||
|
@ -30,7 +30,7 @@ type TcpReader struct {
|
|||||||
MsgQueue chan TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
MsgQueue chan TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||||
data []byte
|
data []byte
|
||||||
Progress *ReadProgress
|
Progress *ReadProgress
|
||||||
SuperTimer *SuperTimer
|
CaptureTime time.Time
|
||||||
Parent *TcpStream
|
Parent *TcpStream
|
||||||
packetsSeen uint
|
packetsSeen uint
|
||||||
Extension *Extension
|
Extension *Extension
|
||||||
@ -48,7 +48,7 @@ func (reader *TcpReader) Read(p []byte) (int, error) {
|
|||||||
msg, ok = <-reader.MsgQueue
|
msg, ok = <-reader.MsgQueue
|
||||||
reader.data = msg.bytes
|
reader.data = msg.bytes
|
||||||
|
|
||||||
reader.SuperTimer.CaptureTime = msg.timestamp
|
reader.CaptureTime = msg.timestamp
|
||||||
if len(reader.data) > 0 {
|
if len(reader.data) > 0 {
|
||||||
reader.packetsSeen += 1
|
reader.packetsSeen += 1
|
||||||
}
|
}
|
||||||
|
@ -114,11 +114,11 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
case *BasicPublish:
|
case *BasicPublish:
|
||||||
eventBasicPublish.Body = f.Body
|
eventBasicPublish.Body = f.Body
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
||||||
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
||||||
case *BasicDeliver:
|
case *BasicDeliver:
|
||||||
eventBasicDeliver.Body = f.Body
|
eventBasicDeliver.Body = f.Body
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
||||||
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
||||||
}
|
}
|
||||||
|
|
||||||
case *MethodFrame:
|
case *MethodFrame:
|
||||||
@ -139,7 +139,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
||||||
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
||||||
|
|
||||||
case *BasicConsume:
|
case *BasicConsume:
|
||||||
eventBasicConsume := &BasicConsume{
|
eventBasicConsume := &BasicConsume{
|
||||||
@ -152,7 +152,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
||||||
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
||||||
|
|
||||||
case *BasicDeliver:
|
case *BasicDeliver:
|
||||||
eventBasicDeliver.ConsumerTag = m.ConsumerTag
|
eventBasicDeliver.ConsumerTag = m.ConsumerTag
|
||||||
@ -172,7 +172,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
||||||
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
||||||
|
|
||||||
case *ExchangeDeclare:
|
case *ExchangeDeclare:
|
||||||
eventExchangeDeclare := &ExchangeDeclare{
|
eventExchangeDeclare := &ExchangeDeclare{
|
||||||
@ -186,7 +186,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
||||||
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
||||||
|
|
||||||
case *ConnectionStart:
|
case *ConnectionStart:
|
||||||
eventConnectionStart := &ConnectionStart{
|
eventConnectionStart := &ConnectionStart{
|
||||||
@ -197,7 +197,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
Locales: m.Locales,
|
Locales: m.Locales,
|
||||||
}
|
}
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
||||||
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
||||||
|
|
||||||
case *ConnectionClose:
|
case *ConnectionClose:
|
||||||
eventConnectionClose := &ConnectionClose{
|
eventConnectionClose := &ConnectionClose{
|
||||||
@ -207,7 +207,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
MethodId: m.MethodId,
|
MethodId: m.MethodId,
|
||||||
}
|
}
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&protocol)
|
||||||
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -132,7 +132,6 @@ func TestDissect(t *testing.T) {
|
|||||||
},
|
},
|
||||||
IsClient: true,
|
IsClient: true,
|
||||||
TcpID: tcpIDClient,
|
TcpID: tcpIDClient,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: emitter,
|
Emitter: emitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
@ -162,7 +161,6 @@ func TestDissect(t *testing.T) {
|
|||||||
},
|
},
|
||||||
IsClient: false,
|
IsClient: false,
|
||||||
TcpID: tcpIDServer,
|
TcpID: tcpIDServer,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: emitter,
|
Emitter: emitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/up9inc/mizu/shared"
|
"github.com/up9inc/mizu/shared"
|
||||||
"github.com/up9inc/mizu/tap/api"
|
"github.com/up9inc/mizu/tap/api"
|
||||||
@ -48,7 +49,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
|
|||||||
item.ConnectionInfo.ClientPort = ""
|
item.ConnectionInfo.ClientPort = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
|
func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, captureTime time.Time, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
|
||||||
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
|
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -67,7 +68,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
|
|||||||
streamID,
|
streamID,
|
||||||
"HTTP2",
|
"HTTP2",
|
||||||
)
|
)
|
||||||
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor)
|
item = reqResMatcher.registerRequest(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
item.ConnectionInfo = &api.ConnectionInfo{
|
item.ConnectionInfo = &api.ConnectionInfo{
|
||||||
ClientIP: tcpID.SrcIP,
|
ClientIP: tcpID.SrcIP,
|
||||||
@ -87,7 +88,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
|
|||||||
streamID,
|
streamID,
|
||||||
"HTTP2",
|
"HTTP2",
|
||||||
)
|
)
|
||||||
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor)
|
item = reqResMatcher.registerResponse(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
item.ConnectionInfo = &api.ConnectionInfo{
|
item.ConnectionInfo = &api.ConnectionInfo{
|
||||||
ClientIP: tcpID.DstIP,
|
ClientIP: tcpID.DstIP,
|
||||||
@ -112,7 +113,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
|
func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
|
||||||
req, err = http.ReadRequest(b)
|
req, err = http.ReadRequest(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -140,7 +141,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, captur
|
|||||||
requestCounter,
|
requestCounter,
|
||||||
"HTTP1",
|
"HTTP1",
|
||||||
)
|
)
|
||||||
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor)
|
item := reqResMatcher.registerRequest(ident, req, captureTime, progress.Current(), req.ProtoMinor)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
item.ConnectionInfo = &api.ConnectionInfo{
|
item.ConnectionInfo = &api.ConnectionInfo{
|
||||||
ClientIP: tcpID.SrcIP,
|
ClientIP: tcpID.SrcIP,
|
||||||
@ -155,7 +156,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, captur
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
|
func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
|
||||||
var res *http.Response
|
var res *http.Response
|
||||||
res, err = http.ReadResponse(b, nil)
|
res, err = http.ReadResponse(b, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -184,7 +185,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, captur
|
|||||||
responseCounter,
|
responseCounter,
|
||||||
"HTTP1",
|
"HTTP1",
|
||||||
)
|
)
|
||||||
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, progress.Current(), res.ProtoMinor)
|
item := reqResMatcher.registerResponse(ident, res, captureTime, progress.Current(), res.ProtoMinor)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
item.ConnectionInfo = &api.ConnectionInfo{
|
item.ConnectionInfo = &api.ConnectionInfo{
|
||||||
ClientIP: tcpID.DstIP,
|
ClientIP: tcpID.DstIP,
|
||||||
|
@ -122,7 +122,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
}
|
}
|
||||||
|
|
||||||
if isHTTP2 {
|
if isHTTP2 {
|
||||||
err = handleHTTP2Stream(http2Assembler, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.SuperTimer, reader.Emitter, options, reqResMatcher)
|
err = handleHTTP2Stream(http2Assembler, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CaptureTime, reader.Emitter, options, reqResMatcher)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -131,7 +131,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
reader.Parent.CloseOtherProtocolDissectors(&http11protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&http11protocol)
|
||||||
} else if reader.IsClient {
|
} else if reader.IsClient {
|
||||||
var req *http.Request
|
var req *http.Request
|
||||||
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher)
|
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, options, reqResMatcher)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -149,7 +149,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
reader.TcpID.DstPort,
|
reader.TcpID.DstPort,
|
||||||
"HTTP2",
|
"HTTP2",
|
||||||
)
|
)
|
||||||
item := reqResMatcher.registerRequest(ident, req, reader.SuperTimer.CaptureTime, reader.Progress.Current(), req.ProtoMinor)
|
item := reqResMatcher.registerRequest(ident, req, reader.CaptureTime, reader.Progress.Current(), req.ProtoMinor)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
item.ConnectionInfo = &api.ConnectionInfo{
|
item.ConnectionInfo = &api.ConnectionInfo{
|
||||||
ClientIP: reader.TcpID.SrcIP,
|
ClientIP: reader.TcpID.SrcIP,
|
||||||
@ -163,7 +163,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher)
|
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, options, reqResMatcher)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
@ -134,7 +134,6 @@ func TestDissect(t *testing.T) {
|
|||||||
},
|
},
|
||||||
IsClient: true,
|
IsClient: true,
|
||||||
TcpID: tcpIDClient,
|
TcpID: tcpIDClient,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: emitter,
|
Emitter: emitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
@ -164,7 +163,6 @@ func TestDissect(t *testing.T) {
|
|||||||
},
|
},
|
||||||
IsClient: false,
|
IsClient: false,
|
||||||
TcpID: tcpIDServer,
|
TcpID: tcpIDServer,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: emitter,
|
Emitter: emitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
|
@ -44,13 +44,13 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
}
|
}
|
||||||
|
|
||||||
if reader.IsClient {
|
if reader.IsClient {
|
||||||
_, _, err := ReadRequest(b, reader.TcpID, reader.CounterPair, reader.SuperTimer, reqResMatcher)
|
_, _, err := ReadRequest(b, reader.TcpID, reader.CounterPair, reader.CaptureTime, reqResMatcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
reader.Parent.CloseOtherProtocolDissectors(&_protocol)
|
reader.Parent.CloseOtherProtocolDissectors(&_protocol)
|
||||||
} else {
|
} else {
|
||||||
err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, reqResMatcher)
|
err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, reqResMatcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -133,7 +133,6 @@ func TestDissect(t *testing.T) {
|
|||||||
},
|
},
|
||||||
IsClient: true,
|
IsClient: true,
|
||||||
TcpID: tcpIDClient,
|
TcpID: tcpIDClient,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: emitter,
|
Emitter: emitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
@ -163,7 +162,6 @@ func TestDissect(t *testing.T) {
|
|||||||
},
|
},
|
||||||
IsClient: false,
|
IsClient: false,
|
||||||
TcpID: tcpIDServer,
|
TcpID: tcpIDServer,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: emitter,
|
Emitter: emitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,7 @@ type Request struct {
|
|||||||
CaptureTime time.Time `json:"captureTime"`
|
CaptureTime time.Time `json:"captureTime"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
|
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
|
||||||
d := &decoder{reader: r, remain: 4}
|
d := &decoder{reader: r, remain: 4}
|
||||||
size := d.readInt32()
|
size := d.readInt32()
|
||||||
|
|
||||||
@ -206,7 +206,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
|
|||||||
ApiVersion: apiVersion,
|
ApiVersion: apiVersion,
|
||||||
CorrelationID: correlationID,
|
CorrelationID: correlationID,
|
||||||
ClientID: clientID,
|
ClientID: clientID,
|
||||||
CaptureTime: superTimer.CaptureTime,
|
CaptureTime: captureTime,
|
||||||
Payload: payload,
|
Payload: payload,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ type Response struct {
|
|||||||
CaptureTime time.Time `json:"captureTime"`
|
CaptureTime time.Time `json:"captureTime"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
|
func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
|
||||||
d := &decoder{reader: r, remain: 4}
|
d := &decoder{reader: r, remain: 4}
|
||||||
size := d.readInt32()
|
size := d.readInt32()
|
||||||
|
|
||||||
@ -43,7 +43,7 @@ func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPai
|
|||||||
Size: size,
|
Size: size,
|
||||||
CorrelationID: correlationID,
|
CorrelationID: correlationID,
|
||||||
Payload: payload,
|
Payload: payload,
|
||||||
CaptureTime: superTimer.CaptureTime,
|
CaptureTime: captureTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
key := fmt.Sprintf(
|
key := fmt.Sprintf(
|
||||||
|
@ -2,11 +2,12 @@ package redis
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/up9inc/mizu/tap/api"
|
"github.com/up9inc/mizu/tap/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
|
func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
|
||||||
counterPair.Lock()
|
counterPair.Lock()
|
||||||
counterPair.Request++
|
counterPair.Request++
|
||||||
requestCounter := counterPair.Request
|
requestCounter := counterPair.Request
|
||||||
@ -21,7 +22,7 @@ func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID *
|
|||||||
requestCounter,
|
requestCounter,
|
||||||
)
|
)
|
||||||
|
|
||||||
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime, progress.Current())
|
item := reqResMatcher.registerRequest(ident, request, captureTime, progress.Current())
|
||||||
if item != nil {
|
if item != nil {
|
||||||
item.Capture = capture
|
item.Capture = capture
|
||||||
item.ConnectionInfo = &api.ConnectionInfo{
|
item.ConnectionInfo = &api.ConnectionInfo{
|
||||||
@ -36,7 +37,7 @@ func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID *
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleServerStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
|
func handleServerStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
|
||||||
counterPair.Lock()
|
counterPair.Lock()
|
||||||
counterPair.Response++
|
counterPair.Response++
|
||||||
responseCounter := counterPair.Response
|
responseCounter := counterPair.Response
|
||||||
@ -51,7 +52,7 @@ func handleServerStream(progress *api.ReadProgress, capture api.Capture, tcpID *
|
|||||||
responseCounter,
|
responseCounter,
|
||||||
)
|
)
|
||||||
|
|
||||||
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime, progress.Current())
|
item := reqResMatcher.registerResponse(ident, response, captureTime, progress.Current())
|
||||||
if item != nil {
|
if item != nil {
|
||||||
item.Capture = capture
|
item.Capture = capture
|
||||||
item.ConnectionInfo = &api.ConnectionInfo{
|
item.ConnectionInfo = &api.ConnectionInfo{
|
||||||
|
@ -49,9 +49,9 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
|
|||||||
}
|
}
|
||||||
|
|
||||||
if reader.IsClient {
|
if reader.IsClient {
|
||||||
err = handleClientStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, redisPacket, reqResMatcher)
|
err = handleClientStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, redisPacket, reqResMatcher)
|
||||||
} else {
|
} else {
|
||||||
err = handleServerStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, redisPacket, reqResMatcher)
|
err = handleServerStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, redisPacket, reqResMatcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -133,7 +133,6 @@ func TestDissect(t *testing.T) {
|
|||||||
},
|
},
|
||||||
IsClient: true,
|
IsClient: true,
|
||||||
TcpID: tcpIDClient,
|
TcpID: tcpIDClient,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: emitter,
|
Emitter: emitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
@ -163,7 +162,6 @@ func TestDissect(t *testing.T) {
|
|||||||
},
|
},
|
||||||
IsClient: false,
|
IsClient: false,
|
||||||
TcpID: tcpIDServer,
|
TcpID: tcpIDServer,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: emitter,
|
Emitter: emitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
|
@ -78,10 +78,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
|||||||
Response: 0,
|
Response: 0,
|
||||||
}
|
}
|
||||||
stream.Clients = append(stream.Clients, api.TcpReader{
|
stream.Clients = append(stream.Clients, api.TcpReader{
|
||||||
MsgQueue: make(chan api.TcpReaderDataMsg),
|
MsgQueue: make(chan api.TcpReaderDataMsg),
|
||||||
Progress: &api.ReadProgress{},
|
Progress: &api.ReadProgress{},
|
||||||
SuperTimer: &api.SuperTimer{},
|
Ident: fmt.Sprintf("%s %s", net, transport),
|
||||||
Ident: fmt.Sprintf("%s %s", net, transport),
|
|
||||||
TcpID: &api.TcpID{
|
TcpID: &api.TcpID{
|
||||||
SrcIP: srcIp,
|
SrcIP: srcIp,
|
||||||
DstIP: dstIp,
|
DstIP: dstIp,
|
||||||
@ -97,10 +96,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
|||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
})
|
})
|
||||||
stream.Servers = append(stream.Servers, api.TcpReader{
|
stream.Servers = append(stream.Servers, api.TcpReader{
|
||||||
MsgQueue: make(chan api.TcpReaderDataMsg),
|
MsgQueue: make(chan api.TcpReaderDataMsg),
|
||||||
Progress: &api.ReadProgress{},
|
Progress: &api.ReadProgress{},
|
||||||
SuperTimer: &api.SuperTimer{},
|
Ident: fmt.Sprintf("%s %s", net, transport),
|
||||||
Ident: fmt.Sprintf("%s %s", net, transport),
|
|
||||||
TcpID: &api.TcpID{
|
TcpID: &api.TcpID{
|
||||||
SrcIP: net.Dst().String(),
|
SrcIP: net.Dst().String(),
|
||||||
DstIP: net.Src().String(),
|
DstIP: net.Src().String(),
|
||||||
|
@ -174,7 +174,6 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid
|
|||||||
},
|
},
|
||||||
IsClient: isRequest,
|
IsClient: isRequest,
|
||||||
TcpID: tcpid,
|
TcpID: tcpid,
|
||||||
SuperTimer: &api.SuperTimer{},
|
|
||||||
Emitter: tlsEmitter,
|
Emitter: tlsEmitter,
|
||||||
ReqResMatcher: reqResMatcher,
|
ReqResMatcher: reqResMatcher,
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user