Add timing endpoint (timeline data) for stats (#1157)

This commit is contained in:
gadotroee
2022-06-21 14:33:04 +03:00
committed by GitHub
parent f9a9c05f48
commit 9a40895e9c
4 changed files with 173 additions and 36 deletions

View File

@@ -83,6 +83,11 @@ func GetAccumulativeStats(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetAccumulativeStats()) c.JSON(http.StatusOK, providers.GetAccumulativeStats())
} }
func GetAccumulativeStatsTiming(c *gin.Context) {
// for now hardcoded 10 bars of 5 minutes interval
c.JSON(http.StatusOK, providers.GetAccumulativeStatsTiming(300, 10))
}
func GetCurrentResolvingInformation(c *gin.Context) { func GetCurrentResolvingInformation(c *gin.Context) {
c.JSON(http.StatusOK, holder.GetResolver().GetMap()) c.JSON(http.StatusOK, holder.GetResolver().GetMap())
} }

View File

@@ -2,6 +2,7 @@ package providers
import ( import (
"reflect" "reflect"
"sync"
"time" "time"
"github.com/jinzhu/copier" "github.com/jinzhu/copier"
@@ -19,18 +20,18 @@ type GeneralStats struct {
type BucketStats []*TimeFrameStatsValue type BucketStats []*TimeFrameStatsValue
type TimeFrameStatsValue struct { type TimeFrameStatsValue struct {
BucketTime time.Time BucketTime time.Time `json:"timestamp"`
ProtocolStats map[string]ProtocolStats ProtocolStats map[string]ProtocolStats `json:"protocols"`
} }
type ProtocolStats struct { type ProtocolStats struct {
MethodsStats map[string]*SizeAndEntriesCount MethodsStats map[string]*SizeAndEntriesCount `json:"methods"`
Color string Color string `json:"color"`
} }
type SizeAndEntriesCount struct { type SizeAndEntriesCount struct {
EntriesCount int EntriesCount int `json:"entriesCount"`
VolumeInBytes int VolumeInBytes int `json:"volumeInBytes"`
} }
type AccumulativeStatsCounter struct { type AccumulativeStatsCounter struct {
@@ -45,9 +46,19 @@ type AccumulativeStatsProtocol struct {
Methods []*AccumulativeStatsCounter `json:"methods"` Methods []*AccumulativeStatsCounter `json:"methods"`
} }
type AccumulativeStatsProtocolTime struct {
ProtocolsData []*AccumulativeStatsProtocol `json:"protocols"`
Time int64 `json:"timestamp"`
}
var ( var (
generalStats = GeneralStats{} generalStats = GeneralStats{}
bucketsStats = BucketStats{} bucketsStats = BucketStats{}
bucketStatsLocker = sync.Mutex{}
)
const (
InternalBucketThreshold = time.Minute * 1
) )
func ResetGeneralStats() { func ResetGeneralStats() {
@@ -58,33 +69,34 @@ func GetGeneralStats() GeneralStats {
return generalStats return generalStats
} }
func GetAccumulativeStats() []*AccumulativeStatsProtocol { func getBucketStatsCopy() BucketStats {
bucketStatsCopy := BucketStats{} bucketStatsCopy := BucketStats{}
bucketStatsLocker.Lock()
if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil { if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
logger.Log.Errorf("Error while copying src stats into temporary copied object") logger.Log.Errorf("Error while copying src stats into temporary copied object")
return nil
}
bucketStatsLocker.Unlock()
return bucketStatsCopy
}
func GetAccumulativeStats() []*AccumulativeStatsProtocol {
bucketStatsCopy := getBucketStatsCopy()
if bucketStatsCopy == nil {
return make([]*AccumulativeStatsProtocol, 0) return make([]*AccumulativeStatsProtocol, 0)
} }
protocolToColor := make(map[string]string, 0)
result := make(map[string]*AccumulativeStatsProtocol, 0)
methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0) methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
for _, countersOfTimeFrame := range bucketStatsCopy { for _, countersOfTimeFrame := range bucketStatsCopy {
for protocolName, value := range countersOfTimeFrame.ProtocolStats { for protocolName, value := range countersOfTimeFrame.ProtocolStats {
if _, ok := protocolToColor[protocolName]; !ok {
if _, found := result[protocolName]; !found { protocolToColor[protocolName] = value.Color
result[protocolName] = &AccumulativeStatsProtocol{
AccumulativeStatsCounter: AccumulativeStatsCounter{
Name: protocolName,
EntriesCount: 0,
VolumeSizeBytes: 0,
},
Color: value.Color,
}
}
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
} }
for method, countersValue := range value.MethodsStats { for method, countersValue := range value.MethodsStats {
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
}
if _, found := methodsPerProtocolAggregated[protocolName][method]; !found { if _, found := methodsPerProtocolAggregated[protocolName][method]; !found {
methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{ methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{
Name: method, Name: method,
@@ -92,23 +104,116 @@ func GetAccumulativeStats() []*AccumulativeStatsProtocol {
VolumeSizeBytes: 0, VolumeSizeBytes: 0,
} }
} }
result[protocolName].AccumulativeStatsCounter.EntriesCount += countersValue.EntriesCount
methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount
result[protocolName].AccumulativeStatsCounter.VolumeSizeBytes += countersValue.VolumeInBytes
methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes
} }
} }
} }
finalResult := make([]*AccumulativeStatsProtocol, 0) return ConvertToPieData(methodsPerProtocolAggregated, protocolToColor)
for _, value := range result { }
methodsForProtocol := make([]*AccumulativeStatsCounter, 0)
for _, methodValue := range methodsPerProtocolAggregated[value.Name] { func ConvertToPieData(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocol {
methodsForProtocol = append(methodsForProtocol, methodValue) protocolsData := make([]*AccumulativeStatsProtocol, 0)
for protocolName, value := range methodsPerProtocolAggregated {
entriesCount := 0
volumeSizeBytes := 0
methods := make([]*AccumulativeStatsCounter, 0)
for _, methodAccData := range value {
entriesCount += methodAccData.EntriesCount
volumeSizeBytes += methodAccData.VolumeSizeBytes
methods = append(methods, methodAccData)
} }
value.Methods = methodsForProtocol protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
finalResult = append(finalResult, value) AccumulativeStatsCounter: AccumulativeStatsCounter{
Name: protocolName,
EntriesCount: entriesCount,
VolumeSizeBytes: volumeSizeBytes,
},
Color: protocolToColor[protocolName],
Methods: methods,
})
}
return protocolsData
}
func GetAccumulativeStatsTiming(intervalSeconds int, numberOfBars int) []*AccumulativeStatsProtocolTime {
bucketStatsCopy := getBucketStatsCopy()
if len(bucketStatsCopy) == 0 {
return make([]*AccumulativeStatsProtocolTime, 0)
}
protocolToColor := make(map[string]string, 0)
methodsPerProtocolPerTimeAggregated := make(map[time.Time]map[string]map[string]*AccumulativeStatsCounter, 0)
// TODO: Extract to function and add tests for those values
lastBucketTime := time.Now().UTC().Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
firstBucketTime := lastBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds*(numberOfBars-1)))
bucketStatsIndex := len(bucketStatsCopy) - 1
for bucketStatsIndex >= 0 {
currentBucketTime := bucketStatsCopy[bucketStatsIndex].BucketTime
if currentBucketTime.After(firstBucketTime) || currentBucketTime.Equal(firstBucketTime) {
resultBucketRoundedKey := currentBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))
for protocolName, data := range bucketStatsCopy[bucketStatsIndex].ProtocolStats {
if _, ok := protocolToColor[protocolName]; !ok {
protocolToColor[protocolName] = data.Color
}
for methodName, dataOfMethod := range data.MethodsStats {
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
}
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
}
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
Name: methodName,
EntriesCount: 0,
VolumeSizeBytes: 0,
}
}
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
}
}
}
bucketStatsIndex--
}
return ConvertToTimelineData(methodsPerProtocolPerTimeAggregated, protocolToColor)
}
func ConvertToTimelineData(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocolTime {
finalResult := make([]*AccumulativeStatsProtocolTime, 0)
for timeKey, item := range methodsPerProtocolPerTimeAggregated {
protocolsData := make([]*AccumulativeStatsProtocol, 0)
for protocolName := range item {
entriesCount := 0
volumeSizeBytes := 0
methods := make([]*AccumulativeStatsCounter, 0)
for _, methodAccData := range methodsPerProtocolPerTimeAggregated[timeKey][protocolName] {
entriesCount += methodAccData.EntriesCount
volumeSizeBytes += methodAccData.VolumeSizeBytes
methods = append(methods, methodAccData)
}
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
AccumulativeStatsCounter: AccumulativeStatsCounter{
Name: protocolName,
EntriesCount: entriesCount,
VolumeSizeBytes: volumeSizeBytes,
},
Color: protocolToColor[protocolName],
Methods: methods,
})
}
finalResult = append(finalResult, &AccumulativeStatsProtocolTime{
Time: timeKey.UnixMilli(),
ProtocolsData: protocolsData,
})
} }
return finalResult return finalResult
} }
@@ -128,8 +233,15 @@ func EntryAdded(size int, summery *api.BaseEntry) {
generalStats.LastEntryTimestamp = currentTimestamp generalStats.LastEntryTimestamp = currentTimestamp
} }
//GetBucketOfTimeStamp Round the entry to the nearest threshold (one minute) floored (e.g: 15:31:45 -> 15:31:00)
func GetBucketOfTimeStamp(timestamp int64) time.Time {
entryTimeStampAsTime := time.UnixMilli(timestamp)
return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
}
func addToBucketStats(size int, summery *api.BaseEntry) { func addToBucketStats(size int, summery *api.BaseEntry) {
entryTimeBucketRounded := time.Unix(summery.Timestamp, 0).Round(time.Minute * 1) entryTimeBucketRounded := GetBucketOfTimeStamp(summery.Timestamp)
if len(bucketsStats) == 0 { if len(bucketsStats) == 0 {
bucketsStats = append(bucketsStats, &TimeFrameStatsValue{ bucketsStats = append(bucketsStats, &TimeFrameStatsValue{
BucketTime: entryTimeBucketRounded, BucketTime: entryTimeBucketRounded,

View File

@@ -83,3 +83,22 @@ func TestEntryAddedVolume(t *testing.T) {
} }
} }
func TestGetBucketOfTimeStamp(t *testing.T) {
tests := map[int64]time.Time{
time.Date(2022, time.Month(1), 1, 10, 34, 45, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local),
time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local),
time.Date(2022, time.Month(1), 1, 10, 59, 01, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 59, 00, 0, time.Local),
}
for key, value := range tests {
t.Run(fmt.Sprintf("%v", key), func(t *testing.T) {
actual := providers.GetBucketOfTimeStamp(key)
if actual != value {
t.Errorf("unexpected result - expected: %v, actual: %v", value, actual)
}
})
}
}

View File

@@ -16,7 +16,8 @@ func StatusRoutes(ginApp *gin.Engine) {
routeGroup.GET("/tap", controllers.GetTappingStatus) routeGroup.GET("/tap", controllers.GetTappingStatus)
routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB
routeGroup.GET("/accumulative", controllers.GetAccumulativeStats) // get general stats about entries in DB routeGroup.GET("/accumulative", controllers.GetAccumulativeStats)
routeGroup.GET("/accumulativeTiming", controllers.GetAccumulativeStatsTiming)
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation) routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
} }