Develop -> main (Release 27.0) (#812)

* ws pause after buttons click (#798)

* ws pause after buttons click

* name change

Co-authored-by: Igor Gov <iggvrv@gmail.com>

* moved CHANGELOG to Mizu wiki in Github (#801)

* Adding logs of k8s client config (#800)

* Support rancher client config (#802)

* Move the request-response matcher's scope from global-level to TCP stream-level (#793)

* Create a new request-response matcher for each TCP stream

* Fix the `ident` formats in request-response matchers

* Don't sort the items in the HTTP tests

* Update tap/extensions/kafka/matcher.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Temporarily change the bucket folder to the new expected

* Bring back the `deleteOlderThan` method

* Use `api.RequestResponseMatcher` instead of `interface{}` as type

* Use `api.RequestResponseMatcher` instead of `interface{}` as type (more)

* Update the key format comments

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Force DaemonSet apply (#804)

Required for apply to work if the DaemonSet is created by another program e.g. Helm.

* Tag entries destination namespace (#803)

* Update main.go, socket_server_handlers.go, and 7 more files...

* Update cors.go

* omit empty namespace in json so tests pass

* fix indent

* Sending telemetry config to server (#808)

* Print http error response details (#792)

* View command - no version check (#810)

* Update mizu install command (#811)

Co-authored-by: AmitUp9 <96980485+AmitUp9@users.noreply.github.com>
Co-authored-by: Alex Haiut <alex@up9.com>
Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
Co-authored-by: RamiBerm <54766858+RamiBerm@users.noreply.github.com>
This commit is contained in:
Igor Gov 2022-02-15 16:54:48 +02:00 committed by GitHub
parent 727d75bccc
commit 77078e78d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 236 additions and 308 deletions

View File

@ -118,8 +118,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
for item := range outputItems {
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation)
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
if extension.Protocol.Name == "http" {
if !disableOASValidation {
var httpPair tapApi.HTTPRequestResponsePair
@ -158,26 +158,32 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
}
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string, namespace string) {
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
resolvedSourceObject := k8sResolver.Resolve(unresolvedSource)
if resolvedSourceObject == nil {
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
} else {
resolvedSource = resolvedSourceObject.FullAddress
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
resolvedDestinationObject := k8sResolver.Resolve(unresolvedDestination)
if resolvedDestinationObject == nil {
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
} else {
resolvedDestination = resolvedDestinationObject.FullAddress
namespace = resolvedDestinationObject.Namespace
}
}
return resolvedSource, resolvedDestination
return resolvedSource, resolvedDestination, namespace
}
func CheckIsServiceIP(address string) bool {

View File

@ -104,9 +104,9 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
}
func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedName != "" {
outboundLinkMessage.Data.DstIP = resolvedName
resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedNameObject != nil {
outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress
} else if outboundLinkMessage.Data.SuggestedResolvedName != "" {
outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName
}

View File

@ -7,7 +7,7 @@ func CORSMiddleware() gin.HandlerFunc {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)

View File

@ -30,6 +30,11 @@ type Resolver struct {
namespace string
}
type ResolvedObjectInfo struct {
FullAddress string
Namespace string
}
func (resolver *Resolver) Start(ctx context.Context) {
if !resolver.isStarted {
resolver.isStarted = true
@ -40,12 +45,12 @@ func (resolver *Resolver) Start(ctx context.Context) {
}
}
func (resolver *Resolver) Resolve(name string) string {
func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo {
resolvedName, isFound := resolver.nameMap.Get(name)
if !isFound {
return ""
return nil
}
return resolvedName.(string)
return resolvedName.(*ResolvedObjectInfo)
}
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
@ -71,7 +76,7 @@ func (resolver *Resolver) watchPods(ctx context.Context) error {
}
if event.Type == watch.Deleted {
pod := event.Object.(*corev1.Pod)
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type)
}
case <-ctx.Done():
watcher.Stop()
@ -106,10 +111,10 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
}
if subset.Addresses != nil {
for _, address := range subset.Addresses {
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type)
for _, port := range ports {
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type)
}
}
}
@ -139,19 +144,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
service := event.Object.(*corev1.Service)
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
if service.Spec.Ports != nil {
for _, port := range service.Spec.Ports {
if port.Port > 0 {
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type)
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type)
}
}
}
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
}
if service.Status.LoadBalancer.Ingress != nil {
for _, ingress := range service.Status.LoadBalancer.Ingress {
resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type)
resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type)
}
}
case <-ctx.Done():
@ -161,21 +166,22 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
}
}
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(key)
logger.Log.Infof("setting %s=nil", key)
} else {
resolver.nameMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
logger.Log.Infof("setting %s=%s", key, resolved)
}
}
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.serviceMap.Remove(key)
} else {
resolver.serviceMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
}
}

View File

@ -1,5 +1,5 @@
# Mizu release _VER_
Full changelog for stable release see in [docs](https://github.com/up9inc/mizu/blob/main/docs/CHANGELOG.md)
Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG)
## Download Mizu for your platform

View File

@ -4,9 +4,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@ -57,10 +59,8 @@ func (provider *Provider) TestConnection() error {
func (provider *Provider) isReachable() (bool, error) {
echoUrl := fmt.Sprintf("%s/echo", provider.url)
if response, err := provider.client.Get(echoUrl); err != nil {
if _, err := provider.get(echoUrl); err != nil {
return false, err
} else if response.StatusCode != 200 {
return false, fmt.Errorf("invalid status code %v", response.StatusCode)
} else {
return true, nil
}
@ -72,10 +72,8 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
return fmt.Errorf("failed Marshal the tapper status %w", err)
} else {
if response, err := provider.client.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapper status, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
return nil
@ -91,10 +89,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
if jsonValue, err := json.Marshal(podInfos); err != nil {
return fmt.Errorf("failed Marshal the tapped pods %w", err)
} else {
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
return nil
@ -105,11 +101,9 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
response, requestErr := provider.client.Get(generalStatsUrl)
response, requestErr := provider.get(generalStatsUrl)
if requestErr != nil {
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
} else if response.StatusCode != 200 {
return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode)
}
defer response.Body.Close()
@ -132,7 +126,7 @@ func (provider *Provider) GetVersion() (string, error) {
Method: http.MethodGet,
URL: versionUrl,
}
statusResp, err := provider.client.Do(req)
statusResp, err := provider.do(req)
if err != nil {
return "", err
}
@ -145,3 +139,40 @@ func (provider *Provider) GetVersion() (string, error) {
return versionResponse.Ver, nil
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) get(url string) (*http.Response, error) {
return provider.checkError(provider.client.Get(url))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
return provider.checkError(provider.client.Post(url, contentType, body))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
return provider.checkError(provider.client.Do(req))
}
func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
if (errInOperation != nil) {
return response, errInOperation
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
} else if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
response.Body.Close()
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
if err != nil {
return response, err
}
errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
}
return response, nil
}

View File

@ -1,10 +1,9 @@
package cmd
import (
"fmt"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
)
var installCmd = &cobra.Command{
@ -12,13 +11,13 @@ var installCmd = &cobra.Command{
Short: "Installs mizu components",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("install", nil)
runMizuInstall()
return nil
},
PreRunE: func(cmd *cobra.Command, args []string) error {
if config.Config.IsNsRestrictedMode() {
return fmt.Errorf("install is not supported in restricted namespace mode")
}
logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")
logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm --namespace=mizu-ent --create-namespace\n\n")
logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm-develop --namespace=mizu-ent --create-namespace")
return nil
},
@ -27,4 +26,3 @@ var installCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(installCmd)
}

View File

@ -1,81 +0,0 @@
package cmd
import (
"context"
"errors"
"fmt"
"github.com/creasty/defaults"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/resources"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func runMizuInstall() {
kubernetesProvider, err := getKubernetesProviderForCli()
if err != nil {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
var serializedValidationRules string
var serializedContract string
var defaultMaxEntriesDBSizeBytes int64 = 200 * 1000 * 1000
defaultResources := shared.Resources{}
if err := defaults.Set(&defaultResources); err != nil {
logger.Log.Debug(err)
}
mizuAgentConfig := getInstallMizuAgentConfig(defaultMaxEntriesDBSizeBytes, defaultResources)
serializedMizuConfig, err := getSerializedMizuAgentConfig(mizuAgentConfig)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error serializing mizu config: %v", errormessage.FormatError(err)))
return
}
if err = resources.CreateInstallMizuResources(ctx, kubernetesProvider, serializedValidationRules,
serializedContract, serializedMizuConfig, config.Config.IsNsRestrictedMode(),
config.Config.MizuResourcesNamespace, config.Config.AgentImage,
config.Config.KratosImage, config.Config.KetoImage,
nil, defaultMaxEntriesDBSizeBytes, defaultResources, config.Config.ImagePullPolicy(),
config.Config.LogLevel(), false); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
logger.Log.Info("Mizu is already running in this namespace, run `mizu clean` to remove the currently running Mizu instance")
} else {
defer resources.CleanUpMizuResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
}
return
}
logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance")
}
func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Resources) *shared.MizuAgentConfig {
mizuAgentConfig := shared.MizuAgentConfig{
MaxDBSizeBytes: maxDBSizeBytes,
AgentImage: config.Config.AgentImage,
PullPolicy: config.Config.ImagePullPolicyStr,
LogLevel: config.Config.LogLevel(),
TapperResources: tapperResources,
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
AgentDatabasePath: shared.DataDirPath,
StandaloneMode: true,
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Elastic: config.Config.Elastic,
}
return &mizuAgentConfig
}

View File

@ -162,6 +162,7 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
AgentDatabasePath: shared.DataDirPath,
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Telemetry: config.Config.Telemetry,
Elastic: config.Config.Elastic,
}

View File

@ -3,13 +3,13 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/utils"
"net/http"
"github.com/up9inc/mizu/cli/utils"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
@ -62,14 +62,5 @@ func runMizuView() {
uiUtils.OpenBrowser(url)
}
if isCompatible, err := version.CheckVersionCompatibility(apiServerProvider); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)
cancel()
return
} else if !isCompatible {
cancel()
return
}
utils.WaitForFinish(ctx, cancel)
}

View File

@ -1,72 +1 @@
# CHANGELOG
This document summarizes main and fixes changes published in stable (aka `main`) branch of this project.
Ongoing work and development releases are under `develop` branch.
## 0.24.0
### main features
* ARM64 support -- Mizu is now available for ARM 64bit architecture
* Now you can run Mizu with `minikube` on your Apple M1 laptop or any other ARM-based hosts
* New command helps user verify Mizu deployment
* Run `mizu check` to verify Mizu was deployed successfully
* `mizu check` verifies version compatibility, resources and permissions required by Mizu
* EXPERIMENTAL: Service Map - graph of all service interactions
* Arrow direction show client to server connection
* Graph edge width reflects volume of traffic captured between the services
* to enable this experimental feature use `--set service-map=true` flag
### improvements
* Mizu container images are now served from [Docker Hub](https://hub.docker.com/r/up9inc/mizu), as multi-architecture images (arm64, amd64)
* in Mizu GUI the filter query can now be applied by pressing CONTROL/COMMAND + ENTER
* try port-forwarding if http-proxy connection to Mizu API server is not available
### notable bug fixes
* Fixed HTTP/1.0 presentation which was shown as HTTP/1.1
* Fixed handling of long-living TCP connections, improves capturing gRPC and HTTP/2 traffic, and helps in service-mesh setups (istio, linkerd)
## 0.23.0
### notable bug fixes
* fixed errors in Redis protocol parser (better handling of Array and Bulk String message types)
## 0.22.0
### main features
* Service Mesh support -- mizu is now capable to tap mTLS traffic between pods connected by Istio service mesh
* Use `--service-mesh` option to enable this feature
* New installation option - have the same Mizu functionality as long living pods in your cluster, with password protection
* To install use `mizu install` command
* To access use `mizu view` or `kubectl -n mizu port-forward svc/mizu-api-server`
* To uninstall run `mizu clean`
* At first login
* Set admin password as prompted, use it to login to mizu later on.
* After login, user should select cluster namespaces to tap: by default all namespaces in the cluster are selected, user can select/unselect according to their needs. These settings are retained and can be modified at any time via Settings menu (cog icon on the top-right)
### improvements
* improved Mizu permissions/roles logic to support clusters with strict PodSecurityPolicy (PSP) -- see [PERMISSIONS](PERMISSIONS.md) doc for more details
### notable bug fixes
* mizu now works properly when API service is exposed via HTTPS url
* mizu now properly displays KAFKA message body
## 0.21.0
### main features
* New traffic search & stream exprience
* Rich query language with full-text search capabilities on headers & body
* Distinct live-streaming vs paging/browsing modes, all with filter applied
### improvements
* GUI - source and destination IP addresses & service names for each traffic item
* GUI - Mizu health - display warning sign in top bar when not all requested pods are successfully tapped
* GUI - pod tapping status reflected in the list (ok or problem)
* Mizu telemetry - report platform type
### fixes
* Request duration and body size properly shown in GUI (instead of -1)
Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG)

View File

@ -76,6 +76,8 @@ func NewProvider(kubeConfigPath string) (*Provider, error) {
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
logger.Log.Debugf("K8s client config, host: %s, api path: %s, user agent: %s", restClientConfig.Host, restClientConfig.APIPath, restClientConfig.UserAgent)
return &Provider{
clientSet: clientSet,
kubernetesConfig: kubernetesConfig,
@ -952,6 +954,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
applyOptions := metav1.ApplyOptions{
Force: true,
FieldManager: fieldManagerName,
}
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.
WithLabels(map[string]string{
@ -960,7 +967,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
}).
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName})
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
return err
}

View File

@ -128,9 +128,14 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin
return nil, err
}
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName)
hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
clientConfigHostUrl, err := url.Parse(kubernetesProvider.clientConfig.Host)
if err != nil {
return nil, fmt.Errorf("Failed parsing client config host URL %s, error %w", kubernetesProvider.clientConfig.Host, err)
}
path := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/portforward", clientConfigHostUrl.Path, namespace, podName)
serverURL := url.URL{Scheme: "https", Path: path, Host: clientConfigHostUrl.Host}
logger.Log.Debugf("Http dialer url %v", serverURL)
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL), nil
}

View File

@ -43,6 +43,7 @@ type MizuAgentConfig struct {
StandaloneMode bool `json:"standaloneMode"`
ServiceMap bool `json:"serviceMap"`
OAS bool `json:"oas"`
Telemetry bool `json:"telemetry"`
Elastic ElasticConfig `json:"elastic"`
}

View File

@ -39,10 +39,9 @@ type TCP struct {
}
type Extension struct {
Protocol *Protocol
Path string
Dissector Dissector
MatcherMap *sync.Map
Protocol *Protocol
Path string
Dissector Dissector
}
type ConnectionInfo struct {
@ -62,7 +61,6 @@ type TcpID struct {
}
type CounterPair struct {
StreamId int64
Request uint
Response uint
sync.Mutex
@ -100,10 +98,15 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher
}
type RequestResponseMatcher interface {
GetMap() *sync.Map
}
type Emitting struct {
@ -125,6 +128,7 @@ type Entry struct {
Protocol Protocol `json:"proto"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
Outgoing bool `json:"outgoing"`
Timestamp int64 `json:"timestamp"`
StartTime time.Time `json:"startTime"`

View File

@ -22,6 +22,7 @@ type Cleaner struct {
connectionTimeout time.Duration
stats CleanerStats
statsMutex sync.Mutex
streamsMap *tcpStreamMap
}
func (cl *Cleaner) clean() {
@ -32,10 +33,15 @@ func (cl *Cleaner) clean() {
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock()
for _, extension := range extensions {
deleted := deleteOlderThan(extension.MatcherMap, startCleanTime.Add(-cl.connectionTimeout))
cl.streamsMap.streams.Range(func(k, v interface{}) bool {
reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher
if reqResMatcher == nil {
return true
}
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
}
return true
})
cl.statsMutex.Lock()
logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())

View File

@ -42,7 +42,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b}
var remaining int
@ -212,7 +212,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@ -254,6 +254,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Method: request["method"].(string),
@ -300,6 +301,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return nil
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect

View File

@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@ -58,7 +58,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@ -78,7 +78,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
}
case http.Response:
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
@ -110,7 +110,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@ -130,8 +130,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@ -153,7 +152,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@ -174,8 +173,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,

View File

@ -84,14 +84,15 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &http11protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient)
@ -124,7 +125,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options)
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@ -133,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
superIdentifier.Protocol = &http11protocol
} else if isClient {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@ -144,7 +145,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 {
ident := fmt.Sprintf(
"%s->%s %s->%s 1 %s",
"%s_%s_%s_%s_1_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@ -164,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@ -181,7 +182,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
var host, authority, path string
request := item.Pair.Request.Payload.(map[string]interface{})
@ -279,6 +280,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
@ -472,6 +474,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@ -11,7 +11,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"testing"
"time"
@ -39,7 +38,6 @@ func TestRegister(t *testing.T) {
extension := &api.Extension{}
dissector.Register(extension)
assert.Equal(t, "http", extension.Protocol.Name)
assert.NotNil(t, extension.MatcherMap)
}
func TestMacros(t *testing.T) {
@ -123,7 +121,8 @@ func TestDissect(t *testing.T) {
SrcPort: "1",
DstPort: "2",
}
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@ -141,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@ -155,14 +154,6 @@ func TestDissect(t *testing.T) {
stop <- true
sort.Slice(items, func(i, j int) bool {
iMarshaled, err := json.Marshal(items[i])
assert.Nil(t, err)
jMarshaled, err := json.Marshal(items[j])
assert.Nil(t, err)
return len(iMarshaled) < len(jMarshaled)
})
marshaled, err := json.Marshal(items)
assert.Nil(t, err)
@ -214,7 +205,7 @@ func TestAnalyze(t *testing.T) {
var entries []*api.Entry
for _, item := range items {
entry := dissector.Analyze(item, "", "")
entry := dissector.Analyze(item, "", "", "")
entries = append(entries, entry)
}

View File

@ -8,16 +8,17 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{incremental_counter}_{proto_ident}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {

View File

@ -33,27 +33,27 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &_protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if isClient {
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
if err != nil {
return err
}
@ -62,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
apiKey := ApiKey(reqDetails["apiKey"].(float64))
@ -158,6 +158,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
@ -215,6 +216,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@ -3,9 +3,10 @@ package kafka
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = CreateResponseRequestMatcher() // global
const maxTry int = 3000
type RequestResponsePair struct {
@ -13,14 +14,17 @@ type RequestResponsePair struct {
Response Response
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id}
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func CreateResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {

View File

@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@ -214,8 +214,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
}
key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,

View File

@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@ -44,8 +44,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
}
key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.DstPort,
tcpID.SrcIP,

View File

@ -6,15 +6,14 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@ -36,15 +35,14 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,

View File

@ -32,14 +32,14 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{
Reader: b,
Buf: make([]byte, 8192),
@ -52,9 +52,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else {
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
}
if err != nil {
@ -63,7 +63,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@ -96,6 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
@ -127,6 +128,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@ -7,16 +7,17 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
// Key is `{src_ip}_{dst_ip}_{src_ip}_{src_port}_{incremental_counter}`
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {

View File

@ -210,6 +210,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem)
assemblerMutex: &assembler.assemblerMutex,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,
streamsMap: streamsMap,
}
cleaner.start()

View File

@ -47,6 +47,7 @@ type tcpReader struct {
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
@ -94,7 +95,7 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil {
_, err = io.Copy(ioutil.Discard, b)
if err != nil {

View File

@ -29,8 +29,9 @@ type tcpStreamFactory struct {
}
type tcpStreamWrapper struct {
stream *tcpStream
createdAt time.Time
stream *tcpStream
reqResMatcher api.RequestResponseMatcher
createdAt time.Time
}
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
@ -81,8 +82,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
if stream.isTapTarget {
stream.id = factory.streamsMap.nextId()
for i, extension := range extensions {
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
counterPair := &api.CounterPair{
StreamId: stream.id,
Request: 0,
Response: 0,
}
@ -103,6 +104,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
@ -121,11 +123,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
})
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
stream: stream,
createdAt: time.Now(),
stream: stream,
reqResMatcher: reqResMatcher,
createdAt: time.Now(),
})
factory.wg.Add(2)

View File

@ -76,7 +76,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
const scrollableRef = useRef(null);
const [openOasModal, setOpenOasModal] = useState(false);
const handleOpenModal = () => setOpenOasModal(true);
const handleCloseModal = () => setOpenOasModal(false);
const [showTLSWarning, setShowTLSWarning] = useState(false);
@ -258,8 +258,14 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
}
}
const handleOpenOasModal = () => {
ws.current.close();
setOpenOasModal(true);
}
const openServiceMapModalDebounce = debounce(() => {
setServiceMapModalOpen(true)
ws.current.close();
setServiceMapModalOpen(true);
}, 500);
return (
@ -285,7 +291,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
style={{ marginRight: 25 }}
onClick={handleOpenModal}
onClick={handleOpenOasModal}
>
Show OAS
</Button>}