diff --git a/api/go.mod b/api/go.mod index 9f8825215..27833d4e4 100644 --- a/api/go.mod +++ b/api/go.mod @@ -23,6 +23,7 @@ require ( k8s.io/api v0.21.0 k8s.io/apimachinery v0.21.0 k8s.io/client-go v0.21.0 + github.com/fsnotify/fsnotify v1.4.9 ) replace github.com/up9inc/mizu/shared v0.0.0 => ../shared diff --git a/api/go.sum b/api/go.sum index 855914a0f..28981733e 100644 --- a/api/go.sum +++ b/api/go.sum @@ -61,6 +61,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/djherbis/atime v1.0.0 h1:ySLvBAM0EvOGaX7TI4dAM5lWj+RdJUCKtGSEHN8SGBg= github.com/djherbis/atime v1.0.0/go.mod h1:5W+KBIuTwVGcqjIfaTwt+KSYX1o6uep8dtevevQP/f8= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= @@ -73,6 +75,8 @@ github.com/fasthttp/websocket v1.4.3-beta.1/go.mod h1:JGrgLaT02bL9NuJkZbHN8mVV2t github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -411,6 +415,7 @@ golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/api/main.go b/api/main.go index eb73c5c31..ba012fca7 100644 --- a/api/main.go +++ b/api/main.go @@ -18,6 +18,7 @@ import ( "mizuserver/pkg/utils" "os" "os/signal" + "strings" ) var shouldTap = flag.Bool("tap", false, "Run in tapper mode without API") @@ -25,13 +26,12 @@ var aggregator = flag.Bool("aggregator", false, "Run in aggregator mode with API var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode") var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping") - func main() { flag.Parse() hostMode := os.Getenv(shared.HostModeEnvVar) == "1" tapOpts := &tap.TapOpts{HostMode: hostMode} - if !*shouldTap && !*aggregator && !*standalone{ + if !*shouldTap && !*aggregator && !*standalone { panic("One of the flags --tap, --api or --standalone must be provided") } @@ -89,7 +89,6 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) { AllowMethods: "*", AllowHeaders: "*", })) - middleware.FiberMiddleware(app) // Register Fiber's middleware for app. app.Static("/", "./site") @@ -107,7 +106,6 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) { utils.StartServer(app) } - func getTapTargets() []string { nodeName := os.Getenv(shared.NodeNameEnvVar) var tappedAddressesPerNodeDict map[string][]string @@ -132,11 +130,17 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions { return &filteringOptions } -func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { +var userAgentsToFilter = []string{"kube-probe", "prometheus"} + +func filterHarItems(inChannel <-chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { for message := range inChannel { if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { continue } + // TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441 + if filterOptions.HideHealthChecks && isHealthCheckByUserAgent(message) { + continue + } sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) @@ -144,6 +148,20 @@ func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *t } } +func isHealthCheckByUserAgent(message *tap.OutputChannelItem) bool { + for _, header := range message.HarEntry.Request.Headers { + if strings.ToLower(header.Name) == "user-agent" { + for _, userAgent := range userAgentsToFilter { + if strings.Contains(strings.ToLower(header.Value), userAgent) { + return true + } + } + return false + } + } + return false +} + func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) { if connection == nil { panic("Websocket connection is nil") diff --git a/api/pkg/api/main.go b/api/pkg/api/main.go index 42411953b..8ae1352af 100644 --- a/api/pkg/api/main.go +++ b/api/pkg/api/main.go @@ -160,6 +160,7 @@ func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) { ResolvedDestination: resolvedDestination, IsOutgoing: connectionInfo.IsOutgoing, } + mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry) database.GetEntriesTable().Create(&mizuEntry) baseEntry := models.BaseEntryDetails{} @@ -179,3 +180,22 @@ func getServiceNameFromUrl(inputUrl string) (string, string) { func CheckIsServiceIP(address string) bool { return k8sResolver.CheckIsServiceIP(address) } + +// gives a rough estimate of the size this will take up in the db, good enough for maintaining db size limit accurately +func getEstimatedEntrySizeBytes(mizuEntry models.MizuEntry) int { + sizeBytes := len(mizuEntry.Entry) + sizeBytes += len(mizuEntry.EntryId) + sizeBytes += len(mizuEntry.Service) + sizeBytes += len(mizuEntry.Url) + sizeBytes += len(mizuEntry.Method) + sizeBytes += len(mizuEntry.RequestSenderIp) + sizeBytes += len(mizuEntry.ResolvedDestination) + sizeBytes += len(mizuEntry.ResolvedSource) + sizeBytes += 8 // Status bytes (sqlite integer is always 8 bytes) + sizeBytes += 8 // Timestamp bytes + sizeBytes += 8 // SizeBytes bytes + sizeBytes += 1 // IsOutgoing bytes + + + return sizeBytes +} diff --git a/api/pkg/database/main.go b/api/pkg/database/main.go index 4f144400e..ea4b420f9 100644 --- a/api/pkg/database/main.go +++ b/api/pkg/database/main.go @@ -4,17 +4,17 @@ import ( "fmt" "gorm.io/driver/sqlite" "gorm.io/gorm" + "gorm.io/gorm/logger" "mizuserver/pkg/models" "mizuserver/pkg/utils" + "time" ) const ( DBPath = "./entries.db" ) -var ( - DB = initDataBase(DBPath) -) + var DB *gorm.DB const ( OrderDesc = "desc" @@ -34,12 +34,19 @@ var ( } ) +func init() { + DB = initDataBase(DBPath) + go StartEnforcingDatabaseSize() +} + func GetEntriesTable() *gorm.DB { return DB.Table("mizu_entries") } func initDataBase(databasePath string) *gorm.DB { - temp, _ := gorm.Open(sqlite.Open(databasePath), &gorm.Config{}) + temp, _ := gorm.Open(sqlite.Open(databasePath), &gorm.Config{ + Logger: &utils.TruncatingLogger{LogLevel: logger.Warn, SlowThreshold: 500 * time.Millisecond}, + }) _ = temp.AutoMigrate(&models.MizuEntry{}) // this will ensure table is created return temp } diff --git a/api/pkg/database/size_enforcer.go b/api/pkg/database/size_enforcer.go new file mode 100644 index 000000000..f26b1a4db --- /dev/null +++ b/api/pkg/database/size_enforcer.go @@ -0,0 +1,117 @@ +package database + +import ( + "fmt" + "github.com/fsnotify/fsnotify" + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/shared/debounce" + "github.com/up9inc/mizu/shared/units" + "log" + "mizuserver/pkg/models" + "os" + "strconv" + "time" +) + +const percentageOfMaxSizeBytesToPrune = 15 +const defaultMaxDatabaseSizeBytes int64 = 200 * 1000 * 1000 + +func StartEnforcingDatabaseSize() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatalf("Error creating filesystem watcher for db size enforcement: %v\n", err) + return + } + + maxEntriesDBByteSize, err := getMaxEntriesDBByteSize() + if err != nil { + log.Fatalf("Error parsing max db size: %v\n", err) + return + } + + checkFileSizeDebouncer := debounce.NewDebouncer(5*time.Second, func() { + checkFileSize(maxEntriesDBByteSize) + }) + + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return // closed channel + } + if event.Op == fsnotify.Write { + checkFileSizeDebouncer.SetOn() + } + case err, ok := <-watcher.Errors: + if !ok { + return // closed channel + } + fmt.Printf("filesystem watcher encountered error:%v\n", err) + } + } + }() + + err = watcher.Add(DBPath) + if err != nil { + log.Fatalf("Error adding %s to filesystem watcher for db size enforcement: %v\n", DBPath, err) + } +} + +func getMaxEntriesDBByteSize() (int64, error) { + maxEntriesDBByteSize := defaultMaxDatabaseSizeBytes + var err error + + maxEntriesDBSizeByteSEnvVarValue := os.Getenv(shared.MaxEntriesDBSizeByteSEnvVar) + if maxEntriesDBSizeByteSEnvVarValue != "" { + maxEntriesDBByteSize, err = strconv.ParseInt(maxEntriesDBSizeByteSEnvVarValue, 10, 64) + } + return maxEntriesDBByteSize, err +} + +func checkFileSize(maxSizeBytes int64) { + fileStat, err := os.Stat(DBPath) + if err != nil { + fmt.Printf("Error checking %s file size: %v\n", DBPath, err) + } else { + if fileStat.Size() > maxSizeBytes { + pruneOldEntries(fileStat.Size()) + } + } +} + +func pruneOldEntries(currentFileSize int64) { + amountOfBytesToTrim := currentFileSize / (100 / percentageOfMaxSizeBytesToPrune) + + rows, err := GetEntriesTable().Limit(10000).Order("id").Rows() + if err != nil { + fmt.Printf("Error getting 10000 first db rows: %v\n", err) + return + } + + entryIdsToRemove := make([]uint, 0) + bytesToBeRemoved := int64(0) + for rows.Next() { + if bytesToBeRemoved >= amountOfBytesToTrim { + break + } + var entry models.MizuEntry + err = DB.ScanRows(rows, &entry) + if err != nil { + fmt.Printf("Error scanning db row: %v\n", err) + continue + } + + entryIdsToRemove = append(entryIdsToRemove, entry.ID) + bytesToBeRemoved += int64(entry.EstimatedSizeBytes) + } + + if len(entryIdsToRemove) > 0 { + GetEntriesTable().Where(entryIdsToRemove).Delete(models.MizuEntry{}) + // VACUUM causes sqlite to shrink the db file after rows have been deleted, the db file will not shrink without this + DB.Exec("VACUUM") + fmt.Printf("Removed %d rows and cleared %s\n", len(entryIdsToRemove), units.BytesToHumanReadable(bytesToBeRemoved)) + } else { + fmt.Println("Found no rows to remove when pruning") + } +} diff --git a/api/pkg/models/models.go b/api/pkg/models/models.go index 534cc8def..c12119db9 100644 --- a/api/pkg/models/models.go +++ b/api/pkg/models/models.go @@ -33,6 +33,7 @@ type MizuEntry struct { ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"` + EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"` } type BaseEntryDetails struct { diff --git a/api/pkg/utils/truncating_logger.go b/api/pkg/utils/truncating_logger.go new file mode 100644 index 000000000..f3a0c72a6 --- /dev/null +++ b/api/pkg/utils/truncating_logger.go @@ -0,0 +1,58 @@ +package utils + +import ( + "context" + "fmt" + "gorm.io/gorm/logger" + "gorm.io/gorm/utils" + "time" +) + +// TruncatingLogger implements the gorm logger.Interface interface. Its purpose is to act as gorm's logger while truncating logs to a max of 50 characters to minimise the performance impact +type TruncatingLogger struct { + LogLevel logger.LogLevel + SlowThreshold time.Duration +} + +func (truncatingLogger *TruncatingLogger) LogMode(logLevel logger.LogLevel) logger.Interface { + truncatingLogger.LogLevel = logLevel + return truncatingLogger +} + +func (truncatingLogger *TruncatingLogger) Info(_ context.Context, message string, __ ...interface{}) { + if truncatingLogger.LogLevel < logger.Info { + return + } + fmt.Printf("gorm info: %.150s\n", message) +} + +func (truncatingLogger *TruncatingLogger) Warn(_ context.Context, message string, __ ...interface{}) { + if truncatingLogger.LogLevel < logger.Warn { + return + } + fmt.Printf("gorm warning: %.150s\n", message) +} + +func (truncatingLogger *TruncatingLogger) Error(_ context.Context, message string, __ ...interface{}) { + if truncatingLogger.LogLevel < logger.Error { + return + } + fmt.Printf("gorm error: %.150s\n", message) +} + +func (truncatingLogger *TruncatingLogger) Trace(ctx context.Context, begin time.Time, fc func() (string, int64), err error) { + if truncatingLogger.LogLevel == logger.Silent { + return + } + elapsed := time.Since(begin) + if err != nil { + sql, rows := fc() // copied into every condition as this is a potentially heavy operation best done only when necessary + truncatingLogger.Error(ctx, fmt.Sprintf("Error in %s: %v - elapsed: %fs affected rows: %d, sql: %s", utils.FileWithLineNum(), err, elapsed.Seconds(), rows, sql)) + } else if truncatingLogger.LogLevel >= logger.Warn && elapsed > truncatingLogger.SlowThreshold { + sql, rows := fc() + truncatingLogger.Warn(ctx, fmt.Sprintf("Slow sql query - elapse: %fs rows: %d, sql: %s", elapsed.Seconds(), rows, sql)) + } else if truncatingLogger.LogLevel >= logger.Info { + sql, rows := fc() + truncatingLogger.Info(ctx, fmt.Sprintf("Sql query - elapse: %fs rows: %d, sql: %s", elapsed.Seconds(), rows, sql)) + } +} diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index e9827e1bd..46cb4b69e 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -3,13 +3,13 @@ package cmd import ( "errors" "fmt" + "github.com/spf13/cobra" "github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/cli/uiUtils" + "github.com/up9inc/mizu/shared/units" "os" "regexp" "strings" - - "github.com/spf13/cobra" ) type MizuTapOptions struct { @@ -22,12 +22,17 @@ type MizuTapOptions struct { MizuImage string PlainTextFilterRegexes []string TapOutgoing bool + HideHealthChecks bool + MaxEntriesDBSizeBytes int64 SleepIntervalSec uint16 } var mizuTapOptions = &MizuTapOptions{} var direction string +var humanMaxEntriesDBSize string var regex *regexp.Regexp +const maxEntriesDBSizeFlagName = "max-entries-db-size" + const analysisMessageToConfirm = `NOTE: running mizu with --analysis flag will upload recorded traffic to UP9 cloud for further analysis and enriched presentation options. @@ -55,6 +60,15 @@ Supported protocols are HTTP and gRPC.`, return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], compileErr)) } + var parseHumanDataSizeErr error + mizuTapOptions.MaxEntriesDBSizeBytes, parseHumanDataSizeErr = units.HumanReadableToBytes(humanMaxEntriesDBSize) + if parseHumanDataSizeErr != nil { + return errors.New(fmt.Sprintf("Could not parse --max-entries-db-size value %s", humanMaxEntriesDBSize)) + } else if cmd.Flags().Changed(maxEntriesDBSizeFlagName) { + // We're parsing human readable file sizes here so its best to be unambiguous + fmt.Printf("Setting max entries db size to %s\n", units.BytesToHumanReadable(mizuTapOptions.MaxEntriesDBSizeBytes)) + } + directionLowerCase := strings.ToLower(direction) if directionLowerCase == "any" { mizuTapOptions.TapOutgoing = true @@ -88,4 +102,6 @@ func init() { tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector") tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies") tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any") + tapCmd.Flags().BoolVar(&mizuTapOptions.HideHealthChecks, "hide-healthchecks", false, "hides requests with kube-probe or prometheus user-agent headers") + tapCmd.Flags().StringVarP(&humanMaxEntriesDBSize, maxEntriesDBSizeFlagName, "", "200MB", "override the default max entries db size of 200mb") } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 0af05ebd8..de2691397 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -4,10 +4,10 @@ import ( "context" "fmt" "github.com/romana/rlog" - "github.com/up9inc/mizu/cli/debounce" "github.com/up9inc/mizu/cli/kubernetes" "github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/shared/debounce" core "k8s.io/api/core/v1" "log" "net/http" @@ -96,7 +96,7 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr var err error mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider) - _, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions) + _, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions, tappingOptions.MaxEntriesDBSizeBytes) if err != nil { fmt.Printf("Error creating mizu collector pod: %v\n", err) return err @@ -112,21 +112,21 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr } func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.TrafficFilteringOptions, error) { - if tappingOptions.PlainTextFilterRegexes == nil || len(tappingOptions.PlainTextFilterRegexes) == 0 { - return nil, nil - } + var compiledRegexSlice []*shared.SerializableRegexp - compiledRegexSlice := make([]*shared.SerializableRegexp, 0) - for _, regexStr := range tappingOptions.PlainTextFilterRegexes { - compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr) - if err != nil { - fmt.Printf("Regex %s is invalid: %v", regexStr, err) - return nil, err + if tappingOptions.PlainTextFilterRegexes != nil && len(tappingOptions.PlainTextFilterRegexes) > 0 { + compiledRegexSlice = make([]*shared.SerializableRegexp, 0) + for _, regexStr := range tappingOptions.PlainTextFilterRegexes { + compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr) + if err != nil { + fmt.Printf("Regex %s is invalid: %v", regexStr, err) + return nil, err + } + compiledRegexSlice = append(compiledRegexSlice, compiledRegex) } - compiledRegexSlice = append(compiledRegexSlice, compiledRegex) } - return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil + return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice, HideHealthChecks: tappingOptions.HideHealthChecks}, nil } func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { diff --git a/cli/go.sum b/cli/go.sum index f26ffa68e..9a79ea6a5 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -90,6 +90,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index ed041b999..9a9409eaf 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -8,6 +8,7 @@ import ( "fmt" "path/filepath" "regexp" + "strconv" "github.com/up9inc/mizu/shared" core "k8s.io/api/core/v1" @@ -71,7 +72,7 @@ func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) w return watcher } -func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions) (*core.Pod, error) { +func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions, maxEntriesDBSizeBytes int64) (*core.Pod, error) { marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions) if err != nil { return nil, err @@ -116,6 +117,10 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace Name: shared.MizuFilteringOptionsEnvVar, Value: string(marshaledFilteringOptions), }, + { + Name: shared.MaxEntriesDBSizeByteSEnvVar, + Value: strconv.FormatInt(maxEntriesDBSizeBytes, 10), + }, }, Resources: core.ResourceRequirements{ Limits: core.ResourceList{ diff --git a/shared/consts.go b/shared/consts.go index ccda67615..0d452910d 100644 --- a/shared/consts.go +++ b/shared/consts.go @@ -1,8 +1,9 @@ package shared const ( - MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS" - HostModeEnvVar = "HOST_MODE" - NodeNameEnvVar = "NODE_NAME" + MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS" + HostModeEnvVar = "HOST_MODE" + NodeNameEnvVar = "NODE_NAME" TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST" + MaxEntriesDBSizeByteSEnvVar = "MAX_ENTRIES_DB_BYTES" ) diff --git a/cli/debounce/debounce.go b/shared/debounce/debounce.go similarity index 100% rename from cli/debounce/debounce.go rename to shared/debounce/debounce.go diff --git a/shared/go.mod b/shared/go.mod index d1afa338c..66e5165d6 100644 --- a/shared/go.mod +++ b/shared/go.mod @@ -2,4 +2,8 @@ module github.com/up9inc/mizu/shared go 1.16 -require github.com/gorilla/websocket v1.4.2 +require ( + github.com/gorilla/websocket v1.4.2 + github.com/docker/go-units v0.4.0 +) + diff --git a/shared/go.sum b/shared/go.sum index 85efffd99..b46c3a514 100644 --- a/shared/go.sum +++ b/shared/go.sum @@ -1,2 +1,4 @@ +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/shared/models.go b/shared/models.go index 42e86d9bc..8cf46c0f7 100644 --- a/shared/models.go +++ b/shared/models.go @@ -59,4 +59,5 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc type TrafficFilteringOptions struct { PlainTextMaskingRegexes []*SerializableRegexp + HideHealthChecks bool } diff --git a/shared/socket_client.go b/shared/socket_client.go index 2faeabf5c..4cc618356 100644 --- a/shared/socket_client.go +++ b/shared/socket_client.go @@ -7,7 +7,7 @@ import ( ) const ( - DEFAULT_SOCKET_RETRIES = 3 + DEFAULT_SOCKET_RETRIES = 3 DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10 ) diff --git a/shared/units/human_data_sizes.go b/shared/units/human_data_sizes.go new file mode 100644 index 000000000..d3c39bf7c --- /dev/null +++ b/shared/units/human_data_sizes.go @@ -0,0 +1,11 @@ +package units + +import "github.com/docker/go-units" + +func BytesToHumanReadable(bytes int64) string { + return units.HumanSize(float64(bytes)) +} + +func HumanReadableToBytes(humanReadableSize string) (int64, error) { + return units.FromHumanSize(humanReadableSize) +}