diff --git a/api/main.go b/api/main.go index 7134733d6..c257bb638 100644 --- a/api/main.go +++ b/api/main.go @@ -16,7 +16,6 @@ import ( "mizuserver/pkg/utils" "os" "os/signal" - "regexp" ) var shouldTap = flag.Bool("tap", false, "Run in tapper mode without API") @@ -24,9 +23,6 @@ var aggregator = flag.Bool("aggregator", false, "Run in aggregator mode with API var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode") var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping") -const nodeNameEnvVar = "NODE_NAME" -const tappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST" -const plainTextRegexesEnvVar = "PLAINTEXT_REGEXES" func main() { flag.Parse() @@ -38,7 +34,7 @@ func main() { if *standalone { harOutputChannel := tap.StartPassiveTapper() filteredHarChannel := make(chan *tap.OutputChannelItem) - go filterHarHeaders(harOutputChannel, filteredHarChannel) + go filterHarHeaders(harOutputChannel, filteredHarChannel, getFilteringOptions()) go api.StartReadingEntries(filteredHarChannel, nil) hostApi(nil) } else if *shouldTap { @@ -57,12 +53,12 @@ func main() { if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err)) } - filteredHarChannel := make(chan *tap.OutputChannelItem) - go filterHarHeaders(harOutputChannel, filteredHarChannel) - go pipeChannelToSocket(socketConnection, filteredHarChannel) + go pipeChannelToSocket(socketConnection, harOutputChannel) } else if *aggregator { socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000) - go api.StartReadingEntries(socketHarOutChannel, nil) + filteredHarChannel := make(chan *tap.OutputChannelItem) + go api.StartReadingEntries(filteredHarChannel, nil) + go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getFilteringOptions()) hostApi(socketHarOutChannel) } @@ -96,40 +92,32 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) { func getTapTargets() []string { - 
nodeName := os.Getenv(nodeNameEnvVar) + nodeName := os.Getenv(shared.NodeNameEnvVar) var tappedAddressesPerNodeDict map[string][]string - err := json.Unmarshal([]byte(os.Getenv(tappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict) + err := json.Unmarshal([]byte(os.Getenv(shared.TappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict) if err != nil { - panic(fmt.Sprintf("env var %s's value of %s is invalid! must be map[string][]string %v", tappedAddressesPerNodeDictEnvVar, tappedAddressesPerNodeDict, err)) + panic(fmt.Sprintf("env var %s's value of %s is invalid! must be map[string][]string %v", shared.TappedAddressesPerNodeDictEnvVar, tappedAddressesPerNodeDict, err)) } return tappedAddressesPerNodeDict[nodeName] } -func getFilteringOptions() *sensitiveDataFiltering.FilteringOptions { - regexJsonArr := os.Getenv(plainTextRegexesEnvVar) - if regexJsonArr == "" { +func getFilteringOptions() *shared.FilteringOptions { + filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar) + if filteringOptionsJson == "" { return nil } - var regexStrSlice []string - err := json.Unmarshal([]byte(regexJsonArr), ®exStrSlice) + var filteringOptions shared.FilteringOptions + err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions) if err != nil { - panic(fmt.Sprintf("env var %s's value of %s is invalid! must be []string %v", plainTextRegexesEnvVar, regexJsonArr, err)) + panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.FilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err)) } - parsedRegexSlice := make([]regexp.Regexp, 0) - for _, regexStr := range regexStrSlice { - regex, err := regexp.Compile(regexStr) - if err != nil { - panic(fmt.Sprintf("env var %s's value of %s is invalid! 
must be []string %v", plainTextRegexesEnvVar, regexJsonArr, err)) - } - parsedRegexSlice = append(parsedRegexSlice, *regex) - } - return &sensitiveDataFiltering.FilteringOptions{PlainTextFilterRegexes: parsedRegexSlice} + return &filteringOptions } -func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem) { +func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.FilteringOptions) { for message := range inChannel { - sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, nil) + sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) outChannel <- message } } diff --git a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go index fa247f196..0bd8b95bd 100644 --- a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go +++ b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go @@ -3,20 +3,16 @@ package sensitiveDataFiltering import ( "encoding/json" "fmt" + "github.com/up9inc/mizu/shared" "mizuserver/pkg/tap" "net/url" - "regexp" "strings" "github.com/beevik/etree" "github.com/google/martian/har" ) -type FilteringOptions struct { - PlainTextFilterRegexes []regexp.Regexp -} - -func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *FilteringOptions) { +func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.FilteringOptions) { filterHarHeaders(harOutputItem.HarEntry.Request.Headers) filterHarHeaders(harOutputItem.HarEntry.Response.Headers) @@ -78,7 +74,7 @@ func isFieldNameSensitive(fieldName string) bool { return false } -func filterHttpBody(bytes []byte, contentType string, options *FilteringOptions) ([]byte, error) { +func filterHttpBody(bytes []byte, contentType string, options *shared.FilteringOptions) ([]byte, error) { mimeType := strings.Split(contentType, 
";")[0] switch strings.ToLower(mimeType) { case "application/json": @@ -99,7 +95,7 @@ func filterHttpBody(bytes []byte, contentType string, options *FilteringOptions) return bytes, nil } -func filterPlainText(bytes []byte, options *FilteringOptions) []byte { +func filterPlainText(bytes []byte, options *shared.FilteringOptions) []byte { for _, regex := range options.PlainTextFilterRegexes { bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue)) } diff --git a/api/pkg/tap/passive_tapper.go b/api/pkg/tap/passive_tapper.go index b5cacc9e8..72f5c1294 100644 --- a/api/pkg/tap/passive_tapper.go +++ b/api/pkg/tap/passive_tapper.go @@ -13,6 +13,7 @@ import ( "encoding/json" "flag" "fmt" + "github.com/up9inc/mizu/shared" "log" "os" "os/signal" @@ -34,7 +35,6 @@ import ( const AppPortsEnvVar = "APP_PORTS" const OutPortEnvVar = "WEB_SOCKET_PORT" const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT" -const hostModeEnvVar = "HOST_MODE" // default is 1MB, more than the max size accepted by collector and traffic-dumper const maxHTTP2DataLenDefault = 1 * 1024 * 1024 const cleanPeriod = time.Second * 10 @@ -258,7 +258,7 @@ func startPassiveTapper(harWriter *HarWriter) { maxHTTP2DataLen = convertedInt } } - hostMode = os.Getenv(hostModeEnvVar) == "1" + hostMode = os.Getenv(shared.HostModeEnvVar) == "1" fmt.Printf("App Ports: %v\n", appPorts) fmt.Printf("Tap output websocket port: %s\n", tapOutputPort) diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 040957ecb..44e38904d 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "github.com/up9inc/mizu/shared" "os" "os/signal" "regexp" @@ -26,6 +27,10 @@ const ( var currentlyTappedPods []core.Pod func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { + mizuApiFilteringOptions, err := getMizuApiFilteringOptions(tappingOptions) + if err != nil { + return + } kubernetesProvider := 
kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace) defer cleanUpMizuResources(kubernetesProvider) @@ -43,7 +48,7 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { return } - if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { + if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions, mizuApiFilteringOptions); err != nil { return } @@ -57,8 +62,8 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { // TODO handle incoming traffic from tapper using a channel } -func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { - if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions); err != nil { +func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.FilteringOptions) error { + if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil { return err } @@ -69,11 +74,11 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro return nil } -func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions) error { +func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.FilteringOptions) error { var err error mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider) - _, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists) + _, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, 
mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions) if err != nil { fmt.Printf("Error creating mizu collector pod: %v\n", err) return err @@ -88,6 +93,24 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr return nil } +func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.FilteringOptions, error) { + if tappingOptions.PlainTextFilterRegexes == nil || len(tappingOptions.PlainTextFilterRegexes) == 0 { + return nil, nil + } + + compiledRegexSlice := make([]*shared.SerializableRegexp, 0) + for _, regexStr := range tappingOptions.PlainTextFilterRegexes { + compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr) + if err != nil { + fmt.Printf("Regex %s is invalid: %v", regexStr, err) + return nil, err + } + compiledRegexSlice = append(compiledRegexSlice, compiledRegex) + } + + return &shared.FilteringOptions{PlainTextFilterRegexes: compiledRegexSlice}, nil +} + func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { if err := kubernetesProvider.ApplyMizuTapperDaemonSet( ctx, diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 348068b4c..d6b598a27 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -6,19 +6,20 @@ import ( "encoding/json" "errors" "fmt" + "github.com/up9inc/mizu/shared" "path/filepath" "regexp" - applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1" - applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1" - applyconfcore "k8s.io/client-go/applyconfigurations/core/v1" core "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" + applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1" + 
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1" + applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/azure" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -85,7 +86,11 @@ func (provider *Provider) GetPods(ctx context.Context, namespace string) { fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), namespace) } -func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool) (*core.Pod, error) { +func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.FilteringOptions) (*core.Pod, error) { + marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions) + if err != nil { + return nil, err + } pod := &core.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, @@ -101,9 +106,13 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace Command: []string {"./mizuagent", "--aggregator"}, Env: []core.EnvVar{ { - Name: "HOST_MODE", + Name: shared.HostModeEnvVar, Value: "1", }, + { + Name: shared.MizuFilteringOptionsEnvVar, + Value: string(marshaledFilteringOptions), + }, }, }, }, @@ -232,12 +241,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged)) agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp)) agentContainer.WithEnv( - applyconfcore.EnvVar().WithName("HOST_MODE").WithValue("1"), - applyconfcore.EnvVar().WithName("AGGREGATOR_ADDRESS").WithValue(aggregatorPodIp), - applyconfcore.EnvVar().WithName("TAPPED_ADDRESSES_PER_HOST").WithValue(string(nodeToTappedPodIPMapJsonStr)), + 
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"), + applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)), ) agentContainer.WithEnv( - applyconfcore.EnvVar().WithName("NODE_NAME").WithValueFrom( + applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom( applyconfcore.EnvVarSource().WithFieldRef( applyconfcore.ObjectFieldSelector().WithAPIVersion("v1").WithFieldPath("spec.nodeName"), ), diff --git a/shared/consts.go b/shared/consts.go new file mode 100644 index 000000000..ccda67615 --- /dev/null +++ b/shared/consts.go @@ -0,0 +1,8 @@ +package shared + +const ( + MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS" + HostModeEnvVar = "HOST_MODE" + NodeNameEnvVar = "NODE_NAME" + TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST" +) diff --git a/shared/models.go b/shared/models.go index dba9e1731..44e3ff9f5 100644 --- a/shared/models.go +++ b/shared/models.go @@ -33,3 +33,7 @@ func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessag TappingStatus: tappingStatus, } } + +type FilteringOptions struct { + PlainTextFilterRegexes []*SerializableRegexp +} diff --git a/shared/serializableRegexp.go b/shared/serializableRegexp.go new file mode 100644 index 000000000..a0a513ff6 --- /dev/null +++ b/shared/serializableRegexp.go @@ -0,0 +1,34 @@ +package shared + +import "regexp" + +type SerializableRegexp struct { + regexp.Regexp +} + +// CompileRegexToSerializableRegexp wraps the result of the standard library's +// regexp.Compile, for easy (un)marshaling. +func CompileRegexToSerializableRegexp(expr string) (*SerializableRegexp, error) { + re, err := regexp.Compile(expr) + if err != nil { + return nil, err + } + return &SerializableRegexp{*re}, nil +} + +// UnmarshalText satisfies the encoding.TextUnmarshaler interface, +// also used by json.Unmarshal.
+func (r *SerializableRegexp) UnmarshalText(text []byte) error { + rr, err := CompileRegexToSerializableRegexp(string(text)) + if err != nil { + return err + } + *r = *rr + return nil +} + +// MarshalText satisfies the encoding.TextMarshaler interface, +// also used by json.Marshal. +func (r *SerializableRegexp) MarshalText() ([]byte, error) { + return []byte(r.String()), nil +}