mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-12 21:01:36 +00:00
Adding tests to the bucket statistics (#1160)
This commit is contained in:
@@ -69,72 +69,15 @@ func GetGeneralStats() GeneralStats {
|
||||
return generalStats
|
||||
}
|
||||
|
||||
func getBucketStatsCopy() BucketStats {
|
||||
bucketStatsCopy := BucketStats{}
|
||||
bucketStatsLocker.Lock()
|
||||
if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
|
||||
logger.Log.Errorf("Error while copying src stats into temporary copied object")
|
||||
return nil
|
||||
}
|
||||
bucketStatsLocker.Unlock()
|
||||
return bucketStatsCopy
|
||||
}
|
||||
|
||||
func GetAccumulativeStats() []*AccumulativeStatsProtocol {
|
||||
bucketStatsCopy := getBucketStatsCopy()
|
||||
if bucketStatsCopy == nil {
|
||||
if len(bucketStatsCopy) == 0 {
|
||||
return make([]*AccumulativeStatsProtocol, 0)
|
||||
}
|
||||
protocolToColor := make(map[string]string, 0)
|
||||
methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
|
||||
for _, countersOfTimeFrame := range bucketStatsCopy {
|
||||
for protocolName, value := range countersOfTimeFrame.ProtocolStats {
|
||||
if _, ok := protocolToColor[protocolName]; !ok {
|
||||
protocolToColor[protocolName] = value.Color
|
||||
}
|
||||
|
||||
for method, countersValue := range value.MethodsStats {
|
||||
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
|
||||
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, found := methodsPerProtocolAggregated[protocolName][method]; !found {
|
||||
methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{
|
||||
Name: method,
|
||||
EntriesCount: 0,
|
||||
VolumeSizeBytes: 0,
|
||||
}
|
||||
}
|
||||
methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount
|
||||
methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes
|
||||
}
|
||||
}
|
||||
}
|
||||
methodsPerProtocolAggregated, protocolToColor := getAggregatedStatsAllTime(bucketStatsCopy)
|
||||
|
||||
return ConvertToPieData(methodsPerProtocolAggregated, protocolToColor)
|
||||
}
|
||||
|
||||
func ConvertToPieData(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocol {
|
||||
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
||||
for protocolName, value := range methodsPerProtocolAggregated {
|
||||
entriesCount := 0
|
||||
volumeSizeBytes := 0
|
||||
methods := make([]*AccumulativeStatsCounter, 0)
|
||||
for _, methodAccData := range value {
|
||||
entriesCount += methodAccData.EntriesCount
|
||||
volumeSizeBytes += methodAccData.VolumeSizeBytes
|
||||
methods = append(methods, methodAccData)
|
||||
}
|
||||
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
|
||||
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
||||
Name: protocolName,
|
||||
EntriesCount: entriesCount,
|
||||
VolumeSizeBytes: volumeSizeBytes,
|
||||
},
|
||||
Color: protocolToColor[protocolName],
|
||||
Methods: methods,
|
||||
})
|
||||
}
|
||||
return protocolsData
|
||||
return convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated, protocolToColor)
|
||||
}
|
||||
|
||||
func GetAccumulativeStatsTiming(intervalSeconds int, numberOfBars int) []*AccumulativeStatsProtocolTime {
|
||||
@@ -143,79 +86,11 @@ func GetAccumulativeStatsTiming(intervalSeconds int, numberOfBars int) []*Accumu
|
||||
return make([]*AccumulativeStatsProtocolTime, 0)
|
||||
}
|
||||
|
||||
protocolToColor := make(map[string]string, 0)
|
||||
methodsPerProtocolPerTimeAggregated := make(map[time.Time]map[string]map[string]*AccumulativeStatsCounter, 0)
|
||||
firstBucketTime := getFirstBucketTime(time.Now().UTC(), intervalSeconds, numberOfBars)
|
||||
|
||||
// TODO: Extract to function and add tests for those values
|
||||
lastBucketTime := time.Now().UTC().Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
|
||||
firstBucketTime := lastBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds*(numberOfBars-1)))
|
||||
bucketStatsIndex := len(bucketStatsCopy) - 1
|
||||
methodsPerProtocolPerTimeAggregated, protocolToColor := getAggregatedResultTimingFromSpecificTime(intervalSeconds, bucketStatsCopy, firstBucketTime)
|
||||
|
||||
for bucketStatsIndex >= 0 {
|
||||
currentBucketTime := bucketStatsCopy[bucketStatsIndex].BucketTime
|
||||
if currentBucketTime.After(firstBucketTime) || currentBucketTime.Equal(firstBucketTime) {
|
||||
resultBucketRoundedKey := currentBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))
|
||||
|
||||
for protocolName, data := range bucketStatsCopy[bucketStatsIndex].ProtocolStats {
|
||||
if _, ok := protocolToColor[protocolName]; !ok {
|
||||
protocolToColor[protocolName] = data.Color
|
||||
}
|
||||
|
||||
for methodName, dataOfMethod := range data.MethodsStats {
|
||||
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
|
||||
Name: methodName,
|
||||
EntriesCount: 0,
|
||||
VolumeSizeBytes: 0,
|
||||
}
|
||||
}
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
|
||||
}
|
||||
}
|
||||
}
|
||||
bucketStatsIndex--
|
||||
}
|
||||
|
||||
return ConvertToTimelineData(methodsPerProtocolPerTimeAggregated, protocolToColor)
|
||||
}
|
||||
|
||||
func ConvertToTimelineData(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocolTime {
|
||||
finalResult := make([]*AccumulativeStatsProtocolTime, 0)
|
||||
for timeKey, item := range methodsPerProtocolPerTimeAggregated {
|
||||
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
||||
for protocolName := range item {
|
||||
entriesCount := 0
|
||||
volumeSizeBytes := 0
|
||||
methods := make([]*AccumulativeStatsCounter, 0)
|
||||
for _, methodAccData := range methodsPerProtocolPerTimeAggregated[timeKey][protocolName] {
|
||||
entriesCount += methodAccData.EntriesCount
|
||||
volumeSizeBytes += methodAccData.VolumeSizeBytes
|
||||
methods = append(methods, methodAccData)
|
||||
}
|
||||
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
|
||||
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
||||
Name: protocolName,
|
||||
EntriesCount: entriesCount,
|
||||
VolumeSizeBytes: volumeSizeBytes,
|
||||
},
|
||||
Color: protocolToColor[protocolName],
|
||||
Methods: methods,
|
||||
})
|
||||
}
|
||||
finalResult = append(finalResult, &AccumulativeStatsProtocolTime{
|
||||
Time: timeKey.UnixMilli(),
|
||||
ProtocolsData: protocolsData,
|
||||
})
|
||||
}
|
||||
return finalResult
|
||||
return convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated, protocolToColor)
|
||||
}
|
||||
|
||||
func EntryAdded(size int, summery *api.BaseEntry) {
|
||||
@@ -233,14 +108,8 @@ func EntryAdded(size int, summery *api.BaseEntry) {
|
||||
generalStats.LastEntryTimestamp = currentTimestamp
|
||||
}
|
||||
|
||||
//GetBucketOfTimeStamp Round the entry to the nearest threshold (one minute) floored (e.g: 15:31:45 -> 15:31:00)
|
||||
func GetBucketOfTimeStamp(timestamp int64) time.Time {
|
||||
entryTimeStampAsTime := time.UnixMilli(timestamp)
|
||||
return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
|
||||
}
|
||||
|
||||
func addToBucketStats(size int, summery *api.BaseEntry) {
|
||||
entryTimeBucketRounded := GetBucketOfTimeStamp(summery.Timestamp)
|
||||
entryTimeBucketRounded := getBucketFromTimeStamp(summery.Timestamp)
|
||||
|
||||
if len(bucketsStats) == 0 {
|
||||
bucketsStats = append(bucketsStats, &TimeFrameStatsValue{
|
||||
@@ -272,3 +141,148 @@ func addToBucketStats(size int, summery *api.BaseEntry) {
|
||||
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].EntriesCount += 1
|
||||
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].VolumeInBytes += size
|
||||
}
|
||||
|
||||
func getBucketFromTimeStamp(timestamp int64) time.Time {
|
||||
entryTimeStampAsTime := time.UnixMilli(timestamp)
|
||||
return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
|
||||
}
|
||||
|
||||
func getFirstBucketTime(endTime time.Time, intervalSeconds int, numberOfBars int) time.Time {
|
||||
lastBucketTime := endTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))
|
||||
firstBucketTime := lastBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds*(numberOfBars-1)))
|
||||
return firstBucketTime
|
||||
}
|
||||
|
||||
func convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocolTime {
|
||||
finalResult := make([]*AccumulativeStatsProtocolTime, 0)
|
||||
for timeKey, item := range methodsPerProtocolPerTimeAggregated {
|
||||
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
||||
for protocolName := range item {
|
||||
entriesCount := 0
|
||||
volumeSizeBytes := 0
|
||||
methods := make([]*AccumulativeStatsCounter, 0)
|
||||
for _, methodAccData := range methodsPerProtocolPerTimeAggregated[timeKey][protocolName] {
|
||||
entriesCount += methodAccData.EntriesCount
|
||||
volumeSizeBytes += methodAccData.VolumeSizeBytes
|
||||
methods = append(methods, methodAccData)
|
||||
}
|
||||
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
|
||||
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
||||
Name: protocolName,
|
||||
EntriesCount: entriesCount,
|
||||
VolumeSizeBytes: volumeSizeBytes,
|
||||
},
|
||||
Color: protocolToColor[protocolName],
|
||||
Methods: methods,
|
||||
})
|
||||
}
|
||||
finalResult = append(finalResult, &AccumulativeStatsProtocolTime{
|
||||
Time: timeKey.UnixMilli(),
|
||||
ProtocolsData: protocolsData,
|
||||
})
|
||||
}
|
||||
return finalResult
|
||||
}
|
||||
|
||||
func convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocol {
|
||||
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
||||
for protocolName, value := range methodsPerProtocolAggregated {
|
||||
entriesCount := 0
|
||||
volumeSizeBytes := 0
|
||||
methods := make([]*AccumulativeStatsCounter, 0)
|
||||
for _, methodAccData := range value {
|
||||
entriesCount += methodAccData.EntriesCount
|
||||
volumeSizeBytes += methodAccData.VolumeSizeBytes
|
||||
methods = append(methods, methodAccData)
|
||||
}
|
||||
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
|
||||
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
||||
Name: protocolName,
|
||||
EntriesCount: entriesCount,
|
||||
VolumeSizeBytes: volumeSizeBytes,
|
||||
},
|
||||
Color: protocolToColor[protocolName],
|
||||
Methods: methods,
|
||||
})
|
||||
}
|
||||
return protocolsData
|
||||
}
|
||||
|
||||
func getBucketStatsCopy() BucketStats {
|
||||
bucketStatsCopy := BucketStats{}
|
||||
bucketStatsLocker.Lock()
|
||||
if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
|
||||
logger.Log.Errorf("Error while copying src stats into temporary copied object")
|
||||
return nil
|
||||
}
|
||||
bucketStatsLocker.Unlock()
|
||||
return bucketStatsCopy
|
||||
}
|
||||
|
||||
func getAggregatedResultTimingFromSpecificTime(intervalSeconds int, bucketStats BucketStats, firstBucketTime time.Time) (map[time.Time]map[string]map[string]*AccumulativeStatsCounter, map[string]string) {
|
||||
protocolToColor := map[string]string{}
|
||||
methodsPerProtocolPerTimeAggregated := map[time.Time]map[string]map[string]*AccumulativeStatsCounter{}
|
||||
|
||||
bucketStatsIndex := len(bucketStats) - 1
|
||||
for bucketStatsIndex >= 0 {
|
||||
currentBucketTime := bucketStats[bucketStatsIndex].BucketTime
|
||||
if currentBucketTime.After(firstBucketTime) || currentBucketTime.Equal(firstBucketTime) {
|
||||
resultBucketRoundedKey := currentBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))
|
||||
|
||||
for protocolName, data := range bucketStats[bucketStatsIndex].ProtocolStats {
|
||||
if _, ok := protocolToColor[protocolName]; !ok {
|
||||
protocolToColor[protocolName] = data.Color
|
||||
}
|
||||
|
||||
for methodName, dataOfMethod := range data.MethodsStats {
|
||||
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
|
||||
Name: methodName,
|
||||
EntriesCount: 0,
|
||||
VolumeSizeBytes: 0,
|
||||
}
|
||||
}
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
|
||||
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
|
||||
}
|
||||
}
|
||||
}
|
||||
bucketStatsIndex--
|
||||
}
|
||||
return methodsPerProtocolPerTimeAggregated, protocolToColor
|
||||
}
|
||||
|
||||
func getAggregatedStatsAllTime(bucketStatsCopy BucketStats) (map[string]map[string]*AccumulativeStatsCounter, map[string]string) {
|
||||
protocolToColor := make(map[string]string, 0)
|
||||
methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
|
||||
for _, countersOfTimeFrame := range bucketStatsCopy {
|
||||
for protocolName, value := range countersOfTimeFrame.ProtocolStats {
|
||||
if _, ok := protocolToColor[protocolName]; !ok {
|
||||
protocolToColor[protocolName] = value.Color
|
||||
}
|
||||
|
||||
for method, countersValue := range value.MethodsStats {
|
||||
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
|
||||
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
|
||||
}
|
||||
if _, found := methodsPerProtocolAggregated[protocolName][method]; !found {
|
||||
methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{
|
||||
Name: method,
|
||||
EntriesCount: 0,
|
||||
VolumeSizeBytes: 0,
|
||||
}
|
||||
}
|
||||
methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount
|
||||
methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes
|
||||
}
|
||||
}
|
||||
}
|
||||
return methodsPerProtocolAggregated, protocolToColor
|
||||
}
|
||||
|
Reference in New Issue
Block a user