From 6f117d0a847a33d4f79d1b54731acf55c54d7ba9 Mon Sep 17 00:00:00 2001 From: gadotroee <55343099+gadotroee@users.noreply.github.com> Date: Sun, 19 Jun 2022 16:59:35 +0300 Subject: [PATCH] Accumulative stats for protocol and methods (#1144) only data for pie chart currently --- agent/pkg/api/main.go | 14 +-- agent/pkg/controllers/status_controller.go | 4 + agent/pkg/providers/stats_provider.go | 130 ++++++++++++++++++++- agent/pkg/providers/stats_provider_test.go | 20 +++- agent/pkg/routes/status_routes.go | 1 + 5 files changed, 157 insertions(+), 12 deletions(-) diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 37c49ba12..376d72999 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -11,16 +11,15 @@ import ( "strings" "time" - "github.com/up9inc/mizu/agent/pkg/models" - "github.com/up9inc/mizu/agent/pkg/dependency" + "github.com/up9inc/mizu/agent/pkg/models" + "github.com/up9inc/mizu/agent/pkg/oas" + "github.com/up9inc/mizu/agent/pkg/servicemap" + "github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/holder" "github.com/up9inc/mizu/agent/pkg/providers" - "github.com/up9inc/mizu/agent/pkg/oas" - "github.com/up9inc/mizu/agent/pkg/servicemap" - "github.com/up9inc/mizu/agent/pkg/resolver" "github.com/up9inc/mizu/agent/pkg/utils" @@ -144,13 +143,14 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension continue } - providers.EntryAdded(len(data)) - entryInserter := dependency.GetInstance(dependency.EntriesInserter).(EntryInserter) if err := entryInserter.Insert(mizuEntry); err != nil { logger.Log.Errorf("Error inserting entry, err: %v", err) } + summary := extension.Dissector.Summarize(mizuEntry) + providers.EntryAdded(len(data), summary) + serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink) serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol) diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index 817ac006a..a1a096df0 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -79,6 +79,10 @@ func GetGeneralStats(c *gin.Context) { c.JSON(http.StatusOK, providers.GetGeneralStats()) } +func GetAccumulativeStats(c *gin.Context) { + c.JSON(http.StatusOK, providers.GetAccumulativeStats()) +} + func GetCurrentResolvingInformation(c *gin.Context) { c.JSON(http.StatusOK, holder.GetResolver().GetMap()) } diff --git a/agent/pkg/providers/stats_provider.go b/agent/pkg/providers/stats_provider.go index 4565b7868..eb60c1e45 100644 --- a/agent/pkg/providers/stats_provider.go +++ b/agent/pkg/providers/stats_provider.go @@ -3,6 +3,10 @@ package providers import ( "reflect" "time" + + "github.com/jinzhu/copier" + "github.com/up9inc/mizu/logger" + "github.com/up9inc/mizu/tap/api" ) type GeneralStats struct { @@ -12,7 +16,39 @@ type GeneralStats struct { LastEntryTimestamp int } -var generalStats = GeneralStats{} +type BucketStats []*TimeFrameStatsValue + +type TimeFrameStatsValue struct { + BucketTime time.Time + ProtocolStats map[string]ProtocolStats +} + +type ProtocolStats struct { + MethodsStats map[string]*SizeAndEntriesCount + Color string +} + +type SizeAndEntriesCount struct { + EntriesCount int + VolumeInBytes int +} + +type AccumulativeStatsCounter struct { + Name string `json:"name"` + EntriesCount int `json:"entriesCount"` + VolumeSizeBytes int `json:"volumeSizeBytes"` +} + +type AccumulativeStatsProtocol struct { + AccumulativeStatsCounter + Color string `json:"color"` + Methods []*AccumulativeStatsCounter `json:"methods"` +} + +var ( + generalStats = GeneralStats{} + bucketsStats = BucketStats{} +) func ResetGeneralStats() { generalStats = GeneralStats{} @@ -22,7 +58,62 @@ func GetGeneralStats() GeneralStats { return generalStats } -func EntryAdded(size int) { +func GetAccumulativeStats() []*AccumulativeStatsProtocol { + bucketStatsCopy := BucketStats{} + if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil { + logger.Log.Errorf("Error while copying src stats into temporary copied object") + return make([]*AccumulativeStatsProtocol, 0) + } + + result := make(map[string]*AccumulativeStatsProtocol, 0) + methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0) + for _, countersOfTimeFrame := range bucketStatsCopy { + for protocolName, value := range countersOfTimeFrame.ProtocolStats { + + if _, found := result[protocolName]; !found { + result[protocolName] = &AccumulativeStatsProtocol{ + AccumulativeStatsCounter: AccumulativeStatsCounter{ + Name: protocolName, + EntriesCount: 0, + VolumeSizeBytes: 0, + }, + Color: value.Color, + } + } + if _, found := methodsPerProtocolAggregated[protocolName]; !found { + methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{} + } + + for method, countersValue := range value.MethodsStats { + if _, found := methodsPerProtocolAggregated[protocolName][method]; !found { + methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{ + Name: method, + EntriesCount: 0, + VolumeSizeBytes: 0, + } + } + + result[protocolName].AccumulativeStatsCounter.EntriesCount += countersValue.EntriesCount + methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount + result[protocolName].AccumulativeStatsCounter.VolumeSizeBytes += countersValue.VolumeInBytes + methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes + } + } + } + + finalResult := make([]*AccumulativeStatsProtocol, 0) + for _, value := range result { + methodsForProtocol := make([]*AccumulativeStatsCounter, 0) + for _, methodValue := range methodsPerProtocolAggregated[value.Name] { + methodsForProtocol = append(methodsForProtocol, methodValue) + } + value.Methods = methodsForProtocol + finalResult = append(finalResult, value) + } + return finalResult +} + +func EntryAdded(size int, summery *api.BaseEntry) { generalStats.EntriesCount++ generalStats.EntriesVolumeInGB += float64(size) / (1 << 30) @@ -32,5 +123,40 @@ func EntryAdded(size int) { generalStats.FirstEntryTimestamp = currentTimestamp } + addToBucketStats(size, summery) + generalStats.LastEntryTimestamp = currentTimestamp } + +func addToBucketStats(size int, summery *api.BaseEntry) { + entryTimeBucketRounded := time.Unix(summery.Timestamp, 0).Round(time.Minute * 1) + if len(bucketsStats) == 0 { + bucketsStats = append(bucketsStats, &TimeFrameStatsValue{ + BucketTime: entryTimeBucketRounded, + ProtocolStats: map[string]ProtocolStats{}, + }) + } + bucketOfEntry := bucketsStats[len(bucketsStats)-1] + if bucketOfEntry.BucketTime != entryTimeBucketRounded { + bucketOfEntry = &TimeFrameStatsValue{ + BucketTime: entryTimeBucketRounded, + ProtocolStats: map[string]ProtocolStats{}, + } + bucketsStats = append(bucketsStats, bucketOfEntry) + } + if _, found := bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation]; !found { + bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation] = ProtocolStats{ + MethodsStats: map[string]*SizeAndEntriesCount{}, + Color: summery.Protocol.BackgroundColor, + } + } + if _, found := bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method]; !found { + bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method] = &SizeAndEntriesCount{ + VolumeInBytes: 0, + EntriesCount: 0, + } + } + + bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].EntriesCount += 1 + bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].VolumeInBytes += size +} diff --git a/agent/pkg/providers/stats_provider_test.go b/agent/pkg/providers/stats_provider_test.go index 7c0631957..36e10df86 100644 --- a/agent/pkg/providers/stats_provider_test.go +++ b/agent/pkg/providers/stats_provider_test.go @@ -3,8 +3,10 @@ package providers_test import ( "fmt" "testing" + "time" "github.com/up9inc/mizu/agent/pkg/providers" + "github.com/up9inc/mizu/tap/api" ) func TestNoEntryAddedCount(t *testing.T) { @@ -22,10 +24,13 @@ func TestNoEntryAddedCount(t *testing.T) { func TestEntryAddedCount(t *testing.T) { tests := []int{1, 5, 10, 100, 500, 1000} + entryBucketKey := time.Date(2021, 1, 1, 10, 0, 0, 0, time.UTC) + valueLessThanBucketThreshold := time.Second * 130 + mockSummery := &api.BaseEntry{Protocol: api.Protocol{Name: "mock"}, Method: "mock-method", Timestamp: entryBucketKey.Add(valueLessThanBucketThreshold).UnixNano()} for _, entriesCount := range tests { t.Run(fmt.Sprintf("%d", entriesCount), func(t *testing.T) { for i := 0; i < entriesCount; i++ { - providers.EntryAdded(0) + providers.EntryAdded(0, mockSummery) } entriesStats := providers.GetGeneralStats() @@ -38,7 +43,14 @@ func TestEntryAddedCount(t *testing.T) { t.Errorf("unexpected result - expected: %v, actual: %v", 0, entriesStats.EntriesVolumeInGB) } - t.Cleanup(providers.ResetGeneralStats) + t.Cleanup(func() { + providers.ResetGeneralStats() + generalStats := providers.GetGeneralStats() + if generalStats.EntriesCount != 0 { + t.Errorf("unexpected result - expected: %v, actual: %v", 0, generalStats.EntriesCount) + } + + }) }) } } @@ -49,12 +61,14 @@ func TestEntryAddedVolume(t *testing.T) { var expectedEntriesCount int var expectedVolumeInGB float64 + mockSummery := &api.BaseEntry{Protocol: api.Protocol{Name: "mock"}, Method: "mock-method", Timestamp: time.Date(2021, 1, 1, 10, 0, 0, 0, time.UTC).UnixNano()} + for _, data := range tests { t.Run(fmt.Sprintf("%d", len(data)), func(t *testing.T) { expectedEntriesCount++ expectedVolumeInGB += float64(len(data)) / (1 << 30) - providers.EntryAdded(len(data)) + providers.EntryAdded(len(data), mockSummery) entriesStats := providers.GetGeneralStats() diff --git a/agent/pkg/routes/status_routes.go b/agent/pkg/routes/status_routes.go index 7914df2b1..c82a4945b 100644 --- a/agent/pkg/routes/status_routes.go +++ b/agent/pkg/routes/status_routes.go @@ -16,6 +16,7 @@ func StatusRoutes(ginApp *gin.Engine) { routeGroup.GET("/tap", controllers.GetTappingStatus) routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB + routeGroup.GET("/accumulative", controllers.GetAccumulativeStats) // get general stats about entries in DB routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation) }