kubeshark/agent/pkg/app/main.go
Nimrod Gilboa Markevich ab38f4c011
Add profiling tools (#1087)
* Add gin-contrib/pprof dependency

* Run pprof server on agent with --profiler flag

* Add --profiler flag to cli

* Fix error message

* Print cpu usage percentage

* measure cpu of current pid instead of globaly on the system

* Add scripts to plot performance

* Plot packetsCount in analysis

* Concat to DataFrame

* Plot in turbo colorscheme

* Make COLORMAP const

* Fix rss units

* Reduce code repetition by adding function for plotting

* Allow grouping based on filenames

* Temporary: Marked with comments where to disable code for experiments

* Add newline at end of file

* Add tap.cpuprofile flag. Change memprofile flag to tap.memprofile

* create tapper modes for debugging using env vars

* Fix rss plot units (MB instead of bytes)

* Remove comment

* Add info to plot script

* Remove tap.cpumemprofile. Rename tap.memprofile to memprofile

* Remove unused import

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Rename debug env vars

* Create package for debug env vars, read each env var once

* Run go mod tidy

* Increment MatchedPairs before emitting

* Only count cores once

* Count virtual and physical cores

* Add dbgctl replace in cli

* Fix lint: Check return values

* Add tap/dbgctl to test-lint make rule

* Replace tap/dbgctl in all modules

* #run_acceptance_tests

* Copy dbgctl module to docker image

* Debug/profile tapper benchmark (#1093)

* add mizu debug env to avoid all extensions

* add readme + run_tapper_benchmark.sh

* temporary change branch name

* fix readme

* fix MIZU_BENCHMARK_CLIENTS_COUNT env

* change tap target to tcp stream

* track live tcp streams

* pr fixes

* rename tapperPacketsCount to ignored_packets_count

* change mizu tapper to mizu debugg

Co-authored-by: David Levanon <dvdlevanon@gmail.com>
Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>
2022-05-18 15:42:13 +03:00

119 lines
3.8 KiB
Go

package app
import (
"fmt"
"sort"
"time"
"github.com/antelman107/net-wait-go/wait"
"github.com/op/go-logging"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/dbgctl"
tapApi "github.com/up9inc/mizu/tap/api"
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
httpExt "github.com/up9inc/mizu/tap/extensions/http"
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
redisExt "github.com/up9inc/mizu/tap/extensions/redis"
)
var (
Extensions []*tapApi.Extension // global
ExtensionsMap map[string]*tapApi.Extension // global
)
func LoadExtensions() {
Extensions = make([]*tapApi.Extension, 0)
ExtensionsMap = make(map[string]*tapApi.Extension)
extensionHttp := &tapApi.Extension{}
dissectorHttp := httpExt.NewDissector()
dissectorHttp.Register(extensionHttp)
extensionHttp.Dissector = dissectorHttp
Extensions = append(Extensions, extensionHttp)
ExtensionsMap[extensionHttp.Protocol.Name] = extensionHttp
if !dbgctl.MizuTapperDisableNonHttpExtensions {
extensionAmqp := &tapApi.Extension{}
dissectorAmqp := amqpExt.NewDissector()
dissectorAmqp.Register(extensionAmqp)
extensionAmqp.Dissector = dissectorAmqp
Extensions = append(Extensions, extensionAmqp)
ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
extensionKafka := &tapApi.Extension{}
dissectorKafka := kafkaExt.NewDissector()
dissectorKafka.Register(extensionKafka)
extensionKafka.Dissector = dissectorKafka
Extensions = append(Extensions, extensionKafka)
ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka
extensionRedis := &tapApi.Extension{}
dissectorRedis := redisExt.NewDissector()
dissectorRedis.Register(extensionRedis)
extensionRedis.Dissector = dissectorRedis
Extensions = append(Extensions, extensionRedis)
ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis
}
sort.Slice(Extensions, func(i, j int) bool {
return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority
})
api.InitExtensionsMap(ExtensionsMap)
}
func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel logging.Level, insertionFilter string) {
if !wait.New(
wait.WithProto("tcp"),
wait.WithWait(200*time.Millisecond),
wait.WithBreak(50*time.Millisecond),
wait.WithDeadline(20*time.Second),
wait.WithDebug(logLevel == logging.DEBUG),
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
logger.Log.Panicf("Basenine is not available!")
}
if err := basenine.Limit(host, port, dbSize); err != nil {
logger.Log.Panicf("Error while limiting database size: %v", err)
}
// Define the macros
for _, extension := range Extensions {
macros := extension.Dissector.Macros()
for macro, expanded := range macros {
if err := basenine.Macro(host, port, macro, expanded); err != nil {
logger.Log.Panicf("Error while adding a macro: %v", err)
}
}
}
// Set the insertion filter that comes from the config
if err := basenine.InsertionFilter(host, port, insertionFilter); err != nil {
logger.Log.Errorf("Error while setting the insertion filter: %v", err)
}
utils.StartTime = time.Now().UnixNano() / int64(time.Millisecond)
}
func GetEntryInputChannel() chan *tapApi.OutputChannelItem {
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
go FilterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap)
return outputItemsChannel
}
func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
outChannel <- message
}
}