From 931b6f4260743e6f191fda445a7962310eb767aa Mon Sep 17 00:00:00 2001 From: RamiBerm Date: Mon, 7 Jun 2021 11:35:12 +0300 Subject: [PATCH 1/2] Update main.go and messageSensitiveDataCleaner.go --- api/main.go | 147 ------------------ .../messageSensitiveDataCleaner.go | 9 ++ 2 files changed, 9 insertions(+), 147 deletions(-) delete mode 100644 api/main.go diff --git a/api/main.go b/api/main.go deleted file mode 100644 index b1bd17a6c..000000000 --- a/api/main.go +++ /dev/null @@ -1,147 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "fmt" - "github.com/gofiber/fiber/v2" - "github.com/gorilla/websocket" - "github.com/up9inc/mizu/shared" - "mizuserver/pkg/api" - "mizuserver/pkg/middleware" - "mizuserver/pkg/models" - "mizuserver/pkg/routes" - "mizuserver/pkg/sensitiveDataFiltering" - "mizuserver/pkg/tap" - "mizuserver/pkg/utils" - "os" - "os/signal" -) - -var shouldTap = flag.Bool("tap", false, "Run in tapper mode without API") -var aggregator = flag.Bool("aggregator", false, "Run in aggregator mode with API") -var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode") -var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping") - - -func main() { - flag.Parse() - - if !*shouldTap && !*aggregator && !*standalone{ - panic("One of the flags --tap, --api or --standalone must be provided") - } - - if *standalone { - harOutputChannel := tap.StartPassiveTapper() - filteredHarChannel := make(chan *tap.OutputChannelItem) - go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions()) - go api.StartReadingEntries(filteredHarChannel, nil) - hostApi(nil) - } else if *shouldTap { - if *aggregatorAddress == "" { - panic("Aggregator address must be provided with --aggregator-address when using --tap") - } - - tapTargets := getTapTargets() - if tapTargets != nil { - tap.HostAppAddresses = tapTargets - fmt.Println("Filtering for the following addresses:", tap.HostAppAddresses) - } - - harOutputChannel := tap.StartPassiveTapper() - socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false) - if err != nil { - panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err)) - } - go pipeChannelToSocket(socketConnection, harOutputChannel) - } else if *aggregator { - socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000) - filteredHarChannel := make(chan *tap.OutputChannelItem) - go api.StartReadingEntries(filteredHarChannel, nil) - go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions()) - hostApi(socketHarOutChannel) - } - - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, os.Interrupt) - <-signalChan - - fmt.Println("Exiting") -} - -func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) { - app := fiber.New() - - - middleware.FiberMiddleware(app) // Register Fiber's middleware for app. - app.Static("/", "./site") - - //Simple route to know server is running - app.Get("/echo", func(c *fiber.Ctx) error { - return c.SendString("Hello, World 👋!") - }) - eventHandlers := api.RoutesEventHandlers{ - SocketHarOutChannel: socketHarOutputChannel, - } - routes.WebSocketRoutes(app, &eventHandlers) - routes.EntriesRoutes(app) - routes.NotFoundRoute(app) - - utils.StartServer(app) -} - - -func getTapTargets() []string { - nodeName := os.Getenv(shared.NodeNameEnvVar) - var tappedAddressesPerNodeDict map[string][]string - err := json.Unmarshal([]byte(os.Getenv(shared.TappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict) - if err != nil { - panic(fmt.Sprintf("env var %s's value of %s is invalid! must be map[string][]string %v", shared.TappedAddressesPerNodeDictEnvVar, tappedAddressesPerNodeDict, err)) - } - return tappedAddressesPerNodeDict[nodeName] -} - -func getTrafficFilteringOptions() *shared.TrafficFilteringOptions { - filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar) - if filteringOptionsJson == "" { - return nil - } - var filteringOptions shared.TrafficFilteringOptions - err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions) - if err != nil { - panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err)) - } - - return &filteringOptions -} - -func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { - for message := range inChannel { - sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) - outChannel <- message - } -} - -func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) { - if connection == nil { - panic("Websocket connection is nil") - } - - if messageDataChannel == nil { - panic("Channel of captured messages is nil") - } - - for messageData := range messageDataChannel { - marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData) - if err != nil { - fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err) - continue - } - - err = connection.WriteMessage(websocket.TextMessage, marshaledData) - if err != nil { - fmt.Printf("error sending message through socket server %s, (%v,%+v)\n", err, err, err) - continue - } - } -} diff --git a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go index baa46e3e6..7cf9660f9 100644 --- a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go +++ b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go @@ -2,6 +2,8 @@ package sensitiveDataFiltering import ( "encoding/json" + "encoding/xml" + "errors" "fmt" "mizuserver/pkg/tap" "net/url" @@ -103,6 +105,9 @@ func filterPlainText(bytes []byte, options *shared.TrafficFilteringOptions) []by } func filterXmlEtree(bytes []byte) ([]byte, error) { + if !IsValidXML(bytes) { + return nil, errors.New("Invalid XML") + } xmlDoc := etree.NewDocument() err := xmlDoc.ReadFromBytes(bytes) if err != nil { @@ -113,6 +118,10 @@ func filterXmlEtree(bytes []byte) ([]byte, error) { return xmlDoc.WriteToBytes() } +func IsValidXML(data []byte) bool { + return xml.Unmarshal(data, new(interface{})) == nil +} + func filterXmlElement(element *etree.Element) { for i, attribute := range element.Attr { if isFieldNameSensitive(attribute.Key) { From b7d3ff6eb8795ef430d09c4e46432745127cc980 Mon Sep 17 00:00:00 2001 From: RamiBerm Date: Mon, 7 Jun 2021 11:35:50 +0300 Subject: [PATCH 2/2] Update main.go --- api/main.go | 147 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 api/main.go diff --git a/api/main.go b/api/main.go new file mode 100644 index 000000000..b1bd17a6c --- /dev/null +++ b/api/main.go @@ -0,0 +1,147 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "github.com/gofiber/fiber/v2" + "github.com/gorilla/websocket" + "github.com/up9inc/mizu/shared" + "mizuserver/pkg/api" + "mizuserver/pkg/middleware" + "mizuserver/pkg/models" + "mizuserver/pkg/routes" + "mizuserver/pkg/sensitiveDataFiltering" + "mizuserver/pkg/tap" + "mizuserver/pkg/utils" + "os" + "os/signal" +) + +var shouldTap = flag.Bool("tap", false, "Run in tapper mode without API") +var aggregator = flag.Bool("aggregator", false, "Run in aggregator mode with API") +var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode") +var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping") + + +func main() { + flag.Parse() + + if !*shouldTap && !*aggregator && !*standalone{ + panic("One of the flags --tap, --api or --standalone must be provided") + } + + if *standalone { + harOutputChannel := tap.StartPassiveTapper() + filteredHarChannel := make(chan *tap.OutputChannelItem) + go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions()) + go api.StartReadingEntries(filteredHarChannel, nil) + hostApi(nil) + } else if *shouldTap { + if *aggregatorAddress == "" { + panic("Aggregator address must be provided with --aggregator-address when using --tap") + } + + tapTargets := getTapTargets() + if tapTargets != nil { + tap.HostAppAddresses = tapTargets + fmt.Println("Filtering for the following addresses:", tap.HostAppAddresses) + } + + harOutputChannel := tap.StartPassiveTapper() + socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false) + if err != nil { + panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err)) + } + go pipeChannelToSocket(socketConnection, harOutputChannel) + } else if *aggregator { + socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000) + filteredHarChannel := make(chan *tap.OutputChannelItem) + go api.StartReadingEntries(filteredHarChannel, nil) + go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions()) + hostApi(socketHarOutChannel) + } + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + <-signalChan + + fmt.Println("Exiting") +} + +func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) { + app := fiber.New() + + + middleware.FiberMiddleware(app) // Register Fiber's middleware for app. + app.Static("/", "./site") + + //Simple route to know server is running + app.Get("/echo", func(c *fiber.Ctx) error { + return c.SendString("Hello, World 👋!") + }) + eventHandlers := api.RoutesEventHandlers{ + SocketHarOutChannel: socketHarOutputChannel, + } + routes.WebSocketRoutes(app, &eventHandlers) + routes.EntriesRoutes(app) + routes.NotFoundRoute(app) + + utils.StartServer(app) +} + + +func getTapTargets() []string { + nodeName := os.Getenv(shared.NodeNameEnvVar) + var tappedAddressesPerNodeDict map[string][]string + err := json.Unmarshal([]byte(os.Getenv(shared.TappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict) + if err != nil { + panic(fmt.Sprintf("env var %s's value of %s is invalid! must be map[string][]string %v", shared.TappedAddressesPerNodeDictEnvVar, tappedAddressesPerNodeDict, err)) + } + return tappedAddressesPerNodeDict[nodeName] +} + +func getTrafficFilteringOptions() *shared.TrafficFilteringOptions { + filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar) + if filteringOptionsJson == "" { + return nil + } + var filteringOptions shared.TrafficFilteringOptions + err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions) + if err != nil { + panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err)) + } + + return &filteringOptions +} + +func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { + for message := range inChannel { + sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) + outChannel <- message + } +} + +func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) { + if connection == nil { + panic("Websocket connection is nil") + } + + if messageDataChannel == nil { + panic("Channel of captured messages is nil") + } + + for messageData := range messageDataChannel { + marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData) + if err != nil { + fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err) + continue + } + + err = connection.WriteMessage(websocket.TextMessage, marshaledData) + if err != nil { + fmt.Printf("error sending message through socket server %s, (%v,%+v)\n", err, err, err) + continue + } + } +}