kubeshark/agent/pkg/providers/stats_provider.go
gadotroee ec11b21b51
Add available protocols to the stats endpoint (colors for methods are coming from server) (#1184)
* add protocols array to the endpoint

* no message

* no message

* fix tests and small fix for the iteration

* fix the color of the protocol

* Get protocols list and method colors from server

* fix tests

* cr fixes

Co-authored-by: Amit Fainholts <amit@up9.com>
2022-07-05 15:30:39 +03:00

358 lines
12 KiB
Go

package providers
import (
"crypto/md5"
"encoding/hex"
"fmt"
"reflect"
"strings"
"sync"
"time"
"github.com/jinzhu/copier"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
)
// GeneralStats holds the running totals that EntryAdded maintains for every
// recorded entry. ResetGeneralStats replaces it with the zero value.
type GeneralStats struct {
// EntriesCount is incremented once per EntryAdded call.
EntriesCount int
// EntriesVolumeInGB accumulates entry sizes; EntryAdded adds size / 2^30.
EntriesVolumeInGB float64
// FirstEntryTimestamp is the Unix time in seconds at which the first entry
// was recorded; it stays 0 until then.
FirstEntryTimestamp int
// LastEntryTimestamp is the Unix time in seconds at which the most recent
// entry was recorded.
LastEntryTimestamp int
}
// BucketStats is a list of per-time-bucket statistics. addToBucketStats
// appends a new element whenever an entry's bucket time differs from the
// bucket time of the last element.
type BucketStats []*TimeFrameStatsValue
// TimeFrameStatsValue holds the statistics collected for a single time bucket.
type TimeFrameStatsValue struct {
// BucketTime identifies the bucket; addToBucketStats sets it to the value
// returned by getBucketFromTimeStamp for the entry's timestamp.
BucketTime time.Time `json:"timestamp"`
// ProtocolStats is keyed by the entry's protocol abbreviation.
ProtocolStats map[string]ProtocolStats `json:"protocols"`
}
// ProtocolStats holds the per-method counters of one protocol inside one
// time bucket.
type ProtocolStats struct {
// MethodsStats is keyed by the entry's method name.
MethodsStats map[string]*SizeAndEntriesCount `json:"methods"`
}
// SizeAndEntriesCount is the raw counter pair kept per method and per bucket.
type SizeAndEntriesCount struct {
// EntriesCount is the number of entries recorded for the method.
EntriesCount int `json:"entriesCount"`
// VolumeInBytes is the sum of the sizes passed to addToBucketStats.
VolumeInBytes int `json:"volumeInBytes"`
}
// AccumulativeStatsCounter is an aggregated, named counter. It is used on its
// own for a single method and, embedded in AccumulativeStatsProtocol, for the
// totals of a whole protocol.
type AccumulativeStatsCounter struct {
// Name is the method name or the protocol name, depending on the usage.
Name string `json:"name"`
// Color is taken from getColorForMethod for methods and from the
// protocolToColor lookup for protocols.
Color string `json:"color"`
// EntriesCount is the aggregated number of entries.
EntriesCount int `json:"entriesCount"`
// VolumeSizeBytes is the aggregated size in bytes.
VolumeSizeBytes int `json:"volumeSizeBytes"`
}
// AccumulativeStatsProtocol carries the totals of one protocol (through the
// embedded counter) together with the counters of each of its methods.
type AccumulativeStatsProtocol struct {
AccumulativeStatsCounter
// Methods lists the per-method counters that were summed into the totals.
Methods []*AccumulativeStatsCounter `json:"methods"`
}
// AccumulativeStatsProtocolTime is one point of the timeline: the protocol
// statistics aggregated for a single timeline bucket.
type AccumulativeStatsProtocolTime struct {
// ProtocolsData holds one element per protocol seen in the bucket.
ProtocolsData []*AccumulativeStatsProtocol `json:"protocols"`
// Time is the bucket time in Unix milliseconds.
Time int64 `json:"timestamp"`
}
// TrafficStatsResponse is the value returned by GetTrafficStats.
type TrafficStatsResponse struct {
// Protocols lists the protocol names found in the stats, followed by "ALL".
Protocols []string `json:"protocols"`
// PieStats holds the per-protocol totals over all buckets.
PieStats []*AccumulativeStatsProtocol `json:"pie"`
// TimelineStats holds the per-protocol totals grouped by timeline bucket.
TimelineStats []*AccumulativeStatsProtocolTime `json:"timeline"`
}
// Package-level state shared by the functions in this file.
var (
// generalStats holds the running totals updated by EntryAdded.
generalStats = GeneralStats{}
// bucketsStats holds the per-bucket counters appended to by addToBucketStats.
bucketsStats = BucketStats{}
// bucketStatsLocker is held by getBucketStatsCopy while bucketsStats is copied.
bucketStatsLocker = sync.Mutex{}
// protocolToColor maps a protocol name to its color; it is filled by
// InitProtocolToColor.
protocolToColor = map[string]string{}
)
const (
// InternalBucketThreshold is the width of the buckets in which entries are
// recorded (see getBucketFromTimeStamp).
InternalBucketThreshold = time.Minute * 1
// MaxNumberOfBars is the divisor calculateInterval applies to the covered
// time span to derive the timeline bar width.
MaxNumberOfBars = 30
)
func ResetGeneralStats() {
generalStats = GeneralStats{}
}
// GetGeneralStats returns a pointer to the package-level general statistics.
// The pointer aliases the live value rather than a snapshot, so later updates
// made by EntryAdded or ResetGeneralStats are visible through it.
func GetGeneralStats() *GeneralStats {
	current := &generalStats
	return current
}
// InitProtocolToColor fills the protocol-to-color lookup from protocolMap.
// Every key is split on "/" into at most three parts; the last part becomes
// the lookup key and the protocol's BackgroundColor becomes the value.
func InitProtocolToColor(protocolMap map[string]*api.Protocol) {
	for key, protocol := range protocolMap {
		parts := strings.SplitN(key, "/", 3)
		name := parts[len(parts)-1]
		protocolToColor[name] = protocol.BackgroundColor
	}
}
// GetTrafficStats builds the traffic statistics response from a copy of the
// bucket statistics: the list of available protocols, the overall
// per-protocol totals and the per-protocol totals along the timeline.
func GetTrafficStats() *TrafficStatsResponse {
	snapshot := getBucketStatsCopy()

	response := new(TrafficStatsResponse)
	response.Protocols = getAvailableProtocols(snapshot)
	response.PieStats = getAccumulativeStats(snapshot)
	response.TimelineStats = getAccumulativeStatsTiming(snapshot)
	return response
}
// EntryAdded records one entry of the given size (in bytes) in both the
// general statistics and the per-bucket statistics.
//
// The first-entry timestamp is written only while it is still zero; the
// last-entry timestamp is overwritten on every call. Both use the wall clock
// in Unix seconds, not the entry's own timestamp.
func EntryAdded(size int, summery *api.BaseEntry) {
	generalStats.EntriesCount++
	generalStats.EntriesVolumeInGB += float64(size) / (1 << 30)

	now := int(time.Now().Unix())
	// IsZero on an int value is true exactly when the value is 0.
	if firstSeen := reflect.ValueOf(generalStats.FirstEntryTimestamp); firstSeen.IsZero() {
		generalStats.FirstEntryTimestamp = now
	}

	addToBucketStats(size, summery)
	generalStats.LastEntryTimestamp = now
}
// calculateInterval chooses the timeline bar width for the span between
// firstTimestamp and lastTimestamp, both given in Unix seconds.
//
// The span is divided by MaxNumberOfBars and the smallest allowed width that
// is not shorter than that quotient is returned. When the quotient exceeds
// the largest allowed width (24 hours), it is rounded to the nearest multiple
// of 24 hours instead. A zero or negative span yields one minute.
func calculateInterval(firstTimestamp int64, lastTimestamp int64) time.Duration {
	// Allowed bar widths in minutes, ascending.
	allowedMinutes := []int64{
		1, 2, 3, 5, 10, 15, 20, 30, 45,
		60,   // 1 hour
		75,   // 1.25 hours
		90,   // 1.5 hours
		120,  // 2 hours
		150,  // 2.5 hours
		180,  // 3 hours
		240,  // 4 hours
		300,  // 5 hours
		360,  // 6 hours
		420,  // 7 hours
		480,  // 8 hours
		540,  // 9 hours
		600,  // 10 hours
		660,  // 11 hours
		720,  // 12 hours
		1440, // 24 hours
	}

	span := time.Duration(lastTimestamp-firstTimestamp) * time.Second
	ideal := span / time.Duration(MaxNumberOfBars)

	for _, minutes := range allowedMinutes {
		if candidate := time.Duration(minutes) * time.Minute; ideal <= candidate {
			return candidate
		}
	}

	largest := time.Duration(allowedMinutes[len(allowedMinutes)-1]) * time.Minute
	return ideal.Round(largest)
}
// getAccumulativeStats sums the given bucket statistics over all buckets and
// returns one element per protocol. For empty input it returns an empty,
// non-nil slice (presumably so the JSON field is [] rather than null).
func getAccumulativeStats(stats BucketStats) []*AccumulativeStatsProtocol {
	if len(stats) > 0 {
		aggregated := getAggregatedStats(stats)
		return convertAccumulativeStatsDictToArray(aggregated)
	}
	return []*AccumulativeStatsProtocol{}
}
// getAccumulativeStatsTiming groups the given bucket statistics into timeline
// buckets and returns one element per timeline bucket. For empty input it
// returns an empty, non-nil slice.
//
// The timeline bar width is derived from the bucket times of the first and
// last elements of stats, passed to calculateInterval as Unix seconds.
func getAccumulativeStatsTiming(stats BucketStats) []*AccumulativeStatsProtocolTime {
	if len(stats) > 0 {
		firstSeconds := stats[0].BucketTime.Unix()
		lastSeconds := stats[len(stats)-1].BucketTime.Unix()
		barWidth := calculateInterval(firstSeconds, lastSeconds)

		aggregated := getAggregatedResultTiming(stats, barWidth)
		return convertAccumulativeStatsTimelineDictToArray(aggregated)
	}
	return []*AccumulativeStatsProtocolTime{}
}
// addToBucketStats adds one entry of the given size (in bytes) to the
// per-bucket statistics, under the entry's protocol abbreviation and method.
//
// Only the last bucket is ever considered: when the list is empty or the
// entry's bucket time differs from the last bucket's time, a new bucket is
// appended, even if an earlier element already has that bucket time.
//
// NOTE(review): bucketsStats is modified here without holding
// bucketStatsLocker, while getBucketStatsCopy reads it under that lock.
// Confirm that callers serialize these accesses.
func addToBucketStats(size int, summery *api.BaseEntry) {
	bucketTime := getBucketFromTimeStamp(summery.Timestamp)

	if n := len(bucketsStats); n == 0 || bucketsStats[n-1].BucketTime != bucketTime {
		bucketsStats = append(bucketsStats, &TimeFrameStatsValue{
			BucketTime:    bucketTime,
			ProtocolStats: map[string]ProtocolStats{},
		})
	}
	bucket := bucketsStats[len(bucketsStats)-1]

	// Get or create the per-protocol statistics of the bucket.
	protocolName := summery.Protocol.Abbreviation
	protocolStats, known := bucket.ProtocolStats[protocolName]
	if !known {
		protocolStats = ProtocolStats{
			MethodsStats: map[string]*SizeAndEntriesCount{},
		}
		bucket.ProtocolStats[protocolName] = protocolStats
	}

	// Get or create the per-method counter of the protocol.
	counter, known := protocolStats.MethodsStats[summery.Method]
	if !known {
		counter = &SizeAndEntriesCount{}
		protocolStats.MethodsStats[summery.Method] = counter
	}

	counter.EntriesCount++
	counter.VolumeInBytes += size
}
// getBucketFromTimeStamp maps a timestamp in Unix milliseconds to the time of
// the bucket it belongs to.
//
// Half a bucket width is subtracted before rounding to the nearest multiple
// of the bucket width, which amounts to rounding the timestamp down to the
// start of its bucket.
func getBucketFromTimeStamp(timestamp int64) time.Time {
	const halfBucket = InternalBucketThreshold / 2

	shifted := time.UnixMilli(timestamp).Add(-1 * halfBucket)
	return shifted.Round(InternalBucketThreshold)
}
// convertAccumulativeStatsTimelineDictToArray flattens the nested
// time -> protocol -> method map into a slice with one element per time key.
//
// Each protocol element carries the sum of its method counters, the color
// from the protocolToColor lookup, and the method counters themselves (the
// same pointers as in the input). The result follows map iteration order, so
// the order of elements is unspecified.
func convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter) []*AccumulativeStatsProtocolTime {
	timeline := make([]*AccumulativeStatsProtocolTime, 0)

	for bucketTime, perProtocol := range methodsPerProtocolPerTimeAggregated {
		point := &AccumulativeStatsProtocolTime{
			Time:          bucketTime.UnixMilli(),
			ProtocolsData: make([]*AccumulativeStatsProtocol, 0),
		}

		for protocolName, perMethod := range perProtocol {
			protocol := &AccumulativeStatsProtocol{
				Methods: make([]*AccumulativeStatsCounter, 0),
			}
			protocol.Name = protocolName
			protocol.Color = protocolToColor[protocolName]

			for _, counter := range perMethod {
				protocol.EntriesCount += counter.EntriesCount
				protocol.VolumeSizeBytes += counter.VolumeSizeBytes
				protocol.Methods = append(protocol.Methods, counter)
			}

			point.ProtocolsData = append(point.ProtocolsData, protocol)
		}

		timeline = append(timeline, point)
	}

	return timeline
}
// convertAccumulativeStatsDictToArray flattens the nested protocol -> method
// map into a slice with one element per protocol.
//
// Each element carries the sum of its method counters, the color from the
// protocolToColor lookup, and the method counters themselves (the same
// pointers as in the input). The result follows map iteration order, so the
// order of elements is unspecified.
func convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter) []*AccumulativeStatsProtocol {
	result := make([]*AccumulativeStatsProtocol, 0)

	for protocolName, perMethod := range methodsPerProtocolAggregated {
		protocol := &AccumulativeStatsProtocol{
			Methods: make([]*AccumulativeStatsCounter, 0),
		}
		protocol.Name = protocolName
		protocol.Color = protocolToColor[protocolName]

		for _, counter := range perMethod {
			protocol.EntriesCount += counter.EntriesCount
			protocol.VolumeSizeBytes += counter.VolumeSizeBytes
			protocol.Methods = append(protocol.Methods, counter)
		}

		result = append(result, protocol)
	}

	return result
}
// getBucketStatsCopy returns a copy of the bucket statistics made with
// copier.Copy while bucketStatsLocker is held. It returns nil when the copy
// fails; the failure is logged together with its cause.
func getBucketStatsCopy() BucketStats {
	bucketStatsCopy := BucketStats{}

	bucketStatsLocker.Lock()
	// Deferred so that the error path releases the lock as well; returning
	// with the mutex still held would block every later call forever.
	defer bucketStatsLocker.Unlock()

	if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
		logger.Log.Errorf("Error while copying src stats into temporary copied object: %v", err)
		return nil
	}
	return bucketStatsCopy
}
// getAggregatedResultTiming regroups the bucket statistics into timeline
// buckets of width interval and sums the counters per timeline bucket,
// protocol and method.
//
// The timeline key of a bucket is its bucket time shifted back by half an
// interval and then rounded to the nearest multiple of interval. Map entries
// are created only when a method counter exists, so a bucket without any
// method produces no key. Method colors come from getColorForMethod.
func getAggregatedResultTiming(stats BucketStats, interval time.Duration) map[time.Time]map[string]map[string]*AccumulativeStatsCounter {
	aggregated := map[time.Time]map[string]map[string]*AccumulativeStatsCounter{}

	// Walk the buckets from the last to the first.
	for i := len(stats) - 1; i >= 0; i-- {
		bucket := stats[i]
		timelineKey := bucket.BucketTime.Add(-1 * interval / 2).Round(interval)

		for protocolName, protocolData := range bucket.ProtocolStats {
			for methodName, methodData := range protocolData.MethodsStats {
				perProtocol, exists := aggregated[timelineKey]
				if !exists {
					perProtocol = map[string]map[string]*AccumulativeStatsCounter{}
					aggregated[timelineKey] = perProtocol
				}

				perMethod, exists := perProtocol[protocolName]
				if !exists {
					perMethod = map[string]*AccumulativeStatsCounter{}
					perProtocol[protocolName] = perMethod
				}

				counter, exists := perMethod[methodName]
				if !exists {
					counter = &AccumulativeStatsCounter{
						Name:  methodName,
						Color: getColorForMethod(protocolName, methodName),
					}
					perMethod[methodName] = counter
				}

				counter.EntriesCount += methodData.EntriesCount
				counter.VolumeSizeBytes += methodData.VolumeInBytes
			}
		}
	}

	return aggregated
}
// getAggregatedStats sums the counters of all buckets per protocol and
// method. Map entries are created only when a method counter exists, and
// method colors come from getColorForMethod.
func getAggregatedStats(stats BucketStats) map[string]map[string]*AccumulativeStatsCounter {
	aggregated := map[string]map[string]*AccumulativeStatsCounter{}

	for _, bucket := range stats {
		for protocolName, protocolData := range bucket.ProtocolStats {
			for methodName, methodData := range protocolData.MethodsStats {
				perMethod, exists := aggregated[protocolName]
				if !exists {
					perMethod = map[string]*AccumulativeStatsCounter{}
					aggregated[protocolName] = perMethod
				}

				counter, exists := perMethod[methodName]
				if !exists {
					counter = &AccumulativeStatsCounter{
						Name:  methodName,
						Color: getColorForMethod(protocolName, methodName),
					}
					perMethod[methodName] = counter
				}

				counter.EntriesCount += methodData.EntriesCount
				counter.VolumeSizeBytes += methodData.VolumeInBytes
			}
		}
	}

	return aggregated
}
// getColorForMethod derives a stable color for a protocol/method pair: "#"
// followed by the first six lowercase hex digits of the MD5 digest of
// "<protocolName>_<methodName>".
func getColorForMethod(protocolName string, methodName string) string {
	digest := md5.Sum([]byte(protocolName + "_" + methodName))
	// Three digest bytes encode to exactly six hex digits.
	return fmt.Sprintf("#%s", hex.EncodeToString(digest[:3]))
}
// getAvailableProtocols returns every distinct protocol name found in the
// given bucket statistics, followed by the literal "ALL" as the last element.
// The protocol names follow map iteration order, so their order is
// unspecified.
func getAvailableProtocols(stats BucketStats) []string {
	seen := make(map[string]bool)
	for _, bucket := range stats {
		for name := range bucket.ProtocolStats {
			seen[name] = true
		}
	}

	names := make([]string, 0, len(seen)+1)
	for name := range seen {
		names = append(names, name)
	}
	return append(names, "ALL")
}