diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index a1a096df0..ee3b8b08f 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -83,6 +83,11 @@ func GetAccumulativeStats(c *gin.Context) { c.JSON(http.StatusOK, providers.GetAccumulativeStats()) } +func GetAccumulativeStatsTiming(c *gin.Context) { + // for now hardcoded 10 bars of 5 minutes interval + c.JSON(http.StatusOK, providers.GetAccumulativeStatsTiming(300, 10)) +} + func GetCurrentResolvingInformation(c *gin.Context) { c.JSON(http.StatusOK, holder.GetResolver().GetMap()) } diff --git a/agent/pkg/providers/stats_provider.go b/agent/pkg/providers/stats_provider.go index eb60c1e45..1e0340cd3 100644 --- a/agent/pkg/providers/stats_provider.go +++ b/agent/pkg/providers/stats_provider.go @@ -2,6 +2,7 @@ package providers import ( "reflect" + "sync" "time" "github.com/jinzhu/copier" @@ -19,18 +20,18 @@ type GeneralStats struct { type BucketStats []*TimeFrameStatsValue type TimeFrameStatsValue struct { - BucketTime time.Time - ProtocolStats map[string]ProtocolStats + BucketTime time.Time `json:"timestamp"` + ProtocolStats map[string]ProtocolStats `json:"protocols"` } type ProtocolStats struct { - MethodsStats map[string]*SizeAndEntriesCount - Color string + MethodsStats map[string]*SizeAndEntriesCount `json:"methods"` + Color string `json:"color"` } type SizeAndEntriesCount struct { - EntriesCount int - VolumeInBytes int + EntriesCount int `json:"entriesCount"` + VolumeInBytes int `json:"volumeInBytes"` } type AccumulativeStatsCounter struct { @@ -45,9 +46,19 @@ type AccumulativeStatsProtocol struct { Methods []*AccumulativeStatsCounter `json:"methods"` } +type AccumulativeStatsProtocolTime struct { + ProtocolsData []*AccumulativeStatsProtocol `json:"protocols"` + Time int64 `json:"timestamp"` +} + var ( - generalStats = GeneralStats{} - bucketsStats = BucketStats{} + generalStats = GeneralStats{} + bucketsStats = BucketStats{} + bucketStatsLocker = sync.Mutex{} +) + +const ( + InternalBucketThreshold = time.Minute * 1 ) func ResetGeneralStats() { @@ -58,33 +69,34 @@ func GetGeneralStats() GeneralStats { return generalStats } -func GetAccumulativeStats() []*AccumulativeStatsProtocol { +func getBucketStatsCopy() BucketStats { bucketStatsCopy := BucketStats{} + bucketStatsLocker.Lock() if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil { logger.Log.Errorf("Error while copying src stats into temporary copied object") + return nil + } + bucketStatsLocker.Unlock() + return bucketStatsCopy +} + +func GetAccumulativeStats() []*AccumulativeStatsProtocol { + bucketStatsCopy := getBucketStatsCopy() + if bucketStatsCopy == nil { return make([]*AccumulativeStatsProtocol, 0) } - - result := make(map[string]*AccumulativeStatsProtocol, 0) + protocolToColor := make(map[string]string, 0) methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0) for _, countersOfTimeFrame := range bucketStatsCopy { for protocolName, value := range countersOfTimeFrame.ProtocolStats { - - if _, found := result[protocolName]; !found { - result[protocolName] = &AccumulativeStatsProtocol{ - AccumulativeStatsCounter: AccumulativeStatsCounter{ - Name: protocolName, - EntriesCount: 0, - VolumeSizeBytes: 0, - }, - Color: value.Color, - } - } - if _, found := methodsPerProtocolAggregated[protocolName]; !found { - methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{} + if _, ok := protocolToColor[protocolName]; !ok { + protocolToColor[protocolName] = value.Color } for method, countersValue := range value.MethodsStats { + if _, found := methodsPerProtocolAggregated[protocolName]; !found { + methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{} + } if _, found := methodsPerProtocolAggregated[protocolName][method]; !found { methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{ Name: method, @@ -92,23 +104,116 @@ func GetAccumulativeStats() []*AccumulativeStatsProtocol { VolumeSizeBytes: 0, } } - - result[protocolName].AccumulativeStatsCounter.EntriesCount += countersValue.EntriesCount methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount - result[protocolName].AccumulativeStatsCounter.VolumeSizeBytes += countersValue.VolumeInBytes methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes } } } - finalResult := make([]*AccumulativeStatsProtocol, 0) - for _, value := range result { - methodsForProtocol := make([]*AccumulativeStatsCounter, 0) - for _, methodValue := range methodsPerProtocolAggregated[value.Name] { - methodsForProtocol = append(methodsForProtocol, methodValue) + return ConvertToPieData(methodsPerProtocolAggregated, protocolToColor) +} + +func ConvertToPieData(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocol { + protocolsData := make([]*AccumulativeStatsProtocol, 0) + for protocolName, value := range methodsPerProtocolAggregated { + entriesCount := 0 + volumeSizeBytes := 0 + methods := make([]*AccumulativeStatsCounter, 0) + for _, methodAccData := range value { + entriesCount += methodAccData.EntriesCount + volumeSizeBytes += methodAccData.VolumeSizeBytes + methods = append(methods, methodAccData) } - value.Methods = methodsForProtocol - finalResult = append(finalResult, value) + protocolsData = append(protocolsData, &AccumulativeStatsProtocol{ + AccumulativeStatsCounter: AccumulativeStatsCounter{ + Name: protocolName, + EntriesCount: entriesCount, + VolumeSizeBytes: volumeSizeBytes, + }, + Color: protocolToColor[protocolName], + Methods: methods, + }) + } + return protocolsData +} + +func GetAccumulativeStatsTiming(intervalSeconds int, numberOfBars int) []*AccumulativeStatsProtocolTime { + bucketStatsCopy := getBucketStatsCopy() + if len(bucketStatsCopy) == 0 { + return make([]*AccumulativeStatsProtocolTime, 0) + } + + protocolToColor := make(map[string]string, 0) + methodsPerProtocolPerTimeAggregated := make(map[time.Time]map[string]map[string]*AccumulativeStatsCounter, 0) + + // TODO: Extract to function and add tests for those values + lastBucketTime := time.Now().UTC().Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold) + firstBucketTime := lastBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds*(numberOfBars-1))) + bucketStatsIndex := len(bucketStatsCopy) - 1 + + for bucketStatsIndex >= 0 { + currentBucketTime := bucketStatsCopy[bucketStatsIndex].BucketTime + if currentBucketTime.After(firstBucketTime) || currentBucketTime.Equal(firstBucketTime) { + resultBucketRoundedKey := currentBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds)) + + for protocolName, data := range bucketStatsCopy[bucketStatsIndex].ProtocolStats { + if _, ok := protocolToColor[protocolName]; !ok { + protocolToColor[protocolName] = data.Color + } + + for methodName, dataOfMethod := range data.MethodsStats { + + if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok { + methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{} + } + if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok { + methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{} + } + if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok { + methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{ + Name: methodName, + EntriesCount: 0, + VolumeSizeBytes: 0, + } + } + methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount + methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes + } + } + } + bucketStatsIndex-- + } + + return ConvertToTimelineData(methodsPerProtocolPerTimeAggregated, protocolToColor) +} + +func ConvertToTimelineData(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocolTime { + finalResult := make([]*AccumulativeStatsProtocolTime, 0) + for timeKey, item := range methodsPerProtocolPerTimeAggregated { + protocolsData := make([]*AccumulativeStatsProtocol, 0) + for protocolName := range item { + entriesCount := 0 + volumeSizeBytes := 0 + methods := make([]*AccumulativeStatsCounter, 0) + for _, methodAccData := range methodsPerProtocolPerTimeAggregated[timeKey][protocolName] { + entriesCount += methodAccData.EntriesCount + volumeSizeBytes += methodAccData.VolumeSizeBytes + methods = append(methods, methodAccData) + } + protocolsData = append(protocolsData, &AccumulativeStatsProtocol{ + AccumulativeStatsCounter: AccumulativeStatsCounter{ + Name: protocolName, + EntriesCount: entriesCount, + VolumeSizeBytes: volumeSizeBytes, + }, + Color: protocolToColor[protocolName], + Methods: methods, + }) + } + finalResult = append(finalResult, &AccumulativeStatsProtocolTime{ + Time: timeKey.UnixMilli(), + ProtocolsData: protocolsData, + }) } return finalResult } @@ -128,8 +233,15 @@ func EntryAdded(size int, summery *api.BaseEntry) { generalStats.LastEntryTimestamp = currentTimestamp } +//GetBucketOfTimeStamp Round the entry to the nearest threshold (one minute) floored (e.g: 15:31:45 -> 15:31:00) +func GetBucketOfTimeStamp(timestamp int64) time.Time { + entryTimeStampAsTime := time.UnixMilli(timestamp) + return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold) +} + func addToBucketStats(size int, summery *api.BaseEntry) { - entryTimeBucketRounded := time.Unix(summery.Timestamp, 0).Round(time.Minute * 1) + entryTimeBucketRounded := GetBucketOfTimeStamp(summery.Timestamp) + if len(bucketsStats) == 0 { bucketsStats = append(bucketsStats, &TimeFrameStatsValue{ BucketTime: entryTimeBucketRounded, diff --git a/agent/pkg/providers/stats_provider_test.go b/agent/pkg/providers/stats_provider_test.go index 36e10df86..8392dc7db 100644 --- a/agent/pkg/providers/stats_provider_test.go +++ b/agent/pkg/providers/stats_provider_test.go @@ -83,3 +83,22 @@ func TestEntryAddedVolume(t *testing.T) { } } + +func TestGetBucketOfTimeStamp(t *testing.T) { + tests := map[int64]time.Time{ + time.Date(2022, time.Month(1), 1, 10, 34, 45, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local), + time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local), + time.Date(2022, time.Month(1), 1, 10, 59, 01, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 59, 00, 0, time.Local), + } + + for key, value := range tests { + t.Run(fmt.Sprintf("%v", key), func(t *testing.T) { + + actual := providers.GetBucketOfTimeStamp(key) + + if actual != value { + t.Errorf("unexpected result - expected: %v, actual: %v", value, actual) + } + }) + } +} diff --git a/agent/pkg/routes/status_routes.go b/agent/pkg/routes/status_routes.go index c82a4945b..73e58ca86 100644 --- a/agent/pkg/routes/status_routes.go +++ b/agent/pkg/routes/status_routes.go @@ -16,7 +16,8 @@ func StatusRoutes(ginApp *gin.Engine) { routeGroup.GET("/tap", controllers.GetTappingStatus) routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB - routeGroup.GET("/accumulative", controllers.GetAccumulativeStats) // get general stats about entries in DB + routeGroup.GET("/accumulative", controllers.GetAccumulativeStats) + routeGroup.GET("/accumulativeTiming", controllers.GetAccumulativeStatsTiming) routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation) }