diff --git a/acceptanceTests/tap_test.go b/acceptanceTests/tap_test.go index 0f04ea6a8..e5dd33855 100644 --- a/acceptanceTests/tap_test.go +++ b/acceptanceTests/tap_test.go @@ -762,6 +762,116 @@ func TestTapRegexMasking(t *testing.T) { } } +func TestTapIgnoredUserAgents(t *testing.T) { + if testing.Short() { + t.Skip("ignored acceptance test") + } + + cliPath, cliPathErr := getCliPath() + if cliPathErr != nil { + t.Errorf("failed to get cli path, err: %v", cliPathErr) + return + } + + tapCmdArgs := getDefaultTapCommandArgs() + + tapNamespace := getDefaultTapNamespace() + tapCmdArgs = append(tapCmdArgs, tapNamespace...) + + ignoredUserAgentValue := "ignore" + tapCmdArgs = append(tapCmdArgs, "--set", fmt.Sprintf("tap.ignored-user-agents=%v", ignoredUserAgentValue)) + + tapCmd := exec.Command(cliPath, tapCmdArgs...) + t.Logf("running command: %v", tapCmd.String()) + + t.Cleanup(func() { + if err := cleanupCommand(tapCmd); err != nil { + t.Logf("failed to cleanup tap command, err: %v", err) + } + }) + + if err := tapCmd.Start(); err != nil { + t.Errorf("failed to start tap command, err: %v", err) + return + } + + apiServerUrl := getApiServerUrl(defaultApiServerPort) + + if err := waitTapPodsReady(apiServerUrl); err != nil { + t.Errorf("failed to start tap pods on time, err: %v", err) + return + } + + proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName) + + ignoredUserAgentCustomHeader := "Ignored-User-Agent" + headers := map[string]string {"User-Agent": ignoredUserAgentValue, ignoredUserAgentCustomHeader: ""} + for i := 0; i < defaultEntriesCount; i++ { + if _, requestErr := executeHttpGetRequestWithHeaders(fmt.Sprintf("%v/get", proxyUrl), headers); requestErr != nil { + t.Errorf("failed to send proxy request, err: %v", requestErr) + return + } + } + + for i := 0; i < defaultEntriesCount; i++ { + if _, requestErr := executeHttpGetRequest(fmt.Sprintf("%v/get", proxyUrl)); requestErr != nil { + t.Errorf("failed to send proxy request, err: %v", requestErr) + return + } + } + + ignoredUserAgentsCheckFunc := func() error { + timestamp := time.Now().UnixNano() / int64(time.Millisecond) + + entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt×tamp=%v", apiServerUrl, defaultEntriesCount * 2, timestamp) + requestResult, requestErr := executeHttpGetRequest(entriesUrl) + if requestErr != nil { + return fmt.Errorf("failed to get entries, err: %v", requestErr) + } + + entries := requestResult.([]interface{}) + if len(entries) == 0 { + return fmt.Errorf("unexpected entries result - Expected more than 0 entries") + } + + for _, entryInterface := range entries { + entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, entryInterface.(map[string]interface{})["id"]) + requestResult, requestErr = executeHttpGetRequest(entryUrl) + if requestErr != nil { + return fmt.Errorf("failed to get entry, err: %v", requestErr) + } + + data := requestResult.(map[string]interface{})["data"].(map[string]interface{}) + entryJson := data["entry"].(string) + + var entry map[string]interface{} + if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil { + return fmt.Errorf("failed to parse entry, err: %v", parseErr) + } + + entryRequest := entry["request"].(map[string]interface{}) + entryPayload := entryRequest["payload"].(map[string]interface{}) + entryDetails := entryPayload["details"].(map[string]interface{}) + + entryHeaders := entryDetails["headers"].([]interface{}) + for _, headerInterface := range entryHeaders { + header := headerInterface.(map[string]interface{}) + if header["name"].(string) != ignoredUserAgentCustomHeader { + continue + } + + return fmt.Errorf("unexpected result - user agent is not ignored") + } + } + + return nil + } + if err := retriesExecute(shortRetriesCount, ignoredUserAgentsCheckFunc); err != nil { + t.Errorf("%v", err) + return + } +} + func TestTapDumpLogs(t *testing.T) { if testing.Short() { t.Skip("ignored acceptance test") diff --git a/acceptanceTests/testsUtils.go b/acceptanceTests/testsUtils.go index 19359b066..45a2c0f8b 100644 --- a/acceptanceTests/testsUtils.go +++ b/acceptanceTests/testsUtils.go @@ -172,6 +172,21 @@ func executeHttpRequest(response *http.Response, requestErr error) (interface{}, return jsonBytesToInterface(data) } +func executeHttpGetRequestWithHeaders(url string, headers map[string]string) (interface{}, error) { + request, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + + for headerKey, headerValue := range headers { + request.Header.Add(headerKey, headerValue) + } + + client := &http.Client{} + response, requestErr := client.Do(request) + return executeHttpRequest(response, requestErr) +} + func executeHttpGetRequest(url string) (interface{}, error) { response, requestErr := http.Get(url) return executeHttpRequest(response, requestErr) diff --git a/agent/main.go b/agent/main.go index 31e421a19..53ae74fa7 100644 --- a/agent/main.go +++ b/agent/main.go @@ -4,6 +4,13 @@ import ( "encoding/json" "flag" "fmt" + "github.com/gin-contrib/static" + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + "github.com/romana/rlog" + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/tap" + tapApi "github.com/up9inc/mizu/tap/api" "io/ioutil" "log" "mizuserver/pkg/api" @@ -18,15 +25,6 @@ import ( "path/filepath" "plugin" "sort" - "strings" - - "github.com/gin-contrib/static" - "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" - "github.com/romana/rlog" - "github.com/up9inc/mizu/shared" - "github.com/up9inc/mizu/tap" - tapApi "github.com/up9inc/mizu/tap/api" ) var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API") @@ -59,7 +57,7 @@ func main() { filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions) - go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions) + go filterItems(outputItemsChannel, filteredOutputItemsChannel) go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap) hostApi(nil) @@ -77,7 +75,7 @@ func main() { filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions) - socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress) + socketConnection, _, err := websocket.DefaultDialer.Dial(*apiServerAddress, nil) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err)) } @@ -90,7 +88,7 @@ func main() { outputItemsChannel := make(chan *tapApi.OutputChannelItem) filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions) + go filterItems(outputItemsChannel, filteredOutputItemsChannel) go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap) hostApi(outputItemsChannel) @@ -98,7 +96,7 @@ func main() { outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000) filteredHarChannel := make(chan *tapApi.OutputChannelItem) - go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions) + go filterItems(outputItemsChannel, filteredHarChannel) go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap) hostApi(nil) } @@ -230,7 +228,7 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions { filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar) if filteringOptionsJson == "" { return &tapApi.TrafficFilteringOptions{ - HealthChecksUserAgentHeaders: []string{}, + IgnoredUserAgents: []string{}, } } var filteringOptions tapApi.TrafficFilteringOptions @@ -242,42 +240,16 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions { return &filteringOptions } -func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) { +func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) { for message := range inChannel { if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { continue } - // TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441 - if isHealthCheckByUserAgent(message, filterOptions.HealthChecksUserAgentHeaders) { - continue - } outChannel <- message } } -func isHealthCheckByUserAgent(item *tapApi.OutputChannelItem, userAgentsToIgnore []string) bool { - if item.Protocol.Name != "http" { - return false - } - - request := item.Pair.Request.Payload.(map[string]interface{}) - reqDetails := request["details"].(map[string]interface{}) - - for _, header := range reqDetails["headers"].([]interface{}) { - h := header.(map[string]interface{}) - if strings.ToLower(h["name"].(string)) == "user-agent" { - for _, userAgent := range userAgentsToIgnore { - if strings.Contains(strings.ToLower(h["value"].(string)), strings.ToLower(userAgent)) { - return true - } - } - return false - } - } - return false -} - func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) { if connection == nil { panic("Websocket connection is nil") diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index a540fd356..7163877fb 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -113,9 +113,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension json.Unmarshal([]byte(mizuEntry.Entry), &pair) harEntry, err := utils.NewEntry(&pair) if err == nil { - rules, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service) + rules, _, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service) baseEntry.Rules = rules - baseEntry.Latency = mizuEntry.ElapsedTime } } diff --git a/agent/pkg/controllers/entries_controller.go b/agent/pkg/controllers/entries_controller.go index 035d99fb6..4737884ff 100644 --- a/agent/pkg/controllers/entries_controller.go +++ b/agent/pkg/controllers/entries_controller.go @@ -143,11 +143,13 @@ func GetEntry(c *gin.Context) { protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData) var rules []map[string]interface{} + var isRulesEnabled bool if entryData.ProtocolName == "http" { var pair tapApi.RequestResponsePair json.Unmarshal([]byte(entryData.Entry), &pair) harEntry, _ := utils.NewEntry(&pair) - _, rulesMatched := models.RunValidationRulesState(*harEntry, entryData.Service) + _, rulesMatched, _isRulesEnabled := models.RunValidationRulesState(*harEntry, entryData.Service) + isRulesEnabled = _isRulesEnabled inrec, _ := json.Marshal(rulesMatched) json.Unmarshal(inrec, &rules) } @@ -158,6 +160,7 @@ func GetEntry(c *gin.Context) { BodySize: bodySize, Data: entryData, Rules: rules, + IsRulesEnabled: isRulesEnabled, }) } diff --git a/agent/pkg/models/models.go b/agent/pkg/models/models.go index fa733b6cb..3356dc75a 100644 --- a/agent/pkg/models/models.go +++ b/agent/pkg/models/models.go @@ -97,8 +97,8 @@ type ExtendedCreator struct { Source *string `json:"_source"` } -func RunValidationRulesState(harEntry har.Entry, service string) (tapApi.ApplicableRules, []rules.RulesMatched) { - resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service) +func RunValidationRulesState(harEntry har.Entry, service string) (tapApi.ApplicableRules, []rules.RulesMatched, bool) { + resultPolicyToSend, isEnabled := rules.MatchRequestPolicy(harEntry, service) statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend) - return tapApi.ApplicableRules{Status: statusPolicyToSend, Latency: latency, NumberOfRules: numberOfRules}, resultPolicyToSend + return tapApi.ApplicableRules{Status: statusPolicyToSend, Latency: latency, NumberOfRules: numberOfRules}, resultPolicyToSend, isEnabled } diff --git a/agent/pkg/rules/rulesHTTP.go b/agent/pkg/rules/rulesHTTP.go index e4dc927b4..d2096fa83 100644 --- a/agent/pkg/rules/rulesHTTP.go +++ b/agent/pkg/rules/rulesHTTP.go @@ -4,11 +4,12 @@ import ( "encoding/base64" "encoding/json" "fmt" - "github.com/romana/rlog" "reflect" "regexp" "strings" + "github.com/romana/rlog" + "github.com/google/martian/har" "github.com/up9inc/mizu/shared" jsonpath "github.com/yalp/jsonpath" @@ -43,9 +44,11 @@ func ValidateService(serviceFromRule string, service string) bool { return true } -func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched { - enforcePolicy, _ := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName)) - var resultPolicyToSend []RulesMatched +func MatchRequestPolicy(harEntry har.Entry, service string) (resultPolicyToSend []RulesMatched, isEnabled bool) { + enforcePolicy, err := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName)) + if err == nil { + isEnabled = true + } for _, rule := range enforcePolicy.Rules { if !ValidatePath(rule.Path, harEntry.Request.URL) || !ValidateService(rule.Service, service) { continue @@ -93,12 +96,12 @@ func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched { resultPolicyToSend = appendRulesMatched(resultPolicyToSend, true, rule) } } - return resultPolicyToSend + return } func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) { var numberOfRulesMatched = len(rulesMatched) - var latency int64 = -1 + var responseTime int64 = -1 if numberOfRulesMatched == 0 { return false, 0, numberOfRulesMatched @@ -106,15 +109,15 @@ func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) { for _, rule := range rulesMatched { if rule.Matched == false { - return false, latency, numberOfRulesMatched + return false, responseTime, numberOfRulesMatched } else { - if strings.ToLower(rule.Rule.Type) == "latency" { - if rule.Rule.Latency < latency || latency == -1 { - latency = rule.Rule.Latency + if strings.ToLower(rule.Rule.Type) == "responseTime" { + if rule.Rule.ResponseTime < responseTime || responseTime == -1 { + responseTime = rule.Rule.ResponseTime } } } } - return true, latency, numberOfRulesMatched + return true, responseTime, numberOfRulesMatched } diff --git a/agent/pkg/utils/socket_client.go b/agent/pkg/utils/socket_client.go deleted file mode 100644 index aa532852a..000000000 --- a/agent/pkg/utils/socket_client.go +++ /dev/null @@ -1,38 +0,0 @@ -package utils - -import ( - "github.com/gorilla/websocket" - "github.com/romana/rlog" - "time" -) - -const ( - DEFAULT_SOCKET_RETRIES = 3 - DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10 -) - -func ConnectToSocketServer(address string) (*websocket.Conn, error) { - var err error - var connection *websocket.Conn - try := 0 - - // Connection to server fails if client pod is up before server. - // Retries solve this issue. - for try < DEFAULT_SOCKET_RETRIES { - rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES) - connection, _, err = websocket.DefaultDialer.Dial(address, nil) - if err != nil { - rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err) - try++ - } else { - break - } - time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME) - } - - if err != nil { - return nil, err - } - - return connection, nil -} diff --git a/cli/cmd/logs.go b/cli/cmd/logs.go index 16c626f33..3309f86fd 100644 --- a/cli/cmd/logs.go +++ b/cli/cmd/logs.go @@ -32,7 +32,7 @@ var logsCmd = &cobra.Command{ logger.Log.Debugf("Using file path %s", config.Config.Logs.FilePath()) - if dumpLogsErr := fsUtils.DumpLogs(kubernetesProvider, ctx, config.Config.Logs.FilePath()); dumpLogsErr != nil { + if dumpLogsErr := fsUtils.DumpLogs(ctx, kubernetesProvider, config.Config.Logs.FilePath()); dumpLogsErr != nil { logger.Log.Errorf("Failed dump logs %v", dumpLogsErr) } diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index effc83373..b7eb75613 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -2,7 +2,6 @@ package cmd import ( "errors" - "fmt" "os" "github.com/up9inc/mizu/cli/config" @@ -68,7 +67,4 @@ func init() { tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size") tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them") tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules") - - tapCmd.Flags().String(configStructs.EnforcePolicyFileDeprecated, defaultTapConfig.EnforcePolicyFileDeprecated, "Yaml file with policy rules") - tapCmd.Flags().MarkDeprecated(configStructs.EnforcePolicyFileDeprecated, fmt.Sprintf("Use --%s instead", configStructs.EnforcePolicyFile)) } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 535bbacb1..66f682f1f 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -8,6 +8,10 @@ import ( "strings" "time" + "gopkg.in/yaml.v3" + core "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "github.com/up9inc/mizu/cli/apiserver" "github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config/configStructs" @@ -22,9 +26,6 @@ import ( "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared/debounce" "github.com/up9inc/mizu/tap/api" - yaml "gopkg.in/yaml.v3" - core "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" ) const ( @@ -36,7 +37,6 @@ type tapState struct { apiServerService *core.Service currentlyTappedPods []core.Pod mizuServiceAccountExists bool - doNotRemoveConfigMap bool } var state tapState @@ -49,15 +49,8 @@ func RunMizuTap() { } var mizuValidationRules string - if config.Config.Tap.EnforcePolicyFile != "" || config.Config.Tap.EnforcePolicyFileDeprecated != "" { - var trafficValidation string - if config.Config.Tap.EnforcePolicyFile != "" { - trafficValidation = config.Config.Tap.EnforcePolicyFile - } else { - trafficValidation = config.Config.Tap.EnforcePolicyFileDeprecated - } - - mizuValidationRules, err = readValidationRules(trafficValidation) + if config.Config.Tap.EnforcePolicyFile != "" { + mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile) if err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err))) return @@ -76,7 +69,7 @@ func RunMizuTap() { targetNamespaces := getNamespaces(kubernetesProvider) if config.Config.IsNsRestrictedMode() { - if len(targetNamespaces) != 1 || !mizu.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) { + if len(targetNamespaces) != 1 || !shared.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) { logger.Log.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+ "You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.MizuResourcesNamespaceConfigName) return @@ -84,7 +77,7 @@ func RunMizuTap() { } var namespacesStr string - if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) { + if !shared.Contains(targetNamespaces, mizu.K8sAllNamespaces) { namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \"")) } else { namespacesStr = "all namespaces" @@ -99,7 +92,7 @@ func RunMizuTap() { if len(state.currentlyTappedPods) == 0 { var suggestionStr string - if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) { + if !shared.Contains(targetNamespaces, mizu.K8sAllNamespaces) { suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A" } logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr)) @@ -109,18 +102,17 @@ func RunMizuTap() { return } - nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods) - defer finishMizuExecution(kubernetesProvider) - if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil { + if err := createMizuResources(ctx, kubernetesProvider, mizuApiFilteringOptions, mizuValidationRules); err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err))) return } - go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel) + go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions) + go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel) go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel, mizuApiFilteringOptions) - //block until exit signal or error + // block until exit signal or error waitForFinish(ctx, cancel) } @@ -133,7 +125,7 @@ func readValidationRules(file string) (string, error) { return string(newContent), nil } -func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error { +func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error { if !config.Config.IsNsRestrictedMode() { if err := createMizuNamespace(ctx, kubernetesProvider); err != nil { return err @@ -144,15 +136,8 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro return err } - if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil { - return err - } - if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil { logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err))) - state.doNotRemoveConfigMap = true - } else if mizuValidationRules == "" { - state.doNotRemoveConfigMap = true } return nil @@ -224,13 +209,15 @@ func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) { } return &api.TrafficFilteringOptions{ - PlainTextMaskingRegexes: compiledRegexSlice, - HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders, - DisableRedaction: config.Config.Tap.DisableRedaction, + PlainTextMaskingRegexes: compiledRegexSlice, + IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents, + DisableRedaction: config.Config.Tap.DisableRedaction, }, nil } -func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions) error { +func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error { + nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods) + if len(nodeToTappedPodIPMap) > 0 { var serviceAccountName string if state.mizuServiceAccountExists { @@ -268,75 +255,108 @@ func finishMizuExecution(kubernetesProvider *kubernetes.Provider) { telemetry.ReportAPICalls() removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout) defer cancel() - dumpLogsIfNeeded(kubernetesProvider, removalCtx) - cleanUpMizuResources(kubernetesProvider, removalCtx, cancel) + dumpLogsIfNeeded(removalCtx, kubernetesProvider) + cleanUpMizuResources(removalCtx, cancel, kubernetesProvider) } -func dumpLogsIfNeeded(kubernetesProvider *kubernetes.Provider, removalCtx context.Context) { +func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) { if !config.Config.DumpLogs { return } mizuDir := mizu.GetMizuFolderPath() filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05"))) - if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil { + if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil { logger.Log.Errorf("Failed dump logs %v", err) } } -func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider, removalCtx context.Context, cancel context.CancelFunc) { +func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) { logger.Log.Infof("\nRemoving mizu resources\n") - if !config.Config.IsNsRestrictedMode() { - if err := kubernetesProvider.RemoveNamespace(removalCtx, config.Config.MizuResourcesNamespace); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Namespace %s: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err))) - return - } + var leftoverResources []string + + if config.Config.IsNsRestrictedMode() { + leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider) } else { - if err := kubernetesProvider.RemovePod(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Pod %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err))) - } - - if err := kubernetesProvider.RemoveService(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err))) - } - - if err := kubernetesProvider.RemoveDaemonSet(removalCtx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing DaemonSet %s in namespace %s: %v", mizu.TapperDaemonSetName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err))) - } - - if !state.doNotRemoveConfigMap { - if err := kubernetesProvider.RemoveConfigMap(removalCtx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing ConfigMap %s in namespace %s: %v", mizu.ConfigMapName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err))) - } - } - + leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider) } - if state.mizuServiceAccountExists { - if !config.Config.IsNsRestrictedMode() { - if err := kubernetesProvider.RemoveNonNamespacedResources(removalCtx, mizu.ClusterRoleName, mizu.ClusterRoleBindingName); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing non-namespaced resources: %v", errormessage.FormatError(err))) - return - } - } else { - if err := kubernetesProvider.RemoveServicAccount(removalCtx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service Account %s in namespace %s: %v", mizu.ServiceAccountName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err))) - return - } - - if err := kubernetesProvider.RemoveRole(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleName); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Role %s in namespace %s: %v", mizu.RoleName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err))) - } - - if err := kubernetesProvider.RemoveRoleBinding(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing RoleBinding %s in namespace %s: %v", mizu.RoleBindingName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err))) - } + if len(leftoverResources) > 0 { + errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", logger.GetLogFilePath()) + for _, resource := range leftoverResources { + errMsg += "\n- " + resource } + logger.Log.Errorf(uiUtils.Error, errMsg) + } +} + +func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string { + leftoverResources := make([]string, 0) + + if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil { + resourceDesc := fmt.Sprintf("Pod %s in namespace %s", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) } - if !config.Config.IsNsRestrictedMode() { - waitUntilNamespaceDeleted(removalCtx, cancel, kubernetesProvider) + if err := kubernetesProvider.RemoveService(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil { + resourceDesc := fmt.Sprintf("Service %s in namespace %s", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) } + + if err := kubernetesProvider.RemoveDaemonSet(ctx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil { + resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", mizu.TapperDaemonSetName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + if err := kubernetesProvider.RemoveConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil { + resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", mizu.ConfigMapName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + if err := kubernetesProvider.RemoveServicAccount(ctx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil { + resourceDesc := fmt.Sprintf("Service Account %s in namespace %s", mizu.ServiceAccountName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + if err := kubernetesProvider.RemoveRole(ctx, config.Config.MizuResourcesNamespace, mizu.RoleName); err != nil { + resourceDesc := fmt.Sprintf("Role %s in namespace %s", mizu.RoleName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil { + resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", mizu.RoleBindingName, config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + return leftoverResources +} + +func cleanUpNonRestrictedMode(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) []string { + leftoverResources := make([]string, 0) + + if err := kubernetesProvider.RemoveNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil { + resourceDesc := fmt.Sprintf("Namespace %s", config.Config.MizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } else { + defer waitUntilNamespaceDeleted(ctx, cancel, kubernetesProvider) + } + + if err := kubernetesProvider.RemoveClusterRole(ctx, mizu.ClusterRoleName); err != nil { + resourceDesc := fmt.Sprintf("ClusterRole %s", mizu.ClusterRoleName) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + if err := kubernetesProvider.RemoveClusterRoleBinding(ctx, mizu.ClusterRoleBindingName); err != nil { + resourceDesc := fmt.Sprintf("ClusterRoleBinding %s", mizu.ClusterRoleBindingName) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + + return leftoverResources +} + +func handleDeletionError(err error, resourceDesc string, leftoverResources *[]string) { + logger.Log.Debugf("Error removing %s: %v", resourceDesc, errormessage.FormatError(err)) + *leftoverResources = append(*leftoverResources, resourceDesc) } func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) { @@ -376,13 +396,8 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro logger.Log.Debugf("[Error] failed update tapped pods %v", err) } - nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods) - if err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err))) - cancel() - } - if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err))) + if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err))) cancel() } } @@ -500,7 +515,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { return missingPods } -func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { +func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName)) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex) isPodReady := false @@ -530,16 +545,38 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi } logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase) + + if modifiedPod.Status.Phase == core.PodPending { + if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue { + logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message) + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", logger.GetLogFilePath())) + cancel() + break + } + + if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" { + logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message) + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", logger.GetLogFilePath())) + cancel() + break + } + } + if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { isPodReady = true go startProxyReportErrorIfAny(kubernetesProvider, cancel) url := GetApiServerUrl() if err := apiserver.Provider.InitAndTestConnection(url); err != nil { - logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs") + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath())) cancel() break } + if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err))) + cancel() + } + logger.Log.Infof("Mizu is available at %s\n", url) openBrowser(url) requestForAnalysisIfNeeded() @@ -547,13 +584,13 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi logger.Log.Debugf("[Error] failed update tapped pods %v", err) } } - case _, ok := <-errorChan: + case err, ok := <-errorChan: if !ok { errorChan = nil continue } - logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace) + logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err) cancel() case <-timeAfter: @@ -568,6 +605,72 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi } } +func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { + podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", mizu.TapperDaemonSetName)) + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex) + var prevPodPhase core.PodPhase + for { + select { + case addedPod, ok := <-added: + if !ok { + added = nil + continue + } + + logger.Log.Debugf("Tapper is created [%s]", addedPod.Name) + case removedPod, ok := <-removed: + if !ok { + removed = nil + continue + } + + logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name) + case modifiedPod, ok := <-modified: + if !ok { + modified = nil + continue + } + + if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue { + logger.Log.Infof(uiUtils.Red, "Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message) + cancel() + break + } + + podStatus := modifiedPod.Status + if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase { + logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase))) + continue + } + prevPodPhase = podStatus.Phase + + if podStatus.Phase == core.PodRunning { + state := podStatus.ContainerStatuses[0].State + if state.Terminated != nil { + switch state.Terminated.Reason { + case "OOMKilled": + logger.Log.Infof(uiUtils.Red, "Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name) + } + } + } + + logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase))) + case err, ok := <-errorChan: + if !ok { + errorChan = nil + continue + } + + logger.Log.Debugf("[Error] Error in mizu tapper watch, err: %v", err) + cancel() + + case <-ctx.Done(): + logger.Log.Debugf("Watching tapper pod loop, ctx done") + return + } + } +} + func requestForAnalysisIfNeeded() { if !config.Config.Tap.Analysis { return @@ -609,7 +712,7 @@ func getNamespaces(kubernetesProvider *kubernetes.Provider) []string { if config.Config.Tap.AllNamespaces { return []string{mizu.K8sAllNamespaces} } else if len(config.Config.Tap.Namespaces) > 0 { - return mizu.Unique(config.Config.Tap.Namespaces) + return shared.Unique(config.Config.Tap.Namespaces) } else { return []string{kubernetesProvider.CurrentNamespace()} } diff --git a/cli/cmd/view.go b/cli/cmd/view.go index a8379cf42..b3bf31f67 100644 --- a/cli/cmd/view.go +++ b/cli/cmd/view.go @@ -25,4 +25,7 @@ func init() { defaults.Set(&defaultViewConfig) viewCmd.Flags().Uint16P(configStructs.GuiPortViewName, "p", defaultViewConfig.GuiPort, "Provide a custom port for the web interface webserver") + viewCmd.Flags().StringP(configStructs.UrlViewName, "u", defaultViewConfig.Url, "Provide a custom host") + + viewCmd.Flags().MarkHidden(configStructs.UrlViewName) } diff --git a/cli/cmd/viewRunner.go b/cli/cmd/viewRunner.go index 3fc50161b..63ce6d695 100644 --- a/cli/cmd/viewRunner.go +++ b/cli/cmd/viewRunner.go @@ -24,34 +24,39 @@ func runMizuView() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName) - if err != nil { - logger.Log.Errorf("Failed to found mizu service %v", err) - cancel() - return - } - if !exists { - logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName) - cancel() - return - } + url := config.Config.View.Url - url := GetApiServerUrl() + if url == "" { + exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName) + if err != nil { + logger.Log.Errorf("Failed to found mizu service %v", err) + cancel() + return + } + if !exists { + logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName) + cancel() + return + } - response, err := http.Get(fmt.Sprintf("%s/", url)) - if err == nil && response.StatusCode == 200 { - logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort) - return - } - logger.Log.Infof("Establishing connection to k8s cluster...") - go startProxyReportErrorIfAny(kubernetesProvider, cancel) + url = GetApiServerUrl() - if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil { - logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs") - return + response, err := http.Get(fmt.Sprintf("%s/", url)) + if err == nil && response.StatusCode == 200 { + logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort) + return + } + logger.Log.Infof("Establishing connection to k8s cluster...") + go startProxyReportErrorIfAny(kubernetesProvider, cancel) + + if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath())) + return + } } logger.Log.Infof("Mizu is available at %s\n", url) + openBrowser(url) if isCompatible, err := version.CheckVersionCompatibility(); err != nil { logger.Log.Errorf("Failed to check versions compatibility %v", err) diff --git a/cli/config/config.go b/cli/config/config.go index b75e79cb8..8a65bdf28 100644 --- a/cli/config/config.go +++ b/cli/config/config.go @@ -4,7 +4,7 @@ import ( "errors" "fmt" "github.com/up9inc/mizu/cli/logger" - "github.com/up9inc/mizu/cli/mizu" + "github.com/up9inc/mizu/shared" "io/ioutil" "os" "reflect" @@ -89,7 +89,7 @@ func initFlag(f *pflag.Flag) { configElemValue := reflect.ValueOf(&Config).Elem() var flagPath []string - if mizu.Contains([]string{ConfigFilePathCommandName}, f.Name) { + if shared.Contains([]string{ConfigFilePathCommandName}, f.Name) { flagPath = []string{f.Name} } else { flagPath = []string{cmdName, f.Name} diff --git a/cli/config/configStructs/tapConfig.go b/cli/config/configStructs/tapConfig.go index 861acea01..80c15375a 100644 --- a/cli/config/configStructs/tapConfig.go +++ b/cli/config/configStructs/tapConfig.go @@ -17,26 +17,24 @@ const ( HumanMaxEntriesDBSizeTapName = "max-entries-db-size" DryRunTapName = "dry-run" EnforcePolicyFile = "traffic-validation-file" - EnforcePolicyFileDeprecated = "test-rules" ) type TapConfig struct { - AnalysisDestination string `yaml:"dest" default:"up9.app"` - SleepIntervalSec int `yaml:"upload-interval" default:"10"` - PodRegexStr string `yaml:"regex" default:".*"` - GuiPort uint16 `yaml:"gui-port" default:"8899"` - Namespaces []string `yaml:"namespaces"` - Analysis bool `yaml:"analysis" default:"false"` - AllNamespaces bool `yaml:"all-namespaces" default:"false"` - PlainTextFilterRegexes []string `yaml:"regex-masking"` - HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"` - DisableRedaction bool `yaml:"no-redact" default:"false"` - HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"` - DryRun bool `yaml:"dry-run" default:"false"` - EnforcePolicyFile string `yaml:"traffic-validation-file"` - EnforcePolicyFileDeprecated string `yaml:"test-rules"` - ApiServerResources Resources `yaml:"api-server-resources"` - TapperResources Resources `yaml:"tapper-resources"` + AnalysisDestination string `yaml:"dest" default:"up9.app"` + SleepIntervalSec int `yaml:"upload-interval" default:"10"` + PodRegexStr string `yaml:"regex" default:".*"` + GuiPort uint16 `yaml:"gui-port" default:"8899"` + Namespaces []string `yaml:"namespaces"` + Analysis bool `yaml:"analysis" default:"false"` + AllNamespaces bool `yaml:"all-namespaces" default:"false"` + PlainTextFilterRegexes []string `yaml:"regex-masking"` + IgnoredUserAgents []string `yaml:"ignored-user-agents"` + DisableRedaction bool `yaml:"no-redact" default:"false"` + HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"` + DryRun bool `yaml:"dry-run" default:"false"` + EnforcePolicyFile string `yaml:"traffic-validation-file"` + ApiServerResources Resources `yaml:"api-server-resources"` + TapperResources Resources `yaml:"tapper-resources"` } type Resources struct { diff --git a/cli/config/configStructs/viewConfig.go b/cli/config/configStructs/viewConfig.go index aa41a7353..0337680c8 100644 --- a/cli/config/configStructs/viewConfig.go +++ b/cli/config/configStructs/viewConfig.go @@ -2,8 +2,10 @@ package configStructs const ( GuiPortViewName = "gui-port" + UrlViewName = "url" ) type ViewConfig struct { GuiPort uint16 `yaml:"gui-port" default:"8899"` + Url string `yaml:"url,omitempty" readonly:""` } diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index f05764b53..5f361fe73 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -268,67 +268,21 @@ func (provider *Provider) CreateService(ctx context.Context, namespace string, s return provider.clientSet.CoreV1().Services(namespace).Create(ctx, &service, metav1.CreateOptions{}) } -func (provider *Provider) DoesServiceAccountExist(ctx context.Context, namespace string, serviceAccountName string) (bool, error) { - serviceAccount, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, serviceAccountName, metav1.GetOptions{}) - return provider.doesResourceExist(serviceAccount, err) -} - -func (provider *Provider) DoesConfigMapExist(ctx context.Context, namespace string, name string) (bool, error) { - resource, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) -} - func (provider *Provider) DoesServicesExist(ctx context.Context, namespace string, name string) (bool, error) { resource, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) return provider.doesResourceExist(resource, err) } -func (provider *Provider) DoesNamespaceExist(ctx context.Context, name string) (bool, error) { - resource, err := provider.clientSet.CoreV1().Namespaces().Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) -} - -func (provider *Provider) DoesClusterRoleExist(ctx context.Context, name string) (bool, error) { - resource, err := provider.clientSet.RbacV1().ClusterRoles().Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) -} - -func (provider *Provider) DoesClusterRoleBindingExist(ctx context.Context, name string) (bool, error) { - resource, err := provider.clientSet.RbacV1().ClusterRoleBindings().Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) -} - -func (provider *Provider) DoesRoleExist(ctx context.Context, namespace string, name string) (bool, error) { - resource, err := provider.clientSet.RbacV1().Roles(namespace).Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) -} - -func (provider *Provider) DoesRoleBindingExist(ctx context.Context, namespace string, name string) (bool, error) { - resource, err := provider.clientSet.RbacV1().RoleBindings(namespace).Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) -} - -func (provider *Provider) DoesPodExist(ctx context.Context, namespace string, name string) (bool, error) { - resource, err := provider.clientSet.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) -} - -func (provider *Provider) DoesDaemonSetExist(ctx context.Context, namespace string, name string) (bool, error) { - resource, err := provider.clientSet.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) -} - func (provider *Provider) doesResourceExist(resource interface{}, err error) (bool, error) { - var statusError *k8serrors.StatusError - if errors.As(err, &statusError) { - // expected behavior when resource does not exist - if statusError.ErrStatus.Reason == metav1.StatusReasonNotFound { - return false, nil - } + // Getting NotFound error is the expected behavior when a resource does not exist. + if k8serrors.IsNotFound(err) { + return false, nil } + if err != nil { return false, err } + return resource != nil, nil } @@ -441,115 +395,63 @@ func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context, } func (provider *Provider) RemoveNamespace(ctx context.Context, name string) error { - if isFound, err := provider.DoesNamespaceExist(ctx, name); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{}) -} - -func (provider *Provider) RemoveNonNamespacedResources(ctx context.Context, clusterRoleName string, clusterRoleBindingName string) error { - if err := provider.RemoveClusterRole(ctx, clusterRoleName); err != nil { - return err - } - - if err := provider.RemoveClusterRoleBinding(ctx, clusterRoleBindingName); err != nil { - return err - } - - return nil + err := provider.clientSet.CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemoveClusterRole(ctx context.Context, name string) error { - if isFound, err := provider.DoesClusterRoleExist(ctx, name); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.RbacV1().ClusterRoles().Delete(ctx, name, metav1.DeleteOptions{}) + err := provider.clientSet.RbacV1().ClusterRoles().Delete(ctx, name, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemoveClusterRoleBinding(ctx context.Context, name string) error { - if isFound, err := provider.DoesClusterRoleBindingExist(ctx, name); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.RbacV1().ClusterRoleBindings().Delete(ctx, name, metav1.DeleteOptions{}) + err := provider.clientSet.RbacV1().ClusterRoleBindings().Delete(ctx, name, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemoveRoleBinding(ctx context.Context, namespace string, name string) error { - if isFound, err := provider.DoesRoleBindingExist(ctx, namespace, name); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.RbacV1().RoleBindings(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + err := provider.clientSet.RbacV1().RoleBindings(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemoveRole(ctx context.Context, namespace string, name string) error { - if isFound, err := provider.DoesRoleExist(ctx, namespace, name); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.RbacV1().Roles(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + err := provider.clientSet.RbacV1().Roles(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemoveServicAccount(ctx context.Context, namespace string, name string) error { - if isFound, err := provider.DoesServiceAccountExist(ctx, namespace, name); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error { - if isFound, err := provider.DoesPodExist(ctx, namespace, podName); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) + err := provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemoveConfigMap(ctx context.Context, namespace string, configMapName string) error { - if isFound, err := provider.DoesConfigMapExist(ctx, namespace, configMapName); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, configMapName, metav1.DeleteOptions{}) + err := provider.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, configMapName, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error { - if isFound, err := provider.DoesServicesExist(ctx, namespace, serviceName); err != nil { - return err - } else if !isFound { - return nil - } - - return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{}) + err := provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) } func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error { - if isFound, err := provider.DoesDaemonSetExist(ctx, namespace, daemonSetName); err != nil { - return err - } else if !isFound { + err := provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{}) + return provider.handleRemovalError(err) +} + +func (provider *Provider) handleRemovalError(err error) error { + // Ignore NotFound - There is nothing to delete. + // Ignore Forbidden - Assume that a user could not have created the resource in the first place. + if k8serrors.IsNotFound(err) || k8serrors.IsForbidden(err) { return nil } - return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{}) + return err } func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, data string) error { @@ -731,7 +633,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r return matchingPods, nil } -func (provider *Provider) GetPodLogs(namespace string, podName string, ctx context.Context) (string, error) { +func (provider *Provider) GetPodLogs(ctx context.Context, namespace string, podName string) (string, error) { podLogOpts := core.PodLogOptions{} req := provider.clientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts) podLogs, err := req.Stream(ctx) @@ -747,7 +649,7 @@ func (provider *Provider) GetPodLogs(namespace string, podName string, ctx conte return str, nil } -func (provider *Provider) GetNamespaceEvents(namespace string, ctx context.Context) (string, error) { +func (provider *Provider) GetNamespaceEvents(ctx context.Context, namespace string) (string, error) { eventsOpts := metav1.ListOptions{} eventList, err := provider.clientSet.CoreV1().Events(namespace).List(ctx, eventsOpts) if err != nil { diff --git a/cli/kubernetes/watch.go b/cli/kubernetes/watch.go index 7916feef6..b2df708db 100644 --- a/cli/kubernetes/watch.go +++ b/cli/kubernetes/watch.go @@ -33,7 +33,10 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName return } - pod := e.Object.(*corev1.Pod) + pod, ok := e.Object.(*corev1.Pod) + if !ok { + continue + } if !podFilter.MatchString(pod.Name) { continue diff --git a/cli/mizu/fsUtils/mizuLogsUtils.go b/cli/mizu/fsUtils/mizuLogsUtils.go index 54ab05d28..2f3605709 100644 --- a/cli/mizu/fsUtils/mizuLogsUtils.go +++ b/cli/mizu/fsUtils/mizuLogsUtils.go @@ -12,7 +12,7 @@ import ( "regexp" ) -func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath string) error { +func DumpLogs(ctx context.Context, provider *kubernetes.Provider, filePath string) error { podExactRegex := regexp.MustCompile("^" + mizu.MizuResourcesPrefix) pods, err := provider.ListAllPodsMatchingRegex(ctx, podExactRegex, []string{config.Config.MizuResourcesNamespace}) if err != nil { @@ -32,7 +32,7 @@ func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath strin defer zipWriter.Close() for _, pod := range pods { - logs, err := provider.GetPodLogs(pod.Namespace, pod.Name, ctx) + logs, err := provider.GetPodLogs(ctx, pod.Namespace, pod.Name) if err != nil { logger.Log.Errorf("Failed to get logs, %v", err) continue @@ -47,7 +47,7 @@ func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath strin } } - events, err := provider.GetNamespaceEvents(config.Config.MizuResourcesNamespace, ctx) + events, err := provider.GetNamespaceEvents(ctx, config.Config.MizuResourcesNamespace) if err != nil { logger.Log.Debugf("Failed to get k8b events, %v", err) } else { diff --git a/docs/POLICY_RULES.md b/docs/POLICY_RULES.md index fa92d92b4..e97afefac 100644 --- a/docs/POLICY_RULES.md +++ b/docs/POLICY_RULES.md @@ -31,12 +31,12 @@ mizu tap --traffic-validation-file rules.yaml The structure of the traffic-validation-file is: * `name`: string, name of the rule -* `type`: string, type of the rule, must be `json` or `header` or `latency` +* `type`: string, type of the rule, must be `json` or `header` or `slo` * `key`: string, [jsonpath](https://code.google.com/archive/p/jsonpath/wikis/Javascript.wiki) used only in `json` or `header` type * `value`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) used only in `json` or `header` type * `service`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) service name to filter * `path`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) URL path to filter -* `latency`: integer, time in ms of the expected latency. +* `response-time`: integer, time in ms of the expected latency. ### For example: @@ -54,8 +54,8 @@ rules: key: "Content-Le.*" value: "(\\d+(?:\\.\\d+)?)" - name: latency-test - type: latency - latency: 1 + type: slo + response-time: 1 service: "carts.*" ``` diff --git a/shared/models.go b/shared/models.go index 5dac6e9cc..73ab4041c 100644 --- a/shared/models.go +++ b/shared/models.go @@ -1,8 +1,8 @@ package shared import ( - "fmt" "io/ioutil" + "log" "strings" "gopkg.in/yaml.v3" @@ -83,14 +83,14 @@ type RulesPolicy struct { } type RulePolicy struct { - Type string `yaml:"type"` - Service string `yaml:"service"` - Path string `yaml:"path"` - Method string `yaml:"method"` - Key string `yaml:"key"` - Value string `yaml:"value"` - Latency int64 `yaml:"latency"` - Name string `yaml:"name"` + Type string `yaml:"type"` + Service string `yaml:"service"` + Path string `yaml:"path"` + Method string `yaml:"method"` + Key string `yaml:"key"` + Value string `yaml:"value"` + ResponseTime int64 `yaml:"response-time"` + Name string `yaml:"name"` } type RulesMatched struct { @@ -99,14 +99,17 @@ type RulesMatched struct { } func (r *RulePolicy) validateType() bool { - permitedTypes := []string{"json", "header", "latency"} + permitedTypes := []string{"json", "header", "slo"} _, found := Find(permitedTypes, r.Type) if !found { - fmt.Printf("\nRule with name %s will be ignored. Err: only json, header and latency types are supported on rule definition.\n", r.Name) + log.Printf("Error: %s. ", r.Name) + log.Printf("Only json, header and slo types are supported on rule definition. This rule will be ignored\n") + found = false } - if strings.ToLower(r.Type) == "latency" { - if r.Latency == 0 { - fmt.Printf("\nRule with name %s will be ignored. Err: when type=latency, the field Latency should be specified and have a value >= 1\n\n", r.Name) + if strings.ToLower(r.Type) == "slo" { + if r.ResponseTime <= 0 { + log.Printf("Error: %s. ", r.Name) + log.Printf("When type=slo, the field response-time should be specified and have a value >= 1\n\n") found = false } } @@ -124,10 +127,6 @@ func (rules *RulesPolicy) ValidateRulesPolicy() []int { return invalidIndex } -func (rules *RulesPolicy) RemoveRule(idx int) { - rules.Rules = append(rules.Rules[:idx], rules.Rules[idx+1:]...) -} - func Find(slice []string, val string) (int, bool) { for i, item := range slice { if item == val { @@ -148,10 +147,15 @@ func DecodeEnforcePolicy(path string) (RulesPolicy, error) { return enforcePolicy, err } invalidIndex := enforcePolicy.ValidateRulesPolicy() + var k = 0 if len(invalidIndex) != 0 { - for i := range invalidIndex { - enforcePolicy.RemoveRule(invalidIndex[i]) + for i, rule := range enforcePolicy.Rules { + if !ContainsInt(invalidIndex, i) { + enforcePolicy.Rules[k] = rule + k++ + } } + enforcePolicy.Rules = enforcePolicy.Rules[:k] } return enforcePolicy, nil } diff --git a/cli/mizu/sliceUtils.go b/shared/sliceUtils.go similarity index 69% rename from cli/mizu/sliceUtils.go rename to shared/sliceUtils.go index 94e253225..6d08c70b5 100644 --- a/cli/mizu/sliceUtils.go +++ b/shared/sliceUtils.go @@ -1,4 +1,4 @@ -package mizu +package shared func Contains(slice []string, containsValue string) bool { for _, sliceValue := range slice { @@ -10,6 +10,16 @@ func Contains(slice []string, containsValue string) bool { return false } +func ContainsInt(slice []int, containsValue int) bool { + for _, sliceValue := range slice { + if sliceValue == containsValue { + return true + } + } + return false +} + + func Unique(slice []string) []string { keys := make(map[string]bool) var list []string diff --git a/cli/mizu/sliceUtils_test.go b/shared/sliceUtils_test.go similarity index 90% rename from cli/mizu/sliceUtils_test.go rename to shared/sliceUtils_test.go index d5e7efe6d..97d79b0bb 100644 --- a/cli/mizu/sliceUtils_test.go +++ b/shared/sliceUtils_test.go @@ -1,8 +1,8 @@ -package mizu_test +package shared_test import ( "fmt" - "github.com/up9inc/mizu/cli/mizu" + "github.com/up9inc/mizu/shared" "reflect" "testing" ) @@ -21,7 +21,7 @@ func TestContainsExists(t *testing.T) { for _, test := range tests { t.Run(test.ContainsValue, func(t *testing.T) { - actual := mizu.Contains(test.Slice, test.ContainsValue) + actual := shared.Contains(test.Slice, test.ContainsValue) if actual != test.Expected { t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual) } @@ -43,7 +43,7 @@ func TestContainsNotExists(t *testing.T) { for _, test := range tests { t.Run(test.ContainsValue, func(t *testing.T) { - actual := mizu.Contains(test.Slice, test.ContainsValue) + actual := shared.Contains(test.Slice, test.ContainsValue) if actual != test.Expected { t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual) } @@ -63,7 +63,7 @@ func TestContainsEmptySlice(t *testing.T) { for _, test := range tests { t.Run(test.ContainsValue, func(t *testing.T) { - actual := mizu.Contains(test.Slice, test.ContainsValue) + actual := shared.Contains(test.Slice, test.ContainsValue) if actual != test.Expected { t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual) } @@ -83,7 +83,7 @@ func TestContainsNilSlice(t *testing.T) { for _, test := range tests { t.Run(test.ContainsValue, func(t *testing.T) { - actual := mizu.Contains(test.Slice, test.ContainsValue) + actual := shared.Contains(test.Slice, test.ContainsValue) if actual != test.Expected { t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual) } @@ -102,7 +102,7 @@ func TestUniqueNoDuplicateValues(t *testing.T) { for index, test := range tests { t.Run(fmt.Sprintf("%v", index), func(t *testing.T) { - actual := mizu.Unique(test.Slice) + actual := shared.Unique(test.Slice) if !reflect.DeepEqual(test.Expected, actual) { t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual) } @@ -121,7 +121,7 @@ func TestUniqueDuplicateValues(t *testing.T) { for index, test := range tests { t.Run(fmt.Sprintf("%v", index), func(t *testing.T) { - actual := mizu.Unique(test.Slice) + actual := shared.Unique(test.Slice) if !reflect.DeepEqual(test.Expected, actual) { t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual) } diff --git a/tap/api/api.go b/tap/api/api.go index 42db3142c..94247624f 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -106,7 +106,7 @@ type MizuEntry struct { UpdatedAt time.Time ProtocolName string `json:"protocolName" gorm:"column:protocolName"` ProtocolLongName string `json:"protocolLongName" gorm:"column:protocolLongName"` - ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolVersion"` + ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolAbbreviation"` ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"` ProtocolBackgroundColor string `json:"protocolBackgroundColor" gorm:"column:protocolBackgroundColor"` ProtocolForegroundColor string `json:"protocolForegroundColor" gorm:"column:protocolForegroundColor"` @@ -138,6 +138,7 @@ type MizuEntryWrapper struct { BodySize int64 `json:"bodySize"` Data MizuEntry `json:"data"` Rules []map[string]interface{} `json:"rulesMatched,omitempty"` + IsRulesEnabled bool `json:"isRulesEnabled"` } type BaseEntryDetails struct { @@ -156,7 +157,7 @@ type BaseEntryDetails struct { SourcePort string `json:"sourcePort,omitempty"` DestinationPort string `json:"destinationPort,omitempty"` IsOutgoing bool `json:"isOutgoing,omitempty"` - Latency int64 `json:"latency,omitempty"` + Latency int64 `json:"latency"` Rules ApplicableRules `json:"rules,omitempty"` } @@ -190,6 +191,7 @@ func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error { bed.Timestamp = entry.Timestamp bed.RequestSenderIp = entry.RequestSenderIp bed.IsOutgoing = entry.IsOutgoing + bed.Latency = entry.ElapsedTime return nil } diff --git a/tap/api/options.go b/tap/api/options.go index aff071bf3..22772f0ef 100644 --- a/tap/api/options.go +++ b/tap/api/options.go @@ -1,7 +1,7 @@ package api type TrafficFilteringOptions struct { - HealthChecksUserAgentHeaders []string - PlainTextMaskingRegexes []*SerializableRegexp - DisableRedaction bool + IgnoredUserAgents []string + PlainTextMaskingRegexes []*SerializableRegexp + DisableRedaction bool } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 7cc638538..133e8997a 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -312,7 +312,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { SourcePort: entry.SourcePort, DestinationPort: entry.DestinationPort, IsOutgoing: entry.IsOutgoing, - Latency: 0, + Latency: entry.ElapsedTime, Rules: api.ApplicableRules{ Latency: 0, Status: false, diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 7c71d7691..d41bf3e54 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -14,9 +14,14 @@ import ( ) func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) { + if IsIgnoredUserAgent(item, options) { + return + } + if !options.DisableRedaction { FilterSensitiveData(item, options) } + emitter.Emit(item) } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 0b48e69db..d8beac84f 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -223,7 +223,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { SourcePort: entry.SourcePort, DestinationPort: entry.DestinationPort, IsOutgoing: entry.IsOutgoing, - Latency: 0, + Latency: entry.ElapsedTime, Rules: api.ApplicableRules{ Latency: 0, Status: false, diff --git a/tap/extensions/http/sensitive_data_cleaner.go b/tap/extensions/http/sensitive_data_cleaner.go index 4c61c9993..9ea0914bf 100644 --- a/tap/extensions/http/sensitive_data_cleaner.go +++ b/tap/extensions/http/sensitive_data_cleaner.go @@ -25,6 +25,30 @@ var personallyIdentifiableDataFields = []string{"token", "authorization", "authe "zip", "zipcode", "address", "country", "firstname", "lastname", "middlename", "fname", "lname", "birthdate"} +func IsIgnoredUserAgent(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) bool { + if item.Protocol.Name != "http" { + return false + } + + request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request) + + for headerKey, headerValues := range request.Header { + if strings.ToLower(headerKey) == "user-agent" { + for _, userAgent := range options.IgnoredUserAgents { + for _, headerValue := range headerValues { + if strings.Contains(strings.ToLower(headerValue), strings.ToLower(userAgent)) { + return true + } + } + } + + return false + } + } + + return false +} + func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) { request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request) response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response) @@ -86,6 +110,10 @@ func getContentTypeHeaderValue(headers http.Header) string { } func isFieldNameSensitive(fieldName string) bool { + if fieldName == ":authority" { + return false + } + name := strings.ToLower(fieldName) name = strings.ReplaceAll(name, "_", "") name = strings.ReplaceAll(name, "-", "") diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index e40575519..ca9291c30 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -186,7 +186,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { SourcePort: entry.SourcePort, DestinationPort: entry.DestinationPort, IsOutgoing: entry.IsOutgoing, - Latency: 0, + Latency: entry.ElapsedTime, Rules: api.ApplicableRules{ Latency: 0, Status: false, diff --git a/tap/extensions/redis/errors.go b/tap/extensions/redis/errors.go new file mode 100644 index 000000000..25cb3bf54 --- /dev/null +++ b/tap/extensions/redis/errors.go @@ -0,0 +1,14 @@ +package main + +//ConnectError redis connection error,such as io timeout +type ConnectError struct { + Message string +} + +func newConnectError(message string) *ConnectError { + return &ConnectError{Message: message} +} + +func (e *ConnectError) Error() string { + return e.Message +} diff --git a/tap/extensions/redis/go.mod b/tap/extensions/redis/go.mod new file mode 100644 index 000000000..a69f319af --- /dev/null +++ b/tap/extensions/redis/go.mod @@ -0,0 +1,9 @@ +module github.com/up9inc/mizu/tap/extensions/redis + +go 1.16 + +require ( + github.com/up9inc/mizu/tap/api v0.0.0 +) + +replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api diff --git a/tap/extensions/redis/handlers.go b/tap/extensions/redis/handlers.go new file mode 100644 index 000000000..d135054b1 --- /dev/null +++ b/tap/extensions/redis/handlers.go @@ -0,0 +1,55 @@ +package main + +import ( + "fmt" + + "github.com/up9inc/mizu/tap/api" +) + +func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error { + counterPair.Request++ + ident := fmt.Sprintf( + "%s->%s %s->%s %d", + tcpID.SrcIP, + tcpID.DstIP, + tcpID.SrcPort, + tcpID.DstPort, + counterPair.Request, + ) + item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime) + if item != nil { + item.ConnectionInfo = &api.ConnectionInfo{ + ClientIP: tcpID.SrcIP, + ClientPort: tcpID.SrcPort, + ServerIP: tcpID.DstIP, + ServerPort: tcpID.DstPort, + IsOutgoing: true, + } + emitter.Emit(item) + } + return nil +} + +func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error { + counterPair.Response++ + ident := fmt.Sprintf( + "%s->%s %s->%s %d", + tcpID.DstIP, + tcpID.SrcIP, + tcpID.DstPort, + tcpID.SrcPort, + counterPair.Response, + ) + item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime) + if item != nil { + item.ConnectionInfo = &api.ConnectionInfo{ + ClientIP: tcpID.DstIP, + ClientPort: tcpID.DstPort, + ServerIP: tcpID.SrcIP, + ServerPort: tcpID.SrcPort, + IsOutgoing: false, + } + emitter.Emit(item) + } + return nil +} diff --git a/tap/extensions/redis/helpers.go b/tap/extensions/redis/helpers.go new file mode 100644 index 000000000..a020b722d --- /dev/null +++ b/tap/extensions/redis/helpers.go @@ -0,0 +1,57 @@ +package main + +import ( + "encoding/json" + + "github.com/up9inc/mizu/tap/api" +) + +type RedisPayload struct { + Data interface{} +} + +type RedisPayloader interface { + MarshalJSON() ([]byte, error) +} + +func (h RedisPayload) MarshalJSON() ([]byte, error) { + return json.Marshal(h.Data) +} + +type RedisWrapper struct { + Method string `json:"method"` + Url string `json:"url"` + Details interface{} `json:"details"` +} + +func representGeneric(generic map[string]interface{}) (representation []interface{}) { + details, _ := json.Marshal([]map[string]string{ + { + "name": "Type", + "value": generic["type"].(string), + }, + { + "name": "Command", + "value": generic["command"].(string), + }, + { + "name": "Key", + "value": generic["key"].(string), + }, + { + "name": "Value", + "value": generic["value"].(string), + }, + { + "name": "Keyword", + "value": generic["keyword"].(string), + }, + }) + representation = append(representation, map[string]string{ + "type": api.TABLE, + "title": "Details", + "data": string(details), + }) + + return +} diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go new file mode 100644 index 000000000..be0650bfc --- /dev/null +++ b/tap/extensions/redis/main.go @@ -0,0 +1,154 @@ +package main + +import ( + "bufio" + "encoding/json" + "fmt" + "log" + + "github.com/up9inc/mizu/tap/api" +) + +var protocol api.Protocol = api.Protocol{ + Name: "redis", + LongName: "Redis Serialization Protocol", + Abbreviation: "REDIS", + Version: "3.x", + BackgroundColor: "#a41e11", + ForegroundColor: "#ffffff", + FontSize: 11, + ReferenceLink: "https://redis.io/topics/protocol", + Ports: []string{"6379"}, + Priority: 3, +} + +func init() { + log.Println("Initializing Redis extension...") +} + +type dissecting string + +func (d dissecting) Register(extension *api.Extension) { + extension.Protocol = &protocol + extension.MatcherMap = reqResMatcher.openMessagesMap +} + +func (d dissecting) Ping() { + log.Printf("pong %s\n", protocol.Name) +} + +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error { + is := &RedisInputStream{ + Reader: b, + Buf: make([]byte, 8192), + } + proto := NewProtocol(is) + for { + redisPacket, err := proto.Read() + if err != nil { + return err + } + + if isClient { + handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket) + } else { + handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket) + } + } +} + +func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { + request := item.Pair.Request.Payload.(map[string]interface{}) + reqDetails := request["details"].(map[string]interface{}) + service := "redis" + if resolvedDestination != "" { + service = resolvedDestination + } else if resolvedSource != "" { + service = resolvedSource + } + + method := "" + if reqDetails["command"] != nil { + method = reqDetails["command"].(string) + } + + summary := "" + if reqDetails["key"] != nil { + summary = reqDetails["key"].(string) + } + + request["url"] = summary + entryBytes, _ := json.Marshal(item.Pair) + return &api.MizuEntry{ + ProtocolName: protocol.Name, + ProtocolLongName: protocol.LongName, + ProtocolAbbreviation: protocol.Abbreviation, + ProtocolVersion: protocol.Version, + ProtocolBackgroundColor: protocol.BackgroundColor, + ProtocolForegroundColor: protocol.ForegroundColor, + ProtocolFontSize: protocol.FontSize, + ProtocolReferenceLink: protocol.ReferenceLink, + EntryId: entryId, + Entry: string(entryBytes), + Url: fmt.Sprintf("%s%s", service, summary), + Method: method, + Status: 0, + RequestSenderIp: item.ConnectionInfo.ClientIP, + Service: service, + Timestamp: item.Timestamp, + ElapsedTime: 0, + Path: summary, + ResolvedSource: resolvedSource, + ResolvedDestination: resolvedDestination, + SourceIp: item.ConnectionInfo.ClientIP, + DestinationIp: item.ConnectionInfo.ServerIP, + SourcePort: item.ConnectionInfo.ClientPort, + DestinationPort: item.ConnectionInfo.ServerPort, + IsOutgoing: item.ConnectionInfo.IsOutgoing, + } + +} + +func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { + return &api.BaseEntryDetails{ + Id: entry.EntryId, + Protocol: protocol, + Url: entry.Url, + RequestSenderIp: entry.RequestSenderIp, + Service: entry.Service, + Summary: entry.Path, + StatusCode: entry.Status, + Method: entry.Method, + Timestamp: entry.Timestamp, + SourceIp: entry.SourceIp, + DestinationIp: entry.DestinationIp, + SourcePort: entry.SourcePort, + DestinationPort: entry.DestinationPort, + IsOutgoing: entry.IsOutgoing, + Latency: entry.ElapsedTime, + Rules: api.ApplicableRules{ + Latency: 0, + Status: false, + }, + } +} + +func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) { + p = protocol + bodySize = 0 + var root map[string]interface{} + json.Unmarshal([]byte(entry.Entry), &root) + representation := make(map[string]interface{}, 0) + request := root["request"].(map[string]interface{})["payload"].(map[string]interface{}) + response := root["response"].(map[string]interface{})["payload"].(map[string]interface{}) + reqDetails := request["details"].(map[string]interface{}) + resDetails := response["details"].(map[string]interface{}) + repRequest := representGeneric(reqDetails) + repResponse := representGeneric(resDetails) + representation["request"] = repRequest + representation["response"] = repResponse + object, err = json.Marshal(representation) + return +} + +var Dissector dissecting diff --git a/tap/extensions/redis/matcher.go b/tap/extensions/redis/matcher.go new file mode 100644 index 000000000..71d5770d4 --- /dev/null +++ b/tap/extensions/redis/matcher.go @@ -0,0 +1,102 @@ +package main + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/up9inc/mizu/tap/api" +) + +var reqResMatcher = createResponseRequestMatcher() // global + +// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter} +type requestResponseMatcher struct { + openMessagesMap *sync.Map +} + +func createResponseRequestMatcher() requestResponseMatcher { + newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}} + return *newMatcher +} + +func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem { + split := splitIdent(ident) + key := genKey(split) + + requestRedisMessage := api.GenericMessage{ + IsRequest: true, + CaptureTime: captureTime, + Payload: RedisPayload{ + Data: &RedisWrapper{ + Method: string(request.Command), + Url: "", + Details: request, + }, + }, + } + + if response, found := matcher.openMessagesMap.LoadAndDelete(key); found { + // Type assertion always succeeds because all of the map's values are of api.GenericMessage type + responseRedisMessage := response.(*api.GenericMessage) + if responseRedisMessage.IsRequest { + return nil + } + return matcher.preparePair(&requestRedisMessage, responseRedisMessage) + } + + matcher.openMessagesMap.Store(key, &requestRedisMessage) + return nil +} + +func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem { + split := splitIdent(ident) + key := genKey(split) + + responseRedisMessage := api.GenericMessage{ + IsRequest: false, + CaptureTime: captureTime, + Payload: RedisPayload{ + Data: &RedisWrapper{ + Method: string(response.Command), + Url: "", + Details: response, + }, + }, + } + + if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { + // Type assertion always succeeds because all of the map's values are of api.GenericMessage type + requestRedisMessage := request.(*api.GenericMessage) + if !requestRedisMessage.IsRequest { + return nil + } + return matcher.preparePair(requestRedisMessage, &responseRedisMessage) + } + + matcher.openMessagesMap.Store(key, &responseRedisMessage) + return nil +} + +func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.GenericMessage, responseRedisMessage *api.GenericMessage) *api.OutputChannelItem { + return &api.OutputChannelItem{ + Protocol: protocol, + Timestamp: requestRedisMessage.CaptureTime.UnixNano() / int64(time.Millisecond), + ConnectionInfo: nil, + Pair: &api.RequestResponsePair{ + Request: *requestRedisMessage, + Response: *responseRedisMessage, + }, + } +} + +func splitIdent(ident string) []string { + ident = strings.Replace(ident, "->", " ", -1) + return strings.Split(ident, " ") +} + +func genKey(split []string) string { + key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4]) + return key +} diff --git a/tap/extensions/redis/read.go b/tap/extensions/redis/read.go new file mode 100644 index 000000000..93e147303 --- /dev/null +++ b/tap/extensions/redis/read.go @@ -0,0 +1,474 @@ +package main + +import ( + "bufio" + "errors" + "fmt" + "log" + "math" + "reflect" + "strconv" + "strings" + "time" +) + +const ( + askPrefix = "ASK " + movedPrefix = "MOVED " + clusterDownPrefix = "CLUSTERDOWN " + busyPrefix = "BUSY " + noscriptPrefix = "NOSCRIPT " + + defaultHost = "localhost" + defaultPort = 6379 + defaultSentinelPort = 26379 + defaultTimeout = 5 * time.Second + defaultDatabase = 2 * time.Second + + dollarByte = '$' + asteriskByte = '*' + plusByte = '+' + minusByte = '-' + colonByte = ':' + notApplicableByte = '0' + + sentinelMasters = "masters" + sentinelGetMasterAddrByName = "get-master-addr-by-name" + sentinelReset = "reset" + sentinelSlaves = "slaves" + sentinelFailOver = "failover" + sentinelMonitor = "monitor" + sentinelRemove = "remove" + sentinelSet = "set" + + clusterNodes = "nodes" + clusterMeet = "meet" + clusterReset = "reset" + clusterAddSlots = "addslots" + clusterDelSlots = "delslots" + clusterInfo = "info" + clusterGetKeysInSlot = "getkeysinslot" + clusterSetSlot = "setslot" + clusterSetSlotNode = "node" + clusterSetSlotMigrating = "migrating" + clusterSetSlotImporting = "importing" + clusterSetSlotStable = "stable" + clusterForget = "forget" + clusterFlushSlot = "flushslots" + clusterKeySlot = "keyslot" + clusterCountKeyInSlot = "countkeysinslot" + clusterSaveConfig = "saveconfig" + clusterReplicate = "replicate" + clusterSlaves = "slaves" + clusterFailOver = "failover" + clusterSlots = "slots" + pubSubChannels = "channels" + pubSubNumSub = "numsub" + pubSubNumPat = "numpat" +) + +//intToByteArr convert int to byte array +func intToByteArr(a int) []byte { + buf := make([]byte, 0) + return strconv.AppendInt(buf, int64(a), 10) +} + +var ( + bytesTrue = intToByteArr(1) + bytesFalse = intToByteArr(0) + bytesTilde = []byte("~") + + positiveInfinityBytes = []byte("+inf") + negativeInfinityBytes = []byte("-inf") +) + +var ( + sizeTable = []int{9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, + 999999999, math.MaxInt32} + + digitTens = []byte{'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', + '1', '1', '1', '1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2', + '2', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4', + '4', '4', '4', '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6', + '6', '6', '6', '6', '6', '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8', + '8', '8', '8', '8', '8', '8', '8', '9', '9', '9', '9', '9', '9', '9', '9', '9', '9'} + + digitOnes = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', + '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', + '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', + '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', + '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', + '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'} + + digits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', + 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', + 't', 'u', 'v', 'w', 'x', 'y', 'z'} +) + +// receive message from redis +type RedisInputStream struct { + *bufio.Reader + Buf []byte + count int + limit int +} + +func (r *RedisInputStream) readByte() (byte, error) { + err := r.ensureFill() + if err != nil { + return 0, err + } + ret := r.Buf[r.count] + r.count++ + return ret, nil +} + +func (r *RedisInputStream) ensureFill() error { + if r.count < r.limit { + return nil + } + var err error + r.limit, err = r.Read(r.Buf) + if err != nil { + return newConnectError(err.Error()) + } + r.count = 0 + if r.limit == -1 { + return newConnectError("Unexpected end of stream") + } + return nil +} + +func (r *RedisInputStream) readLine() (string, error) { + buf := "" + for { + err := r.ensureFill() + if err != nil { + return "", err + } + b := r.Buf[r.count] + r.count++ + if b == '\r' { + err := r.ensureFill() + if err != nil { + return "", err + } + c := r.Buf[r.count] + r.count++ + if c == '\n' { + break + } + buf += string(b) + buf += string(c) + } else { + buf += string(b) + } + } + if buf == "" { + return "", newConnectError("It seems like server has closed the connection.") + } + return buf, nil +} + +func (r *RedisInputStream) readLineBytes() ([]byte, error) { + err := r.ensureFill() + if err != nil { + return nil, err + } + pos := r.count + buf := r.Buf + for { + if pos == r.limit { + return r.readLineBytesSlowly() + } + p := buf[pos] + pos++ + if p == '\r' { + if pos == r.limit { + return r.readLineBytesSlowly() + } + p := buf[pos] + pos++ + if p == '\n' { + break + } + } + } + N := pos - r.count - 2 + line := make([]byte, N) + j := 0 + for i := r.count; i <= N; i++ { + line[j] = buf[i] + j++ + } + r.count = pos + return line, nil +} + +func (r *RedisInputStream) readLineBytesSlowly() ([]byte, error) { + buf := make([]byte, 0) + for { + err := r.ensureFill() + if err != nil { + return nil, err + } + b := r.Buf[r.count] + r.count++ + if b == 'r' { + err := r.ensureFill() + if err != nil { + return nil, err + } + c := r.Buf[r.count] + r.count++ + if c == '\n' { + break + } + buf = append(buf, b) + buf = append(buf, c) + } else { + buf = append(buf, b) + } + } + return buf, nil +} + +func (r *RedisInputStream) readIntCrLf() (int64, error) { + err := r.ensureFill() + if err != nil { + return 0, err + } + buf := r.Buf + isNeg := false + if buf[r.count] == '-' { + isNeg = true + } + if isNeg { + r.count++ + } + value := int64(0) + for { + err := r.ensureFill() + if err != nil { + return 0, err + } + b := buf[r.count] + r.count++ + if b == '\r' { + err := r.ensureFill() + if err != nil { + return 0, err + } + c := buf[r.count] + r.count++ + if c != '\n' { + return 0, newConnectError("Unexpected character!") + } + break + } else { + value = value*10 + int64(b) - int64('0') + } + } + if isNeg { + return -value, nil + } + return value, nil +} + +type RedisProtocol struct { + is *RedisInputStream +} + +func NewProtocol(is *RedisInputStream) *RedisProtocol { + return &RedisProtocol{ + is: is, + } +} + +func (p *RedisProtocol) Read() (packet *RedisPacket, err error) { + x, r, err := p.process() + if err != nil { + return + } + packet = &RedisPacket{} + packet.Type = r + + switch x.(type) { + case []interface{}: + array := x.([]interface{}) + packet.Command = RedisCommand(strings.ToUpper(string(array[0].([]uint8)))) + if len(array) > 1 { + packet.Key = string(array[1].([]uint8)) + } + if len(array) > 2 { + packet.Value = string(array[2].([]uint8)) + } + case []uint8: + val := string(x.([]uint8)) + if packet.Type == types[plusByte] { + packet.Keyword = RedisKeyword(strings.ToUpper(val)) + if !isValidRedisKeyword(keywords, packet.Keyword) { + err = errors.New(fmt.Sprintf("Unrecognized keyword: %s", string(packet.Command))) + return + } + } else { + packet.Value = val + } + case string: + packet.Value = x.(string) + default: + msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x)) + log.Printf(msg) + err = errors.New(msg) + return + } + + if packet.Command != "" { + if !isValidRedisCommand(commands, packet.Command) { + err = errors.New(fmt.Sprintf("Unrecognized command: %s", string(packet.Command))) + return + } + } + + return +} + +func (p *RedisProtocol) process() (v interface{}, r RedisType, err error) { + b, err := p.is.readByte() + if err != nil { + return nil, types[notApplicableByte], newConnectError(err.Error()) + } + switch b { + case plusByte: + v, err = p.processSimpleString() + r = types[plusByte] + return + case dollarByte: + v, err = p.processBulkString() + r = types[dollarByte] + return + case asteriskByte: + v, err = p.processArray() + r = types[asteriskByte] + return + case colonByte: + v, err = p.processInteger() + r = types[colonByte] + return + case minusByte: + v, err = p.processError() + r = types[minusByte] + return + default: + return nil, types[notApplicableByte], newConnectError(fmt.Sprintf("Unknown reply: %b", b)) + } +} + +func (p *RedisProtocol) processSimpleString() ([]byte, error) { + return p.is.readLineBytes() +} + +func (p *RedisProtocol) processBulkString() ([]byte, error) { + l, err := p.is.readIntCrLf() + if err != nil { + return nil, newConnectError(err.Error()) + } + if l == -1 { + return nil, nil + } + line := make([]byte, 0) + for { + err := p.is.ensureFill() + if err != nil { + return nil, err + } + b := p.is.Buf[p.is.count] + p.is.count++ + if b == '\r' { + err := p.is.ensureFill() + if err != nil { + return nil, err + } + c := p.is.Buf[p.is.count] + p.is.count++ + if c != '\n' { + return nil, newConnectError("Unexpected character!") + } + break + } else { + line = append(line, b) + } + } + return line, nil +} + +func (p *RedisProtocol) processArray() ([]interface{}, error) { + l, err := p.is.readIntCrLf() + if err != nil { + return nil, newConnectError(err.Error()) + } + if l == -1 { + return nil, nil + } + ret := make([]interface{}, 0) + for i := 0; i < int(l); i++ { + if obj, _, err := p.process(); err != nil { + ret = append(ret, err) + } else { + ret = append(ret, obj) + } + } + return ret, nil +} + +func (p *RedisProtocol) processInteger() (int64, error) { + return p.is.readIntCrLf() +} + +func (p *RedisProtocol) processError() (interface{}, error) { + msg, err := p.is.readLine() + if err != nil { + return nil, newConnectError(err.Error()) + } + if strings.HasPrefix(msg, movedPrefix) { + host, port, slot, err := p.parseTargetHostAndSlot(msg) + if err != nil { + return nil, err + } + return fmt.Sprintf("MovedDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil + } else if strings.HasPrefix(msg, askPrefix) { + host, port, slot, err := p.parseTargetHostAndSlot(msg) + if err != nil { + return nil, err + } + return fmt.Sprintf("AskDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil + } else if strings.HasPrefix(msg, clusterDownPrefix) { + return fmt.Sprintf("ClusterError: %s", msg), nil + } else if strings.HasPrefix(msg, busyPrefix) { + return fmt.Sprintf("BusyError: %s", msg), nil + } else if strings.HasPrefix(msg, noscriptPrefix) { + return fmt.Sprintf("NoScriptError: %s", msg), nil + } + return fmt.Sprintf("DataError: %s", msg), nil +} + +func (p *RedisProtocol) parseTargetHostAndSlot(clusterRedirectResponse string) (host string, po int, slot int, err error) { + arr := strings.Split(clusterRedirectResponse, " ") + host, port := p.extractParts(arr[2]) + slot, err = strconv.Atoi(arr[1]) + po, err = strconv.Atoi(port) + return +} + +func (p *RedisProtocol) extractParts(from string) (string, string) { + idx := strings.LastIndex(from, ":") + host := from + if idx != -1 { + host = from[0:idx] + } + port := "" + if idx != -1 { + port = from[idx+1:] + } + return host, port +} diff --git a/tap/extensions/redis/structs.go b/tap/extensions/redis/structs.go new file mode 100644 index 000000000..921c1f029 --- /dev/null +++ b/tap/extensions/redis/structs.go @@ -0,0 +1,290 @@ +package main + +type RedisType string +type RedisCommand string +type RedisKeyword string + +var types map[rune]RedisType = map[rune]RedisType{ + plusByte: "Simple String", + dollarByte: "Bulk String", + asteriskByte: "Array", + colonByte: "Integer", + minusByte: "Error", + notApplicableByte: "N/A", +} + +var commands []RedisCommand = []RedisCommand{ + "PING", + "SET", + "GET", + "QUIT", + "EXISTS", + "DEL", + "UNLINK", + "TYPE", + "FLUSHDB", + "KEYS", + "RANDOMKEY", + "RENAME", + "RENAMENX", + "RENAMEX", + "DBSIZE", + "EXPIRE", + "EXPIREAT", + "TTL", + "SELECT", + "MOVE", + "FLUSHALL", + "GETSET", + "MGET", + "SETNX", + "SETEX", + "MSET", + "MSETNX", + "DECRBY", + "DECR", + "INCRBY", + "INCR", + "APPEND", + "SUBSTR", + "HSET", + "HGET", + "HSETNX", + "HMSET", + "HMGET", + "HINCRBY", + "HEXISTS", + "HDEL", + "HLEN", + "HKEYS", + "HVALS", + "HGETALL", + "RPUSH", + "LPUSH", + "LLEN", + "LRANGE", + "LTRIM", + "LINDEX", + "LSET", + "LREM", + "LPOP", + "RPOP", + "RPOPLPUSH", + "SADD", + "SMEMBERS", + "SREM", + "SPOP", + "SMOVE", + "SCARD", + "SISMEMBER", + "SINTER", + "SINTERSTORE", + "SUNION", + "SUNIONSTORE", + "SDIFF", + "SDIFFSTORE", + "SRANDMEMBER", + "ZADD", + "ZRANGE", + "ZREM", + "ZINCRBY", + "ZRANK", + "ZREVRANK", + "ZREVRANGE", + "ZCARD", + "ZSCORE", + "MULTI", + "DISCARD", + "EXEC", + "WATCH", + "UNWATCH", + "SORT", + "BLPOP", + "BRPOP", + "AUTH", + "SUBSCRIBE", + "PUBLISH", + "UNSUBSCRIBE", + "PSUBSCRIBE", + "PUNSUBSCRIBE", + "PUBSUB", + "ZCOUNT", + "ZRANGEBYSCORE", + "ZREVRANGEBYSCORE", + "ZREMRANGEBYRANK", + "ZREMRANGEBYSCORE", + "ZUNIONSTORE", + "ZINTERSTORE", + "ZLEXCOUNT", + "ZRANGEBYLEX", + "ZREVRANGEBYLEX", + "ZREMRANGEBYLEX", + "SAVE", + "BGSAVE", + "BGREWRITEAOF", + "LASTSAVE", + "SHUTDOWN", + "INFO", + "MONITOR", + "SLAVEOF", + "CONFIG", + "STRLEN", + "SYNC", + "LPUSHX", + "PERSIST", + "RPUSHX", + "ECHO", + "LINSERT", + "DEBUG", + "BRPOPLPUSH", + "SETBIT", + "GETBIT", + "BITPOS", + "SETRANGE", + "GETRANGE", + "EVAL", + "EVALSHA", + "SCRIPT", + "SLOWLOG", + "OBJECT", + "BITCOUNT", + "BITOP", + "SENTINEL", + "DUMP", + "RESTORE", + "PEXPIRE", + "PEXPIREAT", + "PTTL", + "INCRBYFLOAT", + "PSETEX", + "CLIENT", + "TIME", + "MIGRATE", + "HINCRBYFLOAT", + "SCAN", + "HSCAN", + "SSCAN", + "ZSCAN", + "WAIT", + "CLUSTER", + "ASKING", + "PFADD", + "PFCOUNT", + "PFMERGE", + "READONLY", + "GEOADD", + "GEODIST", + "GEOHASH", + "GEOPOS", + "GEORADIUS", + "GEORADIUS_RO", + "GEORADIUSBYMEMBER", + "GEORADIUSBYMEMBER_RO", + "MODULE", + "BITFIELD", + "HSTRLEN", + "TOUCH", + "SWAPDB", + "MEMORY", + "XADD", + "XLEN", + "XDEL", + "XTRIM", + "XRANGE", + "XREVRANGE", + "XREAD", + "XACK", + "XGROUP", + "XREADGROUP", + "XPENDING", + "XCLAIM", +} + +var keywords []RedisKeyword = []RedisKeyword{ + "AGGREGATE", + "ALPHA", + "ASC", + "BY", + "DESC", + "GET", + "LIMIT", + "MESSAGE", + "NO", + "NOSORT", + "PMESSAGE", + "PSUBSCRIBE", + "PUNSUBSCRIBE", + "OK", + "ONE", + "QUEUED", + "SET", + "STORE", + "SUBSCRIBE", + "UNSUBSCRIBE", + "WEIGHTS", + "WITHSCORES", + "RESETSTAT", + "REWRITE", + "RESET", + "FLUSH", + "EXISTS", + "LOAD", + "KILL", + "LEN", + "REFCOUNT", + "ENCODING", + "IDLETIME", + "GETNAME", + "SETNAME", + "LIST", + "MATCH", + "COUNT", + "PING", + "PONG", + "UNLOAD", + "REPLACE", + "KEYS", + "PAUSE", + "DOCTOR", + "BLOCK", + "NOACK", + "STREAMS", + "KEY", + "CREATE", + "MKSTREAM", + "SETID", + "DESTROY", + "DELCONSUMER", + "MAXLEN", + "GROUP", + "IDLE", + "TIME", + "RETRYCOUNT", + "FORCE", +} + +type RedisPacket struct { + Type RedisType `json:"type"` + Command RedisCommand `json:"command"` + Key string `json:"key"` + Value string `json:"value"` + Keyword RedisKeyword `json:"keyword"` +} + +func isValidRedisCommand(s []RedisCommand, c RedisCommand) bool { + for _, v := range s { + if v == c { + return true + } + } + return false +} + +func isValidRedisKeyword(s []RedisKeyword, c RedisKeyword) bool { + for _, v := range s { + if v == c { + return true + } + } + return false +} diff --git a/ui/package-lock.json b/ui/package-lock.json index 5da12d0c0..f6a4eff73 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -13454,9 +13454,9 @@ } }, "react-scrollable-feed-virtualized": { - "version": "1.4.2", - "resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.2.tgz", - "integrity": "sha512-j6M80ETqhSRBlygWe491gMPqCiAkVUQsLd/JR7DCKsZF44IYlJiunZWjdWe3//gxddTL68pjqq8pMvd1YN1bgw==" + "version": "1.4.3", + "resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.3.tgz", + "integrity": "sha512-M9WgJKr57jCyWKNCksc3oi+xhtO0YbL9d7Ll8Sdc5ZWOIstNvdNbNX0k4Nq6kXUVaHCJ9qE8omdSI/CxT3MLAQ==" }, "react-syntax-highlighter": { "version": "15.4.3", diff --git a/ui/package.json b/ui/package.json index c5843d388..463ef8c4b 100644 --- a/ui/package.json +++ b/ui/package.json @@ -21,7 +21,7 @@ "react-copy-to-clipboard": "^5.0.3", "react-dom": "^17.0.2", "react-scripts": "4.0.3", - "react-scrollable-feed-virtualized": "^1.4.2", + "react-scrollable-feed-virtualized": "^1.4.3", "react-syntax-highlighter": "^15.4.3", "typescript": "^4.2.4", "web-vitals": "^1.1.1" diff --git a/ui/src/App.tsx b/ui/src/App.tsx index 003fbc8fe..3783e061f 100644 --- a/ui/src/App.tsx +++ b/ui/src/App.tsx @@ -91,7 +91,7 @@ const App = () => { - + return (
diff --git a/ui/src/components/EntriesList.tsx b/ui/src/components/EntriesList.tsx index e67eb8289..53e653a05 100644 --- a/ui/src/components/EntriesList.tsx +++ b/ui/src/components/EntriesList.tsx @@ -19,7 +19,8 @@ interface EntriesListProps { setNoMoreDataBottom: (flag: boolean) => void; methodsFilter: Array; statusFilter: Array; - pathFilter: string + pathFilter: string; + serviceFilter: string; listEntryREF: any; onScrollEvent: (isAtBottom:boolean) => void; scrollableList: boolean; @@ -32,7 +33,7 @@ enum FetchOperator { const api = new Api(); -export const EntriesList: React.FC = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, listEntryREF, onScrollEvent, scrollableList}) => { +export const EntriesList: React.FC = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, serviceFilter, listEntryREF, onScrollEvent, scrollableList}) => { const [loadMoreTop, setLoadMoreTop] = useState(false); const [isLoadingTop, setIsLoadingTop] = useState(false); @@ -54,10 +55,11 @@ export const EntriesList: React.FC = ({entries, setEntries, fo const filterEntries = useCallback((entry) => { if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return; if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return; + if(serviceFilter && entry.service?.toLowerCase()?.indexOf(serviceFilter) === -1) return; if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return; if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return; return entry; - },[methodsFilter, pathFilter, statusFilter]) + },[methodsFilter, pathFilter, statusFilter, serviceFilter]) const filteredEntries = useMemo(() => { return entries.filter(filterEntries); diff --git a/ui/src/components/EntryDetailed.tsx b/ui/src/components/EntryDetailed.tsx index 5c7045d6f..65e6ce8b3 100644 --- a/ui/src/components/EntryDetailed.tsx +++ b/ui/src/components/EntryDetailed.tsx @@ -41,7 +41,7 @@ const EntryTitle: React.FC = ({protocol, data, bodySize, elapsedTime}) => {
{response.payload &&
{formatSize(bodySize)}
} -
{Math.round(elapsedTime)}ms
+ {response.payload &&
{Math.round(elapsedTime)}ms
}
; }; @@ -71,7 +71,7 @@ export const EntryDetailed: React.FC = ({entryData}) => { /> {entryData.data && } <> - {entryData.data && } + {entryData.data && } }; diff --git a/ui/src/components/EntryDetailed/EntrySections.tsx b/ui/src/components/EntryDetailed/EntrySections.tsx index 9b9d9d661..586d40d79 100644 --- a/ui/src/components/EntryDetailed/EntrySections.tsx +++ b/ui/src/components/EntryDetailed/EntrySections.tsx @@ -153,10 +153,8 @@ export const EntryTableSection: React.FC = ({title, color, ar interface EntryPolicySectionProps { - service: string, title: string, color: string, - response: any, latency?: number, arrayToIterate: any[], } @@ -200,7 +198,7 @@ export const EntryPolicySectionContainer: React.FC } -export const EntryTablePolicySection: React.FC = ({service, title, color, response, latency, arrayToIterate}) => { +export const EntryTablePolicySection: React.FC = ({title, color, latency, arrayToIterate}) => { return { arrayToIterate && arrayToIterate.length > 0 ? diff --git a/ui/src/components/EntryDetailed/EntryViewer.tsx b/ui/src/components/EntryDetailed/EntryViewer.tsx index 9271a0f47..04ef31074 100644 --- a/ui/src/components/EntryDetailed/EntryViewer.tsx +++ b/ui/src/components/EntryDetailed/EntryViewer.tsx @@ -33,8 +33,8 @@ const SectionsRepresentation: React.FC = ({data, color}) => { return <>{sections}; } -const AutoRepresentation: React.FC = ({representation, rulesMatched, elapsedTime, color}) => { - const TABS = [ +const AutoRepresentation: React.FC = ({representation, isRulesEnabled, rulesMatched, elapsedTime, color}) => { + var TABS = [ { tab: 'request' }, @@ -54,6 +54,14 @@ const AutoRepresentation: React.FC = ({representation, rulesMatched, elapse const {request, response} = JSON.parse(representation); + if (!response) { + TABS[1]['hidden'] = true; + } + + if (!isRulesEnabled) { + TABS.pop() + } + return
{
@@ -63,11 +71,11 @@ const AutoRepresentation: React.FC = ({representation, rulesMatched, elapse {currentTab === TABS[0].tab && } - {currentTab === TABS[1].tab && + {response && currentTab === TABS[1].tab && } - {currentTab === TABS[2].tab && - + {TABS.length > 2 && currentTab === TABS[2].tab && + }
}
; @@ -75,13 +83,14 @@ const AutoRepresentation: React.FC = ({representation, rulesMatched, elapse interface Props { representation: any; + isRulesEnabled: boolean; rulesMatched: any; color: string; elapsedTime: number; } -const EntryViewer: React.FC = ({representation, rulesMatched, elapsedTime, color}) => { - return +const EntryViewer: React.FC = ({representation, isRulesEnabled, rulesMatched, elapsedTime, color}) => { + return }; export default EntryViewer; diff --git a/ui/src/components/EntryListItem/EntryListItem.module.sass b/ui/src/components/EntryListItem/EntryListItem.module.sass index ddedd0ae2..2c6ade2c8 100644 --- a/ui/src/components/EntryListItem/EntryListItem.module.sass +++ b/ui/src/components/EntryListItem/EntryListItem.module.sass @@ -36,8 +36,8 @@ border-left: 5px $failure-color solid .ruleNumberText - font-size: 12px; - font-style: italic; + font-size: 12px + font-weight: 600 .ruleNumberTextFailure color: #DB2156 diff --git a/ui/src/components/EntryListItem/EntryListItem.tsx b/ui/src/components/EntryListItem/EntryListItem.tsx index 81da0a56c..0e8c04546 100644 --- a/ui/src/components/EntryListItem/EntryListItem.tsx +++ b/ui/src/components/EntryListItem/EntryListItem.tsx @@ -68,7 +68,7 @@ export const EntryItem: React.FC = ({entry, setFocusedEntryId, isSel let rule = 'latency' in entry.rules if (rule) { if (entry.rules.latency !== -1) { - if (entry.rules.latency >= entry.latency) { + if (entry.rules.latency >= entry.latency || !('latency' in entry)) { additionalRulesProperties = styles.ruleSuccessRow ruleSuccess = true } else { diff --git a/ui/src/components/Filters.tsx b/ui/src/components/Filters.tsx index c7188c2a5..c1795a67a 100644 --- a/ui/src/components/Filters.tsx +++ b/ui/src/components/Filters.tsx @@ -11,13 +11,16 @@ interface FiltersProps { setStatusFilter: (methods: Array) => void; pathFilter: string setPathFilter: (val: string) => void; + serviceFilter: string + setServiceFilter: (val: string) => void; } -export const Filters: React.FC = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter}) => { +export const Filters: React.FC = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter, serviceFilter, setServiceFilter}) => { return
+
; }; @@ -117,3 +120,18 @@ const PathFilter: React.FC = ({pathFilter, setPathFilter}) => { ; }; +interface ServiceFilterProps { + serviceFilter: string; + setServiceFilter: (val: string) => void; +} + +const ServiceFilter: React.FC = ({serviceFilter, setServiceFilter}) => { + + return +
Service
+
+ setServiceFilter(e.target.value)}/> +
+
; +}; + diff --git a/ui/src/components/TrafficPage.tsx b/ui/src/components/TrafficPage.tsx index 3e9b2911b..fa365fc36 100644 --- a/ui/src/components/TrafficPage.tsx +++ b/ui/src/components/TrafficPage.tsx @@ -58,6 +58,7 @@ export const TrafficPage: React.FC = ({setAnalyzeStatus, onTLS const [methodsFilter, setMethodsFilter] = useState([]); const [statusFilter, setStatusFilter] = useState([]); const [pathFilter, setPathFilter] = useState(""); + const [serviceFilter, setServiceFilter] = useState(""); const [tappingStatus, setTappingStatus] = useState(null); @@ -192,6 +193,8 @@ export const TrafficPage: React.FC = ({setAnalyzeStatus, onTLS setStatusFilter={setStatusFilter} pathFilter={pathFilter} setPathFilter={setPathFilter} + serviceFilter={serviceFilter} + setServiceFilter={setServiceFilter} />
= ({setAnalyzeStatus, onTLS methodsFilter={methodsFilter} statusFilter={statusFilter} pathFilter={pathFilter} + serviceFilter={serviceFilter} listEntryREF={listEntry} onScrollEvent={onScrollEvent} scrollableList={disableScrollList}