From ba4bab3ed5800d3d527c54fd91b7e67206835021 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Wed, 20 Apr 2022 10:48:12 +0300 Subject: [PATCH] Remove `tcpStreamWrapper` struct --- tap/cleaner.go | 2 +- tap/tcp_stream.go | 7 +++++-- tap/tcp_stream_factory.go | 13 +------------ tap/tcp_streams_map.go | 5 ++--- 4 files changed, 9 insertions(+), 18 deletions(-) diff --git a/tap/cleaner.go b/tap/cleaner.go index cdf2acf20..ca2b24d37 100644 --- a/tap/cleaner.go +++ b/tap/cleaner.go @@ -34,7 +34,7 @@ func (cl *Cleaner) clean() { cl.assemblerMutex.Unlock() cl.streamsMap.streams.Range(func(k, v interface{}) bool { - reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher + reqResMatcher := v.(*tcpStream).reqResMatcher if reqResMatcher == nil { return true } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index ea82861fc..433d349bc 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -3,6 +3,7 @@ package tap import ( "encoding/binary" "sync" + "time" "github.com/google/gopacket" "github.com/google/gopacket/layers" // pulls in all layers decoders @@ -30,6 +31,8 @@ type tcpStream struct { servers []tcpReader ident string origin api.Capture + reqResMatcher api.RequestResponseMatcher + createdAt time.Time sync.Mutex streamsMap *tcpStreamMap } @@ -71,9 +74,9 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem if !accept { diagnose.InternalStats.RejectOpt++ } - + *start = true - + return accept } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 06ec19f2b..f1074f02c 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -3,7 +3,6 @@ package tap import ( "fmt" "sync" - "time" "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" @@ -27,12 +26,6 @@ type tcpStreamFactory struct { opts *TapOpts } -type tcpStreamWrapper struct { - stream *tcpStream - reqResMatcher api.RequestResponseMatcher - createdAt time.Time -} - func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory { var ownIps []string @@ -123,11 +116,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T reqResMatcher: reqResMatcher, }) - factory.streamsMap.Store(stream.id, &tcpStreamWrapper{ - stream: stream, - reqResMatcher: reqResMatcher, - createdAt: time.Now(), - }) + factory.streamsMap.Store(stream.id, stream) factory.wg.Add(2) // Start reading from channel stream.reader.bytes diff --git a/tap/tcp_streams_map.go b/tap/tcp_streams_map.go index 9a94f41ce..b3b2be8aa 100644 --- a/tap/tcp_streams_map.go +++ b/tap/tcp_streams_map.go @@ -67,10 +67,9 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() { time.Sleep(closeTimedoutTcpChannelsIntervalMs) _debug.FreeOSMemory() streamMap.streams.Range(func(key interface{}, value interface{}) bool { - streamWrapper := value.(*tcpStreamWrapper) - stream := streamWrapper.stream + stream := value.(*tcpStream) if stream.superIdentifier.Protocol == nil { - if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) { + if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeout)) { stream.Close() diagnose.AppStats.IncDroppedTcpStreams() logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",