Update go.sum, main.go, and 10 more files...

This commit is contained in:
RamiBerm
2021-07-13 16:21:32 +03:00
parent d684dee7a4
commit 96f47116f0
12 changed files with 126 additions and 34 deletions

View File

@@ -61,6 +61,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/djherbis/atime v1.0.0 h1:ySLvBAM0EvOGaX7TI4dAM5lWj+RdJUCKtGSEHN8SGBg=
github.com/djherbis/atime v1.0.0/go.mod h1:5W+KBIuTwVGcqjIfaTwt+KSYX1o6uep8dtevevQP/f8=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=

View File

@@ -17,6 +17,7 @@ import (
"mizuserver/pkg/utils"
"os"
"os/signal"
"strings"
)
var shouldTap = flag.Bool("tap", false, "Run in tapper mode without API")
@@ -126,11 +127,17 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
return &filteringOptions
}
var userAgentsToFilter = []string{"kube-probe", "prometheus"}
func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
// TODO: move this to tappers
if filterOptions.HideHealthChecks && isHealthCheckByUserAgent(message) {
continue
}
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
@@ -138,6 +145,20 @@ func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *t
}
}
func isHealthCheckByUserAgent(message *tap.OutputChannelItem) bool {
for _, header := range message.HarEntry.Request.Headers {
if strings.ToLower(header.Name) == "user-agent" {
for _, userAgent := range userAgentsToFilter {
if strings.Contains(strings.ToLower(header.Value), userAgent) {
return true
}
return false
}
}
}
return false
}
func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) {
if connection == nil {
panic("Websocket connection is nil")

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
)
@@ -12,9 +13,7 @@ const (
DBPath = "./entries.db"
)
var (
DB = initDataBase(DBPath)
)
var DB *gorm.DB
const (
OrderDesc = "desc"
@@ -34,13 +33,19 @@ var (
}
)
func init() {
DB = initDataBase(DBPath)
go StartEnforcingDatabaseSize()
}
func GetEntriesTable() *gorm.DB {
return DB.Table("mizu_entries")
}
func initDataBase(databasePath string) *gorm.DB {
go StartEnforcingDatabaseSize( 10 * 1000 * 1000)
temp, _ := gorm.Open(sqlite.Open(databasePath), &gorm.Config{})
temp, _ := gorm.Open(sqlite.Open(databasePath), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
_ = temp.AutoMigrate(&models.MizuEntry{}) // this will ensure table is created
return temp
}

View File

@@ -3,24 +3,34 @@ package database
import (
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"log"
"mizuserver/pkg/models"
"os"
"strconv"
"time"
)
const percentageOfMaxSizeBytesToPrune = 5
const percentageOfMaxSizeBytesToPrune = 15
const defaultMaxDatabaseSizeBytes = 200 * 1000 * 1000
func StartEnforcingDatabaseSize(maxSizeBytes int64) {
func StartEnforcingDatabaseSize() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal("Error creating filesystem watcher for db size enforcement:", err)
log.Printf("Error creating filesystem watcher for db size enforcement: %v\n", err) // TODO: make fatal
return
}
defer watcher.Close()
checkFileSizeDebouncer := debounce.NewDebouncer(5 * time.Second, func() {
checkFileSize(maxSizeBytes)
maxEntriesDBByteSize, err := getMaxEntriesDBByteSize()
if err != nil {
log.Printf("Error parsing max db size: %v\n", err) // TODO: make fatal
return
}
checkFileSizeDebouncer := debounce.NewDebouncer(5*time.Second, func() {
checkFileSize(maxEntriesDBByteSize)
})
done := make(chan bool)
@@ -45,28 +55,40 @@ func StartEnforcingDatabaseSize(maxSizeBytes int64) {
err = watcher.Add(DBPath)
if err != nil {
log.Fatal(fmt.Sprintf("Error adding %s to filesystem watcher for db size enforcement: %v", DBPath, err))
log.Printf("Error adding %s to filesystem watcher for db size enforcement: %v\n", DBPath, err) //TODO: make fatal
}
<-done
}
func getMaxEntriesDBByteSize() (int64, error) {
maxEntriesDBByteSize := int64(defaultMaxDatabaseSizeBytes)
var err error
maxEntriesDBSizeByteSEnvVarValue := os.Getenv(shared.MaxEntriesDBSizeByteSEnvVar)
if maxEntriesDBSizeByteSEnvVarValue != "" {
maxEntriesDBByteSize, err = strconv.ParseInt(maxEntriesDBSizeByteSEnvVarValue, 10, 64)
}
return maxEntriesDBByteSize, err
}
func checkFileSize(maxSizeBytes int64) {
fileStat, err := os.Stat(DBPath)
if err != nil {
fmt.Printf("Error checking %s file size: %v\n", DBPath, err)
} else {
fmt.Printf("%s size is %s, checking if over %s\n", DBPath, shared.BytesToHumanReadable(fileStat.Size()), shared.BytesToHumanReadable(maxSizeBytes))
if fileStat.Size() > maxSizeBytes {
pruneOldEntries(maxSizeBytes)
pruneOldEntries(fileStat.Size())
}
}
}
func pruneOldEntries(maxSizeBytes int64) {
amountOfBytesToTrim := maxSizeBytes * (percentageOfMaxSizeBytesToPrune / 100)
func pruneOldEntries(currentFileSize int64) {
amountOfBytesToTrim := currentFileSize / (100 / percentageOfMaxSizeBytesToPrune)
rows, err := GetEntriesTable().Limit(100).Order("id").Rows()
rows, err := GetEntriesTable().Limit(10000).Order("id").Rows()
if err != nil {
fmt.Printf("Error getting 100 first db rows: %v\n", err)
fmt.Printf("Error getting 10000 first db rows: %v\n", err)
return
}
@@ -88,7 +110,11 @@ func pruneOldEntries(maxSizeBytes int64) {
}
if len(entryIdsToRemove) > 0 {
GetEntriesTable().Delete(entryIdsToRemove)
fmt.Printf("Removed %d rows and cleared %d bytes", len(entryIdsToRemove), bytesToBeRemoved)
GetEntriesTable().Where(entryIdsToRemove).Delete(models.MizuEntry{})
// VACUUM causes sqlite to shrink the db file after rows have been deleted, the db file will not shrink without this
DB.Exec("VACUUM")
fmt.Printf("Removed %d rows and cleared %s bytes", len(entryIdsToRemove), shared.BytesToHumanReadable(bytesToBeRemoved))
} else {
fmt.Printf("Found no rows to remove when pruning")
}
}

View File

@@ -15,8 +15,8 @@ import (
)
func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.TrafficFilteringOptions) {
harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers)
harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers)
//harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers)
//harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers)
harOutputItem.HarEntry.Request.Cookies = make([]har.Cookie, 0, 0)
harOutputItem.HarEntry.Response.Cookies = make([]har.Cookie, 0, 0)

View File

@@ -3,6 +3,7 @@ package cmd
import (
"errors"
"fmt"
"github.com/up9inc/mizu/shared"
"regexp"
"strings"
@@ -21,10 +22,15 @@ type MizuTapOptions struct {
MizuImage string
PlainTextFilterRegexes []string
TapOutgoing bool
HideHealthChecks bool
MaxEntriesDBSizeBytes int64
}
var mizuTapOptions = &MizuTapOptions{}
var direction string
var humanMaxEntriesDBSize string
const maxEntriesDBSizeFlagName = "max-entries-db-size"
var tapCmd = &cobra.Command{
Use: "tap [POD REGEX]",
@@ -43,6 +49,14 @@ Supported protocols are HTTP and gRPC.`,
return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err))
}
mizuTapOptions.MaxEntriesDBSizeBytes, err = shared.HumanReadableToBytes(humanMaxEntriesDBSize)
if err != nil {
return errors.New(fmt.Sprintf("Could not parse --max-entries-db-size value %s", humanMaxEntriesDBSize))
} else if cmd.Flags().Changed(maxEntriesDBSizeFlagName) {
// We're parsing human readable file sizes here so its best to be unambiguous
fmt.Printf("Setting max entries db size to %s\n", shared.BytesToHumanReadable(mizuTapOptions.MaxEntriesDBSizeBytes))
}
directionLowerCase := strings.ToLower(direction)
if directionLowerCase == "any" {
mizuTapOptions.TapOutgoing = true
@@ -69,4 +83,6 @@ func init() {
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any")
tapCmd.Flags().BoolVar(&mizuTapOptions.HideHealthChecks, "hide-healthchecks", false, "hides requests with kube-probe or prometheus user-agent headers")
tapCmd.Flags().StringVarP(&humanMaxEntriesDBSize, maxEntriesDBSizeFlagName, "", "200MB", "override the default max entries db size of 200mb")
}

View File

@@ -95,7 +95,7 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr
var err error
mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions)
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions, tappingOptions.MaxEntriesDBSizeBytes)
if err != nil {
fmt.Printf("Error creating mizu collector pod: %v\n", err)
return err
@@ -111,21 +111,21 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr
}
func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.TrafficFilteringOptions, error) {
if tappingOptions.PlainTextFilterRegexes == nil || len(tappingOptions.PlainTextFilterRegexes) == 0 {
return nil, nil
}
var compiledRegexSlice []*shared.SerializableRegexp
compiledRegexSlice := make([]*shared.SerializableRegexp, 0)
for _, regexStr := range tappingOptions.PlainTextFilterRegexes {
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
if err != nil {
fmt.Printf("Regex %s is invalid: %v", regexStr, err)
return nil, err
if tappingOptions.PlainTextFilterRegexes != nil && len(tappingOptions.PlainTextFilterRegexes) > 0 {
compiledRegexSlice = make([]*shared.SerializableRegexp, 0)
for _, regexStr := range tappingOptions.PlainTextFilterRegexes {
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
if err != nil {
fmt.Printf("Regex %s is invalid: %v", regexStr, err)
return nil, err
}
compiledRegexSlice = append(compiledRegexSlice, compiledRegex)
}
compiledRegexSlice = append(compiledRegexSlice, compiledRegex)
}
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice, HideHealthChecks: tappingOptions.HideHealthChecks}, nil
}
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"path/filepath"
"regexp"
"strconv"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
@@ -70,7 +71,7 @@ func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) w
return watcher
}
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions) (*core.Pod, error) {
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions, maxEntriesDBSizeBytes int64) (*core.Pod, error) {
marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions)
if err != nil {
return nil, err
@@ -97,6 +98,10 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
Name: shared.MizuFilteringOptionsEnvVar,
Value: string(marshaledFilteringOptions),
},
{
Name: shared.MaxEntriesDBSizeByteSEnvVar,
Value: strconv.FormatInt(maxEntriesDBSizeBytes, 10),
},
},
},
},

View File

@@ -5,4 +5,5 @@ const (
HostModeEnvVar = "HOST_MODE"
NodeNameEnvVar = "NODE_NAME"
TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST"
MaxEntriesDBSizeByteSEnvVar = "MAX_ENTRIES_DB_BYTES"
)

View File

@@ -2,4 +2,8 @@ module github.com/up9inc/mizu/shared
go 1.16
require github.com/gorilla/websocket v1.4.2
require (
github.com/gorilla/websocket v1.4.2
github.com/docker/go-units v0.4.0
)

View File

@@ -0,0 +1,11 @@
package shared
import "github.com/docker/go-units"
func BytesToHumanReadable(bytes int64) string {
return units.HumanSize(float64(bytes))
}
func HumanReadableToBytes(humanReadableSize string) (int64, error) {
return units.FromHumanSize(humanReadableSize)
}

View File

@@ -58,4 +58,5 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc
type TrafficFilteringOptions struct {
PlainTextMaskingRegexes []*SerializableRegexp
HideHealthChecks bool
}