mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-26 00:24:17 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d5b01347df | ||
|
|
7dca1ad889 | ||
|
|
616eccb2cf |
@@ -379,13 +379,28 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
|
||||
for {
|
||||
select {
|
||||
case pod := <-added:
|
||||
case pod, ok := <-added:
|
||||
if !ok {
|
||||
added = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||
restartTappersDebouncer.SetOn()
|
||||
case pod := <-removed:
|
||||
case pod, ok := <-removed:
|
||||
if !ok {
|
||||
removed = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||
restartTappersDebouncer.SetOn()
|
||||
case pod := <-modified:
|
||||
case pod, ok := <-modified:
|
||||
if !ok {
|
||||
modified = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
|
||||
// Act only if the modified pod has already obtained an IP address.
|
||||
// After filtering for IPs, on a normal pod restart this includes the following events:
|
||||
@@ -396,8 +411,12 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
if pod.Status.PodIP != "" {
|
||||
restartTappersDebouncer.SetOn()
|
||||
}
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
errorChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
case err := <-errorChan:
|
||||
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
|
||||
restartTappersDebouncer.Cancel()
|
||||
// TODO: Does this also perform cleanup?
|
||||
@@ -477,21 +496,28 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
timeAfter := time.After(25 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Log.Debugf("Watching API Server pod loop, ctx done")
|
||||
return
|
||||
case <-added:
|
||||
case _, ok := <-added:
|
||||
if !ok {
|
||||
added = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Watching API Server pod loop, added")
|
||||
continue
|
||||
case <-removed:
|
||||
case _, ok := <-removed:
|
||||
if !ok {
|
||||
removed = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Infof("%s removed", mizu.ApiServerPodName)
|
||||
cancel()
|
||||
return
|
||||
case modifiedPod := <-modified:
|
||||
if modifiedPod == nil {
|
||||
logger.Log.Debugf("Watching API Server pod loop, modifiedPod with nil")
|
||||
case modifiedPod, ok := <-modified:
|
||||
if !ok {
|
||||
modified = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
@@ -510,14 +536,23 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
||||
}
|
||||
}
|
||||
case _, ok := <-errorChan:
|
||||
if !ok {
|
||||
errorChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
|
||||
cancel()
|
||||
|
||||
case <-timeAfter:
|
||||
if !isPodReady {
|
||||
logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
|
||||
cancel()
|
||||
}
|
||||
case <-errorChan:
|
||||
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
logger.Log.Debugf("Watching API Server pod loop, ctx done")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +87,7 @@ type Dissector interface {
|
||||
}
|
||||
|
||||
type Emitting struct {
|
||||
AppStats *AppStats
|
||||
OutputChannel chan *OutputChannelItem
|
||||
}
|
||||
|
||||
@@ -96,32 +97,39 @@ type Emitter interface {
|
||||
|
||||
func (e *Emitting) Emit(item *OutputChannelItem) {
|
||||
e.OutputChannel <- item
|
||||
e.AppStats.IncMatchedPairs()
|
||||
}
|
||||
|
||||
type MizuEntry struct {
|
||||
ID uint `gorm:"primarykey"`
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
ProtocolName string `json:"protocolKey" gorm:"column:protocolKey"`
|
||||
ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"`
|
||||
Entry string `json:"entry,omitempty" gorm:"column:entry"`
|
||||
EntryId string `json:"entryId" gorm:"column:entryId"`
|
||||
Url string `json:"url" gorm:"column:url"`
|
||||
Method string `json:"method" gorm:"column:method"`
|
||||
Status int `json:"status" gorm:"column:status"`
|
||||
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
|
||||
Service string `json:"service" gorm:"column:service"`
|
||||
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
|
||||
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
|
||||
Path string `json:"path" gorm:"column:path"`
|
||||
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
|
||||
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
|
||||
SourceIp string `json:"sourceIp,omitempty" gorm:"column:sourceIp"`
|
||||
DestinationIp string `json:"destinationIp,omitempty" gorm:"column:destinationIp"`
|
||||
SourcePort string `json:"sourcePort,omitempty" gorm:"column:sourcePort"`
|
||||
DestinationPort string `json:"destinationPort,omitempty" gorm:"column:destinationPort"`
|
||||
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
|
||||
EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"`
|
||||
ID uint `gorm:"primarykey"`
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
ProtocolName string `json:"protocolName" gorm:"column:protocolName"`
|
||||
ProtocolLongName string `json:"protocolLongName" gorm:"column:protocolLongName"`
|
||||
ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolVersion"`
|
||||
ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"`
|
||||
ProtocolBackgroundColor string `json:"protocolBackgroundColor" gorm:"column:protocolBackgroundColor"`
|
||||
ProtocolForegroundColor string `json:"protocolForegroundColor" gorm:"column:protocolForegroundColor"`
|
||||
ProtocolFontSize int8 `json:"protocolFontSize" gorm:"column:protocolFontSize"`
|
||||
ProtocolReferenceLink string `json:"protocolReferenceLink" gorm:"column:protocolReferenceLink"`
|
||||
Entry string `json:"entry,omitempty" gorm:"column:entry"`
|
||||
EntryId string `json:"entryId" gorm:"column:entryId"`
|
||||
Url string `json:"url" gorm:"column:url"`
|
||||
Method string `json:"method" gorm:"column:method"`
|
||||
Status int `json:"status" gorm:"column:status"`
|
||||
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
|
||||
Service string `json:"service" gorm:"column:service"`
|
||||
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
|
||||
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
|
||||
Path string `json:"path" gorm:"column:path"`
|
||||
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
|
||||
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
|
||||
SourceIp string `json:"sourceIp,omitempty" gorm:"column:sourceIp"`
|
||||
DestinationIp string `json:"destinationIp,omitempty" gorm:"column:destinationIp"`
|
||||
SourcePort string `json:"sourcePort,omitempty" gorm:"column:sourcePort"`
|
||||
DestinationPort string `json:"destinationPort,omitempty" gorm:"column:destinationPort"`
|
||||
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
|
||||
EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"`
|
||||
}
|
||||
|
||||
type MizuEntryWrapper struct {
|
||||
@@ -162,11 +170,19 @@ type DataUnmarshaler interface {
|
||||
}
|
||||
|
||||
func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
|
||||
entryUrl := entry.Url
|
||||
service := entry.Service
|
||||
bed.Protocol = Protocol{
|
||||
Name: entry.ProtocolName,
|
||||
LongName: entry.ProtocolLongName,
|
||||
Abbreviation: entry.ProtocolAbbreviation,
|
||||
Version: entry.ProtocolVersion,
|
||||
BackgroundColor: entry.ProtocolBackgroundColor,
|
||||
ForegroundColor: entry.ProtocolForegroundColor,
|
||||
FontSize: entry.ProtocolFontSize,
|
||||
ReferenceLink: entry.ProtocolReferenceLink,
|
||||
}
|
||||
bed.Id = entry.EntryId
|
||||
bed.Url = entryUrl
|
||||
bed.Service = service
|
||||
bed.Url = entry.Url
|
||||
bed.Service = entry.Service
|
||||
bed.Summary = entry.Path
|
||||
bed.StatusCode = entry.Status
|
||||
bed.Method = entry.Method
|
||||
|
||||
70
tap/api/stats_tracker.go
Normal file
70
tap/api/stats_tracker.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type AppStats struct {
|
||||
StartTime time.Time `json:"-"`
|
||||
ProcessedBytes uint64 `json:"processedBytes"`
|
||||
PacketsCount uint64 `json:"packetsCount"`
|
||||
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
|
||||
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
|
||||
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
|
||||
MatchedPairs uint64 `json:"matchedPairs"`
|
||||
DroppedTcpStreams uint64 `json:"droppedTcpStreams"`
|
||||
}
|
||||
|
||||
func (as *AppStats) IncMatchedPairs() {
|
||||
atomic.AddUint64(&as.MatchedPairs, 1)
|
||||
}
|
||||
|
||||
func (as *AppStats) IncDroppedTcpStreams() {
|
||||
atomic.AddUint64(&as.DroppedTcpStreams, 1)
|
||||
}
|
||||
|
||||
func (as *AppStats) IncPacketsCount() uint64 {
|
||||
atomic.AddUint64(&as.PacketsCount, 1)
|
||||
return as.PacketsCount
|
||||
}
|
||||
|
||||
func (as *AppStats) IncTcpPacketsCount() {
|
||||
atomic.AddUint64(&as.TcpPacketsCount, 1)
|
||||
}
|
||||
|
||||
func (as *AppStats) IncReassembledTcpPayloadsCount() {
|
||||
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
|
||||
}
|
||||
|
||||
func (as *AppStats) IncTlsConnectionsCount() {
|
||||
atomic.AddUint64(&as.TlsConnectionsCount, 1)
|
||||
}
|
||||
|
||||
func (as *AppStats) UpdateProcessedBytes(size uint64) {
|
||||
atomic.AddUint64(&as.ProcessedBytes, size)
|
||||
}
|
||||
|
||||
func (as *AppStats) SetStartTime(startTime time.Time) {
|
||||
as.StartTime = startTime
|
||||
}
|
||||
|
||||
func (as *AppStats) DumpStats() *AppStats {
|
||||
currentAppStats := &AppStats{StartTime: as.StartTime}
|
||||
|
||||
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
|
||||
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
|
||||
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
|
||||
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
|
||||
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
|
||||
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)
|
||||
currentAppStats.DroppedTcpStreams = resetUint64(&as.DroppedTcpStreams)
|
||||
|
||||
return currentAppStats
|
||||
}
|
||||
|
||||
func resetUint64(ref *uint64) (val uint64) {
|
||||
val = atomic.LoadUint64(ref)
|
||||
atomic.StoreUint64(ref, 0)
|
||||
return
|
||||
}
|
||||
@@ -267,25 +267,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
request["url"] = summary
|
||||
entryBytes, _ := json.Marshal(item.Pair)
|
||||
return &api.MizuEntry{
|
||||
ProtocolName: protocol.Name,
|
||||
ProtocolVersion: protocol.Version,
|
||||
EntryId: entryId,
|
||||
Entry: string(entryBytes),
|
||||
Url: fmt.Sprintf("%s%s", service, summary),
|
||||
Method: request["method"].(string),
|
||||
Status: 0,
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: 0,
|
||||
Path: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
SourceIp: item.ConnectionInfo.ClientIP,
|
||||
DestinationIp: item.ConnectionInfo.ServerIP,
|
||||
SourcePort: item.ConnectionInfo.ClientPort,
|
||||
DestinationPort: item.ConnectionInfo.ServerPort,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
ProtocolName: protocol.Name,
|
||||
ProtocolLongName: protocol.LongName,
|
||||
ProtocolAbbreviation: protocol.Abbreviation,
|
||||
ProtocolVersion: protocol.Version,
|
||||
ProtocolBackgroundColor: protocol.BackgroundColor,
|
||||
ProtocolForegroundColor: protocol.ForegroundColor,
|
||||
ProtocolFontSize: protocol.FontSize,
|
||||
ProtocolReferenceLink: protocol.ReferenceLink,
|
||||
EntryId: entryId,
|
||||
Entry: string(entryBytes),
|
||||
Url: fmt.Sprintf("%s%s", service, summary),
|
||||
Method: request["method"].(string),
|
||||
Status: 0,
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: 0,
|
||||
Path: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
SourceIp: item.ConnectionInfo.ClientIP,
|
||||
DestinationIp: item.ConnectionInfo.ServerIP,
|
||||
SourcePort: item.ConnectionInfo.ClientPort,
|
||||
DestinationPort: item.ConnectionInfo.ServerPort,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -172,25 +172,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
entryBytes, _ := json.Marshal(item.Pair)
|
||||
return &api.MizuEntry{
|
||||
ProtocolName: protocol.Name,
|
||||
ProtocolVersion: item.Protocol.Version,
|
||||
EntryId: entryId,
|
||||
Entry: string(entryBytes),
|
||||
Url: fmt.Sprintf("%s%s", service, path),
|
||||
Method: reqDetails["method"].(string),
|
||||
Status: int(resDetails["status"].(float64)),
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: elapsedTime,
|
||||
Path: path,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
SourceIp: item.ConnectionInfo.ClientIP,
|
||||
DestinationIp: item.ConnectionInfo.ServerIP,
|
||||
SourcePort: item.ConnectionInfo.ClientPort,
|
||||
DestinationPort: item.ConnectionInfo.ServerPort,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
ProtocolName: protocol.Name,
|
||||
ProtocolLongName: protocol.LongName,
|
||||
ProtocolAbbreviation: protocol.Abbreviation,
|
||||
ProtocolVersion: item.Protocol.Version,
|
||||
ProtocolBackgroundColor: protocol.BackgroundColor,
|
||||
ProtocolForegroundColor: protocol.ForegroundColor,
|
||||
ProtocolFontSize: protocol.FontSize,
|
||||
ProtocolReferenceLink: protocol.ReferenceLink,
|
||||
EntryId: entryId,
|
||||
Entry: string(entryBytes),
|
||||
Url: fmt.Sprintf("%s%s", service, path),
|
||||
Method: reqDetails["method"].(string),
|
||||
Status: int(resDetails["status"].(float64)),
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: elapsedTime,
|
||||
Path: path,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
SourceIp: item.ConnectionInfo.ClientIP,
|
||||
DestinationIp: item.ConnectionInfo.ServerIP,
|
||||
SourcePort: item.ConnectionInfo.ClientPort,
|
||||
DestinationPort: item.ConnectionInfo.ServerPort,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -142,25 +142,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
entryBytes, _ := json.Marshal(item.Pair)
|
||||
return &api.MizuEntry{
|
||||
ProtocolName: _protocol.Name,
|
||||
ProtocolVersion: _protocol.Version,
|
||||
EntryId: entryId,
|
||||
Entry: string(entryBytes),
|
||||
Url: fmt.Sprintf("%s%s", service, summary),
|
||||
Method: apiNames[apiKey],
|
||||
Status: 0,
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: elapsedTime,
|
||||
Path: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
SourceIp: item.ConnectionInfo.ClientIP,
|
||||
DestinationIp: item.ConnectionInfo.ServerIP,
|
||||
SourcePort: item.ConnectionInfo.ClientPort,
|
||||
DestinationPort: item.ConnectionInfo.ServerPort,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
ProtocolName: _protocol.Name,
|
||||
ProtocolLongName: _protocol.LongName,
|
||||
ProtocolAbbreviation: _protocol.Abbreviation,
|
||||
ProtocolVersion: _protocol.Version,
|
||||
ProtocolBackgroundColor: _protocol.BackgroundColor,
|
||||
ProtocolForegroundColor: _protocol.ForegroundColor,
|
||||
ProtocolFontSize: _protocol.FontSize,
|
||||
ProtocolReferenceLink: _protocol.ReferenceLink,
|
||||
EntryId: entryId,
|
||||
Entry: string(entryBytes),
|
||||
Url: fmt.Sprintf("%s%s", service, summary),
|
||||
Method: apiNames[apiKey],
|
||||
Status: 0,
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: elapsedTime,
|
||||
Path: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
SourceIp: item.ConnectionInfo.ClientIP,
|
||||
DestinationIp: item.ConnectionInfo.ServerIP,
|
||||
SourcePort: item.ConnectionInfo.ClientPort,
|
||||
DestinationPort: item.ConnectionInfo.ServerPort,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
|
||||
|
||||
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
||||
|
||||
var statsTracker = StatsTracker{}
|
||||
var appStats = api.AppStats{}
|
||||
|
||||
// global
|
||||
var stats struct {
|
||||
@@ -152,8 +152,8 @@ type Context struct {
|
||||
CaptureInfo gopacket.CaptureInfo
|
||||
}
|
||||
|
||||
func GetStats() AppStats {
|
||||
return statsTracker.appStats
|
||||
func GetStats() api.AppStats {
|
||||
return appStats
|
||||
}
|
||||
|
||||
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||
@@ -225,8 +225,8 @@ func closeTimedoutTcpStreamChannels() {
|
||||
if stream.superIdentifier.Protocol == nil {
|
||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
|
||||
stream.Close()
|
||||
statsTracker.incDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
||||
appStats.IncDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
||||
}
|
||||
} else {
|
||||
if !stream.superIdentifier.IsClosedOthers {
|
||||
@@ -328,10 +328,11 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
source.Lazy = *lazy
|
||||
source.NoCopy = true
|
||||
rlog.Info("Starting to read packets")
|
||||
statsTracker.setStartTime(time.Now())
|
||||
appStats.SetStartTime(time.Now())
|
||||
defragger := ip4defrag.NewIPv4Defragmenter()
|
||||
|
||||
var emitter api.Emitter = &api.Emitting{
|
||||
AppStats: &appStats,
|
||||
OutputChannel: outputItems,
|
||||
}
|
||||
|
||||
@@ -374,7 +375,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
errorsSummery := fmt.Sprintf("%v", errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
||||
time.Since(statsTracker.appStats.StartTime),
|
||||
time.Since(appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen,
|
||||
errorsSummery,
|
||||
@@ -397,7 +398,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
cleanStats.closed,
|
||||
cleanStats.deleted,
|
||||
)
|
||||
currentAppStats := statsTracker.dumpStats()
|
||||
currentAppStats := appStats.DumpStats()
|
||||
appStatsJSON, _ := json.Marshal(currentAppStats)
|
||||
log.Printf("app stats - %v", string(appStatsJSON))
|
||||
}
|
||||
@@ -415,10 +416,10 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
rlog.Debugf("Error:", err)
|
||||
continue
|
||||
}
|
||||
packetsCount := statsTracker.incPacketsCount()
|
||||
packetsCount := appStats.IncPacketsCount()
|
||||
rlog.Debugf("PACKET #%d", packetsCount)
|
||||
data := packet.Data()
|
||||
statsTracker.updateProcessedBytes(int64(len(data)))
|
||||
appStats.UpdateProcessedBytes(uint64(len(data)))
|
||||
if *hexdumppkt {
|
||||
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
||||
}
|
||||
@@ -452,7 +453,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
|
||||
tcp := packet.Layer(layers.LayerTypeTCP)
|
||||
if tcp != nil {
|
||||
statsTracker.incTcpPacketsCount()
|
||||
appStats.IncTcpPacketsCount()
|
||||
tcp := tcp.(*layers.TCP)
|
||||
if *checksum {
|
||||
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
|
||||
@@ -470,15 +471,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
assemblerMutex.Unlock()
|
||||
}
|
||||
|
||||
done := *maxcount > 0 && statsTracker.appStats.PacketsCount >= *maxcount
|
||||
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
|
||||
if done {
|
||||
errorsMapMutex.Lock()
|
||||
errorMapLen := len(errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
||||
statsTracker.appStats.PacketsCount,
|
||||
statsTracker.appStats.ProcessedBytes,
|
||||
time.Since(statsTracker.appStats.StartTime),
|
||||
appStats.PacketsCount,
|
||||
appStats.ProcessedBytes,
|
||||
time.Since(appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen)
|
||||
}
|
||||
|
||||
@@ -1,117 +0,0 @@
|
||||
package tap
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type AppStats struct {
|
||||
StartTime time.Time `json:"-"`
|
||||
ProcessedBytes int64 `json:"processedBytes"`
|
||||
PacketsCount int64 `json:"packetsCount"`
|
||||
TcpPacketsCount int64 `json:"tcpPacketsCount"`
|
||||
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
|
||||
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
|
||||
MatchedPairs int64 `json:"matchedPairs"`
|
||||
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
|
||||
}
|
||||
|
||||
type StatsTracker struct {
|
||||
appStats AppStats
|
||||
processedBytesMutex sync.Mutex
|
||||
packetsCountMutex sync.Mutex
|
||||
tcpPacketsCountMutex sync.Mutex
|
||||
reassembledTcpPayloadsCountMutex sync.Mutex
|
||||
tlsConnectionsCountMutex sync.Mutex
|
||||
matchedPairsMutex sync.Mutex
|
||||
droppedTcpStreamsMutex sync.Mutex
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incMatchedPairs() {
|
||||
st.matchedPairsMutex.Lock()
|
||||
st.appStats.MatchedPairs++
|
||||
st.matchedPairsMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incDroppedTcpStreams() {
|
||||
st.droppedTcpStreamsMutex.Lock()
|
||||
st.appStats.DroppedTcpStreams++
|
||||
st.droppedTcpStreamsMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incPacketsCount() int64 {
|
||||
st.packetsCountMutex.Lock()
|
||||
st.appStats.PacketsCount++
|
||||
currentPacketsCount := st.appStats.PacketsCount
|
||||
st.packetsCountMutex.Unlock()
|
||||
return currentPacketsCount
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incTcpPacketsCount() {
|
||||
st.tcpPacketsCountMutex.Lock()
|
||||
st.appStats.TcpPacketsCount++
|
||||
st.tcpPacketsCountMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incReassembledTcpPayloadsCount() {
|
||||
st.reassembledTcpPayloadsCountMutex.Lock()
|
||||
st.appStats.ReassembledTcpPayloadsCount++
|
||||
st.reassembledTcpPayloadsCountMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incTlsConnectionsCount() {
|
||||
st.tlsConnectionsCountMutex.Lock()
|
||||
st.appStats.TlsConnectionsCount++
|
||||
st.tlsConnectionsCountMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) updateProcessedBytes(size int64) {
|
||||
st.processedBytesMutex.Lock()
|
||||
st.appStats.ProcessedBytes += size
|
||||
st.processedBytesMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) setStartTime(startTime time.Time) {
|
||||
st.appStats.StartTime = startTime
|
||||
}
|
||||
|
||||
func (st *StatsTracker) dumpStats() *AppStats {
|
||||
currentAppStats := &AppStats{StartTime: st.appStats.StartTime}
|
||||
|
||||
st.processedBytesMutex.Lock()
|
||||
currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes
|
||||
st.appStats.ProcessedBytes = 0
|
||||
st.processedBytesMutex.Unlock()
|
||||
|
||||
st.packetsCountMutex.Lock()
|
||||
currentAppStats.PacketsCount = st.appStats.PacketsCount
|
||||
st.appStats.PacketsCount = 0
|
||||
st.packetsCountMutex.Unlock()
|
||||
|
||||
st.tcpPacketsCountMutex.Lock()
|
||||
currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount
|
||||
st.appStats.TcpPacketsCount = 0
|
||||
st.tcpPacketsCountMutex.Unlock()
|
||||
|
||||
st.reassembledTcpPayloadsCountMutex.Lock()
|
||||
currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount
|
||||
st.appStats.ReassembledTcpPayloadsCount = 0
|
||||
st.reassembledTcpPayloadsCountMutex.Unlock()
|
||||
|
||||
st.tlsConnectionsCountMutex.Lock()
|
||||
currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount
|
||||
st.appStats.TlsConnectionsCount = 0
|
||||
st.tlsConnectionsCountMutex.Unlock()
|
||||
|
||||
st.matchedPairsMutex.Lock()
|
||||
currentAppStats.MatchedPairs = st.appStats.MatchedPairs
|
||||
st.appStats.MatchedPairs = 0
|
||||
st.matchedPairsMutex.Unlock()
|
||||
|
||||
st.droppedTcpStreamsMutex.Lock()
|
||||
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
|
||||
st.appStats.DroppedTcpStreams = 0
|
||||
st.droppedTcpStreamsMutex.Unlock()
|
||||
|
||||
return currentAppStats
|
||||
}
|
||||
@@ -147,7 +147,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
if length > 0 {
|
||||
// This is where we pass the reassembled information onwards
|
||||
// This channel is read by an tcpReader object
|
||||
statsTracker.incReassembledTcpPayloadsCount()
|
||||
appStats.IncReassembledTcpPayloadsCount()
|
||||
timestamp := ac.GetCaptureInfo().Timestamp
|
||||
if dir == reassembly.TCPDirClientToServer {
|
||||
for i := range t.clients {
|
||||
|
||||
@@ -62,8 +62,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
}
|
||||
if stream.isTapTarget {
|
||||
if runtime.NumGoroutine() > maxNumberOfGoroutines {
|
||||
statsTracker.incDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine())
|
||||
appStats.IncDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine())
|
||||
return stream
|
||||
}
|
||||
streamId++
|
||||
|
||||
22792
ui/package-lock.json
generated
22792
ui/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -12,8 +12,8 @@
|
||||
"@types/node": "^12.20.10",
|
||||
"@types/react": "^17.0.3",
|
||||
"@types/react-dom": "^17.0.3",
|
||||
"jsonpath": "^1.1.1",
|
||||
"axios": "^0.21.1",
|
||||
"jsonpath": "^1.1.1",
|
||||
"node-sass": "^5.0.0",
|
||||
"numeral": "^2.0.6",
|
||||
"protobuf-decoder": "^0.1.0",
|
||||
@@ -21,7 +21,7 @@
|
||||
"react-copy-to-clipboard": "^5.0.3",
|
||||
"react-dom": "^17.0.2",
|
||||
"react-scripts": "4.0.3",
|
||||
"react-scrollable-feed": "^1.3.0",
|
||||
"react-scrollable-feed-virtualized": "^1.4.2",
|
||||
"react-syntax-highlighter": "^15.4.3",
|
||||
"typescript": "^4.2.4",
|
||||
"web-vitals": "^1.1.1"
|
||||
|
||||
@@ -2,7 +2,7 @@ import {EntryItem} from "./EntryListItem/EntryListItem";
|
||||
import React, {useCallback, useEffect, useMemo, useRef, useState} from "react";
|
||||
import styles from './style/EntriesList.module.sass';
|
||||
import spinner from './assets/spinner.svg';
|
||||
import ScrollableFeed from "react-scrollable-feed";
|
||||
import ScrollableFeedVirtualized from "react-scrollable-feed-virtualized";
|
||||
import {StatusType} from "./Filters";
|
||||
import Api from "../helpers/api";
|
||||
import down from "./assets/downImg.svg";
|
||||
@@ -77,9 +77,6 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
|
||||
}
|
||||
setIsLoadingTop(false);
|
||||
const newEntries = [...data, ...entries];
|
||||
if(newEntries.length >= 1000) {
|
||||
newEntries.splice(1000);
|
||||
}
|
||||
setEntries(newEntries);
|
||||
|
||||
if(scrollTo) {
|
||||
@@ -100,10 +97,6 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
|
||||
}
|
||||
scrollTo = document.getElementById(filteredEntries?.[filteredEntries.length -1]?.id);
|
||||
let newEntries = [...entries, ...data];
|
||||
if(newEntries.length >= 1000) {
|
||||
setNoMoreDataTop(false);
|
||||
newEntries = newEntries.slice(-1000);
|
||||
}
|
||||
setEntries(newEntries);
|
||||
if(scrollTo) {
|
||||
scrollTo.scrollIntoView({behavior: "smooth"});
|
||||
@@ -116,19 +109,20 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
|
||||
{isLoadingTop && <div className={styles.spinnerContainer}>
|
||||
<img alt="spinner" src={spinner} style={{height: 25}}/>
|
||||
</div>}
|
||||
<ScrollableFeed ref={scrollableRef} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
|
||||
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
|
||||
{noMoreDataTop && !connectionOpen && <div id="noMoreDataTop" className={styles.noMoreDataAvailable}>No more data available</div>}
|
||||
{filteredEntries.map(entry => <EntryItem key={entry.id}
|
||||
entry={entry}
|
||||
setFocusedEntryId={setFocusedEntryId}
|
||||
isSelected={focusedEntryId === entry.id}/>)}
|
||||
{!connectionOpen && !noMoreDataBottom && <div className={styles.fetchButtonContainer}>
|
||||
<div className={styles.styledButton} onClick={() => getNewEntries()}>Fetch more entries</div>
|
||||
</div>}
|
||||
</ScrollableFeed>
|
||||
<button type="button"
|
||||
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
|
||||
onClick={(_) => scrollableRef.current.scrollToBottom()}>
|
||||
isSelected={focusedEntryId === entry.id}
|
||||
style={{}}/>)}
|
||||
</ScrollableFeedVirtualized>
|
||||
{!connectionOpen && !noMoreDataBottom && <div className={styles.fetchButtonContainer}>
|
||||
<div className={styles.styledButton} onClick={() => getNewEntries()}>Fetch more entries</div>
|
||||
</div>}
|
||||
<button type="button"
|
||||
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
|
||||
onClick={(_) => scrollableRef.current.jumpToBottom()}>
|
||||
<img alt="down" src={down} />
|
||||
</button>
|
||||
</div>
|
||||
|
||||
@@ -23,7 +23,7 @@ interface Entry {
|
||||
sourcePort: string,
|
||||
destinationIp: string,
|
||||
destinationPort: string,
|
||||
isOutgoing?: boolean;
|
||||
isOutgoing?: boolean;
|
||||
latency: number;
|
||||
rules: Rules;
|
||||
}
|
||||
@@ -38,9 +38,10 @@ interface EntryProps {
|
||||
entry: Entry;
|
||||
setFocusedEntryId: (id: string) => void;
|
||||
isSelected?: boolean;
|
||||
style: object;
|
||||
}
|
||||
|
||||
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected}) => {
|
||||
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected, style}) => {
|
||||
const classification = getClassification(entry.statusCode)
|
||||
let ingoingIcon;
|
||||
let outgoingIcon;
|
||||
@@ -103,7 +104,13 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
|
||||
className={`${styles.row}
|
||||
${isSelected ? styles.rowSelected : backgroundColor}`}
|
||||
onClick={() => setFocusedEntryId(entry.id)}
|
||||
style={{border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid"}}
|
||||
style={{
|
||||
border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid",
|
||||
position: "absolute",
|
||||
top: style['top'],
|
||||
marginTop: style['marginTop'],
|
||||
width: "calc(100% - 25px)",
|
||||
}}
|
||||
>
|
||||
<Protocol protocol={entry.protocol} horizontal={false}/>
|
||||
{((entry.protocol.name === "http" && "statusCode" in entry) || entry.statusCode !== 0) && <div>
|
||||
|
||||
@@ -86,10 +86,6 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
}
|
||||
if (!focusedEntryId) setFocusedEntryId(entry.id)
|
||||
let newEntries = [...entries];
|
||||
if (entries.length === 1000) {
|
||||
newEntries = newEntries.splice(1);
|
||||
setNoMoreDataTop(false);
|
||||
}
|
||||
setEntries([...newEntries, entry])
|
||||
if(listEntry.current) {
|
||||
if(isScrollable(listEntry.current.firstChild)) {
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
flex-direction: column
|
||||
overflow: hidden
|
||||
flex-grow: 1
|
||||
padding-top: 20px
|
||||
|
||||
.footer
|
||||
display: flex
|
||||
|
||||
Reference in New Issue
Block a user