From 7dca1ad889359113349d1e7f0452530a3151cdd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Mon, 13 Sep 2021 16:22:16 +0300 Subject: [PATCH] Move `stats_tracker.go` into the extension API and increment `MatchedPairs` from inside the `Emit` method (#272) * Move `stats_tracker.go` into the extension API and increment `MatchedPairs` from inside the `Emit` method * Replace multiple `sync.Mutex`(es) with low-level atomic memory primitives --- tap/api/api.go | 2 + tap/api/stats_tracker.go | 70 +++++++++++++++++++++++ tap/passive_tapper.go | 31 +++++----- tap/stats_tracker.go | 117 -------------------------------------- tap/tcp_stream.go | 2 +- tap/tcp_stream_factory.go | 4 +- 6 files changed, 91 insertions(+), 135 deletions(-) create mode 100644 tap/api/stats_tracker.go delete mode 100644 tap/stats_tracker.go diff --git a/tap/api/api.go b/tap/api/api.go index 4c3872798..e9f9c0208 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -87,6 +87,7 @@ type Dissector interface { } type Emitting struct { + AppStats *AppStats OutputChannel chan *OutputChannelItem } @@ -96,6 +97,7 @@ type Emitter interface { func (e *Emitting) Emit(item *OutputChannelItem) { e.OutputChannel <- item + e.AppStats.IncMatchedPairs() } type MizuEntry struct { diff --git a/tap/api/stats_tracker.go b/tap/api/stats_tracker.go new file mode 100644 index 000000000..e8905026f --- /dev/null +++ b/tap/api/stats_tracker.go @@ -0,0 +1,70 @@ +package api + +import ( + "sync/atomic" + "time" +) + +type AppStats struct { + StartTime time.Time `json:"-"` + ProcessedBytes uint64 `json:"processedBytes"` + PacketsCount uint64 `json:"packetsCount"` + TcpPacketsCount uint64 `json:"tcpPacketsCount"` + ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"` + TlsConnectionsCount uint64 `json:"tlsConnectionsCount"` + MatchedPairs uint64 `json:"matchedPairs"` + DroppedTcpStreams uint64 `json:"droppedTcpStreams"` +} + +func (as *AppStats) IncMatchedPairs() { + atomic.AddUint64(&as.MatchedPairs, 1) +} + +func (as *AppStats) IncDroppedTcpStreams() { + atomic.AddUint64(&as.DroppedTcpStreams, 1) +} + +func (as *AppStats) IncPacketsCount() uint64 { + atomic.AddUint64(&as.PacketsCount, 1) + return as.PacketsCount +} + +func (as *AppStats) IncTcpPacketsCount() { + atomic.AddUint64(&as.TcpPacketsCount, 1) +} + +func (as *AppStats) IncReassembledTcpPayloadsCount() { + atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1) +} + +func (as *AppStats) IncTlsConnectionsCount() { + atomic.AddUint64(&as.TlsConnectionsCount, 1) +} + +func (as *AppStats) UpdateProcessedBytes(size uint64) { + atomic.AddUint64(&as.ProcessedBytes, size) +} + +func (as *AppStats) SetStartTime(startTime time.Time) { + as.StartTime = startTime +} + +func (as *AppStats) DumpStats() *AppStats { + currentAppStats := &AppStats{StartTime: as.StartTime} + + currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes) + currentAppStats.PacketsCount = resetUint64(&as.PacketsCount) + currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount) + currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount) + currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount) + currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs) + currentAppStats.DroppedTcpStreams = resetUint64(&as.DroppedTcpStreams) + + return currentAppStats +} + +func resetUint64(ref *uint64) (val uint64) { + val = atomic.LoadUint64(ref) + atomic.StoreUint64(ref, 0) + return +} diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 1f9ecb25b..d4aec99f3 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -63,7 +63,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k var memprofile = flag.String("memprofile", "", "Write memory profile") -var statsTracker = StatsTracker{} +var appStats = api.AppStats{} // global var stats struct { @@ -152,8 +152,8 @@ type Context struct { CaptureInfo gopacket.CaptureInfo } -func GetStats() AppStats { - return statsTracker.appStats +func GetStats() api.AppStats { + return appStats } func (c *Context) GetCaptureInfo() gopacket.CaptureInfo { @@ -225,8 +225,8 @@ func closeTimedoutTcpStreamChannels() { if stream.superIdentifier.Protocol == nil { if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) { stream.Close() - statsTracker.incDroppedTcpStreams() - rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000) + appStats.IncDroppedTcpStreams() + rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000) } } else { if !stream.superIdentifier.IsClosedOthers { @@ -328,10 +328,11 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { source.Lazy = *lazy source.NoCopy = true rlog.Info("Starting to read packets") - statsTracker.setStartTime(time.Now()) + appStats.SetStartTime(time.Now()) defragger := ip4defrag.NewIPv4Defragmenter() var emitter api.Emitter = &api.Emitting{ + AppStats: &appStats, OutputChannel: outputItems, } @@ -374,7 +375,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { errorsSummery := fmt.Sprintf("%v", errorsMap) errorsMapMutex.Unlock() log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s", - time.Since(statsTracker.appStats.StartTime), + time.Since(appStats.StartTime), nErrors, errorMapLen, errorsSummery, @@ -397,7 +398,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { cleanStats.closed, cleanStats.deleted, ) - currentAppStats := statsTracker.dumpStats() + currentAppStats := appStats.DumpStats() appStatsJSON, _ := json.Marshal(currentAppStats) log.Printf("app stats - %v", string(appStatsJSON)) } @@ -415,10 +416,10 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { rlog.Debugf("Error:", err) continue } - packetsCount := statsTracker.incPacketsCount() + packetsCount := appStats.IncPacketsCount() rlog.Debugf("PACKET #%d", packetsCount) data := packet.Data() - statsTracker.updateProcessedBytes(int64(len(data))) + appStats.UpdateProcessedBytes(uint64(len(data))) if *hexdumppkt { rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) } @@ -452,7 +453,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { tcp := packet.Layer(layers.LayerTypeTCP) if tcp != nil { - statsTracker.incTcpPacketsCount() + appStats.IncTcpPacketsCount() tcp := tcp.(*layers.TCP) if *checksum { err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer()) @@ -470,15 +471,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { assemblerMutex.Unlock() } - done := *maxcount > 0 && statsTracker.appStats.PacketsCount >= *maxcount + done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount if done { errorsMapMutex.Lock() errorMapLen := len(errorsMap) errorsMapMutex.Unlock() log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", - statsTracker.appStats.PacketsCount, - statsTracker.appStats.ProcessedBytes, - time.Since(statsTracker.appStats.StartTime), + appStats.PacketsCount, + appStats.ProcessedBytes, + time.Since(appStats.StartTime), nErrors, errorMapLen) } diff --git a/tap/stats_tracker.go b/tap/stats_tracker.go deleted file mode 100644 index 2ddbc7623..000000000 --- a/tap/stats_tracker.go +++ /dev/null @@ -1,117 +0,0 @@ -package tap - -import ( - "sync" - "time" -) - -type AppStats struct { - StartTime time.Time `json:"-"` - ProcessedBytes int64 `json:"processedBytes"` - PacketsCount int64 `json:"packetsCount"` - TcpPacketsCount int64 `json:"tcpPacketsCount"` - ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"` - TlsConnectionsCount int64 `json:"tlsConnectionsCount"` - MatchedPairs int64 `json:"matchedPairs"` - DroppedTcpStreams int64 `json:"droppedTcpStreams"` -} - -type StatsTracker struct { - appStats AppStats - processedBytesMutex sync.Mutex - packetsCountMutex sync.Mutex - tcpPacketsCountMutex sync.Mutex - reassembledTcpPayloadsCountMutex sync.Mutex - tlsConnectionsCountMutex sync.Mutex - matchedPairsMutex sync.Mutex - droppedTcpStreamsMutex sync.Mutex -} - -func (st *StatsTracker) incMatchedPairs() { - st.matchedPairsMutex.Lock() - st.appStats.MatchedPairs++ - st.matchedPairsMutex.Unlock() -} - -func (st *StatsTracker) incDroppedTcpStreams() { - st.droppedTcpStreamsMutex.Lock() - st.appStats.DroppedTcpStreams++ - st.droppedTcpStreamsMutex.Unlock() -} - -func (st *StatsTracker) incPacketsCount() int64 { - st.packetsCountMutex.Lock() - st.appStats.PacketsCount++ - currentPacketsCount := st.appStats.PacketsCount - st.packetsCountMutex.Unlock() - return currentPacketsCount -} - -func (st *StatsTracker) incTcpPacketsCount() { - st.tcpPacketsCountMutex.Lock() - st.appStats.TcpPacketsCount++ - st.tcpPacketsCountMutex.Unlock() -} - -func (st *StatsTracker) incReassembledTcpPayloadsCount() { - st.reassembledTcpPayloadsCountMutex.Lock() - st.appStats.ReassembledTcpPayloadsCount++ - st.reassembledTcpPayloadsCountMutex.Unlock() -} - -func (st *StatsTracker) incTlsConnectionsCount() { - st.tlsConnectionsCountMutex.Lock() - st.appStats.TlsConnectionsCount++ - st.tlsConnectionsCountMutex.Unlock() -} - -func (st *StatsTracker) updateProcessedBytes(size int64) { - st.processedBytesMutex.Lock() - st.appStats.ProcessedBytes += size - st.processedBytesMutex.Unlock() -} - -func (st *StatsTracker) setStartTime(startTime time.Time) { - st.appStats.StartTime = startTime -} - -func (st *StatsTracker) dumpStats() *AppStats { - currentAppStats := &AppStats{StartTime: st.appStats.StartTime} - - st.processedBytesMutex.Lock() - currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes - st.appStats.ProcessedBytes = 0 - st.processedBytesMutex.Unlock() - - st.packetsCountMutex.Lock() - currentAppStats.PacketsCount = st.appStats.PacketsCount - st.appStats.PacketsCount = 0 - st.packetsCountMutex.Unlock() - - st.tcpPacketsCountMutex.Lock() - currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount - st.appStats.TcpPacketsCount = 0 - st.tcpPacketsCountMutex.Unlock() - - st.reassembledTcpPayloadsCountMutex.Lock() - currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount - st.appStats.ReassembledTcpPayloadsCount = 0 - st.reassembledTcpPayloadsCountMutex.Unlock() - - st.tlsConnectionsCountMutex.Lock() - currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount - st.appStats.TlsConnectionsCount = 0 - st.tlsConnectionsCountMutex.Unlock() - - st.matchedPairsMutex.Lock() - currentAppStats.MatchedPairs = st.appStats.MatchedPairs - st.appStats.MatchedPairs = 0 - st.matchedPairsMutex.Unlock() - - st.droppedTcpStreamsMutex.Lock() - currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams - st.appStats.DroppedTcpStreams = 0 - st.droppedTcpStreamsMutex.Unlock() - - return currentAppStats -} diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 0df9c313e..2c9b91d33 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -147,7 +147,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass if length > 0 { // This is where we pass the reassembled information onwards // This channel is read by an tcpReader object - statsTracker.incReassembledTcpPayloadsCount() + appStats.IncReassembledTcpPayloadsCount() timestamp := ac.GetCaptureInfo().Timestamp if dir == reassembly.TCPDirClientToServer { for i := range t.clients { diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index d96878921..e14e8860f 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -62,8 +62,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T } if stream.isTapTarget { if runtime.NumGoroutine() > maxNumberOfGoroutines { - statsTracker.incDroppedTcpStreams() - rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine()) + appStats.IncDroppedTcpStreams() + rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine()) return stream } streamId++