Add to periodic stats print in tapper (#220)

This commit is contained in:
Nimrod Gilboa Markevich 2021-08-16 14:51:01 +03:00 committed by GitHub
parent b4f3b2c540
commit 5d5c11c37c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 42 deletions

View File

@ -79,6 +79,7 @@ func (h *httpReader) Read(p []byte) (int, error) {
clientHello := tlsx.ClientHello{} clientHello := tlsx.ClientHello{}
err := clientHello.Unmarshall(msg.bytes) err := clientHello.Unmarshall(msg.bytes)
if err == nil { if err == nil {
statsTracker.incTlsConnectionsCount()
fmt.Printf("Detected TLS client hello with SNI %s\n", clientHello.SNI) fmt.Printf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
numericPort, _ := strconv.Atoi(h.tcpID.dstPort) numericPort, _ := strconv.Atoi(h.tcpID.dstPort)
h.outboundLinkWriter.WriteOutboundLink(h.tcpID.srcIP, h.tcpID.dstIP, numericPort, clientHello.SNI, TLSProtocol) h.outboundLinkWriter.WriteOutboundLink(h.tcpID.srcIP, h.tcpID.dstIP, numericPort, clientHello.SNI, TLSProtocol)
@ -176,7 +177,7 @@ func (h *httpReader) handleHTTP2Stream() error {
} }
if reqResPair != nil { if reqResPair != nil {
statsTracker.incMatchedMessages() statsTracker.incMatchedPairs()
if h.harWriter != nil { if h.harWriter != nil {
h.harWriter.WritePair( h.harWriter.WritePair(
@ -215,7 +216,7 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount) ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount)
reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime) reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime)
if reqResPair != nil { if reqResPair != nil {
statsTracker.incMatchedMessages() statsTracker.incMatchedPairs()
if h.harWriter != nil { if h.harWriter != nil {
h.harWriter.WritePair( h.harWriter.WritePair(
@ -281,7 +282,7 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount) ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount)
reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime) reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime)
if reqResPair != nil { if reqResPair != nil {
statsTracker.incMatchedMessages() statsTracker.incMatchedPairs()
if h.harWriter != nil { if h.harWriter != nil {
h.harWriter.WritePair( h.harWriter.WritePair(

View File

@ -10,9 +10,9 @@ package tap
import ( import (
"encoding/hex" "encoding/hex"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"github.com/romana/rlog"
"log" "log"
"os" "os"
"os/signal" "os/signal"
@ -23,6 +23,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/romana/rlog"
"github.com/google/gopacket" "github.com/google/gopacket"
"github.com/google/gopacket/examples/util" "github.com/google/gopacket/examples/util"
"github.com/google/gopacket/ip4defrag" "github.com/google/gopacket/ip4defrag"
@ -374,9 +376,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
errorMapLen := len(errorsMap) errorMapLen := len(errorsMap)
errorsSummery := fmt.Sprintf("%v", errorsMap) errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock() errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s", log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
statsTracker.appStats.TotalPacketsCount,
statsTracker.appStats.TotalProcessedBytes,
time.Since(statsTracker.appStats.StartTime), time.Since(statsTracker.appStats.StartTime),
nErrors, nErrors,
errorMapLen, errorMapLen,
@ -395,14 +395,15 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
// Since the last print // Since the last print
cleanStats := cleaner.dumpStats() cleanStats := cleaner.dumpStats()
matchedMessages := statsTracker.dumpStats()
log.Printf( log.Printf(
"flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d", "cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
cleanStats.flushed, cleanStats.flushed,
cleanStats.closed, cleanStats.closed,
cleanStats.deleted, cleanStats.deleted,
matchedMessages,
) )
currentAppStats := statsTracker.dumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
log.Printf("app stats - %v", string(appStatsJSON))
} }
}() }()
@ -414,7 +415,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
packetsCount := statsTracker.incPacketsCount() packetsCount := statsTracker.incPacketsCount()
rlog.Debugf("PACKET #%d", packetsCount) rlog.Debugf("PACKET #%d", packetsCount)
data := packet.Data() data := packet.Data()
statsTracker.updateProcessedSize(int64(len(data))) statsTracker.updateProcessedBytes(int64(len(data)))
if *hexdumppkt { if *hexdumppkt {
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
} }
@ -448,6 +449,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
tcp := packet.Layer(layers.LayerTypeTCP) tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil { if tcp != nil {
statsTracker.incTcpPacketsCount()
tcp := tcp.(*layers.TCP) tcp := tcp.(*layers.TCP)
if *checksum { if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer()) err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
@ -465,14 +467,14 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
assemblerMutex.Unlock() assemblerMutex.Unlock()
} }
done := *maxcount > 0 && statsTracker.appStats.TotalPacketsCount >= *maxcount done := *maxcount > 0 && statsTracker.appStats.PacketsCount >= *maxcount
if done { if done {
errorsMapMutex.Lock() errorsMapMutex.Lock()
errorMapLen := len(errorsMap) errorMapLen := len(errorsMap)
errorsMapMutex.Unlock() errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
statsTracker.appStats.TotalPacketsCount, statsTracker.appStats.PacketsCount,
statsTracker.appStats.TotalProcessedBytes, statsTracker.appStats.ProcessedBytes,
time.Since(statsTracker.appStats.StartTime), time.Since(statsTracker.appStats.StartTime),
nErrors, nErrors,
errorMapLen) errorMapLen)

View File

@ -6,50 +6,99 @@ import (
) )
type AppStats struct { type AppStats struct {
StartTime time.Time `json:"startTime"` StartTime time.Time `json:"-"`
MatchedMessages int `json:"matchedMessages"` ProcessedBytes int64 `json:"processedBytes"`
TotalPacketsCount int64 `json:"totalPacketsCount"` PacketsCount int64 `json:"packetsCount"`
TotalProcessedBytes int64 `json:"totalProcessedBytes"` TcpPacketsCount int64 `json:"tcpPacketsCount"`
TotalMatchedMessages int64 `json:"totalMatchedMessages"` ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
MatchedPairs int64 `json:"matchedPairs"`
} }
type StatsTracker struct { type StatsTracker struct {
appStats AppStats appStats AppStats
matchedMessagesMutex sync.Mutex processedBytesMutex sync.Mutex
totalPacketsCountMutex sync.Mutex packetsCountMutex sync.Mutex
totalProcessedSizeMutex sync.Mutex tcpPacketsCountMutex sync.Mutex
reassembledTcpPayloadsCountMutex sync.Mutex
tlsConnectionsCountMutex sync.Mutex
matchedPairsMutex sync.Mutex
} }
func (st *StatsTracker) incMatchedMessages() { func (st *StatsTracker) incMatchedPairs() {
st.matchedMessagesMutex.Lock() st.matchedPairsMutex.Lock()
st.appStats.MatchedMessages++ st.appStats.MatchedPairs++
st.appStats.TotalMatchedMessages++ st.matchedPairsMutex.Unlock()
st.matchedMessagesMutex.Unlock()
} }
func (st *StatsTracker) incPacketsCount() int64 { func (st *StatsTracker) incPacketsCount() int64 {
st.totalPacketsCountMutex.Lock() st.packetsCountMutex.Lock()
st.appStats.TotalPacketsCount++ st.appStats.PacketsCount++
currentPacketsCount := st.appStats.TotalPacketsCount currentPacketsCount := st.appStats.PacketsCount
st.totalPacketsCountMutex.Unlock() st.packetsCountMutex.Unlock()
return currentPacketsCount return currentPacketsCount
} }
func (st *StatsTracker) updateProcessedSize(size int64) { func (st *StatsTracker) incTcpPacketsCount() {
st.totalProcessedSizeMutex.Lock() st.tcpPacketsCountMutex.Lock()
st.appStats.TotalProcessedBytes += size st.appStats.TcpPacketsCount++
st.totalProcessedSizeMutex.Unlock() st.tcpPacketsCountMutex.Unlock()
}
func (st *StatsTracker) incReassembledTcpPayloadsCount() {
st.reassembledTcpPayloadsCountMutex.Lock()
st.appStats.ReassembledTcpPayloadsCount++
st.reassembledTcpPayloadsCountMutex.Unlock()
}
func (st *StatsTracker) incTlsConnectionsCount() {
st.tlsConnectionsCountMutex.Lock()
st.appStats.TlsConnectionsCount++
st.tlsConnectionsCountMutex.Unlock()
}
func (st *StatsTracker) updateProcessedBytes(size int64) {
st.processedBytesMutex.Lock()
st.appStats.ProcessedBytes += size
st.processedBytesMutex.Unlock()
} }
func (st *StatsTracker) setStartTime(startTime time.Time) { func (st *StatsTracker) setStartTime(startTime time.Time) {
st.appStats.StartTime = startTime st.appStats.StartTime = startTime
} }
func (st *StatsTracker) dumpStats() int { func (st *StatsTracker) dumpStats() *AppStats {
st.matchedMessagesMutex.Lock() currentAppStats := &AppStats{StartTime: st.appStats.StartTime}
matchedMessages := st.appStats.MatchedMessages
st.appStats.MatchedMessages = 0
st.matchedMessagesMutex.Unlock()
return matchedMessages st.processedBytesMutex.Lock()
currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes
st.appStats.ProcessedBytes = 0
st.processedBytesMutex.Unlock()
st.packetsCountMutex.Lock()
currentAppStats.PacketsCount = st.appStats.PacketsCount
st.appStats.PacketsCount = 0
st.packetsCountMutex.Unlock()
st.tcpPacketsCountMutex.Lock()
currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount
st.appStats.TcpPacketsCount = 0
st.tcpPacketsCountMutex.Unlock()
st.reassembledTcpPayloadsCountMutex.Lock()
currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount
st.appStats.ReassembledTcpPayloadsCount = 0
st.reassembledTcpPayloadsCountMutex.Unlock()
st.tlsConnectionsCountMutex.Lock()
currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount
st.appStats.TlsConnectionsCount = 0
st.tlsConnectionsCountMutex.Unlock()
st.matchedPairsMutex.Lock()
currentAppStats.MatchedPairs = st.appStats.MatchedPairs
st.appStats.MatchedPairs = 0
st.matchedPairsMutex.Unlock()
return currentAppStats
} }

View File

@ -148,6 +148,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} }
// This is where we pass the reassembled information onwards // This is where we pass the reassembled information onwards
// This channel is read by an httpReader object // This channel is read by an httpReader object
statsTracker.incReassembledTcpPayloadsCount()
if dir == reassembly.TCPDirClientToServer && !t.reversed { if dir == reassembly.TCPDirClientToServer && !t.reversed {
t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
} else { } else {