From 9fa9b67328010d7c5e23c0366a75ba6c93927fae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Sat, 18 Sep 2021 20:13:59 +0300 Subject: [PATCH] Bring back the sensitive data filtering feature (#280) * Bring back the sensitive data filtering feature * Add `// global` comment --- agent/main.go | 27 +-- cli/cmd/tapRunner.go | 15 +- cli/go.mod | 3 + cli/kubernetes/provider.go | 3 +- shared/models.go | 6 - tap/api/api.go | 2 +- tap/api/options.go | 7 + .../api/serializable_regexp.go | 2 +- tap/extensions/amqp/main.go | 2 +- tap/extensions/http/go.mod | 1 + tap/extensions/http/go.sum | 2 + tap/extensions/http/handlers.go | 17 +- tap/extensions/http/main.go | 8 +- tap/extensions/http/sensitive_data_cleaner.go | 209 ++++++++++++++++++ tap/extensions/kafka/main.go | 2 +- tap/passive_tapper.go | 10 +- tap/tcp_reader.go | 2 +- 17 files changed, 270 insertions(+), 48 deletions(-) create mode 100644 tap/api/options.go rename shared/serializableRegexp.go => tap/api/serializable_regexp.go (97%) create mode 100644 tap/extensions/http/sensitive_data_cleaner.go diff --git a/agent/main.go b/agent/main.go index b19db67b3..f0cff4971 100644 --- a/agent/main.go +++ b/agent/main.go @@ -50,14 +50,16 @@ func main() { panic("One of the flags --tap, --api or --standalone or --hars-read must be provided") } + filteringOptions := getTrafficFilteringOptions() + if *standaloneMode { api.StartResolving(*namespace) outputItemsChannel := make(chan *tapApi.OutputChannelItem) filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions) + tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions) - go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions()) + go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions) go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap) // go api.StartReadingOutbound(outboundLinkOutputChannel) @@ -73,9 +75,8 @@ func main() { rlog.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs()) } - // harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts) filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions) + tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions) socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err)) @@ -89,7 +90,7 @@ func main() { outputItemsChannel := make(chan *tapApi.OutputChannelItem) filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions()) + go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions) go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap) hostApi(outputItemsChannel) @@ -97,7 +98,7 @@ func main() { outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000) filteredHarChannel := make(chan *tapApi.OutputChannelItem) - go filterItems(outputItemsChannel, filteredHarChannel, getTrafficFilteringOptions()) + go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions) go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap) hostApi(nil) } @@ -225,23 +226,23 @@ func getTapTargets() []string { return tappedAddressesPerNodeDict[nodeName] } -func getTrafficFilteringOptions() *shared.TrafficFilteringOptions { +func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions { filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar) if filteringOptionsJson == "" { - return &shared.TrafficFilteringOptions{ + return &tapApi.TrafficFilteringOptions{ HealthChecksUserAgentHeaders: []string{}, } } - var filteringOptions shared.TrafficFilteringOptions + var filteringOptions tapApi.TrafficFilteringOptions err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions) if err != nil { - panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err)) + panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the api.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err)) } return &filteringOptions } -func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { +func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) { for message := range inChannel { if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { continue @@ -251,10 +252,6 @@ func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *ta continue } - // if !filterOptions.DisableRedaction { - // sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) - // } - outChannel <- message } } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 0363e1b24..0ea5a0114 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -21,6 +21,7 @@ import ( "github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared/debounce" + "github.com/up9inc/mizu/tap/api" yaml "gopkg.in/yaml.v3" core "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -123,7 +124,7 @@ func readValidationRules(file string) (string, error) { return string(newContent), nil } -func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *shared.TrafficFilteringOptions, mizuValidationRules string) error { +func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error { if !config.Config.IsNsRestrictedMode() { if err := createMizuNamespace(ctx, kubernetesProvider); err != nil { return err @@ -158,7 +159,7 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro return err } -func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error { +func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error { var err error state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider) @@ -199,13 +200,13 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro return nil } -func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) { - var compiledRegexSlice []*shared.SerializableRegexp +func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) { + var compiledRegexSlice []*api.SerializableRegexp if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 { - compiledRegexSlice = make([]*shared.SerializableRegexp, 0) + compiledRegexSlice = make([]*api.SerializableRegexp, 0) for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes { - compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr) + compiledRegex, err := api.CompileRegexToSerializableRegexp(regexStr) if err != nil { return nil, err } @@ -213,7 +214,7 @@ func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) { } } - return &shared.TrafficFilteringOptions{ + return &api.TrafficFilteringOptions{ PlainTextMaskingRegexes: compiledRegexSlice, HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders, DisableRedaction: config.Config.Tap.DisableRedaction, diff --git a/cli/go.mod b/cli/go.mod index cfdfa96c6..577df798c 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -11,6 +11,7 @@ require ( github.com/spf13/cobra v1.1.3 github.com/spf13/pflag v1.0.5 github.com/up9inc/mizu/shared v0.0.0 + github.com/up9inc/mizu/tap/api v0.0.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b k8s.io/api v0.21.2 k8s.io/apimachinery v0.21.2 @@ -19,3 +20,5 @@ require ( ) replace github.com/up9inc/mizu/shared v0.0.0 => ../shared + +replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 64dfb8cb9..04a65662f 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -18,6 +18,7 @@ import ( "github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/tap/api" core "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -150,7 +151,7 @@ type ApiServerOptions struct { PodImage string ServiceAccountName string IsNamespaceRestricted bool - MizuApiFilteringOptions *shared.TrafficFilteringOptions + MizuApiFilteringOptions *api.TrafficFilteringOptions MaxEntriesDBSizeBytes int64 Resources configStructs.Resources ImagePullPolicy core.PullPolicy diff --git a/shared/models.go b/shared/models.go index 9a02d8043..5dac6e9cc 100644 --- a/shared/models.go +++ b/shared/models.go @@ -74,12 +74,6 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc } } -type TrafficFilteringOptions struct { - HealthChecksUserAgentHeaders []string - PlainTextMaskingRegexes []*SerializableRegexp - DisableRedaction bool -} - type VersionResponse struct { SemVer string `json:"semver"` } diff --git a/tap/api/api.go b/tap/api/api.go index cf89eab80..42db3142c 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -80,7 +80,7 @@ type SuperIdentifier struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry Summarize(entry *MizuEntry) *BaseEntryDetails Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error) diff --git a/tap/api/options.go b/tap/api/options.go new file mode 100644 index 000000000..aff071bf3 --- /dev/null +++ b/tap/api/options.go @@ -0,0 +1,7 @@ +package api + +type TrafficFilteringOptions struct { + HealthChecksUserAgentHeaders []string + PlainTextMaskingRegexes []*SerializableRegexp + DisableRedaction bool +} diff --git a/shared/serializableRegexp.go b/tap/api/serializable_regexp.go similarity index 97% rename from shared/serializableRegexp.go rename to tap/api/serializable_regexp.go index e311fdeb5..3b888aa6c 100644 --- a/shared/serializableRegexp.go +++ b/tap/api/serializable_regexp.go @@ -1,4 +1,4 @@ -package shared +package api import "regexp" diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 4f6fb860d..7cc638538 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -41,7 +41,7 @@ func (d dissecting) Ping() { const amqpRequest string = "amqp_request" -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { r := AmqpReader{b} var remaining int diff --git a/tap/extensions/http/go.mod b/tap/extensions/http/go.mod index 61b21db71..ab576cf81 100644 --- a/tap/extensions/http/go.mod +++ b/tap/extensions/http/go.mod @@ -3,6 +3,7 @@ module github.com/up9inc/mizu/tap/extensions/http go 1.16 require ( + github.com/beevik/etree v1.1.0 // indirect github.com/google/martian v2.1.0+incompatible github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 github.com/up9inc/mizu/tap/api v0.0.0 diff --git a/tap/extensions/http/go.sum b/tap/extensions/http/go.sum index 1ff511257..03fbf179f 100644 --- a/tap/extensions/http/go.sum +++ b/tap/extensions/http/go.sum @@ -1,3 +1,5 @@ +github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs= +github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 h1:jkvpcEatpwuMF5O5LVxTnehj6YZ/aEZN4NWD/Xml4pI= diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index fbdeb85e0..e29b8dd58 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -13,7 +13,12 @@ import ( "github.com/up9inc/mizu/tap/api" ) -func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error { +func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) { + FilterSensitiveData(item, options) + emitter.Emit(item) +} + +func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error { streamID, messageHTTP1, err := grpcAssembler.readMessage() if err != nil { return err @@ -64,13 +69,13 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTime if item != nil { item.Protocol = http2Protocol - emitter.Emit(item) + filterAndEmit(item, emitter, options) } return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error { req, err := http.ReadRequest(b) if err != nil { // log.Println("Error reading stream:", err) @@ -107,12 +112,12 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api ServerPort: tcpID.DstPort, IsOutgoing: true, } - emitter.Emit(item) + filterAndEmit(item, emitter, options) } return nil } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error { +func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error { res, err := http.ReadResponse(b, nil) if err != nil { // log.Println("Error reading stream:", err) @@ -157,7 +162,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api ServerPort: tcpID.SrcPort, IsOutgoing: false, } - emitter.Emit(item) + filterAndEmit(item, emitter, options) } return nil } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 13c5d78d9..0b48e69db 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -61,7 +61,7 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) isHTTP2, err := checkIsHTTP2Connection(b, isClient) if err != nil { @@ -85,7 +85,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } if isHTTP2 { - err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter) + err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter, options) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -94,7 +94,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } dissected = true } else if isClient { - err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter) + err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -103,7 +103,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } dissected = true } else { - err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter) + err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { diff --git a/tap/extensions/http/sensitive_data_cleaner.go b/tap/extensions/http/sensitive_data_cleaner.go new file mode 100644 index 000000000..4c61c9993 --- /dev/null +++ b/tap/extensions/http/sensitive_data_cleaner.go @@ -0,0 +1,209 @@ +package main + +import ( + "bytes" + "encoding/json" + "encoding/xml" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/beevik/etree" + "github.com/romana/rlog" + "github.com/up9inc/mizu/tap/api" +) + +const maskedFieldPlaceholderValue = "[REDACTED]" + +//these values MUST be all lower case and contain no `-` or `_` characters +var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password", + "username", "user", "key", "passcode", "pass", "auth", "authtoken", "jwt", + "bearer", "clientid", "clientsecret", "redirecturi", "phonenumber", + "zip", "zipcode", "address", "country", "firstname", "lastname", + "middlename", "fname", "lname", "birthdate"} + +func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) { + request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request) + response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response) + + filterHeaders(&request.Header) + filterHeaders(&response.Header) + filterUrl(request.URL) + filterRequestBody(request, options) + filterResponseBody(response, options) +} + +func filterRequestBody(request *http.Request, options *api.TrafficFilteringOptions) { + contenType := getContentTypeHeaderValue(request.Header) + body, err := ioutil.ReadAll(request.Body) + if err != nil { + rlog.Debugf("Filtering error reading body: %v", err) + return + } + filteredBody, err := filterHttpBody([]byte(body), contenType, options) + if err == nil { + request.Body = ioutil.NopCloser(bytes.NewBuffer(filteredBody)) + } else { + request.Body = ioutil.NopCloser(bytes.NewBuffer(body)) + } +} + +func filterResponseBody(response *http.Response, options *api.TrafficFilteringOptions) { + contentType := getContentTypeHeaderValue(response.Header) + body, err := ioutil.ReadAll(response.Body) + if err != nil { + rlog.Debugf("Filtering error reading body: %v", err) + return + } + filteredBody, err := filterHttpBody([]byte(body), contentType, options) + if err == nil { + response.Body = ioutil.NopCloser(bytes.NewBuffer(filteredBody)) + } else { + response.Body = ioutil.NopCloser(bytes.NewBuffer(body)) + } +} + +func filterHeaders(headers *http.Header) { + for key, _ := range *headers { + if strings.ToLower(key) == "cookie" { + headers.Del(key) + } else if isFieldNameSensitive(key) { + headers.Set(key, maskedFieldPlaceholderValue) + } + } +} + +func getContentTypeHeaderValue(headers http.Header) string { + for key, _ := range headers { + if strings.ToLower(key) == "content-type" { + return headers.Get(key) + } + } + return "" +} + +func isFieldNameSensitive(fieldName string) bool { + name := strings.ToLower(fieldName) + name = strings.ReplaceAll(name, "_", "") + name = strings.ReplaceAll(name, "-", "") + name = strings.ReplaceAll(name, " ", "") + + for _, sensitiveField := range personallyIdentifiableDataFields { + if strings.Contains(name, sensitiveField) { + return true + } + } + + return false +} + +func filterHttpBody(bytes []byte, contentType string, options *api.TrafficFilteringOptions) ([]byte, error) { + mimeType := strings.Split(contentType, ";")[0] + switch strings.ToLower(mimeType) { + case "application/json": + return filterJsonBody(bytes) + case "text/html": + fallthrough + case "application/xhtml+xml": + fallthrough + case "text/xml": + fallthrough + case "application/xml": + return filterXmlEtree(bytes) + case "text/plain": + if options != nil && options.PlainTextMaskingRegexes != nil { + return filterPlainText(bytes, options), nil + } + } + return bytes, nil +} + +func filterPlainText(bytes []byte, options *api.TrafficFilteringOptions) []byte { + for _, regex := range options.PlainTextMaskingRegexes { + bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue)) + } + return bytes +} + +func filterXmlEtree(bytes []byte) ([]byte, error) { + if !IsValidXML(bytes) { + return nil, errors.New("Invalid XML") + } + xmlDoc := etree.NewDocument() + err := xmlDoc.ReadFromBytes(bytes) + if err != nil { + return nil, err + } else { + filterXmlElement(xmlDoc.Root()) + } + return xmlDoc.WriteToBytes() +} + +func IsValidXML(data []byte) bool { + return xml.Unmarshal(data, new(interface{})) == nil +} + +func filterXmlElement(element *etree.Element) { + for i, attribute := range element.Attr { + if isFieldNameSensitive(attribute.Key) { + element.Attr[i].Value = maskedFieldPlaceholderValue + } + } + if element.ChildElements() == nil || len(element.ChildElements()) == 0 { + if isFieldNameSensitive(element.Tag) { + element.SetText(maskedFieldPlaceholderValue) + } + } else { + for _, element := range element.ChildElements() { + filterXmlElement(element) + } + } +} + +func filterJsonBody(bytes []byte) ([]byte, error) { + var bodyJsonMap map[string]interface{} + err := json.Unmarshal(bytes, &bodyJsonMap) + if err != nil { + return nil, err + } + filterJsonMap(bodyJsonMap) + return json.Marshal(bodyJsonMap) +} + +func filterJsonMap(jsonMap map[string]interface{}) { + for key, value := range jsonMap { + // Do not replace nil values with maskedFieldPlaceholderValue + if value == nil { + continue + } + + nestedMap, isNested := value.(map[string]interface{}) + if isNested { + filterJsonMap(nestedMap) + } else { + if isFieldNameSensitive(key) { + jsonMap[key] = maskedFieldPlaceholderValue + } + } + } +} + +func filterUrl(url *url.URL) { + if len(url.RawQuery) > 0 { + newQueryArgs := make([]string, 0) + for urlQueryParamName, urlQueryParamValues := range url.Query() { + newValues := urlQueryParamValues + if isFieldNameSensitive(urlQueryParamName) { + newValues = []string{maskedFieldPlaceholderValue} + } + for _, paramValue := range newValues { + newQueryArgs = append(newQueryArgs, fmt.Sprintf("%s=%s", urlQueryParamName, paramValue)) + } + } + + url.RawQuery = strings.Join(newQueryArgs, "&") + } +} diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 7289f1fa2..e40575519 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -39,7 +39,7 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", _protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { for { if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { return errors.New("Identified by another protocol") diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index d4aec99f3..14b4867e7 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -92,9 +92,10 @@ var outputLevel int var errorsMap map[string]uint var errorsMapMutex sync.Mutex var nErrors uint -var ownIps []string // global -var hostMode bool // global -var extensions []*api.Extension // global +var ownIps []string // global +var hostMode bool // global +var extensions []*api.Extension // global +var filteringOptions *api.TrafficFilteringOptions // global const baseStreamChannelTimeoutMs int = 5000 * 100 @@ -160,9 +161,10 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo { return c.CaptureInfo } -func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension) { +func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) { hostMode = opts.HostMode extensions = extensionsRef + filteringOptions = options if GetMemoryProfilingEnabled() { startMemoryProfiler() diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index c9e8d4b76..4b9604917 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -107,7 +107,7 @@ func (h *tcpReader) Close() { func (h *tcpReader) run(wg *sync.WaitGroup) { defer wg.Done() b := bufio.NewReader(h) - err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter) + err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions) if err != nil { io.Copy(ioutil.Discard, b) }