mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-01 02:30:33 +00:00
Refactoring (#84)
* Only use one channel for filtering HARs. * Only check if dest is service ip if message is outgoing. * Parse direction flag on input. * Renamed filterHarHeaders -> filterHarItems. * Fixed compilation bugs.
This commit is contained in:
27
api/main.go
27
api/main.go
@@ -35,12 +35,10 @@ func main() {
|
|||||||
|
|
||||||
if *standalone {
|
if *standalone {
|
||||||
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
|
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
|
||||||
filteredHarChannel0 := make(chan *tap.OutputChannelItem)
|
filteredHarChannel := make(chan *tap.OutputChannelItem)
|
||||||
filteredHarChannel1 := make(chan *tap.OutputChannelItem)
|
|
||||||
|
|
||||||
go filterServices(harOutputChannel, filteredHarChannel0, getTrafficFilteringOptions())
|
go filterHarItems(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
|
||||||
go filterHarHeaders(filteredHarChannel0, filteredHarChannel1, getTrafficFilteringOptions())
|
go api.StartReadingEntries(filteredHarChannel, nil)
|
||||||
go api.StartReadingEntries(filteredHarChannel1, nil)
|
|
||||||
go api.StartReadingOutbound(outboundLinkOutputChannel)
|
go api.StartReadingOutbound(outboundLinkOutputChannel)
|
||||||
|
|
||||||
hostApi(nil)
|
hostApi(nil)
|
||||||
@@ -66,12 +64,10 @@ func main() {
|
|||||||
go api.StartReadingOutbound(outboundLinkOutputChannel)
|
go api.StartReadingOutbound(outboundLinkOutputChannel)
|
||||||
} else if *aggregator {
|
} else if *aggregator {
|
||||||
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
|
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
|
||||||
filteredHarChannel0 := make(chan *tap.OutputChannelItem)
|
filteredHarChannel := make(chan *tap.OutputChannelItem)
|
||||||
filteredHarChannel1 := make(chan *tap.OutputChannelItem)
|
|
||||||
|
|
||||||
go filterServices(socketHarOutChannel, filteredHarChannel0, getTrafficFilteringOptions())
|
go filterHarItems(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
|
||||||
go filterHarHeaders(filteredHarChannel0, filteredHarChannel1, getTrafficFilteringOptions())
|
go api.StartReadingEntries(filteredHarChannel, nil)
|
||||||
go api.StartReadingEntries(filteredHarChannel1, nil)
|
|
||||||
|
|
||||||
hostApi(socketHarOutChannel)
|
hostApi(socketHarOutChannel)
|
||||||
}
|
}
|
||||||
@@ -129,19 +125,14 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
|
|||||||
return &filteringOptions
|
return &filteringOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterServices(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
|
func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
|
||||||
for message := range inChannel {
|
for message := range inChannel {
|
||||||
if api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
outChannel <- message
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
|
|
||||||
for message := range inChannel {
|
|
||||||
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
|
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
|
||||||
|
|
||||||
outChannel <- message
|
outChannel <- message
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -3,8 +3,10 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/up9inc/mizu/cli/mizu"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/up9inc/mizu/cli/mizu"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
@@ -17,11 +19,12 @@ type MizuTapOptions struct {
|
|||||||
MizuImage string
|
MizuImage string
|
||||||
MizuPodPort uint16
|
MizuPodPort uint16
|
||||||
PlainTextFilterRegexes []string
|
PlainTextFilterRegexes []string
|
||||||
Direction string
|
TapOutgoing bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
var mizuTapOptions = &MizuTapOptions{}
|
var mizuTapOptions = &MizuTapOptions{}
|
||||||
|
var direction string
|
||||||
|
|
||||||
var tapCmd = &cobra.Command{
|
var tapCmd = &cobra.Command{
|
||||||
Use: "tap [POD REGEX]",
|
Use: "tap [POD REGEX]",
|
||||||
@@ -40,8 +43,13 @@ var tapCmd = &cobra.Command{
|
|||||||
return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err))
|
return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if mizuTapOptions.Direction != "in" && mizuTapOptions.Direction != "any" {
|
directionLowerCase := strings.ToLower(direction)
|
||||||
return errors.New(fmt.Sprintf("%s is not a valid value for flag --direction. Acceptable values are in/any.", mizuTapOptions.Direction))
|
if directionLowerCase == "any" {
|
||||||
|
mizuTapOptions.TapOutgoing = true
|
||||||
|
} else if directionLowerCase == "in" {
|
||||||
|
mizuTapOptions.TapOutgoing = false
|
||||||
|
} else {
|
||||||
|
return errors.New(fmt.Sprintf("%s is not a valid value for flag --direction. Acceptable values are in/any.", direction))
|
||||||
}
|
}
|
||||||
|
|
||||||
RunMizuTap(regex, mizuTapOptions)
|
RunMizuTap(regex, mizuTapOptions)
|
||||||
@@ -59,5 +67,5 @@ func init() {
|
|||||||
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
|
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
|
||||||
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
|
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
|
||||||
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
|
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
|
||||||
tapCmd.Flags().StringVarP(&mizuTapOptions.Direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any")
|
tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any")
|
||||||
}
|
}
|
||||||
|
@@ -121,7 +121,7 @@ func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
|||||||
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
|
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
|
||||||
nodeToTappedPodIPMap,
|
nodeToTappedPodIPMap,
|
||||||
mizuServiceAccountExists,
|
mizuServiceAccountExists,
|
||||||
tappingOptions.Direction,
|
tappingOptions.TapOutgoing,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
|
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
|
||||||
return err
|
return err
|
||||||
|
@@ -226,7 +226,7 @@ func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string,
|
|||||||
return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
|
return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool, direction string) error {
|
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool, tapOutgoing bool) error {
|
||||||
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
|
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -239,7 +239,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
|||||||
"--hardump",
|
"--hardump",
|
||||||
"--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp),
|
"--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp),
|
||||||
}
|
}
|
||||||
if direction == "any" {
|
if tapOutgoing {
|
||||||
mizuCmd = append(mizuCmd, "--anydirection")
|
mizuCmd = append(mizuCmd, "--anydirection")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user