mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-04 11:58:41 +00:00
Bring back the sensitive data filtering feature (#280)
* Bring back the sensitive data filtering feature * Add `// global` comment
This commit is contained in:
parent
6337b75f0e
commit
9fa9b67328
@ -50,14 +50,16 @@ func main() {
|
|||||||
panic("One of the flags --tap, --api or --standalone or --hars-read must be provided")
|
panic("One of the flags --tap, --api or --standalone or --hars-read must be provided")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
filteringOptions := getTrafficFilteringOptions()
|
||||||
|
|
||||||
if *standaloneMode {
|
if *standaloneMode {
|
||||||
api.StartResolving(*namespace)
|
api.StartResolving(*namespace)
|
||||||
|
|
||||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||||
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions)
|
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
|
||||||
|
|
||||||
go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
|
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
|
||||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
|
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
|
||||||
// go api.StartReadingOutbound(outboundLinkOutputChannel)
|
// go api.StartReadingOutbound(outboundLinkOutputChannel)
|
||||||
|
|
||||||
@ -73,9 +75,8 @@ func main() {
|
|||||||
rlog.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
|
rlog.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
|
||||||
}
|
}
|
||||||
|
|
||||||
// harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
|
|
||||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||||
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions)
|
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
|
||||||
socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
|
socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
|
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
|
||||||
@ -89,7 +90,7 @@ func main() {
|
|||||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||||
|
|
||||||
go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
|
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
|
||||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
|
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
|
||||||
|
|
||||||
hostApi(outputItemsChannel)
|
hostApi(outputItemsChannel)
|
||||||
@ -97,7 +98,7 @@ func main() {
|
|||||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
|
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
|
||||||
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
|
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
|
||||||
|
|
||||||
go filterItems(outputItemsChannel, filteredHarChannel, getTrafficFilteringOptions())
|
go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions)
|
||||||
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
|
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
|
||||||
hostApi(nil)
|
hostApi(nil)
|
||||||
}
|
}
|
||||||
@ -225,23 +226,23 @@ func getTapTargets() []string {
|
|||||||
return tappedAddressesPerNodeDict[nodeName]
|
return tappedAddressesPerNodeDict[nodeName]
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
|
func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
|
||||||
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
|
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
|
||||||
if filteringOptionsJson == "" {
|
if filteringOptionsJson == "" {
|
||||||
return &shared.TrafficFilteringOptions{
|
return &tapApi.TrafficFilteringOptions{
|
||||||
HealthChecksUserAgentHeaders: []string{},
|
HealthChecksUserAgentHeaders: []string{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var filteringOptions shared.TrafficFilteringOptions
|
var filteringOptions tapApi.TrafficFilteringOptions
|
||||||
err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions)
|
err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err))
|
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the api.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &filteringOptions
|
return &filteringOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
|
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) {
|
||||||
for message := range inChannel {
|
for message := range inChannel {
|
||||||
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
||||||
continue
|
continue
|
||||||
@ -251,10 +252,6 @@ func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *ta
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// if !filterOptions.DisableRedaction {
|
|
||||||
// sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
|
|
||||||
// }
|
|
||||||
|
|
||||||
outChannel <- message
|
outChannel <- message
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"github.com/up9inc/mizu/cli/uiUtils"
|
"github.com/up9inc/mizu/cli/uiUtils"
|
||||||
"github.com/up9inc/mizu/shared"
|
"github.com/up9inc/mizu/shared"
|
||||||
"github.com/up9inc/mizu/shared/debounce"
|
"github.com/up9inc/mizu/shared/debounce"
|
||||||
|
"github.com/up9inc/mizu/tap/api"
|
||||||
yaml "gopkg.in/yaml.v3"
|
yaml "gopkg.in/yaml.v3"
|
||||||
core "k8s.io/api/core/v1"
|
core "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
@ -123,7 +124,7 @@ func readValidationRules(file string) (string, error) {
|
|||||||
return string(newContent), nil
|
return string(newContent), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *shared.TrafficFilteringOptions, mizuValidationRules string) error {
|
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
|
||||||
if !config.Config.IsNsRestrictedMode() {
|
if !config.Config.IsNsRestrictedMode() {
|
||||||
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -158,7 +159,7 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
|
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
|
||||||
@ -199,13 +200,13 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
|
func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
|
||||||
var compiledRegexSlice []*shared.SerializableRegexp
|
var compiledRegexSlice []*api.SerializableRegexp
|
||||||
|
|
||||||
if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 {
|
if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 {
|
||||||
compiledRegexSlice = make([]*shared.SerializableRegexp, 0)
|
compiledRegexSlice = make([]*api.SerializableRegexp, 0)
|
||||||
for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes {
|
for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes {
|
||||||
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
|
compiledRegex, err := api.CompileRegexToSerializableRegexp(regexStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -213,7 +214,7 @@ func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &shared.TrafficFilteringOptions{
|
return &api.TrafficFilteringOptions{
|
||||||
PlainTextMaskingRegexes: compiledRegexSlice,
|
PlainTextMaskingRegexes: compiledRegexSlice,
|
||||||
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
|
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
|
||||||
DisableRedaction: config.Config.Tap.DisableRedaction,
|
DisableRedaction: config.Config.Tap.DisableRedaction,
|
||||||
|
@ -11,6 +11,7 @@ require (
|
|||||||
github.com/spf13/cobra v1.1.3
|
github.com/spf13/cobra v1.1.3
|
||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
github.com/up9inc/mizu/shared v0.0.0
|
github.com/up9inc/mizu/shared v0.0.0
|
||||||
|
github.com/up9inc/mizu/tap/api v0.0.0
|
||||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
|
||||||
k8s.io/api v0.21.2
|
k8s.io/api v0.21.2
|
||||||
k8s.io/apimachinery v0.21.2
|
k8s.io/apimachinery v0.21.2
|
||||||
@ -19,3 +20,5 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
|
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
|
||||||
|
|
||||||
|
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
|
|
||||||
"github.com/up9inc/mizu/cli/mizu"
|
"github.com/up9inc/mizu/cli/mizu"
|
||||||
"github.com/up9inc/mizu/shared"
|
"github.com/up9inc/mizu/shared"
|
||||||
|
"github.com/up9inc/mizu/tap/api"
|
||||||
core "k8s.io/api/core/v1"
|
core "k8s.io/api/core/v1"
|
||||||
rbac "k8s.io/api/rbac/v1"
|
rbac "k8s.io/api/rbac/v1"
|
||||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
@ -150,7 +151,7 @@ type ApiServerOptions struct {
|
|||||||
PodImage string
|
PodImage string
|
||||||
ServiceAccountName string
|
ServiceAccountName string
|
||||||
IsNamespaceRestricted bool
|
IsNamespaceRestricted bool
|
||||||
MizuApiFilteringOptions *shared.TrafficFilteringOptions
|
MizuApiFilteringOptions *api.TrafficFilteringOptions
|
||||||
MaxEntriesDBSizeBytes int64
|
MaxEntriesDBSizeBytes int64
|
||||||
Resources configStructs.Resources
|
Resources configStructs.Resources
|
||||||
ImagePullPolicy core.PullPolicy
|
ImagePullPolicy core.PullPolicy
|
||||||
|
@ -74,12 +74,6 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type TrafficFilteringOptions struct {
|
|
||||||
HealthChecksUserAgentHeaders []string
|
|
||||||
PlainTextMaskingRegexes []*SerializableRegexp
|
|
||||||
DisableRedaction bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type VersionResponse struct {
|
type VersionResponse struct {
|
||||||
SemVer string `json:"semver"`
|
SemVer string `json:"semver"`
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,7 @@ type SuperIdentifier struct {
|
|||||||
type Dissector interface {
|
type Dissector interface {
|
||||||
Register(*Extension)
|
Register(*Extension)
|
||||||
Ping()
|
Ping()
|
||||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error
|
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
|
||||||
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
|
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
|
||||||
Summarize(entry *MizuEntry) *BaseEntryDetails
|
Summarize(entry *MizuEntry) *BaseEntryDetails
|
||||||
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
|
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
|
||||||
|
7
tap/api/options.go
Normal file
7
tap/api/options.go
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
type TrafficFilteringOptions struct {
|
||||||
|
HealthChecksUserAgentHeaders []string
|
||||||
|
PlainTextMaskingRegexes []*SerializableRegexp
|
||||||
|
DisableRedaction bool
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package shared
|
package api
|
||||||
|
|
||||||
import "regexp"
|
import "regexp"
|
||||||
|
|
@ -41,7 +41,7 @@ func (d dissecting) Ping() {
|
|||||||
|
|
||||||
const amqpRequest string = "amqp_request"
|
const amqpRequest string = "amqp_request"
|
||||||
|
|
||||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||||
r := AmqpReader{b}
|
r := AmqpReader{b}
|
||||||
|
|
||||||
var remaining int
|
var remaining int
|
||||||
|
@ -3,6 +3,7 @@ module github.com/up9inc/mizu/tap/extensions/http
|
|||||||
go 1.16
|
go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/beevik/etree v1.1.0 // indirect
|
||||||
github.com/google/martian v2.1.0+incompatible
|
github.com/google/martian v2.1.0+incompatible
|
||||||
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7
|
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7
|
||||||
github.com/up9inc/mizu/tap/api v0.0.0
|
github.com/up9inc/mizu/tap/api v0.0.0
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
|
||||||
|
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
|
||||||
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
|
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
|
||||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||||
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 h1:jkvpcEatpwuMF5O5LVxTnehj6YZ/aEZN4NWD/Xml4pI=
|
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 h1:jkvpcEatpwuMF5O5LVxTnehj6YZ/aEZN4NWD/Xml4pI=
|
||||||
|
@ -13,7 +13,12 @@ import (
|
|||||||
"github.com/up9inc/mizu/tap/api"
|
"github.com/up9inc/mizu/tap/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
||||||
|
FilterSensitiveData(item, options)
|
||||||
|
emitter.Emit(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||||
streamID, messageHTTP1, err := grpcAssembler.readMessage()
|
streamID, messageHTTP1, err := grpcAssembler.readMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -64,13 +69,13 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTime
|
|||||||
|
|
||||||
if item != nil {
|
if item != nil {
|
||||||
item.Protocol = http2Protocol
|
item.Protocol = http2Protocol
|
||||||
emitter.Emit(item)
|
filterAndEmit(item, emitter, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||||
req, err := http.ReadRequest(b)
|
req, err := http.ReadRequest(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// log.Println("Error reading stream:", err)
|
// log.Println("Error reading stream:", err)
|
||||||
@ -107,12 +112,12 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
|||||||
ServerPort: tcpID.DstPort,
|
ServerPort: tcpID.DstPort,
|
||||||
IsOutgoing: true,
|
IsOutgoing: true,
|
||||||
}
|
}
|
||||||
emitter.Emit(item)
|
filterAndEmit(item, emitter, options)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||||
res, err := http.ReadResponse(b, nil)
|
res, err := http.ReadResponse(b, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// log.Println("Error reading stream:", err)
|
// log.Println("Error reading stream:", err)
|
||||||
@ -157,7 +162,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
|||||||
ServerPort: tcpID.SrcPort,
|
ServerPort: tcpID.SrcPort,
|
||||||
IsOutgoing: false,
|
IsOutgoing: false,
|
||||||
}
|
}
|
||||||
emitter.Emit(item)
|
filterAndEmit(item, emitter, options)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,7 @@ func (d dissecting) Ping() {
|
|||||||
log.Printf("pong %s\n", protocol.Name)
|
log.Printf("pong %s\n", protocol.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||||
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
|
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
|
||||||
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
|
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -85,7 +85,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
|||||||
}
|
}
|
||||||
|
|
||||||
if isHTTP2 {
|
if isHTTP2 {
|
||||||
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
|
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter, options)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -94,7 +94,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
|||||||
}
|
}
|
||||||
dissected = true
|
dissected = true
|
||||||
} else if isClient {
|
} else if isClient {
|
||||||
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
|
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -103,7 +103,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
|||||||
}
|
}
|
||||||
dissected = true
|
dissected = true
|
||||||
} else {
|
} else {
|
||||||
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
|
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
209
tap/extensions/http/sensitive_data_cleaner.go
Normal file
209
tap/extensions/http/sensitive_data_cleaner.go
Normal file
@ -0,0 +1,209 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"encoding/xml"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/beevik/etree"
|
||||||
|
"github.com/romana/rlog"
|
||||||
|
"github.com/up9inc/mizu/tap/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
const maskedFieldPlaceholderValue = "[REDACTED]"
|
||||||
|
|
||||||
|
//these values MUST be all lower case and contain no `-` or `_` characters
|
||||||
|
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
|
||||||
|
"username", "user", "key", "passcode", "pass", "auth", "authtoken", "jwt",
|
||||||
|
"bearer", "clientid", "clientsecret", "redirecturi", "phonenumber",
|
||||||
|
"zip", "zipcode", "address", "country", "firstname", "lastname",
|
||||||
|
"middlename", "fname", "lname", "birthdate"}
|
||||||
|
|
||||||
|
func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) {
|
||||||
|
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
|
||||||
|
response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response)
|
||||||
|
|
||||||
|
filterHeaders(&request.Header)
|
||||||
|
filterHeaders(&response.Header)
|
||||||
|
filterUrl(request.URL)
|
||||||
|
filterRequestBody(request, options)
|
||||||
|
filterResponseBody(response, options)
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterRequestBody(request *http.Request, options *api.TrafficFilteringOptions) {
|
||||||
|
contenType := getContentTypeHeaderValue(request.Header)
|
||||||
|
body, err := ioutil.ReadAll(request.Body)
|
||||||
|
if err != nil {
|
||||||
|
rlog.Debugf("Filtering error reading body: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
filteredBody, err := filterHttpBody([]byte(body), contenType, options)
|
||||||
|
if err == nil {
|
||||||
|
request.Body = ioutil.NopCloser(bytes.NewBuffer(filteredBody))
|
||||||
|
} else {
|
||||||
|
request.Body = ioutil.NopCloser(bytes.NewBuffer(body))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterResponseBody(response *http.Response, options *api.TrafficFilteringOptions) {
|
||||||
|
contentType := getContentTypeHeaderValue(response.Header)
|
||||||
|
body, err := ioutil.ReadAll(response.Body)
|
||||||
|
if err != nil {
|
||||||
|
rlog.Debugf("Filtering error reading body: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
filteredBody, err := filterHttpBody([]byte(body), contentType, options)
|
||||||
|
if err == nil {
|
||||||
|
response.Body = ioutil.NopCloser(bytes.NewBuffer(filteredBody))
|
||||||
|
} else {
|
||||||
|
response.Body = ioutil.NopCloser(bytes.NewBuffer(body))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterHeaders(headers *http.Header) {
|
||||||
|
for key, _ := range *headers {
|
||||||
|
if strings.ToLower(key) == "cookie" {
|
||||||
|
headers.Del(key)
|
||||||
|
} else if isFieldNameSensitive(key) {
|
||||||
|
headers.Set(key, maskedFieldPlaceholderValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getContentTypeHeaderValue(headers http.Header) string {
|
||||||
|
for key, _ := range headers {
|
||||||
|
if strings.ToLower(key) == "content-type" {
|
||||||
|
return headers.Get(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func isFieldNameSensitive(fieldName string) bool {
|
||||||
|
name := strings.ToLower(fieldName)
|
||||||
|
name = strings.ReplaceAll(name, "_", "")
|
||||||
|
name = strings.ReplaceAll(name, "-", "")
|
||||||
|
name = strings.ReplaceAll(name, " ", "")
|
||||||
|
|
||||||
|
for _, sensitiveField := range personallyIdentifiableDataFields {
|
||||||
|
if strings.Contains(name, sensitiveField) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterHttpBody(bytes []byte, contentType string, options *api.TrafficFilteringOptions) ([]byte, error) {
|
||||||
|
mimeType := strings.Split(contentType, ";")[0]
|
||||||
|
switch strings.ToLower(mimeType) {
|
||||||
|
case "application/json":
|
||||||
|
return filterJsonBody(bytes)
|
||||||
|
case "text/html":
|
||||||
|
fallthrough
|
||||||
|
case "application/xhtml+xml":
|
||||||
|
fallthrough
|
||||||
|
case "text/xml":
|
||||||
|
fallthrough
|
||||||
|
case "application/xml":
|
||||||
|
return filterXmlEtree(bytes)
|
||||||
|
case "text/plain":
|
||||||
|
if options != nil && options.PlainTextMaskingRegexes != nil {
|
||||||
|
return filterPlainText(bytes, options), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bytes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterPlainText(bytes []byte, options *api.TrafficFilteringOptions) []byte {
|
||||||
|
for _, regex := range options.PlainTextMaskingRegexes {
|
||||||
|
bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue))
|
||||||
|
}
|
||||||
|
return bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterXmlEtree(bytes []byte) ([]byte, error) {
|
||||||
|
if !IsValidXML(bytes) {
|
||||||
|
return nil, errors.New("Invalid XML")
|
||||||
|
}
|
||||||
|
xmlDoc := etree.NewDocument()
|
||||||
|
err := xmlDoc.ReadFromBytes(bytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
filterXmlElement(xmlDoc.Root())
|
||||||
|
}
|
||||||
|
return xmlDoc.WriteToBytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsValidXML(data []byte) bool {
|
||||||
|
return xml.Unmarshal(data, new(interface{})) == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterXmlElement(element *etree.Element) {
|
||||||
|
for i, attribute := range element.Attr {
|
||||||
|
if isFieldNameSensitive(attribute.Key) {
|
||||||
|
element.Attr[i].Value = maskedFieldPlaceholderValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if element.ChildElements() == nil || len(element.ChildElements()) == 0 {
|
||||||
|
if isFieldNameSensitive(element.Tag) {
|
||||||
|
element.SetText(maskedFieldPlaceholderValue)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for _, element := range element.ChildElements() {
|
||||||
|
filterXmlElement(element)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterJsonBody(bytes []byte) ([]byte, error) {
|
||||||
|
var bodyJsonMap map[string]interface{}
|
||||||
|
err := json.Unmarshal(bytes, &bodyJsonMap)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
filterJsonMap(bodyJsonMap)
|
||||||
|
return json.Marshal(bodyJsonMap)
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterJsonMap(jsonMap map[string]interface{}) {
|
||||||
|
for key, value := range jsonMap {
|
||||||
|
// Do not replace nil values with maskedFieldPlaceholderValue
|
||||||
|
if value == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
nestedMap, isNested := value.(map[string]interface{})
|
||||||
|
if isNested {
|
||||||
|
filterJsonMap(nestedMap)
|
||||||
|
} else {
|
||||||
|
if isFieldNameSensitive(key) {
|
||||||
|
jsonMap[key] = maskedFieldPlaceholderValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterUrl(url *url.URL) {
|
||||||
|
if len(url.RawQuery) > 0 {
|
||||||
|
newQueryArgs := make([]string, 0)
|
||||||
|
for urlQueryParamName, urlQueryParamValues := range url.Query() {
|
||||||
|
newValues := urlQueryParamValues
|
||||||
|
if isFieldNameSensitive(urlQueryParamName) {
|
||||||
|
newValues = []string{maskedFieldPlaceholderValue}
|
||||||
|
}
|
||||||
|
for _, paramValue := range newValues {
|
||||||
|
newQueryArgs = append(newQueryArgs, fmt.Sprintf("%s=%s", urlQueryParamName, paramValue))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
url.RawQuery = strings.Join(newQueryArgs, "&")
|
||||||
|
}
|
||||||
|
}
|
@ -39,7 +39,7 @@ func (d dissecting) Ping() {
|
|||||||
log.Printf("pong %s\n", _protocol.Name)
|
log.Printf("pong %s\n", _protocol.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||||
for {
|
for {
|
||||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
|
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
|
||||||
return errors.New("Identified by another protocol")
|
return errors.New("Identified by another protocol")
|
||||||
|
@ -92,9 +92,10 @@ var outputLevel int
|
|||||||
var errorsMap map[string]uint
|
var errorsMap map[string]uint
|
||||||
var errorsMapMutex sync.Mutex
|
var errorsMapMutex sync.Mutex
|
||||||
var nErrors uint
|
var nErrors uint
|
||||||
var ownIps []string // global
|
var ownIps []string // global
|
||||||
var hostMode bool // global
|
var hostMode bool // global
|
||||||
var extensions []*api.Extension // global
|
var extensions []*api.Extension // global
|
||||||
|
var filteringOptions *api.TrafficFilteringOptions // global
|
||||||
|
|
||||||
const baseStreamChannelTimeoutMs int = 5000 * 100
|
const baseStreamChannelTimeoutMs int = 5000 * 100
|
||||||
|
|
||||||
@ -160,9 +161,10 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
|
|||||||
return c.CaptureInfo
|
return c.CaptureInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension) {
|
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
|
||||||
hostMode = opts.HostMode
|
hostMode = opts.HostMode
|
||||||
extensions = extensionsRef
|
extensions = extensionsRef
|
||||||
|
filteringOptions = options
|
||||||
|
|
||||||
if GetMemoryProfilingEnabled() {
|
if GetMemoryProfilingEnabled() {
|
||||||
startMemoryProfiler()
|
startMemoryProfiler()
|
||||||
|
@ -107,7 +107,7 @@ func (h *tcpReader) Close() {
|
|||||||
func (h *tcpReader) run(wg *sync.WaitGroup) {
|
func (h *tcpReader) run(wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
b := bufio.NewReader(h)
|
b := bufio.NewReader(h)
|
||||||
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
|
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
io.Copy(ioutil.Discard, b)
|
io.Copy(ioutil.Discard, b)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user