mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-06 04:48:40 +00:00
Move stats_tracker.go
into the extension API and increment MatchedPairs
from inside the Emit
method (#272)
* Move `stats_tracker.go` into the extension API and increment `MatchedPairs` from inside the `Emit` method * Replace multiple `sync.Mutex`(es) with low-level atomic memory primitives
This commit is contained in:
parent
616eccb2cf
commit
7dca1ad889
@ -87,6 +87,7 @@ type Dissector interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Emitting struct {
|
type Emitting struct {
|
||||||
|
AppStats *AppStats
|
||||||
OutputChannel chan *OutputChannelItem
|
OutputChannel chan *OutputChannelItem
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,6 +97,7 @@ type Emitter interface {
|
|||||||
|
|
||||||
func (e *Emitting) Emit(item *OutputChannelItem) {
|
func (e *Emitting) Emit(item *OutputChannelItem) {
|
||||||
e.OutputChannel <- item
|
e.OutputChannel <- item
|
||||||
|
e.AppStats.IncMatchedPairs()
|
||||||
}
|
}
|
||||||
|
|
||||||
type MizuEntry struct {
|
type MizuEntry struct {
|
||||||
|
70
tap/api/stats_tracker.go
Normal file
70
tap/api/stats_tracker.go
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AppStats struct {
|
||||||
|
StartTime time.Time `json:"-"`
|
||||||
|
ProcessedBytes uint64 `json:"processedBytes"`
|
||||||
|
PacketsCount uint64 `json:"packetsCount"`
|
||||||
|
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
|
||||||
|
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
|
||||||
|
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
|
||||||
|
MatchedPairs uint64 `json:"matchedPairs"`
|
||||||
|
DroppedTcpStreams uint64 `json:"droppedTcpStreams"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) IncMatchedPairs() {
|
||||||
|
atomic.AddUint64(&as.MatchedPairs, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) IncDroppedTcpStreams() {
|
||||||
|
atomic.AddUint64(&as.DroppedTcpStreams, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) IncPacketsCount() uint64 {
|
||||||
|
atomic.AddUint64(&as.PacketsCount, 1)
|
||||||
|
return as.PacketsCount
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) IncTcpPacketsCount() {
|
||||||
|
atomic.AddUint64(&as.TcpPacketsCount, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) IncReassembledTcpPayloadsCount() {
|
||||||
|
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) IncTlsConnectionsCount() {
|
||||||
|
atomic.AddUint64(&as.TlsConnectionsCount, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) UpdateProcessedBytes(size uint64) {
|
||||||
|
atomic.AddUint64(&as.ProcessedBytes, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) SetStartTime(startTime time.Time) {
|
||||||
|
as.StartTime = startTime
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *AppStats) DumpStats() *AppStats {
|
||||||
|
currentAppStats := &AppStats{StartTime: as.StartTime}
|
||||||
|
|
||||||
|
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
|
||||||
|
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
|
||||||
|
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
|
||||||
|
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
|
||||||
|
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
|
||||||
|
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)
|
||||||
|
currentAppStats.DroppedTcpStreams = resetUint64(&as.DroppedTcpStreams)
|
||||||
|
|
||||||
|
return currentAppStats
|
||||||
|
}
|
||||||
|
|
||||||
|
func resetUint64(ref *uint64) (val uint64) {
|
||||||
|
val = atomic.LoadUint64(ref)
|
||||||
|
atomic.StoreUint64(ref, 0)
|
||||||
|
return
|
||||||
|
}
|
@ -63,7 +63,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
|
|||||||
|
|
||||||
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
||||||
|
|
||||||
var statsTracker = StatsTracker{}
|
var appStats = api.AppStats{}
|
||||||
|
|
||||||
// global
|
// global
|
||||||
var stats struct {
|
var stats struct {
|
||||||
@ -152,8 +152,8 @@ type Context struct {
|
|||||||
CaptureInfo gopacket.CaptureInfo
|
CaptureInfo gopacket.CaptureInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetStats() AppStats {
|
func GetStats() api.AppStats {
|
||||||
return statsTracker.appStats
|
return appStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
|
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||||
@ -225,8 +225,8 @@ func closeTimedoutTcpStreamChannels() {
|
|||||||
if stream.superIdentifier.Protocol == nil {
|
if stream.superIdentifier.Protocol == nil {
|
||||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
|
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
|
||||||
stream.Close()
|
stream.Close()
|
||||||
statsTracker.incDroppedTcpStreams()
|
appStats.IncDroppedTcpStreams()
|
||||||
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if !stream.superIdentifier.IsClosedOthers {
|
if !stream.superIdentifier.IsClosedOthers {
|
||||||
@ -328,10 +328,11 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
|||||||
source.Lazy = *lazy
|
source.Lazy = *lazy
|
||||||
source.NoCopy = true
|
source.NoCopy = true
|
||||||
rlog.Info("Starting to read packets")
|
rlog.Info("Starting to read packets")
|
||||||
statsTracker.setStartTime(time.Now())
|
appStats.SetStartTime(time.Now())
|
||||||
defragger := ip4defrag.NewIPv4Defragmenter()
|
defragger := ip4defrag.NewIPv4Defragmenter()
|
||||||
|
|
||||||
var emitter api.Emitter = &api.Emitting{
|
var emitter api.Emitter = &api.Emitting{
|
||||||
|
AppStats: &appStats,
|
||||||
OutputChannel: outputItems,
|
OutputChannel: outputItems,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -374,7 +375,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
|||||||
errorsSummery := fmt.Sprintf("%v", errorsMap)
|
errorsSummery := fmt.Sprintf("%v", errorsMap)
|
||||||
errorsMapMutex.Unlock()
|
errorsMapMutex.Unlock()
|
||||||
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
||||||
time.Since(statsTracker.appStats.StartTime),
|
time.Since(appStats.StartTime),
|
||||||
nErrors,
|
nErrors,
|
||||||
errorMapLen,
|
errorMapLen,
|
||||||
errorsSummery,
|
errorsSummery,
|
||||||
@ -397,7 +398,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
|||||||
cleanStats.closed,
|
cleanStats.closed,
|
||||||
cleanStats.deleted,
|
cleanStats.deleted,
|
||||||
)
|
)
|
||||||
currentAppStats := statsTracker.dumpStats()
|
currentAppStats := appStats.DumpStats()
|
||||||
appStatsJSON, _ := json.Marshal(currentAppStats)
|
appStatsJSON, _ := json.Marshal(currentAppStats)
|
||||||
log.Printf("app stats - %v", string(appStatsJSON))
|
log.Printf("app stats - %v", string(appStatsJSON))
|
||||||
}
|
}
|
||||||
@ -415,10 +416,10 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
|||||||
rlog.Debugf("Error:", err)
|
rlog.Debugf("Error:", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
packetsCount := statsTracker.incPacketsCount()
|
packetsCount := appStats.IncPacketsCount()
|
||||||
rlog.Debugf("PACKET #%d", packetsCount)
|
rlog.Debugf("PACKET #%d", packetsCount)
|
||||||
data := packet.Data()
|
data := packet.Data()
|
||||||
statsTracker.updateProcessedBytes(int64(len(data)))
|
appStats.UpdateProcessedBytes(uint64(len(data)))
|
||||||
if *hexdumppkt {
|
if *hexdumppkt {
|
||||||
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
||||||
}
|
}
|
||||||
@ -452,7 +453,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
|||||||
|
|
||||||
tcp := packet.Layer(layers.LayerTypeTCP)
|
tcp := packet.Layer(layers.LayerTypeTCP)
|
||||||
if tcp != nil {
|
if tcp != nil {
|
||||||
statsTracker.incTcpPacketsCount()
|
appStats.IncTcpPacketsCount()
|
||||||
tcp := tcp.(*layers.TCP)
|
tcp := tcp.(*layers.TCP)
|
||||||
if *checksum {
|
if *checksum {
|
||||||
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
|
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
|
||||||
@ -470,15 +471,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
|||||||
assemblerMutex.Unlock()
|
assemblerMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
done := *maxcount > 0 && statsTracker.appStats.PacketsCount >= *maxcount
|
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
|
||||||
if done {
|
if done {
|
||||||
errorsMapMutex.Lock()
|
errorsMapMutex.Lock()
|
||||||
errorMapLen := len(errorsMap)
|
errorMapLen := len(errorsMap)
|
||||||
errorsMapMutex.Unlock()
|
errorsMapMutex.Unlock()
|
||||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
||||||
statsTracker.appStats.PacketsCount,
|
appStats.PacketsCount,
|
||||||
statsTracker.appStats.ProcessedBytes,
|
appStats.ProcessedBytes,
|
||||||
time.Since(statsTracker.appStats.StartTime),
|
time.Since(appStats.StartTime),
|
||||||
nErrors,
|
nErrors,
|
||||||
errorMapLen)
|
errorMapLen)
|
||||||
}
|
}
|
||||||
|
@ -1,117 +0,0 @@
|
|||||||
package tap
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type AppStats struct {
|
|
||||||
StartTime time.Time `json:"-"`
|
|
||||||
ProcessedBytes int64 `json:"processedBytes"`
|
|
||||||
PacketsCount int64 `json:"packetsCount"`
|
|
||||||
TcpPacketsCount int64 `json:"tcpPacketsCount"`
|
|
||||||
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
|
|
||||||
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
|
|
||||||
MatchedPairs int64 `json:"matchedPairs"`
|
|
||||||
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type StatsTracker struct {
|
|
||||||
appStats AppStats
|
|
||||||
processedBytesMutex sync.Mutex
|
|
||||||
packetsCountMutex sync.Mutex
|
|
||||||
tcpPacketsCountMutex sync.Mutex
|
|
||||||
reassembledTcpPayloadsCountMutex sync.Mutex
|
|
||||||
tlsConnectionsCountMutex sync.Mutex
|
|
||||||
matchedPairsMutex sync.Mutex
|
|
||||||
droppedTcpStreamsMutex sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) incMatchedPairs() {
|
|
||||||
st.matchedPairsMutex.Lock()
|
|
||||||
st.appStats.MatchedPairs++
|
|
||||||
st.matchedPairsMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) incDroppedTcpStreams() {
|
|
||||||
st.droppedTcpStreamsMutex.Lock()
|
|
||||||
st.appStats.DroppedTcpStreams++
|
|
||||||
st.droppedTcpStreamsMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) incPacketsCount() int64 {
|
|
||||||
st.packetsCountMutex.Lock()
|
|
||||||
st.appStats.PacketsCount++
|
|
||||||
currentPacketsCount := st.appStats.PacketsCount
|
|
||||||
st.packetsCountMutex.Unlock()
|
|
||||||
return currentPacketsCount
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) incTcpPacketsCount() {
|
|
||||||
st.tcpPacketsCountMutex.Lock()
|
|
||||||
st.appStats.TcpPacketsCount++
|
|
||||||
st.tcpPacketsCountMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) incReassembledTcpPayloadsCount() {
|
|
||||||
st.reassembledTcpPayloadsCountMutex.Lock()
|
|
||||||
st.appStats.ReassembledTcpPayloadsCount++
|
|
||||||
st.reassembledTcpPayloadsCountMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) incTlsConnectionsCount() {
|
|
||||||
st.tlsConnectionsCountMutex.Lock()
|
|
||||||
st.appStats.TlsConnectionsCount++
|
|
||||||
st.tlsConnectionsCountMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) updateProcessedBytes(size int64) {
|
|
||||||
st.processedBytesMutex.Lock()
|
|
||||||
st.appStats.ProcessedBytes += size
|
|
||||||
st.processedBytesMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) setStartTime(startTime time.Time) {
|
|
||||||
st.appStats.StartTime = startTime
|
|
||||||
}
|
|
||||||
|
|
||||||
func (st *StatsTracker) dumpStats() *AppStats {
|
|
||||||
currentAppStats := &AppStats{StartTime: st.appStats.StartTime}
|
|
||||||
|
|
||||||
st.processedBytesMutex.Lock()
|
|
||||||
currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes
|
|
||||||
st.appStats.ProcessedBytes = 0
|
|
||||||
st.processedBytesMutex.Unlock()
|
|
||||||
|
|
||||||
st.packetsCountMutex.Lock()
|
|
||||||
currentAppStats.PacketsCount = st.appStats.PacketsCount
|
|
||||||
st.appStats.PacketsCount = 0
|
|
||||||
st.packetsCountMutex.Unlock()
|
|
||||||
|
|
||||||
st.tcpPacketsCountMutex.Lock()
|
|
||||||
currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount
|
|
||||||
st.appStats.TcpPacketsCount = 0
|
|
||||||
st.tcpPacketsCountMutex.Unlock()
|
|
||||||
|
|
||||||
st.reassembledTcpPayloadsCountMutex.Lock()
|
|
||||||
currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount
|
|
||||||
st.appStats.ReassembledTcpPayloadsCount = 0
|
|
||||||
st.reassembledTcpPayloadsCountMutex.Unlock()
|
|
||||||
|
|
||||||
st.tlsConnectionsCountMutex.Lock()
|
|
||||||
currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount
|
|
||||||
st.appStats.TlsConnectionsCount = 0
|
|
||||||
st.tlsConnectionsCountMutex.Unlock()
|
|
||||||
|
|
||||||
st.matchedPairsMutex.Lock()
|
|
||||||
currentAppStats.MatchedPairs = st.appStats.MatchedPairs
|
|
||||||
st.appStats.MatchedPairs = 0
|
|
||||||
st.matchedPairsMutex.Unlock()
|
|
||||||
|
|
||||||
st.droppedTcpStreamsMutex.Lock()
|
|
||||||
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
|
|
||||||
st.appStats.DroppedTcpStreams = 0
|
|
||||||
st.droppedTcpStreamsMutex.Unlock()
|
|
||||||
|
|
||||||
return currentAppStats
|
|
||||||
}
|
|
@ -147,7 +147,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
|||||||
if length > 0 {
|
if length > 0 {
|
||||||
// This is where we pass the reassembled information onwards
|
// This is where we pass the reassembled information onwards
|
||||||
// This channel is read by an tcpReader object
|
// This channel is read by an tcpReader object
|
||||||
statsTracker.incReassembledTcpPayloadsCount()
|
appStats.IncReassembledTcpPayloadsCount()
|
||||||
timestamp := ac.GetCaptureInfo().Timestamp
|
timestamp := ac.GetCaptureInfo().Timestamp
|
||||||
if dir == reassembly.TCPDirClientToServer {
|
if dir == reassembly.TCPDirClientToServer {
|
||||||
for i := range t.clients {
|
for i := range t.clients {
|
||||||
|
@ -62,8 +62,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
|||||||
}
|
}
|
||||||
if stream.isTapTarget {
|
if stream.isTapTarget {
|
||||||
if runtime.NumGoroutine() > maxNumberOfGoroutines {
|
if runtime.NumGoroutine() > maxNumberOfGoroutines {
|
||||||
statsTracker.incDroppedTcpStreams()
|
appStats.IncDroppedTcpStreams()
|
||||||
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine())
|
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine())
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
streamId++
|
streamId++
|
||||||
|
Loading…
Reference in New Issue
Block a user