mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-13 06:08:15 +00:00
TRA-3278 data masking 2nd step
TRA-3278 data masking 2nd step
This commit is contained in:
commit
7dad5be676
@ -4,6 +4,7 @@ go 1.16
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a
|
github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a
|
||||||
|
github.com/beevik/etree v1.1.0
|
||||||
github.com/djherbis/atime v1.0.0
|
github.com/djherbis/atime v1.0.0
|
||||||
github.com/fasthttp/websocket v1.4.3-beta.1 // indirect
|
github.com/fasthttp/websocket v1.4.3-beta.1 // indirect
|
||||||
github.com/go-playground/locales v0.13.0
|
github.com/go-playground/locales v0.13.0
|
||||||
|
@ -48,6 +48,8 @@ github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a h1:76llBl
|
|||||||
github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a/go.mod h1:QvDfsDQDmGxUsvEeWabVZ5pp2FMXpOkwQV0L6SE6cp0=
|
github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a/go.mod h1:QvDfsDQDmGxUsvEeWabVZ5pp2FMXpOkwQV0L6SE6cp0=
|
||||||
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
||||||
github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
|
github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
|
||||||
|
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
|
||||||
|
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
|
||||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||||
|
36
api/main.go
36
api/main.go
@ -23,8 +23,6 @@ var aggregator = flag.Bool("aggregator", false, "Run in aggregator mode with API
|
|||||||
var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode")
|
var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode")
|
||||||
var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping")
|
var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping")
|
||||||
|
|
||||||
const nodeNameEnvVar = "NODE_NAME"
|
|
||||||
const tappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST"
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
@ -36,7 +34,7 @@ func main() {
|
|||||||
if *standalone {
|
if *standalone {
|
||||||
harOutputChannel := tap.StartPassiveTapper()
|
harOutputChannel := tap.StartPassiveTapper()
|
||||||
filteredHarChannel := make(chan *tap.OutputChannelItem)
|
filteredHarChannel := make(chan *tap.OutputChannelItem)
|
||||||
go filterHarHeaders(harOutputChannel, filteredHarChannel)
|
go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
|
||||||
go api.StartReadingEntries(filteredHarChannel, nil)
|
go api.StartReadingEntries(filteredHarChannel, nil)
|
||||||
hostApi(nil)
|
hostApi(nil)
|
||||||
} else if *shouldTap {
|
} else if *shouldTap {
|
||||||
@ -55,12 +53,12 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err))
|
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err))
|
||||||
}
|
}
|
||||||
filteredHarChannel := make(chan *tap.OutputChannelItem)
|
go pipeChannelToSocket(socketConnection, harOutputChannel)
|
||||||
go filterHarHeaders(harOutputChannel, filteredHarChannel)
|
|
||||||
go pipeChannelToSocket(socketConnection, filteredHarChannel)
|
|
||||||
} else if *aggregator {
|
} else if *aggregator {
|
||||||
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
|
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
|
||||||
go api.StartReadingEntries(socketHarOutChannel, nil)
|
filteredHarChannel := make(chan *tap.OutputChannelItem)
|
||||||
|
go api.StartReadingEntries(filteredHarChannel, nil)
|
||||||
|
go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
|
||||||
hostApi(socketHarOutChannel)
|
hostApi(socketHarOutChannel)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,18 +92,32 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) {
|
|||||||
|
|
||||||
|
|
||||||
func getTapTargets() []string {
|
func getTapTargets() []string {
|
||||||
nodeName := os.Getenv(nodeNameEnvVar)
|
nodeName := os.Getenv(shared.NodeNameEnvVar)
|
||||||
var tappedAddressesPerNodeDict map[string][]string
|
var tappedAddressesPerNodeDict map[string][]string
|
||||||
err := json.Unmarshal([]byte(os.Getenv(tappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict)
|
err := json.Unmarshal([]byte(os.Getenv(shared.TappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("env var value of %s is invalid! must be map[string][]string %v", tappedAddressesPerNodeDict, err))
|
panic(fmt.Sprintf("env var %s's value of %s is invalid! must be map[string][]string %v", shared.TappedAddressesPerNodeDictEnvVar, tappedAddressesPerNodeDict, err))
|
||||||
}
|
}
|
||||||
return tappedAddressesPerNodeDict[nodeName]
|
return tappedAddressesPerNodeDict[nodeName]
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem) {
|
func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
|
||||||
|
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
|
||||||
|
if filteringOptionsJson == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var filteringOptions shared.TrafficFilteringOptions
|
||||||
|
err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &filteringOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
|
||||||
for message := range inChannel {
|
for message := range inChannel {
|
||||||
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message)
|
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
|
||||||
outChannel <- message
|
outChannel <- message
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@ package sensitiveDataFiltering
|
|||||||
|
|
||||||
const maskedFieldPlaceholderValue = "[REDACTED]"
|
const maskedFieldPlaceholderValue = "[REDACTED]"
|
||||||
|
|
||||||
//these values MUST be all lower case
|
//these values MUST be all lower case and contain no `-` or `_` characters
|
||||||
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
|
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
|
||||||
"username", "user", "key", "passcode", "pass", "auth", "authtoken", "jwt",
|
"username", "user", "key", "passcode", "pass", "auth", "authtoken", "jwt",
|
||||||
"bearer", "clientid", "clientsecret", "redirecturi", "phonenumber",
|
"bearer", "clientid", "clientsecret", "redirecturi", "phonenumber",
|
||||||
|
@ -3,13 +3,16 @@ package sensitiveDataFiltering
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/google/martian/har"
|
|
||||||
"mizuserver/pkg/tap"
|
"mizuserver/pkg/tap"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/beevik/etree"
|
||||||
|
"github.com/google/martian/har"
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem) {
|
func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.TrafficFilteringOptions) {
|
||||||
filterHarHeaders(harOutputItem.HarEntry.Request.Headers)
|
filterHarHeaders(harOutputItem.HarEntry.Request.Headers)
|
||||||
filterHarHeaders(harOutputItem.HarEntry.Response.Headers)
|
filterHarHeaders(harOutputItem.HarEntry.Response.Headers)
|
||||||
|
|
||||||
@ -24,13 +27,15 @@ func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if harOutputItem.HarEntry.Request.PostData != nil {
|
if harOutputItem.HarEntry.Request.PostData != nil {
|
||||||
filteredRequestBody, err := filterHttpBody([]byte(harOutputItem.HarEntry.Request.PostData.Text))
|
requestContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Request.Headers)
|
||||||
|
filteredRequestBody, err := filterHttpBody([]byte(harOutputItem.HarEntry.Request.PostData.Text), requestContentType, options)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
harOutputItem.HarEntry.Request.PostData.Text = string(filteredRequestBody)
|
harOutputItem.HarEntry.Request.PostData.Text = string(filteredRequestBody)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if harOutputItem.HarEntry.Response.Content != nil {
|
if harOutputItem.HarEntry.Response.Content != nil {
|
||||||
filteredResponseBody, err := filterHttpBody(harOutputItem.HarEntry.Response.Content.Text)
|
responseContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Response.Headers)
|
||||||
|
filteredResponseBody, err := filterHttpBody(harOutputItem.HarEntry.Response.Content.Text, responseContentType, options)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
harOutputItem.HarEntry.Response.Content.Text = filteredResponseBody
|
harOutputItem.HarEntry.Response.Content.Text = filteredResponseBody
|
||||||
}
|
}
|
||||||
@ -45,6 +50,15 @@ func filterHarHeaders(headers []har.Header) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getContentTypeHeaderValue(headers []har.Header) string {
|
||||||
|
for _, header := range headers {
|
||||||
|
if strings.ToLower(header.Name) == "content-type" {
|
||||||
|
return header.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
func isFieldNameSensitive(fieldName string) bool {
|
func isFieldNameSensitive(fieldName string) bool {
|
||||||
name := strings.ToLower(fieldName)
|
name := strings.ToLower(fieldName)
|
||||||
name = strings.ReplaceAll(name, "_", "")
|
name = strings.ReplaceAll(name, "_", "")
|
||||||
@ -60,7 +74,63 @@ func isFieldNameSensitive(fieldName string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterHttpBody(bytes []byte) ([]byte, error){
|
func filterHttpBody(bytes []byte, contentType string, options *shared.TrafficFilteringOptions) ([]byte, error) {
|
||||||
|
mimeType := strings.Split(contentType, ";")[0]
|
||||||
|
switch strings.ToLower(mimeType) {
|
||||||
|
case "application/json":
|
||||||
|
return filterJsonBody(bytes)
|
||||||
|
case "text/html":
|
||||||
|
fallthrough
|
||||||
|
case "application/xhtml+xml":
|
||||||
|
fallthrough
|
||||||
|
case "text/xml":
|
||||||
|
fallthrough
|
||||||
|
case "application/xml":
|
||||||
|
return filterXmlEtree(bytes)
|
||||||
|
case "text/plain":
|
||||||
|
if options != nil && options.PlainTextMaskingRegexes != nil {
|
||||||
|
return filterPlainText(bytes, options), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bytes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterPlainText(bytes []byte, options *shared.TrafficFilteringOptions) []byte {
|
||||||
|
for _, regex := range options.PlainTextMaskingRegexes {
|
||||||
|
bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue))
|
||||||
|
}
|
||||||
|
return bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterXmlEtree(bytes []byte) ([]byte, error) {
|
||||||
|
xmlDoc := etree.NewDocument()
|
||||||
|
err := xmlDoc.ReadFromBytes(bytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
filterXmlElement(xmlDoc.Root())
|
||||||
|
}
|
||||||
|
return xmlDoc.WriteToBytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterXmlElement(element *etree.Element) {
|
||||||
|
for i, attribute := range element.Attr {
|
||||||
|
if isFieldNameSensitive(attribute.Key) {
|
||||||
|
element.Attr[i].Value = maskedFieldPlaceholderValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if element.ChildElements() == nil || len(element.ChildElements()) == 0 {
|
||||||
|
if isFieldNameSensitive(element.Tag) {
|
||||||
|
element.SetText(maskedFieldPlaceholderValue)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for _, element := range element.ChildElements() {
|
||||||
|
filterXmlElement(element)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterJsonBody(bytes []byte) ([]byte, error) {
|
||||||
var bodyJsonMap map[string] interface{}
|
var bodyJsonMap map[string] interface{}
|
||||||
err := json.Unmarshal(bytes ,&bodyJsonMap)
|
err := json.Unmarshal(bytes ,&bodyJsonMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -13,6 +13,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@ -34,7 +35,6 @@ import (
|
|||||||
const AppPortsEnvVar = "APP_PORTS"
|
const AppPortsEnvVar = "APP_PORTS"
|
||||||
const OutPortEnvVar = "WEB_SOCKET_PORT"
|
const OutPortEnvVar = "WEB_SOCKET_PORT"
|
||||||
const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT"
|
const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT"
|
||||||
const hostModeEnvVar = "HOST_MODE"
|
|
||||||
// default is 1MB, more than the max size accepted by collector and traffic-dumper
|
// default is 1MB, more than the max size accepted by collector and traffic-dumper
|
||||||
const maxHTTP2DataLenDefault = 1 * 1024 * 1024
|
const maxHTTP2DataLenDefault = 1 * 1024 * 1024
|
||||||
const cleanPeriod = time.Second * 10
|
const cleanPeriod = time.Second * 10
|
||||||
@ -258,7 +258,7 @@ func startPassiveTapper(harWriter *HarWriter) {
|
|||||||
maxHTTP2DataLen = convertedInt
|
maxHTTP2DataLen = convertedInt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
hostMode = os.Getenv(hostModeEnvVar) == "1"
|
hostMode = os.Getenv(shared.HostModeEnvVar) == "1"
|
||||||
|
|
||||||
fmt.Printf("App Ports: %v\n", appPorts)
|
fmt.Printf("App Ports: %v\n", appPorts)
|
||||||
fmt.Printf("Tap output websocket port: %s\n", tapOutputPort)
|
fmt.Printf("Tap output websocket port: %s\n", tapOutputPort)
|
||||||
|
@ -10,11 +10,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MizuTapOptions struct {
|
type MizuTapOptions struct {
|
||||||
GuiPort uint16
|
GuiPort uint16
|
||||||
Namespace string
|
Namespace string
|
||||||
KubeConfigPath string
|
KubeConfigPath string
|
||||||
MizuImage string
|
MizuImage string
|
||||||
MizuPodPort uint16
|
MizuPodPort uint16
|
||||||
|
PlainTextFilterRegexes []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -50,4 +51,5 @@ func init() {
|
|||||||
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
|
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
|
||||||
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
|
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
|
||||||
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
|
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
|
||||||
|
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"regexp"
|
"regexp"
|
||||||
@ -26,6 +27,10 @@ const (
|
|||||||
var currentlyTappedPods []core.Pod
|
var currentlyTappedPods []core.Pod
|
||||||
|
|
||||||
func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||||
|
mizuApiFilteringOptions, err := getMizuApiFilteringOptions(tappingOptions)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace)
|
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace)
|
||||||
|
|
||||||
defer cleanUpMizuResources(kubernetesProvider)
|
defer cleanUpMizuResources(kubernetesProvider)
|
||||||
@ -43,7 +48,7 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
|
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions, mizuApiFilteringOptions); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,8 +62,8 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
|||||||
// TODO handle incoming traffic from tapper using a channel
|
// TODO handle incoming traffic from tapper using a channel
|
||||||
}
|
}
|
||||||
|
|
||||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
|
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
||||||
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions); err != nil {
|
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -69,11 +74,11 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions) error {
|
func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
|
mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
|
||||||
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists)
|
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error creating mizu collector pod: %v\n", err)
|
fmt.Printf("Error creating mizu collector pod: %v\n", err)
|
||||||
return err
|
return err
|
||||||
@ -88,6 +93,24 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.TrafficFilteringOptions, error) {
|
||||||
|
if tappingOptions.PlainTextFilterRegexes == nil || len(tappingOptions.PlainTextFilterRegexes) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
compiledRegexSlice := make([]*shared.SerializableRegexp, 0)
|
||||||
|
for _, regexStr := range tappingOptions.PlainTextFilterRegexes {
|
||||||
|
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Regex %s is invalid: %v", regexStr, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
compiledRegexSlice = append(compiledRegexSlice, compiledRegex)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
|
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
|
||||||
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
|
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
|
||||||
ctx,
|
ctx,
|
||||||
|
@ -10,15 +10,16 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
|
||||||
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
|
"github.com/up9inc/mizu/shared"
|
||||||
applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
|
|
||||||
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
|
|
||||||
core "k8s.io/api/core/v1"
|
core "k8s.io/api/core/v1"
|
||||||
rbac "k8s.io/api/rbac/v1"
|
rbac "k8s.io/api/rbac/v1"
|
||||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
|
||||||
|
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
|
||||||
|
applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
|
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
|
||||||
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
|
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
|
||||||
@ -85,7 +86,11 @@ func (provider *Provider) GetPods(ctx context.Context, namespace string) {
|
|||||||
fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), namespace)
|
fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool) (*core.Pod, error) {
|
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions) (*core.Pod, error) {
|
||||||
|
marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
pod := &core.Pod{
|
pod := &core.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: podName,
|
Name: podName,
|
||||||
@ -101,9 +106,13 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
|
|||||||
Command: []string {"./mizuagent", "--aggregator"},
|
Command: []string {"./mizuagent", "--aggregator"},
|
||||||
Env: []core.EnvVar{
|
Env: []core.EnvVar{
|
||||||
{
|
{
|
||||||
Name: "HOST_MODE",
|
Name: shared.HostModeEnvVar,
|
||||||
Value: "1",
|
Value: "1",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: shared.MizuFilteringOptionsEnvVar,
|
||||||
|
Value: string(marshaledFilteringOptions),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -232,12 +241,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
|||||||
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
|
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
|
||||||
agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp))
|
agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp))
|
||||||
agentContainer.WithEnv(
|
agentContainer.WithEnv(
|
||||||
applyconfcore.EnvVar().WithName("HOST_MODE").WithValue("1"),
|
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
|
||||||
applyconfcore.EnvVar().WithName("AGGREGATOR_ADDRESS").WithValue(aggregatorPodIp),
|
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
|
||||||
applyconfcore.EnvVar().WithName("TAPPED_ADDRESSES_PER_HOST").WithValue(string(nodeToTappedPodIPMapJsonStr)),
|
|
||||||
)
|
)
|
||||||
agentContainer.WithEnv(
|
agentContainer.WithEnv(
|
||||||
applyconfcore.EnvVar().WithName("NODE_NAME").WithValueFrom(
|
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(
|
||||||
applyconfcore.EnvVarSource().WithFieldRef(
|
applyconfcore.EnvVarSource().WithFieldRef(
|
||||||
applyconfcore.ObjectFieldSelector().WithAPIVersion("v1").WithFieldPath("spec.nodeName"),
|
applyconfcore.ObjectFieldSelector().WithAPIVersion("v1").WithFieldPath("spec.nodeName"),
|
||||||
),
|
),
|
||||||
|
8
shared/consts.go
Normal file
8
shared/consts.go
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
package shared
|
||||||
|
|
||||||
|
const (
|
||||||
|
MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS"
|
||||||
|
HostModeEnvVar = "HOST_MODE"
|
||||||
|
NodeNameEnvVar = "NODE_NAME"
|
||||||
|
TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST"
|
||||||
|
)
|
@ -33,3 +33,7 @@ func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessag
|
|||||||
TappingStatus: tappingStatus,
|
TappingStatus: tappingStatus,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TrafficFilteringOptions struct {
|
||||||
|
PlainTextMaskingRegexes []*SerializableRegexp
|
||||||
|
}
|
||||||
|
30
shared/serializableRegexp.go
Normal file
30
shared/serializableRegexp.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package shared
|
||||||
|
|
||||||
|
import "regexp"
|
||||||
|
|
||||||
|
type SerializableRegexp struct {
|
||||||
|
regexp.Regexp
|
||||||
|
}
|
||||||
|
|
||||||
|
func CompileRegexToSerializableRegexp(expr string) (*SerializableRegexp, error) {
|
||||||
|
re, err := regexp.Compile(expr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &SerializableRegexp{*re}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalText is by json.Unmarshal.
|
||||||
|
func (r *SerializableRegexp) UnmarshalText(text []byte) error {
|
||||||
|
rr, err := CompileRegexToSerializableRegexp(string(text))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*r = *rr
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalText is used by json.Marshal.
|
||||||
|
func (r *SerializableRegexp) MarshalText() ([]byte, error) {
|
||||||
|
return []byte(r.String()), nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user