diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 3886af658..685ef2516 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -118,8 +118,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension for item := range outputItems { extension := extensionsMap[item.Protocol.Name] - resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo) - mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation) + resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo) + mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace) if extension.Protocol.Name == "http" { if !disableOASValidation { var httpPair tapApi.HTTPRequestResponsePair @@ -158,26 +158,32 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension } } -func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) { +func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string, namespace string) { if k8sResolver != nil { unresolvedSource := connectionInfo.ClientIP - resolvedSource = k8sResolver.Resolve(unresolvedSource) - if resolvedSource == "" { + resolvedSourceObject := k8sResolver.Resolve(unresolvedSource) + if resolvedSourceObject == nil { logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource) if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" { return } + } else { + resolvedSource = resolvedSourceObject.FullAddress } + unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort) - resolvedDestination = k8sResolver.Resolve(unresolvedDestination) - if resolvedDestination == "" { + resolvedDestinationObject := k8sResolver.Resolve(unresolvedDestination) + if resolvedDestinationObject == nil { logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination) if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" { return } + } else { + resolvedDestination = resolvedDestinationObject.FullAddress + namespace = resolvedDestinationObject.Namespace } } - return resolvedSource, resolvedDestination + return resolvedSource, resolvedDestination, namespace } func CheckIsServiceIP(address string) bool { diff --git a/agent/pkg/api/socket_server_handlers.go b/agent/pkg/api/socket_server_handlers.go index a22508362..bedde49a7 100644 --- a/agent/pkg/api/socket_server_handlers.go +++ b/agent/pkg/api/socket_server_handlers.go @@ -104,9 +104,9 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) { } func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) { - resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP) - if resolvedName != "" { - outboundLinkMessage.Data.DstIP = resolvedName + resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP) + if resolvedNameObject != nil { + outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress } else if outboundLinkMessage.Data.SuggestedResolvedName != "" { outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName } diff --git a/agent/pkg/middlewares/cors.go b/agent/pkg/middlewares/cors.go index 04afb297a..e5d711ad9 100644 --- a/agent/pkg/middlewares/cors.go +++ b/agent/pkg/middlewares/cors.go @@ -7,7 +7,7 @@ func CORSMiddleware() gin.HandlerFunc { c.Writer.Header().Set("Access-Control-Allow-Origin", "*") c.Writer.Header().Set("Access-Control-Allow-Credentials", "true") c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token") - c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT") + c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE") if c.Request.Method == "OPTIONS" { c.AbortWithStatus(204) diff --git a/agent/pkg/resolver/resolver.go b/agent/pkg/resolver/resolver.go index 60533704d..a1cdc52de 100644 --- a/agent/pkg/resolver/resolver.go +++ b/agent/pkg/resolver/resolver.go @@ -30,6 +30,11 @@ type Resolver struct { namespace string } +type ResolvedObjectInfo struct { + FullAddress string + Namespace string +} + func (resolver *Resolver) Start(ctx context.Context) { if !resolver.isStarted { resolver.isStarted = true @@ -40,12 +45,12 @@ func (resolver *Resolver) Start(ctx context.Context) { } } -func (resolver *Resolver) Resolve(name string) string { +func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo { resolvedName, isFound := resolver.nameMap.Get(name) if !isFound { - return "" + return nil } - return resolvedName.(string) + return resolvedName.(*ResolvedObjectInfo) } func (resolver *Resolver) GetMap() cmap.ConcurrentMap { @@ -71,7 +76,7 @@ func (resolver *Resolver) watchPods(ctx context.Context) error { } if event.Type == watch.Deleted { pod := event.Object.(*corev1.Pod) - resolver.saveResolvedName(pod.Status.PodIP, "", event.Type) + resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type) } case <-ctx.Done(): watcher.Stop() @@ -106,10 +111,10 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error { } if subset.Addresses != nil { for _, address := range subset.Addresses { - resolver.saveResolvedName(address.IP, serviceHostname, event.Type) + resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type) for _, port := range ports { ipWithPort := fmt.Sprintf("%s:%d", address.IP, port) - resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type) + resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type) } } } @@ -139,19 +144,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error { service := event.Object.(*corev1.Service) serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace) if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString { - resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type) + resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type) if service.Spec.Ports != nil { for _, port := range service.Spec.Ports { if port.Port > 0 { - resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type) + resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type) } } } - resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type) + resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type) } if service.Status.LoadBalancer.Ingress != nil { for _, ingress := range service.Status.LoadBalancer.Ingress { - resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type) + resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type) } } case <-ctx.Done(): @@ -161,21 +166,22 @@ func (resolver *Resolver) watchServices(ctx context.Context) error { } } -func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) { +func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) { if eventType == watch.Deleted { resolver.nameMap.Remove(key) logger.Log.Infof("setting %s=nil", key) } else { - resolver.nameMap.Set(key, resolved) + + resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace}) logger.Log.Infof("setting %s=%s", key, resolved) } } -func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) { +func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) { if eventType == watch.Deleted { resolver.serviceMap.Remove(key) } else { - resolver.serviceMap.Set(key, resolved) + resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace}) } } diff --git a/cli/README.md.TEMPLATE b/cli/README.md.TEMPLATE index fee03a253..0271bdc27 100644 --- a/cli/README.md.TEMPLATE +++ b/cli/README.md.TEMPLATE @@ -1,5 +1,5 @@ # Mizu release _VER_ -Full changelog for stable release see in [docs](https://github.com/up9inc/mizu/blob/main/docs/CHANGELOG.md) +Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG) ## Download Mizu for your platform diff --git a/cli/apiserver/provider.go b/cli/apiserver/provider.go index 07afe6803..b4fd4b394 100644 --- a/cli/apiserver/provider.go +++ b/cli/apiserver/provider.go @@ -4,9 +4,11 @@ import ( "bytes" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" + "strings" "time" "github.com/up9inc/mizu/shared/kubernetes" @@ -57,10 +59,8 @@ func (provider *Provider) TestConnection() error { func (provider *Provider) isReachable() (bool, error) { echoUrl := fmt.Sprintf("%s/echo", provider.url) - if response, err := provider.client.Get(echoUrl); err != nil { + if _, err := provider.get(echoUrl); err != nil { return false, err - } else if response.StatusCode != 200 { - return false, fmt.Errorf("invalid status code %v", response.StatusCode) } else { return true, nil } @@ -72,10 +72,8 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e if jsonValue, err := json.Marshal(tapperStatus); err != nil { return fmt.Errorf("failed Marshal the tapper status %w", err) } else { - if response, err := provider.client.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { + if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { return fmt.Errorf("failed sending to API server the tapped pods %w", err) - } else if response.StatusCode != 200 { - return fmt.Errorf("failed sending to API server the tapper status, response status code %v", response.StatusCode) } else { logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus) return nil @@ -91,10 +89,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error { if jsonValue, err := json.Marshal(podInfos); err != nil { return fmt.Errorf("failed Marshal the tapped pods %w", err) } else { - if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { + if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { return fmt.Errorf("failed sending to API server the tapped pods %w", err) - } else if response.StatusCode != 200 { - return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode) } else { logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos)) return nil @@ -105,11 +101,9 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error { func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) { generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url) - response, requestErr := provider.client.Get(generalStatsUrl) + response, requestErr := provider.get(generalStatsUrl) if requestErr != nil { return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr) - } else if response.StatusCode != 200 { - return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode) } defer response.Body.Close() @@ -132,7 +126,7 @@ func (provider *Provider) GetVersion() (string, error) { Method: http.MethodGet, URL: versionUrl, } - statusResp, err := provider.client.Do(req) + statusResp, err := provider.do(req) if err != nil { return "", err } @@ -145,3 +139,40 @@ func (provider *Provider) GetVersion() (string, error) { return versionResponse.Ver, nil } + +// When err is nil, resp always contains a non-nil resp.Body. +// Caller should close resp.Body when done reading from it. +func (provider *Provider) get(url string) (*http.Response, error) { + return provider.checkError(provider.client.Get(url)) +} + +// When err is nil, resp always contains a non-nil resp.Body. +// Caller should close resp.Body when done reading from it. +func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) { + return provider.checkError(provider.client.Post(url, contentType, body)) +} + +// When err is nil, resp always contains a non-nil resp.Body. +// Caller should close resp.Body when done reading from it. +func (provider *Provider) do(req *http.Request) (*http.Response, error) { + return provider.checkError(provider.client.Do(req)) +} + +func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) { + if (errInOperation != nil) { + return response, errInOperation + // Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success. + } else if response.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(response.Body) + response.Body.Close() + response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind + if err != nil { + return response, err + } + + errorMsg := strings.ReplaceAll((string(body)), "\n", ";") + return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg) + } + + return response, nil +} diff --git a/cli/cmd/install.go b/cli/cmd/install.go index e17b1258b..509538d61 100644 --- a/cli/cmd/install.go +++ b/cli/cmd/install.go @@ -1,10 +1,9 @@ package cmd import ( - "fmt" "github.com/spf13/cobra" - "github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/telemetry" + "github.com/up9inc/mizu/shared/logger" ) var installCmd = &cobra.Command{ @@ -12,13 +11,13 @@ var installCmd = &cobra.Command{ Short: "Installs mizu components", RunE: func(cmd *cobra.Command, args []string) error { go telemetry.ReportRun("install", nil) - runMizuInstall() - return nil - }, - PreRunE: func(cmd *cobra.Command, args []string) error { - if config.Config.IsNsRestrictedMode() { - return fmt.Errorf("install is not supported in restricted namespace mode") - } + logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n") + + logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:") + logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm --namespace=mizu-ent --create-namespace\n\n") + + logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:") + logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm-develop --namespace=mizu-ent --create-namespace") return nil }, @@ -27,4 +26,3 @@ var installCmd = &cobra.Command{ func init() { rootCmd.AddCommand(installCmd) } - diff --git a/cli/cmd/installRunner.go b/cli/cmd/installRunner.go deleted file mode 100644 index 840e5710e..000000000 --- a/cli/cmd/installRunner.go +++ /dev/null @@ -1,81 +0,0 @@ -package cmd - -import ( - "context" - "errors" - "fmt" - - "github.com/creasty/defaults" - "github.com/up9inc/mizu/cli/config" - "github.com/up9inc/mizu/cli/errormessage" - "github.com/up9inc/mizu/cli/resources" - "github.com/up9inc/mizu/cli/uiUtils" - "github.com/up9inc/mizu/shared" - "github.com/up9inc/mizu/shared/logger" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func runMizuInstall() { - kubernetesProvider, err := getKubernetesProviderForCli() - if err != nil { - return - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // cancel will be called when this function exits - - var serializedValidationRules string - var serializedContract string - - var defaultMaxEntriesDBSizeBytes int64 = 200 * 1000 * 1000 - - defaultResources := shared.Resources{} - if err := defaults.Set(&defaultResources); err != nil { - logger.Log.Debug(err) - } - - mizuAgentConfig := getInstallMizuAgentConfig(defaultMaxEntriesDBSizeBytes, defaultResources) - serializedMizuConfig, err := getSerializedMizuAgentConfig(mizuAgentConfig) - if err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error serializing mizu config: %v", errormessage.FormatError(err))) - return - } - - if err = resources.CreateInstallMizuResources(ctx, kubernetesProvider, serializedValidationRules, - serializedContract, serializedMizuConfig, config.Config.IsNsRestrictedMode(), - config.Config.MizuResourcesNamespace, config.Config.AgentImage, - config.Config.KratosImage, config.Config.KetoImage, - nil, defaultMaxEntriesDBSizeBytes, defaultResources, config.Config.ImagePullPolicy(), - config.Config.LogLevel(), false); err != nil { - var statusError *k8serrors.StatusError - if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) { - logger.Log.Info("Mizu is already running in this namespace, run `mizu clean` to remove the currently running Mizu instance") - } else { - defer resources.CleanUpMizuResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace) - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err))) - } - - return - } - - logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance") -} - -func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Resources) *shared.MizuAgentConfig { - mizuAgentConfig := shared.MizuAgentConfig{ - MaxDBSizeBytes: maxDBSizeBytes, - AgentImage: config.Config.AgentImage, - PullPolicy: config.Config.ImagePullPolicyStr, - LogLevel: config.Config.LogLevel(), - TapperResources: tapperResources, - MizuResourcesNamespace: config.Config.MizuResourcesNamespace, - AgentDatabasePath: shared.DataDirPath, - StandaloneMode: true, - ServiceMap: config.Config.ServiceMap, - OAS: config.Config.OAS, - Elastic: config.Config.Elastic, - } - - return &mizuAgentConfig -} diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 5e37135bc..4e8383446 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -162,6 +162,7 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig { AgentDatabasePath: shared.DataDirPath, ServiceMap: config.Config.ServiceMap, OAS: config.Config.OAS, + Telemetry: config.Config.Telemetry, Elastic: config.Config.Elastic, } diff --git a/cli/cmd/viewRunner.go b/cli/cmd/viewRunner.go index 9a4101edd..ebcddc67e 100644 --- a/cli/cmd/viewRunner.go +++ b/cli/cmd/viewRunner.go @@ -3,13 +3,13 @@ package cmd import ( "context" "fmt" - "github.com/up9inc/mizu/cli/utils" "net/http" + "github.com/up9inc/mizu/cli/utils" + "github.com/up9inc/mizu/cli/apiserver" "github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/mizu/fsUtils" - "github.com/up9inc/mizu/cli/mizu/version" "github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/logger" @@ -62,14 +62,5 @@ func runMizuView() { uiUtils.OpenBrowser(url) } - if isCompatible, err := version.CheckVersionCompatibility(apiServerProvider); err != nil { - logger.Log.Errorf("Failed to check versions compatibility %v", err) - cancel() - return - } else if !isCompatible { - cancel() - return - } - utils.WaitForFinish(ctx, cancel) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a866207cf..c27c061b0 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,72 +1 @@ -# CHANGELOG -This document summarizes main and fixes changes published in stable (aka `main`) branch of this project. -Ongoing work and development releases are under `develop` branch. - -## 0.24.0 - -### main features -* ARM64 support -- Mizu is now available for ARM 64bit architecture - * Now you can run Mizu with `minikube` on your Apple M1 laptop or any other ARM-based hosts -* New command helps user verify Mizu deployment - * Run `mizu check` to verify Mizu was deployed successfully - * `mizu check` verifies version compatibility, resources and permissions required by Mizu -* EXPERIMENTAL: Service Map - graph of all service interactions - * Arrow direction show client to server connection - * Graph edge width reflects volume of traffic captured between the services - * to enable this experimental feature use `--set service-map=true` flag - -### improvements -* Mizu container images are now served from [Docker Hub](https://hub.docker.com/r/up9inc/mizu), as multi-architecture images (arm64, amd64) -* in Mizu GUI the filter query can now be applied by pressing CONTROL/COMMAND + ENTER -* try port-forwarding if http-proxy connection to Mizu API server is not available - -### notable bug fixes -* Fixed HTTP/1.0 presentation which was shown as HTTP/1.1 -* Fixed handling of long-living TCP connections, improves capturing gRPC and HTTP/2 traffic, and helps in service-mesh setups (istio, linkerd) - - -## 0.23.0 -### notable bug fixes -* fixed errors in Redis protocol parser (better handling of Array and Bulk String message types) - - - -## 0.22.0 - -### main features -* Service Mesh support -- mizu is now capable to tap mTLS traffic between pods connected by Istio service mesh - * Use `--service-mesh` option to enable this feature -* New installation option - have the same Mizu functionality as long living pods in your cluster, with password protection - * To install use `mizu install` command - * To access use `mizu view` or `kubectl -n mizu port-forward svc/mizu-api-server` - * To uninstall run `mizu clean` -* At first login - * Set admin password as prompted, use it to login to mizu later on. - * After login, user should select cluster namespaces to tap: by default all namespaces in the cluster are selected, user can select/unselect according to their needs. These settings are retained and can be modified at any time via Settings menu (cog icon on the top-right) - - -### improvements -* improved Mizu permissions/roles logic to support clusters with strict PodSecurityPolicy (PSP) -- see [PERMISSIONS](PERMISSIONS.md) doc for more details - -### notable bug fixes -* mizu now works properly when API service is exposed via HTTPS url -* mizu now properly displays KAFKA message body - - - - -## 0.21.0 - -### main features -* New traffic search & stream exprience -* Rich query language with full-text search capabilities on headers & body -* Distinct live-streaming vs paging/browsing modes, all with filter applied - -### improvements -* GUI - source and destination IP addresses & service names for each traffic item -* GUI - Mizu health - display warning sign in top bar when not all requested pods are successfully tapped -* GUI - pod tapping status reflected in the list (ok or problem) -* Mizu telemetry - report platform type - -### fixes -* Request duration and body size properly shown in GUI (instead of -1) +Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG) diff --git a/shared/kubernetes/provider.go b/shared/kubernetes/provider.go index fbb74df05..3ce71a56b 100644 --- a/shared/kubernetes/provider.go +++ b/shared/kubernetes/provider.go @@ -76,6 +76,8 @@ func NewProvider(kubeConfigPath string) (*Provider, error) { "you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err) } + logger.Log.Debugf("K8s client config, host: %s, api path: %s, user agent: %s", restClientConfig.Host, restClientConfig.APIPath, restClientConfig.UserAgent) + return &Provider{ clientSet: clientSet, kubernetesConfig: kubernetesConfig, @@ -952,6 +954,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac labelSelector := applyconfmeta.LabelSelector() labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName}) + applyOptions := metav1.ApplyOptions{ + Force: true, + FieldManager: fieldManagerName, + } + daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace) daemonSet. WithLabels(map[string]string{ @@ -960,7 +967,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac }). WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate)) - _, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName}) + _, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions) return err } diff --git a/shared/kubernetes/proxy.go b/shared/kubernetes/proxy.go index b21d7d097..521a90b57 100644 --- a/shared/kubernetes/proxy.go +++ b/shared/kubernetes/proxy.go @@ -128,9 +128,14 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin return nil, err } - path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName) - hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice - serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP} + clientConfigHostUrl, err := url.Parse(kubernetesProvider.clientConfig.Host) + if err != nil { + return nil, fmt.Errorf("Failed parsing client config host URL %s, error %w", kubernetesProvider.clientConfig.Host, err) + } + path := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/portforward", clientConfigHostUrl.Path, namespace, podName) + + serverURL := url.URL{Scheme: "https", Path: path, Host: clientConfigHostUrl.Host} + logger.Log.Debugf("Http dialer url %v", serverURL) return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL), nil } diff --git a/shared/models.go b/shared/models.go index 9081e9a9e..7aa0d3d47 100644 --- a/shared/models.go +++ b/shared/models.go @@ -43,6 +43,7 @@ type MizuAgentConfig struct { StandaloneMode bool `json:"standaloneMode"` ServiceMap bool `json:"serviceMap"` OAS bool `json:"oas"` + Telemetry bool `json:"telemetry"` Elastic ElasticConfig `json:"elastic"` } diff --git a/tap/api/api.go b/tap/api/api.go index 52c3cac11..d1a812e80 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -39,10 +39,9 @@ type TCP struct { } type Extension struct { - Protocol *Protocol - Path string - Dissector Dissector - MatcherMap *sync.Map + Protocol *Protocol + Path string + Dissector Dissector } type ConnectionInfo struct { @@ -62,7 +61,6 @@ type TcpID struct { } type CounterPair struct { - StreamId int64 Request uint Response uint sync.Mutex @@ -100,10 +98,15 @@ type SuperIdentifier struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error - Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error + Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) Macros() map[string]string + NewResponseRequestMatcher() RequestResponseMatcher +} + +type RequestResponseMatcher interface { + GetMap() *sync.Map } type Emitting struct { @@ -125,6 +128,7 @@ type Entry struct { Protocol Protocol `json:"proto"` Source *TCP `json:"src"` Destination *TCP `json:"dst"` + Namespace string `json:"namespace,omitempty"` Outgoing bool `json:"outgoing"` Timestamp int64 `json:"timestamp"` StartTime time.Time `json:"startTime"` diff --git a/tap/cleaner.go b/tap/cleaner.go index 61be717f3..cdf2acf20 100644 --- a/tap/cleaner.go +++ b/tap/cleaner.go @@ -22,6 +22,7 @@ type Cleaner struct { connectionTimeout time.Duration stats CleanerStats statsMutex sync.Mutex + streamsMap *tcpStreamMap } func (cl *Cleaner) clean() { @@ -32,10 +33,15 @@ func (cl *Cleaner) clean() { flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) cl.assemblerMutex.Unlock() - for _, extension := range extensions { - deleted := deleteOlderThan(extension.MatcherMap, startCleanTime.Add(-cl.connectionTimeout)) + cl.streamsMap.streams.Range(func(k, v interface{}) bool { + reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher + if reqResMatcher == nil { + return true + } + deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout)) cl.stats.deleted += deleted - } + return true + }) cl.statsMutex.Lock() logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 9ca023a4f..f672dba7b 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -42,7 +42,7 @@ func (d dissecting) Ping() { const amqpRequest string = "amqp_request" -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { r := AmqpReader{b} var remaining int @@ -212,7 +212,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } -func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { +func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { request := item.Pair.Request.Payload.(map[string]interface{}) reqDetails := request["details"].(map[string]interface{}) @@ -254,6 +254,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, + Namespace: namespace, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Method: request["method"].(string), @@ -300,6 +301,10 @@ func (d dissecting) Macros() map[string]string { } } +func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { + return nil +} + var Dissector dissecting func NewDissector() api.Dissector { diff --git a/tap/extensions/http/Makefile b/tap/extensions/http/Makefile index 529cc27ef..253910d58 100644 --- a/tap/extensions/http/Makefile +++ b/tap/extensions/http/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/http/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 8d084be77..7ffa5fee5 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) { item.ConnectionInfo.ClientPort = "" } -func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error { streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage() if err != nil { return err @@ -58,7 +58,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi switch messageHTTP1 := messageHTTP1.(type) { case http.Request: ident := fmt.Sprintf( - "%s->%s %s->%s %d %s", + "%s_%s_%s_%s_%d_%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, @@ -78,7 +78,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi } case http.Response: ident := fmt.Sprintf( - "%s->%s %s->%s %d %s", + "%s_%s_%s_%s_%d_%s", tcpID.DstIP, tcpID.SrcIP, tcpID.DstPort, @@ -110,7 +110,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { req, err = http.ReadRequest(b) if err != nil { return @@ -130,8 +130,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind ident := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d_%s", - counterPair.StreamId, + "%s_%s_%s_%s_%d_%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, @@ -153,7 +152,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api return } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) { +func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) { var res *http.Response res, err = http.ReadResponse(b, nil) if err != nil { @@ -174,8 +173,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind ident := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d_%s", - counterPair.StreamId, + "%s_%s_%s_%s_%d_%s", tcpID.DstIP, tcpID.SrcIP, tcpID.DstPort, diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 28c8a1744..2b3d781f1 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -84,14 +84,15 @@ type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &http11protocol - extension.MatcherMap = reqResMatcher.openMessagesMap } func (d dissecting) Ping() { log.Printf("pong %s", http11protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { + reqResMatcher := _reqResMatcher.(*requestResponseMatcher) + var err error isHTTP2, _ := checkIsHTTP2Connection(b, isClient) @@ -124,7 +125,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } if isHTTP2 { - err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options) + err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -133,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co superIdentifier.Protocol = &http11protocol } else if isClient { var req *http.Request - switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options) + switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -144,7 +145,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co // In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1 if switchingProtocolsHTTP2 { ident := fmt.Sprintf( - "%s->%s %s->%s 1 %s", + "%s_%s_%s_%s_1_%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, @@ -164,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } } else { - switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options) + switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -181,7 +182,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co return nil } -func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { +func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { var host, authority, path string request := item.Pair.Request.Payload.(map[string]interface{}) @@ -279,6 +280,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, + Namespace: namespace, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Response: resDetails, @@ -472,6 +474,10 @@ func (d dissecting) Macros() map[string]string { } } +func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { + return createResponseRequestMatcher() +} + var Dissector dissecting func NewDissector() api.Dissector { diff --git a/tap/extensions/http/main_test.go b/tap/extensions/http/main_test.go index 97cbc6430..90f90ab39 100644 --- a/tap/extensions/http/main_test.go +++ b/tap/extensions/http/main_test.go @@ -11,7 +11,6 @@ import ( "os" "path" "path/filepath" - "sort" "testing" "time" @@ -39,7 +38,6 @@ func TestRegister(t *testing.T) { extension := &api.Extension{} dissector.Register(extension) assert.Equal(t, "http", extension.Protocol.Name) - assert.NotNil(t, extension.MatcherMap) } func TestMacros(t *testing.T) { @@ -123,7 +121,8 @@ func TestDissect(t *testing.T) { SrcPort: "1", DstPort: "2", } - err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options) + reqResMatcher := dissector.NewResponseRequestMatcher() + err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } @@ -141,7 +140,7 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options) + err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } @@ -155,14 +154,6 @@ func TestDissect(t *testing.T) { stop <- true - sort.Slice(items, func(i, j int) bool { - iMarshaled, err := json.Marshal(items[i]) - assert.Nil(t, err) - jMarshaled, err := json.Marshal(items[j]) - assert.Nil(t, err) - return len(iMarshaled) < len(jMarshaled) - }) - marshaled, err := json.Marshal(items) assert.Nil(t, err) @@ -214,7 +205,7 @@ func TestAnalyze(t *testing.T) { var entries []*api.Entry for _, item := range items { - entry := dissector.Analyze(item, "", "") + entry := dissector.Analyze(item, "", "", "") entries = append(entries, entry) } diff --git a/tap/extensions/http/matcher.go b/tap/extensions/http/matcher.go index 0a28a65ac..67c09136a 100644 --- a/tap/extensions/http/matcher.go +++ b/tap/extensions/http/matcher.go @@ -8,16 +8,17 @@ import ( "github.com/up9inc/mizu/tap/api" ) -var reqResMatcher = createResponseRequestMatcher() // global - -// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter} +// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{incremental_counter}_{proto_ident} type requestResponseMatcher struct { openMessagesMap *sync.Map } -func createResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} - return *newMatcher +func createResponseRequestMatcher() api.RequestResponseMatcher { + return &requestResponseMatcher{openMessagesMap: &sync.Map{}} +} + +func (matcher *requestResponseMatcher) GetMap() *sync.Map { + return matcher.openMessagesMap } func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 84a764064..0be1ccdbf 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -33,27 +33,27 @@ type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &_protocol - extension.MatcherMap = reqResMatcher.openMessagesMap } func (d dissecting) Ping() { log.Printf("pong %s", _protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { + reqResMatcher := _reqResMatcher.(*requestResponseMatcher) for { if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { return errors.New("Identified by another protocol") } if isClient { - _, _, err := ReadRequest(b, tcpID, counterPair, superTimer) + _, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher) if err != nil { return err } superIdentifier.Protocol = &_protocol } else { - err := ReadResponse(b, tcpID, counterPair, superTimer, emitter) + err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher) if err != nil { return err } @@ -62,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } -func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { +func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { request := item.Pair.Request.Payload.(map[string]interface{}) reqDetails := request["details"].(map[string]interface{}) apiKey := ApiKey(reqDetails["apiKey"].(float64)) @@ -158,6 +158,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, + Namespace: namespace, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}), @@ -215,6 +216,10 @@ func (d dissecting) Macros() map[string]string { } } +func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { + return createResponseRequestMatcher() +} + var Dissector dissecting func NewDissector() api.Dissector { diff --git a/tap/extensions/kafka/matcher.go b/tap/extensions/kafka/matcher.go index 8bf8914bd..d5c77618a 100644 --- a/tap/extensions/kafka/matcher.go +++ b/tap/extensions/kafka/matcher.go @@ -3,9 +3,10 @@ package kafka import ( "sync" "time" + + "github.com/up9inc/mizu/tap/api" ) -var reqResMatcher = CreateResponseRequestMatcher() // global const maxTry int = 3000 type RequestResponsePair struct { @@ -13,14 +14,17 @@ type RequestResponsePair struct { Response Response } -// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id} +// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id} type requestResponseMatcher struct { openMessagesMap *sync.Map } -func CreateResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} - return *newMatcher +func createResponseRequestMatcher() api.RequestResponseMatcher { + return &requestResponseMatcher{openMessagesMap: &sync.Map{}} +} + +func (matcher *requestResponseMatcher) GetMap() *sync.Map { + return matcher.openMessagesMap } func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair { diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index 982312936..362d9e1df 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -19,7 +19,7 @@ type Request struct { CaptureTime time.Time `json:"captureTime"` } -func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) { +func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -214,8 +214,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su } key := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d", - counterPair.StreamId, + "%s_%s_%s_%s_%d", tcpID.SrcIP, tcpID.SrcPort, tcpID.DstIP, diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index dd9909034..0eb7950c7 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -16,7 +16,7 @@ type Response struct { CaptureTime time.Time `json:"captureTime"` } -func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) { +func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -44,8 +44,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s } key := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d", - counterPair.StreamId, + "%s_%s_%s_%s_%d", tcpID.DstIP, tcpID.DstPort, tcpID.SrcIP, diff --git a/tap/extensions/redis/handlers.go b/tap/extensions/redis/handlers.go index a4a3a3858..c4eb7985e 100644 --- a/tap/extensions/redis/handlers.go +++ b/tap/extensions/redis/handlers.go @@ -6,15 +6,14 @@ import ( "github.com/up9inc/mizu/tap/api" ) -func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error { +func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error { counterPair.Lock() counterPair.Request++ requestCounter := counterPair.Request counterPair.Unlock() ident := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d", - counterPair.StreamId, + "%s_%s_%s_%s_%d", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, @@ -36,15 +35,14 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim return nil } -func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error { +func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error { counterPair.Lock() counterPair.Response++ responseCounter := counterPair.Response counterPair.Unlock() ident := fmt.Sprintf( - "%d_%s:%s_%s:%s_%d", - counterPair.StreamId, + "%s_%s_%s_%s_%d", tcpID.DstIP, tcpID.SrcIP, tcpID.DstPort, diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go index 6de87739a..db7480a7b 100644 --- a/tap/extensions/redis/main.go +++ b/tap/extensions/redis/main.go @@ -32,14 +32,14 @@ type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &protocol - extension.MatcherMap = reqResMatcher.openMessagesMap } func (d dissecting) Ping() { log.Printf("pong %s", protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { + reqResMatcher := _reqResMatcher.(*requestResponseMatcher) is := &RedisInputStream{ Reader: b, Buf: make([]byte, 8192), @@ -52,9 +52,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } if isClient { - err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket) + err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) } else { - err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket) + err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) } if err != nil { @@ -63,7 +63,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } -func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { +func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { request := item.Pair.Request.Payload.(map[string]interface{}) response := item.Pair.Response.Payload.(map[string]interface{}) reqDetails := request["details"].(map[string]interface{}) @@ -96,6 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, + Namespace: namespace, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Response: resDetails, @@ -127,6 +128,10 @@ func (d dissecting) Macros() map[string]string { } } +func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher { + return createResponseRequestMatcher() +} + var Dissector dissecting func NewDissector() api.Dissector { diff --git a/tap/extensions/redis/matcher.go b/tap/extensions/redis/matcher.go index 66e1c423b..e63c2f4b2 100644 --- a/tap/extensions/redis/matcher.go +++ b/tap/extensions/redis/matcher.go @@ -7,16 +7,17 @@ import ( "github.com/up9inc/mizu/tap/api" ) -var reqResMatcher = createResponseRequestMatcher() // global - -// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}` +// Key is `{src_ip}_{dst_ip}_{src_ip}_{src_port}_{incremental_counter}` type requestResponseMatcher struct { openMessagesMap *sync.Map } -func createResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} - return *newMatcher +func createResponseRequestMatcher() api.RequestResponseMatcher { + return &requestResponseMatcher{openMessagesMap: &sync.Map{}} +} + +func (matcher *requestResponseMatcher) GetMap() *sync.Map { + return matcher.openMessagesMap } func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem { diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 2a0a142ce..2779295e9 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -210,6 +210,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) assemblerMutex: &assembler.assemblerMutex, cleanPeriod: cleanPeriod, connectionTimeout: staleConnectionTimeout, + streamsMap: streamsMap, } cleaner.start() diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index bacda8ac8..ceb94e98e 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -47,6 +47,7 @@ type tcpReader struct { extension *api.Extension emitter api.Emitter counterPair *api.CounterPair + reqResMatcher api.RequestResponseMatcher sync.Mutex } @@ -94,7 +95,7 @@ func (h *tcpReader) Close() { func (h *tcpReader) run(wg *sync.WaitGroup) { defer wg.Done() b := bufio.NewReader(h) - err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions) + err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher) if err != nil { _, err = io.Copy(ioutil.Discard, b) if err != nil { diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index ce56dcec8..9073e013b 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -29,8 +29,9 @@ type tcpStreamFactory struct { } type tcpStreamWrapper struct { - stream *tcpStream - createdAt time.Time + stream *tcpStream + reqResMatcher api.RequestResponseMatcher + createdAt time.Time } func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory { @@ -81,8 +82,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T if stream.isTapTarget { stream.id = factory.streamsMap.nextId() for i, extension := range extensions { + reqResMatcher := extension.Dissector.NewResponseRequestMatcher() counterPair := &api.CounterPair{ - StreamId: stream.id, Request: 0, Response: 0, } @@ -103,6 +104,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T extension: extension, emitter: factory.Emitter, counterPair: counterPair, + reqResMatcher: reqResMatcher, }) stream.servers = append(stream.servers, tcpReader{ msgQueue: make(chan tcpReaderDataMsg), @@ -121,11 +123,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T extension: extension, emitter: factory.Emitter, counterPair: counterPair, + reqResMatcher: reqResMatcher, }) factory.streamsMap.Store(stream.id, &tcpStreamWrapper{ - stream: stream, - createdAt: time.Now(), + stream: stream, + reqResMatcher: reqResMatcher, + createdAt: time.Now(), }) factory.wg.Add(2) diff --git a/ui/src/components/Pages/TrafficPage/TrafficPage.tsx b/ui/src/components/Pages/TrafficPage/TrafficPage.tsx index 5b14a7d6c..d1f14985c 100644 --- a/ui/src/components/Pages/TrafficPage/TrafficPage.tsx +++ b/ui/src/components/Pages/TrafficPage/TrafficPage.tsx @@ -76,7 +76,7 @@ export const TrafficPage: React.FC = ({setAnalyzeStatus}) => { const scrollableRef = useRef(null); const [openOasModal, setOpenOasModal] = useState(false); - const handleOpenModal = () => setOpenOasModal(true); + const handleCloseModal = () => setOpenOasModal(false); const [showTLSWarning, setShowTLSWarning] = useState(false); @@ -258,8 +258,14 @@ export const TrafficPage: React.FC = ({setAnalyzeStatus}) => { } } + const handleOpenOasModal = () => { + ws.current.close(); + setOpenOasModal(true); + } + const openServiceMapModalDebounce = debounce(() => { - setServiceMapModalOpen(true) + ws.current.close(); + setServiceMapModalOpen(true); }, 500); return ( @@ -285,7 +291,7 @@ export const TrafficPage: React.FC = ({setAnalyzeStatus}) => { variant="contained" className={commonClasses.outlinedButton + " " + commonClasses.imagedButton} style={{ marginRight: 25 }} - onClick={handleOpenModal} + onClick={handleOpenOasModal} > Show OAS }