From 75b44f914322cf7e33e8dc74bc70c3bf1ac8e7d0 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Wed, 20 Apr 2022 13:30:39 +0300 Subject: [PATCH] Remove `SuperTimer` struct --- tap/api/api.go | 4 ---- tap/api/tcp_reader.go | 4 ++-- tap/extensions/amqp/main.go | 16 ++++++++-------- tap/extensions/amqp/main_test.go | 2 -- tap/extensions/http/handlers.go | 15 ++++++++------- tap/extensions/http/main.go | 8 ++++---- tap/extensions/http/main_test.go | 2 -- tap/extensions/kafka/main.go | 4 ++-- tap/extensions/kafka/main_test.go | 2 -- tap/extensions/kafka/request.go | 4 ++-- tap/extensions/kafka/response.go | 4 ++-- tap/extensions/redis/handlers.go | 9 +++++---- tap/extensions/redis/main.go | 4 ++-- tap/extensions/redis/main_test.go | 2 -- tap/tcp_stream_factory.go | 14 ++++++-------- tap/tlstapper/tls_poller.go | 1 - 16 files changed, 41 insertions(+), 54 deletions(-) diff --git a/tap/api/api.go b/tap/api/api.go index 72986a1a9..d1d56e571 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -106,10 +106,6 @@ type OutputChannelItem struct { Namespace string } -type SuperTimer struct { - CaptureTime time.Time -} - type ProtoIdentifier struct { Protocol *Protocol IsClosedOthers bool diff --git a/tap/api/tcp_reader.go b/tap/api/tcp_reader.go index 115c846d8..7858a1888 100644 --- a/tap/api/tcp_reader.go +++ b/tap/api/tcp_reader.go @@ -30,7 +30,7 @@ type TcpReader struct { MsgQueue chan TcpReaderDataMsg // Channel of captured reassembled tcp payload data []byte Progress *ReadProgress - SuperTimer *SuperTimer + CaptureTime time.Time Parent *TcpStream packetsSeen uint Extension *Extension @@ -48,7 +48,7 @@ func (reader *TcpReader) Read(p []byte) (int, error) { msg, ok = <-reader.MsgQueue reader.data = msg.bytes - reader.SuperTimer.CaptureTime = msg.timestamp + reader.CaptureTime = msg.timestamp if len(reader.data) > 0 { reader.packetsSeen += 1 } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 1eefe2662..f4e5300c0 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -114,11 +114,11 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha case *BasicPublish: eventBasicPublish.Body = f.Body reader.Parent.CloseOtherProtocolDissectors(&protocol) - emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) + emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicDeliver: eventBasicDeliver.Body = f.Body reader.Parent.CloseOtherProtocolDissectors(&protocol) - emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) + emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) } case *MethodFrame: @@ -139,7 +139,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha Arguments: m.Arguments, } reader.Parent.CloseOtherProtocolDissectors(&protocol) - emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) + emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicConsume: eventBasicConsume := &BasicConsume{ @@ -152,7 +152,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha Arguments: m.Arguments, } reader.Parent.CloseOtherProtocolDissectors(&protocol) - emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) + emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicDeliver: eventBasicDeliver.ConsumerTag = m.ConsumerTag @@ -172,7 +172,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha Arguments: m.Arguments, } reader.Parent.CloseOtherProtocolDissectors(&protocol) - emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) + emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ExchangeDeclare: eventExchangeDeclare := &ExchangeDeclare{ @@ -186,7 +186,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha Arguments: m.Arguments, } reader.Parent.CloseOtherProtocolDissectors(&protocol) - emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) + emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ConnectionStart: eventConnectionStart := &ConnectionStart{ @@ -197,7 +197,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha Locales: m.Locales, } reader.Parent.CloseOtherProtocolDissectors(&protocol) - emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) + emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ConnectionClose: eventConnectionClose := &ConnectionClose{ @@ -207,7 +207,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha MethodId: m.MethodId, } reader.Parent.CloseOtherProtocolDissectors(&protocol) - emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) + emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) } default: diff --git a/tap/extensions/amqp/main_test.go b/tap/extensions/amqp/main_test.go index e68cb8796..c24fd20ce 100644 --- a/tap/extensions/amqp/main_test.go +++ b/tap/extensions/amqp/main_test.go @@ -132,7 +132,6 @@ func TestDissect(t *testing.T) { }, IsClient: true, TcpID: tcpIDClient, - SuperTimer: &api.SuperTimer{}, Emitter: emitter, ReqResMatcher: reqResMatcher, } @@ -162,7 +161,6 @@ func TestDissect(t *testing.T) { }, IsClient: false, TcpID: tcpIDServer, - SuperTimer: &api.SuperTimer{}, Emitter: emitter, ReqResMatcher: reqResMatcher, } diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 3d8441907..1b8641cb6 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "net/http" "strings" + "time" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap/api" @@ -48,7 +49,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) { item.ConnectionInfo.ClientPort = "" } -func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error { +func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, captureTime time.Time, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error { streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage() if err != nil { return err @@ -67,7 +68,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres streamID, "HTTP2", ) - item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor) + item = reqResMatcher.registerRequest(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.SrcIP, @@ -87,7 +88,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres streamID, "HTTP2", ) - item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor) + item = reqResMatcher.registerResponse(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.DstIP, @@ -112,7 +113,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres return nil } -func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { +func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { req, err = http.ReadRequest(b) if err != nil { return @@ -140,7 +141,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, captur requestCounter, "HTTP1", ) - item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor) + item := reqResMatcher.registerRequest(ident, req, captureTime, progress.Current(), req.ProtoMinor) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.SrcIP, @@ -155,7 +156,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, captur return } -func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) { +func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *shared.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) { var res *http.Response res, err = http.ReadResponse(b, nil) if err != nil { @@ -184,7 +185,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, captur responseCounter, "HTTP1", ) - item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, progress.Current(), res.ProtoMinor) + item := reqResMatcher.registerResponse(ident, res, captureTime, progress.Current(), res.ProtoMinor) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.DstIP, diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 156e3ab54..0256a46b6 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -122,7 +122,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha } if isHTTP2 { - err = handleHTTP2Stream(http2Assembler, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.SuperTimer, reader.Emitter, options, reqResMatcher) + err = handleHTTP2Stream(http2Assembler, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CaptureTime, reader.Emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -131,7 +131,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha reader.Parent.CloseOtherProtocolDissectors(&http11protocol) } else if reader.IsClient { var req *http.Request - switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher) + switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -149,7 +149,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha reader.TcpID.DstPort, "HTTP2", ) - item := reqResMatcher.registerRequest(ident, req, reader.SuperTimer.CaptureTime, reader.Progress.Current(), req.ProtoMinor) + item := reqResMatcher.registerRequest(ident, req, reader.CaptureTime, reader.Progress.Current(), req.ProtoMinor) if item != nil { item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: reader.TcpID.SrcIP, @@ -163,7 +163,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha } } } else { - switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher) + switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { diff --git a/tap/extensions/http/main_test.go b/tap/extensions/http/main_test.go index 793c0b8d5..ad28d5ed3 100644 --- a/tap/extensions/http/main_test.go +++ b/tap/extensions/http/main_test.go @@ -134,7 +134,6 @@ func TestDissect(t *testing.T) { }, IsClient: true, TcpID: tcpIDClient, - SuperTimer: &api.SuperTimer{}, Emitter: emitter, ReqResMatcher: reqResMatcher, } @@ -164,7 +163,6 @@ func TestDissect(t *testing.T) { }, IsClient: false, TcpID: tcpIDServer, - SuperTimer: &api.SuperTimer{}, Emitter: emitter, ReqResMatcher: reqResMatcher, } diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index d527095cb..4c3c0c863 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -44,13 +44,13 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha } if reader.IsClient { - _, _, err := ReadRequest(b, reader.TcpID, reader.CounterPair, reader.SuperTimer, reqResMatcher) + _, _, err := ReadRequest(b, reader.TcpID, reader.CounterPair, reader.CaptureTime, reqResMatcher) if err != nil { return err } reader.Parent.CloseOtherProtocolDissectors(&_protocol) } else { - err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, reqResMatcher) + err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, reqResMatcher) if err != nil { return err } diff --git a/tap/extensions/kafka/main_test.go b/tap/extensions/kafka/main_test.go index 0ba54f5b8..f2e688968 100644 --- a/tap/extensions/kafka/main_test.go +++ b/tap/extensions/kafka/main_test.go @@ -133,7 +133,6 @@ func TestDissect(t *testing.T) { }, IsClient: true, TcpID: tcpIDClient, - SuperTimer: &api.SuperTimer{}, Emitter: emitter, ReqResMatcher: reqResMatcher, } @@ -163,7 +162,6 @@ func TestDissect(t *testing.T) { }, IsClient: false, TcpID: tcpIDServer, - SuperTimer: &api.SuperTimer{}, Emitter: emitter, ReqResMatcher: reqResMatcher, } diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index d3d286300..511efa022 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -19,7 +19,7 @@ type Request struct { CaptureTime time.Time `json:"captureTime"` } -func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) { +func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -206,7 +206,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su ApiVersion: apiVersion, CorrelationID: correlationID, ClientID: clientID, - CaptureTime: superTimer.CaptureTime, + CaptureTime: captureTime, Payload: payload, } diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index e18211e71..3fd13b948 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -16,7 +16,7 @@ type Response struct { CaptureTime time.Time `json:"captureTime"` } -func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) { +func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -43,7 +43,7 @@ func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPai Size: size, CorrelationID: correlationID, Payload: payload, - CaptureTime: superTimer.CaptureTime, + CaptureTime: captureTime, } key := fmt.Sprintf( diff --git a/tap/extensions/redis/handlers.go b/tap/extensions/redis/handlers.go index b9a364850..8cdd65759 100644 --- a/tap/extensions/redis/handlers.go +++ b/tap/extensions/redis/handlers.go @@ -2,11 +2,12 @@ package redis import ( "fmt" + "time" "github.com/up9inc/mizu/tap/api" ) -func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error { +func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error { counterPair.Lock() counterPair.Request++ requestCounter := counterPair.Request @@ -21,7 +22,7 @@ func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID * requestCounter, ) - item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime, progress.Current()) + item := reqResMatcher.registerRequest(ident, request, captureTime, progress.Current()) if item != nil { item.Capture = capture item.ConnectionInfo = &api.ConnectionInfo{ @@ -36,7 +37,7 @@ func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID * return nil } -func handleServerStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error { +func handleServerStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error { counterPair.Lock() counterPair.Response++ responseCounter := counterPair.Response @@ -51,7 +52,7 @@ func handleServerStream(progress *api.ReadProgress, capture api.Capture, tcpID * responseCounter, ) - item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime, progress.Current()) + item := reqResMatcher.registerResponse(ident, response, captureTime, progress.Current()) if item != nil { item.Capture = capture item.ConnectionInfo = &api.ConnectionInfo{ diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go index f3143e862..a7b751449 100644 --- a/tap/extensions/redis/main.go +++ b/tap/extensions/redis/main.go @@ -49,9 +49,9 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha } if reader.IsClient { - err = handleClientStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, redisPacket, reqResMatcher) + err = handleClientStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, redisPacket, reqResMatcher) } else { - err = handleServerStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, redisPacket, reqResMatcher) + err = handleServerStream(reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.CaptureTime, reader.Emitter, redisPacket, reqResMatcher) } if err != nil { diff --git a/tap/extensions/redis/main_test.go b/tap/extensions/redis/main_test.go index 289fbbe2e..b6ffa0e47 100644 --- a/tap/extensions/redis/main_test.go +++ b/tap/extensions/redis/main_test.go @@ -133,7 +133,6 @@ func TestDissect(t *testing.T) { }, IsClient: true, TcpID: tcpIDClient, - SuperTimer: &api.SuperTimer{}, Emitter: emitter, ReqResMatcher: reqResMatcher, } @@ -163,7 +162,6 @@ func TestDissect(t *testing.T) { }, IsClient: false, TcpID: tcpIDServer, - SuperTimer: &api.SuperTimer{}, Emitter: emitter, ReqResMatcher: reqResMatcher, } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 2580027d2..7df7a41be 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -78,10 +78,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T Response: 0, } stream.Clients = append(stream.Clients, api.TcpReader{ - MsgQueue: make(chan api.TcpReaderDataMsg), - Progress: &api.ReadProgress{}, - SuperTimer: &api.SuperTimer{}, - Ident: fmt.Sprintf("%s %s", net, transport), + MsgQueue: make(chan api.TcpReaderDataMsg), + Progress: &api.ReadProgress{}, + Ident: fmt.Sprintf("%s %s", net, transport), TcpID: &api.TcpID{ SrcIP: srcIp, DstIP: dstIp, @@ -97,10 +96,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T ReqResMatcher: reqResMatcher, }) stream.Servers = append(stream.Servers, api.TcpReader{ - MsgQueue: make(chan api.TcpReaderDataMsg), - Progress: &api.ReadProgress{}, - SuperTimer: &api.SuperTimer{}, - Ident: fmt.Sprintf("%s %s", net, transport), + MsgQueue: make(chan api.TcpReaderDataMsg), + Progress: &api.ReadProgress{}, + Ident: fmt.Sprintf("%s %s", net, transport), TcpID: &api.TcpID{ SrcIP: net.Dst().String(), DstIP: net.Src().String(), diff --git a/tap/tlstapper/tls_poller.go b/tap/tlstapper/tls_poller.go index 8e4d6411a..a3d92f569 100644 --- a/tap/tlstapper/tls_poller.go +++ b/tap/tlstapper/tls_poller.go @@ -174,7 +174,6 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid }, IsClient: isRequest, TcpID: tcpid, - SuperTimer: &api.SuperTimer{}, Emitter: tlsEmitter, ReqResMatcher: reqResMatcher, }