From 24f79922e98590dba425f2806ea7d697fb211f21 Mon Sep 17 00:00:00 2001 From: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> Date: Mon, 16 Aug 2021 15:50:04 +0300 Subject: [PATCH] Add to periodic stats print in tapper (#221) #patch --- tap/http_reader.go | 7 +-- tap/passive_tapper.go | 24 +++++----- tap/stats_tracker.go | 105 +++++++++++++++++++++++++++++++----------- tap/tcp_stream.go | 1 + 4 files changed, 95 insertions(+), 42 deletions(-) diff --git a/tap/http_reader.go b/tap/http_reader.go index 6f059afaa..da7266a8b 100644 --- a/tap/http_reader.go +++ b/tap/http_reader.go @@ -79,6 +79,7 @@ func (h *httpReader) Read(p []byte) (int, error) { clientHello := tlsx.ClientHello{} err := clientHello.Unmarshall(msg.bytes) if err == nil { + statsTracker.incTlsConnectionsCount() fmt.Printf("Detected TLS client hello with SNI %s\n", clientHello.SNI) numericPort, _ := strconv.Atoi(h.tcpID.dstPort) h.outboundLinkWriter.WriteOutboundLink(h.tcpID.srcIP, h.tcpID.dstIP, numericPort, clientHello.SNI, TLSProtocol) @@ -176,7 +177,7 @@ func (h *httpReader) handleHTTP2Stream() error { } if reqResPair != nil { - statsTracker.incMatchedMessages() + statsTracker.incMatchedPairs() if h.harWriter != nil { h.harWriter.WritePair( @@ -215,7 +216,7 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error { ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount) reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime) if reqResPair != nil { - statsTracker.incMatchedMessages() + statsTracker.incMatchedPairs() if h.harWriter != nil { h.harWriter.WritePair( @@ -281,7 +282,7 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error { ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount) reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime) if reqResPair != nil { - statsTracker.incMatchedMessages() + statsTracker.incMatchedPairs() if h.harWriter != nil { h.harWriter.WritePair( diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 79274029f..afb0ee37d 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -10,9 +10,9 @@ package tap import ( "encoding/hex" + "encoding/json" "flag" "fmt" - "github.com/romana/rlog" "log" "os" "os/signal" @@ -23,6 +23,8 @@ import ( "sync" "time" + "github.com/romana/rlog" + "github.com/google/gopacket" "github.com/google/gopacket/examples/util" "github.com/google/gopacket/ip4defrag" @@ -384,9 +386,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr errorMapLen := len(errorsMap) errorsSummery := fmt.Sprintf("%v", errorsMap) errorsMapMutex.Unlock() - log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s", - statsTracker.appStats.TotalPacketsCount, - statsTracker.appStats.TotalProcessedBytes, + log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s", time.Since(statsTracker.appStats.StartTime), nErrors, errorMapLen, @@ -405,14 +405,15 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr // Since the last print cleanStats := cleaner.dumpStats() - matchedMessages := statsTracker.dumpStats() log.Printf( - "flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d", + "cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d", cleanStats.flushed, cleanStats.closed, cleanStats.deleted, - matchedMessages, ) + currentAppStats := statsTracker.dumpStats() + appStatsJSON, _ := json.Marshal(currentAppStats) + log.Printf("app stats - %v", string(appStatsJSON)) } }() @@ -424,7 +425,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr packetsCount := statsTracker.incPacketsCount() rlog.Debugf("PACKET #%d", packetsCount) data := packet.Data() - statsTracker.updateProcessedSize(int64(len(data))) + statsTracker.updateProcessedBytes(int64(len(data))) if *hexdumppkt { rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) } @@ -458,6 +459,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr tcp := packet.Layer(layers.LayerTypeTCP) if tcp != nil { + statsTracker.incTcpPacketsCount() tcp := tcp.(*layers.TCP) if *checksum { err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer()) @@ -475,14 +477,14 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr assemblerMutex.Unlock() } - done := *maxcount > 0 && statsTracker.appStats.TotalPacketsCount >= *maxcount + done := *maxcount > 0 && statsTracker.appStats.PacketsCount >= *maxcount if done { errorsMapMutex.Lock() errorMapLen := len(errorsMap) errorsMapMutex.Unlock() log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", - statsTracker.appStats.TotalPacketsCount, - statsTracker.appStats.TotalProcessedBytes, + statsTracker.appStats.PacketsCount, + statsTracker.appStats.ProcessedBytes, time.Since(statsTracker.appStats.StartTime), nErrors, errorMapLen) diff --git a/tap/stats_tracker.go b/tap/stats_tracker.go index c659e150c..350928330 100644 --- a/tap/stats_tracker.go +++ b/tap/stats_tracker.go @@ -6,50 +6,99 @@ import ( ) type AppStats struct { - StartTime time.Time `json:"startTime"` - MatchedMessages int `json:"matchedMessages"` - TotalPacketsCount int64 `json:"totalPacketsCount"` - TotalProcessedBytes int64 `json:"totalProcessedBytes"` - TotalMatchedMessages int64 `json:"totalMatchedMessages"` + StartTime time.Time `json:"-"` + ProcessedBytes int64 `json:"processedBytes"` + PacketsCount int64 `json:"packetsCount"` + TcpPacketsCount int64 `json:"tcpPacketsCount"` + ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"` + TlsConnectionsCount int64 `json:"tlsConnectionsCount"` + MatchedPairs int64 `json:"matchedPairs"` } type StatsTracker struct { - appStats AppStats - matchedMessagesMutex sync.Mutex - totalPacketsCountMutex sync.Mutex - totalProcessedSizeMutex sync.Mutex + appStats AppStats + processedBytesMutex sync.Mutex + packetsCountMutex sync.Mutex + tcpPacketsCountMutex sync.Mutex + reassembledTcpPayloadsCountMutex sync.Mutex + tlsConnectionsCountMutex sync.Mutex + matchedPairsMutex sync.Mutex } -func (st *StatsTracker) incMatchedMessages() { - st.matchedMessagesMutex.Lock() - st.appStats.MatchedMessages++ - st.appStats.TotalMatchedMessages++ - st.matchedMessagesMutex.Unlock() +func (st *StatsTracker) incMatchedPairs() { + st.matchedPairsMutex.Lock() + st.appStats.MatchedPairs++ + st.matchedPairsMutex.Unlock() } func (st *StatsTracker) incPacketsCount() int64 { - st.totalPacketsCountMutex.Lock() - st.appStats.TotalPacketsCount++ - currentPacketsCount := st.appStats.TotalPacketsCount - st.totalPacketsCountMutex.Unlock() + st.packetsCountMutex.Lock() + st.appStats.PacketsCount++ + currentPacketsCount := st.appStats.PacketsCount + st.packetsCountMutex.Unlock() return currentPacketsCount } -func (st *StatsTracker) updateProcessedSize(size int64) { - st.totalProcessedSizeMutex.Lock() - st.appStats.TotalProcessedBytes += size - st.totalProcessedSizeMutex.Unlock() +func (st *StatsTracker) incTcpPacketsCount() { + st.tcpPacketsCountMutex.Lock() + st.appStats.TcpPacketsCount++ + st.tcpPacketsCountMutex.Unlock() +} + +func (st *StatsTracker) incReassembledTcpPayloadsCount() { + st.reassembledTcpPayloadsCountMutex.Lock() + st.appStats.ReassembledTcpPayloadsCount++ + st.reassembledTcpPayloadsCountMutex.Unlock() +} + +func (st *StatsTracker) incTlsConnectionsCount() { + st.tlsConnectionsCountMutex.Lock() + st.appStats.TlsConnectionsCount++ + st.tlsConnectionsCountMutex.Unlock() +} + +func (st *StatsTracker) updateProcessedBytes(size int64) { + st.processedBytesMutex.Lock() + st.appStats.ProcessedBytes += size + st.processedBytesMutex.Unlock() } func (st *StatsTracker) setStartTime(startTime time.Time) { st.appStats.StartTime = startTime } -func (st *StatsTracker) dumpStats() int { - st.matchedMessagesMutex.Lock() - matchedMessages := st.appStats.MatchedMessages - st.appStats.MatchedMessages = 0 - st.matchedMessagesMutex.Unlock() +func (st *StatsTracker) dumpStats() *AppStats { + currentAppStats := &AppStats{StartTime: st.appStats.StartTime} - return matchedMessages + st.processedBytesMutex.Lock() + currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes + st.appStats.ProcessedBytes = 0 + st.processedBytesMutex.Unlock() + + st.packetsCountMutex.Lock() + currentAppStats.PacketsCount = st.appStats.PacketsCount + st.appStats.PacketsCount = 0 + st.packetsCountMutex.Unlock() + + st.tcpPacketsCountMutex.Lock() + currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount + st.appStats.TcpPacketsCount = 0 + st.tcpPacketsCountMutex.Unlock() + + st.reassembledTcpPayloadsCountMutex.Lock() + currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount + st.appStats.ReassembledTcpPayloadsCount = 0 + st.reassembledTcpPayloadsCountMutex.Unlock() + + st.tlsConnectionsCountMutex.Lock() + currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount + st.appStats.TlsConnectionsCount = 0 + st.tlsConnectionsCountMutex.Unlock() + + st.matchedPairsMutex.Lock() + currentAppStats.MatchedPairs = st.appStats.MatchedPairs + st.appStats.MatchedPairs = 0 + st.matchedPairsMutex.Unlock() + + return currentAppStats } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index e2ac51c55..b886567bd 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -148,6 +148,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } // This is where we pass the reassembled information onwards // This channel is read by an httpReader object + statsTracker.incReassembledTcpPayloadsCount() if dir == reassembly.TCPDirClientToServer && !t.reversed { t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} } else {