Accumulative stats for protocol and methods (#1144)

only data for pie chart currently
This commit is contained in:
gadotroee 2022-06-19 16:59:35 +03:00 committed by GitHub
parent 99cb0b4f44
commit 6f117d0a84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 157 additions and 12 deletions

View File

@ -11,16 +11,15 @@ import (
"strings" "strings"
"time" "time"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/dependency" "github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/holder" "github.com/up9inc/mizu/agent/pkg/holder"
"github.com/up9inc/mizu/agent/pkg/providers" "github.com/up9inc/mizu/agent/pkg/providers"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/resolver" "github.com/up9inc/mizu/agent/pkg/resolver"
"github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/agent/pkg/utils"
@ -144,13 +143,14 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
continue continue
} }
providers.EntryAdded(len(data))
entryInserter := dependency.GetInstance(dependency.EntriesInserter).(EntryInserter) entryInserter := dependency.GetInstance(dependency.EntriesInserter).(EntryInserter)
if err := entryInserter.Insert(mizuEntry); err != nil { if err := entryInserter.Insert(mizuEntry); err != nil {
logger.Log.Errorf("Error inserting entry, err: %v", err) logger.Log.Errorf("Error inserting entry, err: %v", err)
} }
summary := extension.Dissector.Summarize(mizuEntry)
providers.EntryAdded(len(data), summary)
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink) serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol) serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)

View File

@ -79,6 +79,10 @@ func GetGeneralStats(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetGeneralStats()) c.JSON(http.StatusOK, providers.GetGeneralStats())
} }
func GetAccumulativeStats(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetAccumulativeStats())
}
func GetCurrentResolvingInformation(c *gin.Context) { func GetCurrentResolvingInformation(c *gin.Context) {
c.JSON(http.StatusOK, holder.GetResolver().GetMap()) c.JSON(http.StatusOK, holder.GetResolver().GetMap())
} }

View File

@ -3,6 +3,10 @@ package providers
import ( import (
"reflect" "reflect"
"time" "time"
"github.com/jinzhu/copier"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
) )
type GeneralStats struct { type GeneralStats struct {
@ -12,7 +16,39 @@ type GeneralStats struct {
LastEntryTimestamp int LastEntryTimestamp int
} }
var generalStats = GeneralStats{} type BucketStats []*TimeFrameStatsValue
type TimeFrameStatsValue struct {
BucketTime time.Time
ProtocolStats map[string]ProtocolStats
}
type ProtocolStats struct {
MethodsStats map[string]*SizeAndEntriesCount
Color string
}
type SizeAndEntriesCount struct {
EntriesCount int
VolumeInBytes int
}
type AccumulativeStatsCounter struct {
Name string `json:"name"`
EntriesCount int `json:"entriesCount"`
VolumeSizeBytes int `json:"volumeSizeBytes"`
}
type AccumulativeStatsProtocol struct {
AccumulativeStatsCounter
Color string `json:"color"`
Methods []*AccumulativeStatsCounter `json:"methods"`
}
var (
generalStats = GeneralStats{}
bucketsStats = BucketStats{}
)
func ResetGeneralStats() { func ResetGeneralStats() {
generalStats = GeneralStats{} generalStats = GeneralStats{}
@ -22,7 +58,62 @@ func GetGeneralStats() GeneralStats {
return generalStats return generalStats
} }
func EntryAdded(size int) { func GetAccumulativeStats() []*AccumulativeStatsProtocol {
bucketStatsCopy := BucketStats{}
if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
logger.Log.Errorf("Error while copying src stats into temporary copied object")
return make([]*AccumulativeStatsProtocol, 0)
}
result := make(map[string]*AccumulativeStatsProtocol, 0)
methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
for _, countersOfTimeFrame := range bucketStatsCopy {
for protocolName, value := range countersOfTimeFrame.ProtocolStats {
if _, found := result[protocolName]; !found {
result[protocolName] = &AccumulativeStatsProtocol{
AccumulativeStatsCounter: AccumulativeStatsCounter{
Name: protocolName,
EntriesCount: 0,
VolumeSizeBytes: 0,
},
Color: value.Color,
}
}
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
}
for method, countersValue := range value.MethodsStats {
if _, found := methodsPerProtocolAggregated[protocolName][method]; !found {
methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{
Name: method,
EntriesCount: 0,
VolumeSizeBytes: 0,
}
}
result[protocolName].AccumulativeStatsCounter.EntriesCount += countersValue.EntriesCount
methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount
result[protocolName].AccumulativeStatsCounter.VolumeSizeBytes += countersValue.VolumeInBytes
methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes
}
}
}
finalResult := make([]*AccumulativeStatsProtocol, 0)
for _, value := range result {
methodsForProtocol := make([]*AccumulativeStatsCounter, 0)
for _, methodValue := range methodsPerProtocolAggregated[value.Name] {
methodsForProtocol = append(methodsForProtocol, methodValue)
}
value.Methods = methodsForProtocol
finalResult = append(finalResult, value)
}
return finalResult
}
func EntryAdded(size int, summery *api.BaseEntry) {
generalStats.EntriesCount++ generalStats.EntriesCount++
generalStats.EntriesVolumeInGB += float64(size) / (1 << 30) generalStats.EntriesVolumeInGB += float64(size) / (1 << 30)
@ -32,5 +123,40 @@ func EntryAdded(size int) {
generalStats.FirstEntryTimestamp = currentTimestamp generalStats.FirstEntryTimestamp = currentTimestamp
} }
addToBucketStats(size, summery)
generalStats.LastEntryTimestamp = currentTimestamp generalStats.LastEntryTimestamp = currentTimestamp
} }
func addToBucketStats(size int, summery *api.BaseEntry) {
entryTimeBucketRounded := time.Unix(summery.Timestamp, 0).Round(time.Minute * 1)
if len(bucketsStats) == 0 {
bucketsStats = append(bucketsStats, &TimeFrameStatsValue{
BucketTime: entryTimeBucketRounded,
ProtocolStats: map[string]ProtocolStats{},
})
}
bucketOfEntry := bucketsStats[len(bucketsStats)-1]
if bucketOfEntry.BucketTime != entryTimeBucketRounded {
bucketOfEntry = &TimeFrameStatsValue{
BucketTime: entryTimeBucketRounded,
ProtocolStats: map[string]ProtocolStats{},
}
bucketsStats = append(bucketsStats, bucketOfEntry)
}
if _, found := bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation]; !found {
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation] = ProtocolStats{
MethodsStats: map[string]*SizeAndEntriesCount{},
Color: summery.Protocol.BackgroundColor,
}
}
if _, found := bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method]; !found {
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method] = &SizeAndEntriesCount{
VolumeInBytes: 0,
EntriesCount: 0,
}
}
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].EntriesCount += 1
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].VolumeInBytes += size
}

View File

@ -3,8 +3,10 @@ package providers_test
import ( import (
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/up9inc/mizu/agent/pkg/providers" "github.com/up9inc/mizu/agent/pkg/providers"
"github.com/up9inc/mizu/tap/api"
) )
func TestNoEntryAddedCount(t *testing.T) { func TestNoEntryAddedCount(t *testing.T) {
@ -22,10 +24,13 @@ func TestNoEntryAddedCount(t *testing.T) {
func TestEntryAddedCount(t *testing.T) { func TestEntryAddedCount(t *testing.T) {
tests := []int{1, 5, 10, 100, 500, 1000} tests := []int{1, 5, 10, 100, 500, 1000}
entryBucketKey := time.Date(2021, 1, 1, 10, 0, 0, 0, time.UTC)
valueLessThanBucketThreshold := time.Second * 130
mockSummery := &api.BaseEntry{Protocol: api.Protocol{Name: "mock"}, Method: "mock-method", Timestamp: entryBucketKey.Add(valueLessThanBucketThreshold).UnixNano()}
for _, entriesCount := range tests { for _, entriesCount := range tests {
t.Run(fmt.Sprintf("%d", entriesCount), func(t *testing.T) { t.Run(fmt.Sprintf("%d", entriesCount), func(t *testing.T) {
for i := 0; i < entriesCount; i++ { for i := 0; i < entriesCount; i++ {
providers.EntryAdded(0) providers.EntryAdded(0, mockSummery)
} }
entriesStats := providers.GetGeneralStats() entriesStats := providers.GetGeneralStats()
@ -38,7 +43,14 @@ func TestEntryAddedCount(t *testing.T) {
t.Errorf("unexpected result - expected: %v, actual: %v", 0, entriesStats.EntriesVolumeInGB) t.Errorf("unexpected result - expected: %v, actual: %v", 0, entriesStats.EntriesVolumeInGB)
} }
t.Cleanup(providers.ResetGeneralStats) t.Cleanup(func() {
providers.ResetGeneralStats()
generalStats := providers.GetGeneralStats()
if generalStats.EntriesCount != 0 {
t.Errorf("unexpected result - expected: %v, actual: %v", 0, generalStats.EntriesCount)
}
})
}) })
} }
} }
@ -49,12 +61,14 @@ func TestEntryAddedVolume(t *testing.T) {
var expectedEntriesCount int var expectedEntriesCount int
var expectedVolumeInGB float64 var expectedVolumeInGB float64
mockSummery := &api.BaseEntry{Protocol: api.Protocol{Name: "mock"}, Method: "mock-method", Timestamp: time.Date(2021, 1, 1, 10, 0, 0, 0, time.UTC).UnixNano()}
for _, data := range tests { for _, data := range tests {
t.Run(fmt.Sprintf("%d", len(data)), func(t *testing.T) { t.Run(fmt.Sprintf("%d", len(data)), func(t *testing.T) {
expectedEntriesCount++ expectedEntriesCount++
expectedVolumeInGB += float64(len(data)) / (1 << 30) expectedVolumeInGB += float64(len(data)) / (1 << 30)
providers.EntryAdded(len(data)) providers.EntryAdded(len(data), mockSummery)
entriesStats := providers.GetGeneralStats() entriesStats := providers.GetGeneralStats()

View File

@ -16,6 +16,7 @@ func StatusRoutes(ginApp *gin.Engine) {
routeGroup.GET("/tap", controllers.GetTappingStatus) routeGroup.GET("/tap", controllers.GetTappingStatus)
routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB
routeGroup.GET("/accumulative", controllers.GetAccumulativeStats) // get general stats about entries in DB
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation) routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
} }