Merge branch 'develop' into main

# Conflicts:
#	README.md
#	api/main.go
#	api/pkg/api/main.go
#	api/pkg/models/models.go
#	api/pkg/resolver/resolver.go
#	cli/Makefile
#	cli/cmd/tap.go
#	cli/cmd/tapRunner.go
#	tap/http_matcher.go
#	tap/http_reader.go
#	tap/tcp_stream_factory.go
This commit is contained in:
Roee Gadot 2021-06-29 11:16:47 +03:00
commit 9b72cc7aa6
20 changed files with 285 additions and 82 deletions

View File

@ -14,13 +14,6 @@ https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_amd64 \
&& chmod 755 mizu && chmod 755 mizu
``` ```
* for MacOS - Apple Silicon
```
curl -Lo mizu \
https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_arm64 \
&& chmod 755 mizu
```
* for Linux - Intel 64bit * for Linux - Intel 64bit
``` ```
curl -Lo mizu \ curl -Lo mizu \
@ -28,7 +21,6 @@ https://github.com/up9inc/mizu/releases/latest/download/mizu_linux_amd64 \
&& chmod 755 mizu && chmod 755 mizu
``` ```
SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/releases) page. SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/releases) page.
### Development (unstable) build ### Development (unstable) build

View File

@ -37,7 +37,7 @@ func main() {
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts) harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
filteredHarChannel := make(chan *tap.OutputChannelItem) filteredHarChannel := make(chan *tap.OutputChannelItem)
go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions()) go filterHarItems(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(filteredHarChannel, nil) go api.StartReadingEntries(filteredHarChannel, nil)
go api.StartReadingOutbound(outboundLinkOutputChannel) go api.StartReadingOutbound(outboundLinkOutputChannel)
@ -66,8 +66,8 @@ func main() {
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000) socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tap.OutputChannelItem) filteredHarChannel := make(chan *tap.OutputChannelItem)
go filterHarItems(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(filteredHarChannel, nil) go api.StartReadingEntries(filteredHarChannel, nil)
go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
hostApi(socketHarOutChannel) hostApi(socketHarOutChannel)
} }
@ -125,9 +125,14 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
return &filteringOptions return &filteringOptions
} }
func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
for message := range inChannel { for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
outChannel <- message outChannel <- message
} }
} }

View File

@ -84,7 +84,7 @@ func startReadingFiles(workingDir string) {
for _, entry := range inputHar.Log.Entries { for _, entry := range inputHar.Log.Entries {
time.Sleep(time.Millisecond * 250) time.Sleep(time.Millisecond * 250)
saveHarToDb(entry, fileInfo.Name()) saveHarToDb(entry, fileInfo.Name(), false)
} }
rmErr := os.Remove(inputFilePath) rmErr := os.Remove(inputFilePath)
utils.CheckErr(rmErr) utils.CheckErr(rmErr)
@ -97,7 +97,7 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) {
} }
for item := range outputItems { for item := range outputItems {
saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP) saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP, item.ConnectionInfo.IsOutgoing)
} }
} }
@ -109,7 +109,7 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
} }
func saveHarToDb(entry *har.Entry, sender string) { func saveHarToDb(entry *har.Entry, sender string, isOutgoing bool) {
entryBytes, _ := json.Marshal(entry) entryBytes, _ := json.Marshal(entry)
serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL) serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL)
entryId := primitive.NewObjectID().Hex() entryId := primitive.NewObjectID().Hex()
@ -133,6 +133,7 @@ func saveHarToDb(entry *har.Entry, sender string) {
Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond), Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond),
ResolvedSource: resolvedSource, ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination, ResolvedDestination: resolvedDestination,
IsOutgoing: isOutgoing,
} }
database.GetEntriesTable().Create(&mizuEntry) database.GetEntriesTable().Create(&mizuEntry)
@ -146,3 +147,7 @@ func getServiceNameFromUrl(inputUrl string) (string, string, string) {
utils.CheckErr(err) utils.CheckErr(err)
return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path, parsed.Host return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path, parsed.Host
} }
func CheckIsServiceIP(address string) bool {
return k8sResolver.CheckIsServiceIP(address)
}

View File

@ -23,6 +23,7 @@ type MizuEntry struct {
Path string `json:"path" gorm:"column:path"` Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
} }
type BaseEntryDetails struct { type BaseEntryDetails struct {
@ -34,6 +35,7 @@ type BaseEntryDetails struct {
StatusCode int `json:"statusCode,omitempty"` StatusCode int `json:"statusCode,omitempty"`
Method string `json:"method,omitempty"` Method string `json:"method,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"` Timestamp int64 `json:"timestamp,omitempty"`
IsOutgoing bool `json:"isOutgoing,omitempty"`
} }
type EntryData struct { type EntryData struct {

View File

@ -21,7 +21,7 @@ func NewFromInCluster(errOut chan error) (*Resolver, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), errOut: errOut}, nil return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil
} }
func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, error) { func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, error) {
@ -53,9 +53,9 @@ func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, e
return nil, err return nil, err
} }
return &Resolver{clientConfig: clientConfig, clientSet: clientset, nameMap: make(map[string]string), errOut: errOut}, nil return &Resolver{clientConfig: clientConfig, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil
} }
func NewFromExisting(clientConfig *restclient.Config, clientSet *kubernetes.Clientset, errOut chan error) *Resolver { func NewFromExisting(clientConfig *restclient.Config, clientSet *kubernetes.Clientset, errOut chan error) *Resolver {
return &Resolver{clientConfig: clientConfig, clientSet: clientSet, nameMap: make(map[string]string), errOut: errOut} return &Resolver{clientConfig: clientConfig, clientSet: clientSet, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}
} }

View File

@ -20,6 +20,7 @@ type Resolver struct {
clientConfig *restclient.Config clientConfig *restclient.Config
clientSet *kubernetes.Clientset clientSet *kubernetes.Clientset
nameMap map[string]string nameMap map[string]string
serviceMap map[string]string
isStarted bool isStarted bool
errOut chan error errOut chan error
} }
@ -41,6 +42,11 @@ func (resolver *Resolver) Resolve(name string) string {
return resolvedName return resolvedName
} }
func (resolver *Resolver) CheckIsServiceIP(address string) bool {
_, isFound := resolver.serviceMap[address]
return isFound
}
func (resolver *Resolver) watchPods(ctx context.Context) error { func (resolver *Resolver) watchPods(ctx context.Context) error {
// empty namespace makes the client watch all namespaces // empty namespace makes the client watch all namespaces
watcher, err := resolver.clientSet.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{Watch: true}) watcher, err := resolver.clientSet.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{Watch: true})
@ -124,6 +130,7 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace) serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString { if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type) resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
} }
if service.Status.LoadBalancer.Ingress != nil { if service.Status.LoadBalancer.Ingress != nil {
for _, ingress := range service.Status.LoadBalancer.Ingress { for _, ingress := range service.Status.LoadBalancer.Ingress {
@ -147,6 +154,14 @@ func (resolver *Resolver) saveResolvedName(key string, resolved string, eventTyp
} }
} }
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
if eventType == watch.Deleted {
delete(resolver.serviceMap, key)
} else {
resolver.serviceMap[key] = resolved
}
}
func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun func(ctx context.Context) error) { func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun func(ctx context.Context) error) {
for { for {
err := fun(ctx) err := fun(ctx)

View File

@ -70,14 +70,15 @@ func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails {
service = SetHostname(service, entry.ResolvedDestination) service = SetHostname(service, entry.ResolvedDestination)
} }
return models.BaseEntryDetails{ return models.BaseEntryDetails{
Id: entry.EntryId, Id: entry.EntryId,
Url: entryUrl, Url: entryUrl,
Service: service, Service: service,
Path: entry.Path, Path: entry.Path,
StatusCode: entry.Status, StatusCode: entry.Status,
Method: entry.Method, Method: entry.Method,
Timestamp: entry.Timestamp, Timestamp: entry.Timestamp,
RequestSenderIp: entry.RequestSenderIp, RequestSenderIp: entry.RequestSenderIp,
IsOutgoing: entry.IsOutgoing,
} }
} }

View File

@ -26,10 +26,10 @@ build-all: ## build for all supported platforms
@mkdir -p bin && echo "SHA256 checksums available for compiled binaries \n\nRun \`shasum -a 256 -c mizu_OS_ARCH.sha256\` to verify\n\n" > bin/README.md @mkdir -p bin && echo "SHA256 checksums available for compiled binaries \n\nRun \`shasum -a 256 -c mizu_OS_ARCH.sha256\` to verify\n\n" > bin/README.md
@$(MAKE) build GOOS=darwin GOARCH=amd64 @$(MAKE) build GOOS=darwin GOARCH=amd64
@$(MAKE) build GOOS=linux GOARCH=amd64 @$(MAKE) build GOOS=linux GOARCH=amd64
@# $(MAKE) build GOOS=darwin GOARCH=arm64
@# $(MAKE) GOOS=windows GOARCH=amd64 @# $(MAKE) GOOS=windows GOARCH=amd64
@# $(MAKE) GOOS=linux GOARCH=386 @# $(MAKE) GOOS=linux GOARCH=386
@# $(MAKE) GOOS=windows GOARCH=386 @# $(MAKE) GOOS=windows GOARCH=386
@$(MAKE) GOOS=darwin GOARCH=arm64
@# $(MAKE) GOOS=linux GOARCH=arm64 @# $(MAKE) GOOS=linux GOARCH=arm64
@# $(MAKE) GOOS=windows GOARCH=arm64 @# $(MAKE) GOOS=windows GOARCH=arm64
@echo "---------" @echo "---------"

View File

@ -3,8 +3,10 @@ package cmd
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/up9inc/mizu/cli/mizu"
"regexp" "regexp"
"strings"
"github.com/up9inc/mizu/cli/mizu"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -17,10 +19,12 @@ type MizuTapOptions struct {
MizuImage string MizuImage string
MizuPodPort uint16 MizuPodPort uint16
PlainTextFilterRegexes []string PlainTextFilterRegexes []string
TapOutgoing bool
} }
var mizuTapOptions = &MizuTapOptions{} var mizuTapOptions = &MizuTapOptions{}
var direction string
var tapCmd = &cobra.Command{ var tapCmd = &cobra.Command{
Use: "tap [POD REGEX]", Use: "tap [POD REGEX]",
@ -39,6 +43,15 @@ var tapCmd = &cobra.Command{
return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err)) return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err))
} }
directionLowerCase := strings.ToLower(direction)
if directionLowerCase == "any" {
mizuTapOptions.TapOutgoing = true
} else if directionLowerCase == "in" {
mizuTapOptions.TapOutgoing = false
} else {
return errors.New(fmt.Sprintf("%s is not a valid value for flag --direction. Acceptable values are in/any.", direction))
}
RunMizuTap(regex, mizuTapOptions) RunMizuTap(regex, mizuTapOptions)
return nil return nil
}, },
@ -54,4 +67,5 @@ func init() {
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector") tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod") tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies") tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any")
} }

View File

@ -3,13 +3,14 @@ package cmd
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/up9inc/mizu/shared"
"os" "os"
"os/signal" "os/signal"
"regexp" "regexp"
"syscall" "syscall"
"time" "time"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/debounce" "github.com/up9inc/mizu/cli/debounce"
@ -45,6 +46,22 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
currentlyTappedPods = matchingPods currentlyTappedPods = matchingPods
} }
var namespacesStr string
if targetNamespace != mizu.K8sAllNamespaces {
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
} else {
namespacesStr = "all namespaces"
}
fmt.Printf("Tapping pods in %s\n", namespacesStr)
if len(currentlyTappedPods) == 0 {
var suggestionStr string
if targetNamespace != mizu.K8sAllNamespaces {
suggestionStr = "\nSelect a different namespace with -n or tap all namespaces with -A"
}
fmt.Printf("Did not find any pods matching the regex argument%s\n", suggestionStr)
}
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
if err != nil { if err != nil {
return return
@ -60,8 +77,6 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
//block until exit signal or error //block until exit signal or error
waitForFinish(ctx, cancel) waitForFinish(ctx, cancel)
// TODO handle incoming traffic from tapper using a channel
} }
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error { func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
@ -69,7 +84,7 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err return err
} }
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
return err return err
} }
@ -113,19 +128,27 @@ func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.Traffic
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil
} }
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if err := kubernetesProvider.ApplyMizuTapperDaemonSet( if len(nodeToTappedPodIPMap) > 0 {
ctx, if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
mizu.ResourcesNamespace, ctx,
mizu.TapperDaemonSetName, mizu.ResourcesNamespace,
tappingOptions.MizuImage, mizu.TapperDaemonSetName,
mizu.TapperPodName, tappingOptions.MizuImage,
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), mizu.TapperPodName,
nodeToTappedPodIPMap, fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
mizuServiceAccountExists, nodeToTappedPodIPMap,
); err != nil { mizuServiceAccountExists,
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err) tappingOptions.TapOutgoing,
return err ); err != nil {
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
return err
}
} else {
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
fmt.Printf("Error deleting mizu tapper daemonset: %v\n", err)
return err
}
} }
return nil return nil
@ -165,7 +188,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
cancel() cancel()
} }
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err) fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err)
cancel() cancel()
} }

View File

@ -102,7 +102,6 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
}, },
DNSPolicy: core.DNSClusterFirstWithHostNet, DNSPolicy: core.DNSClusterFirstWithHostNet,
TerminationGracePeriodSeconds: new(int64), TerminationGracePeriodSeconds: new(int64),
// Affinity: TODO: define node selector for all relevant nodes for this mizu instance
}, },
} }
//define the service account only when it exists to prevent pod crash //define the service account only when it exists to prevent pod crash
@ -215,30 +214,117 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string,
} }
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error { func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error {
if isFound, err := provider.CheckPodExists(ctx, namespace, podName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
} }
func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error { func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error {
if isFound, err := provider.CheckServiceExists(ctx, namespace, serviceName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{}) return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
} }
func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error { func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error {
if isFound, err := provider.CheckDaemonSetExists(ctx, namespace, daemonSetName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{}) return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
} }
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool) error { func (provider *Provider) CheckPodExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) CheckServiceExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.CoreV1().Services(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.AppsV1().DaemonSets(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool, tapOutgoing bool) error {
if len(nodeToTappedPodIPMap) == 0 {
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
}
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap) nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
if err != nil { if err != nil {
return err return err
} }
mizuCmd := []string{
"./mizuagent",
"-i", "any",
"--tap",
"--hardump",
"--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp),
}
if tapOutgoing {
mizuCmd = append(mizuCmd, "--anydirection")
}
privileged := true privileged := true
agentContainer := applyconfcore.Container() agentContainer := applyconfcore.Container()
agentContainer.WithName(tapperPodName) agentContainer.WithName(tapperPodName)
agentContainer.WithImage(podImage) agentContainer.WithImage(podImage)
agentContainer.WithImagePullPolicy(core.PullAlways) agentContainer.WithImagePullPolicy(core.PullAlways)
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged)) agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp)) agentContainer.WithCommand(mizuCmd...)
agentContainer.WithEnv( agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"), applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)), applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),

View File

@ -3,7 +3,7 @@ package shared
type WebSocketMessageType string type WebSocketMessageType string
const ( const (
WebSocketMessageTypeEntry WebSocketMessageType = "entry" WebSocketMessageTypeEntry WebSocketMessageType = "entry"
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry" WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status" WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
) )

View File

@ -14,18 +14,10 @@ type requestResponsePair struct {
Response httpMessage `json:"response"` Response httpMessage `json:"response"`
} }
type ConnectionInfo struct {
ClientIP string
ClientPort string
ServerIP string
ServerPort string
}
type httpMessage struct { type httpMessage struct {
isRequest bool isRequest bool
captureTime time.Time captureTime time.Time
orig interface{} orig interface{}
connectionInfo ConnectionInfo
} }
@ -44,18 +36,10 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
split := splitIdent(ident) split := splitIdent(ident)
key := genKey(split) key := genKey(split)
connectionInfo := &ConnectionInfo{
ClientIP: split[0],
ClientPort: split[2],
ServerIP: split[1],
ServerPort: split[3],
}
requestHTTPMessage := httpMessage{ requestHTTPMessage := httpMessage{
isRequest: true, isRequest: true,
captureTime: captureTime, captureTime: captureTime,
orig: request, orig: request,
connectionInfo: *connectionInfo,
} }
if response, found := matcher.openMessagesMap.Pop(key); found { if response, found := matcher.openMessagesMap.Pop(key); found {

View File

@ -24,6 +24,14 @@ type tcpID struct {
dstPort string dstPort string
} }
type ConnectionInfo struct {
ClientIP string
ClientPort string
ServerIP string
ServerPort string
IsOutgoing bool
}
func (tid *tcpID) String() string { func (tid *tcpID) String() string {
return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort) return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort)
} }
@ -38,6 +46,7 @@ type httpReader struct {
tcpID tcpID tcpID tcpID
isClient bool isClient bool
isHTTP2 bool isHTTP2 bool
isOutgoing bool
msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte data []byte
captureTime time.Time captureTime time.Time
@ -121,13 +130,28 @@ func (h *httpReader) handleHTTP2Stream() error {
} }
var reqResPair *requestResponsePair var reqResPair *requestResponsePair
var connectionInfo *ConnectionInfo
switch messageHTTP1 := messageHTTP1.(type) { switch messageHTTP1 := messageHTTP1.(type) {
case http.Request: case http.Request:
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID) ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID)
connectionInfo = &ConnectionInfo{
ClientIP: h.tcpID.srcIP,
ClientPort: h.tcpID.srcPort,
ServerIP: h.tcpID.dstIP,
ServerPort: h.tcpID.dstPort,
IsOutgoing: h.isOutgoing,
}
reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime) reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime)
case http.Response: case http.Response:
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID) ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID)
connectionInfo = &ConnectionInfo{
ClientIP: h.tcpID.dstIP,
ClientPort: h.tcpID.dstPort,
ServerIP: h.tcpID.srcIP,
ServerPort: h.tcpID.srcPort,
IsOutgoing: h.isOutgoing,
}
reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime) reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime)
} }
@ -140,7 +164,7 @@ func (h *httpReader) handleHTTP2Stream() error {
reqResPair.Request.captureTime, reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response), reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime, reqResPair.Response.captureTime,
&reqResPair.Request.connectionInfo, connectionInfo,
) )
} }
} }
@ -179,7 +203,13 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
reqResPair.Request.captureTime, reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response), reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime, reqResPair.Response.captureTime,
&reqResPair.Request.connectionInfo, &ConnectionInfo{
ClientIP: h.tcpID.srcIP,
ClientPort: h.tcpID.srcPort,
ServerIP: h.tcpID.dstIP,
ServerPort: h.tcpID.dstPort,
IsOutgoing: h.isOutgoing,
},
) )
} }
} }
@ -239,7 +269,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
reqResPair.Request.captureTime, reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response), reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime, reqResPair.Response.captureTime,
&reqResPair.Request.connectionInfo, &ConnectionInfo{
ClientIP: h.tcpID.dstIP,
ClientPort: h.tcpID.dstPort,
ServerIP: h.tcpID.srcIP,
ServerPort: h.tcpID.srcPort,
IsOutgoing: h.isOutgoing,
},
) )
} }
} }

View File

@ -27,13 +27,15 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
SupportMissingEstablishment: *allowmissinginit, SupportMissingEstablishment: *allowmissinginit,
} }
Debug("Current App Ports: %v", gSettings.filterPorts) Debug("Current App Ports: %v", gSettings.filterPorts)
srcIp := net.Src().String()
dstIp := net.Dst().String() dstIp := net.Dst().String()
dstPort := int(tcp.DstPort) dstPort := int(tcp.DstPort)
if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) { if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort) factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort)
} }
isHTTP := factory.shouldTap(dstIp, dstPort) props := factory.getStreamProps(srcIp, dstIp, dstPort)
isHTTP := props.isTapTarget
stream := &tcpStream{ stream := &tcpStream{
net: net, net: net,
transport: transport, transport: transport,
@ -57,6 +59,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
hexdump: *hexdump, hexdump: *hexdump,
parent: stream, parent: stream,
isClient: true, isClient: true,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter, harWriter: factory.harWriter,
} }
stream.server = httpReader{ stream.server = httpReader{
@ -70,6 +73,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
}, },
hexdump: *hexdump, hexdump: *hexdump,
parent: stream, parent: stream,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter, harWriter: factory.harWriter,
} }
factory.wg.Add(2) factory.wg.Add(2)
@ -84,28 +88,29 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
factory.wg.Wait() factory.wg.Wait()
} }
func (factory *tcpStreamFactory) shouldTap(dstIP string, dstPort int) bool { func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int) *streamProps {
if hostMode { if hostMode {
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {
return true return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, dstIP) == true { } else if inArrayString(gSettings.filterAuthorities, dstIP) == true {
return true return &streamProps{isTapTarget: true, isOutgoing: false}
} else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true {
return &streamProps{isTapTarget: true, isOutgoing: true}
} }
return false return &streamProps{isTapTarget: false}
} else { } else {
isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort)))
if !isTappedPort { if !isTappedPort {
return false return &streamProps{isTapTarget: false, isOutgoing: false}
} }
if !*anydirection { isOutgoing := !inArrayString(ownIps, dstIP)
isDirectedHere := inArrayString(ownIps, dstIP)
if !isDirectedHere { if !*anydirection && isOutgoing {
return false return &streamProps{isTapTarget: false, isOutgoing: isOutgoing}
}
} }
return true return &streamProps{isTapTarget: true}
} }
} }
@ -116,3 +121,9 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor
} }
return true return true
} }
type streamProps struct {
isTapTarget bool
isOutgoing bool
}

View File

@ -2,6 +2,8 @@ import React from "react";
import styles from './style/HarEntry.module.sass'; import styles from './style/HarEntry.module.sass';
import StatusCode from "./StatusCode"; import StatusCode from "./StatusCode";
import {EndpointPath} from "./EndpointPath"; import {EndpointPath} from "./EndpointPath";
import ingoingIcon from "./assets/ingoing-traffic.svg"
import outgoingIcon from "./assets/outgoing-traffic.svg"
interface HAREntry { interface HAREntry {
method?: string, method?: string,
@ -12,6 +14,7 @@ interface HAREntry {
url?: string; url?: string;
isCurrentRevision?: boolean; isCurrentRevision?: boolean;
timestamp: Date; timestamp: Date;
isOutgoing?: boolean;
} }
interface HAREntryProps { interface HAREntryProps {
@ -33,6 +36,17 @@ export const HarEntry: React.FC<HAREntryProps> = ({entry, setFocusedEntryId, isS
{entry.service} {entry.service}
</div> </div>
</div> </div>
<div className={styles.directionContainer}>
{entry.isOutgoing ?
<div className={styles.outgoingIcon}>
<img src={outgoingIcon} alt="outgoing traffic" title="outgoing"/>
</div>
:
<div className={styles.ingoingIcon}>
<img src={ingoingIcon} alt="ingoing traffic" title="ingoing"/>
</div>
}
</div>
<div className={styles.timestamp}>{new Date(+entry.timestamp)?.toLocaleString()}</div> <div className={styles.timestamp}>{new Date(+entry.timestamp)?.toLocaleString()}</div>
</div> </div>
</> </>

View File

@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" enable-background="new 0 0 24 24" height="24px" viewBox="0 0 24 24" width="24px" fill="#000000"><g><rect fill="none" height="24" width="24"/></g><g><path d="M11,7L9.6,8.4l2.6,2.6H2v2h10.2l-2.6,2.6L11,17l5-5L11,7z M20,19h-8v2h8c1.1,0,2-0.9,2-2V5c0-1.1-0.9-2-2-2h-8v2h8V19z"/></g></svg>

After

Width:  |  Height:  |  Size: 325 B

View File

@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" enable-background="new 0 0 24 24" height="24px" viewBox="0 0 24 24" width="24px" fill="#000000"><g><path d="M0,0h24v24H0V0z" fill="none"/></g><g><path d="M17,8l-1.41,1.41L17.17,11H9v2h8.17l-1.58,1.58L17,16l4-4L17,8z M5,5h7V3H5C3.9,3,3,3.9,3,5v14c0,1.1,0.9,2,2,2h7v-2H5V5z"/></g></svg>

After

Width:  |  Height:  |  Size: 325 B

View File

@ -48,3 +48,16 @@
padding-right: 10px padding-right: 10px
padding-left: 10px padding-left: 10px
flex-grow: 1 flex-grow: 1
.directionContainer
display: flex
width: 28px
flex-direction: column
.outgoingIcon
display: flex
align-self: flex-end
.ingoingIcon
display: flex
align-self: flex-start