mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-02 00:57:45 +00:00
Tapper stats in stats tracker (#166)
This commit is contained in:
parent
d18f1f8316
commit
06c8056443
@ -51,7 +51,7 @@ func parseAppPorts(appPortsList string) []int {
|
||||
return ports
|
||||
}
|
||||
|
||||
var maxcount = flag.Int("c", -1, "Only grab this many packets, then exit")
|
||||
var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit")
|
||||
var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")
|
||||
var statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
|
||||
var lazy = flag.Bool("lazy", false, "If true, do lazy decoding")
|
||||
@ -175,6 +175,10 @@ type Context struct {
|
||||
CaptureInfo gopacket.CaptureInfo
|
||||
}
|
||||
|
||||
func GetStats() AppStats {
|
||||
return statsTracker.appStats
|
||||
}
|
||||
|
||||
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||
return c.CaptureInfo
|
||||
}
|
||||
@ -336,9 +340,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
source.Lazy = *lazy
|
||||
source.NoCopy = true
|
||||
rlog.Info("Starting to read packets")
|
||||
count := 0
|
||||
bytes := int64(0)
|
||||
start := time.Now()
|
||||
statsTracker.setStartTime(time.Now())
|
||||
defragger := ip4defrag.NewIPv4Defragmenter()
|
||||
|
||||
streamFactory := &tcpStreamFactory{
|
||||
@ -383,9 +385,9 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
errorsSummery := fmt.Sprintf("%v", errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
||||
count,
|
||||
bytes,
|
||||
time.Since(start),
|
||||
statsTracker.appStats.TotalPacketsCount,
|
||||
statsTracker.appStats.TotalProcessedBytes,
|
||||
time.Since(statsTracker.appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen,
|
||||
errorsSummery,
|
||||
@ -403,13 +405,13 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
|
||||
// Since the last print
|
||||
cleanStats := cleaner.dumpStats()
|
||||
appStats := statsTracker.dumpStats()
|
||||
matchedMessages := statsTracker.dumpStats()
|
||||
log.Printf(
|
||||
"flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d",
|
||||
cleanStats.flushed,
|
||||
cleanStats.closed,
|
||||
cleanStats.deleted,
|
||||
appStats.matchedMessages,
|
||||
matchedMessages,
|
||||
)
|
||||
}
|
||||
}()
|
||||
@ -419,10 +421,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
}
|
||||
|
||||
for packet := range source.Packets() {
|
||||
count++
|
||||
rlog.Debugf("PACKET #%d", count)
|
||||
packetsCount := statsTracker.incPacketsCount()
|
||||
rlog.Debugf("PACKET #%d", packetsCount)
|
||||
data := packet.Data()
|
||||
bytes += int64(len(data))
|
||||
statsTracker.updateProcessedSize(int64(len(data)))
|
||||
if *hexdumppkt {
|
||||
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
||||
}
|
||||
@ -473,12 +475,17 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
assemblerMutex.Unlock()
|
||||
}
|
||||
|
||||
done := *maxcount > 0 && count >= *maxcount
|
||||
done := *maxcount > 0 && statsTracker.appStats.TotalPacketsCount >= *maxcount
|
||||
if done {
|
||||
errorsMapMutex.Lock()
|
||||
errorMapLen := len(errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", count, bytes, time.Since(start), nErrors, errorMapLen)
|
||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
||||
statsTracker.appStats.TotalPacketsCount,
|
||||
statsTracker.appStats.TotalProcessedBytes,
|
||||
time.Since(statsTracker.appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen)
|
||||
}
|
||||
select {
|
||||
case <-signalChan:
|
||||
@ -535,4 +542,5 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
for e := range errorsMap {
|
||||
log.Printf(" %s:\t\t%d", e, errorsMap[e])
|
||||
}
|
||||
log.Printf("AppStats: %v", GetStats())
|
||||
}
|
||||
|
@ -2,34 +2,54 @@ package tap
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type AppStats struct {
|
||||
matchedMessages int
|
||||
StartTime time.Time `json:"startTime"`
|
||||
MatchedMessages int `json:"matchedMessages"`
|
||||
TotalPacketsCount int64 `json:"totalPacketsCount"`
|
||||
TotalProcessedBytes int64 `json:"totalProcessedBytes"`
|
||||
TotalMatchedMessages int64 `json:"totalMatchedMessages"`
|
||||
}
|
||||
|
||||
type StatsTracker struct {
|
||||
stats AppStats
|
||||
statsMutex sync.Mutex
|
||||
appStats AppStats
|
||||
matchedMessagesMutex sync.Mutex
|
||||
totalPacketsCountMutex sync.Mutex
|
||||
totalProcessedSizeMutex sync.Mutex
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incMatchedMessages() {
|
||||
st.statsMutex.Lock()
|
||||
st.stats.matchedMessages++
|
||||
st.statsMutex.Unlock()
|
||||
st.matchedMessagesMutex.Lock()
|
||||
st.appStats.MatchedMessages++
|
||||
st.appStats.TotalMatchedMessages++
|
||||
st.matchedMessagesMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) dumpStats() AppStats {
|
||||
st.statsMutex.Lock()
|
||||
|
||||
stats := AppStats{
|
||||
matchedMessages: st.stats.matchedMessages,
|
||||
}
|
||||
|
||||
st.stats.matchedMessages = 0
|
||||
|
||||
st.statsMutex.Unlock()
|
||||
|
||||
return stats
|
||||
func (st *StatsTracker) incPacketsCount() int64 {
|
||||
st.totalPacketsCountMutex.Lock()
|
||||
st.appStats.TotalPacketsCount++
|
||||
currentPacketsCount := st.appStats.TotalPacketsCount
|
||||
st.totalPacketsCountMutex.Unlock()
|
||||
return currentPacketsCount
|
||||
}
|
||||
|
||||
func (st *StatsTracker) updateProcessedSize(size int64) {
|
||||
st.totalProcessedSizeMutex.Lock()
|
||||
st.appStats.TotalProcessedBytes += size
|
||||
st.totalProcessedSizeMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) setStartTime(startTime time.Time) {
|
||||
st.appStats.StartTime = startTime
|
||||
}
|
||||
|
||||
func (st *StatsTracker) dumpStats() int {
|
||||
st.matchedMessagesMutex.Lock()
|
||||
matchedMessages := st.appStats.MatchedMessages
|
||||
st.appStats.MatchedMessages = 0
|
||||
st.matchedMessagesMutex.Unlock()
|
||||
|
||||
return matchedMessages
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user