diff --git a/README.md b/README.md index 0205f5726..1d0cd48ef 100644 --- a/README.md +++ b/README.md @@ -13,13 +13,6 @@ curl -Lo mizu \ https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_amd64 \ && chmod 755 mizu ``` - -* for MacOS - Apple Silicon -``` -curl -Lo mizu \ -https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_arm64 \ -&& chmod 755 mizu -``` * for Linux - Intel 64bit ``` @@ -28,7 +21,6 @@ https://github.com/up9inc/mizu/releases/latest/download/mizu_linux_amd64 \ && chmod 755 mizu ``` - SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/releases) page. ### Development (unstable) build diff --git a/api/main.go b/api/main.go index b7bbaa14d..eebac00d1 100644 --- a/api/main.go +++ b/api/main.go @@ -37,7 +37,7 @@ func main() { harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts) filteredHarChannel := make(chan *tap.OutputChannelItem) - go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions()) + go filterHarItems(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions()) go api.StartReadingEntries(filteredHarChannel, nil) go api.StartReadingOutbound(outboundLinkOutputChannel) @@ -66,8 +66,8 @@ func main() { socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000) filteredHarChannel := make(chan *tap.OutputChannelItem) + go filterHarItems(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions()) go api.StartReadingEntries(filteredHarChannel, nil) - go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions()) hostApi(socketHarOutChannel) } @@ -125,9 +125,14 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions { return &filteringOptions } -func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { +func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { for message := range inChannel { + if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { + continue + } + sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) + outChannel <- message } } diff --git a/api/pkg/api/main.go b/api/pkg/api/main.go index 0907b0146..532344f3e 100644 --- a/api/pkg/api/main.go +++ b/api/pkg/api/main.go @@ -84,7 +84,7 @@ func startReadingFiles(workingDir string) { for _, entry := range inputHar.Log.Entries { time.Sleep(time.Millisecond * 250) - saveHarToDb(entry, fileInfo.Name()) + saveHarToDb(entry, fileInfo.Name(), false) } rmErr := os.Remove(inputFilePath) utils.CheckErr(rmErr) @@ -97,7 +97,7 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) { } for item := range outputItems { - saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP) + saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP, item.ConnectionInfo.IsOutgoing) } } @@ -109,7 +109,7 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { } -func saveHarToDb(entry *har.Entry, sender string) { +func saveHarToDb(entry *har.Entry, sender string, isOutgoing bool) { entryBytes, _ := json.Marshal(entry) serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL) entryId := primitive.NewObjectID().Hex() @@ -133,6 +133,7 @@ func saveHarToDb(entry *har.Entry, sender string) { Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond), ResolvedSource: resolvedSource, ResolvedDestination: resolvedDestination, + IsOutgoing: isOutgoing, } database.GetEntriesTable().Create(&mizuEntry) @@ -146,3 +147,7 @@ func getServiceNameFromUrl(inputUrl string) (string, string, string) { utils.CheckErr(err) return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path, parsed.Host } + +func CheckIsServiceIP(address string) bool { + return k8sResolver.CheckIsServiceIP(address) +} diff --git a/api/pkg/models/models.go b/api/pkg/models/models.go index 5cde7622c..108801e1e 100644 --- a/api/pkg/models/models.go +++ b/api/pkg/models/models.go @@ -23,6 +23,7 @@ type MizuEntry struct { Path string `json:"path" gorm:"column:path"` ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` + IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"` } type BaseEntryDetails struct { @@ -34,6 +35,7 @@ type BaseEntryDetails struct { StatusCode int `json:"statusCode,omitempty"` Method string `json:"method,omitempty"` Timestamp int64 `json:"timestamp,omitempty"` + IsOutgoing bool `json:"isOutgoing,omitempty"` } type EntryData struct { diff --git a/api/pkg/resolver/loader.go b/api/pkg/resolver/loader.go index c6836116c..f25fab3f0 100644 --- a/api/pkg/resolver/loader.go +++ b/api/pkg/resolver/loader.go @@ -21,7 +21,7 @@ func NewFromInCluster(errOut chan error) (*Resolver, error) { if err != nil { return nil, err } - return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), errOut: errOut}, nil + return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil } func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, error) { @@ -53,9 +53,9 @@ func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, e return nil, err } - return &Resolver{clientConfig: clientConfig, clientSet: clientset, nameMap: make(map[string]string), errOut: errOut}, nil + return &Resolver{clientConfig: clientConfig, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil } func NewFromExisting(clientConfig *restclient.Config, clientSet *kubernetes.Clientset, errOut chan error) *Resolver { - return &Resolver{clientConfig: clientConfig, clientSet: clientSet, nameMap: make(map[string]string), errOut: errOut} + return &Resolver{clientConfig: clientConfig, clientSet: clientSet, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut} } diff --git a/api/pkg/resolver/resolver.go b/api/pkg/resolver/resolver.go index 14076a108..88dbbc929 100644 --- a/api/pkg/resolver/resolver.go +++ b/api/pkg/resolver/resolver.go @@ -20,6 +20,7 @@ type Resolver struct { clientConfig *restclient.Config clientSet *kubernetes.Clientset nameMap map[string]string + serviceMap map[string]string isStarted bool errOut chan error } @@ -41,6 +42,11 @@ func (resolver *Resolver) Resolve(name string) string { return resolvedName } +func (resolver *Resolver) CheckIsServiceIP(address string) bool { + _, isFound := resolver.serviceMap[address] + return isFound +} + func (resolver *Resolver) watchPods(ctx context.Context) error { // empty namespace makes the client watch all namespaces watcher, err := resolver.clientSet.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{Watch: true}) @@ -124,6 +130,7 @@ func (resolver *Resolver) watchServices(ctx context.Context) error { serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace) if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString { resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type) + resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type) } if service.Status.LoadBalancer.Ingress != nil { for _, ingress := range service.Status.LoadBalancer.Ingress { @@ -147,6 +154,14 @@ func (resolver *Resolver) saveResolvedName(key string, resolved string, eventTyp } } +func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) { + if eventType == watch.Deleted { + delete(resolver.serviceMap, key) + } else { + resolver.serviceMap[key] = resolved + } +} + func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun func(ctx context.Context) error) { for { err := fun(ctx) diff --git a/api/pkg/utils/utils.go b/api/pkg/utils/utils.go index 9492c9b55..c50f65c9d 100644 --- a/api/pkg/utils/utils.go +++ b/api/pkg/utils/utils.go @@ -70,14 +70,15 @@ func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails { service = SetHostname(service, entry.ResolvedDestination) } return models.BaseEntryDetails{ - Id: entry.EntryId, - Url: entryUrl, - Service: service, - Path: entry.Path, - StatusCode: entry.Status, - Method: entry.Method, - Timestamp: entry.Timestamp, + Id: entry.EntryId, + Url: entryUrl, + Service: service, + Path: entry.Path, + StatusCode: entry.Status, + Method: entry.Method, + Timestamp: entry.Timestamp, RequestSenderIp: entry.RequestSenderIp, + IsOutgoing: entry.IsOutgoing, } } diff --git a/cli/Makefile b/cli/Makefile index ef97249c7..b228e9fd1 100644 --- a/cli/Makefile +++ b/cli/Makefile @@ -26,10 +26,10 @@ build-all: ## build for all supported platforms @mkdir -p bin && echo "SHA256 checksums available for compiled binaries \n\nRun \`shasum -a 256 -c mizu_OS_ARCH.sha256\` to verify\n\n" > bin/README.md @$(MAKE) build GOOS=darwin GOARCH=amd64 @$(MAKE) build GOOS=linux GOARCH=amd64 + @# $(MAKE) build GOOS=darwin GOARCH=arm64 @# $(MAKE) GOOS=windows GOARCH=amd64 @# $(MAKE) GOOS=linux GOARCH=386 @# $(MAKE) GOOS=windows GOARCH=386 - @$(MAKE) GOOS=darwin GOARCH=arm64 @# $(MAKE) GOOS=linux GOARCH=arm64 @# $(MAKE) GOOS=windows GOARCH=arm64 @echo "---------" diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index 69c4b988a..c3e6fb747 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -3,8 +3,10 @@ package cmd import ( "errors" "fmt" - "github.com/up9inc/mizu/cli/mizu" "regexp" + "strings" + + "github.com/up9inc/mizu/cli/mizu" "github.com/spf13/cobra" ) @@ -17,10 +19,12 @@ type MizuTapOptions struct { MizuImage string MizuPodPort uint16 PlainTextFilterRegexes []string + TapOutgoing bool } var mizuTapOptions = &MizuTapOptions{} +var direction string var tapCmd = &cobra.Command{ Use: "tap [POD REGEX]", @@ -39,6 +43,15 @@ var tapCmd = &cobra.Command{ return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err)) } + directionLowerCase := strings.ToLower(direction) + if directionLowerCase == "any" { + mizuTapOptions.TapOutgoing = true + } else if directionLowerCase == "in" { + mizuTapOptions.TapOutgoing = false + } else { + return errors.New(fmt.Sprintf("%s is not a valid value for flag --direction. Acceptable values are in/any.", direction)) + } + RunMizuTap(regex, mizuTapOptions) return nil }, @@ -54,4 +67,5 @@ func init() { tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector") tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod") tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies") + tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any") } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 6c0cc7d9b..76011e1dc 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -3,13 +3,14 @@ package cmd import ( "context" "fmt" - "github.com/up9inc/mizu/shared" "os" "os/signal" "regexp" "syscall" "time" + "github.com/up9inc/mizu/shared" + core "k8s.io/api/core/v1" "github.com/up9inc/mizu/cli/debounce" @@ -45,6 +46,22 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { currentlyTappedPods = matchingPods } + var namespacesStr string + if targetNamespace != mizu.K8sAllNamespaces { + namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace) + } else { + namespacesStr = "all namespaces" + } + fmt.Printf("Tapping pods in %s\n", namespacesStr) + + if len(currentlyTappedPods) == 0 { + var suggestionStr string + if targetNamespace != mizu.K8sAllNamespaces { + suggestionStr = "\nSelect a different namespace with -n or tap all namespaces with -A" + } + fmt.Printf("Did not find any pods matching the regex argument%s\n", suggestionStr) + } + nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) if err != nil { return @@ -60,8 +77,6 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { //block until exit signal or error waitForFinish(ctx, cancel) - - // TODO handle incoming traffic from tapper using a channel } func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error { @@ -69,7 +84,7 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro return err } - if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { + if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { return err } @@ -113,19 +128,27 @@ func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.Traffic return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil } -func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { - if err := kubernetesProvider.ApplyMizuTapperDaemonSet( - ctx, - mizu.ResourcesNamespace, - mizu.TapperDaemonSetName, - tappingOptions.MizuImage, - mizu.TapperPodName, - fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), - nodeToTappedPodIPMap, - mizuServiceAccountExists, - ); err != nil { - fmt.Printf("Error creating mizu tapper daemonset: %v\n", err) - return err +func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { + if len(nodeToTappedPodIPMap) > 0 { + if err := kubernetesProvider.ApplyMizuTapperDaemonSet( + ctx, + mizu.ResourcesNamespace, + mizu.TapperDaemonSetName, + tappingOptions.MizuImage, + mizu.TapperPodName, + fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), + nodeToTappedPodIPMap, + mizuServiceAccountExists, + tappingOptions.TapOutgoing, + ); err != nil { + fmt.Printf("Error creating mizu tapper daemonset: %v\n", err) + return err + } + } else { + if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil { + fmt.Printf("Error deleting mizu tapper daemonset: %v\n", err) + return err + } } return nil @@ -165,7 +188,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro cancel() } - if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { + if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err) cancel() } diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 09b000bcb..0bf40c1b8 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -102,7 +102,6 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace }, DNSPolicy: core.DNSClusterFirstWithHostNet, TerminationGracePeriodSeconds: new(int64), - // Affinity: TODO: define node selector for all relevant nodes for this mizu instance }, } //define the service account only when it exists to prevent pod crash @@ -215,30 +214,117 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string, } func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error { + if isFound, err := provider.CheckPodExists(ctx, namespace, podName); + err != nil { + return err + } else if !isFound { + return nil + } + return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) } func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error { + if isFound, err := provider.CheckServiceExists(ctx, namespace, serviceName); + err != nil { + return err + } else if !isFound { + return nil + } + return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{}) } func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error { + if isFound, err := provider.CheckDaemonSetExists(ctx, namespace, daemonSetName); + err != nil { + return err + } else if !isFound { + return nil + } + return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{}) } -func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool) error { +func (provider *Provider) CheckPodExists(ctx context.Context, namespace string, name string) (bool, error) { + listOptions := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + Limit: 1, + } + resourceList, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, listOptions) + if err != nil { + return false, err + } + + if len(resourceList.Items) > 0 { + return true, nil + } + + return false, nil +} + +func (provider *Provider) CheckServiceExists(ctx context.Context, namespace string, name string) (bool, error) { + listOptions := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + Limit: 1, + } + resourceList, err := provider.clientSet.CoreV1().Services(namespace).List(ctx, listOptions) + if err != nil { + return false, err + } + + if len(resourceList.Items) > 0 { + return true, nil + } + + return false, nil +} + +func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace string, name string) (bool, error) { + listOptions := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + Limit: 1, + } + resourceList, err := provider.clientSet.AppsV1().DaemonSets(namespace).List(ctx, listOptions) + if err != nil { + return false, err + } + + if len(resourceList.Items) > 0 { + return true, nil + } + + return false, nil +} + +func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool, tapOutgoing bool) error { + if len(nodeToTappedPodIPMap) == 0 { + return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName) + } + nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap) if err != nil { return err } + mizuCmd := []string{ + "./mizuagent", + "-i", "any", + "--tap", + "--hardump", + "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp), + } + if tapOutgoing { + mizuCmd = append(mizuCmd, "--anydirection") + } + privileged := true agentContainer := applyconfcore.Container() agentContainer.WithName(tapperPodName) agentContainer.WithImage(podImage) agentContainer.WithImagePullPolicy(core.PullAlways) agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged)) - agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp)) + agentContainer.WithCommand(mizuCmd...) agentContainer.WithEnv( applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"), applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)), diff --git a/shared/models.go b/shared/models.go index cbf73b594..854dbf1f6 100644 --- a/shared/models.go +++ b/shared/models.go @@ -3,7 +3,7 @@ package shared type WebSocketMessageType string const ( WebSocketMessageTypeEntry WebSocketMessageType = "entry" - WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry" + WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry" WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status" ) diff --git a/tap/http_matcher.go b/tap/http_matcher.go index a0377cd46..6e0393e5c 100644 --- a/tap/http_matcher.go +++ b/tap/http_matcher.go @@ -14,18 +14,10 @@ type requestResponsePair struct { Response httpMessage `json:"response"` } -type ConnectionInfo struct { - ClientIP string - ClientPort string - ServerIP string - ServerPort string -} - type httpMessage struct { isRequest bool captureTime time.Time orig interface{} - connectionInfo ConnectionInfo } @@ -44,18 +36,10 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht split := splitIdent(ident) key := genKey(split) - connectionInfo := &ConnectionInfo{ - ClientIP: split[0], - ClientPort: split[2], - ServerIP: split[1], - ServerPort: split[3], - } - requestHTTPMessage := httpMessage{ isRequest: true, captureTime: captureTime, orig: request, - connectionInfo: *connectionInfo, } if response, found := matcher.openMessagesMap.Pop(key); found { diff --git a/tap/http_reader.go b/tap/http_reader.go index 00d20f425..774f4c5a2 100644 --- a/tap/http_reader.go +++ b/tap/http_reader.go @@ -24,6 +24,14 @@ type tcpID struct { dstPort string } +type ConnectionInfo struct { + ClientIP string + ClientPort string + ServerIP string + ServerPort string + IsOutgoing bool +} + func (tid *tcpID) String() string { return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort) } @@ -38,6 +46,7 @@ type httpReader struct { tcpID tcpID isClient bool isHTTP2 bool + isOutgoing bool msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload data []byte captureTime time.Time @@ -121,13 +130,28 @@ func (h *httpReader) handleHTTP2Stream() error { } var reqResPair *requestResponsePair + var connectionInfo *ConnectionInfo switch messageHTTP1 := messageHTTP1.(type) { case http.Request: ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID) + connectionInfo = &ConnectionInfo{ + ClientIP: h.tcpID.srcIP, + ClientPort: h.tcpID.srcPort, + ServerIP: h.tcpID.dstIP, + ServerPort: h.tcpID.dstPort, + IsOutgoing: h.isOutgoing, + } reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime) case http.Response: ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID) + connectionInfo = &ConnectionInfo{ + ClientIP: h.tcpID.dstIP, + ClientPort: h.tcpID.dstPort, + ServerIP: h.tcpID.srcIP, + ServerPort: h.tcpID.srcPort, + IsOutgoing: h.isOutgoing, + } reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime) } @@ -140,7 +164,7 @@ func (h *httpReader) handleHTTP2Stream() error { reqResPair.Request.captureTime, reqResPair.Response.orig.(*http.Response), reqResPair.Response.captureTime, - &reqResPair.Request.connectionInfo, + connectionInfo, ) } } @@ -179,7 +203,13 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error { reqResPair.Request.captureTime, reqResPair.Response.orig.(*http.Response), reqResPair.Response.captureTime, - &reqResPair.Request.connectionInfo, + &ConnectionInfo{ + ClientIP: h.tcpID.srcIP, + ClientPort: h.tcpID.srcPort, + ServerIP: h.tcpID.dstIP, + ServerPort: h.tcpID.dstPort, + IsOutgoing: h.isOutgoing, + }, ) } } @@ -239,7 +269,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error { reqResPair.Request.captureTime, reqResPair.Response.orig.(*http.Response), reqResPair.Response.captureTime, - &reqResPair.Request.connectionInfo, + &ConnectionInfo{ + ClientIP: h.tcpID.dstIP, + ClientPort: h.tcpID.dstPort, + ServerIP: h.tcpID.srcIP, + ServerPort: h.tcpID.srcPort, + IsOutgoing: h.isOutgoing, + }, ) } } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 6808cc13e..bb268a929 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -27,13 +27,15 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T SupportMissingEstablishment: *allowmissinginit, } Debug("Current App Ports: %v", gSettings.filterPorts) + srcIp := net.Src().String() dstIp := net.Dst().String() dstPort := int(tcp.DstPort) if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) { factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort) } - isHTTP := factory.shouldTap(dstIp, dstPort) + props := factory.getStreamProps(srcIp, dstIp, dstPort) + isHTTP := props.isTapTarget stream := &tcpStream{ net: net, transport: transport, @@ -57,6 +59,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T hexdump: *hexdump, parent: stream, isClient: true, + isOutgoing: props.isOutgoing, harWriter: factory.harWriter, } stream.server = httpReader{ @@ -70,6 +73,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T }, hexdump: *hexdump, parent: stream, + isOutgoing: props.isOutgoing, harWriter: factory.harWriter, } factory.wg.Add(2) @@ -84,28 +88,29 @@ func (factory *tcpStreamFactory) WaitGoRoutines() { factory.wg.Wait() } -func (factory *tcpStreamFactory) shouldTap(dstIP string, dstPort int) bool { +func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int) *streamProps { if hostMode { if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { - return true + return &streamProps{isTapTarget: true, isOutgoing: false} } else if inArrayString(gSettings.filterAuthorities, dstIP) == true { - return true + return &streamProps{isTapTarget: true, isOutgoing: false} + } else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true { + return &streamProps{isTapTarget: true, isOutgoing: true} } - return false + return &streamProps{isTapTarget: false} } else { isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) if !isTappedPort { - return false + return &streamProps{isTapTarget: false, isOutgoing: false} } - if !*anydirection { - isDirectedHere := inArrayString(ownIps, dstIP) - if !isDirectedHere { - return false - } + isOutgoing := !inArrayString(ownIps, dstIP) + + if !*anydirection && isOutgoing { + return &streamProps{isTapTarget: false, isOutgoing: isOutgoing} } - return true + return &streamProps{isTapTarget: true} } } @@ -116,3 +121,9 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor } return true } + +type streamProps struct { + isTapTarget bool + isOutgoing bool +} + diff --git a/ui/src/components/HarEntry.tsx b/ui/src/components/HarEntry.tsx index 6ef8d76b9..95f3c6ffd 100644 --- a/ui/src/components/HarEntry.tsx +++ b/ui/src/components/HarEntry.tsx @@ -2,6 +2,8 @@ import React from "react"; import styles from './style/HarEntry.module.sass'; import StatusCode from "./StatusCode"; import {EndpointPath} from "./EndpointPath"; +import ingoingIcon from "./assets/ingoing-traffic.svg" +import outgoingIcon from "./assets/outgoing-traffic.svg" interface HAREntry { method?: string, @@ -12,6 +14,7 @@ interface HAREntry { url?: string; isCurrentRevision?: boolean; timestamp: Date; + isOutgoing?: boolean; } interface HAREntryProps { @@ -33,7 +36,18 @@ export const HarEntry: React.FC = ({entry, setFocusedEntryId, isS {entry.service} +
+ {entry.isOutgoing ? +
+ outgoing traffic +
+ : +
+ ingoing traffic +
+ } +
{new Date(+entry.timestamp)?.toLocaleString()}
-}; \ No newline at end of file +}; diff --git a/ui/src/components/assets/ingoing-traffic.svg b/ui/src/components/assets/ingoing-traffic.svg new file mode 100644 index 000000000..7aaded9a8 --- /dev/null +++ b/ui/src/components/assets/ingoing-traffic.svg @@ -0,0 +1 @@ + diff --git a/ui/src/components/assets/outgoing-traffic.svg b/ui/src/components/assets/outgoing-traffic.svg new file mode 100644 index 000000000..1c8ec87d7 --- /dev/null +++ b/ui/src/components/assets/outgoing-traffic.svg @@ -0,0 +1 @@ + diff --git a/ui/src/components/style/HarEntry.module.sass b/ui/src/components/style/HarEntry.module.sass index 4f1a8185b..3e28bfee9 100644 --- a/ui/src/components/style/HarEntry.module.sass +++ b/ui/src/components/style/HarEntry.module.sass @@ -47,4 +47,17 @@ overflow: hidden padding-right: 10px padding-left: 10px - flex-grow: 1 \ No newline at end of file + flex-grow: 1 + +.directionContainer + display: flex + width: 28px + flex-direction: column + +.outgoingIcon + display: flex + align-self: flex-end + +.ingoingIcon + display: flex + align-self: flex-start diff --git a/ui/src/components/style/variables.module.scss b/ui/src/components/style/variables.module.scss index d1152084c..d2f3d89bd 100644 --- a/ui/src/components/style/variables.module.scss +++ b/ui/src/components/style/variables.module.scss @@ -19,4 +19,4 @@ $blue-gray: #494677; successColor: $success-color; failureColor: $failure-color; blueGray: $blue-gray; -} \ No newline at end of file +}