diff --git a/api/go.sum b/api/go.sum index baf901d28..28981733e 100644 --- a/api/go.sum +++ b/api/go.sum @@ -61,6 +61,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/djherbis/atime v1.0.0 h1:ySLvBAM0EvOGaX7TI4dAM5lWj+RdJUCKtGSEHN8SGBg= github.com/djherbis/atime v1.0.0/go.mod h1:5W+KBIuTwVGcqjIfaTwt+KSYX1o6uep8dtevevQP/f8= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= diff --git a/api/main.go b/api/main.go index 0a6dc516a..2239b1e75 100644 --- a/api/main.go +++ b/api/main.go @@ -17,6 +17,7 @@ import ( "mizuserver/pkg/utils" "os" "os/signal" + "strings" ) var shouldTap = flag.Bool("tap", false, "Run in tapper mode without API") @@ -126,11 +127,17 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions { return &filteringOptions } +var userAgentsToFilter = []string{"kube-probe", "prometheus"} + func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { for message := range inChannel { if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { continue } + // TODO: move this to tappers + if filterOptions.HideHealthChecks && isHealthCheckByUserAgent(message) { + continue + } sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) @@ -138,6 +145,20 @@ func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *t } } +func isHealthCheckByUserAgent(message *tap.OutputChannelItem) bool { + for _, header := range message.HarEntry.Request.Headers { + if strings.ToLower(header.Name) == "user-agent" { + for _, userAgent := range userAgentsToFilter { + if strings.Contains(strings.ToLower(header.Value), userAgent) { + return true + } + return false + } + } + } + return false +} + func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) { if connection == nil { panic("Websocket connection is nil") diff --git a/api/pkg/database/main.go b/api/pkg/database/main.go index 921b13442..58d215bb3 100644 --- a/api/pkg/database/main.go +++ b/api/pkg/database/main.go @@ -4,6 +4,7 @@ import ( "fmt" "gorm.io/driver/sqlite" "gorm.io/gorm" + "gorm.io/gorm/logger" "mizuserver/pkg/models" "mizuserver/pkg/utils" ) @@ -12,9 +13,7 @@ const ( DBPath = "./entries.db" ) -var ( - DB = initDataBase(DBPath) -) + var DB *gorm.DB const ( OrderDesc = "desc" @@ -34,13 +33,19 @@ var ( } ) +func init() { + DB = initDataBase(DBPath) + go StartEnforcingDatabaseSize() +} + func GetEntriesTable() *gorm.DB { return DB.Table("mizu_entries") } func initDataBase(databasePath string) *gorm.DB { - go StartEnforcingDatabaseSize( 10 * 1000 * 1000) - temp, _ := gorm.Open(sqlite.Open(databasePath), &gorm.Config{}) + temp, _ := gorm.Open(sqlite.Open(databasePath), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) _ = temp.AutoMigrate(&models.MizuEntry{}) // this will ensure table is created return temp } diff --git a/api/pkg/database/size_enforcer.go b/api/pkg/database/size_enforcer.go index f84d461b9..e4841e014 100644 --- a/api/pkg/database/size_enforcer.go +++ b/api/pkg/database/size_enforcer.go @@ -3,24 +3,34 @@ package database import ( "fmt" "github.com/fsnotify/fsnotify" + "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared/debounce" "log" "mizuserver/pkg/models" "os" + "strconv" "time" ) -const percentageOfMaxSizeBytesToPrune = 5 +const percentageOfMaxSizeBytesToPrune = 15 +const defaultMaxDatabaseSizeBytes = 200 * 1000 * 1000 -func StartEnforcingDatabaseSize(maxSizeBytes int64) { +func StartEnforcingDatabaseSize() { watcher, err := fsnotify.NewWatcher() if err != nil { - log.Fatal("Error creating filesystem watcher for db size enforcement:", err) + log.Printf("Error creating filesystem watcher for db size enforcement: %v\n", err) // TODO: make fatal + return } defer watcher.Close() - checkFileSizeDebouncer := debounce.NewDebouncer(5 * time.Second, func() { - checkFileSize(maxSizeBytes) + maxEntriesDBByteSize, err := getMaxEntriesDBByteSize() + if err != nil { + log.Printf("Error parsing max db size: %v\n", err) // TODO: make fatal + return + } + + checkFileSizeDebouncer := debounce.NewDebouncer(5*time.Second, func() { + checkFileSize(maxEntriesDBByteSize) }) done := make(chan bool) @@ -45,28 +55,40 @@ func StartEnforcingDatabaseSize(maxSizeBytes int64) { err = watcher.Add(DBPath) if err != nil { - log.Fatal(fmt.Sprintf("Error adding %s to filesystem watcher for db size enforcement: %v", DBPath, err)) + log.Printf("Error adding %s to filesystem watcher for db size enforcement: %v\n", DBPath, err) //TODO: make fatal } <-done } +func getMaxEntriesDBByteSize() (int64, error) { + maxEntriesDBByteSize := int64(defaultMaxDatabaseSizeBytes) + var err error + + maxEntriesDBSizeByteSEnvVarValue := os.Getenv(shared.MaxEntriesDBSizeByteSEnvVar) + if maxEntriesDBSizeByteSEnvVarValue != "" { + maxEntriesDBByteSize, err = strconv.ParseInt(maxEntriesDBSizeByteSEnvVarValue, 10, 64) + } + return maxEntriesDBByteSize, err +} + func checkFileSize(maxSizeBytes int64) { fileStat, err := os.Stat(DBPath) if err != nil { fmt.Printf("Error checking %s file size: %v\n", DBPath, err) } else { + fmt.Printf("%s size is %s, checking if over %s\n", DBPath, shared.BytesToHumanReadable(fileStat.Size()), shared.BytesToHumanReadable(maxSizeBytes)) if fileStat.Size() > maxSizeBytes { - pruneOldEntries(maxSizeBytes) + pruneOldEntries(fileStat.Size()) } } } -func pruneOldEntries(maxSizeBytes int64) { - amountOfBytesToTrim := maxSizeBytes * (percentageOfMaxSizeBytesToPrune / 100) +func pruneOldEntries(currentFileSize int64) { + amountOfBytesToTrim := currentFileSize / (100 / percentageOfMaxSizeBytesToPrune) - rows, err := GetEntriesTable().Limit(100).Order("id").Rows() + rows, err := GetEntriesTable().Limit(10000).Order("id").Rows() if err != nil { - fmt.Printf("Error getting 100 first db rows: %v\n", err) + fmt.Printf("Error getting 10000 first db rows: %v\n", err) return } @@ -88,7 +110,11 @@ func pruneOldEntries(maxSizeBytes int64) { } if len(entryIdsToRemove) > 0 { - GetEntriesTable().Delete(entryIdsToRemove) - fmt.Printf("Removed %d rows and cleared %d bytes", len(entryIdsToRemove), bytesToBeRemoved) + GetEntriesTable().Where(entryIdsToRemove).Delete(models.MizuEntry{}) + // VACUUM causes sqlite to shrink the db file after rows have been deleted, the db file will not shrink without this + DB.Exec("VACUUM") + fmt.Printf("Removed %d rows and cleared %s bytes", len(entryIdsToRemove), shared.BytesToHumanReadable(bytesToBeRemoved)) + } else { + fmt.Printf("Found no rows to remove when pruning") } } diff --git a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go index cc0e4d289..0d9882fb0 100644 --- a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go +++ b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go @@ -15,8 +15,8 @@ import ( ) func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.TrafficFilteringOptions) { - harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers) - harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers) + //harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers) + //harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers) harOutputItem.HarEntry.Request.Cookies = make([]har.Cookie, 0, 0) harOutputItem.HarEntry.Response.Cookies = make([]har.Cookie, 0, 0) diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index 47f209738..30611bee0 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -3,6 +3,7 @@ package cmd import ( "errors" "fmt" + "github.com/up9inc/mizu/shared" "regexp" "strings" @@ -21,10 +22,15 @@ type MizuTapOptions struct { MizuImage string PlainTextFilterRegexes []string TapOutgoing bool + HideHealthChecks bool + MaxEntriesDBSizeBytes int64 } var mizuTapOptions = &MizuTapOptions{} var direction string +var humanMaxEntriesDBSize string + +const maxEntriesDBSizeFlagName = "max-entries-db-size" var tapCmd = &cobra.Command{ Use: "tap [POD REGEX]", @@ -43,6 +49,14 @@ Supported protocols are HTTP and gRPC.`, return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err)) } + mizuTapOptions.MaxEntriesDBSizeBytes, err = shared.HumanReadableToBytes(humanMaxEntriesDBSize) + if err != nil { + return errors.New(fmt.Sprintf("Could not parse --max-entries-db-size value %s", humanMaxEntriesDBSize)) + } else if cmd.Flags().Changed(maxEntriesDBSizeFlagName) { + // We're parsing human readable file sizes here so its best to be unambiguous + fmt.Printf("Setting max entries db size to %s\n", shared.BytesToHumanReadable(mizuTapOptions.MaxEntriesDBSizeBytes)) + } + directionLowerCase := strings.ToLower(direction) if directionLowerCase == "any" { mizuTapOptions.TapOutgoing = true @@ -69,4 +83,6 @@ func init() { tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector") tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies") tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any") + tapCmd.Flags().BoolVar(&mizuTapOptions.HideHealthChecks, "hide-healthchecks", false, "hides requests with kube-probe or prometheus user-agent headers") + tapCmd.Flags().StringVarP(&humanMaxEntriesDBSize, maxEntriesDBSizeFlagName, "", "200MB", "override the default max entries db size of 200mb") } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 320b37987..eb63a39e2 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -95,7 +95,7 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr var err error mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider) - _, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions) + _, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions, tappingOptions.MaxEntriesDBSizeBytes) if err != nil { fmt.Printf("Error creating mizu collector pod: %v\n", err) return err @@ -111,21 +111,21 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr } func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.TrafficFilteringOptions, error) { - if tappingOptions.PlainTextFilterRegexes == nil || len(tappingOptions.PlainTextFilterRegexes) == 0 { - return nil, nil - } + var compiledRegexSlice []*shared.SerializableRegexp - compiledRegexSlice := make([]*shared.SerializableRegexp, 0) - for _, regexStr := range tappingOptions.PlainTextFilterRegexes { - compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr) - if err != nil { - fmt.Printf("Regex %s is invalid: %v", regexStr, err) - return nil, err + if tappingOptions.PlainTextFilterRegexes != nil && len(tappingOptions.PlainTextFilterRegexes) > 0 { + compiledRegexSlice = make([]*shared.SerializableRegexp, 0) + for _, regexStr := range tappingOptions.PlainTextFilterRegexes { + compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr) + if err != nil { + fmt.Printf("Regex %s is invalid: %v", regexStr, err) + return nil, err + } + compiledRegexSlice = append(compiledRegexSlice, compiledRegex) } - compiledRegexSlice = append(compiledRegexSlice, compiledRegex) } - return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil + return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice, HideHealthChecks: tappingOptions.HideHealthChecks}, nil } func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 0bf40c1b8..ab92dfc17 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -8,6 +8,7 @@ import ( "fmt" "path/filepath" "regexp" + "strconv" "github.com/up9inc/mizu/shared" core "k8s.io/api/core/v1" @@ -70,7 +71,7 @@ func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) w return watcher } -func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions) (*core.Pod, error) { +func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions, maxEntriesDBSizeBytes int64) (*core.Pod, error) { marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions) if err != nil { return nil, err @@ -97,6 +98,10 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace Name: shared.MizuFilteringOptionsEnvVar, Value: string(marshaledFilteringOptions), }, + { + Name: shared.MaxEntriesDBSizeByteSEnvVar, + Value: strconv.FormatInt(maxEntriesDBSizeBytes, 10), + }, }, }, }, diff --git a/shared/consts.go b/shared/consts.go index ccda67615..b752b6ed3 100644 --- a/shared/consts.go +++ b/shared/consts.go @@ -5,4 +5,5 @@ const ( HostModeEnvVar = "HOST_MODE" NodeNameEnvVar = "NODE_NAME" TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST" + MaxEntriesDBSizeByteSEnvVar = "MAX_ENTRIES_DB_BYTES" ) diff --git a/shared/go.mod b/shared/go.mod index d1afa338c..625391625 100644 --- a/shared/go.mod +++ b/shared/go.mod @@ -2,4 +2,8 @@ module github.com/up9inc/mizu/shared go 1.16 -require github.com/gorilla/websocket v1.4.2 +require ( + github.com/gorilla/websocket v1.4.2 + github.com/docker/go-units v0.4.0 +) + diff --git a/shared/human_data_sizes.go b/shared/human_data_sizes.go new file mode 100644 index 000000000..30596b9fb --- /dev/null +++ b/shared/human_data_sizes.go @@ -0,0 +1,11 @@ +package shared + +import "github.com/docker/go-units" + +func BytesToHumanReadable(bytes int64) string { + return units.HumanSize(float64(bytes)) +} + +func HumanReadableToBytes(humanReadableSize string) (int64, error) { + return units.FromHumanSize(humanReadableSize) +} diff --git a/shared/models.go b/shared/models.go index f1d443cdf..edeb3b51b 100644 --- a/shared/models.go +++ b/shared/models.go @@ -58,4 +58,5 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc type TrafficFilteringOptions struct { PlainTextMaskingRegexes []*SerializableRegexp + HideHealthChecks bool }