mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-03 11:29:21 +00:00
* Add gin-contrib/pprof dependency * Run pprof server on agent with --profiler flag * Add --profiler flag to cli * Fix error message * Print cpu usage percentage * measure cpu of current pid instead of globaly on the system * Add scripts to plot performance * Plot packetsCount in analysis * Concat to DataFrame * Plot in turbo colorscheme * Make COLORMAP const * Fix rss units * Reduce code repetition by adding function for plotting * Allow grouping based on filenames * Temporary: Marked with comments where to disable code for experiments * Add newline at end of file * Add tap.cpuprofile flag. Change memprofile flag to tap.memprofile * create tapper modes for debugging using env vars * Fix rss plot units (MB instead of bytes) * Remove comment * Add info to plot script * Remove tap.cpumemprofile. Rename tap.memprofile to memprofile * Remove unused import * Remove whitespaces Co-authored-by: M. Mert Yıldıran <mehmet@up9.com> * Remove whitespaces Co-authored-by: M. Mert Yıldıran <mehmet@up9.com> * Remove whitespaces Co-authored-by: M. Mert Yıldıran <mehmet@up9.com> * Remove whitespaces Co-authored-by: M. Mert Yıldıran <mehmet@up9.com> * Remove whitespaces Co-authored-by: M. Mert Yıldıran <mehmet@up9.com> * Remove whitespaces Co-authored-by: M. Mert Yıldıran <mehmet@up9.com> * Rename debug env vars * Create package for debug env vars, read each env var once * Run go mod tidy * Increment MatchedPairs before emitting * Only count cores once * Count virtual and physical cores * Add dbgctl replace in cli * Fix lint: Check return values * Add tap/dbgctl to test-lint make rule * Replace tap/dbgctl in all modules * #run_acceptance_tests * Copy dbgctl module to docker image * Debug/profile tapper benchmark (#1093) * add mizu debug env to avoid all extensions * add readme + run_tapper_benchmark.sh * temporary change branch name * fix readme * fix MIZU_BENCHMARK_CLIENTS_COUNT env * change tap target to tcp stream * track live tcp streams * pr fixes * rename tapperPacketsCount to ignored_packets_count * change mizu tapper to mizu debugg Co-authored-by: David Levanon <dvdlevanon@gmail.com> Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>
119 lines
3.8 KiB
Go
119 lines
3.8 KiB
Go
package app
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/antelman107/net-wait-go/wait"
|
|
"github.com/op/go-logging"
|
|
basenine "github.com/up9inc/basenine/client/go"
|
|
"github.com/up9inc/mizu/agent/pkg/api"
|
|
"github.com/up9inc/mizu/agent/pkg/utils"
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/dbgctl"
|
|
tapApi "github.com/up9inc/mizu/tap/api"
|
|
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
|
|
httpExt "github.com/up9inc/mizu/tap/extensions/http"
|
|
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
|
|
redisExt "github.com/up9inc/mizu/tap/extensions/redis"
|
|
)
|
|
|
|
var (
|
|
Extensions []*tapApi.Extension // global
|
|
ExtensionsMap map[string]*tapApi.Extension // global
|
|
)
|
|
|
|
func LoadExtensions() {
|
|
Extensions = make([]*tapApi.Extension, 0)
|
|
ExtensionsMap = make(map[string]*tapApi.Extension)
|
|
|
|
extensionHttp := &tapApi.Extension{}
|
|
dissectorHttp := httpExt.NewDissector()
|
|
dissectorHttp.Register(extensionHttp)
|
|
extensionHttp.Dissector = dissectorHttp
|
|
Extensions = append(Extensions, extensionHttp)
|
|
ExtensionsMap[extensionHttp.Protocol.Name] = extensionHttp
|
|
|
|
if !dbgctl.MizuTapperDisableNonHttpExtensions {
|
|
extensionAmqp := &tapApi.Extension{}
|
|
dissectorAmqp := amqpExt.NewDissector()
|
|
dissectorAmqp.Register(extensionAmqp)
|
|
extensionAmqp.Dissector = dissectorAmqp
|
|
Extensions = append(Extensions, extensionAmqp)
|
|
ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
|
|
|
|
extensionKafka := &tapApi.Extension{}
|
|
dissectorKafka := kafkaExt.NewDissector()
|
|
dissectorKafka.Register(extensionKafka)
|
|
extensionKafka.Dissector = dissectorKafka
|
|
Extensions = append(Extensions, extensionKafka)
|
|
ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka
|
|
|
|
extensionRedis := &tapApi.Extension{}
|
|
dissectorRedis := redisExt.NewDissector()
|
|
dissectorRedis.Register(extensionRedis)
|
|
extensionRedis.Dissector = dissectorRedis
|
|
Extensions = append(Extensions, extensionRedis)
|
|
ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis
|
|
}
|
|
|
|
sort.Slice(Extensions, func(i, j int) bool {
|
|
return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority
|
|
})
|
|
|
|
api.InitExtensionsMap(ExtensionsMap)
|
|
}
|
|
|
|
func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel logging.Level, insertionFilter string) {
|
|
if !wait.New(
|
|
wait.WithProto("tcp"),
|
|
wait.WithWait(200*time.Millisecond),
|
|
wait.WithBreak(50*time.Millisecond),
|
|
wait.WithDeadline(20*time.Second),
|
|
wait.WithDebug(logLevel == logging.DEBUG),
|
|
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
|
|
logger.Log.Panicf("Basenine is not available!")
|
|
}
|
|
|
|
if err := basenine.Limit(host, port, dbSize); err != nil {
|
|
logger.Log.Panicf("Error while limiting database size: %v", err)
|
|
}
|
|
|
|
// Define the macros
|
|
for _, extension := range Extensions {
|
|
macros := extension.Dissector.Macros()
|
|
for macro, expanded := range macros {
|
|
if err := basenine.Macro(host, port, macro, expanded); err != nil {
|
|
logger.Log.Panicf("Error while adding a macro: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Set the insertion filter that comes from the config
|
|
if err := basenine.InsertionFilter(host, port, insertionFilter); err != nil {
|
|
logger.Log.Errorf("Error while setting the insertion filter: %v", err)
|
|
}
|
|
|
|
utils.StartTime = time.Now().UnixNano() / int64(time.Millisecond)
|
|
}
|
|
|
|
func GetEntryInputChannel() chan *tapApi.OutputChannelItem {
|
|
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
|
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
|
go FilterItems(outputItemsChannel, filteredOutputItemsChannel)
|
|
go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap)
|
|
|
|
return outputItemsChannel
|
|
}
|
|
|
|
func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
|
|
for message := range inChannel {
|
|
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
|
continue
|
|
}
|
|
|
|
outChannel <- message
|
|
}
|
|
}
|