mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-07 21:40:44 +00:00
Merge traffic stats endpoints to one and add auto interval logic (#1174)
This commit is contained in:
@@ -2,6 +2,7 @@ package providers
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -26,7 +27,6 @@ type TimeFrameStatsValue struct {
|
||||
|
||||
type ProtocolStats struct {
|
||||
MethodsStats map[string]*SizeAndEntriesCount `json:"methods"`
|
||||
Color string `json:"color"`
|
||||
}
|
||||
|
||||
type SizeAndEntriesCount struct {
|
||||
@@ -51,46 +51,103 @@ type AccumulativeStatsProtocolTime struct {
|
||||
Time int64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
type TrafficStatsResponse struct {
|
||||
PieStats []*AccumulativeStatsProtocol `json:"pie"`
|
||||
TimelineStats []*AccumulativeStatsProtocolTime `json:"timeline"`
|
||||
}
|
||||
|
||||
var (
|
||||
generalStats = GeneralStats{}
|
||||
bucketsStats = BucketStats{}
|
||||
bucketStatsLocker = sync.Mutex{}
|
||||
protocolToColor = map[string]string{}
|
||||
)
|
||||
|
||||
const (
|
||||
InternalBucketThreshold = time.Minute * 1
|
||||
MaxNumberOfBars = 30
|
||||
)
|
||||
|
||||
func ResetGeneralStats() {
|
||||
generalStats = GeneralStats{}
|
||||
}
|
||||
|
||||
func GetGeneralStats() GeneralStats {
|
||||
return generalStats
|
||||
func GetGeneralStats() *GeneralStats {
|
||||
return &generalStats
|
||||
}
|
||||
|
||||
func GetAccumulativeStats() []*AccumulativeStatsProtocol {
|
||||
bucketStatsCopy := getBucketStatsCopy()
|
||||
if len(bucketStatsCopy) == 0 {
|
||||
func InitProtocolToColor(protocolMap map[string]*api.Protocol) {
|
||||
for item, value := range protocolMap {
|
||||
protocolToColor[strings.Split(item, "/")[2]] = value.BackgroundColor
|
||||
}
|
||||
}
|
||||
|
||||
func GetTrafficStats() *TrafficStatsResponse {
|
||||
bucketsStatsCopy := getBucketStatsCopy()
|
||||
interval := calculateInterval(bucketsStatsCopy[0].BucketTime.Unix(), bucketsStatsCopy[len(bucketsStatsCopy)-1].BucketTime.Unix()) // in seconds
|
||||
|
||||
return &TrafficStatsResponse{
|
||||
PieStats: getAccumulativeStats(bucketsStatsCopy),
|
||||
TimelineStats: getAccumulativeStatsTiming(bucketsStatsCopy, interval),
|
||||
}
|
||||
}
|
||||
|
||||
func calculateInterval(firstTimestamp int64, lastTimestamp int64) time.Duration {
|
||||
validDurations := []time.Duration{
|
||||
time.Minute,
|
||||
time.Minute * 2,
|
||||
time.Minute * 3,
|
||||
time.Minute * 5,
|
||||
time.Minute * 10,
|
||||
time.Minute * 15,
|
||||
time.Minute * 20,
|
||||
time.Minute * 30,
|
||||
time.Minute * 45,
|
||||
time.Minute * 60,
|
||||
time.Minute * 75,
|
||||
time.Minute * 90, // 1.5 minutes
|
||||
time.Minute * 120, // 2 hours
|
||||
time.Minute * 150, // 2.5 hours
|
||||
time.Minute * 180, // 3 hours
|
||||
time.Minute * 240, // 4 hours
|
||||
time.Minute * 300, // 5 hours
|
||||
time.Minute * 360, // 6 hours
|
||||
time.Minute * 420, // 7 hours
|
||||
time.Minute * 480, // 8 hours
|
||||
time.Minute * 540, // 9 hours
|
||||
time.Minute * 600, // 10 hours
|
||||
time.Minute * 660, // 11 hours
|
||||
time.Minute * 720, // 12 hours
|
||||
time.Minute * 1440, // 24 hours
|
||||
}
|
||||
duration := time.Duration(lastTimestamp-firstTimestamp) * time.Second / time.Duration(MaxNumberOfBars)
|
||||
for _, validDuration := range validDurations {
|
||||
if validDuration-duration >= 0 {
|
||||
return validDuration
|
||||
}
|
||||
}
|
||||
return duration.Round(validDurations[len(validDurations)-1])
|
||||
|
||||
}
|
||||
|
||||
func getAccumulativeStats(stats BucketStats) []*AccumulativeStatsProtocol {
|
||||
if len(stats) == 0 {
|
||||
return make([]*AccumulativeStatsProtocol, 0)
|
||||
}
|
||||
|
||||
methodsPerProtocolAggregated, protocolToColor := getAggregatedStatsAllTime(bucketStatsCopy)
|
||||
methodsPerProtocolAggregated := getAggregatedStats(stats)
|
||||
|
||||
return convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated, protocolToColor)
|
||||
return convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated)
|
||||
}
|
||||
|
||||
func GetAccumulativeStatsTiming(intervalSeconds int, numberOfBars int) []*AccumulativeStatsProtocolTime {
|
||||
bucketStatsCopy := getBucketStatsCopy()
|
||||
if len(bucketStatsCopy) == 0 {
|
||||
func getAccumulativeStatsTiming(stats BucketStats, interval time.Duration) []*AccumulativeStatsProtocolTime {
|
||||
if len(stats) == 0 {
|
||||
return make([]*AccumulativeStatsProtocolTime, 0)
|
||||
}
|
||||
|
||||
firstBucketTime := getFirstBucketTime(time.Now().UTC(), intervalSeconds, numberOfBars)
|
||||
methodsPerProtocolPerTimeAggregated := getAggregatedResultTiming(interval, stats)
|
||||
|
||||
methodsPerProtocolPerTimeAggregated, protocolToColor := getAggregatedResultTimingFromSpecificTime(intervalSeconds, bucketStatsCopy, firstBucketTime)
|
||||
|
||||
return convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated, protocolToColor)
|
||||
return convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated)
|
||||
}
|
||||
|
||||
func EntryAdded(size int, summery *api.BaseEntry) {
|
||||
@@ -128,7 +185,6 @@ func addToBucketStats(size int, summery *api.BaseEntry) {
|
||||
if _, found := bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation]; !found {
|
||||
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation] = ProtocolStats{
|
||||
MethodsStats: map[string]*SizeAndEntriesCount{},
|
||||
Color: summery.Protocol.BackgroundColor,
|
||||
}
|
||||
}
|
||||
if _, found := bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method]; !found {
|
||||
@@ -147,13 +203,7 @@ func getBucketFromTimeStamp(timestamp int64) time.Time {
|
||||
return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
|
||||
}
|
||||
|
||||
func getFirstBucketTime(endTime time.Time, intervalSeconds int, numberOfBars int) time.Time {
|
||||
lastBucketTime := endTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))
|
||||
firstBucketTime := lastBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds*(numberOfBars-1)))
|
||||
return firstBucketTime
|
||||
}
|
||||
|
||||
func convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocolTime {
|
||||
func convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter) []*AccumulativeStatsProtocolTime {
|
||||
finalResult := make([]*AccumulativeStatsProtocolTime, 0)
|
||||
for timeKey, item := range methodsPerProtocolPerTimeAggregated {
|
||||
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
||||
@@ -184,7 +234,7 @@ func convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggreg
|
||||
return finalResult
|
||||
}
|
||||
|
||||
func convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocol {
|
||||
func convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter) []*AccumulativeStatsProtocol {
|
||||
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
||||
for protocolName, value := range methodsPerProtocolAggregated {
|
||||
entriesCount := 0
|
||||
@@ -219,55 +269,44 @@ func getBucketStatsCopy() BucketStats {
|
||||
return bucketStatsCopy
|
||||
}
|
||||
|
||||
func getAggregatedResultTimingFromSpecificTime(intervalSeconds int, bucketStats BucketStats, firstBucketTime time.Time) (map[time.Time]map[string]map[string]*AccumulativeStatsCounter, map[string]string) {
|
||||
protocolToColor := map[string]string{}
|
||||
func getAggregatedResultTiming(interval time.Duration, stats BucketStats) map[time.Time]map[string]map[string]*AccumulativeStatsCounter {
|
||||
methodsPerProtocolPerTimeAggregated := map[time.Time]map[string]map[string]*AccumulativeStatsCounter{}
|
||||
|
||||
bucketStatsIndex := len(bucketStats) - 1
|
||||
bucketStatsIndex := len(stats) - 1
|
||||
for bucketStatsIndex >= 0 {
|
||||
currentBucketTime := bucketStats[bucketStatsIndex].BucketTime
|
||||
if currentBucketTime.After(firstBucketTime) || currentBucketTime.Equal(firstBucketTime) {
|
||||
resultBucketRoundedKey := currentBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))
|
||||
currentBucketTime := stats[bucketStatsIndex].BucketTime
|
||||
resultBucketRoundedKey := currentBucketTime.Add(-1 * interval / 2).Round(interval)
|
||||
|
||||
for protocolName, data := range bucketStats[bucketStatsIndex].ProtocolStats {
|
||||
if _, ok := protocolToColor[protocolName]; !ok {
|
||||
protocolToColor[protocolName] = data.Color
|
||||
for protocolName, data := range stats[bucketStatsIndex].ProtocolStats {
|
||||
for methodName, dataOfMethod := range data.MethodsStats {
|
||||
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
|
||||
for methodName, dataOfMethod := range data.MethodsStats {
|
||||
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
|
||||
Name: methodName,
|
||||
EntriesCount: 0,
|
||||
VolumeSizeBytes: 0,
|
||||
}
|
||||
}
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
|
||||
Name: methodName,
|
||||
EntriesCount: 0,
|
||||
VolumeSizeBytes: 0,
|
||||
}
|
||||
}
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
|
||||
}
|
||||
}
|
||||
|
||||
bucketStatsIndex--
|
||||
}
|
||||
return methodsPerProtocolPerTimeAggregated, protocolToColor
|
||||
return methodsPerProtocolPerTimeAggregated
|
||||
}
|
||||
|
||||
func getAggregatedStatsAllTime(bucketStatsCopy BucketStats) (map[string]map[string]*AccumulativeStatsCounter, map[string]string) {
|
||||
protocolToColor := make(map[string]string, 0)
|
||||
func getAggregatedStats(bucketStatsCopy BucketStats) map[string]map[string]*AccumulativeStatsCounter {
|
||||
methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
|
||||
for _, countersOfTimeFrame := range bucketStatsCopy {
|
||||
for protocolName, value := range countersOfTimeFrame.ProtocolStats {
|
||||
if _, ok := protocolToColor[protocolName]; !ok {
|
||||
protocolToColor[protocolName] = value.Color
|
||||
}
|
||||
|
||||
for method, countersValue := range value.MethodsStats {
|
||||
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
|
||||
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
|
||||
@@ -284,5 +323,5 @@ func getAggregatedStatsAllTime(bucketStatsCopy BucketStats) (map[string]map[stri
|
||||
}
|
||||
}
|
||||
}
|
||||
return methodsPerProtocolAggregated, protocolToColor
|
||||
return methodsPerProtocolAggregated
|
||||
}
|
||||
|
Reference in New Issue
Block a user