mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-14 14:43:46 +00:00
* add protocols array to the endpoint * no message * no message * fix tests and small fix for the iteration * fix the color of the protocol * Get protocols list and method colors from server * fix tests * cr fixes Co-authored-by: Amit Fainholts <amit@up9.com>
358 lines
12 KiB
Go
358 lines
12 KiB
Go
package providers
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jinzhu/copier"
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
type GeneralStats struct {
|
|
EntriesCount int
|
|
EntriesVolumeInGB float64
|
|
FirstEntryTimestamp int
|
|
LastEntryTimestamp int
|
|
}
|
|
|
|
type BucketStats []*TimeFrameStatsValue
|
|
|
|
type TimeFrameStatsValue struct {
|
|
BucketTime time.Time `json:"timestamp"`
|
|
ProtocolStats map[string]ProtocolStats `json:"protocols"`
|
|
}
|
|
|
|
type ProtocolStats struct {
|
|
MethodsStats map[string]*SizeAndEntriesCount `json:"methods"`
|
|
}
|
|
|
|
type SizeAndEntriesCount struct {
|
|
EntriesCount int `json:"entriesCount"`
|
|
VolumeInBytes int `json:"volumeInBytes"`
|
|
}
|
|
|
|
type AccumulativeStatsCounter struct {
|
|
Name string `json:"name"`
|
|
Color string `json:"color"`
|
|
EntriesCount int `json:"entriesCount"`
|
|
VolumeSizeBytes int `json:"volumeSizeBytes"`
|
|
}
|
|
|
|
type AccumulativeStatsProtocol struct {
|
|
AccumulativeStatsCounter
|
|
Methods []*AccumulativeStatsCounter `json:"methods"`
|
|
}
|
|
|
|
type AccumulativeStatsProtocolTime struct {
|
|
ProtocolsData []*AccumulativeStatsProtocol `json:"protocols"`
|
|
Time int64 `json:"timestamp"`
|
|
}
|
|
|
|
type TrafficStatsResponse struct {
|
|
Protocols []string `json:"protocols"`
|
|
PieStats []*AccumulativeStatsProtocol `json:"pie"`
|
|
TimelineStats []*AccumulativeStatsProtocolTime `json:"timeline"`
|
|
}
|
|
|
|
var (
|
|
generalStats = GeneralStats{}
|
|
bucketsStats = BucketStats{}
|
|
bucketStatsLocker = sync.Mutex{}
|
|
protocolToColor = map[string]string{}
|
|
)
|
|
|
|
const (
|
|
InternalBucketThreshold = time.Minute * 1
|
|
MaxNumberOfBars = 30
|
|
)
|
|
|
|
func ResetGeneralStats() {
|
|
generalStats = GeneralStats{}
|
|
}
|
|
|
|
func GetGeneralStats() *GeneralStats {
|
|
return &generalStats
|
|
}
|
|
|
|
func InitProtocolToColor(protocolMap map[string]*api.Protocol) {
|
|
for item, value := range protocolMap {
|
|
splitted := strings.SplitN(item, "/", 3)
|
|
protocolToColor[splitted[len(splitted)-1]] = value.BackgroundColor
|
|
}
|
|
}
|
|
|
|
func GetTrafficStats() *TrafficStatsResponse {
|
|
bucketsStatsCopy := getBucketStatsCopy()
|
|
|
|
return &TrafficStatsResponse{
|
|
Protocols: getAvailableProtocols(bucketsStatsCopy),
|
|
PieStats: getAccumulativeStats(bucketsStatsCopy),
|
|
TimelineStats: getAccumulativeStatsTiming(bucketsStatsCopy),
|
|
}
|
|
}
|
|
|
|
func EntryAdded(size int, summery *api.BaseEntry) {
|
|
generalStats.EntriesCount++
|
|
generalStats.EntriesVolumeInGB += float64(size) / (1 << 30)
|
|
|
|
currentTimestamp := int(time.Now().Unix())
|
|
|
|
if reflect.Value.IsZero(reflect.ValueOf(generalStats.FirstEntryTimestamp)) {
|
|
generalStats.FirstEntryTimestamp = currentTimestamp
|
|
}
|
|
|
|
addToBucketStats(size, summery)
|
|
|
|
generalStats.LastEntryTimestamp = currentTimestamp
|
|
}
|
|
|
|
func calculateInterval(firstTimestamp int64, lastTimestamp int64) time.Duration {
|
|
validDurations := []time.Duration{
|
|
time.Minute,
|
|
time.Minute * 2,
|
|
time.Minute * 3,
|
|
time.Minute * 5,
|
|
time.Minute * 10,
|
|
time.Minute * 15,
|
|
time.Minute * 20,
|
|
time.Minute * 30,
|
|
time.Minute * 45,
|
|
time.Minute * 60,
|
|
time.Minute * 75,
|
|
time.Minute * 90, // 1.5 minutes
|
|
time.Minute * 120, // 2 hours
|
|
time.Minute * 150, // 2.5 hours
|
|
time.Minute * 180, // 3 hours
|
|
time.Minute * 240, // 4 hours
|
|
time.Minute * 300, // 5 hours
|
|
time.Minute * 360, // 6 hours
|
|
time.Minute * 420, // 7 hours
|
|
time.Minute * 480, // 8 hours
|
|
time.Minute * 540, // 9 hours
|
|
time.Minute * 600, // 10 hours
|
|
time.Minute * 660, // 11 hours
|
|
time.Minute * 720, // 12 hours
|
|
time.Minute * 1440, // 24 hours
|
|
}
|
|
duration := time.Duration(lastTimestamp-firstTimestamp) * time.Second / time.Duration(MaxNumberOfBars)
|
|
for _, validDuration := range validDurations {
|
|
if validDuration-duration >= 0 {
|
|
return validDuration
|
|
}
|
|
}
|
|
return duration.Round(validDurations[len(validDurations)-1])
|
|
|
|
}
|
|
|
|
func getAccumulativeStats(stats BucketStats) []*AccumulativeStatsProtocol {
|
|
if len(stats) == 0 {
|
|
return make([]*AccumulativeStatsProtocol, 0)
|
|
}
|
|
|
|
methodsPerProtocolAggregated := getAggregatedStats(stats)
|
|
|
|
return convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated)
|
|
}
|
|
|
|
func getAccumulativeStatsTiming(stats BucketStats) []*AccumulativeStatsProtocolTime {
|
|
if len(stats) == 0 {
|
|
return make([]*AccumulativeStatsProtocolTime, 0)
|
|
}
|
|
|
|
interval := calculateInterval(stats[0].BucketTime.Unix(), stats[len(stats)-1].BucketTime.Unix()) // in seconds
|
|
methodsPerProtocolPerTimeAggregated := getAggregatedResultTiming(stats, interval)
|
|
|
|
return convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated)
|
|
}
|
|
|
|
func addToBucketStats(size int, summery *api.BaseEntry) {
|
|
entryTimeBucketRounded := getBucketFromTimeStamp(summery.Timestamp)
|
|
|
|
if len(bucketsStats) == 0 {
|
|
bucketsStats = append(bucketsStats, &TimeFrameStatsValue{
|
|
BucketTime: entryTimeBucketRounded,
|
|
ProtocolStats: map[string]ProtocolStats{},
|
|
})
|
|
}
|
|
bucketOfEntry := bucketsStats[len(bucketsStats)-1]
|
|
if bucketOfEntry.BucketTime != entryTimeBucketRounded {
|
|
bucketOfEntry = &TimeFrameStatsValue{
|
|
BucketTime: entryTimeBucketRounded,
|
|
ProtocolStats: map[string]ProtocolStats{},
|
|
}
|
|
bucketsStats = append(bucketsStats, bucketOfEntry)
|
|
}
|
|
if _, found := bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation]; !found {
|
|
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation] = ProtocolStats{
|
|
MethodsStats: map[string]*SizeAndEntriesCount{},
|
|
}
|
|
}
|
|
if _, found := bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method]; !found {
|
|
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method] = &SizeAndEntriesCount{
|
|
VolumeInBytes: 0,
|
|
EntriesCount: 0,
|
|
}
|
|
}
|
|
|
|
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].EntriesCount += 1
|
|
bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].VolumeInBytes += size
|
|
}
|
|
|
|
func getBucketFromTimeStamp(timestamp int64) time.Time {
|
|
entryTimeStampAsTime := time.UnixMilli(timestamp)
|
|
return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
|
|
}
|
|
|
|
func convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter) []*AccumulativeStatsProtocolTime {
|
|
finalResult := make([]*AccumulativeStatsProtocolTime, 0)
|
|
for timeKey, item := range methodsPerProtocolPerTimeAggregated {
|
|
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
|
for protocolName, value := range item {
|
|
entriesCount := 0
|
|
volumeSizeBytes := 0
|
|
methods := make([]*AccumulativeStatsCounter, 0)
|
|
for _, methodAccData := range value {
|
|
entriesCount += methodAccData.EntriesCount
|
|
volumeSizeBytes += methodAccData.VolumeSizeBytes
|
|
methods = append(methods, methodAccData)
|
|
}
|
|
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
|
|
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
|
Name: protocolName,
|
|
Color: protocolToColor[protocolName],
|
|
EntriesCount: entriesCount,
|
|
VolumeSizeBytes: volumeSizeBytes,
|
|
},
|
|
Methods: methods,
|
|
})
|
|
}
|
|
finalResult = append(finalResult, &AccumulativeStatsProtocolTime{
|
|
Time: timeKey.UnixMilli(),
|
|
ProtocolsData: protocolsData,
|
|
})
|
|
}
|
|
return finalResult
|
|
}
|
|
|
|
func convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter) []*AccumulativeStatsProtocol {
|
|
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
|
for protocolName, value := range methodsPerProtocolAggregated {
|
|
entriesCount := 0
|
|
volumeSizeBytes := 0
|
|
methods := make([]*AccumulativeStatsCounter, 0)
|
|
for _, methodAccData := range value {
|
|
entriesCount += methodAccData.EntriesCount
|
|
volumeSizeBytes += methodAccData.VolumeSizeBytes
|
|
methods = append(methods, methodAccData)
|
|
}
|
|
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
|
|
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
|
Name: protocolName,
|
|
Color: protocolToColor[protocolName],
|
|
EntriesCount: entriesCount,
|
|
VolumeSizeBytes: volumeSizeBytes,
|
|
},
|
|
Methods: methods,
|
|
})
|
|
}
|
|
return protocolsData
|
|
}
|
|
|
|
func getBucketStatsCopy() BucketStats {
|
|
bucketStatsCopy := BucketStats{}
|
|
bucketStatsLocker.Lock()
|
|
if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
|
|
logger.Log.Errorf("Error while copying src stats into temporary copied object")
|
|
return nil
|
|
}
|
|
bucketStatsLocker.Unlock()
|
|
return bucketStatsCopy
|
|
}
|
|
|
|
func getAggregatedResultTiming(stats BucketStats, interval time.Duration) map[time.Time]map[string]map[string]*AccumulativeStatsCounter {
|
|
methodsPerProtocolPerTimeAggregated := map[time.Time]map[string]map[string]*AccumulativeStatsCounter{}
|
|
|
|
bucketStatsIndex := len(stats) - 1
|
|
for bucketStatsIndex >= 0 {
|
|
currentBucketTime := stats[bucketStatsIndex].BucketTime
|
|
resultBucketRoundedKey := currentBucketTime.Add(-1 * interval / 2).Round(interval)
|
|
|
|
for protocolName, data := range stats[bucketStatsIndex].ProtocolStats {
|
|
for methodName, dataOfMethod := range data.MethodsStats {
|
|
|
|
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
|
|
}
|
|
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
|
|
}
|
|
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
|
|
Name: methodName,
|
|
Color: getColorForMethod(protocolName, methodName),
|
|
EntriesCount: 0,
|
|
VolumeSizeBytes: 0,
|
|
}
|
|
}
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
|
|
}
|
|
}
|
|
|
|
bucketStatsIndex--
|
|
}
|
|
return methodsPerProtocolPerTimeAggregated
|
|
}
|
|
|
|
func getAggregatedStats(stats BucketStats) map[string]map[string]*AccumulativeStatsCounter {
|
|
methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
|
|
for _, countersOfTimeFrame := range stats {
|
|
for protocolName, value := range countersOfTimeFrame.ProtocolStats {
|
|
for method, countersValue := range value.MethodsStats {
|
|
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
|
|
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
|
|
}
|
|
if _, found := methodsPerProtocolAggregated[protocolName][method]; !found {
|
|
methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{
|
|
Name: method,
|
|
Color: getColorForMethod(protocolName, method),
|
|
EntriesCount: 0,
|
|
VolumeSizeBytes: 0,
|
|
}
|
|
}
|
|
methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount
|
|
methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes
|
|
}
|
|
}
|
|
}
|
|
return methodsPerProtocolAggregated
|
|
}
|
|
|
|
func getColorForMethod(protocolName string, methodName string) string {
|
|
hash := md5.Sum([]byte(fmt.Sprintf("%v_%v", protocolName, methodName)))
|
|
input := hex.EncodeToString(hash[:])
|
|
return fmt.Sprintf("#%v", input[:6])
|
|
}
|
|
|
|
func getAvailableProtocols(stats BucketStats) []string {
|
|
protocols := map[string]bool{}
|
|
for _, countersOfTimeFrame := range stats {
|
|
for protocolName := range countersOfTimeFrame.ProtocolStats {
|
|
protocols[protocolName] = true
|
|
}
|
|
}
|
|
|
|
result := make([]string, 0)
|
|
for protocol := range protocols {
|
|
result = append(result, protocol)
|
|
}
|
|
result = append(result, "ALL")
|
|
return result
|
|
}
|